tether_utils/
tether_playback.rs1use std::{
2 fs::File,
3 io::BufReader,
4 sync::mpsc::{self, Receiver},
5};
6
7use clap::Args;
8use log::{debug, info, warn};
9
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use tether_agent::TetherAgent;
13
14#[derive(Args, Clone)]
15pub struct PlaybackOptions {
16 #[arg(default_value_t=String::from("./demo.json"))]
18 pub file_path: String,
19
20 #[arg(long = "topic.filter")]
22 pub topic_filters: Option<String>,
23
24 #[arg(long = "topic.override")]
26 pub override_topic: Option<String>,
27
28 #[arg(long = "playback.speed", default_value_t = 1.0)]
30 pub playback_speed: f32,
31
32 #[arg(long = "loops.count", default_value_t = 1)]
34 pub loop_count: usize,
35
36 #[arg(long = "loops.infinite")]
38 pub loop_infinite: bool,
39
40 #[arg(long = "ignoreCtrlC")]
43 pub ignore_ctrl_c: bool,
44}
45
46impl Default for PlaybackOptions {
47 fn default() -> Self {
48 PlaybackOptions {
49 file_path: "./demo.json".into(),
50 override_topic: None,
51 loop_count: 1,
52 loop_infinite: false,
53 ignore_ctrl_c: false,
54 playback_speed: 1.0,
55 topic_filters: None,
56 }
57 }
58}
59
60#[derive(Serialize, Deserialize, Debug)]
61#[serde(rename_all = "camelCase")]
62pub struct SimulationMessage {
63 pub r#type: String,
64 pub data: Vec<u8>,
65}
66
67#[derive(Serialize, Deserialize, Debug)]
68#[serde(rename_all = "camelCase")]
69pub struct SimulationRow {
70 pub topic: String,
71 pub message: SimulationMessage,
72 pub delta_time: u64,
73}
74
75pub struct TetherPlaybackUtil {
76 stop_request_tx: mpsc::Sender<bool>,
77 stop_request_rx: mpsc::Receiver<bool>,
78 options: PlaybackOptions,
79}
80
81impl TetherPlaybackUtil {
82 pub fn new(options: PlaybackOptions) -> Self {
83 info!("Tether Playback Utility: initialise");
84
85 let (tx, rx) = mpsc::channel();
86 TetherPlaybackUtil {
87 stop_request_tx: tx,
88 stop_request_rx: rx,
89 options,
90 }
91 }
92
93 pub fn get_stop_tx(&self) -> mpsc::Sender<bool> {
94 self.stop_request_tx.clone()
95 }
96
97 pub fn start(&self, tether_agent: &TetherAgent) {
98 info!("Tether Playback Utility: start playback");
99
100 let filters: Option<Vec<String>> = self
101 .options
102 .topic_filters
103 .as_ref()
104 .map(|f_string| f_string.split(',').map(String::from).collect());
105
106 if let Some(t) = &self.options.override_topic {
107 warn!("Override topic provided; ALL topics in JSON entries will be ignored and replaced with \"{}\"",t);
108 }
109
110 let stop_from_key = self.stop_request_tx.clone();
111
112 if !self.options.ignore_ctrl_c {
113 warn!("Infinite loops requested; Press Ctr+C to stop");
114 ctrlc::set_handler(move || {
115 stop_from_key
117 .send(true)
118 .expect("failed to send stop from key");
119 warn!("received Ctrl+C! 2");
120 })
121 .expect("Error setting Ctrl-C handler");
122 } else {
123 warn!(
124 "No Ctrl+C handler set; you may need to kill this process manually, PID: {}",
125 std::process::id()
126 );
127 }
128
129 let mut finished = false;
130
131 let mut count = 0;
132
133 while !finished {
134 count += 1;
135 if !finished {
136 if !self.options.loop_infinite {
137 info!(
138 "Finite loops requested: starting loop {}/{}",
139 count, self.options.loop_count
140 );
141 } else {
142 info!("Infinite loops requested; starting loop {}", count);
143 }
144 if parse_json_rows(
145 &self.options.file_path,
146 tether_agent,
147 &filters,
148 &self.options.override_topic,
149 &self.stop_request_rx,
150 self.options.playback_speed,
151 ) {
152 warn!("Early exit; finish now");
153 finished = true;
154 }
155 }
156 if !self.options.loop_infinite && count >= self.options.loop_count {
157 info!("All {} loops completed", &self.options.loop_count);
158 finished = true;
159 }
160 }
161 }
162}
163
164fn parse_json_rows(
166 filename: &str,
167 tether_agent: &TetherAgent,
168 filters: &Option<Vec<String>>,
169 override_topic: &Option<String>,
170 should_stop_rx: &Receiver<bool>,
171 speed_factor: f32,
172) -> bool {
173 let file = File::open(filename).unwrap_or_else(|_| panic!("failed to open file {}", filename));
174 let reader = BufReader::new(file);
175 let deserializer = serde_json::Deserializer::from_reader(reader);
176 let mut iterator = deserializer.into_iter::<Vec<Value>>();
177 let top_level_array: Vec<Value> = iterator.next().unwrap().unwrap();
178
179 let mut finished = false;
180 let mut early_exit = false;
181 let mut index = 0;
184
185 while !finished {
186 while let Ok(_should_stop) = should_stop_rx.try_recv() {
187 early_exit = true;
188 finished = true;
189 }
190 if let Some(row_value) = top_level_array.get(index) {
191 let row: SimulationRow =
192 serde_json::from_value(row_value.clone()).expect("failed to decode JSON row");
193
194 let SimulationRow {
195 topic,
196 message,
197 delta_time,
198 } = &row;
199
200 let should_send: bool = match filters {
201 Some(filters) => filters.iter().map(String::from).any(|f| topic.contains(&f)),
202 None => true,
203 };
204
205 if should_send {
206 let payload = &message.data;
207
208 if !finished {
209 let delta_time = *delta_time as f64 / speed_factor as f64;
210 debug!("Sleeping {}ms ...", delta_time);
211 std::thread::sleep(std::time::Duration::from_millis(delta_time as u64));
212 let topic = match &override_topic {
213 Some(t) => String::from(t),
214 None => String::from(topic),
215 };
216
217 tether_agent
218 .publish_raw(&topic, payload, None, None)
219 .expect("failed to publish");
220 }
221 }
222
223 debug!("Got row {:?}", row);
224 } else {
225 finished = true;
226 }
227 index += 1;
228 }
229 early_exit
230}