autocore_std/fb/ni/
task_timing.rs1use crate::CommandClient;
47
/// Internal progress marker for [`ReadTaskTiming`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Nothing is in flight; `tick` returns without doing any work.
    Idle,
    /// `start` has sent its requests and at least one reply is still outstanding.
    Reading,
}
53
54pub struct ReadTaskTiming {
56 task_fqdn: String,
59
60 actual_sample_rate: f64,
62 time_increment: f64,
63
64 busy: bool,
66 error: bool,
67 error_message: String,
68
69 pending_rate_tid: Option<u32>,
71 pending_ti_tid: Option<u32>,
72
73 state: State,
74 start_time: Option<std::time::Instant>,
75}
76
77impl ReadTaskTiming {
78 pub fn new(task_fqdn: &str) -> Self {
81 Self {
82 task_fqdn: task_fqdn.to_string(),
83 actual_sample_rate: 0.0,
84 time_increment: 0.0,
85 busy: false,
86 error: false,
87 error_message: String::new(),
88 pending_rate_tid: None,
89 pending_ti_tid: None,
90 state: State::Idle,
91 start_time: None,
92 }
93 }
94
95 pub fn is_busy(&self) -> bool { self.busy }
97
98 pub fn is_error(&self) -> bool { self.error }
101
102 pub fn error_message(&self) -> &str { &self.error_message }
103
104 pub fn actual_sample_rate(&self) -> f64 { self.actual_sample_rate }
106
107 pub fn time_increment(&self) -> f64 { self.time_increment }
110
111 pub fn start(&mut self, client: &mut CommandClient) {
116 self.error = false;
117 self.error_message.clear();
118 self.actual_sample_rate = 0.0;
119 self.time_increment = 0.0;
120
121 let rate_tid = client.send(
122 &format!("{}.actual_sample_rate", self.task_fqdn),
123 serde_json::json!({}),
124 );
125 let ti_tid = client.send(
126 &format!("{}.time_increment", self.task_fqdn),
127 serde_json::json!({}),
128 );
129
130 self.pending_rate_tid = Some(rate_tid);
131 self.pending_ti_tid = Some(ti_tid);
132 self.start_time = Some(std::time::Instant::now());
133 self.busy = true;
134 self.state = State::Reading;
135 }
136
137 pub fn reset(&mut self) {
141 self.state = State::Idle;
142 self.busy = false;
143 self.pending_rate_tid = None;
144 self.pending_ti_tid = None;
145 self.start_time = None;
146 }
147
148 pub fn tick(&mut self, timeout_ms: u32, client: &mut CommandClient) {
150 if self.state != State::Reading {
151 return;
152 }
153 if self.check_timeout(timeout_ms) {
154 return;
155 }
156
157 if let Some(tid) = self.pending_rate_tid {
158 if let Some(resp) = client.take_response(tid) {
159 self.pending_rate_tid = None;
160 if resp.success {
161 self.actual_sample_rate = resp.data.as_f64().unwrap_or(0.0);
162 } else {
163 self.set_error(&format!(
164 "actual_sample_rate read failed: {}", resp.error_message
165 ));
166 return;
167 }
168 }
169 }
170 if let Some(tid) = self.pending_ti_tid {
171 if let Some(resp) = client.take_response(tid) {
172 self.pending_ti_tid = None;
173 if resp.success {
174 self.time_increment = resp.data.as_f64().unwrap_or(0.0);
175 } else {
176 self.set_error(&format!(
177 "time_increment read failed: {}", resp.error_message
178 ));
179 return;
180 }
181 }
182 }
183
184 if self.pending_rate_tid.is_none() && self.pending_ti_tid.is_none() {
185 self.busy = false;
186 self.state = State::Idle;
187 self.start_time = None;
188 }
189 }
190
191 fn check_timeout(&mut self, timeout_ms: u32) -> bool {
192 if let Some(t) = self.start_time {
193 if t.elapsed().as_millis() as u32 > timeout_ms {
194 self.set_error("ReadTaskTiming: timed out waiting for responses");
195 return true;
196 }
197 }
198 false
199 }
200
201 fn set_error(&mut self, msg: &str) {
202 self.error = true;
203 self.error_message = msg.to_string();
204 self.busy = false;
205 self.state = State::Idle;
206 self.pending_rate_tid = None;
207 self.pending_ti_tid = None;
208 self.start_time = None;
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use mechutil::ipc::CommandMessage;
    use tokio::sync::mpsc;

    /// Builds a `CommandClient` backed by in-memory channels.
    ///
    /// Returns the client, the receiving end of everything the client writes,
    /// and a sender through which tests can inject responses.
    fn test_client() -> (
        CommandClient,
        mpsc::UnboundedReceiver<String>,
        mpsc::UnboundedSender<CommandMessage>,
    ) {
        let (outgoing_tx, outgoing_rx) = mpsc::unbounded_channel::<String>();
        let (incoming_tx, incoming_rx) = mpsc::unbounded_channel::<CommandMessage>();
        let client = CommandClient::new(outgoing_tx, incoming_rx);
        (client, outgoing_rx, incoming_tx)
    }

    /// Collects every message currently queued on `rx` that parses as a
    /// `CommandMessage`; entries that fail to parse are skipped.
    fn drain_sent(rx: &mut mpsc::UnboundedReceiver<String>) -> Vec<CommandMessage> {
        std::iter::from_fn(|| rx.try_recv().ok())
            .filter_map(|raw| serde_json::from_str::<CommandMessage>(&raw).ok())
            .collect()
    }

    #[test]
    fn start_fires_two_reads_and_marks_busy() {
        let (mut client, mut outgoing, _inject) = test_client();
        let mut block = ReadTaskTiming::new("ni.AnalogTask");

        assert!(!block.is_busy());
        block.start(&mut client);
        assert!(block.is_busy());

        // Exactly one request per reading, rate first.
        let requests = drain_sent(&mut outgoing);
        assert_eq!(requests.len(), 2);
        assert_eq!(requests[0].topic, "ni.AnalogTask.actual_sample_rate");
        assert_eq!(requests[1].topic, "ni.AnalogTask.time_increment");
    }

    #[test]
    fn both_successes_populate_values_and_clear_busy() {
        let (mut client, mut outgoing, inject) = test_client();
        let mut block = ReadTaskTiming::new("ni.AnalogTask");

        block.start(&mut client);
        let requests = drain_sent(&mut outgoing);
        let rate_reply =
            CommandMessage::response(requests[0].transaction_id, serde_json::json!(1024.75));
        let ti_reply =
            CommandMessage::response(requests[1].transaction_id, serde_json::json!(0.0009759));

        inject.send(rate_reply).unwrap();
        inject.send(ti_reply).unwrap();

        client.poll();
        block.tick(2000, &mut client);

        assert!(!block.is_busy());
        assert!(!block.is_error());
        assert!((block.actual_sample_rate() - 1024.75).abs() < 1e-6);
        assert!((block.time_increment() - 0.0009759).abs() < 1e-9);
    }

    #[test]
    fn one_failure_short_circuits_and_sets_error() {
        let (mut client, mut outgoing, inject) = test_client();
        let mut block = ReadTaskTiming::new("ni.BadTask");

        block.start(&mut client);
        let requests = drain_sent(&mut outgoing);

        // Turn a plain response into a failure for the first (rate) request.
        let mut failure =
            CommandMessage::response(requests[0].transaction_id, serde_json::json!({}));
        failure.success = false;
        failure.error_message = "Unknown subtopic: badtask.actual_sample_rate".into();
        inject.send(failure).unwrap();

        client.poll();
        block.tick(2000, &mut client);

        assert!(block.is_error());
        assert!(!block.is_busy());
        assert!(block.error_message().contains("actual_sample_rate read failed"));
    }

    #[test]
    fn timeout_sets_error() {
        use std::time::{Duration, Instant};

        let (mut client, _outgoing, _inject) = test_client();
        let mut block = ReadTaskTiming::new("ni.SlowTask");

        block.start(&mut client);
        // Backdate the start so the 100 ms deadline has long passed.
        block.start_time = Some(Instant::now() - Duration::from_secs(5));
        block.tick(100, &mut client);

        assert!(block.is_error());
        assert!(!block.is_busy());
        assert!(block.error_message().contains("timed out"));
    }

    #[test]
    fn reset_drops_pending_state() {
        let (mut client, _outgoing, _inject) = test_client();
        let mut block = ReadTaskTiming::new("ni.AnalogTask");

        block.start(&mut client);
        assert!(block.is_busy());

        block.reset();
        assert!(!block.is_busy());
        assert!(block.pending_rate_tid.is_none());
        assert!(block.pending_ti_tid.is_none());
    }
}