Skip to main content

autocore_std/fb/ni/
daq_capture.rs

1/// DAQ Capture Function Block
2///
3/// Manages the lifecycle of a triggered DAQ capture: arm the trigger, wait for
4/// the capture to complete, and retrieve the captured data — all via IPC commands
5/// to the autocore-ni module.
6///
7/// # State Machine
8///
9/// ```text
10/// Idle ──(rising edge on execute)──> Arming
11/// Arming ──(arm response OK)──────> WaitingForCapture  (active=true)
12/// Arming ──(arm response error)───> Idle               (error=true)
13/// WaitingForCapture ──(data_ready)> ReadingData
14/// WaitingForCapture ──(timeout)───> Idle               (error=true)
15/// ReadingData ──(read OK)─────────> Idle               (data populated)
16/// ReadingData ──(read error)──────> Idle               (error=true)
17/// ```
18///
19/// # Example
20///
21/// ```ignore
22/// use autocore_std::fb::ni::DaqCapture;
23///
24/// struct MyProgram {
25///     daq: DaqCapture,
26/// }
27///
28/// impl ControlProgram for MyProgram {
29///     type Memory = MyMemory;
30///
31///     fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>) {
32///         // 5 second timeout
33///         self.daq.call(ctx.gm.arm_request, 5000, ctx.client);
34///
35///         if !self.daq.busy && !self.daq.error {
36///             if let Some(data) = &self.daq.data {
37///                 // data.channels[0] = first channel's samples
38///                 // data.channels[1] = second channel's samples, etc.
39///             }
40///         }
41///     }
42/// }
43/// ```
44use crate::CommandClient;
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47enum State {
48    Idle,
49    Arming,
50    WaitingForCapture,
51    ReadingData,
52}
53
54/// Captured data returned after a successful DAQ trigger.
55#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
56pub struct CaptureData {
57    /// Sample data per channel. Outer index = channel, inner = samples.
58    /// Layout: `channels[ch_idx][sample_idx]`.
59    pub channels: Vec<Vec<f64>>,
60    /// Number of channels in the capture.
61    pub channel_count: usize,
62    /// Configured post-trigger samples per channel.
63    pub capture_length: usize,
64    /// Configured pre-trigger samples per channel.
65    pub pre_trigger_samples: usize,
66    /// Actual samples written per channel (pre + post).
67    pub actual_samples: usize,
68    /// Sample rate in Hz.
69    pub sample_rate: f64,
70    /// UNIX timestamp (nanoseconds) of the trigger event.
71    pub timestamp_ns: u64,
72    /// Capture sequence number (monotonically increasing).
73    pub sequence: u64,
74}
75
76/// DAQ Capture function block for NI triggered recordings.
77pub struct DaqCapture {
78    // Configuration
79    daq_fqdn: String,
80
81    // Outputs
82    /// True while the FB is executing (from arm through data retrieval).
83    pub busy: bool,
84    /// True when the DAQ is armed and waiting for the hardware trigger.
85    pub active: bool,
86    /// True when an error occurred. 
87    pub error: bool,
88    /// Error description (empty when no error).
89    pub error_message: String,
90    /// Captured data. `Some` after a successful capture, `None` otherwise.
91    pub data: Option<CaptureData>,
92
93    // Internal state
94    state: State,
95    arm_time: Option<std::time::Instant>,
96    pending_tid: Option<u32>,
97}
98
99impl DaqCapture {
100    /// Create a new DaqCapture function block.
101    pub fn new(daq_fqdn: &str) -> Self {
102        Self {
103            daq_fqdn: daq_fqdn.to_string(),
104            busy: false,
105            active: false,
106            error: false,
107            error_message: String::new(),
108            data: None,
109            state: State::Idle,
110            arm_time: None,
111            pending_tid: None,
112        }
113    }
114
115    /// The FB is busy running.
116    pub fn is_busy(&self) -> bool {
117        self.busy
118    }
119
120    /// The last requested command resulted in an error.
121    pub fn is_error(&self) -> bool {
122        self.error
123    }
124
125    /// Start a new capture sequence (Arm the DAQ).
126    pub fn start(&mut self, client: &mut CommandClient) {
127        self.error = false;
128        self.error_message.clear();
129        self.data = None;
130        self.active = false;
131
132        // Send arm command
133        let tid = client.send(
134            &format!("{}.arm", self.daq_fqdn),
135            serde_json::json!({}),
136        );
137        self.pending_tid = Some(tid);
138        self.arm_time = Some(std::time::Instant::now());
139        self.busy = true;
140        self.state = State::Arming;
141    }
142
143    /// Stop/Cancel the capture and return to idle.
144    pub fn reset(&mut self) {
145        self.state = State::Idle;
146        self.busy = false;
147        self.active = false;
148        self.pending_tid = None;
149    }
150
151    /// Update the Timer-trigger delay for this DAQ.
152    ///
153    /// Only valid when the FB is idle (i.e. before `start()` is called or after
154    /// the capture has completed). If called while the FB is busy, the request
155    /// is silently dropped — call `reset()` first if you need to abort and
156    /// reconfigure. The change is also rejected server-side if the DAQ has been
157    /// armed by another caller.
158    ///
159    /// Has no effect on DAQs whose trigger type is not `timer`.
160    pub fn set_trigger_delay(&mut self, client: &mut CommandClient, delay_ms: u32) {
161        if self.state != State::Idle {
162            return;
163        }
164        // Fire-and-forget: response is not tracked. Server-side errors (e.g.
165        // wrong trigger type, armed) appear in the autocore-ni log.
166        let _ = client.send(
167            &format!("{}.set_trigger_delay", self.daq_fqdn),
168            serde_json::json!({ "delay_ms": delay_ms }),
169        );
170    }
171
172    /// Execute one scan cycle of the DAQ capture state machine.
173    pub fn tick(&mut self, timeout_ms: u32, client: &mut CommandClient) {
174        match self.state {
175            State::Idle => {}
176
177            State::Arming => {
178                if self.check_timeout(timeout_ms) { return; }
179
180                if let Some(tid) = self.pending_tid {
181                    if let Some(resp) = client.take_response(tid) {
182                        self.pending_tid = None;
183                        if resp.success {
184                            self.active = true;
185                            self.state = State::WaitingForCapture;
186                            // Immediately send first status poll
187                            let tid = client.send(
188                                &format!("{}.capture_status", self.daq_fqdn),
189                                serde_json::json!({}),
190                            );
191                            self.pending_tid = Some(tid);
192                        } else {
193                            self.set_error(&resp.error_message);
194                        }
195                    }
196                }
197            }
198
199            State::WaitingForCapture => {
200                if self.check_timeout(timeout_ms) { return; }
201
202                if let Some(tid) = self.pending_tid {
203                    if let Some(resp) = client.take_response(tid) {
204                        self.pending_tid = None;
205                        if resp.success {
206                            let data_ready = resp.data.get("data_ready")
207                                .and_then(|v| v.as_bool())
208                                .unwrap_or(false);
209
210                            if data_ready {
211                                self.active = false;
212                                let tid = client.send(
213                                    &format!("{}.read_capture", self.daq_fqdn),
214                                    serde_json::json!({}),
215                                );
216                                self.pending_tid = Some(tid);
217                                self.state = State::ReadingData;
218                            } else {
219                                let tid = client.send(
220                                    &format!("{}.capture_status", self.daq_fqdn),
221                                    serde_json::json!({}),
222                                );
223                                self.pending_tid = Some(tid);
224                            }
225                        } else {
226                            self.set_error(&resp.error_message);
227                        }
228                    }
229                }
230            }
231
232            State::ReadingData => {
233                if self.check_timeout(timeout_ms) { return; }
234
235                if let Some(tid) = self.pending_tid {
236                    if let Some(resp) = client.take_response(tid) {
237                        self.pending_tid = None;
238                        if resp.success {
239                            match Self::parse_capture_data(&resp.data) {
240                                Ok(capture) => {
241                                    self.data = Some(capture);
242                                    self.busy = false;
243                                    self.state = State::Idle;
244                                }
245                                Err(e) => {
246                                    self.set_error(&e);
247                                }
248                            }
249                        } else {
250                            self.set_error(&resp.error_message);
251                        }
252                    }
253                }
254            }
255        }
256    }
257
258    fn check_timeout(&mut self, timeout_ms: u32) -> bool {
259        if let Some(t) = self.arm_time {
260            if t.elapsed().as_millis() as u32 > timeout_ms {
261                self.set_error("Capture timeout");
262                return true;
263            }
264        }
265        false
266    }
267
268    fn set_error(&mut self, message: &str) {
269        self.state = State::Idle;
270        self.busy = false;
271        self.active = false;
272        self.error = true;
273        self.error_message = message.to_string();
274        self.pending_tid = None;
275    }
276
277    fn parse_capture_data(data: &serde_json::Value) -> Result<CaptureData, String> {
278        let channel_count = data.get("channel_count")
279            .and_then(|v| v.as_u64())
280            .ok_or("Missing channel_count")? as usize;
281        let capture_length = data.get("capture_length")
282            .and_then(|v| v.as_u64())
283            .ok_or("Missing capture_length")? as usize;
284        let pre_trigger_samples = data.get("pre_trigger_samples")
285            .and_then(|v| v.as_u64())
286            .ok_or("Missing pre_trigger_samples")? as usize;
287        let actual_samples = data.get("actual_samples")
288            .and_then(|v| v.as_u64())
289            .ok_or("Missing actual_samples")? as usize;
290        let sample_rate = data.get("sample_rate")
291            .and_then(|v| v.as_f64())
292            .ok_or("Missing sample_rate")?;
293        let timestamp_ns = data.get("timestamp_ns")
294            .and_then(|v| v.as_u64())
295            .ok_or("Missing timestamp_ns")?;
296        let sequence = data.get("sequence")
297            .and_then(|v| v.as_u64())
298            .ok_or("Missing sequence")?;
299
300        let channels_arr = data.get("channels")
301            .and_then(|v| v.as_array())
302            .ok_or("Missing channels array")?;
303
304        if channels_arr.len() != channel_count {
305            return Err(format!(
306                "channel_count mismatch: header says {} but got {} arrays",
307                channel_count, channels_arr.len()
308            ));
309        }
310
311        let channels: Vec<Vec<f64>> = channels_arr.iter()
312            .map(|ch| {
313                ch.as_array()
314                    .map(|arr| arr.iter().filter_map(|v| v.as_f64()).collect())
315                    .unwrap_or_default()
316            })
317            .collect();
318
319        Ok(CaptureData {
320            channels,
321            channel_count,
322            capture_length,
323            pre_trigger_samples,
324            actual_samples,
325            sample_rate,
326            timestamp_ns,
327            sequence,
328        })
329    }
330}
331
332impl Default for DaqCapture {
333    fn default() -> Self {
334        Self::new("ni.capture")
335    }
336}
337
338#[cfg(test)]
339mod tests {
340    use super::*;
341
342    #[test]
343    fn test_parse_capture_data() {
344        let data = serde_json::json!({
345            "channel_count": 2,
346            "capture_length": 100,
347            "pre_trigger_samples": 10,
348            "actual_samples": 110,
349            "sample_rate": 1000.0,
350            "timestamp_ns": 1234567890u64,
351            "sequence": 1u64,
352            "channels": [
353                [1.0, 2.0, 3.0],
354                [4.0, 5.0, 6.0],
355            ],
356        });
357
358        let capture = DaqCapture::parse_capture_data(&data).unwrap();
359        assert_eq!(capture.channel_count, 2);
360        assert_eq!(capture.capture_length, 100);
361        assert_eq!(capture.pre_trigger_samples, 10);
362        assert_eq!(capture.actual_samples, 110);
363        assert_eq!(capture.sample_rate, 1000.0);
364        assert_eq!(capture.channels[0], vec![1.0, 2.0, 3.0]);
365        assert_eq!(capture.channels[1], vec![4.0, 5.0, 6.0]);
366    }
367
368    #[test]
369    fn test_parse_capture_data_missing_field() {
370        let data = serde_json::json!({"channel_count": 1});
371        assert!(DaqCapture::parse_capture_data(&data).is_err());
372    }
373}