tether_utils/
tether_record.rs

1use std::{
2    fs::File,
3    io::{LineWriter, Write},
4    sync::mpsc,
5    time::{Duration, SystemTime},
6};
7
8use clap::Args;
9use log::{debug, info, warn};
10use tether_agent::TetherAgent;
11
12use crate::tether_playback::{SimulationMessage, SimulationRow};
13
14#[derive(Args, Clone)]
15pub struct RecordOptions {
16    /// Specify the full path for the recording file; overrides any other file args
17    pub file_override_path: Option<String>,
18
19    /// Base path for recording file
20    #[arg(long = "file.path", default_value_t=String::from("./"))]
21    pub file_base_path: String,
22
23    /// Base name for recording file, excluding timestamp and .json extension
24    #[arg(long = "file.name", default_value_t=String::from("recording"))]
25    pub file_base_name: String,
26
27    /// Flag to disable appending timestamp onto recording file name
28    #[arg(long = "file.noTimestamp")]
29    pub file_no_timestamp: bool,
30
31    /// Topic to subscribe; by default we recording everything
32    #[arg(long = "topic", default_value_t=String::from("#"))]
33    pub topic: String,
34
35    /// Flag to disable zero-ing of the first entry's deltaTime; using this
36    /// flag will count time from launch, not first message received
37    #[arg(long = "timing.nonzeroStart")]
38    pub timing_nonzero_start: bool,
39
40    /// Time (in seconds) to delay writing anything to disk, even if messages are
41    /// received
42    #[arg(long = "timing.delay")]
43    pub timing_delay: Option<f32>,
44
45    /// Duration (in seconds) to stop recording even if Ctrl+C has not been encountered
46    /// yet
47    #[arg(long = "timing.duration")]
48    pub timing_duration: Option<f32>,
49
50    /// Flag to disable registration of Ctrl+C handler - this is usually necessary
51    /// when using the utility programmatically (i.e. not via CLI)
52    #[arg(long = "ignoreCtrlC")]
53    pub ignore_ctrl_c: bool,
54}
55
56impl Default for RecordOptions {
57    fn default() -> Self {
58        RecordOptions {
59            file_override_path: None,
60            file_base_path: "./".into(),
61            file_base_name: "recording".into(),
62            file_no_timestamp: false,
63            topic: "#".into(),
64            timing_nonzero_start: false,
65            timing_delay: None,
66            timing_duration: None,
67            ignore_ctrl_c: false,
68        }
69    }
70}
71
72pub struct TetherRecordUtil {
73    stop_request_tx: mpsc::Sender<bool>,
74    stop_request_rx: mpsc::Receiver<bool>,
75    options: RecordOptions,
76}
77
78impl TetherRecordUtil {
79    pub fn new(options: RecordOptions) -> Self {
80        info!("Tether Record Utility: initialise");
81
82        let (tx, rx) = mpsc::channel();
83
84        TetherRecordUtil {
85            stop_request_tx: tx,
86            stop_request_rx: rx,
87            options,
88        }
89    }
90
91    pub fn get_stop_tx(&self) -> mpsc::Sender<bool> {
92        self.stop_request_tx.clone()
93    }
94    pub fn start_recording(&self, tether_agent: &mut TetherAgent) {
95        info!("Tether Record Utility: start recording");
96
97        // // The channel definition is never actually used, since we do no matching from topics to channel!
98        // // But we must set it up so that subscription happens.
99        // let _channel_def = ChannelOptionsBuilder::create_receiver("all")
100        //     .topic(Some(self.options.topic.clone()).as_deref()) // TODO: should be possible to build TPT
101        //     .build(tether_agent)
102        //     .expect("failed to create Channel Receiver");
103
104        if let Some(client) = tether_agent.client_mut() {
105            client
106                .subscribe(
107                    self.options.topic.clone(),
108                    tether_agent::mqtt::QoS::ExactlyOnce,
109                )
110                .expect("failed to subscribe");
111        } else {
112            panic!("No client to subscribe to!")
113        }
114
115        let file_path = match &self.options.file_override_path {
116            Some(override_path) => String::from(override_path),
117            None => {
118                if self.options.file_no_timestamp {
119                    format!(
120                        "{}{}.json",
121                        self.options.file_base_path, self.options.file_base_name
122                    )
123                } else {
124                    let timestamp = SystemTime::now()
125                        .duration_since(SystemTime::UNIX_EPOCH)
126                        .unwrap_or(Duration::from_secs(0))
127                        .as_secs();
128                    format!(
129                        "{}{}-{}.json",
130                        self.options.file_base_path, self.options.file_base_name, timestamp
131                    )
132                }
133            }
134        };
135
136        info!("Writing recorded data to \"{}\" ...", &file_path);
137
138        let file = File::create(&file_path).expect("failed to create file");
139        let mut file = LineWriter::new(file);
140
141        let buf = b"[";
142        file.write_all(buf).expect("failed to write first line");
143
144        let max_duration = match self.options.timing_duration {
145            Some(dur) => {
146                warn!(
147                    "Max duration was set to {}s; Ctr+C to stop before that point ...",
148                    dur
149                );
150                Some(Duration::from_secs_f32(dur))
151            }
152            None => {
153                warn!("No duration provided; recording runs until you press Ctrl+C ...");
154                None
155            }
156        };
157
158        let start_delay = match self.options.timing_delay {
159            Some(dur) => {
160                warn!("Recording will only start after {}s", dur);
161                Some(Duration::from_secs_f32(dur))
162            }
163            None => {
164                debug!("No start delay set");
165                None
166            }
167        };
168
169        let start_application_time = SystemTime::now();
170        let mut first_message_time = SystemTime::now();
171        let mut previous_message_time = SystemTime::now();
172
173        let mut count: i128 = 0;
174
175        let stop_from_key = self.stop_request_tx.clone();
176        let stop_from_timer = self.stop_request_tx.clone();
177        // let stop_tx_clone = stop_tx.clone();
178
179        // let should_stop = Arc::new(AtomicBool::new(false));
180        // let should_stop_clone = Arc::clone(&should_stop);
181
182        if !self.options.ignore_ctrl_c {
183            ctrlc::set_handler(move || {
184                // should_stop_clone.store(true, Ordering::Relaxed);
185                stop_from_key
186                    .send(true)
187                    .expect("failed to send stop from key");
188                warn!("received Ctrl+C!");
189            })
190            .expect("Error setting Ctrl-C handler");
191        } else {
192            warn!(
193                "No Ctrl+C handler set; you may need to kill this process manually, PID: {}",
194                std::process::id()
195            );
196        }
197
198        let mut finished = false;
199
200        while !finished {
201            if let Some(delay) = start_delay {
202                if let Ok(elapsed) = start_application_time.elapsed() {
203                    if elapsed < delay {
204                        continue;
205                    }
206                }
207            }
208
209            if let Some(dur) = max_duration {
210                if let Ok(elapsed) = first_message_time.elapsed() {
211                    if elapsed > dur {
212                        if count == 0 && !self.options.timing_nonzero_start {
213                            debug!("Ignore duration; nothing received yet")
214                        } else {
215                            warn!(
216                                "Exceeded the max duration specified ({}s); will stop now...",
217                                dur.as_secs_f32()
218                            );
219                            // should_stop.store(true, Ordering::Relaxed);
220                            stop_from_timer
221                                .send(true)
222                                .expect("failed to send stop from timer");
223                        }
224                    }
225                }
226            }
227            if let Ok(_should_stop) = self.stop_request_rx.try_recv() {
228                info!(
229                    "Stopping after {} entries written to disk; end file cleanly, wait then exit",
230                    count
231                );
232                file.write_all(b"\n]")
233                    .expect("failed to close JSON file properly");
234                file.flush().unwrap();
235                std::thread::sleep(Duration::from_secs(2));
236                debug!("Exit now");
237                // exit(0);
238                finished = true;
239            } else {
240                let mut did_work = false;
241                while let Some((topic, payload)) = tether_agent.check_messages() {
242                    did_work = true;
243
244                    let delta_time = if count == 0 && !self.options.timing_nonzero_start {
245                        first_message_time = SystemTime::now();
246                        Duration::ZERO
247                    } else {
248                        previous_message_time.elapsed().unwrap_or_default()
249                    };
250                    previous_message_time = SystemTime::now();
251
252                    let full_topic_string = topic.full_topic_string();
253
254                    debug!("Received message on topic \"{}\"", &full_topic_string);
255                    let row = SimulationRow {
256                        topic: full_topic_string,
257                        message: SimulationMessage {
258                            r#type: "Buffer".into(),
259                            data: payload.to_vec(),
260                        },
261                        delta_time: delta_time.as_millis() as u64,
262                    };
263
264                    if count == 0 {
265                        file.write_all(b"\n").unwrap(); // line break only
266                        info!("First message written to disk");
267                    } else {
268                        file.write_all(b",\n").unwrap(); // comma, line break
269                    }
270
271                    let json =
272                        serde_json::to_string(&row).expect("failed to convert to stringified JSON");
273                    file.write_all(json.as_bytes())
274                        .expect("failed to write new entry");
275
276                    file.flush().unwrap();
277
278                    count += 1;
279
280                    debug!("Wrote {} rows", count);
281                }
282                if !did_work {
283                    std::thread::sleep(std::time::Duration::from_micros(100)); //0.1 ms
284                }
285            }
286        }
287    }
288}