Skip to main content

autocore_std/fb/ni/
task_timing.rs

1//! ReadTaskTiming function block.
2//!
3//! Fetches the DAQmx-measured sample rate and time increment for a single NI
4//! task via IPC. Uses the standard autocore-std FB lifecycle:
5//!
6//! 1. `start(client)` — fires two read requests (`actual_sample_rate` and
7//!    `time_increment`).
8//! 2. `tick(timeout_ms, client)` on every scan — drains the responses.
9//! 3. Poll `is_busy()` / `is_error()` to know when the fetch completed and
10//!    whether it succeeded; read `actual_sample_rate()` / `time_increment()`
11//!    once `is_busy()` has cleared without error.
12//!
13//! Two separate reads are used rather than one composite so each response
14//! independently surfaces any routing failure (e.g. the task name was
15//! typoed) — the FB reports error on the first failure and bails.
16//!
17//! # Example
18//!
19//! ```ignore
20//! use autocore_std::fb::ni::ReadTaskTiming;
21//!
22//! pub struct MyProgram {
23//!     task_timing: ReadTaskTiming,
24//! }
25//!
26//! impl MyProgram {
27//!     pub fn new() -> Self {
28//!         Self { task_timing: ReadTaskTiming::new("ni.AnalogTask") }
29//!     }
30//!
31//!     fn tick_timing(&mut self, ctx: &mut TickContext<GM>) {
32//!         self.task_timing.tick(1000, ctx.client);
33//!
34//!         if !self.task_timing.is_busy() {
35//!             if self.task_timing.is_error() {
36//!                 log::warn!("timing read failed: {}", self.task_timing.error_message());
37//!             } else {
38//!                 let rate = self.task_timing.actual_sample_rate();
39//!                 // ... use rate, e.g. stash into ctx.gm so the HMI can show it.
40//!             }
41//!         }
42//!     }
43//! }
44//! ```
45
46use crate::CommandClient;
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq)]
49enum State {
50    Idle,
51    Reading,
52}
53
54/// One-shot read of a task's actual sample rate and time increment.
55pub struct ReadTaskTiming {
56    /// Fully-qualified task name, e.g., `"ni.AnalogTask"`. Case-insensitive
57    /// server-side — the module matches against its configured task names.
58    task_fqdn: String,
59
60    // Results (only meaningful once !is_busy() && !is_error()).
61    actual_sample_rate: f64,
62    time_increment: f64,
63
64    // Standard FB outputs.
65    busy: bool,
66    error: bool,
67    error_message: String,
68
69    // Per-request transaction IDs. Each is cleared as its response lands.
70    pending_rate_tid: Option<u32>,
71    pending_ti_tid:   Option<u32>,
72
73    state: State,
74    start_time: Option<std::time::Instant>,
75}
76
77impl ReadTaskTiming {
78    /// Create a new FB for the given task FQDN. Construction is cheap —
79    /// nothing is fetched until `start()` is called.
80    pub fn new(task_fqdn: &str) -> Self {
81        Self {
82            task_fqdn: task_fqdn.to_string(),
83            actual_sample_rate: 0.0,
84            time_increment: 0.0,
85            busy: false,
86            error: false,
87            error_message: String::new(),
88            pending_rate_tid: None,
89            pending_ti_tid:   None,
90            state: State::Idle,
91            start_time: None,
92        }
93    }
94
95    /// True while the FB is waiting for either read response.
96    pub fn is_busy(&self) -> bool { self.busy }
97
98    /// True when the most recent `start()` concluded in an error. Cleared
99    /// on the next `start()`.
100    pub fn is_error(&self) -> bool { self.error }
101
102    pub fn error_message(&self) -> &str { &self.error_message }
103
104    /// Last-read actual sample rate in Hz. Zero until a read has succeeded.
105    pub fn actual_sample_rate(&self) -> f64 { self.actual_sample_rate }
106
107    /// Last-read time increment per sample in seconds. Zero until a read
108    /// has succeeded.
109    pub fn time_increment(&self) -> f64 { self.time_increment }
110
111    /// Fire both reads. Safe to call repeatedly from the control program —
112    /// a prior in-flight read is abandoned and its transaction IDs are
113    /// dropped (the responses will eventually be discarded by the
114    /// CommandClient's unmatched-response path).
115    pub fn start(&mut self, client: &mut CommandClient) {
116        self.error = false;
117        self.error_message.clear();
118        self.actual_sample_rate = 0.0;
119        self.time_increment = 0.0;
120
121        let rate_tid = client.send(
122            &format!("{}.actual_sample_rate", self.task_fqdn),
123            serde_json::json!({}),
124        );
125        let ti_tid = client.send(
126            &format!("{}.time_increment", self.task_fqdn),
127            serde_json::json!({}),
128        );
129
130        self.pending_rate_tid = Some(rate_tid);
131        self.pending_ti_tid   = Some(ti_tid);
132        self.start_time = Some(std::time::Instant::now());
133        self.busy = true;
134        self.state = State::Reading;
135    }
136
137    /// Cancel any in-flight reads and return to idle. Already-sent IPC
138    /// requests will still get responses; the CommandClient will drop them
139    /// as unmatched.
140    pub fn reset(&mut self) {
141        self.state = State::Idle;
142        self.busy = false;
143        self.pending_rate_tid = None;
144        self.pending_ti_tid = None;
145        self.start_time = None;
146    }
147
148    /// Drive one scan cycle of the FB. Call every tick while `is_busy()`.
149    pub fn tick(&mut self, timeout_ms: u32, client: &mut CommandClient) {
150        if self.state != State::Reading {
151            return;
152        }
153        if self.check_timeout(timeout_ms) {
154            return;
155        }
156
157        if let Some(tid) = self.pending_rate_tid {
158            if let Some(resp) = client.take_response(tid) {
159                self.pending_rate_tid = None;
160                if resp.success {
161                    self.actual_sample_rate = resp.data.as_f64().unwrap_or(0.0);
162                } else {
163                    self.set_error(&format!(
164                        "actual_sample_rate read failed: {}", resp.error_message
165                    ));
166                    return;
167                }
168            }
169        }
170        if let Some(tid) = self.pending_ti_tid {
171            if let Some(resp) = client.take_response(tid) {
172                self.pending_ti_tid = None;
173                if resp.success {
174                    self.time_increment = resp.data.as_f64().unwrap_or(0.0);
175                } else {
176                    self.set_error(&format!(
177                        "time_increment read failed: {}", resp.error_message
178                    ));
179                    return;
180                }
181            }
182        }
183
184        if self.pending_rate_tid.is_none() && self.pending_ti_tid.is_none() {
185            self.busy = false;
186            self.state = State::Idle;
187            self.start_time = None;
188        }
189    }
190
191    fn check_timeout(&mut self, timeout_ms: u32) -> bool {
192        if let Some(t) = self.start_time {
193            if t.elapsed().as_millis() as u32 > timeout_ms {
194                self.set_error("ReadTaskTiming: timed out waiting for responses");
195                return true;
196            }
197        }
198        false
199    }
200
201    fn set_error(&mut self, msg: &str) {
202        self.error = true;
203        self.error_message = msg.to_string();
204        self.busy = false;
205        self.state = State::Idle;
206        self.pending_rate_tid = None;
207        self.pending_ti_tid = None;
208        self.start_time = None;
209    }
210}
211
212// -------------------------------------------------------------------------
213// Tests
214// -------------------------------------------------------------------------
215
216#[cfg(test)]
217mod tests {
218    use super::*;
219    use mechutil::ipc::CommandMessage;
220    use tokio::sync::mpsc;
221
222    fn test_client() -> (
223        CommandClient,
224        mpsc::UnboundedReceiver<String>,
225        mpsc::UnboundedSender<CommandMessage>,
226    ) {
227        let (write_tx, write_rx) = mpsc::unbounded_channel::<String>();
228        let (response_tx, response_rx) = mpsc::unbounded_channel::<CommandMessage>();
229        (CommandClient::new(write_tx, response_rx), write_rx, response_tx)
230    }
231
232    fn drain_sent(rx: &mut mpsc::UnboundedReceiver<String>) -> Vec<CommandMessage> {
233        let mut out = Vec::new();
234        while let Ok(s) = rx.try_recv() {
235            if let Ok(m) = serde_json::from_str::<CommandMessage>(&s) {
236                out.push(m);
237            }
238        }
239        out
240    }
241
242    #[test]
243    fn start_fires_two_reads_and_marks_busy() {
244        let mut fb = ReadTaskTiming::new("ni.AnalogTask");
245        let (mut client, mut write_rx, _resp_tx) = test_client();
246
247        assert!(!fb.is_busy());
248        fb.start(&mut client);
249        assert!(fb.is_busy());
250
251        let sent = drain_sent(&mut write_rx);
252        assert_eq!(sent.len(), 2);
253        assert_eq!(sent[0].topic, "ni.AnalogTask.actual_sample_rate");
254        assert_eq!(sent[1].topic, "ni.AnalogTask.time_increment");
255    }
256
257    #[test]
258    fn both_successes_populate_values_and_clear_busy() {
259        let mut fb = ReadTaskTiming::new("ni.AnalogTask");
260        let (mut client, mut write_rx, resp_tx) = test_client();
261
262        fb.start(&mut client);
263        let sent = drain_sent(&mut write_rx);
264        let rate_tid = sent[0].transaction_id;
265        let ti_tid   = sent[1].transaction_id;
266
267        resp_tx.send(CommandMessage::response(rate_tid, serde_json::json!(1024.75))).unwrap();
268        resp_tx.send(CommandMessage::response(ti_tid,   serde_json::json!(0.0009759))).unwrap();
269
270        client.poll();
271        fb.tick(2000, &mut client);
272
273        assert!(!fb.is_busy());
274        assert!(!fb.is_error());
275        assert!((fb.actual_sample_rate() - 1024.75).abs() < 1e-6);
276        assert!((fb.time_increment() - 0.0009759).abs() < 1e-9);
277    }
278
279    #[test]
280    fn one_failure_short_circuits_and_sets_error() {
281        let mut fb = ReadTaskTiming::new("ni.BadTask");
282        let (mut client, mut write_rx, resp_tx) = test_client();
283
284        fb.start(&mut client);
285        let sent = drain_sent(&mut write_rx);
286
287        // First read returns error; second read never answered.
288        let mut err = CommandMessage::response(sent[0].transaction_id, serde_json::json!({}));
289        err.success = false;
290        err.error_message = "Unknown subtopic: badtask.actual_sample_rate".into();
291        resp_tx.send(err).unwrap();
292
293        client.poll();
294        fb.tick(2000, &mut client);
295
296        assert!(fb.is_error());
297        assert!(!fb.is_busy());
298        assert!(fb.error_message().contains("actual_sample_rate read failed"));
299    }
300
301    #[test]
302    fn timeout_sets_error() {
303        let mut fb = ReadTaskTiming::new("ni.SlowTask");
304        let (mut client, _write_rx, _resp_tx) = test_client();
305
306        fb.start(&mut client);
307        // Force the start_time back so check_timeout trips immediately.
308        fb.start_time = Some(std::time::Instant::now() - std::time::Duration::from_secs(5));
309        fb.tick(100, &mut client);
310
311        assert!(fb.is_error());
312        assert!(!fb.is_busy());
313        assert!(fb.error_message().contains("timed out"));
314    }
315
316    #[test]
317    fn reset_drops_pending_state() {
318        let mut fb = ReadTaskTiming::new("ni.AnalogTask");
319        let (mut client, _write_rx, _resp_tx) = test_client();
320
321        fb.start(&mut client);
322        assert!(fb.is_busy());
323        fb.reset();
324        assert!(!fb.is_busy());
325        assert!(fb.pending_rate_tid.is_none());
326        assert!(fb.pending_ti_tid.is_none());
327    }
328}