// autocore-std 3.3.43
//
// Standard library for AutoCore control programs - shared memory, IPC, and logging utilities
// Documentation
//! ReadTaskTiming function block.
//!
//! Fetches the DAQmx-measured sample rate and time increment for a single NI
//! task via IPC. Uses the standard autocore-std FB lifecycle:
//!
//! 1. `start(client)` — fires two read requests (`actual_sample_rate` and
//!    `time_increment`).
//! 2. `tick(timeout_ms, client)` on every scan — drains the responses.
//! 3. Poll `is_busy()` / `is_error()` to know when the fetch completed and
//!    whether it succeeded; read `actual_sample_rate()` / `time_increment()`
//!    once `is_busy()` has cleared without error.
//!
//! Two separate reads are used rather than one composite so each response
//! independently surfaces any routing failure (e.g. the task name was
//! typoed) — the FB reports error on the first failure and bails.
//!
//! # Example
//!
//! ```ignore
//! use autocore_std::fb::ni::ReadTaskTiming;
//!
//! pub struct MyProgram {
//!     task_timing: ReadTaskTiming,
//! }
//!
//! impl MyProgram {
//!     pub fn new() -> Self {
//!         Self { task_timing: ReadTaskTiming::new("ni.AnalogTask") }
//!     }
//!
//!     fn tick_timing(&mut self, ctx: &mut TickContext<GM>) {
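//!         // NOTE: `self.task_timing.start(ctx.client)` must have been called
//!         // once (e.g. on the first scan) to launch the read; `tick` does
//!         // nothing while the FB is idle.
//!         self.task_timing.tick(1000, ctx.client);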
//!
//!         if !self.task_timing.is_busy() {
//!             if self.task_timing.is_error() {
//!                 log::warn!("timing read failed: {}", self.task_timing.error_message());
//!             } else {
//!                 let rate = self.task_timing.actual_sample_rate();
//!                 // ... use rate, e.g. stash into ctx.gm so the HMI can show it.
//!             }
//!         }
//!     }
//! }
//! ```

use crate::CommandClient;

/// Internal lifecycle phase of the function block.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum State {
    /// No read is in flight; `tick()` returns without doing anything.
    Idle,
    /// Requests have been sent and at least one response is still awaited.
    Reading,
}

/// One-shot read of a task's actual sample rate and time increment.
///
/// The block issues two independent read requests and tracks each one by its
/// transaction ID until the matching response has been consumed, an error is
/// reported, or the timeout elapses.
#[derive(Debug)]
pub struct ReadTaskTiming {
    /// Fully-qualified task name, e.g., `"ni.AnalogTask"`. Case-insensitive
    /// server-side — the module matches against its configured task names.
    task_fqdn: String,

    // Results (only meaningful once !is_busy() && !is_error()).
    /// Last-read sample rate; reset to zero whenever a new read is started.
    actual_sample_rate: f64,
    /// Last-read time increment; reset to zero whenever a new read is started.
    time_increment: f64,

    // Standard FB outputs.
    /// True while at least one response is still outstanding.
    busy: bool,
    /// Latched when a read fails or times out.
    error: bool,
    /// Description accompanying `error`; empty when no error is latched.
    error_message: String,

    // Per-request transaction IDs. Each is cleared as its response lands.
    /// Transaction ID of the outstanding `actual_sample_rate` request, if any.
    pending_rate_tid: Option<u32>,
    /// Transaction ID of the outstanding `time_increment` request, if any.
    pending_ti_tid: Option<u32>,

    /// Current lifecycle phase.
    state: State,
    /// Moment the current read was started; the reference point for the timeout.
    start_time: Option<std::time::Instant>,
}

impl ReadTaskTiming {
    /// Create a new FB for the given task FQDN. Construction is cheap —
    /// nothing is fetched until `start()` is called.
    pub fn new(task_fqdn: &str) -> Self {
        Self {
            task_fqdn: task_fqdn.to_string(),
            actual_sample_rate: 0.0,
            time_increment: 0.0,
            busy: false,
            error: false,
            error_message: String::new(),
            pending_rate_tid: None,
            pending_ti_tid: None,
            state: State::Idle,
            start_time: None,
        }
    }

    /// True while the FB is waiting for either read response.
    pub fn is_busy(&self) -> bool {
        self.busy
    }

    /// True when the most recent `start()` concluded in an error. Cleared
    /// on the next `start()`.
    pub fn is_error(&self) -> bool {
        self.error
    }

    /// Description of the failure reported by `is_error()`. Empty when no
    /// error is latched; cleared together with the error flag by `start()`.
    pub fn error_message(&self) -> &str {
        &self.error_message
    }

    /// Last-read actual sample rate in Hz. Zero until a read has succeeded.
    pub fn actual_sample_rate(&self) -> f64 {
        self.actual_sample_rate
    }

    /// Last-read time increment per sample in seconds. Zero until a read
    /// has succeeded.
    pub fn time_increment(&self) -> f64 {
        self.time_increment
    }

    /// Fire both reads. Safe to call repeatedly from the control program —
    /// a prior in-flight read is abandoned and its transaction IDs are
    /// dropped (the responses will eventually be discarded by the
    /// CommandClient's unmatched-response path).
    ///
    /// Clears any latched error, zeroes both result values, records the
    /// start instant used for the timeout and marks the FB busy.
    pub fn start(&mut self, client: &mut CommandClient) {
        self.error = false;
        self.error_message.clear();
        self.actual_sample_rate = 0.0;
        self.time_increment = 0.0;

        let rate_tid = client.send(
            &format!("{}.actual_sample_rate", self.task_fqdn),
            serde_json::json!({}),
        );
        let ti_tid = client.send(
            &format!("{}.time_increment", self.task_fqdn),
            serde_json::json!({}),
        );

        self.pending_rate_tid = Some(rate_tid);
        self.pending_ti_tid = Some(ti_tid);
        self.start_time = Some(std::time::Instant::now());
        self.busy = true;
        self.state = State::Reading;
    }

    /// Cancel any in-flight reads and return to idle. Already-sent IPC
    /// requests will still get responses; the CommandClient will drop them
    /// as unmatched.
    ///
    /// The error flag, error message and result values are left untouched;
    /// only `start()` clears them.
    pub fn reset(&mut self) {
        self.state = State::Idle;
        self.busy = false;
        self.pending_rate_tid = None;
        self.pending_ti_tid = None;
        self.start_time = None;
    }

    /// Drive one scan cycle of the FB. Call every tick while `is_busy()`.
    ///
    /// Does nothing unless a read is in flight. The timeout is evaluated
    /// first; after that each still-pending response is collected if it has
    /// arrived. The first failed read (an unsuccessful response, or a
    /// successful one whose payload is not a number) latches the error and
    /// ends the cycle. Once both responses have been consumed the FB goes
    /// back to idle.
    ///
    /// * `timeout_ms` — maximum time, in milliseconds since `start()`, to
    ///   wait for both responses.
    /// * `client` — the client the requests were sent through.
    pub fn tick(&mut self, timeout_ms: u32, client: &mut CommandClient) {
        if self.state != State::Reading {
            return;
        }
        if self.check_timeout(timeout_ms) {
            return;
        }

        if let Some(tid) = self.pending_rate_tid {
            if let Some(resp) = client.take_response(tid) {
                self.pending_rate_tid = None;
                match Self::decode_reading(
                    "actual_sample_rate",
                    resp.success,
                    resp.data.as_f64(),
                    &resp.error_message,
                ) {
                    Ok(value) => self.actual_sample_rate = value,
                    Err(msg) => {
                        self.set_error(&msg);
                        return;
                    }
                }
            }
        }
        if let Some(tid) = self.pending_ti_tid {
            if let Some(resp) = client.take_response(tid) {
                self.pending_ti_tid = None;
                match Self::decode_reading(
                    "time_increment",
                    resp.success,
                    resp.data.as_f64(),
                    &resp.error_message,
                ) {
                    Ok(value) => self.time_increment = value,
                    Err(msg) => {
                        self.set_error(&msg);
                        return;
                    }
                }
            }
        }

        if self.pending_rate_tid.is_none() && self.pending_ti_tid.is_none() {
            self.busy = false;
            self.state = State::Idle;
            self.start_time = None;
        }
    }

    /// Convert the relevant parts of one response into a value, or into the
    /// error text to report.
    ///
    /// `what` names the quantity for the message, `value` is the payload
    /// interpreted as a number (`None` when it is not one). A payload that is
    /// not numeric is reported as a failure rather than silently read as 0.0,
    /// so a malformed reply cannot masquerade as a valid measurement.
    fn decode_reading(
        what: &str,
        success: bool,
        value: Option<f64>,
        error_message: &str,
    ) -> Result<f64, String> {
        if !success {
            return Err(format!("{} read failed: {}", what, error_message));
        }
        value.ok_or_else(|| format!("{} read failed: response payload is not a number", what))
    }

    /// Latch a timeout error when more than `timeout_ms` whole milliseconds
    /// have passed since `start()`. Returns true if the timeout fired.
    fn check_timeout(&mut self, timeout_ms: u32) -> bool {
        // Compare in u128 (the native width of `as_millis`) so that a long
        // elapsed time can never be truncated into a small value.
        let timed_out = self
            .start_time
            .map_or(false, |started| started.elapsed().as_millis() > u128::from(timeout_ms));
        if timed_out {
            self.set_error("ReadTaskTiming: timed out waiting for responses");
        }
        timed_out
    }

    /// Latch `msg` as the error and drop back to idle, forgetting any
    /// outstanding transaction IDs.
    fn set_error(&mut self, msg: &str) {
        self.error = true;
        self.error_message = msg.to_string();
        self.busy = false;
        self.state = State::Idle;
        self.pending_rate_tid = None;
        self.pending_ti_tid = None;
        self.start_time = None;
    }
}

// -------------------------------------------------------------------------
// Tests
// -------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use mechutil::ipc::CommandMessage;
    use tokio::sync::mpsc;

    /// Build a `CommandClient` backed by in-memory channels.
    ///
    /// Returns the client, the receiving end of its outgoing-message channel
    /// (to inspect what was sent) and the sending end of its response channel
    /// (to inject replies).
    fn test_client() -> (
        CommandClient,
        mpsc::UnboundedReceiver<String>,
        mpsc::UnboundedSender<CommandMessage>,
    ) {
        let (write_tx, write_rx) = mpsc::unbounded_channel::<String>();
        let (response_tx, response_rx) = mpsc::unbounded_channel::<CommandMessage>();
        (CommandClient::new(write_tx, response_rx), write_rx, response_tx)
    }

    /// Collect every message currently queued on the outgoing channel,
    /// skipping anything that does not parse as a `CommandMessage`.
    fn drain_sent(rx: &mut mpsc::UnboundedReceiver<String>) -> Vec<CommandMessage> {
        let mut out = Vec::new();
        while let Ok(s) = rx.try_recv() {
            if let Ok(m) = serde_json::from_str::<CommandMessage>(&s) {
                out.push(m);
            }
        }
        out
    }

    /// Build a failed response for `tid` carrying `message`.
    fn failure(tid: u32, message: &str) -> CommandMessage {
        let mut err = CommandMessage::response(tid, serde_json::json!({}));
        err.success = false;
        err.error_message = message.into();
        err
    }

    #[test]
    fn start_fires_two_reads_and_marks_busy() {
        let mut fb = ReadTaskTiming::new("ni.AnalogTask");
        let (mut client, mut write_rx, _resp_tx) = test_client();

        assert!(!fb.is_busy());
        fb.start(&mut client);
        assert!(fb.is_busy());

        let sent = drain_sent(&mut write_rx);
        assert_eq!(sent.len(), 2);
        assert_eq!(sent[0].topic, "ni.AnalogTask.actual_sample_rate");
        assert_eq!(sent[1].topic, "ni.AnalogTask.time_increment");
    }

    #[test]
    fn tick_is_noop_while_idle() {
        let mut fb = ReadTaskTiming::new("ni.AnalogTask");
        let (mut client, mut write_rx, _resp_tx) = test_client();

        fb.tick(0, &mut client);

        assert!(!fb.is_busy());
        assert!(!fb.is_error());
        assert!(drain_sent(&mut write_rx).is_empty());
    }

    #[test]
    fn both_successes_populate_values_and_clear_busy() {
        let mut fb = ReadTaskTiming::new("ni.AnalogTask");
        let (mut client, mut write_rx, resp_tx) = test_client();

        fb.start(&mut client);
        let sent = drain_sent(&mut write_rx);
        let rate_tid = sent[0].transaction_id;
        let ti_tid = sent[1].transaction_id;

        resp_tx.send(CommandMessage::response(rate_tid, serde_json::json!(1024.75))).unwrap();
        resp_tx.send(CommandMessage::response(ti_tid, serde_json::json!(0.0009759))).unwrap();

        client.poll();
        fb.tick(2000, &mut client);

        assert!(!fb.is_busy());
        assert!(!fb.is_error());
        assert!((fb.actual_sample_rate() - 1024.75).abs() < 1e-6);
        assert!((fb.time_increment() - 0.0009759).abs() < 1e-9);
    }

    #[test]
    fn partial_response_keeps_busy_until_second_arrives() {
        let mut fb = ReadTaskTiming::new("ni.AnalogTask");
        let (mut client, mut write_rx, resp_tx) = test_client();

        fb.start(&mut client);
        let sent = drain_sent(&mut write_rx);
        let rate_tid = sent[0].transaction_id;
        let ti_tid = sent[1].transaction_id;

        // Only the first answer has landed: the FB must keep waiting.
        resp_tx.send(CommandMessage::response(rate_tid, serde_json::json!(500.0))).unwrap();
        client.poll();
        fb.tick(2000, &mut client);

        assert!(fb.is_busy());
        assert!(!fb.is_error());

        resp_tx.send(CommandMessage::response(ti_tid, serde_json::json!(0.002))).unwrap();
        client.poll();
        fb.tick(2000, &mut client);

        assert!(!fb.is_busy());
        assert!(!fb.is_error());
        assert!((fb.actual_sample_rate() - 500.0).abs() < 1e-9);
        assert!((fb.time_increment() - 0.002).abs() < 1e-12);
    }

    #[test]
    fn one_failure_short_circuits_and_sets_error() {
        let mut fb = ReadTaskTiming::new("ni.BadTask");
        let (mut client, mut write_rx, resp_tx) = test_client();

        fb.start(&mut client);
        let sent = drain_sent(&mut write_rx);

        // First read returns error; second read never answered.
        resp_tx
            .send(failure(
                sent[0].transaction_id,
                "Unknown subtopic: badtask.actual_sample_rate",
            ))
            .unwrap();

        client.poll();
        fb.tick(2000, &mut client);

        assert!(fb.is_error());
        assert!(!fb.is_busy());
        assert!(fb.error_message().contains("actual_sample_rate read failed"));
    }

    #[test]
    fn second_read_failure_reports_time_increment() {
        let mut fb = ReadTaskTiming::new("ni.AnalogTask");
        let (mut client, mut write_rx, resp_tx) = test_client();

        fb.start(&mut client);
        let sent = drain_sent(&mut write_rx);

        resp_tx
            .send(CommandMessage::response(sent[0].transaction_id, serde_json::json!(1000.0)))
            .unwrap();
        resp_tx
            .send(failure(sent[1].transaction_id, "time_increment unavailable"))
            .unwrap();

        client.poll();
        fb.tick(2000, &mut client);

        assert!(fb.is_error());
        assert!(!fb.is_busy());
        assert!(fb.error_message().contains("time_increment read failed"));
    }

    #[test]
    fn timeout_sets_error() {
        let mut fb = ReadTaskTiming::new("ni.SlowTask");
        let (mut client, _write_rx, _resp_tx) = test_client();

        fb.start(&mut client);
        // Force the start_time back so check_timeout trips immediately.
        fb.start_time = Some(std::time::Instant::now() - std::time::Duration::from_secs(5));
        fb.tick(100, &mut client);

        assert!(fb.is_error());
        assert!(!fb.is_busy());
        assert!(fb.error_message().contains("timed out"));
    }

    #[test]
    fn restart_after_error_clears_error() {
        let mut fb = ReadTaskTiming::new("ni.SlowTask");
        let (mut client, _write_rx, _resp_tx) = test_client();

        fb.start(&mut client);
        fb.start_time = Some(std::time::Instant::now() - std::time::Duration::from_secs(5));
        fb.tick(100, &mut client);
        assert!(fb.is_error());

        fb.start(&mut client);

        assert!(fb.is_busy());
        assert!(!fb.is_error());
        assert!(fb.error_message().is_empty());
    }

    #[test]
    fn reset_drops_pending_state() {
        let mut fb = ReadTaskTiming::new("ni.AnalogTask");
        let (mut client, _write_rx, _resp_tx) = test_client();

        fb.start(&mut client);
        assert!(fb.is_busy());
        fb.reset();
        assert!(!fb.is_busy());
        assert!(fb.pending_rate_tid.is_none());
        assert!(fb.pending_ti_tid.is_none());
    }
}