Skip to main content

autocore_std/fb/ni/
daq_capture.rs

/// DAQ Capture Function Block
///
/// Manages the lifecycle of a triggered DAQ capture: arm the trigger, wait for
/// the capture to complete, and retrieve the captured data — all via IPC commands
/// to the autocore-ni module.
///
/// # State Machine
///
/// ```text
/// Idle ──(start() called)────────────> Arming             (busy=true)
/// Arming ──(arm response OK)─────────> WaitingForCapture  (active=true)
/// Arming ──(arm response error)──────> Idle               (error=true)
/// WaitingForCapture ──(data_ready)───> ReadingData        (active=false)
/// WaitingForCapture ──(status error)─> Idle               (error=true)
/// ReadingData ──(read OK)────────────> Idle               (data populated)
/// ReadingData ──(read error)─────────> Idle               (error=true)
/// any non-Idle state ──(timeout)─────> Idle               (error=true)
/// ```
///
/// The timeout is measured from the `start()` call and covers the whole
/// sequence (arming, waiting for the trigger, and reading the data back).
///
/// # Example
///
/// ```ignore
/// use autocore_std::fb::ni::DaqCapture;
///
/// struct MyProgram {
///     daq: DaqCapture,
/// }
///
/// impl ControlProgram for MyProgram {
///     type Memory = MyMemory;
///
///     fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>) {
///         // 5 second timeout
///         self.daq.tick(5000, ctx.client);
///
///         if !self.daq.is_busy() && !self.daq.is_error() {
///             if let Some(data) = &self.daq.data {
///                 // data.channels[0] = first channel's samples
///                 // data.channels[1] = second channel's samples, etc.
///             }
///         }
///     }
/// }
/// ```
///
/// # Taring channels before a run
///
/// The FB can zero one or more channels before arming. `tare()` sends
/// `<module>.<channel>.tare` for each channel you name and keeps
/// `is_busy()` true until the averaging window (1 second by default) plus
/// a small safety margin has elapsed. This lets a control program chain
/// "tare → wait → start" with the same `is_busy()` gate it already uses
/// everywhere else:
///
/// ```ignore
/// // Step 1: once the axes are homed, fire the tares.
/// if !self.daq.is_busy() && ctx.gm.ready_to_tare && !self.tare_fired {
///     self.daq.tare(&["tsdr_fx", "tsdr_fy", "tsdr_fz"], None, ctx.client);
///     self.tare_fired = true;
/// }
///
/// // Step 2: on the next scan where the FB is idle, arm the capture.
/// if self.tare_fired && !self.daq.is_busy() {
///     self.daq.start(ctx.client);
///     self.tare_fired = false;
/// }
/// ```
///
/// Tare applies to LiveBuffer *and* captures — any recording armed after
/// the tare completes reads zero at the tared baseline. Use
/// [`clear_tare()`](Self::clear_tare) to reset offsets back to 0.
71use crate::CommandClient;
72
/// Extra wait, in milliseconds, appended to every tare's averaging window
/// before the FB treats that tare as finished.
///
/// It absorbs IPC round-trip latency and scan-period jitter. The module
/// derives its own completion from `sample_rate × duration_ms`, so the FB
/// only has to wait "roughly long enough" rather than track it exactly.
const TARE_SAFETY_MARGIN_MS: u128 = 150;
78
/// Tare averaging window, in milliseconds, used when the caller does not
/// supply one. Chosen to match the module's SHM-trigger default.
const DEFAULT_TARE_DURATION_MS: u32 = 1000;
81
/// Internal step of the capture state machine.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum State {
    /// No capture in flight.
    Idle,
    /// The arm command has been sent; waiting for its response.
    Arming,
    /// Armed; polling the capture status until data is ready.
    WaitingForCapture,
    /// The read command has been sent; waiting for the captured data.
    ReadingData,
}
89
/// Bookkeeping for a single tare request that has not finished yet.
///
/// An entry lives until its deadline passes (treated as success) or the
/// module answers the request with an error (dropped early).
struct PendingTare {
    /// Name of the channel being tared.
    channel: String,
    /// Point in time after which the tare is considered complete.
    deadline: std::time::Instant,
    /// Transaction ID of the `tare` IPC request while its immediate ack is
    /// still outstanding; `None` once that ack has been consumed. Completion
    /// itself is purely time-based because the module emits no completion
    /// event.
    response_tid: Option<u32>,
}
100
101/// Captured data returned after a successful DAQ trigger.
102#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
103pub struct CaptureData {
104    /// Sample data per channel. Outer index = channel, inner = samples.
105    /// Layout: `channels[ch_idx][sample_idx]`.
106    pub channels: Vec<Vec<f64>>,
107    /// Number of channels in the capture.
108    pub channel_count: usize,
109    /// Configured post-trigger samples per channel.
110    pub capture_length: usize,
111    /// Configured pre-trigger samples per channel.
112    pub pre_trigger_samples: usize,
113    /// Actual samples written per channel (pre + post).
114    pub actual_samples: usize,
115    /// Sample rate in Hz.
116    pub sample_rate: f64,
117    /// UNIX timestamp (nanoseconds) of the trigger event.
118    pub timestamp_ns: u64,
119    /// Capture sequence number (monotonically increasing).
120    pub sequence: u64,
121}
122
123/// DAQ Capture function block for NI triggered recordings.
124pub struct DaqCapture {
125    // Configuration
126    daq_fqdn: String,
127
128    // Outputs
129    /// True while the FB is executing (from arm through data retrieval).
130    pub busy: bool,
131    /// True when the DAQ is armed and waiting for the hardware trigger.
132    pub active: bool,
133    /// True when an error occurred. 
134    pub error: bool,
135    /// Error description (empty when no error).
136    pub error_message: String,
137    /// Captured data. `Some` after a successful capture, `None` otherwise.
138    pub data: Option<CaptureData>,
139
140    // Internal state
141    state: State,
142    arm_time: Option<std::time::Instant>,
143    pending_tid: Option<u32>,
144    /// Outstanding per-channel tare requests. `is_busy()` reports `true`
145    /// while this is non-empty so the control program can wait for all
146    /// channels to finish averaging before arming the DAQ.
147    pending_tares: Vec<PendingTare>,
148}
149
150impl DaqCapture {
151    /// Create a new DaqCapture function block.
152    pub fn new(daq_fqdn: &str) -> Self {
153        Self {
154            daq_fqdn: daq_fqdn.to_string(),
155            busy: false,
156            active: false,
157            error: false,
158            error_message: String::new(),
159            data: None,
160            state: State::Idle,
161            arm_time: None,
162            pending_tid: None,
163            pending_tares: Vec::new(),
164        }
165    }
166
167    /// The FB is busy running — either a capture is in flight or one or more
168    /// channels are still inside their tare averaging window.
169    pub fn is_busy(&self) -> bool {
170        self.busy || !self.pending_tares.is_empty()
171    }
172
173    /// True while one or more channel tare commands issued through this FB
174    /// are still inside their averaging window. Useful for a control program
175    /// that wants to gate Start on "all tares complete" specifically rather
176    /// than general FB busy.
177    pub fn is_taring(&self) -> bool {
178        !self.pending_tares.is_empty()
179    }
180
181    /// The last requested command resulted in an error.
182    pub fn is_error(&self) -> bool {
183        self.error
184    }
185
186    /// Start a new capture sequence (Arm the DAQ).
187    pub fn start(&mut self, client: &mut CommandClient) {
188        self.error = false;
189        self.error_message.clear();
190        self.data = None;
191        self.active = false;
192
193        // Send arm command
194        let tid = client.send(
195            &format!("{}.arm", self.daq_fqdn),
196            serde_json::json!({}),
197        );
198        self.pending_tid = Some(tid);
199        self.arm_time = Some(std::time::Instant::now());
200        self.busy = true;
201        self.state = State::Arming;
202    }
203
204    /// Stop/Cancel the capture and return to idle. Also drops any
205    /// pending tare bookkeeping so the FB reports not-busy immediately;
206    /// tares that have already started on the module side will still
207    /// complete their averages — this only forgets them here.
208    pub fn reset(&mut self) {
209        self.state = State::Idle;
210        self.busy = false;
211        self.active = false;
212        self.pending_tid = None;
213        self.pending_tares.clear();
214    }
215
216    /// Tare one or more channels. For each channel, sends
217    /// `<module>.<channel>.tare` with the given duration and marks this FB
218    /// busy until the averaging window has elapsed. `duration_ms = None`
219    /// lets the module use its 1000 ms default.
220    ///
221    /// Channel names are the same strings configured in
222    /// `ni.config.daq[].channels` / `ni.config.tasks[].channels[].name`
223    /// (e.g., `"tsdr_fz"`, `"enc_x"`). The module prefix is taken from the
224    /// `daq_fqdn` passed to [`new`]. Channels do NOT have to all belong to
225    /// this DAQ — tare is a per-channel operation on the module and the FB
226    /// just acts as a convenient dispatcher.
227    ///
228    /// Silently ignored while a capture is in flight (state ≠ Idle). Safe
229    /// to call while other tares are still pending — the new channels are
230    /// appended to the pending list.
231    ///
232    /// # Example
233    /// ```ignore
234    /// if !self.daq.is_busy() {
235    ///     self.daq.tare(&["tsdr_fx", "tsdr_fy", "tsdr_fz"], None, ctx.client);
236    /// }
237    /// // later…
238    /// if !self.daq.is_busy() {
239    ///     self.daq.start(ctx.client); // tares finished, safe to arm
240    /// }
241    /// ```
242    pub fn tare<S: AsRef<str>>(
243        &mut self,
244        channels: &[S],
245        duration_ms: Option<u32>,
246        client: &mut CommandClient,
247    ) {
248        if self.state != State::Idle {
249            // Taring mid-capture would cross offset boundaries inside the
250            // already-running recording. Caller is expected to gate on
251            // is_busy() — this is a backstop.
252            return;
253        }
254        let duration = duration_ms.unwrap_or(DEFAULT_TARE_DURATION_MS);
255        let module = self.module_prefix().to_string();
256        let now = std::time::Instant::now();
257        let deadline = now
258            + std::time::Duration::from_millis(duration as u64)
259            + std::time::Duration::from_millis(TARE_SAFETY_MARGIN_MS as u64);
260
261        for ch in channels {
262            let ch = ch.as_ref();
263            let tid = client.send(
264                &format!("{}.{}.tare", module, ch),
265                serde_json::json!({ "duration_ms": duration }),
266            );
267            self.pending_tares.push(PendingTare {
268                channel: ch.to_string(),
269                deadline,
270                response_tid: Some(tid),
271            });
272        }
273    }
274
275    /// Fire-and-forget: reset the tare offset on one or more channels to 0.
276    /// Does not affect `is_busy()`; the module applies the change on its
277    /// next callback. Safe to call any time.
278    pub fn clear_tare<S: AsRef<str>>(
279        &mut self,
280        channels: &[S],
281        client: &mut CommandClient,
282    ) {
283        let module = self.module_prefix().to_string();
284        for ch in channels {
285            let _ = client.send(
286                &format!("{}.{}.clear_tare", module, ch.as_ref()),
287                serde_json::json!({}),
288            );
289        }
290    }
291
292    /// Extract the module-level prefix (e.g., `"ni"`) from `daq_fqdn`
293    /// (e.g., `"ni.traction"`). If no dot is present, the whole string is
294    /// returned — unusual but we don't want to silently drop messages.
295    fn module_prefix(&self) -> &str {
296        self.daq_fqdn.split_once('.')
297            .map(|(m, _)| m)
298            .unwrap_or(&self.daq_fqdn)
299    }
300
301    /// Update the Timer-trigger delay for this DAQ.
302    ///
303    /// Only valid when the FB is idle (i.e. before `start()` is called or after
304    /// the capture has completed). If called while the FB is busy, the request
305    /// is silently dropped — call `reset()` first if you need to abort and
306    /// reconfigure. The change is also rejected server-side if the DAQ has been
307    /// armed by another caller.
308    ///
309    /// Has no effect on DAQs whose trigger type is not `timer`.
310    pub fn set_trigger_delay(&mut self, client: &mut CommandClient, delay_ms: u32) {
311        if self.state != State::Idle {
312            return;
313        }
314        // Fire-and-forget: response is not tracked. Server-side errors (e.g.
315        // wrong trigger type, armed) appear in the autocore-ni log.
316        let _ = client.send(
317            &format!("{}.set_trigger_delay", self.daq_fqdn),
318            serde_json::json!({ "delay_ms": delay_ms }),
319        );
320    }
321
322    /// Execute one scan cycle of the DAQ capture state machine.
323    pub fn tick(&mut self, timeout_ms: u32, client: &mut CommandClient) {
324        // ---- Tare bookkeeping (runs every tick, orthogonal to capture) ----
325        //
326        // 1. Consume the module's immediate tare acks so they don't accumulate
327        //    in the client's response buffer. A non-success response means the
328        //    module didn't accept the command (unknown channel, module down) —
329        //    drop that pending tare early so the FB doesn't hang until its
330        //    deadline.
331        //
332        // 2. Remove any pending tares whose deadline has elapsed. The module
333        //    has no completion event; it simply averages for `duration_ms`
334        //    at its configured sample rate. `TARE_SAFETY_MARGIN_MS` covers
335        //    IPC/scan jitter so we don't clear busy a hair early.
336        if !self.pending_tares.is_empty() {
337            for pt in self.pending_tares.iter_mut() {
338                if let Some(tid) = pt.response_tid {
339                    if let Some(resp) = client.take_response(tid) {
340                        pt.response_tid = None;
341                        if !resp.success {
342                            log::warn!(
343                                "DaqCapture tare on '{}' rejected by module: {}",
344                                pt.channel, resp.error_message,
345                            );
346                            // Force its deadline to now so the retain() below drops it.
347                            pt.deadline = std::time::Instant::now();
348                        }
349                    }
350                }
351            }
352            let now = std::time::Instant::now();
353            self.pending_tares.retain(|pt| pt.deadline > now);
354        }
355
356        match self.state {
357            State::Idle => {}
358
359            State::Arming => {
360                if self.check_timeout(timeout_ms) { return; }
361
362                if let Some(tid) = self.pending_tid {
363                    if let Some(resp) = client.take_response(tid) {
364                        self.pending_tid = None;
365                        if resp.success {
366                            self.active = true;
367                            self.state = State::WaitingForCapture;
368                            // Immediately send first status poll
369                            let tid = client.send(
370                                &format!("{}.capture_status", self.daq_fqdn),
371                                serde_json::json!({}),
372                            );
373                            self.pending_tid = Some(tid);
374                        } else {
375                            self.set_error(&resp.error_message);
376                        }
377                    }
378                }
379            }
380
381            State::WaitingForCapture => {
382                if self.check_timeout(timeout_ms) { return; }
383
384                if let Some(tid) = self.pending_tid {
385                    if let Some(resp) = client.take_response(tid) {
386                        self.pending_tid = None;
387                        if resp.success {
388                            let data_ready = resp.data.get("data_ready")
389                                .and_then(|v| v.as_bool())
390                                .unwrap_or(false);
391
392                            if data_ready {
393                                self.active = false;
394                                let tid = client.send(
395                                    &format!("{}.read_capture", self.daq_fqdn),
396                                    serde_json::json!({}),
397                                );
398                                self.pending_tid = Some(tid);
399                                self.state = State::ReadingData;
400                            } else {
401                                let tid = client.send(
402                                    &format!("{}.capture_status", self.daq_fqdn),
403                                    serde_json::json!({}),
404                                );
405                                self.pending_tid = Some(tid);
406                            }
407                        } else {
408                            self.set_error(&resp.error_message);
409                        }
410                    }
411                }
412            }
413
414            State::ReadingData => {
415                if self.check_timeout(timeout_ms) { return; }
416
417                if let Some(tid) = self.pending_tid {
418                    if let Some(resp) = client.take_response(tid) {
419                        self.pending_tid = None;
420                        if resp.success {
421                            match Self::parse_capture_data(&resp.data) {
422                                Ok(capture) => {
423                                    self.data = Some(capture);
424                                    self.busy = false;
425                                    self.state = State::Idle;
426                                }
427                                Err(e) => {
428                                    self.set_error(&e);
429                                }
430                            }
431                        } else {
432                            self.set_error(&resp.error_message);
433                        }
434                    }
435                }
436            }
437        }
438    }
439
440    fn check_timeout(&mut self, timeout_ms: u32) -> bool {
441        if let Some(t) = self.arm_time {
442            if t.elapsed().as_millis() as u32 > timeout_ms {
443                self.set_error("Capture timeout");
444                return true;
445            }
446        }
447        false
448    }
449
450    fn set_error(&mut self, message: &str) {
451        self.state = State::Idle;
452        self.busy = false;
453        self.active = false;
454        self.error = true;
455        self.error_message = message.to_string();
456        self.pending_tid = None;
457    }
458
459    fn parse_capture_data(data: &serde_json::Value) -> Result<CaptureData, String> {
460        let channel_count = data.get("channel_count")
461            .and_then(|v| v.as_u64())
462            .ok_or("Missing channel_count")? as usize;
463        let capture_length = data.get("capture_length")
464            .and_then(|v| v.as_u64())
465            .ok_or("Missing capture_length")? as usize;
466        let pre_trigger_samples = data.get("pre_trigger_samples")
467            .and_then(|v| v.as_u64())
468            .ok_or("Missing pre_trigger_samples")? as usize;
469        let actual_samples = data.get("actual_samples")
470            .and_then(|v| v.as_u64())
471            .ok_or("Missing actual_samples")? as usize;
472        let sample_rate = data.get("sample_rate")
473            .and_then(|v| v.as_f64())
474            .ok_or("Missing sample_rate")?;
475        let timestamp_ns = data.get("timestamp_ns")
476            .and_then(|v| v.as_u64())
477            .ok_or("Missing timestamp_ns")?;
478        let sequence = data.get("sequence")
479            .and_then(|v| v.as_u64())
480            .ok_or("Missing sequence")?;
481
482        let channels_arr = data.get("channels")
483            .and_then(|v| v.as_array())
484            .ok_or("Missing channels array")?;
485
486        if channels_arr.len() != channel_count {
487            return Err(format!(
488                "channel_count mismatch: header says {} but got {} arrays",
489                channel_count, channels_arr.len()
490            ));
491        }
492
493        let channels: Vec<Vec<f64>> = channels_arr.iter()
494            .map(|ch| {
495                ch.as_array()
496                    .map(|arr| arr.iter().filter_map(|v| v.as_f64()).collect())
497                    .unwrap_or_default()
498            })
499            .collect();
500
501        Ok(CaptureData {
502            channels,
503            channel_count,
504            capture_length,
505            pre_trigger_samples,
506            actual_samples,
507            sample_rate,
508            timestamp_ns,
509            sequence,
510        })
511    }
512}
513
514impl Default for DaqCapture {
515    fn default() -> Self {
516        Self::new("ni.capture")
517    }
518}
519
#[cfg(test)]
mod tests {
    use super::*;

    use mechutil::ipc::CommandMessage;
    use tokio::sync::mpsc;

    // ------------------------------------------------------------------
    // Helpers. The tare tests drive a plain CommandClient through a fake
    // pair of channels so they can inspect what the FB sent and feed it
    // synthetic responses.
    // ------------------------------------------------------------------

    /// Build `(client, write_rx, response_tx)`: `write_rx` yields every
    /// message the FB sends, `response_tx` injects what it should receive.
    fn test_client() -> (
        CommandClient,
        mpsc::UnboundedReceiver<String>,
        mpsc::UnboundedSender<CommandMessage>,
    ) {
        let (outgoing_tx, outgoing_rx) = mpsc::unbounded_channel::<String>();
        let (incoming_tx, incoming_rx) = mpsc::unbounded_channel::<CommandMessage>();
        let client = CommandClient::new(outgoing_tx, incoming_rx);
        (client, outgoing_rx, incoming_tx)
    }

    /// Pull everything currently queued on the write channel and decode it;
    /// entries that do not parse as a `CommandMessage` are skipped.
    fn drain_sent(rx: &mut mpsc::UnboundedReceiver<String>) -> Vec<CommandMessage> {
        std::iter::from_fn(|| rx.try_recv().ok())
            .filter_map(|raw| serde_json::from_str::<CommandMessage>(&raw).ok())
            .collect()
    }

    // ------------------------------------------------------------------
    // Capture payload parsing.
    // ------------------------------------------------------------------

    #[test]
    fn test_parse_capture_data() {
        let payload = serde_json::json!({
            "channel_count": 2,
            "capture_length": 100,
            "pre_trigger_samples": 10,
            "actual_samples": 110,
            "sample_rate": 1000.0,
            "timestamp_ns": 1234567890u64,
            "sequence": 1u64,
            "channels": [
                [1.0, 2.0, 3.0],
                [4.0, 5.0, 6.0],
            ],
        });

        let parsed = DaqCapture::parse_capture_data(&payload).unwrap();

        assert_eq!(parsed.channel_count, 2);
        assert_eq!(parsed.capture_length, 100);
        assert_eq!(parsed.pre_trigger_samples, 10);
        assert_eq!(parsed.actual_samples, 110);
        assert_eq!(parsed.sample_rate, 1000.0);
        assert_eq!(parsed.channels[0], vec![1.0, 2.0, 3.0]);
        assert_eq!(parsed.channels[1], vec![4.0, 5.0, 6.0]);
    }

    #[test]
    fn test_parse_capture_data_missing_field() {
        // Only one of the required header fields is present.
        let payload = serde_json::json!({"channel_count": 1});
        assert!(DaqCapture::parse_capture_data(&payload).is_err());
    }

    // ------------------------------------------------------------------
    // Tare handling.
    // ------------------------------------------------------------------

    #[test]
    fn test_tare_dispatches_per_channel() {
        let (mut client, mut write_rx, _resp_tx) = test_client();
        let mut daq = DaqCapture::new("ni.traction");

        daq.tare(&["tsdr_fx", "tsdr_fz", "enc_x"], Some(200), &mut client);

        // One pending entry per channel, and the FB reports busy + taring.
        assert!(daq.is_busy(),   "FB must be busy during tare");
        assert!(daq.is_taring(), "is_taring() must report true");
        assert_eq!(daq.pending_tares.len(), 3);

        // One message per channel, addressed at module level, in call order.
        let sent = drain_sent(&mut write_rx);
        assert_eq!(sent.len(), 3);
        let expected_topics = ["ni.tsdr_fx.tare", "ni.tsdr_fz.tare", "ni.enc_x.tare"];
        for (msg, expected) in sent.iter().zip(expected_topics.iter()) {
            assert_eq!(msg.topic, *expected);
            assert_eq!(msg.data.get("duration_ms").and_then(|v| v.as_u64()), Some(200));
        }
    }

    #[test]
    fn test_tare_default_duration() {
        let (mut client, mut write_rx, _resp_tx) = test_client();
        let mut daq = DaqCapture::new("ni.traction");

        daq.tare(&["tsdr_fz"], None, &mut client);

        // With no explicit duration the FB sends the 1000 ms default.
        let sent = drain_sent(&mut write_rx);
        let duration = sent[0].data.get("duration_ms").and_then(|v| v.as_u64());
        assert_eq!(duration, Some(1000));
    }

    #[test]
    fn test_tare_deadline_clears_busy() {
        let (mut client, mut write_rx, _resp_tx) = test_client();
        let mut daq = DaqCapture::new("ni.traction");

        // A 0 ms window leaves only the 150 ms safety margin, so the FB
        // stays busy for roughly 150 ms and clears on the first tick after
        // that deadline.
        daq.tare(&["tsdr_fz"], Some(0), &mut client);
        let _ = drain_sent(&mut write_rx);
        assert!(daq.is_busy());

        // An immediate tick is before the deadline: nothing changes.
        daq.tick(5000, &mut client);
        assert!(daq.is_busy(), "too soon — deadline not reached yet");

        // Sleep past the margin, then tick: the entry must be gone.
        std::thread::sleep(std::time::Duration::from_millis(200));
        daq.tick(5000, &mut client);
        assert!(!daq.is_busy(), "tare should have cleared after deadline");
        assert!(!daq.is_taring());
    }

    #[test]
    fn test_tare_error_response_clears_channel_early() {
        let (mut client, mut write_rx, resp_tx) = test_client();
        let mut daq = DaqCapture::new("ni.traction");

        // Two tares are queued; a failure response is injected for the
        // second one (bogus_ch).
        daq.tare(&["good_ch", "bogus_ch"], Some(5000), &mut client);
        let sent = drain_sent(&mut write_rx);
        let bogus_tid = sent[1].transaction_id;

        // Synthetic module error addressed to bogus_ch's transaction.
        let mut rejection = CommandMessage::response(bogus_tid, serde_json::json!({}));
        rejection.success = false;
        rejection.error_message = "Tare: channel 'bogus_ch' not found".into();
        resp_tx.send(rejection).unwrap();

        // The client picks the response up, then one FB tick processes it.
        client.poll();
        daq.tick(5000, &mut client);

        // bogus_ch was dropped; good_ch is still waiting on its deadline,
        // which is 5 s away.
        assert_eq!(daq.pending_tares.len(), 1);
        assert_eq!(daq.pending_tares[0].channel, "good_ch");
    }

    #[test]
    fn test_tare_is_rejected_while_capture_running() {
        let (mut client, mut write_rx, _resp_tx) = test_client();
        let mut daq = DaqCapture::new("ni.traction");

        // Put the capture state machine into a non-Idle state by hand.
        daq.state = State::Arming;
        daq.busy = true;

        // The tare request must be silently ignored.
        daq.tare(&["tsdr_fz"], Some(100), &mut client);

        let sent = drain_sent(&mut write_rx);
        assert!(sent.is_empty(), "tare must not fire while a capture is in progress");
        assert!(daq.pending_tares.is_empty());
    }

    #[test]
    fn test_clear_tare_dispatches_per_channel() {
        let (mut client, mut write_rx, _resp_tx) = test_client();
        let mut daq = DaqCapture::new("ni.traction");

        daq.clear_tare(&["tsdr_fz", "enc_x"], &mut client);

        // Fire-and-forget: neither the busy flag nor the pending list moves.
        assert!(!daq.is_busy());
        assert!(daq.pending_tares.is_empty());

        let sent = drain_sent(&mut write_rx);
        assert_eq!(sent.len(), 2);
        assert_eq!(sent[0].topic, "ni.tsdr_fz.clear_tare");
        assert_eq!(sent[1].topic, "ni.enc_x.clear_tare");
    }

    #[test]
    fn test_reset_drops_pending_tares() {
        let (mut client, mut write_rx, _resp_tx) = test_client();
        let mut daq = DaqCapture::new("ni.traction");

        daq.tare(&["a", "b"], Some(5000), &mut client);
        let _ = drain_sent(&mut write_rx);
        assert!(daq.is_busy());

        // reset() forgets the outstanding tares immediately.
        daq.reset();
        assert!(!daq.is_busy());
        assert!(daq.pending_tares.is_empty());
    }
}