autocore-std 3.3.30

Standard library for AutoCore control programs - shared memory, IPC, and logging utilities
Documentation
/// DAQ Capture Function Block
///
/// Manages the lifecycle of a triggered DAQ capture: arm the trigger, wait for
/// the capture to complete, and retrieve the captured data — all via IPC commands
/// to the autocore-ni module.
///
/// # State Machine
///
/// ```text
/// Idle ──(rising edge on execute)──> Arming
/// Arming ──(arm response OK)──────> WaitingForCapture  (active=true)
/// Arming ──(arm response error)───> Idle               (error=true)
/// WaitingForCapture ──(data_ready)> ReadingData
/// WaitingForCapture ──(timeout)───> Idle               (error=true)
/// Arming/ReadingData ──(timeout)──> Idle               (error=true)
/// ReadingData ──(read OK)─────────> Idle               (data populated)
/// ReadingData ──(read error)──────> Idle               (error=true)
/// ```
///
/// # Example
///
/// ```ignore
/// use autocore_std::fb::ni::DaqCapture;
///
/// struct MyProgram {
///     daq: DaqCapture,
/// }
///
/// impl ControlProgram for MyProgram {
///     type Memory = MyMemory;
///
///     fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>) {
///         // 5 second timeout
///         self.daq.call(ctx.gm.arm_request, 5000, ctx.client);
///
///         if !self.daq.busy && !self.daq.error {
///             if let Some(data) = &self.daq.data {
///                 // data.channels[0] = first channel's samples
///                 // data.channels[1] = second channel's samples, etc.
///             }
///         }
///     }
/// }
/// ```
use crate::CommandClient;
use crate::fb::RTrig;

/// Internal phase of the capture sequence driven by `DaqCapture::call`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// No sequence in progress; waiting for a rising edge on `execute`.
    Idle,
    /// The `arm` command has been sent and its response is pending.
    Arming,
    /// The DAQ is armed; `capture_status` is polled until it reports `data_ready`.
    WaitingForCapture,
    /// The `read_capture` command has been sent and its response is pending.
    ReadingData,
}

/// Captured data returned after a successful DAQ trigger.
///
/// Built from the JSON payload of the `read_capture` response; each field is
/// read from the payload key of the same name.
#[derive(Debug, Clone)]
pub struct CaptureData {
    /// Sample data per channel. Outer index = channel, inner = samples.
    /// Layout: `channels[ch_idx][sample_idx]`.
    ///
    /// The outer length is checked against `channel_count` when the payload is
    /// parsed. The inner lengths are not checked against `actual_samples`.
    pub channels: Vec<Vec<f64>>,
    /// Number of channels in the capture.
    pub channel_count: usize,
    /// Configured post-trigger samples per channel.
    pub capture_length: usize,
    /// Configured pre-trigger samples per channel.
    pub pre_trigger_samples: usize,
    /// Actual samples written per channel (pre + post).
    pub actual_samples: usize,
    /// Sample rate in Hz.
    pub sample_rate: f64,
    /// UNIX timestamp (nanoseconds) of the trigger event.
    pub timestamp_ns: u64,
    /// Capture sequence number (monotonically increasing).
    pub sequence: u64,
}

/// DAQ Capture function block for NI triggered recordings.
///
/// Create one instance per DAQ configuration in your control program struct.
/// Call [`DaqCapture::call`] every cycle with the execute signal and timeout.
///
/// The outputs are plain public fields that `call` updates in place; read them
/// after each call.
pub struct DaqCapture {
    // Configuration
    /// Topic prefix for this DAQ. Commands are sent to `<daq_fqdn>.arm`,
    /// `<daq_fqdn>.capture_status` and `<daq_fqdn>.read_capture`.
    daq_fqdn: String,

    // Outputs
    /// True while the FB is executing (from arm through data retrieval).
    /// Cleared when the data has been parsed successfully or an error is raised.
    pub busy: bool,
    /// True when the DAQ is armed and waiting for the hardware trigger.
    /// Set when the `arm` response reports success; cleared when the status
    /// poll reports `data_ready`, on error, and on a new rising edge of execute.
    pub active: bool,
    /// True when an error occurred. Stays true until the next rising edge of execute.
    pub error: bool,
    /// Error description (empty when no error).
    pub error_message: String,
    /// Captured data. `Some` after a successful capture, `None` otherwise.
    /// Reset to `None` on every rising edge of execute.
    pub data: Option<CaptureData>,

    // Internal state
    /// Current phase of the capture sequence.
    state: State,
    /// Rising-edge detector fed with the `execute` input on every call.
    trigger: RTrig,
    /// Instant at which the most recent `arm` command was sent; the timeout is
    /// measured from this point.
    arm_time: Option<std::time::Instant>,
    /// Id returned by `client.send` for the request whose response is still
    /// outstanding, if any.
    pending_tid: Option<u32>,
}

impl DaqCapture {
    /// Create a new DaqCapture function block.
    ///
    /// # Arguments
    ///
    /// * `daq_fqdn` - The fully qualified topic prefix for this DAQ, e.g. `"ni.impact"`.
    ///   The FB will send commands to `<daq_fqdn>.arm`, `<daq_fqdn>.capture_status`, etc.
    pub fn new(daq_fqdn: &str) -> Self {
        Self {
            daq_fqdn: daq_fqdn.to_string(),
            busy: false,
            active: false,
            error: false,
            error_message: String::new(),
            data: None,
            state: State::Idle,
            trigger: RTrig::new(),
            arm_time: None,
            pending_tid: None,
        }
    }

    /// Advance the capture state machine by one control cycle.
    ///
    /// A rising edge on `execute` always starts a new sequence: the outputs are
    /// cleared, the `arm` command is sent and the call returns. Response
    /// handling starts on the following call. While a sequence is running, each
    /// call first checks the timeout and then polls for the response to the
    /// single outstanding request; nothing else happens until that response
    /// arrives.
    ///
    /// # Arguments
    ///
    /// * `execute` - Rising edge triggers a new capture sequence.
    /// * `timeout_ms` - Maximum duration of the whole sequence in milliseconds,
    ///   measured from the moment the `arm` command is sent.
    /// * `client` - The IPC command client for sending commands to the NI module.
    pub fn call(&mut self, execute: bool, timeout_ms: u32, client: &mut CommandClient) {
        // The edge detector is fed on every call, whatever the current state.
        if self.trigger.call(execute) {
            // Fresh run: wipe the results of the previous one.
            self.error = false;
            self.error_message.clear();
            self.data = None;
            self.active = false;

            let arm_topic = format!("{}.arm", self.daq_fqdn);
            self.pending_tid = Some(client.send(&arm_topic, serde_json::json!({})));
            self.arm_time = Some(std::time::Instant::now());
            self.busy = true;
            self.state = State::Arming;
            return;
        }

        // Nothing to do unless a sequence is in progress.
        if self.state == State::Idle {
            return;
        }

        // Every active state is subject to the same timeout.
        if self.check_timeout(timeout_ms) {
            return;
        }

        // Every active state waits on exactly one outstanding request.
        let resp = match self.pending_tid.and_then(|tid| client.take_response(tid)) {
            Some(resp) => resp,
            None => return,
        };
        self.pending_tid = None;

        // A failed response aborts the sequence regardless of the state.
        if !resp.success {
            self.set_error(&resp.error_message);
            return;
        }

        match self.state {
            // Already handled by the early return above.
            State::Idle => {}

            State::Arming => {
                // Armed: start polling the capture status straight away.
                self.active = true;
                self.state = State::WaitingForCapture;
                let status_topic = format!("{}.capture_status", self.daq_fqdn);
                self.pending_tid = Some(client.send(&status_topic, serde_json::json!({})));
            }

            State::WaitingForCapture => {
                // A missing or non-boolean flag counts as "not ready".
                let data_ready = resp
                    .data
                    .get("data_ready")
                    .and_then(|v| v.as_bool())
                    .unwrap_or(false);

                let next_topic = if data_ready {
                    // Capture complete: fetch the data.
                    self.active = false;
                    self.state = State::ReadingData;
                    format!("{}.read_capture", self.daq_fqdn)
                } else {
                    // Not ready yet: poll again.
                    format!("{}.capture_status", self.daq_fqdn)
                };
                self.pending_tid = Some(client.send(&next_topic, serde_json::json!({})));
            }

            State::ReadingData => match Self::parse_capture_data(&resp.data) {
                Ok(capture) => {
                    self.data = Some(capture);
                    self.busy = false;
                    self.state = State::Idle;
                }
                Err(e) => self.set_error(&e),
            },
        }
    }

    /// Check whether the timeout has elapsed since the `arm` command was sent.
    ///
    /// Returns `true` when more than `timeout_ms` whole milliseconds have passed
    /// since `arm_time`; in that case the block is moved to the error state with
    /// the message `"Capture timeout"`. Returns `false` otherwise, including
    /// when no arm time has been recorded.
    fn check_timeout(&mut self, timeout_ms: u32) -> bool {
        // `as_millis()` yields a `u128`. Compare in that width: narrowing the
        // elapsed time to `u32` would wrap after 2^32 ms (about 49.7 days) and
        // make an expired timeout look fresh again.
        let limit_ms = u128::from(timeout_ms);
        match self.arm_time {
            Some(armed_at) if armed_at.elapsed().as_millis() > limit_ms => {
                self.set_error("Capture timeout");
                true
            }
            _ => false,
        }
    }

    /// Abort the current sequence and report `message` as the error.
    ///
    /// Sets `error` and `error_message`, clears `busy` and `active`, forgets the
    /// outstanding request id and returns the state machine to `Idle`.
    fn set_error(&mut self, message: &str) {
        // Publish the failure to the caller.
        self.error_message = String::from(message);
        self.error = true;

        // Tear down whatever was in progress.
        self.pending_tid = None;
        self.active = false;
        self.busy = false;
        self.state = State::Idle;
    }

    /// Parse the JSON response from `read_capture` into a `CaptureData` struct.
    ///
    /// # Errors
    ///
    /// Returns a description of the first problem found:
    ///
    /// * a header field is missing or has the wrong JSON type (`"Missing <field>"`),
    /// * a count does not fit in `usize` on this target,
    /// * `channels` is missing, or its length differs from `channel_count`,
    /// * a channel is not an array, or a sample is neither a number nor `null`.
    ///
    /// Malformed channels and samples are reported rather than skipped, because
    /// dropping a sample would silently shift every later sample index.
    fn parse_capture_data(data: &serde_json::Value) -> Result<CaptureData, String> {
        // Read a required unsigned integer header field.
        fn header_u64(data: &serde_json::Value, key: &str) -> Result<u64, String> {
            data.get(key)
                .and_then(|v| v.as_u64())
                .ok_or_else(|| format!("Missing {}", key))
        }

        // Read a required count, refusing values that would be truncated on
        // targets where `usize` is narrower than 64 bits.
        fn header_usize(data: &serde_json::Value, key: &str) -> Result<usize, String> {
            let raw = header_u64(data, key)?;
            <usize as std::convert::TryFrom<u64>>::try_from(raw)
                .map_err(|_| format!("{} value {} does not fit in usize", key, raw))
        }

        let channel_count = header_usize(data, "channel_count")?;
        let capture_length = header_usize(data, "capture_length")?;
        let pre_trigger_samples = header_usize(data, "pre_trigger_samples")?;
        let actual_samples = header_usize(data, "actual_samples")?;
        let sample_rate = data
            .get("sample_rate")
            .and_then(|v| v.as_f64())
            .ok_or("Missing sample_rate")?;
        let timestamp_ns = header_u64(data, "timestamp_ns")?;
        let sequence = header_u64(data, "sequence")?;

        let channels_arr = data
            .get("channels")
            .and_then(|v| v.as_array())
            .ok_or("Missing channels array")?;

        if channels_arr.len() != channel_count {
            return Err(format!(
                "channel_count mismatch: header says {} but got {} arrays",
                channel_count,
                channels_arr.len()
            ));
        }

        let channels = channels_arr
            .iter()
            .enumerate()
            .map(|(ch_idx, ch)| {
                let samples = ch
                    .as_array()
                    .ok_or_else(|| format!("channels[{}] is not an array", ch_idx))?;

                samples
                    .iter()
                    .enumerate()
                    .map(|(sample_idx, value)| match value {
                        // JSON has no NaN/Infinity literal and serde_json writes
                        // non-finite floats as `null`, so keep the slot as NaN.
                        // NOTE(review): confirm the NI module emits `null` only
                        // for non-finite samples.
                        serde_json::Value::Null => Ok(f64::NAN),
                        other => other.as_f64().ok_or_else(|| {
                            format!("channels[{}][{}] is not a number", ch_idx, sample_idx)
                        }),
                    })
                    .collect::<Result<Vec<f64>, String>>()
            })
            .collect::<Result<Vec<Vec<f64>>, String>>()?;

        Ok(CaptureData {
            channels,
            channel_count,
            capture_length,
            pre_trigger_samples,
            actual_samples,
            sample_rate,
            timestamp_ns,
            sequence,
        })
    }
}

impl Default for DaqCapture {
    fn default() -> Self {
        Self::new("ni.capture")
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Well-formed `read_capture` payload with two channels of three samples.
    fn sample_payload() -> serde_json::Value {
        serde_json::json!({
            "channel_count": 2,
            "capture_length": 100,
            "pre_trigger_samples": 10,
            "actual_samples": 110,
            "sample_rate": 1000.0,
            "timestamp_ns": 1234567890u64,
            "sequence": 1u64,
            "channels": [
                [1.0, 2.0, 3.0],
                [4.0, 5.0, 6.0],
            ],
        })
    }

    /// Every header field and every channel is carried over unchanged.
    #[test]
    fn test_parse_capture_data() {
        let capture = DaqCapture::parse_capture_data(&sample_payload()).unwrap();
        assert_eq!(capture.channel_count, 2);
        assert_eq!(capture.capture_length, 100);
        assert_eq!(capture.pre_trigger_samples, 10);
        assert_eq!(capture.actual_samples, 110);
        assert_eq!(capture.sample_rate, 1000.0);
        assert_eq!(capture.timestamp_ns, 1234567890);
        assert_eq!(capture.sequence, 1);
        assert_eq!(capture.channels.len(), 2);
        assert_eq!(capture.channels[0], vec![1.0, 2.0, 3.0]);
        assert_eq!(capture.channels[1], vec![4.0, 5.0, 6.0]);
    }

    /// A payload lacking required header fields is rejected.
    #[test]
    fn test_parse_capture_data_missing_field() {
        let data = serde_json::json!({"channel_count": 1});
        assert!(DaqCapture::parse_capture_data(&data).is_err());
    }

    /// A payload without the `channels` array is rejected.
    #[test]
    fn test_parse_capture_data_missing_channels() {
        let mut data = sample_payload();
        data.as_object_mut().unwrap().remove("channels");
        let err = DaqCapture::parse_capture_data(&data).unwrap_err();
        assert_eq!(err, "Missing channels array");
    }

    /// The header channel count must match the number of channel arrays.
    #[test]
    fn test_parse_capture_data_channel_count_mismatch() {
        let mut data = sample_payload();
        data["channel_count"] = serde_json::json!(3);
        let err = DaqCapture::parse_capture_data(&data).unwrap_err();
        assert!(err.contains("channel_count mismatch"));
    }
}