use std::collections::VecDeque;
use std::fmt::Write as FmtWrite;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObligationMode {
Lab,
Production,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ObligationKind {
SendPermit,
CommitResponse,
TxnSlot,
WitnessReservation,
SharedStateRegistration,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObligationState {
Reserved,
Committed,
Aborted,
Leaked,
}
impl ObligationState {
#[must_use]
pub const fn is_terminal(self) -> bool {
matches!(self, Self::Committed | Self::Aborted | Self::Leaked)
}
}
pub struct Obligation {
id: u64,
kind: ObligationKind,
state: ObligationState,
created_at: String,
mode: ObligationMode,
ledger: Option<Arc<ObligationLedger>>,
}
impl std::fmt::Debug for Obligation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Obligation")
.field("id", &self.id)
.field("kind", &self.kind)
.field("state", &self.state)
.field("created_at", &self.created_at)
.field("mode", &self.mode)
.finish_non_exhaustive()
}
}
impl Obligation {
#[must_use]
pub fn reserve(
kind: ObligationKind,
mode: ObligationMode,
created_at: impl Into<String>,
ledger: Option<Arc<ObligationLedger>>,
) -> Self {
let id = ledger
.as_ref()
.map_or(0, |l| l.next_id.fetch_add(1, Ordering::Relaxed));
let created = created_at.into();
if let Some(ref l) = ledger {
l.record_reserve(id, kind, &created);
}
Self {
id,
kind,
state: ObligationState::Reserved,
created_at: created,
mode,
ledger,
}
}
pub fn commit(&mut self) {
assert_eq!(
self.state,
ObligationState::Reserved,
"obligation {} ({:?}): commit called on non-Reserved state {:?}",
self.id,
self.kind,
self.state,
);
self.state = ObligationState::Committed;
if let Some(ref ledger) = self.ledger {
ledger.record_terminal(self.id, ObligationState::Committed);
}
}
pub fn abort(&mut self) {
assert_eq!(
self.state,
ObligationState::Reserved,
"obligation {} ({:?}): abort called on non-Reserved state {:?}",
self.id,
self.kind,
self.state,
);
self.state = ObligationState::Aborted;
if let Some(ref ledger) = self.ledger {
ledger.record_terminal(self.id, ObligationState::Aborted);
}
}
#[must_use]
pub fn id(&self) -> u64 {
self.id
}
#[must_use]
pub fn kind(&self) -> ObligationKind {
self.kind
}
#[must_use]
pub fn state(&self) -> ObligationState {
self.state
}
#[must_use]
pub fn created_at(&self) -> &str {
&self.created_at
}
}
impl Drop for Obligation {
fn drop(&mut self) {
if self.state == ObligationState::Reserved {
self.state = ObligationState::Leaked;
if let Some(ref ledger) = self.ledger {
ledger.record_terminal(self.id, ObligationState::Leaked);
ledger.record_leak(self.id, self.kind, &self.created_at);
}
match self.mode {
ObligationMode::Lab => {
if !std::thread::panicking() {
std::panic::panic_any(format!(
"obligation leak: {:?} id={} created_at={}",
self.kind, self.id, self.created_at
));
}
}
ObligationMode::Production => {
}
}
}
}
}
#[derive(Debug, Clone)]
pub struct LedgerEntry {
pub id: u64,
pub kind: ObligationKind,
pub state: ObligationState,
pub created_at: String,
}
#[derive(Debug, Clone)]
pub struct LeakRecord {
pub id: u64,
pub kind: ObligationKind,
pub created_at: String,
}
pub struct ObligationLedger {
entries: Mutex<Vec<LedgerEntry>>,
leaks: Mutex<Vec<LeakRecord>>,
next_id: AtomicU64,
}
impl std::fmt::Debug for ObligationLedger {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ObligationLedger")
.field("next_id", &self.next_id.load(Ordering::Relaxed))
.field("entries_count", &self.snapshot().len())
.field("leaks_count", &self.leaked().len())
.finish_non_exhaustive()
}
}
impl Default for ObligationLedger {
fn default() -> Self {
Self::new()
}
}
impl ObligationLedger {
#[must_use]
pub fn new() -> Self {
Self {
entries: Mutex::new(Vec::new()),
leaks: Mutex::new(Vec::new()),
next_id: AtomicU64::new(0),
}
}
fn record_reserve(&self, id: u64, kind: ObligationKind, created_at: &str) {
let mut entries = self
.entries
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
entries.push(LedgerEntry {
id,
kind,
state: ObligationState::Reserved,
created_at: created_at.to_owned(),
});
}
fn record_terminal(&self, id: u64, state: ObligationState) {
let mut entries = self
.entries
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(entry) = entries.iter_mut().find(|e| e.id == id) {
entry.state = state;
}
}
fn record_leak(&self, id: u64, kind: ObligationKind, created_at: &str) {
let mut leaks = self
.leaks
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
leaks.push(LeakRecord {
id,
kind,
created_at: created_at.to_owned(),
});
}
#[must_use]
pub fn snapshot(&self) -> Vec<LedgerEntry> {
self.entries
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
#[must_use]
pub fn leaked(&self) -> Vec<LeakRecord> {
self.leaks
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
#[must_use]
pub fn count_by_state(&self, state: ObligationState) -> usize {
self.entries
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.iter()
.filter(|e| e.state == state)
.count()
}
#[must_use]
pub fn diagnostic_dump(&self) -> String {
let entries = self.snapshot();
let leaks = self.leaked();
let mut out = String::new();
let _ = writeln!(out, "=== Obligation Ledger Dump ===");
let _ = writeln!(out, "Total entries: {}", entries.len());
let _ = writeln!(
out,
"Committed: {}",
entries
.iter()
.filter(|e| e.state == ObligationState::Committed)
.count()
);
let _ = writeln!(
out,
"Aborted: {}",
entries
.iter()
.filter(|e| e.state == ObligationState::Aborted)
.count()
);
let _ = writeln!(out, "Leaked: {}", leaks.len());
for leak in &leaks {
let _ = writeln!(
out,
" LEAK id={} kind={:?} created_at={}",
leak.id, leak.kind, leak.created_at
);
}
out
}
}
pub struct TrackedSender<T> {
obligation: Option<Obligation>,
sender: Option<std::sync::mpsc::Sender<T>>,
}
impl<T> std::fmt::Debug for TrackedSender<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TrackedSender")
.field("obligation", &self.obligation)
.field("has_sender", &self.sender.is_some())
.finish()
}
}
impl<T> TrackedSender<T> {
#[must_use]
pub fn new(
sender: std::sync::mpsc::Sender<T>,
kind: ObligationKind,
mode: ObligationMode,
created_at: impl Into<String>,
ledger: Option<Arc<ObligationLedger>>,
) -> Self {
let obligation = Obligation::reserve(kind, mode, created_at, ledger);
Self {
obligation: Some(obligation),
sender: Some(sender),
}
}
pub fn send(mut self, value: T) -> Result<(), std::sync::mpsc::SendError<T>> {
let sender = self
.sender
.take()
.expect("TrackedSender: sender already consumed");
let result = sender.send(value);
if result.is_ok() {
if let Some(ref mut ob) = self.obligation {
ob.commit();
}
}
result
}
pub fn abort(mut self) {
if let Some(ref mut ob) = self.obligation {
ob.abort();
}
}
}
impl<T> Drop for TrackedSender<T> {
fn drop(&mut self) {
}
}
pub struct EvictChannel<T> {
buffer: Mutex<VecDeque<T>>,
capacity: usize,
evicted: AtomicU64,
}
impl<T: std::fmt::Debug> std::fmt::Debug for EvictChannel<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EvictChannel")
.field("capacity", &self.capacity)
.field("evicted", &self.evicted.load(Ordering::Relaxed))
.field("len", &self.len())
.finish_non_exhaustive()
}
}
impl<T> EvictChannel<T> {
#[must_use]
pub fn new(capacity: usize) -> Self {
Self {
buffer: Mutex::new(VecDeque::with_capacity(capacity)),
capacity,
evicted: AtomicU64::new(0),
}
}
pub fn send_evict_oldest(&self, value: T) {
let mut buf = self
.buffer
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if buf.len() >= self.capacity {
buf.pop_front();
self.evicted.fetch_add(1, Ordering::Relaxed);
}
buf.push_back(value);
}
pub fn recv(&self) -> Option<T> {
let mut buf = self
.buffer
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
buf.pop_front()
}
#[must_use]
pub fn eviction_count(&self) -> u64 {
self.evicted.load(Ordering::Relaxed)
}
#[must_use]
pub fn len(&self) -> usize {
self.buffer
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::AtomicBool;
const BEAD_ID: &str = "bd-3j1j";
fn make_ledger() -> Arc<ObligationLedger> {
Arc::new(ObligationLedger::new())
}
#[test]
fn test_obligation_commit_reaches_terminal() {
let ledger = make_ledger();
let mut ob = Obligation::reserve(
ObligationKind::SendPermit,
ObligationMode::Lab,
"test_commit",
Some(Arc::clone(&ledger)),
);
ob.commit();
assert_eq!(
ob.state(),
ObligationState::Committed,
"bead_id={BEAD_ID} commit_terminal"
);
drop(ob);
assert_eq!(
ledger.count_by_state(ObligationState::Committed),
1,
"bead_id={BEAD_ID} ledger_shows_committed"
);
assert!(ledger.leaked().is_empty(), "bead_id={BEAD_ID} no_leaks");
}
#[test]
fn test_obligation_abort_reaches_terminal() {
let ledger = make_ledger();
let mut ob = Obligation::reserve(
ObligationKind::TxnSlot,
ObligationMode::Lab,
"test_abort",
Some(Arc::clone(&ledger)),
);
ob.abort();
assert_eq!(
ob.state(),
ObligationState::Aborted,
"bead_id={BEAD_ID} abort_terminal"
);
drop(ob);
assert_eq!(ledger.count_by_state(ObligationState::Aborted), 1);
assert!(ledger.leaked().is_empty());
}
#[test]
#[should_panic(expected = "obligation leak")]
fn test_obligation_leak_panics_in_lab() {
let _ob = Obligation::reserve(
ObligationKind::WitnessReservation,
ObligationMode::Lab,
"test_leak_lab",
None,
);
}
#[test]
fn test_obligation_leak_diagnostic_in_production() {
let ledger = make_ledger();
let leaked_flag = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&leaked_flag);
{
let _ob = Obligation::reserve(
ObligationKind::CommitResponse,
ObligationMode::Production,
"test_leak_prod",
Some(Arc::clone(&ledger)),
);
}
let leaks = ledger.leaked();
assert_eq!(leaks.len(), 1, "bead_id={BEAD_ID} production_leak_recorded");
assert_eq!(leaks[0].kind, ObligationKind::CommitResponse);
assert_eq!(leaks[0].created_at, "test_leak_prod");
let dump = ledger.diagnostic_dump();
assert!(
dump.contains("LEAK"),
"bead_id={BEAD_ID} diagnostic_contains_leak"
);
assert!(dump.contains("test_leak_prod"));
flag.store(true, Ordering::Release);
assert!(leaked_flag.load(Ordering::Acquire));
}
#[test]
fn test_tracked_sender_commit_on_send() {
let ledger = make_ledger();
let (tx, rx) = std::sync::mpsc::channel();
let tracked = TrackedSender::new(
tx,
ObligationKind::SendPermit,
ObligationMode::Lab,
"test_tracked_send",
Some(Arc::clone(&ledger)),
);
tracked.send(42).expect("send should succeed");
assert_eq!(rx.recv().unwrap(), 42);
assert_eq!(
ledger.count_by_state(ObligationState::Committed),
1,
"bead_id={BEAD_ID} tracked_sender_committed"
);
assert!(ledger.leaked().is_empty());
}
#[test]
#[should_panic(expected = "obligation leak")]
fn test_tracked_sender_leak_on_drop() {
let (tx, _rx) = std::sync::mpsc::channel::<i32>();
let _tracked = TrackedSender::new(
tx,
ObligationKind::SendPermit,
ObligationMode::Lab,
"test_tracked_leak",
None,
);
}
#[test]
fn test_five_obligation_types_registered() {
let ledger = make_ledger();
let kinds = [
ObligationKind::SendPermit,
ObligationKind::CommitResponse,
ObligationKind::TxnSlot,
ObligationKind::WitnessReservation,
ObligationKind::SharedStateRegistration,
];
for kind in &kinds {
let mut ob = Obligation::reserve(
*kind,
ObligationMode::Lab,
format!("test_{kind:?}"),
Some(Arc::clone(&ledger)),
);
ob.commit();
}
assert_eq!(
ledger.count_by_state(ObligationState::Committed),
5,
"bead_id={BEAD_ID} all_five_committed"
);
assert!(ledger.leaked().is_empty(), "bead_id={BEAD_ID} zero_leaked");
}
#[test]
fn test_obligation_ledger_diagnostic_dump() {
let ledger = make_ledger();
let mut ob1 = Obligation::reserve(
ObligationKind::SendPermit,
ObligationMode::Production,
"ob1_commit",
Some(Arc::clone(&ledger)),
);
ob1.commit();
let mut ob2 = Obligation::reserve(
ObligationKind::TxnSlot,
ObligationMode::Production,
"ob2_abort",
Some(Arc::clone(&ledger)),
);
ob2.abort();
{
let _ob3 = Obligation::reserve(
ObligationKind::WitnessReservation,
ObligationMode::Production,
"ob3_leaked_from_line_42",
Some(Arc::clone(&ledger)),
);
}
let dump = ledger.diagnostic_dump();
assert!(
dump.contains("Committed: 1"),
"bead_id={BEAD_ID} dump_committed_count"
);
assert!(
dump.contains("Aborted: 1"),
"bead_id={BEAD_ID} dump_aborted_count"
);
assert!(
dump.contains("Leaked: 1"),
"bead_id={BEAD_ID} dump_leaked_count"
);
assert!(
dump.contains("ob3_leaked_from_line_42"),
"bead_id={BEAD_ID} dump_has_creation_context"
);
let leaks = ledger.leaked();
assert_eq!(leaks.len(), 1);
assert_eq!(leaks[0].kind, ObligationKind::WitnessReservation);
}
#[test]
fn test_cancel_resolves_obligations() {
let ledger = make_ledger();
let mut ob1 = Obligation::reserve(
ObligationKind::SendPermit,
ObligationMode::Lab,
"cancel_ob1",
Some(Arc::clone(&ledger)),
);
let mut ob2 = Obligation::reserve(
ObligationKind::TxnSlot,
ObligationMode::Lab,
"cancel_ob2",
Some(Arc::clone(&ledger)),
);
ob1.abort();
ob2.abort();
assert_eq!(ob1.state(), ObligationState::Aborted);
assert_eq!(ob2.state(), ObligationState::Aborted);
assert_eq!(ledger.count_by_state(ObligationState::Aborted), 2);
assert!(
ledger.leaked().is_empty(),
"bead_id={BEAD_ID} cancel_no_leaks"
);
}
#[test]
fn test_non_critical_channel_evict_oldest() {
let ch = EvictChannel::new(2);
ch.send_evict_oldest("msg_1");
ch.send_evict_oldest("msg_2");
assert_eq!(ch.len(), 2);
ch.send_evict_oldest("msg_3");
assert_eq!(ch.len(), 2);
assert_eq!(ch.eviction_count(), 1, "bead_id={BEAD_ID} one_eviction");
assert_eq!(ch.recv(), Some("msg_2"));
assert_eq!(ch.recv(), Some("msg_3"));
assert!(ch.is_empty());
}
}