autocore_std/fb/ni/daq_capture.rs
1/// DAQ Capture Function Block
2///
3/// Manages the lifecycle of a triggered DAQ capture: arm the trigger, wait for
4/// the capture to complete, and retrieve the captured data — all via IPC commands
5/// to the autocore-ni module.
6///
7/// # State Machine
8///
9/// ```text
10/// Idle ──(rising edge on execute)──> Arming
11/// Arming ──(arm response OK)──────> WaitingForCapture (active=true)
12/// Arming ──(arm response error)───> Idle (error=true)
13/// WaitingForCapture ──(data_ready)> ReadingData
14/// WaitingForCapture ──(timeout)───> Idle (error=true)
15/// ReadingData ──(read OK)─────────> Idle (data populated)
16/// ReadingData ──(read error)──────> Idle (error=true)
17/// ```
18///
19/// # Example
20///
21/// ```ignore
22/// use autocore_std::fb::ni::DaqCapture;
23///
24/// struct MyProgram {
25/// daq: DaqCapture,
26/// }
27///
28/// impl ControlProgram for MyProgram {
29/// type Memory = MyMemory;
30///
31/// fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>) {
32/// // 5 second timeout
33/// self.daq.tick(5000, ctx.client);
34///
35/// if !self.daq.is_busy() && !self.daq.is_error() {
36/// if let Some(data) = &self.daq.data {
37/// // data.channels[0] = first channel's samples
38/// // data.channels[1] = second channel's samples, etc.
39/// }
40/// }
41/// }
42/// }
43/// ```
44///
45/// # Taring channels before a run
46///
47/// The FB can zero one or more channels before arming. `tare()` sends
48/// `<module>.<channel>.tare` for each channel you name and flips
49/// `is_busy()` true until the module's 1-second (by default) averaging
50/// window has elapsed. This lets a control program chain
51/// "tare → wait → start" with the same `is_busy()` gate it already uses
52/// everywhere else:
53///
54/// ```ignore
55/// // Step 1: once the axes are homed, fire the tares.
56/// if !self.daq.is_busy() && ctx.gm.ready_to_tare && !self.tare_fired {
57/// self.daq.tare(&["tsdr_fx", "tsdr_fy", "tsdr_fz"], None, ctx.client);
58/// self.tare_fired = true;
59/// }
60///
61/// // Step 2: on the next scan where the FB is idle, arm the capture.
62/// if self.tare_fired && !self.daq.is_busy() {
63/// self.daq.start(ctx.client);
64/// self.tare_fired = false;
65/// }
66/// ```
67///
68/// Tare applies to LiveBuffer *and* captures — any recording armed after
69/// the tare completes reads zero at the tared baseline. Use
70/// [`clear_tare()`](Self::clear_tare) to reset offsets back to 0.
71use crate::CommandClient;
72
73/// Fudge factor added to each tare's expected completion time to cover
74/// IPC round-trip latency and scan-period jitter. The module computes its
75/// own completion based on `sample_rate × duration_ms`, so the FB only
76/// needs to wait "approximately long enough" before declaring done.
77const TARE_SAFETY_MARGIN_MS: u128 = 150;
78
79/// Default tare averaging window, matching the module's SHM-trigger default.
80const DEFAULT_TARE_DURATION_MS: u32 = 1000;
81
82#[derive(Debug, Clone, Copy, PartialEq, Eq)]
83enum State {
84 Idle,
85 Arming,
86 WaitingForCapture,
87 ReadingData,
88}
89
90/// One outstanding tare request. Kept until either the tare response comes
91/// back with an error (drop early) or the deadline elapses (succeeded).
92struct PendingTare {
93 channel: String,
94 deadline: std::time::Instant,
95 /// Transaction ID for the original `tare` IPC request. Cleared to None
96 /// once the module's immediate ack has been consumed — the actual tare
97 /// completion is time-based since the module has no completion event.
98 response_tid: Option<u32>,
99}
100
101/// Captured data returned after a successful DAQ trigger.
102#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
103pub struct CaptureData {
104 /// Sample data per channel. Outer index = channel, inner = samples.
105 /// Layout: `channels[ch_idx][sample_idx]`.
106 pub channels: Vec<Vec<f64>>,
107 /// Number of channels in the capture.
108 pub channel_count: usize,
109 /// Configured post-trigger samples per channel.
110 pub capture_length: usize,
111 /// Configured pre-trigger samples per channel.
112 pub pre_trigger_samples: usize,
113 /// Actual samples written per channel (pre + post).
114 pub actual_samples: usize,
115 /// Sample rate in Hz.
116 pub sample_rate: f64,
117 /// UNIX timestamp (nanoseconds) of the trigger event.
118 pub timestamp_ns: u64,
119 /// Capture sequence number (monotonically increasing).
120 pub sequence: u64,
121}
122
123/// DAQ Capture function block for NI triggered recordings.
124pub struct DaqCapture {
125 // Configuration
126 daq_fqdn: String,
127
128 // Outputs
129 /// True while the FB is executing (from arm through data retrieval).
130 pub busy: bool,
131 /// True when the DAQ is armed and waiting for the hardware trigger.
132 pub active: bool,
133 /// True when an error occurred.
134 pub error: bool,
135 /// Error description (empty when no error).
136 pub error_message: String,
137 /// Captured data. `Some` after a successful capture, `None` otherwise.
138 pub data: Option<CaptureData>,
139
140 // Internal state
141 state: State,
142 arm_time: Option<std::time::Instant>,
143 pending_tid: Option<u32>,
144 /// Outstanding per-channel tare requests. `is_busy()` reports `true`
145 /// while this is non-empty so the control program can wait for all
146 /// channels to finish averaging before arming the DAQ.
147 pending_tares: Vec<PendingTare>,
148}
149
150impl DaqCapture {
151 /// Create a new DaqCapture function block.
152 pub fn new(daq_fqdn: &str) -> Self {
153 Self {
154 daq_fqdn: daq_fqdn.to_string(),
155 busy: false,
156 active: false,
157 error: false,
158 error_message: String::new(),
159 data: None,
160 state: State::Idle,
161 arm_time: None,
162 pending_tid: None,
163 pending_tares: Vec::new(),
164 }
165 }
166
167 /// The FB is busy running — either a capture is in flight or one or more
168 /// channels are still inside their tare averaging window.
169 pub fn is_busy(&self) -> bool {
170 self.busy || !self.pending_tares.is_empty()
171 }
172
173 /// True while one or more channel tare commands issued through this FB
174 /// are still inside their averaging window. Useful for a control program
175 /// that wants to gate Start on "all tares complete" specifically rather
176 /// than general FB busy.
177 pub fn is_taring(&self) -> bool {
178 !self.pending_tares.is_empty()
179 }
180
181 /// The last requested command resulted in an error.
182 pub fn is_error(&self) -> bool {
183 self.error
184 }
185
186 /// Start a new capture sequence (Arm the DAQ).
187 pub fn start(&mut self, client: &mut CommandClient) {
188 self.error = false;
189 self.error_message.clear();
190 self.data = None;
191 self.active = false;
192
193 // Send arm command
194 let tid = client.send(
195 &format!("{}.arm", self.daq_fqdn),
196 serde_json::json!({}),
197 );
198 self.pending_tid = Some(tid);
199 self.arm_time = Some(std::time::Instant::now());
200 self.busy = true;
201 self.state = State::Arming;
202 }
203
204 /// Stop/Cancel the capture and return to idle. Also drops any
205 /// pending tare bookkeeping so the FB reports not-busy immediately;
206 /// tares that have already started on the module side will still
207 /// complete their averages — this only forgets them here.
208 pub fn reset(&mut self) {
209 self.state = State::Idle;
210 self.busy = false;
211 self.active = false;
212 self.pending_tid = None;
213 self.pending_tares.clear();
214 }
215
216 /// Tare one or more channels. For each channel, sends
217 /// `<module>.<channel>.tare` with the given duration and marks this FB
218 /// busy until the averaging window has elapsed. `duration_ms = None`
219 /// lets the module use its 1000 ms default.
220 ///
221 /// Channel names are the same strings configured in
222 /// `ni.config.daq[].channels` / `ni.config.tasks[].channels[].name`
223 /// (e.g., `"tsdr_fz"`, `"enc_x"`). The module prefix is taken from the
224 /// `daq_fqdn` passed to [`new`]. Channels do NOT have to all belong to
225 /// this DAQ — tare is a per-channel operation on the module and the FB
226 /// just acts as a convenient dispatcher.
227 ///
228 /// Silently ignored while a capture is in flight (state ≠ Idle). Safe
229 /// to call while other tares are still pending — the new channels are
230 /// appended to the pending list.
231 ///
232 /// # Example
233 /// ```ignore
234 /// if !self.daq.is_busy() {
235 /// self.daq.tare(&["tsdr_fx", "tsdr_fy", "tsdr_fz"], None, ctx.client);
236 /// }
237 /// // later…
238 /// if !self.daq.is_busy() {
239 /// self.daq.start(ctx.client); // tares finished, safe to arm
240 /// }
241 /// ```
242 pub fn tare<S: AsRef<str>>(
243 &mut self,
244 channels: &[S],
245 duration_ms: Option<u32>,
246 client: &mut CommandClient,
247 ) {
248 if self.state != State::Idle {
249 // Taring mid-capture would cross offset boundaries inside the
250 // already-running recording. Caller is expected to gate on
251 // is_busy() — this is a backstop.
252 return;
253 }
254 let duration = duration_ms.unwrap_or(DEFAULT_TARE_DURATION_MS);
255 let module = self.module_prefix().to_string();
256 let now = std::time::Instant::now();
257 let deadline = now
258 + std::time::Duration::from_millis(duration as u64)
259 + std::time::Duration::from_millis(TARE_SAFETY_MARGIN_MS as u64);
260
261 for ch in channels {
262 let ch = ch.as_ref();
263 let tid = client.send(
264 &format!("{}.{}.tare", module, ch),
265 serde_json::json!({ "duration_ms": duration }),
266 );
267 self.pending_tares.push(PendingTare {
268 channel: ch.to_string(),
269 deadline,
270 response_tid: Some(tid),
271 });
272 }
273 }
274
275 /// Fire-and-forget: reset the tare offset on one or more channels to 0.
276 /// Does not affect `is_busy()`; the module applies the change on its
277 /// next callback. Safe to call any time.
278 pub fn clear_tare<S: AsRef<str>>(
279 &mut self,
280 channels: &[S],
281 client: &mut CommandClient,
282 ) {
283 let module = self.module_prefix().to_string();
284 for ch in channels {
285 let _ = client.send(
286 &format!("{}.{}.clear_tare", module, ch.as_ref()),
287 serde_json::json!({}),
288 );
289 }
290 }
291
292 /// Extract the module-level prefix (e.g., `"ni"`) from `daq_fqdn`
293 /// (e.g., `"ni.traction"`). If no dot is present, the whole string is
294 /// returned — unusual but we don't want to silently drop messages.
295 fn module_prefix(&self) -> &str {
296 self.daq_fqdn.split_once('.')
297 .map(|(m, _)| m)
298 .unwrap_or(&self.daq_fqdn)
299 }
300
301 /// Update the Timer-trigger delay for this DAQ.
302 ///
303 /// Only valid when the FB is idle (i.e. before `start()` is called or after
304 /// the capture has completed). If called while the FB is busy, the request
305 /// is silently dropped — call `reset()` first if you need to abort and
306 /// reconfigure. The change is also rejected server-side if the DAQ has been
307 /// armed by another caller.
308 ///
309 /// Has no effect on DAQs whose trigger type is not `timer`.
310 pub fn set_trigger_delay(&mut self, client: &mut CommandClient, delay_ms: u32) {
311 if self.state != State::Idle {
312 return;
313 }
314 // Fire-and-forget: response is not tracked. Server-side errors (e.g.
315 // wrong trigger type, armed) appear in the autocore-ni log.
316 let _ = client.send(
317 &format!("{}.set_trigger_delay", self.daq_fqdn),
318 serde_json::json!({ "delay_ms": delay_ms }),
319 );
320 }
321
322 /// Execute one scan cycle of the DAQ capture state machine.
323 pub fn tick(&mut self, timeout_ms: u32, client: &mut CommandClient) {
324 // ---- Tare bookkeeping (runs every tick, orthogonal to capture) ----
325 //
326 // 1. Consume the module's immediate tare acks so they don't accumulate
327 // in the client's response buffer. A non-success response means the
328 // module didn't accept the command (unknown channel, module down) —
329 // drop that pending tare early so the FB doesn't hang until its
330 // deadline.
331 //
332 // 2. Remove any pending tares whose deadline has elapsed. The module
333 // has no completion event; it simply averages for `duration_ms`
334 // at its configured sample rate. `TARE_SAFETY_MARGIN_MS` covers
335 // IPC/scan jitter so we don't clear busy a hair early.
336 if !self.pending_tares.is_empty() {
337 for pt in self.pending_tares.iter_mut() {
338 if let Some(tid) = pt.response_tid {
339 if let Some(resp) = client.take_response(tid) {
340 pt.response_tid = None;
341 if !resp.success {
342 log::warn!(
343 "DaqCapture tare on '{}' rejected by module: {}",
344 pt.channel, resp.error_message,
345 );
346 // Force its deadline to now so the retain() below drops it.
347 pt.deadline = std::time::Instant::now();
348 }
349 }
350 }
351 }
352 let now = std::time::Instant::now();
353 self.pending_tares.retain(|pt| pt.deadline > now);
354 }
355
356 match self.state {
357 State::Idle => {}
358
359 State::Arming => {
360 if self.check_timeout(timeout_ms) { return; }
361
362 if let Some(tid) = self.pending_tid {
363 if let Some(resp) = client.take_response(tid) {
364 self.pending_tid = None;
365 if resp.success {
366 self.active = true;
367 self.state = State::WaitingForCapture;
368 // Immediately send first status poll
369 let tid = client.send(
370 &format!("{}.capture_status", self.daq_fqdn),
371 serde_json::json!({}),
372 );
373 self.pending_tid = Some(tid);
374 } else {
375 self.set_error(&resp.error_message);
376 }
377 }
378 }
379 }
380
381 State::WaitingForCapture => {
382 if self.check_timeout(timeout_ms) { return; }
383
384 if let Some(tid) = self.pending_tid {
385 if let Some(resp) = client.take_response(tid) {
386 self.pending_tid = None;
387 if resp.success {
388 let data_ready = resp.data.get("data_ready")
389 .and_then(|v| v.as_bool())
390 .unwrap_or(false);
391
392 if data_ready {
393 self.active = false;
394 let tid = client.send(
395 &format!("{}.read_capture", self.daq_fqdn),
396 serde_json::json!({}),
397 );
398 self.pending_tid = Some(tid);
399 self.state = State::ReadingData;
400 } else {
401 let tid = client.send(
402 &format!("{}.capture_status", self.daq_fqdn),
403 serde_json::json!({}),
404 );
405 self.pending_tid = Some(tid);
406 }
407 } else {
408 self.set_error(&resp.error_message);
409 }
410 }
411 }
412 }
413
414 State::ReadingData => {
415 if self.check_timeout(timeout_ms) { return; }
416
417 if let Some(tid) = self.pending_tid {
418 if let Some(resp) = client.take_response(tid) {
419 self.pending_tid = None;
420 if resp.success {
421 match Self::parse_capture_data(&resp.data) {
422 Ok(capture) => {
423 self.data = Some(capture);
424 self.busy = false;
425 self.state = State::Idle;
426 }
427 Err(e) => {
428 self.set_error(&e);
429 }
430 }
431 } else {
432 self.set_error(&resp.error_message);
433 }
434 }
435 }
436 }
437 }
438 }
439
440 fn check_timeout(&mut self, timeout_ms: u32) -> bool {
441 if let Some(t) = self.arm_time {
442 if t.elapsed().as_millis() as u32 > timeout_ms {
443 self.set_error("Capture timeout");
444 return true;
445 }
446 }
447 false
448 }
449
450 fn set_error(&mut self, message: &str) {
451 self.state = State::Idle;
452 self.busy = false;
453 self.active = false;
454 self.error = true;
455 self.error_message = message.to_string();
456 self.pending_tid = None;
457 }
458
459 fn parse_capture_data(data: &serde_json::Value) -> Result<CaptureData, String> {
460 let channel_count = data.get("channel_count")
461 .and_then(|v| v.as_u64())
462 .ok_or("Missing channel_count")? as usize;
463 let capture_length = data.get("capture_length")
464 .and_then(|v| v.as_u64())
465 .ok_or("Missing capture_length")? as usize;
466 let pre_trigger_samples = data.get("pre_trigger_samples")
467 .and_then(|v| v.as_u64())
468 .ok_or("Missing pre_trigger_samples")? as usize;
469 let actual_samples = data.get("actual_samples")
470 .and_then(|v| v.as_u64())
471 .ok_or("Missing actual_samples")? as usize;
472 let sample_rate = data.get("sample_rate")
473 .and_then(|v| v.as_f64())
474 .ok_or("Missing sample_rate")?;
475 let timestamp_ns = data.get("timestamp_ns")
476 .and_then(|v| v.as_u64())
477 .ok_or("Missing timestamp_ns")?;
478 let sequence = data.get("sequence")
479 .and_then(|v| v.as_u64())
480 .ok_or("Missing sequence")?;
481
482 let channels_arr = data.get("channels")
483 .and_then(|v| v.as_array())
484 .ok_or("Missing channels array")?;
485
486 if channels_arr.len() != channel_count {
487 return Err(format!(
488 "channel_count mismatch: header says {} but got {} arrays",
489 channel_count, channels_arr.len()
490 ));
491 }
492
493 let channels: Vec<Vec<f64>> = channels_arr.iter()
494 .map(|ch| {
495 ch.as_array()
496 .map(|arr| arr.iter().filter_map(|v| v.as_f64()).collect())
497 .unwrap_or_default()
498 })
499 .collect();
500
501 Ok(CaptureData {
502 channels,
503 channel_count,
504 capture_length,
505 pre_trigger_samples,
506 actual_samples,
507 sample_rate,
508 timestamp_ns,
509 sequence,
510 })
511 }
512}
513
514impl Default for DaqCapture {
515 fn default() -> Self {
516 Self::new("ni.capture")
517 }
518}
519
520#[cfg(test)]
521mod tests {
522 use super::*;
523
524 #[test]
525 fn test_parse_capture_data() {
526 let data = serde_json::json!({
527 "channel_count": 2,
528 "capture_length": 100,
529 "pre_trigger_samples": 10,
530 "actual_samples": 110,
531 "sample_rate": 1000.0,
532 "timestamp_ns": 1234567890u64,
533 "sequence": 1u64,
534 "channels": [
535 [1.0, 2.0, 3.0],
536 [4.0, 5.0, 6.0],
537 ],
538 });
539
540 let capture = DaqCapture::parse_capture_data(&data).unwrap();
541 assert_eq!(capture.channel_count, 2);
542 assert_eq!(capture.capture_length, 100);
543 assert_eq!(capture.pre_trigger_samples, 10);
544 assert_eq!(capture.actual_samples, 110);
545 assert_eq!(capture.sample_rate, 1000.0);
546 assert_eq!(capture.channels[0], vec![1.0, 2.0, 3.0]);
547 assert_eq!(capture.channels[1], vec![4.0, 5.0, 6.0]);
548 }
549
550 #[test]
551 fn test_parse_capture_data_missing_field() {
552 let data = serde_json::json!({"channel_count": 1});
553 assert!(DaqCapture::parse_capture_data(&data).is_err());
554 }
555
556 // ------------------------------------------------------------------
557 // Tare tests. Use the plain CommandClient with a fake response channel
558 // so we can inspect sent messages and feed synthetic responses.
559 // ------------------------------------------------------------------
560
561 use tokio::sync::mpsc;
562 use mechutil::ipc::CommandMessage;
563
564 /// Wrapper that returns `(client, write_rx, response_tx)` so the test
565 /// can read what the FB sent and feed what it should receive.
566 fn test_client() -> (
567 CommandClient,
568 mpsc::UnboundedReceiver<String>,
569 mpsc::UnboundedSender<CommandMessage>,
570 ) {
571 let (write_tx, write_rx) = mpsc::unbounded_channel::<String>();
572 let (response_tx, response_rx) = mpsc::unbounded_channel::<CommandMessage>();
573 (CommandClient::new(write_tx, response_rx), write_rx, response_tx)
574 }
575
576 fn drain_sent(rx: &mut mpsc::UnboundedReceiver<String>) -> Vec<CommandMessage> {
577 let mut out = Vec::new();
578 while let Ok(s) = rx.try_recv() {
579 if let Ok(m) = serde_json::from_str::<CommandMessage>(&s) {
580 out.push(m);
581 }
582 }
583 out
584 }
585
586 #[test]
587 fn test_tare_dispatches_per_channel() {
588 let mut daq = DaqCapture::new("ni.traction");
589 let (mut client, mut write_rx, _resp_tx) = test_client();
590
591 daq.tare(&["tsdr_fx", "tsdr_fz", "enc_x"], Some(200), &mut client);
592
593 // FB should now report busy + taring.
594 assert!(daq.is_busy(), "FB must be busy during tare");
595 assert!(daq.is_taring(), "is_taring() must report true");
596 assert_eq!(daq.pending_tares.len(), 3);
597
598 let sent = drain_sent(&mut write_rx);
599 assert_eq!(sent.len(), 3);
600 assert_eq!(sent[0].topic, "ni.tsdr_fx.tare");
601 assert_eq!(sent[1].topic, "ni.tsdr_fz.tare");
602 assert_eq!(sent[2].topic, "ni.enc_x.tare");
603 for msg in &sent {
604 assert_eq!(msg.data.get("duration_ms").and_then(|v| v.as_u64()), Some(200));
605 }
606 }
607
608 #[test]
609 fn test_tare_default_duration() {
610 let mut daq = DaqCapture::new("ni.traction");
611 let (mut client, mut write_rx, _resp_tx) = test_client();
612
613 daq.tare(&["tsdr_fz"], None, &mut client);
614
615 let sent = drain_sent(&mut write_rx);
616 assert_eq!(sent[0].data.get("duration_ms").and_then(|v| v.as_u64()), Some(1000));
617 }
618
619 #[test]
620 fn test_tare_deadline_clears_busy() {
621 let mut daq = DaqCapture::new("ni.traction");
622 let (mut client, mut write_rx, _resp_tx) = test_client();
623
624 // 0 ms duration + 150 ms safety margin → FB should be busy for
625 // ~150 ms after the call, then clear on the next tick past deadline.
626 daq.tare(&["tsdr_fz"], Some(0), &mut client);
627 let _ = drain_sent(&mut write_rx);
628 assert!(daq.is_busy());
629
630 // Tick immediately: still busy (deadline is in the future).
631 daq.tick(5000, &mut client);
632 assert!(daq.is_busy(), "too soon — deadline not reached yet");
633
634 // Wait past the safety margin and tick again.
635 std::thread::sleep(std::time::Duration::from_millis(200));
636 daq.tick(5000, &mut client);
637 assert!(!daq.is_busy(), "tare should have cleared after deadline");
638 assert!(!daq.is_taring());
639 }
640
641 #[test]
642 fn test_tare_error_response_clears_channel_early() {
643 let mut daq = DaqCapture::new("ni.traction");
644 let (mut client, mut write_rx, resp_tx) = test_client();
645
646 // Queue two tares. We'll inject a failure response for the first.
647 daq.tare(&["good_ch", "bogus_ch"], Some(5000), &mut client);
648 let sent = drain_sent(&mut write_rx);
649 let bogus_tid = sent[1].transaction_id;
650
651 // Fake a module error for bogus_ch.
652 let mut err = CommandMessage::response(bogus_tid, serde_json::json!({}));
653 err.success = false;
654 err.error_message = "Tare: channel 'bogus_ch' not found".into();
655 resp_tx.send(err).unwrap();
656
657 // One tick: client polls, FB sees the error, drops that entry.
658 client.poll();
659 daq.tick(5000, &mut client);
660
661 // The good channel is still pending (deadline is 5 s out); the
662 // bogus channel is gone.
663 assert_eq!(daq.pending_tares.len(), 1);
664 assert_eq!(daq.pending_tares[0].channel, "good_ch");
665 }
666
667 #[test]
668 fn test_tare_is_rejected_while_capture_running() {
669 let mut daq = DaqCapture::new("ni.traction");
670 let (mut client, mut write_rx, _resp_tx) = test_client();
671
672 // Force the capture state machine into a non-Idle state and make
673 // sure tare is silently ignored.
674 daq.state = State::Arming;
675 daq.busy = true;
676
677 daq.tare(&["tsdr_fz"], Some(100), &mut client);
678 let sent = drain_sent(&mut write_rx);
679 assert!(sent.is_empty(), "tare must not fire while a capture is in progress");
680 assert!(daq.pending_tares.is_empty());
681 }
682
683 #[test]
684 fn test_clear_tare_dispatches_per_channel() {
685 let mut daq = DaqCapture::new("ni.traction");
686 let (mut client, mut write_rx, _resp_tx) = test_client();
687
688 daq.clear_tare(&["tsdr_fz", "enc_x"], &mut client);
689
690 // clear_tare is fire-and-forget — no busy flag, no pending list.
691 assert!(!daq.is_busy());
692 assert!(daq.pending_tares.is_empty());
693
694 let sent = drain_sent(&mut write_rx);
695 assert_eq!(sent.len(), 2);
696 assert_eq!(sent[0].topic, "ni.tsdr_fz.clear_tare");
697 assert_eq!(sent[1].topic, "ni.enc_x.clear_tare");
698 }
699
700 #[test]
701 fn test_reset_drops_pending_tares() {
702 let mut daq = DaqCapture::new("ni.traction");
703 let (mut client, mut write_rx, _resp_tx) = test_client();
704
705 daq.tare(&["a", "b"], Some(5000), &mut client);
706 let _ = drain_sent(&mut write_rx);
707 assert!(daq.is_busy());
708
709 daq.reset();
710 assert!(!daq.is_busy());
711 assert!(daq.pending_tares.is_empty());
712 }
713}