tether_utils/
tether_playback.rs

1use std::{
2    fs::File,
3    io::BufReader,
4    sync::mpsc::{self, Receiver},
5};
6
7use clap::Args;
8use log::{debug, info, warn};
9
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use tether_agent::TetherAgent;
13
14#[derive(Args, Clone)]
15pub struct PlaybackOptions {
16    /// JSON file to load recording from
17    #[arg(default_value_t=String::from("./demo.json"))]
18    pub file_path: String,
19
20    /// Comma-separated strings used to filter messages by topic
21    #[arg(long = "topic.filter")]
22    pub topic_filters: Option<String>,
23
24    /// Overide any original topics saved in the file, to use with every published message
25    #[arg(long = "topic.override")]
26    pub override_topic: Option<String>,
27
28    /// Speed up or slow down playback (e.g. 2.0 = double speed)
29    #[arg(long = "playback.speed", default_value_t = 1.0)]
30    pub playback_speed: f32,
31
32    /// How many times to loop playback
33    #[arg(long = "loops.count", default_value_t = 1)]
34    pub loop_count: usize,
35
36    /// Flag to enable infinite looping for playback (ignore loops.count if enabled)
37    #[arg(long = "loops.infinite")]
38    pub loop_infinite: bool,
39
40    /// Flag to disable registration of Ctrl+C handler - this is usually necessary
41    /// when using the utility programmatically (i.e. not via CLI)
42    #[arg(long = "ignoreCtrlC")]
43    pub ignore_ctrl_c: bool,
44}
45
46impl Default for PlaybackOptions {
47    fn default() -> Self {
48        PlaybackOptions {
49            file_path: "./demo.json".into(),
50            override_topic: None,
51            loop_count: 1,
52            loop_infinite: false,
53            ignore_ctrl_c: false,
54            playback_speed: 1.0,
55            topic_filters: None,
56        }
57    }
58}
59
60#[derive(Serialize, Deserialize, Debug)]
61#[serde(rename_all = "camelCase")]
62pub struct SimulationMessage {
63    pub r#type: String,
64    pub data: Vec<u8>,
65}
66
67#[derive(Serialize, Deserialize, Debug)]
68#[serde(rename_all = "camelCase")]
69pub struct SimulationRow {
70    pub topic: String,
71    pub message: SimulationMessage,
72    pub delta_time: u64,
73}
74
75pub struct TetherPlaybackUtil {
76    stop_request_tx: mpsc::Sender<bool>,
77    stop_request_rx: mpsc::Receiver<bool>,
78    options: PlaybackOptions,
79}
80
81impl TetherPlaybackUtil {
82    pub fn new(options: PlaybackOptions) -> Self {
83        info!("Tether Playback Utility: initialise");
84
85        let (tx, rx) = mpsc::channel();
86        TetherPlaybackUtil {
87            stop_request_tx: tx,
88            stop_request_rx: rx,
89            options,
90        }
91    }
92
93    pub fn get_stop_tx(&self) -> mpsc::Sender<bool> {
94        self.stop_request_tx.clone()
95    }
96
97    pub fn start(&self, tether_agent: &TetherAgent) {
98        info!("Tether Playback Utility: start playback");
99
100        let filters: Option<Vec<String>> = self
101            .options
102            .topic_filters
103            .as_ref()
104            .map(|f_string| f_string.split(',').map(String::from).collect());
105
106        if let Some(t) = &self.options.override_topic {
107            warn!("Override topic provided; ALL topics in JSON entries will be ignored and replaced with \"{}\"",t);
108        }
109
110        let stop_from_key = self.stop_request_tx.clone();
111
112        if !self.options.ignore_ctrl_c {
113            warn!("Infinite loops requested; Press Ctr+C to stop");
114            ctrlc::set_handler(move || {
115                // should_stop_clone.store(true, Ordering::Relaxed);
116                stop_from_key
117                    .send(true)
118                    .expect("failed to send stop from key");
119                warn!("received Ctrl+C! 2");
120            })
121            .expect("Error setting Ctrl-C handler");
122        } else {
123            warn!(
124                "No Ctrl+C handler set; you may need to kill this process manually, PID: {}",
125                std::process::id()
126            );
127        }
128
129        let mut finished = false;
130
131        let mut count = 0;
132
133        while !finished {
134            count += 1;
135            if !finished {
136                if !self.options.loop_infinite {
137                    info!(
138                        "Finite loops requested: starting loop {}/{}",
139                        count, self.options.loop_count
140                    );
141                } else {
142                    info!("Infinite loops requested; starting loop {}", count);
143                }
144                if parse_json_rows(
145                    &self.options.file_path,
146                    tether_agent,
147                    &filters,
148                    &self.options.override_topic,
149                    &self.stop_request_rx,
150                    self.options.playback_speed,
151                ) {
152                    warn!("Early exit; finish now");
153                    finished = true;
154                }
155            }
156            if !self.options.loop_infinite && count >= self.options.loop_count {
157                info!("All {} loops completed", &self.options.loop_count);
158                finished = true;
159            }
160        }
161    }
162}
163
164/// Parse rows and return true when finished
165fn parse_json_rows(
166    filename: &str,
167    tether_agent: &TetherAgent,
168    filters: &Option<Vec<String>>,
169    override_topic: &Option<String>,
170    should_stop_rx: &Receiver<bool>,
171    speed_factor: f32,
172) -> bool {
173    let file = File::open(filename).unwrap_or_else(|_| panic!("failed to open file {}", filename));
174    let reader = BufReader::new(file);
175    let deserializer = serde_json::Deserializer::from_reader(reader);
176    let mut iterator = deserializer.into_iter::<Vec<Value>>();
177    let top_level_array: Vec<Value> = iterator.next().unwrap().unwrap();
178
179    let mut finished = false;
180    let mut early_exit = false;
181    // let rows = top_level_array.into_iter();
182
183    let mut index = 0;
184
185    while !finished {
186        while let Ok(_should_stop) = should_stop_rx.try_recv() {
187            early_exit = true;
188            finished = true;
189        }
190        if let Some(row_value) = top_level_array.get(index) {
191            let row: SimulationRow =
192                serde_json::from_value(row_value.clone()).expect("failed to decode JSON row");
193
194            let SimulationRow {
195                topic,
196                message,
197                delta_time,
198            } = &row;
199
200            let should_send: bool = match filters {
201                Some(filters) => filters.iter().map(String::from).any(|f| topic.contains(&f)),
202                None => true,
203            };
204
205            if should_send {
206                let payload = &message.data;
207
208                if !finished {
209                    let delta_time = *delta_time as f64 / speed_factor as f64;
210                    debug!("Sleeping {}ms ...", delta_time);
211                    std::thread::sleep(std::time::Duration::from_millis(delta_time as u64));
212                    let topic = match &override_topic {
213                        Some(t) => String::from(t),
214                        None => String::from(topic),
215                    };
216
217                    tether_agent
218                        .publish_raw(&topic, payload, None, None)
219                        .expect("failed to publish");
220                }
221            }
222
223            debug!("Got row {:?}", row);
224        } else {
225            finished = true;
226        }
227        index += 1;
228    }
229    early_exit
230}