1use crate::CommandClient;
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47enum State {
48 Idle,
49 Arming,
50 WaitingForCapture,
51 ReadingData,
52}
53
54#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
56pub struct CaptureData {
57 pub channels: Vec<Vec<f64>>,
60 pub channel_count: usize,
62 pub capture_length: usize,
64 pub pre_trigger_samples: usize,
66 pub actual_samples: usize,
68 pub sample_rate: f64,
70 pub timestamp_ns: u64,
72 pub sequence: u64,
74}
75
76pub struct DaqCapture {
78 daq_fqdn: String,
80
81 pub busy: bool,
84 pub active: bool,
86 pub error: bool,
88 pub error_message: String,
90 pub data: Option<CaptureData>,
92
93 state: State,
95 arm_time: Option<std::time::Instant>,
96 pending_tid: Option<u32>,
97}
98
99impl DaqCapture {
100 pub fn new(daq_fqdn: &str) -> Self {
102 Self {
103 daq_fqdn: daq_fqdn.to_string(),
104 busy: false,
105 active: false,
106 error: false,
107 error_message: String::new(),
108 data: None,
109 state: State::Idle,
110 arm_time: None,
111 pending_tid: None,
112 }
113 }
114
115 pub fn is_busy(&self) -> bool {
117 self.busy
118 }
119
120 pub fn is_error(&self) -> bool {
122 self.error
123 }
124
125 pub fn start(&mut self, client: &mut CommandClient) {
127 self.error = false;
128 self.error_message.clear();
129 self.data = None;
130 self.active = false;
131
132 let tid = client.send(
134 &format!("{}.arm", self.daq_fqdn),
135 serde_json::json!({}),
136 );
137 self.pending_tid = Some(tid);
138 self.arm_time = Some(std::time::Instant::now());
139 self.busy = true;
140 self.state = State::Arming;
141 }
142
143 pub fn reset(&mut self) {
145 self.state = State::Idle;
146 self.busy = false;
147 self.active = false;
148 self.pending_tid = None;
149 }
150
151 pub fn tick(&mut self, timeout_ms: u32, client: &mut CommandClient) {
153 match self.state {
154 State::Idle => {}
155
156 State::Arming => {
157 if self.check_timeout(timeout_ms) { return; }
158
159 if let Some(tid) = self.pending_tid {
160 if let Some(resp) = client.take_response(tid) {
161 self.pending_tid = None;
162 if resp.success {
163 self.active = true;
164 self.state = State::WaitingForCapture;
165 let tid = client.send(
167 &format!("{}.capture_status", self.daq_fqdn),
168 serde_json::json!({}),
169 );
170 self.pending_tid = Some(tid);
171 } else {
172 self.set_error(&resp.error_message);
173 }
174 }
175 }
176 }
177
178 State::WaitingForCapture => {
179 if self.check_timeout(timeout_ms) { return; }
180
181 if let Some(tid) = self.pending_tid {
182 if let Some(resp) = client.take_response(tid) {
183 self.pending_tid = None;
184 if resp.success {
185 let data_ready = resp.data.get("data_ready")
186 .and_then(|v| v.as_bool())
187 .unwrap_or(false);
188
189 if data_ready {
190 self.active = false;
191 let tid = client.send(
192 &format!("{}.read_capture", self.daq_fqdn),
193 serde_json::json!({}),
194 );
195 self.pending_tid = Some(tid);
196 self.state = State::ReadingData;
197 } else {
198 let tid = client.send(
199 &format!("{}.capture_status", self.daq_fqdn),
200 serde_json::json!({}),
201 );
202 self.pending_tid = Some(tid);
203 }
204 } else {
205 self.set_error(&resp.error_message);
206 }
207 }
208 }
209 }
210
211 State::ReadingData => {
212 if self.check_timeout(timeout_ms) { return; }
213
214 if let Some(tid) = self.pending_tid {
215 if let Some(resp) = client.take_response(tid) {
216 self.pending_tid = None;
217 if resp.success {
218 match Self::parse_capture_data(&resp.data) {
219 Ok(capture) => {
220 self.data = Some(capture);
221 self.busy = false;
222 self.state = State::Idle;
223 }
224 Err(e) => {
225 self.set_error(&e);
226 }
227 }
228 } else {
229 self.set_error(&resp.error_message);
230 }
231 }
232 }
233 }
234 }
235 }
236
237 fn check_timeout(&mut self, timeout_ms: u32) -> bool {
238 if let Some(t) = self.arm_time {
239 if t.elapsed().as_millis() as u32 > timeout_ms {
240 self.set_error("Capture timeout");
241 return true;
242 }
243 }
244 false
245 }
246
247 fn set_error(&mut self, message: &str) {
248 self.state = State::Idle;
249 self.busy = false;
250 self.active = false;
251 self.error = true;
252 self.error_message = message.to_string();
253 self.pending_tid = None;
254 }
255
256 fn parse_capture_data(data: &serde_json::Value) -> Result<CaptureData, String> {
257 let channel_count = data.get("channel_count")
258 .and_then(|v| v.as_u64())
259 .ok_or("Missing channel_count")? as usize;
260 let capture_length = data.get("capture_length")
261 .and_then(|v| v.as_u64())
262 .ok_or("Missing capture_length")? as usize;
263 let pre_trigger_samples = data.get("pre_trigger_samples")
264 .and_then(|v| v.as_u64())
265 .ok_or("Missing pre_trigger_samples")? as usize;
266 let actual_samples = data.get("actual_samples")
267 .and_then(|v| v.as_u64())
268 .ok_or("Missing actual_samples")? as usize;
269 let sample_rate = data.get("sample_rate")
270 .and_then(|v| v.as_f64())
271 .ok_or("Missing sample_rate")?;
272 let timestamp_ns = data.get("timestamp_ns")
273 .and_then(|v| v.as_u64())
274 .ok_or("Missing timestamp_ns")?;
275 let sequence = data.get("sequence")
276 .and_then(|v| v.as_u64())
277 .ok_or("Missing sequence")?;
278
279 let channels_arr = data.get("channels")
280 .and_then(|v| v.as_array())
281 .ok_or("Missing channels array")?;
282
283 if channels_arr.len() != channel_count {
284 return Err(format!(
285 "channel_count mismatch: header says {} but got {} arrays",
286 channel_count, channels_arr.len()
287 ));
288 }
289
290 let channels: Vec<Vec<f64>> = channels_arr.iter()
291 .map(|ch| {
292 ch.as_array()
293 .map(|arr| arr.iter().filter_map(|v| v.as_f64()).collect())
294 .unwrap_or_default()
295 })
296 .collect();
297
298 Ok(CaptureData {
299 channels,
300 channel_count,
301 capture_length,
302 pre_trigger_samples,
303 actual_samples,
304 sample_rate,
305 timestamp_ns,
306 sequence,
307 })
308 }
309}
310
311impl Default for DaqCapture {
312 fn default() -> Self {
313 Self::new("ni.capture")
314 }
315}
316
317#[cfg(test)]
318mod tests {
319 use super::*;
320
321 #[test]
322 fn test_parse_capture_data() {
323 let data = serde_json::json!({
324 "channel_count": 2,
325 "capture_length": 100,
326 "pre_trigger_samples": 10,
327 "actual_samples": 110,
328 "sample_rate": 1000.0,
329 "timestamp_ns": 1234567890u64,
330 "sequence": 1u64,
331 "channels": [
332 [1.0, 2.0, 3.0],
333 [4.0, 5.0, 6.0],
334 ],
335 });
336
337 let capture = DaqCapture::parse_capture_data(&data).unwrap();
338 assert_eq!(capture.channel_count, 2);
339 assert_eq!(capture.capture_length, 100);
340 assert_eq!(capture.pre_trigger_samples, 10);
341 assert_eq!(capture.actual_samples, 110);
342 assert_eq!(capture.sample_rate, 1000.0);
343 assert_eq!(capture.channels[0], vec![1.0, 2.0, 3.0]);
344 assert_eq!(capture.channels[1], vec![4.0, 5.0, 6.0]);
345 }
346
347 #[test]
348 fn test_parse_capture_data_missing_field() {
349 let data = serde_json::json!({"channel_count": 1});
350 assert!(DaqCapture::parse_capture_data(&data).is_err());
351 }
352}