tether_utils/
tether_record.rs1use std::{
2 fs::File,
3 io::{LineWriter, Write},
4 sync::mpsc,
5 time::{Duration, SystemTime},
6};
7
8use clap::Args;
9use log::{debug, info, warn};
10use tether_agent::TetherAgent;
11
12use crate::tether_playback::{SimulationMessage, SimulationRow};
13
14#[derive(Args, Clone)]
15pub struct RecordOptions {
16 pub file_override_path: Option<String>,
18
19 #[arg(long = "file.path", default_value_t=String::from("./"))]
21 pub file_base_path: String,
22
23 #[arg(long = "file.name", default_value_t=String::from("recording"))]
25 pub file_base_name: String,
26
27 #[arg(long = "file.noTimestamp")]
29 pub file_no_timestamp: bool,
30
31 #[arg(long = "topic", default_value_t=String::from("#"))]
33 pub topic: String,
34
35 #[arg(long = "timing.nonzeroStart")]
38 pub timing_nonzero_start: bool,
39
40 #[arg(long = "timing.delay")]
43 pub timing_delay: Option<f32>,
44
45 #[arg(long = "timing.duration")]
48 pub timing_duration: Option<f32>,
49
50 #[arg(long = "ignoreCtrlC")]
53 pub ignore_ctrl_c: bool,
54}
55
56impl Default for RecordOptions {
57 fn default() -> Self {
58 RecordOptions {
59 file_override_path: None,
60 file_base_path: "./".into(),
61 file_base_name: "recording".into(),
62 file_no_timestamp: false,
63 topic: "#".into(),
64 timing_nonzero_start: false,
65 timing_delay: None,
66 timing_duration: None,
67 ignore_ctrl_c: false,
68 }
69 }
70}
71
72pub struct TetherRecordUtil {
73 stop_request_tx: mpsc::Sender<bool>,
74 stop_request_rx: mpsc::Receiver<bool>,
75 options: RecordOptions,
76}
77
78impl TetherRecordUtil {
79 pub fn new(options: RecordOptions) -> Self {
80 info!("Tether Record Utility: initialise");
81
82 let (tx, rx) = mpsc::channel();
83
84 TetherRecordUtil {
85 stop_request_tx: tx,
86 stop_request_rx: rx,
87 options,
88 }
89 }
90
91 pub fn get_stop_tx(&self) -> mpsc::Sender<bool> {
92 self.stop_request_tx.clone()
93 }
94 pub fn start_recording(&self, tether_agent: &mut TetherAgent) {
95 info!("Tether Record Utility: start recording");
96
97 if let Some(client) = tether_agent.client_mut() {
105 client
106 .subscribe(
107 self.options.topic.clone(),
108 tether_agent::mqtt::QoS::ExactlyOnce,
109 )
110 .expect("failed to subscribe");
111 } else {
112 panic!("No client to subscribe to!")
113 }
114
115 let file_path = match &self.options.file_override_path {
116 Some(override_path) => String::from(override_path),
117 None => {
118 if self.options.file_no_timestamp {
119 format!(
120 "{}{}.json",
121 self.options.file_base_path, self.options.file_base_name
122 )
123 } else {
124 let timestamp = SystemTime::now()
125 .duration_since(SystemTime::UNIX_EPOCH)
126 .unwrap_or(Duration::from_secs(0))
127 .as_secs();
128 format!(
129 "{}{}-{}.json",
130 self.options.file_base_path, self.options.file_base_name, timestamp
131 )
132 }
133 }
134 };
135
136 info!("Writing recorded data to \"{}\" ...", &file_path);
137
138 let file = File::create(&file_path).expect("failed to create file");
139 let mut file = LineWriter::new(file);
140
141 let buf = b"[";
142 file.write_all(buf).expect("failed to write first line");
143
144 let max_duration = match self.options.timing_duration {
145 Some(dur) => {
146 warn!(
147 "Max duration was set to {}s; Ctr+C to stop before that point ...",
148 dur
149 );
150 Some(Duration::from_secs_f32(dur))
151 }
152 None => {
153 warn!("No duration provided; recording runs until you press Ctrl+C ...");
154 None
155 }
156 };
157
158 let start_delay = match self.options.timing_delay {
159 Some(dur) => {
160 warn!("Recording will only start after {}s", dur);
161 Some(Duration::from_secs_f32(dur))
162 }
163 None => {
164 debug!("No start delay set");
165 None
166 }
167 };
168
169 let start_application_time = SystemTime::now();
170 let mut first_message_time = SystemTime::now();
171 let mut previous_message_time = SystemTime::now();
172
173 let mut count: i128 = 0;
174
175 let stop_from_key = self.stop_request_tx.clone();
176 let stop_from_timer = self.stop_request_tx.clone();
177 if !self.options.ignore_ctrl_c {
183 ctrlc::set_handler(move || {
184 stop_from_key
186 .send(true)
187 .expect("failed to send stop from key");
188 warn!("received Ctrl+C!");
189 })
190 .expect("Error setting Ctrl-C handler");
191 } else {
192 warn!(
193 "No Ctrl+C handler set; you may need to kill this process manually, PID: {}",
194 std::process::id()
195 );
196 }
197
198 let mut finished = false;
199
200 while !finished {
201 if let Some(delay) = start_delay {
202 if let Ok(elapsed) = start_application_time.elapsed() {
203 if elapsed < delay {
204 continue;
205 }
206 }
207 }
208
209 if let Some(dur) = max_duration {
210 if let Ok(elapsed) = first_message_time.elapsed() {
211 if elapsed > dur {
212 if count == 0 && !self.options.timing_nonzero_start {
213 debug!("Ignore duration; nothing received yet")
214 } else {
215 warn!(
216 "Exceeded the max duration specified ({}s); will stop now...",
217 dur.as_secs_f32()
218 );
219 stop_from_timer
221 .send(true)
222 .expect("failed to send stop from timer");
223 }
224 }
225 }
226 }
227 if let Ok(_should_stop) = self.stop_request_rx.try_recv() {
228 info!(
229 "Stopping after {} entries written to disk; end file cleanly, wait then exit",
230 count
231 );
232 file.write_all(b"\n]")
233 .expect("failed to close JSON file properly");
234 file.flush().unwrap();
235 std::thread::sleep(Duration::from_secs(2));
236 debug!("Exit now");
237 finished = true;
239 } else {
240 let mut did_work = false;
241 while let Some((topic, payload)) = tether_agent.check_messages() {
242 did_work = true;
243
244 let delta_time = if count == 0 && !self.options.timing_nonzero_start {
245 first_message_time = SystemTime::now();
246 Duration::ZERO
247 } else {
248 previous_message_time.elapsed().unwrap_or_default()
249 };
250 previous_message_time = SystemTime::now();
251
252 let full_topic_string = topic.full_topic_string();
253
254 debug!("Received message on topic \"{}\"", &full_topic_string);
255 let row = SimulationRow {
256 topic: full_topic_string,
257 message: SimulationMessage {
258 r#type: "Buffer".into(),
259 data: payload.to_vec(),
260 },
261 delta_time: delta_time.as_millis() as u64,
262 };
263
264 if count == 0 {
265 file.write_all(b"\n").unwrap(); info!("First message written to disk");
267 } else {
268 file.write_all(b",\n").unwrap(); }
270
271 let json =
272 serde_json::to_string(&row).expect("failed to convert to stringified JSON");
273 file.write_all(json.as_bytes())
274 .expect("failed to write new entry");
275
276 file.flush().unwrap();
277
278 count += 1;
279
280 debug!("Wrote {} rows", count);
281 }
282 if !did_work {
283 std::thread::sleep(std::time::Duration::from_micros(100)); }
285 }
286 }
287 }
288}