1use crate::CommandClient;
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47enum State {
48 Idle,
49 Arming,
50 WaitingForCapture,
51 ReadingData,
52}
53
54#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
56pub struct CaptureData {
57 pub channels: Vec<Vec<f64>>,
60 pub channel_count: usize,
62 pub capture_length: usize,
64 pub pre_trigger_samples: usize,
66 pub actual_samples: usize,
68 pub sample_rate: f64,
70 pub timestamp_ns: u64,
72 pub sequence: u64,
74}
75
76pub struct DaqCapture {
78 daq_fqdn: String,
80
81 pub busy: bool,
84 pub active: bool,
86 pub error: bool,
88 pub error_message: String,
90 pub data: Option<CaptureData>,
92
93 state: State,
95 arm_time: Option<std::time::Instant>,
96 pending_tid: Option<u32>,
97}
98
99impl DaqCapture {
100 pub fn new(daq_fqdn: &str) -> Self {
102 Self {
103 daq_fqdn: daq_fqdn.to_string(),
104 busy: false,
105 active: false,
106 error: false,
107 error_message: String::new(),
108 data: None,
109 state: State::Idle,
110 arm_time: None,
111 pending_tid: None,
112 }
113 }
114
115 pub fn is_busy(&self) -> bool {
117 self.busy
118 }
119
120 pub fn is_error(&self) -> bool {
122 self.error
123 }
124
125 pub fn start(&mut self, client: &mut CommandClient) {
127 self.error = false;
128 self.error_message.clear();
129 self.data = None;
130 self.active = false;
131
132 let tid = client.send(
134 &format!("{}.arm", self.daq_fqdn),
135 serde_json::json!({}),
136 );
137 self.pending_tid = Some(tid);
138 self.arm_time = Some(std::time::Instant::now());
139 self.busy = true;
140 self.state = State::Arming;
141 }
142
143 pub fn reset(&mut self) {
145 self.state = State::Idle;
146 self.busy = false;
147 self.active = false;
148 self.pending_tid = None;
149 }
150
151 pub fn set_trigger_delay(&mut self, client: &mut CommandClient, delay_ms: u32) {
161 if self.state != State::Idle {
162 return;
163 }
164 let _ = client.send(
167 &format!("{}.set_trigger_delay", self.daq_fqdn),
168 serde_json::json!({ "delay_ms": delay_ms }),
169 );
170 }
171
172 pub fn tick(&mut self, timeout_ms: u32, client: &mut CommandClient) {
174 match self.state {
175 State::Idle => {}
176
177 State::Arming => {
178 if self.check_timeout(timeout_ms) { return; }
179
180 if let Some(tid) = self.pending_tid {
181 if let Some(resp) = client.take_response(tid) {
182 self.pending_tid = None;
183 if resp.success {
184 self.active = true;
185 self.state = State::WaitingForCapture;
186 let tid = client.send(
188 &format!("{}.capture_status", self.daq_fqdn),
189 serde_json::json!({}),
190 );
191 self.pending_tid = Some(tid);
192 } else {
193 self.set_error(&resp.error_message);
194 }
195 }
196 }
197 }
198
199 State::WaitingForCapture => {
200 if self.check_timeout(timeout_ms) { return; }
201
202 if let Some(tid) = self.pending_tid {
203 if let Some(resp) = client.take_response(tid) {
204 self.pending_tid = None;
205 if resp.success {
206 let data_ready = resp.data.get("data_ready")
207 .and_then(|v| v.as_bool())
208 .unwrap_or(false);
209
210 if data_ready {
211 self.active = false;
212 let tid = client.send(
213 &format!("{}.read_capture", self.daq_fqdn),
214 serde_json::json!({}),
215 );
216 self.pending_tid = Some(tid);
217 self.state = State::ReadingData;
218 } else {
219 let tid = client.send(
220 &format!("{}.capture_status", self.daq_fqdn),
221 serde_json::json!({}),
222 );
223 self.pending_tid = Some(tid);
224 }
225 } else {
226 self.set_error(&resp.error_message);
227 }
228 }
229 }
230 }
231
232 State::ReadingData => {
233 if self.check_timeout(timeout_ms) { return; }
234
235 if let Some(tid) = self.pending_tid {
236 if let Some(resp) = client.take_response(tid) {
237 self.pending_tid = None;
238 if resp.success {
239 match Self::parse_capture_data(&resp.data) {
240 Ok(capture) => {
241 self.data = Some(capture);
242 self.busy = false;
243 self.state = State::Idle;
244 }
245 Err(e) => {
246 self.set_error(&e);
247 }
248 }
249 } else {
250 self.set_error(&resp.error_message);
251 }
252 }
253 }
254 }
255 }
256 }
257
258 fn check_timeout(&mut self, timeout_ms: u32) -> bool {
259 if let Some(t) = self.arm_time {
260 if t.elapsed().as_millis() as u32 > timeout_ms {
261 self.set_error("Capture timeout");
262 return true;
263 }
264 }
265 false
266 }
267
268 fn set_error(&mut self, message: &str) {
269 self.state = State::Idle;
270 self.busy = false;
271 self.active = false;
272 self.error = true;
273 self.error_message = message.to_string();
274 self.pending_tid = None;
275 }
276
277 fn parse_capture_data(data: &serde_json::Value) -> Result<CaptureData, String> {
278 let channel_count = data.get("channel_count")
279 .and_then(|v| v.as_u64())
280 .ok_or("Missing channel_count")? as usize;
281 let capture_length = data.get("capture_length")
282 .and_then(|v| v.as_u64())
283 .ok_or("Missing capture_length")? as usize;
284 let pre_trigger_samples = data.get("pre_trigger_samples")
285 .and_then(|v| v.as_u64())
286 .ok_or("Missing pre_trigger_samples")? as usize;
287 let actual_samples = data.get("actual_samples")
288 .and_then(|v| v.as_u64())
289 .ok_or("Missing actual_samples")? as usize;
290 let sample_rate = data.get("sample_rate")
291 .and_then(|v| v.as_f64())
292 .ok_or("Missing sample_rate")?;
293 let timestamp_ns = data.get("timestamp_ns")
294 .and_then(|v| v.as_u64())
295 .ok_or("Missing timestamp_ns")?;
296 let sequence = data.get("sequence")
297 .and_then(|v| v.as_u64())
298 .ok_or("Missing sequence")?;
299
300 let channels_arr = data.get("channels")
301 .and_then(|v| v.as_array())
302 .ok_or("Missing channels array")?;
303
304 if channels_arr.len() != channel_count {
305 return Err(format!(
306 "channel_count mismatch: header says {} but got {} arrays",
307 channel_count, channels_arr.len()
308 ));
309 }
310
311 let channels: Vec<Vec<f64>> = channels_arr.iter()
312 .map(|ch| {
313 ch.as_array()
314 .map(|arr| arr.iter().filter_map(|v| v.as_f64()).collect())
315 .unwrap_or_default()
316 })
317 .collect();
318
319 Ok(CaptureData {
320 channels,
321 channel_count,
322 capture_length,
323 pre_trigger_samples,
324 actual_samples,
325 sample_rate,
326 timestamp_ns,
327 sequence,
328 })
329 }
330}
331
332impl Default for DaqCapture {
333 fn default() -> Self {
334 Self::new("ni.capture")
335 }
336}
337
338#[cfg(test)]
339mod tests {
340 use super::*;
341
342 #[test]
343 fn test_parse_capture_data() {
344 let data = serde_json::json!({
345 "channel_count": 2,
346 "capture_length": 100,
347 "pre_trigger_samples": 10,
348 "actual_samples": 110,
349 "sample_rate": 1000.0,
350 "timestamp_ns": 1234567890u64,
351 "sequence": 1u64,
352 "channels": [
353 [1.0, 2.0, 3.0],
354 [4.0, 5.0, 6.0],
355 ],
356 });
357
358 let capture = DaqCapture::parse_capture_data(&data).unwrap();
359 assert_eq!(capture.channel_count, 2);
360 assert_eq!(capture.capture_length, 100);
361 assert_eq!(capture.pre_trigger_samples, 10);
362 assert_eq!(capture.actual_samples, 110);
363 assert_eq!(capture.sample_rate, 1000.0);
364 assert_eq!(capture.channels[0], vec![1.0, 2.0, 3.0]);
365 assert_eq!(capture.channels[1], vec![4.0, 5.0, 6.0]);
366 }
367
368 #[test]
369 fn test_parse_capture_data_missing_field() {
370 let data = serde_json::json!({"channel_count": 1});
371 assert!(DaqCapture::parse_capture_data(&data).is_err());
372 }
373}