autocore_std/fb/ni/daq_capture.rs
1/// DAQ Capture Function Block
2///
3/// Manages the lifecycle of a triggered DAQ capture: arm the trigger, wait for
4/// the capture to complete, and retrieve the captured data — all via IPC commands
5/// to the autocore-ni module.
6///
7/// # State Machine
8///
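/// ```text
/// Idle ──(rising edge on execute)──> Arming
/// Arming ──(arm response OK)──────> WaitingForCapture (active=true)
/// Arming ──(arm response error)───> Idle (error=true)
/// Arming ──(timeout)──────────────> Idle (error=true)
/// WaitingForCapture ──(data_ready)> ReadingData
/// WaitingForCapture ──(status err)> Idle (error=true)
/// WaitingForCapture ──(timeout)───> Idle (error=true)
/// ReadingData ──(read OK)─────────> Idle (data populated)
/// ReadingData ──(read error)──────> Idle (error=true)
/// ReadingData ──(timeout)─────────> Idle (error=true)
/// ```
///
/// The timeout is measured from the moment the arm command is sent and covers
/// the whole sequence. A rising edge on execute is honoured in every state:
/// it clears the outputs and restarts the sequence at Arming.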
18///
19/// # Example
20///
21/// ```ignore
22/// use autocore_std::fb::ni::DaqCapture;
23///
24/// struct MyProgram {
25/// daq: DaqCapture,
26/// }
27///
28/// impl ControlProgram for MyProgram {
29/// type Memory = MyMemory;
30///
31/// fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>) {
32/// // 5 second timeout
33/// self.daq.call(ctx.gm.arm_request, 5000, ctx.client);
34///
35/// if !self.daq.busy && !self.daq.error {
36/// if let Some(data) = &self.daq.data {
37/// // data.channels[0] = first channel's samples
38/// // data.channels[1] = second channel's samples, etc.
39/// }
40/// }
41/// }
42/// }
43/// ```
44use crate::CommandClient;
45use crate::fb::RTrig;
46
/// Internal phase of the capture sequence.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// No sequence in progress.
    Idle,
    /// The `arm` command has been sent; waiting for its response.
    Arming,
    /// Armed; polling `capture_status` until `data_ready` is reported.
    WaitingForCapture,
    /// The `read_capture` command has been sent; waiting for the data.
    ReadingData,
}
54
/// Captured data returned after a successful DAQ trigger.
///
/// Implements `PartialEq` so two captures can be compared directly, e.g. in
/// tests with `assert_eq!`.
#[derive(Debug, Clone, PartialEq)]
pub struct CaptureData {
    /// Sample data per channel. Outer index = channel, inner = samples.
    /// Layout: `channels[ch_idx][sample_idx]`.
    pub channels: Vec<Vec<f64>>,
    /// Number of channels in the capture.
    pub channel_count: usize,
    /// Configured post-trigger samples per channel.
    pub capture_length: usize,
    /// Configured pre-trigger samples per channel.
    pub pre_trigger_samples: usize,
    /// Actual samples written per channel (pre + post).
    pub actual_samples: usize,
    /// Sample rate in Hz.
    pub sample_rate: f64,
    /// UNIX timestamp (nanoseconds) of the trigger event.
    pub timestamp_ns: u64,
    /// Capture sequence number (monotonically increasing).
    pub sequence: u64,
}
76
77/// DAQ Capture function block for NI triggered recordings.
78///
79/// Create one instance per DAQ configuration in your control program struct.
80/// Call [`DaqCapture::call`] every cycle with the execute signal and timeout.
81pub struct DaqCapture {
82 // Configuration
83 daq_fqdn: String,
84
85 // Outputs
86 /// True while the FB is executing (from arm through data retrieval).
87 pub busy: bool,
88 /// True when the DAQ is armed and waiting for the hardware trigger.
89 pub active: bool,
90 /// True when an error occurred. Stays true until the next rising edge of execute.
91 pub error: bool,
92 /// Error description (empty when no error).
93 pub error_message: String,
94 /// Captured data. `Some` after a successful capture, `None` otherwise.
95 pub data: Option<CaptureData>,
96
97 // Internal state
98 state: State,
99 trigger: RTrig,
100 arm_time: Option<std::time::Instant>,
101 pending_tid: Option<u32>,
102}
103
104impl DaqCapture {
105 /// Create a new DaqCapture function block.
106 ///
107 /// # Arguments
108 ///
109 /// * `daq_fqdn` - The fully qualified topic prefix for this DAQ, e.g. `"ni.impact"`.
110 /// The FB will send commands to `<daq_fqdn>.arm`, `<daq_fqdn>.capture_status`, etc.
111 pub fn new(daq_fqdn: &str) -> Self {
112 Self {
113 daq_fqdn: daq_fqdn.to_string(),
114 busy: false,
115 active: false,
116 error: false,
117 error_message: String::new(),
118 data: None,
119 state: State::Idle,
120 trigger: RTrig::new(),
121 arm_time: None,
122 pending_tid: None,
123 }
124 }
125
126 /// Execute the DAQ capture state machine. Call once per control cycle.
127 ///
128 /// # Arguments
129 ///
130 /// * `execute` - Rising edge triggers a new capture sequence.
131 /// * `timeout_ms` - Maximum time to wait for the capture to complete (milliseconds).
132 /// * `client` - The IPC command client for sending commands to the NI module.
133 pub fn call(&mut self, execute: bool, timeout_ms: u32, client: &mut CommandClient) {
134 // Detect rising edge on execute
135 if self.trigger.call(execute) {
136 // Reset outputs on new execution
137 self.error = false;
138 self.error_message.clear();
139 self.data = None;
140 self.active = false;
141
142 // Send arm command
143 let tid = client.send(
144 &format!("{}.arm", self.daq_fqdn),
145 serde_json::json!({}),
146 );
147 self.pending_tid = Some(tid);
148 self.arm_time = Some(std::time::Instant::now());
149 self.busy = true;
150 self.state = State::Arming;
151 return;
152 }
153
154 match self.state {
155 State::Idle => {}
156
157 State::Arming => {
158 // Check timeout
159 if self.check_timeout(timeout_ms) { return; }
160
161 // Poll for arm response
162 if let Some(tid) = self.pending_tid {
163 if let Some(resp) = client.take_response(tid) {
164 self.pending_tid = None;
165 if resp.success {
166 self.active = true;
167 self.state = State::WaitingForCapture;
168 // Immediately send first status poll
169 let tid = client.send(
170 &format!("{}.capture_status", self.daq_fqdn),
171 serde_json::json!({}),
172 );
173 self.pending_tid = Some(tid);
174 } else {
175 self.set_error(&resp.error_message);
176 }
177 }
178 }
179 }
180
181 State::WaitingForCapture => {
182 // Check timeout
183 if self.check_timeout(timeout_ms) { return; }
184
185 // Poll capture_status response
186 if let Some(tid) = self.pending_tid {
187 if let Some(resp) = client.take_response(tid) {
188 self.pending_tid = None;
189 if resp.success {
190 let data_ready = resp.data.get("data_ready")
191 .and_then(|v| v.as_bool())
192 .unwrap_or(false);
193
194 if data_ready {
195 // Capture complete — request the data
196 self.active = false;
197 let tid = client.send(
198 &format!("{}.read_capture", self.daq_fqdn),
199 serde_json::json!({}),
200 );
201 self.pending_tid = Some(tid);
202 self.state = State::ReadingData;
203 } else {
204 // Not ready yet — poll again next cycle
205 let tid = client.send(
206 &format!("{}.capture_status", self.daq_fqdn),
207 serde_json::json!({}),
208 );
209 self.pending_tid = Some(tid);
210 }
211 } else {
212 self.set_error(&resp.error_message);
213 }
214 }
215 }
216 }
217
218 State::ReadingData => {
219 // Check timeout
220 if self.check_timeout(timeout_ms) { return; }
221
222 // Poll read_capture response
223 if let Some(tid) = self.pending_tid {
224 if let Some(resp) = client.take_response(tid) {
225 self.pending_tid = None;
226 if resp.success {
227 match Self::parse_capture_data(&resp.data) {
228 Ok(capture) => {
229 self.data = Some(capture);
230 self.busy = false;
231 self.state = State::Idle;
232 }
233 Err(e) => {
234 self.set_error(&e);
235 }
236 }
237 } else {
238 self.set_error(&resp.error_message);
239 }
240 }
241 }
242 }
243 }
244 }
245
246 /// Check if the timeout has elapsed. If so, transition to error state.
247 fn check_timeout(&mut self, timeout_ms: u32) -> bool {
248 if let Some(t) = self.arm_time {
249 if t.elapsed().as_millis() as u32 > timeout_ms {
250 self.set_error("Capture timeout");
251 return true;
252 }
253 }
254 false
255 }
256
257 /// Transition to error state.
258 fn set_error(&mut self, message: &str) {
259 self.state = State::Idle;
260 self.busy = false;
261 self.active = false;
262 self.error = true;
263 self.error_message = message.to_string();
264 self.pending_tid = None;
265 }
266
267 /// Parse the JSON response from `read_capture` into a `CaptureData` struct.
268 fn parse_capture_data(data: &serde_json::Value) -> Result<CaptureData, String> {
269 let channel_count = data.get("channel_count")
270 .and_then(|v| v.as_u64())
271 .ok_or("Missing channel_count")? as usize;
272 let capture_length = data.get("capture_length")
273 .and_then(|v| v.as_u64())
274 .ok_or("Missing capture_length")? as usize;
275 let pre_trigger_samples = data.get("pre_trigger_samples")
276 .and_then(|v| v.as_u64())
277 .ok_or("Missing pre_trigger_samples")? as usize;
278 let actual_samples = data.get("actual_samples")
279 .and_then(|v| v.as_u64())
280 .ok_or("Missing actual_samples")? as usize;
281 let sample_rate = data.get("sample_rate")
282 .and_then(|v| v.as_f64())
283 .ok_or("Missing sample_rate")?;
284 let timestamp_ns = data.get("timestamp_ns")
285 .and_then(|v| v.as_u64())
286 .ok_or("Missing timestamp_ns")?;
287 let sequence = data.get("sequence")
288 .and_then(|v| v.as_u64())
289 .ok_or("Missing sequence")?;
290
291 let channels_arr = data.get("channels")
292 .and_then(|v| v.as_array())
293 .ok_or("Missing channels array")?;
294
295 if channels_arr.len() != channel_count {
296 return Err(format!(
297 "channel_count mismatch: header says {} but got {} arrays",
298 channel_count, channels_arr.len()
299 ));
300 }
301
302 let channels: Vec<Vec<f64>> = channels_arr.iter()
303 .map(|ch| {
304 ch.as_array()
305 .map(|arr| arr.iter().filter_map(|v| v.as_f64()).collect())
306 .unwrap_or_default()
307 })
308 .collect();
309
310 Ok(CaptureData {
311 channels,
312 channel_count,
313 capture_length,
314 pre_trigger_samples,
315 actual_samples,
316 sample_rate,
317 timestamp_ns,
318 sequence,
319 })
320 }
321}
322
323impl Default for DaqCapture {
324 fn default() -> Self {
325 Self::new("ni.capture")
326 }
327}
328
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed two-channel `read_capture` payload.
    fn sample_payload() -> serde_json::Value {
        serde_json::json!({
            "channel_count": 2,
            "capture_length": 100,
            "pre_trigger_samples": 10,
            "actual_samples": 110,
            "sample_rate": 1000.0,
            "timestamp_ns": 1234567890u64,
            "sequence": 1u64,
            "channels": [
                [1.0, 2.0, 3.0],
                [4.0, 5.0, 6.0],
            ],
        })
    }

    #[test]
    fn test_parse_capture_data() {
        let capture = DaqCapture::parse_capture_data(&sample_payload()).unwrap();
        assert_eq!(capture.channel_count, 2);
        assert_eq!(capture.capture_length, 100);
        assert_eq!(capture.pre_trigger_samples, 10);
        assert_eq!(capture.actual_samples, 110);
        assert_eq!(capture.sample_rate, 1000.0);
        assert_eq!(capture.timestamp_ns, 1234567890);
        assert_eq!(capture.sequence, 1);
        assert_eq!(capture.channels.len(), 2);
        assert_eq!(capture.channels[0], vec![1.0, 2.0, 3.0]);
        assert_eq!(capture.channels[1], vec![4.0, 5.0, 6.0]);
    }

    #[test]
    fn test_parse_capture_data_missing_field() {
        let data = serde_json::json!({"channel_count": 1});
        assert!(DaqCapture::parse_capture_data(&data).is_err());
    }

    #[test]
    fn test_parse_capture_data_channel_count_mismatch() {
        // Header claims three channels while only two arrays are present.
        let mut data = sample_payload();
        data["channel_count"] = serde_json::json!(3);
        let err = DaqCapture::parse_capture_data(&data).unwrap_err();
        assert!(err.starts_with("channel_count mismatch"));
    }
}