Skip to main content

fsqlite_types/
obligation.rs

1//! Obligation (linear resource) tracking for cancellation-safe two-phase protocols (§4.13).
2//!
3//! Every reserved obligation MUST reach a terminal state (`Committed` or `Aborted`).
4//! Non-terminal drop from `Reserved` = `Leaked` = correctness bug (INV-NO-OBLIGATION-LEAKS).
5
6use std::collections::VecDeque;
7use std::fmt::Write as FmtWrite;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, Mutex};
10
11// ---------------------------------------------------------------------------
12// Mode (lab vs production)
13// ---------------------------------------------------------------------------
14
/// Policy selecting how a detected obligation leak is surfaced.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ObligationMode {
    /// Lab policy: a leak is a test failure and panics (§4.13.2 lab default).
    Lab,
    /// Production policy: a leak produces a diagnostic bundle and the affected
    /// connection is closed (§4.13.2 production default).
    Production,
}
23
24// ---------------------------------------------------------------------------
25// Obligation kind + state
26// ---------------------------------------------------------------------------
27
/// The five normative FrankenSQLite obligation categories (§4.13).
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ObligationKind {
    /// `SendPermit` reservation in the commit pipeline (two-phase MPSC).
    SendPermit,
    /// Duty to reply on a oneshot/session reply channel.
    CommitResponse,
    /// Lease on a transaction slot (aborted on expiry).
    TxnSlot,
    /// Witness-plane reservation token (reserve, then commit to publish).
    WitnessReservation,
    /// Name or registration held in shared state (deregistered on crash).
    SharedStateRegistration,
}
42
/// Lifecycle state in which an obligation can be observed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ObligationState {
    /// Starting state, entered on reservation.
    Reserved,
    /// Terminal: the obligation was fulfilled.
    Committed,
    /// Terminal: the obligation was explicitly released.
    Aborted,
    /// Terminal (bug): the obligation was dropped without being resolved.
    Leaked,
}

impl ObligationState {
    /// Returns `true` for every state except [`ObligationState::Reserved`].
    #[must_use]
    pub const fn is_terminal(self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::Reserved => false,
            Self::Committed | Self::Aborted | Self::Leaked => true,
        }
    }
}
63
64// ---------------------------------------------------------------------------
65// Obligation
66// ---------------------------------------------------------------------------
67
68/// A tracked linear resource that MUST reach a terminal state.
69///
70/// Dropping an `Obligation` in `Reserved` state triggers leak detection:
71/// - Lab mode: panics with diagnostic message.
72/// - Production mode: records the leak in the ledger.
73pub struct Obligation {
74    id: u64,
75    kind: ObligationKind,
76    state: ObligationState,
77    created_at: String,
78    mode: ObligationMode,
79    ledger: Option<Arc<ObligationLedger>>,
80}
81
82impl std::fmt::Debug for Obligation {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        f.debug_struct("Obligation")
85            .field("id", &self.id)
86            .field("kind", &self.kind)
87            .field("state", &self.state)
88            .field("created_at", &self.created_at)
89            .field("mode", &self.mode)
90            .finish_non_exhaustive()
91    }
92}
93
94impl Obligation {
95    /// Reserve a new obligation (enters `Reserved` state).
96    #[must_use]
97    pub fn reserve(
98        kind: ObligationKind,
99        mode: ObligationMode,
100        created_at: impl Into<String>,
101        ledger: Option<Arc<ObligationLedger>>,
102    ) -> Self {
103        let id = ledger
104            .as_ref()
105            .map_or(0, |l| l.next_id.fetch_add(1, Ordering::Relaxed));
106        let created = created_at.into();
107        if let Some(ref l) = ledger {
108            l.record_reserve(id, kind, &created);
109        }
110        Self {
111            id,
112            kind,
113            state: ObligationState::Reserved,
114            created_at: created,
115            mode,
116            ledger,
117        }
118    }
119
120    /// Commit this obligation (terminal state).
121    pub fn commit(&mut self) {
122        assert_eq!(
123            self.state,
124            ObligationState::Reserved,
125            "obligation {} ({:?}): commit called on non-Reserved state {:?}",
126            self.id,
127            self.kind,
128            self.state,
129        );
130        self.state = ObligationState::Committed;
131        if let Some(ref ledger) = self.ledger {
132            ledger.record_terminal(self.id, ObligationState::Committed);
133        }
134    }
135
136    /// Abort this obligation (terminal state).
137    pub fn abort(&mut self) {
138        assert_eq!(
139            self.state,
140            ObligationState::Reserved,
141            "obligation {} ({:?}): abort called on non-Reserved state {:?}",
142            self.id,
143            self.kind,
144            self.state,
145        );
146        self.state = ObligationState::Aborted;
147        if let Some(ref ledger) = self.ledger {
148            ledger.record_terminal(self.id, ObligationState::Aborted);
149        }
150    }
151
152    #[must_use]
153    pub fn id(&self) -> u64 {
154        self.id
155    }
156
157    #[must_use]
158    pub fn kind(&self) -> ObligationKind {
159        self.kind
160    }
161
162    #[must_use]
163    pub fn state(&self) -> ObligationState {
164        self.state
165    }
166
167    #[must_use]
168    pub fn created_at(&self) -> &str {
169        &self.created_at
170    }
171}
172
173impl Drop for Obligation {
174    fn drop(&mut self) {
175        if self.state == ObligationState::Reserved {
176            self.state = ObligationState::Leaked;
177            if let Some(ref ledger) = self.ledger {
178                ledger.record_terminal(self.id, ObligationState::Leaked);
179                ledger.record_leak(self.id, self.kind, &self.created_at);
180            }
181            match self.mode {
182                ObligationMode::Lab => {
183                    // Panicking during unwinding aborts the process; only raise
184                    // a hard failure when we're not already handling a panic.
185                    if !std::thread::panicking() {
186                        std::panic::panic_any(format!(
187                            "obligation leak: {:?} id={} created_at={}",
188                            self.kind, self.id, self.created_at
189                        ));
190                    }
191                }
192                ObligationMode::Production => {
193                    // In production, the leak is recorded in the ledger.
194                    // The caller (connection/region) is responsible for checking
195                    // the ledger and closing the affected connection.
196                }
197            }
198        }
199    }
200}
201
202// ---------------------------------------------------------------------------
203// Obligation Ledger
204// ---------------------------------------------------------------------------
205
206/// Entry in the obligation ledger.
207#[derive(Debug, Clone)]
208pub struct LedgerEntry {
209    pub id: u64,
210    pub kind: ObligationKind,
211    pub state: ObligationState,
212    pub created_at: String,
213}
214
215/// Diagnostic record for a leaked obligation.
216#[derive(Debug, Clone)]
217pub struct LeakRecord {
218    pub id: u64,
219    pub kind: ObligationKind,
220    pub created_at: String,
221}
222
223/// Global tracker of all outstanding obligations for diagnostic dumps.
224pub struct ObligationLedger {
225    entries: Mutex<Vec<LedgerEntry>>,
226    leaks: Mutex<Vec<LeakRecord>>,
227    next_id: AtomicU64,
228}
229
230impl std::fmt::Debug for ObligationLedger {
231    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232        f.debug_struct("ObligationLedger")
233            .field("next_id", &self.next_id.load(Ordering::Relaxed))
234            .field("entries_count", &self.snapshot().len())
235            .field("leaks_count", &self.leaked().len())
236            .finish_non_exhaustive()
237    }
238}
239
240impl Default for ObligationLedger {
241    fn default() -> Self {
242        Self::new()
243    }
244}
245
246impl ObligationLedger {
247    #[must_use]
248    pub fn new() -> Self {
249        Self {
250            entries: Mutex::new(Vec::new()),
251            leaks: Mutex::new(Vec::new()),
252            next_id: AtomicU64::new(0),
253        }
254    }
255
256    fn record_reserve(&self, id: u64, kind: ObligationKind, created_at: &str) {
257        let mut entries = self
258            .entries
259            .lock()
260            .unwrap_or_else(std::sync::PoisonError::into_inner);
261        entries.push(LedgerEntry {
262            id,
263            kind,
264            state: ObligationState::Reserved,
265            created_at: created_at.to_owned(),
266        });
267    }
268
269    fn record_terminal(&self, id: u64, state: ObligationState) {
270        let mut entries = self
271            .entries
272            .lock()
273            .unwrap_or_else(std::sync::PoisonError::into_inner);
274        if let Some(entry) = entries.iter_mut().find(|e| e.id == id) {
275            entry.state = state;
276        }
277    }
278
279    fn record_leak(&self, id: u64, kind: ObligationKind, created_at: &str) {
280        let mut leaks = self
281            .leaks
282            .lock()
283            .unwrap_or_else(std::sync::PoisonError::into_inner);
284        leaks.push(LeakRecord {
285            id,
286            kind,
287            created_at: created_at.to_owned(),
288        });
289    }
290
291    /// Snapshot of all ledger entries.
292    #[must_use]
293    pub fn snapshot(&self) -> Vec<LedgerEntry> {
294        self.entries
295            .lock()
296            .unwrap_or_else(std::sync::PoisonError::into_inner)
297            .clone()
298    }
299
300    /// Leaked obligations recorded so far.
301    #[must_use]
302    pub fn leaked(&self) -> Vec<LeakRecord> {
303        self.leaks
304            .lock()
305            .unwrap_or_else(std::sync::PoisonError::into_inner)
306            .clone()
307    }
308
309    /// Count of entries by state.
310    #[must_use]
311    pub fn count_by_state(&self, state: ObligationState) -> usize {
312        self.entries
313            .lock()
314            .unwrap_or_else(std::sync::PoisonError::into_inner)
315            .iter()
316            .filter(|e| e.state == state)
317            .count()
318    }
319
320    /// Produce a diagnostic dump string.
321    #[must_use]
322    pub fn diagnostic_dump(&self) -> String {
323        let entries = self.snapshot();
324        let leaks = self.leaked();
325        let mut out = String::new();
326        let _ = writeln!(out, "=== Obligation Ledger Dump ===");
327        let _ = writeln!(out, "Total entries: {}", entries.len());
328        let _ = writeln!(
329            out,
330            "Committed: {}",
331            entries
332                .iter()
333                .filter(|e| e.state == ObligationState::Committed)
334                .count()
335        );
336        let _ = writeln!(
337            out,
338            "Aborted: {}",
339            entries
340                .iter()
341                .filter(|e| e.state == ObligationState::Aborted)
342                .count()
343        );
344        let _ = writeln!(out, "Leaked: {}", leaks.len());
345        for leak in &leaks {
346            let _ = writeln!(
347                out,
348                "  LEAK id={} kind={:?} created_at={}",
349                leak.id, leak.kind, leak.created_at
350            );
351        }
352        out
353    }
354}
355
356// ---------------------------------------------------------------------------
357// TrackedSender (§4.13.1)
358// ---------------------------------------------------------------------------
359
360/// A sender that wraps a channel and holds an obligation.
361///
362/// Sending a value commits the obligation. Dropping without sending triggers
363/// leak detection per the obligation's mode.
364pub struct TrackedSender<T> {
365    obligation: Option<Obligation>,
366    sender: Option<std::sync::mpsc::Sender<T>>,
367}
368
369impl<T> std::fmt::Debug for TrackedSender<T> {
370    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
371        f.debug_struct("TrackedSender")
372            .field("obligation", &self.obligation)
373            .field("has_sender", &self.sender.is_some())
374            .finish()
375    }
376}
377
378impl<T> TrackedSender<T> {
379    /// Create a tracked sender wrapping an `mpsc::Sender` with an obligation.
380    #[must_use]
381    pub fn new(
382        sender: std::sync::mpsc::Sender<T>,
383        kind: ObligationKind,
384        mode: ObligationMode,
385        created_at: impl Into<String>,
386        ledger: Option<Arc<ObligationLedger>>,
387    ) -> Self {
388        let obligation = Obligation::reserve(kind, mode, created_at, ledger);
389        Self {
390            obligation: Some(obligation),
391            sender: Some(sender),
392        }
393    }
394
395    /// Send a value, committing the obligation.
396    ///
397    /// # Errors
398    ///
399    /// Returns `Err` if the receiver has been dropped.
400    pub fn send(mut self, value: T) -> Result<(), std::sync::mpsc::SendError<T>> {
401        let sender = self
402            .sender
403            .take()
404            .expect("TrackedSender: sender already consumed");
405        let result = sender.send(value);
406        if result.is_ok() {
407            if let Some(ref mut ob) = self.obligation {
408                ob.commit();
409            }
410        }
411        result
412    }
413
414    /// Explicitly abort this sender's obligation without sending.
415    pub fn abort(mut self) {
416        if let Some(ref mut ob) = self.obligation {
417            ob.abort();
418        }
419    }
420}
421
422impl<T> Drop for TrackedSender<T> {
423    fn drop(&mut self) {
424        // If the obligation is still Some and Reserved, Obligation::drop will
425        // handle leak detection.
426    }
427}
428
429// ---------------------------------------------------------------------------
430// EvictChannel — non-critical channel with send_evict_oldest (§4.13.1)
431// ---------------------------------------------------------------------------
432
/// A bounded FIFO channel that evicts the oldest message when full.
///
/// For non-critical telemetry channels only. Safety-critical channels MUST NOT
/// use this; use `TrackedSender` instead.
///
/// The buffer never holds more than `capacity` messages. A channel created
/// with capacity 0 retains nothing: every message sent to it is discarded and
/// counted as an eviction.
pub struct EvictChannel<T> {
    /// Buffered messages, oldest at the front.
    buffer: Mutex<VecDeque<T>>,
    /// Maximum number of buffered messages.
    capacity: usize,
    /// Number of messages lost to eviction so far.
    evicted: AtomicU64,
}

// No `T: Debug` bound: the payloads themselves are never printed.
impl<T> std::fmt::Debug for EvictChannel<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EvictChannel")
            .field("capacity", &self.capacity)
            .field("evicted", &self.evicted.load(Ordering::Relaxed))
            .field("len", &self.len())
            .finish_non_exhaustive()
    }
}

impl<T> EvictChannel<T> {
    /// Creates a channel that buffers at most `capacity` messages.
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        Self {
            buffer: Mutex::new(VecDeque::with_capacity(capacity)),
            capacity,
            evicted: AtomicU64::new(0),
        }
    }

    /// Enqueues `value`, dropping the oldest buffered message if the channel
    /// is already at capacity.
    ///
    /// With capacity 0 there is nowhere to keep `value`, so it is dropped and
    /// counted as an eviction.
    pub fn send_evict_oldest(&self, value: T) {
        if self.capacity == 0 {
            // Buffering here would exceed the bound; the incoming message is
            // the one that gets lost.
            self.evicted.fetch_add(1, Ordering::Relaxed);
            return;
        }
        let mut buf = self
            .buffer
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // Count an eviction only when a message was actually removed.
        if buf.len() >= self.capacity && buf.pop_front().is_some() {
            self.evicted.fetch_add(1, Ordering::Relaxed);
        }
        buf.push_back(value);
    }

    /// Removes and returns the oldest buffered message, if any.
    pub fn recv(&self) -> Option<T> {
        self.buffer
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .pop_front()
    }

    /// Number of messages lost to eviction so far.
    #[must_use]
    pub fn eviction_count(&self) -> u64 {
        self.evicted.load(Ordering::Relaxed)
    }

    /// Current number of buffered messages.
    #[must_use]
    pub fn len(&self) -> usize {
        self.buffer
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .len()
    }

    /// Whether no messages are currently buffered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
507
508// ---------------------------------------------------------------------------
509// Tests
510// ---------------------------------------------------------------------------
511
#[cfg(test)]
mod tests {
    //! Unit tests for obligation tracking, the ledger, `TrackedSender` and
    //! `EvictChannel`.

    use super::*;

    /// Identifier embedded in assertion messages to tie failures to this work item.
    const BEAD_ID: &str = "bd-3j1j";

    /// Builds a fresh, empty shared ledger.
    fn make_ledger() -> Arc<ObligationLedger> {
        Arc::new(ObligationLedger::new())
    }

    #[test]
    fn test_obligation_commit_reaches_terminal() {
        // Test 1: SendPermit commit → Committed, no leak.
        let ledger = make_ledger();
        let mut ob = Obligation::reserve(
            ObligationKind::SendPermit,
            ObligationMode::Lab,
            "test_commit",
            Some(Arc::clone(&ledger)),
        );
        ob.commit();
        assert_eq!(
            ob.state(),
            ObligationState::Committed,
            "bead_id={BEAD_ID} commit_terminal"
        );
        drop(ob);
        assert_eq!(
            ledger.count_by_state(ObligationState::Committed),
            1,
            "bead_id={BEAD_ID} ledger_shows_committed"
        );
        assert!(ledger.leaked().is_empty(), "bead_id={BEAD_ID} no_leaks");
    }

    #[test]
    fn test_obligation_abort_reaches_terminal() {
        // Test 2: TxnSlot abort → Aborted, no leak.
        let ledger = make_ledger();
        let mut ob = Obligation::reserve(
            ObligationKind::TxnSlot,
            ObligationMode::Lab,
            "test_abort",
            Some(Arc::clone(&ledger)),
        );
        ob.abort();
        assert_eq!(
            ob.state(),
            ObligationState::Aborted,
            "bead_id={BEAD_ID} abort_terminal"
        );
        drop(ob);
        assert_eq!(ledger.count_by_state(ObligationState::Aborted), 1);
        assert!(ledger.leaked().is_empty());
    }

    #[test]
    #[should_panic(expected = "obligation leak")]
    fn test_obligation_leak_panics_in_lab() {
        // Test 3: Witness reservation dropped without resolution panics in lab.
        let _ob = Obligation::reserve(
            ObligationKind::WitnessReservation,
            ObligationMode::Lab,
            "test_leak_lab",
            None,
        );
        // Drop without commit or abort → panic.
    }

    #[test]
    fn test_obligation_leak_diagnostic_in_production() {
        // Test 4: Production mode — drop without resolution records leak,
        // no panic, diagnostic bundle available.
        let ledger = make_ledger();

        // Create and immediately drop an obligation in production mode.
        {
            let _ob = Obligation::reserve(
                ObligationKind::CommitResponse,
                ObligationMode::Production,
                "test_leak_prod",
                Some(Arc::clone(&ledger)),
            );
            // Drop without resolution — no panic in production.
        }

        // Verify leak was recorded.
        let leaks = ledger.leaked();
        assert_eq!(leaks.len(), 1, "bead_id={BEAD_ID} production_leak_recorded");
        assert_eq!(leaks[0].kind, ObligationKind::CommitResponse);
        assert_eq!(leaks[0].created_at, "test_leak_prod");

        // Diagnostic dump contains the leak.
        let dump = ledger.diagnostic_dump();
        assert!(
            dump.contains("LEAK"),
            "bead_id={BEAD_ID} diagnostic_contains_leak"
        );
        assert!(dump.contains("test_leak_prod"));

        // The ledger is what a production caller inspects before closing the
        // connection, so check the state it actually reports rather than a
        // flag set by the test itself.
        assert_eq!(
            ledger.count_by_state(ObligationState::Leaked),
            1,
            "bead_id={BEAD_ID} ledger_entry_marked_leaked"
        );
        assert_eq!(
            ledger.count_by_state(ObligationState::Reserved),
            0,
            "bead_id={BEAD_ID} nothing_left_reserved"
        );
    }

    #[test]
    fn test_tracked_sender_commit_on_send() {
        // Test 5: TrackedSender commit on send.
        let ledger = make_ledger();
        let (tx, rx) = std::sync::mpsc::channel();
        let tracked = TrackedSender::new(
            tx,
            ObligationKind::SendPermit,
            ObligationMode::Lab,
            "test_tracked_send",
            Some(Arc::clone(&ledger)),
        );

        tracked.send(42).expect("send should succeed");
        assert_eq!(rx.recv().unwrap(), 42);
        assert_eq!(
            ledger.count_by_state(ObligationState::Committed),
            1,
            "bead_id={BEAD_ID} tracked_sender_committed"
        );
        assert!(ledger.leaked().is_empty());
    }

    #[test]
    #[should_panic(expected = "obligation leak")]
    fn test_tracked_sender_leak_on_drop() {
        // Test 6: TrackedSender dropped without sending → leak in lab.
        let (tx, _rx) = std::sync::mpsc::channel::<i32>();
        let _tracked = TrackedSender::new(
            tx,
            ObligationKind::SendPermit,
            ObligationMode::Lab,
            "test_tracked_leak",
            None,
        );
        // Drop without send → obligation leak → panic.
    }

    #[test]
    fn test_five_obligation_types_registered() {
        // Test 7: All 5 obligation types committed, 0 leaked in ledger.
        let ledger = make_ledger();
        let kinds = [
            ObligationKind::SendPermit,
            ObligationKind::CommitResponse,
            ObligationKind::TxnSlot,
            ObligationKind::WitnessReservation,
            ObligationKind::SharedStateRegistration,
        ];

        for kind in &kinds {
            let mut ob = Obligation::reserve(
                *kind,
                ObligationMode::Lab,
                format!("test_{kind:?}"),
                Some(Arc::clone(&ledger)),
            );
            ob.commit();
        }

        assert_eq!(
            ledger.count_by_state(ObligationState::Committed),
            5,
            "bead_id={BEAD_ID} all_five_committed"
        );
        assert!(ledger.leaked().is_empty(), "bead_id={BEAD_ID} zero_leaked");
    }

    #[test]
    fn test_obligation_ledger_diagnostic_dump() {
        // Test 8: 3 obligations (1 committed, 1 aborted, 1 leaked) —
        // dump contains exactly 1 leaked entry with creation context.
        let ledger = make_ledger();

        // Committed.
        let mut ob1 = Obligation::reserve(
            ObligationKind::SendPermit,
            ObligationMode::Production,
            "ob1_commit",
            Some(Arc::clone(&ledger)),
        );
        ob1.commit();

        // Aborted.
        let mut ob2 = Obligation::reserve(
            ObligationKind::TxnSlot,
            ObligationMode::Production,
            "ob2_abort",
            Some(Arc::clone(&ledger)),
        );
        ob2.abort();

        // Leaked (production mode — no panic).
        {
            let _ob3 = Obligation::reserve(
                ObligationKind::WitnessReservation,
                ObligationMode::Production,
                "ob3_leaked_from_line_42",
                Some(Arc::clone(&ledger)),
            );
        }

        let dump = ledger.diagnostic_dump();
        assert!(
            dump.contains("Committed: 1"),
            "bead_id={BEAD_ID} dump_committed_count"
        );
        assert!(
            dump.contains("Aborted: 1"),
            "bead_id={BEAD_ID} dump_aborted_count"
        );
        assert!(
            dump.contains("Leaked: 1"),
            "bead_id={BEAD_ID} dump_leaked_count"
        );
        assert!(
            dump.contains("ob3_leaked_from_line_42"),
            "bead_id={BEAD_ID} dump_has_creation_context"
        );

        let leaks = ledger.leaked();
        assert_eq!(leaks.len(), 1);
        assert_eq!(leaks[0].kind, ObligationKind::WitnessReservation);
    }

    #[test]
    fn test_cancel_resolves_obligations() {
        // Test 9: Task with 2 obligations cancelled and drained → both Aborted.
        let ledger = make_ledger();

        let mut ob1 = Obligation::reserve(
            ObligationKind::SendPermit,
            ObligationMode::Lab,
            "cancel_ob1",
            Some(Arc::clone(&ledger)),
        );
        let mut ob2 = Obligation::reserve(
            ObligationKind::TxnSlot,
            ObligationMode::Lab,
            "cancel_ob2",
            Some(Arc::clone(&ledger)),
        );

        // Simulate cancellation → drain phase aborts all obligations.
        ob1.abort();
        ob2.abort();

        assert_eq!(ob1.state(), ObligationState::Aborted);
        assert_eq!(ob2.state(), ObligationState::Aborted);
        assert_eq!(ledger.count_by_state(ObligationState::Aborted), 2);
        assert!(
            ledger.leaked().is_empty(),
            "bead_id={BEAD_ID} cancel_no_leaks"
        );
    }

    #[test]
    fn test_non_critical_channel_evict_oldest() {
        // Test 10: Telemetry channel with capacity 2. Send 3 → oldest evicted,
        // no obligation leak.
        let ch = EvictChannel::new(2);

        ch.send_evict_oldest("msg_1");
        ch.send_evict_oldest("msg_2");
        assert_eq!(ch.len(), 2);

        // Third send evicts oldest.
        ch.send_evict_oldest("msg_3");
        assert_eq!(ch.len(), 2);
        assert_eq!(ch.eviction_count(), 1, "bead_id={BEAD_ID} one_eviction");

        // Remaining messages are msg_2, msg_3.
        assert_eq!(ch.recv(), Some("msg_2"));
        assert_eq!(ch.recv(), Some("msg_3"));
        assert!(ch.is_empty());
    }
}