Skip to main content

autocore_std/fb/ni/
daq_capture.rs

/// DAQ Capture Function Block
///
/// Manages the lifecycle of a triggered DAQ capture: arm the trigger, wait for
/// the capture to complete, and retrieve the captured data — all via IPC commands
/// to the autocore-ni module.
///
/// # State Machine
///
/// ```text
/// Idle ──(rising edge on execute)──> Arming
/// Arming ──(arm response OK)──────> WaitingForCapture  (active=true)
/// Arming ──(arm response error)───> Idle               (error=true)
/// Arming ──(timeout)──────────────> Idle               (error=true)
/// WaitingForCapture ──(data_ready)> ReadingData
/// WaitingForCapture ──(timeout)───> Idle               (error=true)
/// ReadingData ──(read OK)─────────> Idle               (data populated)
/// ReadingData ──(read error)──────> Idle               (error=true)
/// ReadingData ──(timeout)─────────> Idle               (error=true)
/// ```
///
/// The timeout is measured from the moment the arm command is sent and covers
/// the whole sequence (arming, waiting for the trigger, and reading the data).
/// A rising edge on `execute` restarts the sequence from any state, not only
/// from `Idle`.
///
/// # Example
///
/// ```ignore
/// use autocore_std::fb::ni::DaqCapture;
///
/// struct MyProgram {
///     daq: DaqCapture,
/// }
///
/// impl ControlProgram for MyProgram {
///     type Memory = MyMemory;
///
///     fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>) {
///         // 5 second timeout
///         self.daq.call(ctx.gm.arm_request, 5000, ctx.client);
///
///         if !self.daq.busy && !self.daq.error {
///             if let Some(data) = &self.daq.data {
///                 // data.channels[0] = first channel's samples
///                 // data.channels[1] = second channel's samples, etc.
///             }
///         }
///     }
/// }
/// ```
use crate::CommandClient;
use crate::fb::RTrig;
46
/// Internal phase of the capture sequence driven by [`DaqCapture::call`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// No sequence in progress; waiting for a rising edge on `execute`.
    Idle,
    /// The `arm` command has been sent and its response is awaited.
    Arming,
    /// The DAQ is armed; `capture_status` is polled until `data_ready` is true.
    WaitingForCapture,
    /// The `read_capture` command has been sent and its response is awaited.
    ReadingData,
}
54
/// Captured data returned after a successful DAQ trigger.
///
/// All header fields are copied verbatim from the `read_capture` response;
/// only `channel_count` is cross-checked against `channels.len()` when the
/// response is parsed.
#[derive(Debug, Clone, PartialEq)]
pub struct CaptureData {
    /// Sample data per channel. Outer index = channel, inner = samples.
    /// Layout: `channels[ch_idx][sample_idx]`.
    pub channels: Vec<Vec<f64>>,
    /// Number of channels in the capture.
    pub channel_count: usize,
    /// Configured post-trigger samples per channel.
    pub capture_length: usize,
    /// Configured pre-trigger samples per channel.
    pub pre_trigger_samples: usize,
    /// Actual samples written per channel (pre + post).
    pub actual_samples: usize,
    /// Sample rate in Hz.
    pub sample_rate: f64,
    /// UNIX timestamp (nanoseconds) of the trigger event.
    pub timestamp_ns: u64,
    /// Capture sequence number (monotonically increasing).
    pub sequence: u64,
}
76
77/// DAQ Capture function block for NI triggered recordings.
78///
79/// Create one instance per DAQ configuration in your control program struct.
80/// Call [`DaqCapture::call`] every cycle with the execute signal and timeout.
81pub struct DaqCapture {
82    // Configuration
83    daq_fqdn: String,
84
85    // Outputs
86    /// True while the FB is executing (from arm through data retrieval).
87    pub busy: bool,
88    /// True when the DAQ is armed and waiting for the hardware trigger.
89    pub active: bool,
90    /// True when an error occurred. Stays true until the next rising edge of execute.
91    pub error: bool,
92    /// Error description (empty when no error).
93    pub error_message: String,
94    /// Captured data. `Some` after a successful capture, `None` otherwise.
95    pub data: Option<CaptureData>,
96
97    // Internal state
98    state: State,
99    trigger: RTrig,
100    arm_time: Option<std::time::Instant>,
101    pending_tid: Option<u32>,
102}
103
104impl DaqCapture {
105    /// Create a new DaqCapture function block.
106    ///
107    /// # Arguments
108    ///
109    /// * `daq_fqdn` - The fully qualified topic prefix for this DAQ, e.g. `"ni.impact"`.
110    ///   The FB will send commands to `<daq_fqdn>.arm`, `<daq_fqdn>.capture_status`, etc.
111    pub fn new(daq_fqdn: &str) -> Self {
112        Self {
113            daq_fqdn: daq_fqdn.to_string(),
114            busy: false,
115            active: false,
116            error: false,
117            error_message: String::new(),
118            data: None,
119            state: State::Idle,
120            trigger: RTrig::new(),
121            arm_time: None,
122            pending_tid: None,
123        }
124    }
125
126    /// Execute the DAQ capture state machine. Call once per control cycle.
127    ///
128    /// # Arguments
129    ///
130    /// * `execute` - Rising edge triggers a new capture sequence.
131    /// * `timeout_ms` - Maximum time to wait for the capture to complete (milliseconds).
132    /// * `client` - The IPC command client for sending commands to the NI module.
133    pub fn call(&mut self, execute: bool, timeout_ms: u32, client: &mut CommandClient) {
134        // Detect rising edge on execute
135        if self.trigger.call(execute) {
136            // Reset outputs on new execution
137            self.error = false;
138            self.error_message.clear();
139            self.data = None;
140            self.active = false;
141
142            // Send arm command
143            let tid = client.send(
144                &format!("{}.arm", self.daq_fqdn),
145                serde_json::json!({}),
146            );
147            self.pending_tid = Some(tid);
148            self.arm_time = Some(std::time::Instant::now());
149            self.busy = true;
150            self.state = State::Arming;
151            return;
152        }
153
154        match self.state {
155            State::Idle => {}
156
157            State::Arming => {
158                // Check timeout
159                if self.check_timeout(timeout_ms) { return; }
160
161                // Poll for arm response
162                if let Some(tid) = self.pending_tid {
163                    if let Some(resp) = client.take_response(tid) {
164                        self.pending_tid = None;
165                        if resp.success {
166                            self.active = true;
167                            self.state = State::WaitingForCapture;
168                            // Immediately send first status poll
169                            let tid = client.send(
170                                &format!("{}.capture_status", self.daq_fqdn),
171                                serde_json::json!({}),
172                            );
173                            self.pending_tid = Some(tid);
174                        } else {
175                            self.set_error(&resp.error_message);
176                        }
177                    }
178                }
179            }
180
181            State::WaitingForCapture => {
182                // Check timeout
183                if self.check_timeout(timeout_ms) { return; }
184
185                // Poll capture_status response
186                if let Some(tid) = self.pending_tid {
187                    if let Some(resp) = client.take_response(tid) {
188                        self.pending_tid = None;
189                        if resp.success {
190                            let data_ready = resp.data.get("data_ready")
191                                .and_then(|v| v.as_bool())
192                                .unwrap_or(false);
193
194                            if data_ready {
195                                // Capture complete — request the data
196                                self.active = false;
197                                let tid = client.send(
198                                    &format!("{}.read_capture", self.daq_fqdn),
199                                    serde_json::json!({}),
200                                );
201                                self.pending_tid = Some(tid);
202                                self.state = State::ReadingData;
203                            } else {
204                                // Not ready yet — poll again next cycle
205                                let tid = client.send(
206                                    &format!("{}.capture_status", self.daq_fqdn),
207                                    serde_json::json!({}),
208                                );
209                                self.pending_tid = Some(tid);
210                            }
211                        } else {
212                            self.set_error(&resp.error_message);
213                        }
214                    }
215                }
216            }
217
218            State::ReadingData => {
219                // Check timeout
220                if self.check_timeout(timeout_ms) { return; }
221
222                // Poll read_capture response
223                if let Some(tid) = self.pending_tid {
224                    if let Some(resp) = client.take_response(tid) {
225                        self.pending_tid = None;
226                        if resp.success {
227                            match Self::parse_capture_data(&resp.data) {
228                                Ok(capture) => {
229                                    self.data = Some(capture);
230                                    self.busy = false;
231                                    self.state = State::Idle;
232                                }
233                                Err(e) => {
234                                    self.set_error(&e);
235                                }
236                            }
237                        } else {
238                            self.set_error(&resp.error_message);
239                        }
240                    }
241                }
242            }
243        }
244    }
245
246    /// Check if the timeout has elapsed. If so, transition to error state.
247    fn check_timeout(&mut self, timeout_ms: u32) -> bool {
248        if let Some(t) = self.arm_time {
249            if t.elapsed().as_millis() as u32 > timeout_ms {
250                self.set_error("Capture timeout");
251                return true;
252            }
253        }
254        false
255    }
256
257    /// Transition to error state.
258    fn set_error(&mut self, message: &str) {
259        self.state = State::Idle;
260        self.busy = false;
261        self.active = false;
262        self.error = true;
263        self.error_message = message.to_string();
264        self.pending_tid = None;
265    }
266
267    /// Parse the JSON response from `read_capture` into a `CaptureData` struct.
268    fn parse_capture_data(data: &serde_json::Value) -> Result<CaptureData, String> {
269        let channel_count = data.get("channel_count")
270            .and_then(|v| v.as_u64())
271            .ok_or("Missing channel_count")? as usize;
272        let capture_length = data.get("capture_length")
273            .and_then(|v| v.as_u64())
274            .ok_or("Missing capture_length")? as usize;
275        let pre_trigger_samples = data.get("pre_trigger_samples")
276            .and_then(|v| v.as_u64())
277            .ok_or("Missing pre_trigger_samples")? as usize;
278        let actual_samples = data.get("actual_samples")
279            .and_then(|v| v.as_u64())
280            .ok_or("Missing actual_samples")? as usize;
281        let sample_rate = data.get("sample_rate")
282            .and_then(|v| v.as_f64())
283            .ok_or("Missing sample_rate")?;
284        let timestamp_ns = data.get("timestamp_ns")
285            .and_then(|v| v.as_u64())
286            .ok_or("Missing timestamp_ns")?;
287        let sequence = data.get("sequence")
288            .and_then(|v| v.as_u64())
289            .ok_or("Missing sequence")?;
290
291        let channels_arr = data.get("channels")
292            .and_then(|v| v.as_array())
293            .ok_or("Missing channels array")?;
294
295        if channels_arr.len() != channel_count {
296            return Err(format!(
297                "channel_count mismatch: header says {} but got {} arrays",
298                channel_count, channels_arr.len()
299            ));
300        }
301
302        let channels: Vec<Vec<f64>> = channels_arr.iter()
303            .map(|ch| {
304                ch.as_array()
305                    .map(|arr| arr.iter().filter_map(|v| v.as_f64()).collect())
306                    .unwrap_or_default()
307            })
308            .collect();
309
310        Ok(CaptureData {
311            channels,
312            channel_count,
313            capture_length,
314            pre_trigger_samples,
315            actual_samples,
316            sample_rate,
317            timestamp_ns,
318            sequence,
319        })
320    }
321}
322
323impl Default for DaqCapture {
324    fn default() -> Self {
325        Self::new("ni.capture")
326    }
327}
328
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed `read_capture` payload is decoded field by field.
    #[test]
    fn test_parse_capture_data() {
        let data = serde_json::json!({
            "channel_count": 2,
            "capture_length": 100,
            "pre_trigger_samples": 10,
            "actual_samples": 110,
            "sample_rate": 1000.0,
            "timestamp_ns": 1234567890u64,
            "sequence": 1u64,
            "channels": [
                [1.0, 2.0, 3.0],
                [4.0, 5.0, 6.0],
            ],
        });

        let capture = DaqCapture::parse_capture_data(&data).unwrap();
        assert_eq!(capture.channel_count, 2);
        assert_eq!(capture.capture_length, 100);
        assert_eq!(capture.pre_trigger_samples, 10);
        assert_eq!(capture.actual_samples, 110);
        assert_eq!(capture.sample_rate, 1000.0);
        assert_eq!(capture.timestamp_ns, 1234567890);
        assert_eq!(capture.sequence, 1);
        assert_eq!(capture.channels.len(), 2);
        assert_eq!(capture.channels[0], vec![1.0, 2.0, 3.0]);
        assert_eq!(capture.channels[1], vec![4.0, 5.0, 6.0]);
    }

    /// A payload lacking required header fields is rejected.
    #[test]
    fn test_parse_capture_data_missing_field() {
        let data = serde_json::json!({"channel_count": 1});
        assert!(DaqCapture::parse_capture_data(&data).is_err());
    }

    /// The header's `channel_count` must match the number of channel arrays.
    #[test]
    fn test_parse_capture_data_channel_count_mismatch() {
        let data = serde_json::json!({
            "channel_count": 3,
            "capture_length": 100,
            "pre_trigger_samples": 10,
            "actual_samples": 110,
            "sample_rate": 1000.0,
            "timestamp_ns": 1234567890u64,
            "sequence": 1u64,
            "channels": [
                [1.0, 2.0, 3.0],
                [4.0, 5.0, 6.0],
            ],
        });

        let err = DaqCapture::parse_capture_data(&data).unwrap_err();
        assert!(err.contains("channel_count mismatch"));
    }
}
364}