autocore-std 3.3.43

Standard library for AutoCore control programs - shared memory, IPC, and logging utilities
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
/// DAQ Capture Function Block
///
/// Manages the lifecycle of a triggered DAQ capture: arm the trigger, wait for
/// the capture to complete, and retrieve the captured data — all via IPC commands
/// to the autocore-ni module.
///
/// # State Machine
///
/// ```text
/// Idle ──(rising edge on execute)──> Arming
/// Arming ──(arm response OK)──────> WaitingForCapture  (active=true)
/// Arming ──(arm response error)───> Idle               (error=true)
/// WaitingForCapture ──(data_ready)> ReadingData
/// WaitingForCapture ──(timeout)───> Idle               (error=true)
/// ReadingData ──(read OK)─────────> Idle               (data populated)
/// ReadingData ──(read error)──────> Idle               (error=true)
/// ```
///
/// # Example
///
/// ```ignore
/// use autocore_std::fb::ni::DaqCapture;
///
/// struct MyProgram {
///     daq: DaqCapture,
/// }
///
/// impl ControlProgram for MyProgram {
///     type Memory = MyMemory;
///
///     fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>) {
///         // 5 second timeout
///         self.daq.tick(5000, ctx.client);
///
///         if !self.daq.is_busy() && !self.daq.is_error() {
///             if let Some(data) = &self.daq.data {
///                 // data.channels[0] = first channel's samples
///                 // data.channels[1] = second channel's samples, etc.
///             }
///         }
///     }
/// }
/// ```
///
/// # Taring channels before a run
///
/// The FB can zero one or more channels before arming. `tare()` sends
/// `<module>.<channel>.tare` for each channel you name and flips
/// `is_busy()` true until the module's 1-second (by default) averaging
/// window has elapsed. This lets a control program chain
/// "tare → wait → start" with the same `is_busy()` gate it already uses
/// everywhere else:
///
/// ```ignore
/// // Step 1: once the axes are homed, fire the tares.
/// if !self.daq.is_busy() && ctx.gm.ready_to_tare && !self.tare_fired {
///     self.daq.tare(&["tsdr_fx", "tsdr_fy", "tsdr_fz"], None, ctx.client);
///     self.tare_fired = true;
/// }
///
/// // Step 2: on the next scan where the FB is idle, arm the capture.
/// if self.tare_fired && !self.daq.is_busy() {
///     self.daq.start(ctx.client);
///     self.tare_fired = false;
/// }
/// ```
///
/// Tare applies to LiveBuffer *and* captures — any recording armed after
/// the tare completes reads zero at the tared baseline. Use
/// [`clear_tare()`](Self::clear_tare) to reset offsets back to 0.
use crate::CommandClient;

/// Fudge factor added to each tare's expected completion time to cover
/// IPC round-trip latency and scan-period jitter. The module computes its
/// own completion based on `sample_rate × duration_ms`, so the FB only
/// needs to wait "approximately long enough" before declaring done.
const TARE_SAFETY_MARGIN_MS: u128 = 150;

/// Default tare averaging window, matching the module's SHM-trigger default.
const DEFAULT_TARE_DURATION_MS: u32 = 1000;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Idle,
    Arming,
    WaitingForCapture,
    ReadingData,
}

/// One outstanding tare request. Kept until either the tare response comes
/// back with an error (drop early) or the deadline elapses (succeeded).
struct PendingTare {
    channel: String,
    deadline: std::time::Instant,
    /// Transaction ID for the original `tare` IPC request. Cleared to None
    /// once the module's immediate ack has been consumed — the actual tare
    /// completion is time-based since the module has no completion event.
    response_tid: Option<u32>,
}

/// Captured data returned after a successful DAQ trigger.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CaptureData {
    /// Sample data per channel. Outer index = channel, inner = samples.
    /// Layout: `channels[ch_idx][sample_idx]`.
    pub channels: Vec<Vec<f64>>,
    /// Number of channels in the capture.
    pub channel_count: usize,
    /// Configured post-trigger samples per channel.
    pub capture_length: usize,
    /// Configured pre-trigger samples per channel.
    pub pre_trigger_samples: usize,
    /// Actual samples written per channel (pre + post).
    pub actual_samples: usize,
    /// Sample rate in Hz.
    pub sample_rate: f64,
    /// UNIX timestamp (nanoseconds) of the trigger event.
    pub timestamp_ns: u64,
    /// Capture sequence number (monotonically increasing).
    pub sequence: u64,
}

/// DAQ Capture function block for NI triggered recordings.
pub struct DaqCapture {
    // Configuration
    daq_fqdn: String,

    // Outputs
    /// True while the FB is executing (from arm through data retrieval).
    pub busy: bool,
    /// True when the DAQ is armed and waiting for the hardware trigger.
    pub active: bool,
    /// True when an error occurred. 
    pub error: bool,
    /// Error description (empty when no error).
    pub error_message: String,
    /// Captured data. `Some` after a successful capture, `None` otherwise.
    pub data: Option<CaptureData>,

    // Internal state
    state: State,
    arm_time: Option<std::time::Instant>,
    pending_tid: Option<u32>,
    /// Outstanding per-channel tare requests. `is_busy()` reports `true`
    /// while this is non-empty so the control program can wait for all
    /// channels to finish averaging before arming the DAQ.
    pending_tares: Vec<PendingTare>,
}

impl DaqCapture {
    /// Create a new DaqCapture function block.
    pub fn new(daq_fqdn: &str) -> Self {
        Self {
            daq_fqdn: daq_fqdn.to_string(),
            busy: false,
            active: false,
            error: false,
            error_message: String::new(),
            data: None,
            state: State::Idle,
            arm_time: None,
            pending_tid: None,
            pending_tares: Vec::new(),
        }
    }

    /// The FB is busy running — either a capture is in flight or one or more
    /// channels are still inside their tare averaging window.
    pub fn is_busy(&self) -> bool {
        self.busy || !self.pending_tares.is_empty()
    }

    /// True while one or more channel tare commands issued through this FB
    /// are still inside their averaging window. Useful for a control program
    /// that wants to gate Start on "all tares complete" specifically rather
    /// than general FB busy.
    pub fn is_taring(&self) -> bool {
        !self.pending_tares.is_empty()
    }

    /// The last requested command resulted in an error.
    pub fn is_error(&self) -> bool {
        self.error
    }

    /// Start a new capture sequence (Arm the DAQ).
    pub fn start(&mut self, client: &mut CommandClient) {
        self.error = false;
        self.error_message.clear();
        self.data = None;
        self.active = false;

        // Send arm command
        let tid = client.send(
            &format!("{}.arm", self.daq_fqdn),
            serde_json::json!({}),
        );
        self.pending_tid = Some(tid);
        self.arm_time = Some(std::time::Instant::now());
        self.busy = true;
        self.state = State::Arming;
    }

    /// Stop/Cancel the capture and return to idle. Also drops any
    /// pending tare bookkeeping so the FB reports not-busy immediately;
    /// tares that have already started on the module side will still
    /// complete their averages — this only forgets them here.
    pub fn reset(&mut self) {
        self.state = State::Idle;
        self.busy = false;
        self.active = false;
        self.pending_tid = None;
        self.pending_tares.clear();
    }

    /// Tare one or more channels. For each channel, sends
    /// `<module>.<channel>.tare` with the given duration and marks this FB
    /// busy until the averaging window has elapsed. `duration_ms = None`
    /// lets the module use its 1000 ms default.
    ///
    /// Channel names are the same strings configured in
    /// `ni.config.daq[].channels` / `ni.config.tasks[].channels[].name`
    /// (e.g., `"tsdr_fz"`, `"enc_x"`). The module prefix is taken from the
    /// `daq_fqdn` passed to [`new`]. Channels do NOT have to all belong to
    /// this DAQ — tare is a per-channel operation on the module and the FB
    /// just acts as a convenient dispatcher.
    ///
    /// Silently ignored while a capture is in flight (state ≠ Idle). Safe
    /// to call while other tares are still pending — the new channels are
    /// appended to the pending list.
    ///
    /// # Example
    /// ```ignore
    /// if !self.daq.is_busy() {
    ///     self.daq.tare(&["tsdr_fx", "tsdr_fy", "tsdr_fz"], None, ctx.client);
    /// }
    /// // later…
    /// if !self.daq.is_busy() {
    ///     self.daq.start(ctx.client); // tares finished, safe to arm
    /// }
    /// ```
    pub fn tare<S: AsRef<str>>(
        &mut self,
        channels: &[S],
        duration_ms: Option<u32>,
        client: &mut CommandClient,
    ) {
        if self.state != State::Idle {
            // Taring mid-capture would cross offset boundaries inside the
            // already-running recording. Caller is expected to gate on
            // is_busy() — this is a backstop.
            return;
        }
        let duration = duration_ms.unwrap_or(DEFAULT_TARE_DURATION_MS);
        let module = self.module_prefix().to_string();
        let now = std::time::Instant::now();
        let deadline = now
            + std::time::Duration::from_millis(duration as u64)
            + std::time::Duration::from_millis(TARE_SAFETY_MARGIN_MS as u64);

        for ch in channels {
            let ch = ch.as_ref();
            let tid = client.send(
                &format!("{}.{}.tare", module, ch),
                serde_json::json!({ "duration_ms": duration }),
            );
            self.pending_tares.push(PendingTare {
                channel: ch.to_string(),
                deadline,
                response_tid: Some(tid),
            });
        }
    }

    /// Fire-and-forget: reset the tare offset on one or more channels to 0.
    /// Does not affect `is_busy()`; the module applies the change on its
    /// next callback. Safe to call any time.
    pub fn clear_tare<S: AsRef<str>>(
        &mut self,
        channels: &[S],
        client: &mut CommandClient,
    ) {
        let module = self.module_prefix().to_string();
        for ch in channels {
            let _ = client.send(
                &format!("{}.{}.clear_tare", module, ch.as_ref()),
                serde_json::json!({}),
            );
        }
    }

    /// Extract the module-level prefix (e.g., `"ni"`) from `daq_fqdn`
    /// (e.g., `"ni.traction"`). If no dot is present, the whole string is
    /// returned — unusual but we don't want to silently drop messages.
    fn module_prefix(&self) -> &str {
        self.daq_fqdn.split_once('.')
            .map(|(m, _)| m)
            .unwrap_or(&self.daq_fqdn)
    }

    /// Update the Timer-trigger delay for this DAQ.
    ///
    /// Only valid when the FB is idle (i.e. before `start()` is called or after
    /// the capture has completed). If called while the FB is busy, the request
    /// is silently dropped — call `reset()` first if you need to abort and
    /// reconfigure. The change is also rejected server-side if the DAQ has been
    /// armed by another caller.
    ///
    /// Has no effect on DAQs whose trigger type is not `timer`.
    pub fn set_trigger_delay(&mut self, client: &mut CommandClient, delay_ms: u32) {
        if self.state != State::Idle {
            return;
        }
        // Fire-and-forget: response is not tracked. Server-side errors (e.g.
        // wrong trigger type, armed) appear in the autocore-ni log.
        let _ = client.send(
            &format!("{}.set_trigger_delay", self.daq_fqdn),
            serde_json::json!({ "delay_ms": delay_ms }),
        );
    }

    /// Execute one scan cycle of the DAQ capture state machine.
    pub fn tick(&mut self, timeout_ms: u32, client: &mut CommandClient) {
        // ---- Tare bookkeeping (runs every tick, orthogonal to capture) ----
        //
        // 1. Consume the module's immediate tare acks so they don't accumulate
        //    in the client's response buffer. A non-success response means the
        //    module didn't accept the command (unknown channel, module down) —
        //    drop that pending tare early so the FB doesn't hang until its
        //    deadline.
        //
        // 2. Remove any pending tares whose deadline has elapsed. The module
        //    has no completion event; it simply averages for `duration_ms`
        //    at its configured sample rate. `TARE_SAFETY_MARGIN_MS` covers
        //    IPC/scan jitter so we don't clear busy a hair early.
        if !self.pending_tares.is_empty() {
            for pt in self.pending_tares.iter_mut() {
                if let Some(tid) = pt.response_tid {
                    if let Some(resp) = client.take_response(tid) {
                        pt.response_tid = None;
                        if !resp.success {
                            log::warn!(
                                "DaqCapture tare on '{}' rejected by module: {}",
                                pt.channel, resp.error_message,
                            );
                            // Force its deadline to now so the retain() below drops it.
                            pt.deadline = std::time::Instant::now();
                        }
                    }
                }
            }
            let now = std::time::Instant::now();
            self.pending_tares.retain(|pt| pt.deadline > now);
        }

        match self.state {
            State::Idle => {}

            State::Arming => {
                if self.check_timeout(timeout_ms) { return; }

                if let Some(tid) = self.pending_tid {
                    if let Some(resp) = client.take_response(tid) {
                        self.pending_tid = None;
                        if resp.success {
                            self.active = true;
                            self.state = State::WaitingForCapture;
                            // Immediately send first status poll
                            let tid = client.send(
                                &format!("{}.capture_status", self.daq_fqdn),
                                serde_json::json!({}),
                            );
                            self.pending_tid = Some(tid);
                        } else {
                            self.set_error(&resp.error_message);
                        }
                    }
                }
            }

            State::WaitingForCapture => {
                if self.check_timeout(timeout_ms) { return; }

                if let Some(tid) = self.pending_tid {
                    if let Some(resp) = client.take_response(tid) {
                        self.pending_tid = None;
                        if resp.success {
                            let data_ready = resp.data.get("data_ready")
                                .and_then(|v| v.as_bool())
                                .unwrap_or(false);

                            if data_ready {
                                self.active = false;
                                let tid = client.send(
                                    &format!("{}.read_capture", self.daq_fqdn),
                                    serde_json::json!({}),
                                );
                                self.pending_tid = Some(tid);
                                self.state = State::ReadingData;
                            } else {
                                let tid = client.send(
                                    &format!("{}.capture_status", self.daq_fqdn),
                                    serde_json::json!({}),
                                );
                                self.pending_tid = Some(tid);
                            }
                        } else {
                            self.set_error(&resp.error_message);
                        }
                    }
                }
            }

            State::ReadingData => {
                if self.check_timeout(timeout_ms) { return; }

                if let Some(tid) = self.pending_tid {
                    if let Some(resp) = client.take_response(tid) {
                        self.pending_tid = None;
                        if resp.success {
                            match Self::parse_capture_data(&resp.data) {
                                Ok(capture) => {
                                    self.data = Some(capture);
                                    self.busy = false;
                                    self.state = State::Idle;
                                }
                                Err(e) => {
                                    self.set_error(&e);
                                }
                            }
                        } else {
                            self.set_error(&resp.error_message);
                        }
                    }
                }
            }
        }
    }

    fn check_timeout(&mut self, timeout_ms: u32) -> bool {
        if let Some(t) = self.arm_time {
            if t.elapsed().as_millis() as u32 > timeout_ms {
                self.set_error("Capture timeout");
                return true;
            }
        }
        false
    }

    fn set_error(&mut self, message: &str) {
        self.state = State::Idle;
        self.busy = false;
        self.active = false;
        self.error = true;
        self.error_message = message.to_string();
        self.pending_tid = None;
    }

    fn parse_capture_data(data: &serde_json::Value) -> Result<CaptureData, String> {
        let channel_count = data.get("channel_count")
            .and_then(|v| v.as_u64())
            .ok_or("Missing channel_count")? as usize;
        let capture_length = data.get("capture_length")
            .and_then(|v| v.as_u64())
            .ok_or("Missing capture_length")? as usize;
        let pre_trigger_samples = data.get("pre_trigger_samples")
            .and_then(|v| v.as_u64())
            .ok_or("Missing pre_trigger_samples")? as usize;
        let actual_samples = data.get("actual_samples")
            .and_then(|v| v.as_u64())
            .ok_or("Missing actual_samples")? as usize;
        let sample_rate = data.get("sample_rate")
            .and_then(|v| v.as_f64())
            .ok_or("Missing sample_rate")?;
        let timestamp_ns = data.get("timestamp_ns")
            .and_then(|v| v.as_u64())
            .ok_or("Missing timestamp_ns")?;
        let sequence = data.get("sequence")
            .and_then(|v| v.as_u64())
            .ok_or("Missing sequence")?;

        let channels_arr = data.get("channels")
            .and_then(|v| v.as_array())
            .ok_or("Missing channels array")?;

        if channels_arr.len() != channel_count {
            return Err(format!(
                "channel_count mismatch: header says {} but got {} arrays",
                channel_count, channels_arr.len()
            ));
        }

        let channels: Vec<Vec<f64>> = channels_arr.iter()
            .map(|ch| {
                ch.as_array()
                    .map(|arr| arr.iter().filter_map(|v| v.as_f64()).collect())
                    .unwrap_or_default()
            })
            .collect();

        Ok(CaptureData {
            channels,
            channel_count,
            capture_length,
            pre_trigger_samples,
            actual_samples,
            sample_rate,
            timestamp_ns,
            sequence,
        })
    }
}

impl Default for DaqCapture {
    fn default() -> Self {
        Self::new("ni.capture")
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_capture_data() {
        let data = serde_json::json!({
            "channel_count": 2,
            "capture_length": 100,
            "pre_trigger_samples": 10,
            "actual_samples": 110,
            "sample_rate": 1000.0,
            "timestamp_ns": 1234567890u64,
            "sequence": 1u64,
            "channels": [
                [1.0, 2.0, 3.0],
                [4.0, 5.0, 6.0],
            ],
        });

        let capture = DaqCapture::parse_capture_data(&data).unwrap();
        assert_eq!(capture.channel_count, 2);
        assert_eq!(capture.capture_length, 100);
        assert_eq!(capture.pre_trigger_samples, 10);
        assert_eq!(capture.actual_samples, 110);
        assert_eq!(capture.sample_rate, 1000.0);
        assert_eq!(capture.channels[0], vec![1.0, 2.0, 3.0]);
        assert_eq!(capture.channels[1], vec![4.0, 5.0, 6.0]);
    }

    #[test]
    fn test_parse_capture_data_missing_field() {
        let data = serde_json::json!({"channel_count": 1});
        assert!(DaqCapture::parse_capture_data(&data).is_err());
    }

    // ------------------------------------------------------------------
    // Tare tests. Use the plain CommandClient with a fake response channel
    // so we can inspect sent messages and feed synthetic responses.
    // ------------------------------------------------------------------

    use tokio::sync::mpsc;
    use mechutil::ipc::CommandMessage;

    /// Wrapper that returns `(client, write_rx, response_tx)` so the test
    /// can read what the FB sent and feed what it should receive.
    fn test_client() -> (
        CommandClient,
        mpsc::UnboundedReceiver<String>,
        mpsc::UnboundedSender<CommandMessage>,
    ) {
        let (write_tx, write_rx) = mpsc::unbounded_channel::<String>();
        let (response_tx, response_rx) = mpsc::unbounded_channel::<CommandMessage>();
        (CommandClient::new(write_tx, response_rx), write_rx, response_tx)
    }

    fn drain_sent(rx: &mut mpsc::UnboundedReceiver<String>) -> Vec<CommandMessage> {
        let mut out = Vec::new();
        while let Ok(s) = rx.try_recv() {
            if let Ok(m) = serde_json::from_str::<CommandMessage>(&s) {
                out.push(m);
            }
        }
        out
    }

    #[test]
    fn test_tare_dispatches_per_channel() {
        let mut daq = DaqCapture::new("ni.traction");
        let (mut client, mut write_rx, _resp_tx) = test_client();

        daq.tare(&["tsdr_fx", "tsdr_fz", "enc_x"], Some(200), &mut client);

        // FB should now report busy + taring.
        assert!(daq.is_busy(),   "FB must be busy during tare");
        assert!(daq.is_taring(), "is_taring() must report true");
        assert_eq!(daq.pending_tares.len(), 3);

        let sent = drain_sent(&mut write_rx);
        assert_eq!(sent.len(), 3);
        assert_eq!(sent[0].topic, "ni.tsdr_fx.tare");
        assert_eq!(sent[1].topic, "ni.tsdr_fz.tare");
        assert_eq!(sent[2].topic, "ni.enc_x.tare");
        for msg in &sent {
            assert_eq!(msg.data.get("duration_ms").and_then(|v| v.as_u64()), Some(200));
        }
    }

    #[test]
    fn test_tare_default_duration() {
        let mut daq = DaqCapture::new("ni.traction");
        let (mut client, mut write_rx, _resp_tx) = test_client();

        daq.tare(&["tsdr_fz"], None, &mut client);

        let sent = drain_sent(&mut write_rx);
        assert_eq!(sent[0].data.get("duration_ms").and_then(|v| v.as_u64()), Some(1000));
    }

    #[test]
    fn test_tare_deadline_clears_busy() {
        let mut daq = DaqCapture::new("ni.traction");
        let (mut client, mut write_rx, _resp_tx) = test_client();

        // 0 ms duration + 150 ms safety margin → FB should be busy for
        // ~150 ms after the call, then clear on the next tick past deadline.
        daq.tare(&["tsdr_fz"], Some(0), &mut client);
        let _ = drain_sent(&mut write_rx);
        assert!(daq.is_busy());

        // Tick immediately: still busy (deadline is in the future).
        daq.tick(5000, &mut client);
        assert!(daq.is_busy(), "too soon — deadline not reached yet");

        // Wait past the safety margin and tick again.
        std::thread::sleep(std::time::Duration::from_millis(200));
        daq.tick(5000, &mut client);
        assert!(!daq.is_busy(), "tare should have cleared after deadline");
        assert!(!daq.is_taring());
    }

    #[test]
    fn test_tare_error_response_clears_channel_early() {
        let mut daq = DaqCapture::new("ni.traction");
        let (mut client, mut write_rx, resp_tx) = test_client();

        // Queue two tares. We'll inject a failure response for the first.
        daq.tare(&["good_ch", "bogus_ch"], Some(5000), &mut client);
        let sent = drain_sent(&mut write_rx);
        let bogus_tid = sent[1].transaction_id;

        // Fake a module error for bogus_ch.
        let mut err = CommandMessage::response(bogus_tid, serde_json::json!({}));
        err.success = false;
        err.error_message = "Tare: channel 'bogus_ch' not found".into();
        resp_tx.send(err).unwrap();

        // One tick: client polls, FB sees the error, drops that entry.
        client.poll();
        daq.tick(5000, &mut client);

        // The good channel is still pending (deadline is 5 s out); the
        // bogus channel is gone.
        assert_eq!(daq.pending_tares.len(), 1);
        assert_eq!(daq.pending_tares[0].channel, "good_ch");
    }

    #[test]
    fn test_tare_is_rejected_while_capture_running() {
        let mut daq = DaqCapture::new("ni.traction");
        let (mut client, mut write_rx, _resp_tx) = test_client();

        // Force the capture state machine into a non-Idle state and make
        // sure tare is silently ignored.
        daq.state = State::Arming;
        daq.busy = true;

        daq.tare(&["tsdr_fz"], Some(100), &mut client);
        let sent = drain_sent(&mut write_rx);
        assert!(sent.is_empty(), "tare must not fire while a capture is in progress");
        assert!(daq.pending_tares.is_empty());
    }

    #[test]
    fn test_clear_tare_dispatches_per_channel() {
        let mut daq = DaqCapture::new("ni.traction");
        let (mut client, mut write_rx, _resp_tx) = test_client();

        daq.clear_tare(&["tsdr_fz", "enc_x"], &mut client);

        // clear_tare is fire-and-forget — no busy flag, no pending list.
        assert!(!daq.is_busy());
        assert!(daq.pending_tares.is_empty());

        let sent = drain_sent(&mut write_rx);
        assert_eq!(sent.len(), 2);
        assert_eq!(sent[0].topic, "ni.tsdr_fz.clear_tare");
        assert_eq!(sent[1].topic, "ni.enc_x.clear_tare");
    }

    #[test]
    fn test_reset_drops_pending_tares() {
        let mut daq = DaqCapture::new("ni.traction");
        let (mut client, mut write_rx, _resp_tx) = test_client();

        daq.tare(&["a", "b"], Some(5000), &mut client);
        let _ = drain_sent(&mut write_rx);
        assert!(daq.is_busy());

        daq.reset();
        assert!(!daq.is_busy());
        assert!(daq.pending_tares.is_empty());
    }
}