1use std::collections::VecDeque;
7use std::fmt::Write as FmtWrite;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, Mutex};
10
/// Selects what happens when an obligation is dropped while still reserved.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObligationMode {
    /// Dropping an unresolved obligation panics with an "obligation leak"
    /// message, unless the thread is already unwinding from another panic.
    Lab,
    /// Dropping an unresolved obligation never panics; the leak is only
    /// recorded in the attached ledger, if there is one.
    Production,
}
23
/// Category label attached to an obligation.
///
/// Within this file the kind is purely descriptive: it is stored, compared,
/// and printed in diagnostics, but no behavior branches on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ObligationKind {
    SendPermit,
    CommitResponse,
    TxnSlot,
    WitnessReservation,
    SharedStateRegistration,
}
42
/// Lifecycle state of an obligation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObligationState {
    /// Created but not yet resolved; the only non-terminal state.
    Reserved,
    /// Resolved through `commit`.
    Committed,
    /// Resolved through `abort`.
    Aborted,
    /// Dropped while still `Reserved`.
    Leaked,
}

impl ObligationState {
    /// Returns `true` for every state except [`ObligationState::Reserved`].
    #[must_use]
    pub const fn is_terminal(self) -> bool {
        // Spelled out per variant so that adding a state forces a decision here.
        match self {
            Self::Reserved => false,
            Self::Committed | Self::Aborted | Self::Leaked => true,
        }
    }
}
63
64pub struct Obligation {
74 id: u64,
75 kind: ObligationKind,
76 state: ObligationState,
77 created_at: String,
78 mode: ObligationMode,
79 ledger: Option<Arc<ObligationLedger>>,
80}
81
82impl std::fmt::Debug for Obligation {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 f.debug_struct("Obligation")
85 .field("id", &self.id)
86 .field("kind", &self.kind)
87 .field("state", &self.state)
88 .field("created_at", &self.created_at)
89 .field("mode", &self.mode)
90 .finish_non_exhaustive()
91 }
92}
93
94impl Obligation {
95 #[must_use]
97 pub fn reserve(
98 kind: ObligationKind,
99 mode: ObligationMode,
100 created_at: impl Into<String>,
101 ledger: Option<Arc<ObligationLedger>>,
102 ) -> Self {
103 let id = ledger
104 .as_ref()
105 .map_or(0, |l| l.next_id.fetch_add(1, Ordering::Relaxed));
106 let created = created_at.into();
107 if let Some(ref l) = ledger {
108 l.record_reserve(id, kind, &created);
109 }
110 Self {
111 id,
112 kind,
113 state: ObligationState::Reserved,
114 created_at: created,
115 mode,
116 ledger,
117 }
118 }
119
120 pub fn commit(&mut self) {
122 assert_eq!(
123 self.state,
124 ObligationState::Reserved,
125 "obligation {} ({:?}): commit called on non-Reserved state {:?}",
126 self.id,
127 self.kind,
128 self.state,
129 );
130 self.state = ObligationState::Committed;
131 if let Some(ref ledger) = self.ledger {
132 ledger.record_terminal(self.id, ObligationState::Committed);
133 }
134 }
135
136 pub fn abort(&mut self) {
138 assert_eq!(
139 self.state,
140 ObligationState::Reserved,
141 "obligation {} ({:?}): abort called on non-Reserved state {:?}",
142 self.id,
143 self.kind,
144 self.state,
145 );
146 self.state = ObligationState::Aborted;
147 if let Some(ref ledger) = self.ledger {
148 ledger.record_terminal(self.id, ObligationState::Aborted);
149 }
150 }
151
152 #[must_use]
153 pub fn id(&self) -> u64 {
154 self.id
155 }
156
157 #[must_use]
158 pub fn kind(&self) -> ObligationKind {
159 self.kind
160 }
161
162 #[must_use]
163 pub fn state(&self) -> ObligationState {
164 self.state
165 }
166
167 #[must_use]
168 pub fn created_at(&self) -> &str {
169 &self.created_at
170 }
171}
172
173impl Drop for Obligation {
174 fn drop(&mut self) {
175 if self.state == ObligationState::Reserved {
176 self.state = ObligationState::Leaked;
177 if let Some(ref ledger) = self.ledger {
178 ledger.record_terminal(self.id, ObligationState::Leaked);
179 ledger.record_leak(self.id, self.kind, &self.created_at);
180 }
181 match self.mode {
182 ObligationMode::Lab => {
183 if !std::thread::panicking() {
186 std::panic::panic_any(format!(
187 "obligation leak: {:?} id={} created_at={}",
188 self.kind, self.id, self.created_at
189 ));
190 }
191 }
192 ObligationMode::Production => {
193 }
197 }
198 }
199 }
200}
201
202#[derive(Debug, Clone)]
208pub struct LedgerEntry {
209 pub id: u64,
210 pub kind: ObligationKind,
211 pub state: ObligationState,
212 pub created_at: String,
213}
214
215#[derive(Debug, Clone)]
217pub struct LeakRecord {
218 pub id: u64,
219 pub kind: ObligationKind,
220 pub created_at: String,
221}
222
223pub struct ObligationLedger {
225 entries: Mutex<Vec<LedgerEntry>>,
226 leaks: Mutex<Vec<LeakRecord>>,
227 next_id: AtomicU64,
228}
229
230impl std::fmt::Debug for ObligationLedger {
231 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232 f.debug_struct("ObligationLedger")
233 .field("next_id", &self.next_id.load(Ordering::Relaxed))
234 .field("entries_count", &self.snapshot().len())
235 .field("leaks_count", &self.leaked().len())
236 .finish_non_exhaustive()
237 }
238}
239
240impl Default for ObligationLedger {
241 fn default() -> Self {
242 Self::new()
243 }
244}
245
246impl ObligationLedger {
247 #[must_use]
248 pub fn new() -> Self {
249 Self {
250 entries: Mutex::new(Vec::new()),
251 leaks: Mutex::new(Vec::new()),
252 next_id: AtomicU64::new(0),
253 }
254 }
255
256 fn record_reserve(&self, id: u64, kind: ObligationKind, created_at: &str) {
257 let mut entries = self
258 .entries
259 .lock()
260 .unwrap_or_else(std::sync::PoisonError::into_inner);
261 entries.push(LedgerEntry {
262 id,
263 kind,
264 state: ObligationState::Reserved,
265 created_at: created_at.to_owned(),
266 });
267 }
268
269 fn record_terminal(&self, id: u64, state: ObligationState) {
270 let mut entries = self
271 .entries
272 .lock()
273 .unwrap_or_else(std::sync::PoisonError::into_inner);
274 if let Some(entry) = entries.iter_mut().find(|e| e.id == id) {
275 entry.state = state;
276 }
277 }
278
279 fn record_leak(&self, id: u64, kind: ObligationKind, created_at: &str) {
280 let mut leaks = self
281 .leaks
282 .lock()
283 .unwrap_or_else(std::sync::PoisonError::into_inner);
284 leaks.push(LeakRecord {
285 id,
286 kind,
287 created_at: created_at.to_owned(),
288 });
289 }
290
291 #[must_use]
293 pub fn snapshot(&self) -> Vec<LedgerEntry> {
294 self.entries
295 .lock()
296 .unwrap_or_else(std::sync::PoisonError::into_inner)
297 .clone()
298 }
299
300 #[must_use]
302 pub fn leaked(&self) -> Vec<LeakRecord> {
303 self.leaks
304 .lock()
305 .unwrap_or_else(std::sync::PoisonError::into_inner)
306 .clone()
307 }
308
309 #[must_use]
311 pub fn count_by_state(&self, state: ObligationState) -> usize {
312 self.entries
313 .lock()
314 .unwrap_or_else(std::sync::PoisonError::into_inner)
315 .iter()
316 .filter(|e| e.state == state)
317 .count()
318 }
319
320 #[must_use]
322 pub fn diagnostic_dump(&self) -> String {
323 let entries = self.snapshot();
324 let leaks = self.leaked();
325 let mut out = String::new();
326 let _ = writeln!(out, "=== Obligation Ledger Dump ===");
327 let _ = writeln!(out, "Total entries: {}", entries.len());
328 let _ = writeln!(
329 out,
330 "Committed: {}",
331 entries
332 .iter()
333 .filter(|e| e.state == ObligationState::Committed)
334 .count()
335 );
336 let _ = writeln!(
337 out,
338 "Aborted: {}",
339 entries
340 .iter()
341 .filter(|e| e.state == ObligationState::Aborted)
342 .count()
343 );
344 let _ = writeln!(out, "Leaked: {}", leaks.len());
345 for leak in &leaks {
346 let _ = writeln!(
347 out,
348 " LEAK id={} kind={:?} created_at={}",
349 leak.id, leak.kind, leak.created_at
350 );
351 }
352 out
353 }
354}
355
356pub struct TrackedSender<T> {
365 obligation: Option<Obligation>,
366 sender: Option<std::sync::mpsc::Sender<T>>,
367}
368
369impl<T> std::fmt::Debug for TrackedSender<T> {
370 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
371 f.debug_struct("TrackedSender")
372 .field("obligation", &self.obligation)
373 .field("has_sender", &self.sender.is_some())
374 .finish()
375 }
376}
377
378impl<T> TrackedSender<T> {
379 #[must_use]
381 pub fn new(
382 sender: std::sync::mpsc::Sender<T>,
383 kind: ObligationKind,
384 mode: ObligationMode,
385 created_at: impl Into<String>,
386 ledger: Option<Arc<ObligationLedger>>,
387 ) -> Self {
388 let obligation = Obligation::reserve(kind, mode, created_at, ledger);
389 Self {
390 obligation: Some(obligation),
391 sender: Some(sender),
392 }
393 }
394
395 pub fn send(mut self, value: T) -> Result<(), std::sync::mpsc::SendError<T>> {
401 let sender = self
402 .sender
403 .take()
404 .expect("TrackedSender: sender already consumed");
405 let result = sender.send(value);
406 if result.is_ok() {
407 if let Some(ref mut ob) = self.obligation {
408 ob.commit();
409 }
410 }
411 result
412 }
413
414 pub fn abort(mut self) {
416 if let Some(ref mut ob) = self.obligation {
417 ob.abort();
418 }
419 }
420}
421
422impl<T> Drop for TrackedSender<T> {
423 fn drop(&mut self) {
424 }
427}
428
/// A bounded FIFO buffer that never blocks the sender: when it is full, the
/// oldest queued value is discarded to make room for the new one.
pub struct EvictChannel<T> {
    /// Queued values, oldest at the front.
    buffer: Mutex<VecDeque<T>>,
    /// Maximum number of queued values (a value of 0 behaves like 1).
    capacity: usize,
    /// Number of values discarded so far to make room.
    evicted: AtomicU64,
}

/// Summary `Debug`: prints capacity, eviction count, and current length.
///
/// The queued values themselves are not printed, so `T` need not be `Debug`.
impl<T> std::fmt::Debug for EvictChannel<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EvictChannel")
            .field("capacity", &self.capacity)
            .field("evicted", &self.evicted.load(Ordering::Relaxed))
            .field("len", &self.len())
            .finish_non_exhaustive()
    }
}

impl<T> EvictChannel<T> {
    /// Creates an empty channel that holds at most `capacity` values.
    ///
    /// A `capacity` of 0 behaves like 1: the most recent value is always kept.
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        Self {
            buffer: Mutex::new(VecDeque::with_capacity(capacity)),
            capacity,
            evicted: AtomicU64::new(0),
        }
    }

    /// Locks the buffer, recovering the data if a previous holder panicked.
    fn lock_buffer(&self) -> std::sync::MutexGuard<'_, VecDeque<T>> {
        self.buffer
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Enqueues `value`, first discarding the oldest queued value if the
    /// buffer is already at capacity.
    pub fn send_evict_oldest(&self, value: T) {
        let mut buf = self.lock_buffer();
        // Count an eviction only when a value was actually removed; with a
        // capacity of 0 the buffer can be "full" while still empty.
        if buf.len() >= self.capacity && buf.pop_front().is_some() {
            self.evicted.fetch_add(1, Ordering::Relaxed);
        }
        buf.push_back(value);
    }

    /// Removes and returns the oldest queued value, or `None` if empty.
    pub fn recv(&self) -> Option<T> {
        self.lock_buffer().pop_front()
    }

    /// Number of values discarded so far by [`EvictChannel::send_evict_oldest`].
    #[must_use]
    pub fn eviction_count(&self) -> u64 {
        self.evicted.load(Ordering::Relaxed)
    }

    /// Number of values currently queued.
    #[must_use]
    pub fn len(&self) -> usize {
        self.lock_buffer().len()
    }

    /// Returns `true` when no values are queued.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
507
#[cfg(test)]
mod tests {
    use super::*;

    /// Tracking identifier echoed in assertion messages.
    const BEAD_ID: &str = "bd-3j1j";

    /// Builds a fresh, shareable ledger for a single test.
    fn make_ledger() -> Arc<ObligationLedger> {
        Arc::new(ObligationLedger::new())
    }

    /// Committing moves the obligation to `Committed`, and dropping it
    /// afterwards records no leak.
    #[test]
    fn test_obligation_commit_reaches_terminal() {
        let ledger = make_ledger();
        let mut ob = Obligation::reserve(
            ObligationKind::SendPermit,
            ObligationMode::Lab,
            "test_commit",
            Some(Arc::clone(&ledger)),
        );
        ob.commit();
        assert_eq!(
            ob.state(),
            ObligationState::Committed,
            "bead_id={BEAD_ID} commit_terminal"
        );
        drop(ob);
        assert_eq!(
            ledger.count_by_state(ObligationState::Committed),
            1,
            "bead_id={BEAD_ID} ledger_shows_committed"
        );
        assert!(ledger.leaked().is_empty(), "bead_id={BEAD_ID} no_leaks");
    }

    /// Aborting moves the obligation to `Aborted`, and dropping it afterwards
    /// records no leak.
    #[test]
    fn test_obligation_abort_reaches_terminal() {
        let ledger = make_ledger();
        let mut ob = Obligation::reserve(
            ObligationKind::TxnSlot,
            ObligationMode::Lab,
            "test_abort",
            Some(Arc::clone(&ledger)),
        );
        ob.abort();
        assert_eq!(
            ob.state(),
            ObligationState::Aborted,
            "bead_id={BEAD_ID} abort_terminal"
        );
        drop(ob);
        assert_eq!(ledger.count_by_state(ObligationState::Aborted), 1);
        assert!(ledger.leaked().is_empty());
    }

    /// In `Lab` mode, dropping an unresolved obligation panics.
    #[test]
    #[should_panic(expected = "obligation leak")]
    fn test_obligation_leak_panics_in_lab() {
        let _ob = Obligation::reserve(
            ObligationKind::WitnessReservation,
            ObligationMode::Lab,
            "test_leak_lab",
            None,
        );
    }

    /// In `Production` mode, a leak does not panic but is recorded in the
    /// ledger and appears in the diagnostic dump.
    #[test]
    fn test_obligation_leak_diagnostic_in_production() {
        let ledger = make_ledger();

        {
            let _ob = Obligation::reserve(
                ObligationKind::CommitResponse,
                ObligationMode::Production,
                "test_leak_prod",
                Some(Arc::clone(&ledger)),
            );
        }

        let leaks = ledger.leaked();
        assert_eq!(leaks.len(), 1, "bead_id={BEAD_ID} production_leak_recorded");
        assert_eq!(leaks[0].kind, ObligationKind::CommitResponse);
        assert_eq!(leaks[0].created_at, "test_leak_prod");

        // The ledger row itself must also have moved to `Leaked`.
        assert_eq!(ledger.count_by_state(ObligationState::Leaked), 1);
        assert_eq!(ledger.count_by_state(ObligationState::Reserved), 0);

        let dump = ledger.diagnostic_dump();
        assert!(
            dump.contains("LEAK"),
            "bead_id={BEAD_ID} diagnostic_contains_leak"
        );
        assert!(dump.contains("test_leak_prod"));
    }

    /// A successful send through `TrackedSender` commits the obligation.
    #[test]
    fn test_tracked_sender_commit_on_send() {
        let ledger = make_ledger();
        let (tx, rx) = std::sync::mpsc::channel();
        let tracked = TrackedSender::new(
            tx,
            ObligationKind::SendPermit,
            ObligationMode::Lab,
            "test_tracked_send",
            Some(Arc::clone(&ledger)),
        );

        tracked.send(42).expect("send should succeed");
        assert_eq!(rx.recv().unwrap(), 42);
        assert_eq!(
            ledger.count_by_state(ObligationState::Committed),
            1,
            "bead_id={BEAD_ID} tracked_sender_committed"
        );
        assert!(ledger.leaked().is_empty());
    }

    /// Dropping a `TrackedSender` without sending or aborting is a leak.
    #[test]
    #[should_panic(expected = "obligation leak")]
    fn test_tracked_sender_leak_on_drop() {
        let (tx, _rx) = std::sync::mpsc::channel::<i32>();
        let _tracked = TrackedSender::new(
            tx,
            ObligationKind::SendPermit,
            ObligationMode::Lab,
            "test_tracked_leak",
            None,
        );
    }

    /// Every obligation kind can be reserved and committed through a ledger.
    #[test]
    fn test_five_obligation_types_registered() {
        let ledger = make_ledger();
        let kinds = [
            ObligationKind::SendPermit,
            ObligationKind::CommitResponse,
            ObligationKind::TxnSlot,
            ObligationKind::WitnessReservation,
            ObligationKind::SharedStateRegistration,
        ];

        for kind in &kinds {
            let mut ob = Obligation::reserve(
                *kind,
                ObligationMode::Lab,
                format!("test_{kind:?}"),
                Some(Arc::clone(&ledger)),
            );
            ob.commit();
        }

        assert_eq!(
            ledger.count_by_state(ObligationState::Committed),
            5,
            "bead_id={BEAD_ID} all_five_committed"
        );
        assert!(ledger.leaked().is_empty(), "bead_id={BEAD_ID} zero_leaked");
    }

    /// The dump reports committed, aborted, and leaked counts plus the
    /// creation label of each leak.
    #[test]
    fn test_obligation_ledger_diagnostic_dump() {
        let ledger = make_ledger();

        let mut ob1 = Obligation::reserve(
            ObligationKind::SendPermit,
            ObligationMode::Production,
            "ob1_commit",
            Some(Arc::clone(&ledger)),
        );
        ob1.commit();

        let mut ob2 = Obligation::reserve(
            ObligationKind::TxnSlot,
            ObligationMode::Production,
            "ob2_abort",
            Some(Arc::clone(&ledger)),
        );
        ob2.abort();

        {
            let _ob3 = Obligation::reserve(
                ObligationKind::WitnessReservation,
                ObligationMode::Production,
                "ob3_leaked_from_line_42",
                Some(Arc::clone(&ledger)),
            );
        }

        let dump = ledger.diagnostic_dump();
        assert!(
            dump.contains("Committed: 1"),
            "bead_id={BEAD_ID} dump_committed_count"
        );
        assert!(
            dump.contains("Aborted: 1"),
            "bead_id={BEAD_ID} dump_aborted_count"
        );
        assert!(
            dump.contains("Leaked: 1"),
            "bead_id={BEAD_ID} dump_leaked_count"
        );
        assert!(
            dump.contains("ob3_leaked_from_line_42"),
            "bead_id={BEAD_ID} dump_has_creation_context"
        );

        let leaks = ledger.leaked();
        assert_eq!(leaks.len(), 1);
        assert_eq!(leaks[0].kind, ObligationKind::WitnessReservation);
    }

    /// Aborting every outstanding obligation leaves no leaks behind.
    #[test]
    fn test_cancel_resolves_obligations() {
        let ledger = make_ledger();

        let mut ob1 = Obligation::reserve(
            ObligationKind::SendPermit,
            ObligationMode::Lab,
            "cancel_ob1",
            Some(Arc::clone(&ledger)),
        );
        let mut ob2 = Obligation::reserve(
            ObligationKind::TxnSlot,
            ObligationMode::Lab,
            "cancel_ob2",
            Some(Arc::clone(&ledger)),
        );

        ob1.abort();
        ob2.abort();

        assert_eq!(ob1.state(), ObligationState::Aborted);
        assert_eq!(ob2.state(), ObligationState::Aborted);
        assert_eq!(ledger.count_by_state(ObligationState::Aborted), 2);
        assert!(
            ledger.leaked().is_empty(),
            "bead_id={BEAD_ID} cancel_no_leaks"
        );
    }

    /// A full `EvictChannel` discards its oldest value to accept a new one.
    #[test]
    fn test_non_critical_channel_evict_oldest() {
        let ch = EvictChannel::new(2);

        ch.send_evict_oldest("msg_1");
        ch.send_evict_oldest("msg_2");
        assert_eq!(ch.len(), 2);

        ch.send_evict_oldest("msg_3");
        assert_eq!(ch.len(), 2);
        assert_eq!(ch.eviction_count(), 1, "bead_id={BEAD_ID} one_eviction");

        assert_eq!(ch.recv(), Some("msg_2"));
        assert_eq!(ch.recv(), Some("msg_3"));
        assert!(ch.is_empty());
    }
}