use core::fmt;
use parking_lot::RwLock;
use smallvec::SmallVec;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use crate::types::symbol::{ObjectId, ObjectParams, SymbolId};
use crate::types::{ObligationId, RegionId, TaskId, Time};
use crate::util::ArenaIndex;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SymbolicObligationKind {
SendObject,
SendSymbol,
AcknowledgeReceipt,
DecodeObject,
RepairDelivery,
}
impl SymbolicObligationKind {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::SendObject => "send_object",
Self::SendSymbol => "send_symbol",
Self::AcknowledgeReceipt => "ack_receipt",
Self::DecodeObject => "decode_object",
Self::RepairDelivery => "repair_delivery",
}
}
}
impl fmt::Display for SymbolicObligationKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.as_str())
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SymbolicObligationState {
Reserved,
InProgress,
Committed,
Aborted,
Leaked,
}
impl SymbolicObligationState {
#[must_use]
pub const fn is_terminal(self) -> bool {
matches!(self, Self::Committed | Self::Aborted | Self::Leaked)
}
#[must_use]
pub const fn is_success(self) -> bool {
matches!(self, Self::Committed | Self::Aborted)
}
#[must_use]
pub const fn is_leaked(self) -> bool {
matches!(self, Self::Leaked)
}
}
pub struct FulfillmentProgress {
total: u32,
fulfilled: AtomicU32,
}
impl FulfillmentProgress {
#[must_use]
pub fn new(total: u32) -> Self {
Self {
total,
fulfilled: AtomicU32::new(0),
}
}
pub fn increment(&self) {
self.add(1);
}
pub fn add(&self, count: u32) {
let _ = self
.fulfilled
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
Some(current.saturating_add(count))
});
}
#[must_use]
pub fn total(&self) -> u32 {
self.total
}
#[must_use]
pub fn fulfilled(&self) -> u32 {
self.fulfilled.load(Ordering::Relaxed)
}
#[must_use]
pub fn is_complete(&self) -> bool {
self.fulfilled() >= self.total
}
#[must_use]
pub fn percent(&self) -> f64 {
percent_complete(self.total, self.fulfilled())
}
#[must_use]
pub fn snapshot(&self) -> FulfillmentSnapshot {
let fulfilled = self.fulfilled();
FulfillmentSnapshot {
total: self.total,
fulfilled,
percent: percent_complete(self.total, fulfilled),
complete: fulfilled >= self.total,
}
}
#[must_use]
pub fn remaining(&self) -> u32 {
self.total.saturating_sub(self.fulfilled())
}
}
fn percent_complete(total: u32, fulfilled: u32) -> f64 {
if total == 0 {
1.0
} else {
f64::from(fulfilled.min(total)) / f64::from(total)
}
}
impl fmt::Debug for FulfillmentProgress {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FulfillmentProgress")
.field("fulfilled", &self.fulfilled())
.field("total", &self.total)
.field("complete", &self.is_complete())
.finish()
}
}
#[derive(Clone, Debug)]
pub struct FulfillmentSnapshot {
pub total: u32,
pub fulfilled: u32,
pub percent: f64,
pub complete: bool,
}
struct ObligationInner {
id: ObligationId,
kind: SymbolicObligationKind,
object_id: ObjectId,
expected_symbol: Option<SymbolId>,
holder: TaskId,
region: RegionId,
state: RwLock<SymbolicObligationState>,
progress: FulfillmentProgress,
created_at: Time,
registry: Option<RegistryMirror>,
}
pub struct SymbolicObligation {
state: Arc<ObligationInner>,
resolved: bool,
}
impl SymbolicObligation {
#[must_use]
fn new_send_object(
id: ObligationId,
object_id: ObjectId,
params: &ObjectParams,
holder: TaskId,
region: RegionId,
created_at: Time,
registry: Option<RegistryMirror>,
) -> Self {
let total_symbols = params.total_source_symbols();
Self {
state: Arc::new(ObligationInner {
id,
kind: SymbolicObligationKind::SendObject,
object_id,
expected_symbol: None,
holder,
region,
state: RwLock::new(SymbolicObligationState::Reserved),
progress: FulfillmentProgress::new(total_symbols),
created_at,
registry,
}),
resolved: false,
}
}
#[must_use]
fn new_send_symbol(
id: ObligationId,
symbol_id: SymbolId,
holder: TaskId,
region: RegionId,
created_at: Time,
registry: Option<RegistryMirror>,
) -> Self {
Self {
state: Arc::new(ObligationInner {
id,
kind: SymbolicObligationKind::SendSymbol,
object_id: symbol_id.object_id(),
expected_symbol: Some(symbol_id),
holder,
region,
state: RwLock::new(SymbolicObligationState::Reserved),
progress: FulfillmentProgress::new(1),
created_at,
registry,
}),
resolved: false,
}
}
#[must_use]
fn new_acknowledge(
id: ObligationId,
object_id: ObjectId,
expected_count: u32,
holder: TaskId,
region: RegionId,
created_at: Time,
registry: Option<RegistryMirror>,
) -> Self {
Self {
state: Arc::new(ObligationInner {
id,
kind: SymbolicObligationKind::AcknowledgeReceipt,
object_id,
expected_symbol: None,
holder,
region,
state: RwLock::new(SymbolicObligationState::Reserved),
progress: FulfillmentProgress::new(expected_count),
created_at,
registry,
}),
resolved: false,
}
}
#[must_use]
fn new_decode(
id: ObligationId,
object_id: ObjectId,
min_symbols: u32,
holder: TaskId,
region: RegionId,
created_at: Time,
registry: Option<RegistryMirror>,
) -> Self {
Self {
state: Arc::new(ObligationInner {
id,
kind: SymbolicObligationKind::DecodeObject,
object_id,
expected_symbol: None,
holder,
region,
state: RwLock::new(SymbolicObligationState::Reserved),
progress: FulfillmentProgress::new(min_symbols),
created_at,
registry,
}),
resolved: false,
}
}
#[must_use]
pub fn id(&self) -> ObligationId {
self.state.id
}
#[must_use]
pub fn kind(&self) -> SymbolicObligationKind {
self.state.kind
}
#[must_use]
pub fn object_id(&self) -> ObjectId {
self.state.object_id
}
#[must_use]
pub fn holder(&self) -> TaskId {
self.state.holder
}
#[must_use]
pub fn region(&self) -> RegionId {
self.state.region
}
#[must_use]
pub fn state(&self) -> SymbolicObligationState {
*self.state.state.read()
}
#[must_use]
pub fn is_pending(&self) -> bool {
!self.state().is_terminal()
}
#[must_use]
pub fn progress(&self) -> FulfillmentSnapshot {
self.state.progress.snapshot()
}
#[must_use]
pub fn created_at(&self) -> Time {
self.state.created_at
}
fn sync_registry_state(&self, state: SymbolicObligationState) {
if let Some(registry) = &self.state.registry {
if state.is_terminal() {
registry.unregister(self.id(), self.object_id(), self.holder(), self.region());
} else if let Some(entry) = registry.by_id.write().get_mut(&self.id()) {
entry.state = state;
}
}
}
fn set_state(&self, state: SymbolicObligationState) {
*self.state.state.write() = state;
self.sync_registry_state(state);
}
fn validate_fulfilled_symbol(&self, symbol_id: SymbolId) {
assert_eq!(
symbol_id.object_id(),
self.object_id(),
"fulfilled symbol object mismatch: expected {}, got {}",
self.object_id(),
symbol_id.object_id()
);
if let Some(expected_symbol) = self.state.expected_symbol {
assert_eq!(
symbol_id, expected_symbol,
"fulfilled symbol mismatch: expected {expected_symbol}, got {symbol_id}"
);
}
}
pub fn fulfill_one(&self, symbol_id: SymbolId) {
self.validate_fulfilled_symbol(symbol_id);
self.state.progress.increment();
let mut state = self.state.state.write();
if *state == SymbolicObligationState::Reserved {
*state = SymbolicObligationState::InProgress;
drop(state);
self.sync_registry_state(SymbolicObligationState::InProgress);
}
}
pub fn fulfill_many(&self, count: u32) {
assert!(
self.state.expected_symbol.is_none(),
"fulfill_many is invalid for single-symbol obligations; use fulfill_one with the expected SymbolId"
);
self.state.progress.add(count);
let mut state = self.state.state.write();
if *state == SymbolicObligationState::Reserved {
*state = SymbolicObligationState::InProgress;
drop(state);
self.sync_registry_state(SymbolicObligationState::InProgress);
}
}
pub fn commit(mut self) {
assert!(self.is_pending(), "obligation already resolved");
self.set_state(SymbolicObligationState::Committed);
self.resolved = true;
}
pub fn abort(mut self) {
assert!(self.is_pending(), "obligation already resolved");
self.set_state(SymbolicObligationState::Aborted);
self.resolved = true;
}
pub fn commit_or_abort(self) {
if self.state.progress.is_complete() {
self.commit();
} else {
self.abort();
}
}
pub(crate) fn mark_leaked(&self) {
self.set_state(SymbolicObligationState::Leaked);
}
#[doc(hidden)]
#[must_use]
pub fn new_for_test(id: u64, object_id: ObjectId, total: u32) -> Self {
Self {
state: Arc::new(ObligationInner {
id: ObligationId::from_arena(ArenaIndex::new(id as u32, 0)),
kind: SymbolicObligationKind::SendObject,
object_id,
expected_symbol: None,
holder: TaskId::from_arena(ArenaIndex::new(0, 0)),
region: RegionId::from_arena(ArenaIndex::new(0, 0)),
state: RwLock::new(SymbolicObligationState::Reserved),
progress: FulfillmentProgress::new(total),
created_at: Time::ZERO,
registry: None,
}),
resolved: false,
}
}
}
impl Drop for SymbolicObligation {
fn drop(&mut self) {
if !self.resolved && self.is_pending() {
self.mark_leaked();
if std::thread::panicking() {
return;
}
#[cfg(debug_assertions)]
panic!(
"SymbolicObligation leaked: {:?} for object {} was dropped without resolution",
self.kind(),
self.object_id()
);
#[cfg(not(debug_assertions))]
crate::tracing_compat::error!(
kind = ?self.kind(),
object_id = %self.object_id(),
"symbolic obligation leaked"
);
}
}
}
impl fmt::Debug for SymbolicObligation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SymbolicObligation")
.field("id", &self.id())
.field("kind", &self.kind())
.field("object_id", &self.object_id())
.field("state", &self.state())
.field("progress", &self.progress())
.finish()
}
}
#[derive(Clone, Debug)]
pub struct ObligationSummary {
pub id: ObligationId,
pub kind: SymbolicObligationKind,
pub object_id: ObjectId,
pub holder: TaskId,
pub state: SymbolicObligationState,
}
pub(crate) struct ObligationEntry {
id: ObligationId,
kind: SymbolicObligationKind,
object_id: ObjectId,
holder: TaskId,
#[allow(dead_code)]
created_at: Time,
state: SymbolicObligationState,
}
#[derive(Clone)]
#[allow(clippy::struct_field_names)]
struct RegistryMirror {
by_id: Arc<RwLock<HashMap<ObligationId, ObligationEntry>>>,
by_object: Arc<RwLock<HashMap<ObjectId, Vec<ObligationId>>>>,
by_holder: Arc<RwLock<Vec<HolderSlot>>>,
by_region: Arc<RwLock<Vec<RegionSlot>>>,
}
impl RegistryMirror {
fn unregister(&self, id: ObligationId, object_id: ObjectId, holder: TaskId, region: RegionId) {
self.by_id.write().remove(&id);
let mut by_object = self.by_object.write();
let remove_object_entry = by_object.get_mut(&object_id).is_some_and(|ids| {
ids.retain(|current| *current != id);
ids.is_empty()
});
if remove_object_entry {
by_object.remove(&object_id);
}
drop(by_object);
remove_obligation_from_holder_slot(&self.by_holder, holder, id);
remove_obligation_from_region_slot(&self.by_region, region, id);
}
}
type ObligationIds = SmallVec<[ObligationId; 4]>;
type HolderSlot = SmallVec<[(TaskId, ObligationIds); 1]>;
type RegionSlot = SmallVec<[(RegionId, ObligationIds); 1]>;
#[allow(clippy::significant_drop_tightening)]
fn remove_obligation_from_holder_slot(
by_holder: &Arc<RwLock<Vec<HolderSlot>>>,
holder: TaskId,
id: ObligationId,
) {
let holder_slot = holder.arena_index().index() as usize;
let mut guard = by_holder.write();
let Some(entries) = guard.get_mut(holder_slot) else {
return;
};
let Some(position) = entries
.iter()
.position(|(stored_holder, _)| *stored_holder == holder)
else {
return;
};
entries[position].1.retain(|current| *current != id);
if entries[position].1.is_empty() {
entries.remove(position);
}
}
#[allow(clippy::significant_drop_tightening)]
fn remove_obligation_from_region_slot(
by_region: &Arc<RwLock<Vec<RegionSlot>>>,
region: RegionId,
id: ObligationId,
) {
let region_slot = region.arena_index().index() as usize;
let mut guard = by_region.write();
let Some(entries) = guard.get_mut(region_slot) else {
return;
};
let Some(position) = entries
.iter()
.position(|(stored_region, _)| *stored_region == region)
else {
return;
};
entries[position].1.retain(|current| *current != id);
if entries[position].1.is_empty() {
entries.remove(position);
}
}
pub struct SymbolicObligationRegistry {
by_id: Arc<RwLock<HashMap<ObligationId, ObligationEntry>>>,
by_object: Arc<RwLock<HashMap<ObjectId, Vec<ObligationId>>>>,
by_holder: Arc<RwLock<Vec<HolderSlot>>>,
by_region: Arc<RwLock<Vec<RegionSlot>>>,
next_id: AtomicU64,
}
impl SymbolicObligationRegistry {
#[must_use]
pub fn new() -> Self {
Self {
by_id: Arc::new(RwLock::new(HashMap::new())),
by_object: Arc::new(RwLock::new(HashMap::new())),
by_holder: Arc::new(RwLock::new(Vec::new())),
by_region: Arc::new(RwLock::new(Vec::new())),
next_id: AtomicU64::new(1),
}
}
fn registry_mirror(&self) -> RegistryMirror {
RegistryMirror {
by_id: Arc::clone(&self.by_id),
by_object: Arc::clone(&self.by_object),
by_holder: Arc::clone(&self.by_holder),
by_region: Arc::clone(&self.by_region),
}
}
pub fn create_send_object(
&self,
object_id: ObjectId,
params: &ObjectParams,
holder: TaskId,
region: RegionId,
now: Time,
) -> SymbolicObligation {
let id = self.allocate_id();
let obligation = SymbolicObligation::new_send_object(
id,
object_id,
params,
holder,
region,
now,
Some(self.registry_mirror()),
);
self.register(
id,
SymbolicObligationKind::SendObject,
object_id,
holder,
region,
now,
);
obligation
}
pub fn create_send_symbol(
&self,
symbol_id: SymbolId,
holder: TaskId,
region: RegionId,
now: Time,
) -> SymbolicObligation {
let id = self.allocate_id();
let obligation = SymbolicObligation::new_send_symbol(
id,
symbol_id,
holder,
region,
now,
Some(self.registry_mirror()),
);
self.register(
id,
SymbolicObligationKind::SendSymbol,
symbol_id.object_id(),
holder,
region,
now,
);
obligation
}
pub fn create_acknowledge(
&self,
object_id: ObjectId,
expected_count: u32,
holder: TaskId,
region: RegionId,
now: Time,
) -> SymbolicObligation {
let id = self.allocate_id();
let obligation = SymbolicObligation::new_acknowledge(
id,
object_id,
expected_count,
holder,
region,
now,
Some(self.registry_mirror()),
);
self.register(
id,
SymbolicObligationKind::AcknowledgeReceipt,
object_id,
holder,
region,
now,
);
obligation
}
pub fn create_decode(
&self,
object_id: ObjectId,
min_symbols: u32,
holder: TaskId,
region: RegionId,
now: Time,
) -> SymbolicObligation {
let id = self.allocate_id();
let obligation = SymbolicObligation::new_decode(
id,
object_id,
min_symbols,
holder,
region,
now,
Some(self.registry_mirror()),
);
self.register(
id,
SymbolicObligationKind::DecodeObject,
object_id,
holder,
region,
now,
);
obligation
}
pub fn update_state(&self, id: ObligationId, state: SymbolicObligationState) {
if let Some(entry) = self.by_id.write().get_mut(&id) {
entry.state = state;
}
}
#[must_use]
pub fn obligations_for_region(&self, region: RegionId) -> Vec<ObligationId> {
let slot = region.arena_index().index() as usize;
let guard = self.by_region.read();
if let Some(entries) = guard.get(slot) {
if let Some((_, ids)) = entries
.iter()
.find(|(stored_region, _)| *stored_region == region)
{
return ids.to_vec();
}
}
drop(guard);
Vec::new()
}
#[must_use]
pub fn obligations_for_task(&self, task: TaskId) -> Vec<ObligationId> {
let slot = task.arena_index().index() as usize;
let guard = self.by_holder.read();
if let Some(entries) = guard.get(slot) {
if let Some((_, ids)) = entries.iter().find(|(stored_task, _)| *stored_task == task) {
return ids.to_vec();
}
}
drop(guard);
Vec::new()
}
#[must_use]
pub fn obligations_for_object(&self, object_id: ObjectId) -> Vec<ObligationId> {
self.by_object
.read()
.get(&object_id)
.cloned()
.unwrap_or_default()
}
#[must_use]
pub fn has_pending_in_region(&self, region: RegionId) -> bool {
let slot = region.arena_index().index() as usize;
let ids = {
let by_region = self.by_region.read();
by_region.get(slot).and_then(|entries| {
entries
.iter()
.find(|(stored_region, _)| *stored_region == region)
.map(|(_, ids)| ids.clone())
})
};
let Some(ids) = ids else {
return false;
};
let by_id = self.by_id.read();
for id in &ids {
if let Some(entry) = by_id.get(id) {
if !entry.state.is_terminal() {
return true;
}
}
}
false
}
#[must_use]
pub fn pending_in_region(&self, region: RegionId) -> Vec<ObligationSummary> {
let mut result = Vec::new();
let slot = region.arena_index().index() as usize;
let ids = {
let by_region = self.by_region.read();
by_region.get(slot).and_then(|entries| {
entries
.iter()
.find(|(stored_region, _)| *stored_region == region)
.map(|(_, ids)| ids.clone())
})
};
let Some(ids) = ids else {
return result;
};
let by_id = self.by_id.read();
for id in &ids {
if let Some(entry) = by_id.get(id) {
if !entry.state.is_terminal() {
result.push(ObligationSummary {
id: entry.id,
kind: entry.kind,
object_id: entry.object_id,
holder: entry.holder,
state: entry.state,
});
}
}
}
result
}
fn allocate_id(&self) -> ObligationId {
let raw = self.next_id.fetch_add(1, Ordering::Relaxed);
let index = u32::try_from(raw)
.expect("symbolic obligation id overflow: arena index exhausted for obligations");
ObligationId::from_arena(ArenaIndex::new(index, 0))
}
fn register(
&self,
id: ObligationId,
kind: SymbolicObligationKind,
object_id: ObjectId,
holder: TaskId,
region: RegionId,
created_at: Time,
) {
let entry = ObligationEntry {
id,
kind,
object_id,
holder,
created_at,
state: SymbolicObligationState::Reserved,
};
self.by_id.write().insert(id, entry);
self.by_object
.write()
.entry(object_id)
.or_default()
.push(id);
{
let holder_slot = holder.arena_index().index() as usize;
let mut by_holder = self.by_holder.write();
if holder_slot >= by_holder.len() {
by_holder.resize_with(holder_slot + 1, SmallVec::new);
}
let entries = &mut by_holder[holder_slot];
if let Some((_, ids)) = entries
.iter_mut()
.find(|(stored_holder, _)| *stored_holder == holder)
{
ids.push(id);
} else {
let mut ids = SmallVec::new();
ids.push(id);
entries.push((holder, ids));
}
drop(by_holder);
}
{
let region_slot = region.arena_index().index() as usize;
let mut by_region = self.by_region.write();
if region_slot >= by_region.len() {
by_region.resize_with(region_slot + 1, SmallVec::new);
}
let entries = &mut by_region[region_slot];
if let Some((_, ids)) = entries
.iter_mut()
.find(|(stored_region, _)| *stored_region == region)
{
ids.push(id);
} else {
let mut ids = SmallVec::new();
ids.push(id);
entries.push((region, ids));
}
drop(by_region);
}
}
}
impl Default for SymbolicObligationRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_object_id() -> ObjectId {
ObjectId::new_for_test(1)
}
fn test_task_id() -> TaskId {
TaskId::from_arena(ArenaIndex::new(1, 0))
}
fn test_task_id_with_generation(index: u32, generation: u32) -> TaskId {
TaskId::from_arena(ArenaIndex::new(index, generation))
}
fn test_region_id() -> RegionId {
RegionId::from_arena(ArenaIndex::new(1, 0))
}
fn test_region_id_with_generation(index: u32, generation: u32) -> RegionId {
RegionId::from_arena(ArenaIndex::new(index, generation))
}
fn test_params() -> ObjectParams {
ObjectParams::new(test_object_id(), 5120, 1280, 1, 4)
}
#[test]
fn obligation_lifecycle_commit() {
let obj = SymbolicObligation::new_for_test(1, test_object_id(), 4);
assert_eq!(obj.state(), SymbolicObligationState::Reserved);
assert!(obj.is_pending());
assert_eq!(obj.progress().total, 4);
assert_eq!(obj.progress().fulfilled, 0);
assert!(!obj.progress().complete);
obj.commit();
}
#[test]
fn obligation_lifecycle_abort() {
let obj = SymbolicObligation::new_for_test(2, test_object_id(), 4);
assert!(obj.is_pending());
obj.abort();
}
#[test]
fn partial_fulfillment_tracking() {
let obj = SymbolicObligation::new_for_test(3, test_object_id(), 3);
let sym1 = SymbolId::new(test_object_id(), 0, 0);
let sym2 = SymbolId::new(test_object_id(), 0, 1);
obj.fulfill_one(sym1);
assert_eq!(obj.state(), SymbolicObligationState::InProgress);
assert_eq!(obj.progress().fulfilled, 1);
obj.fulfill_one(sym2);
assert_eq!(obj.progress().fulfilled, 2);
assert!(!obj.progress().complete);
obj.fulfill_many(1);
assert!(obj.progress().complete);
obj.commit();
}
#[test]
fn commit_or_abort_complete() {
let obj = SymbolicObligation::new_for_test(4, test_object_id(), 2);
obj.fulfill_many(2);
obj.commit_or_abort();
}
#[test]
fn commit_or_abort_incomplete() {
let obj = SymbolicObligation::new_for_test(5, test_object_id(), 10);
obj.fulfill_many(3);
obj.commit_or_abort();
}
#[test]
#[cfg(debug_assertions)]
#[should_panic(expected = "SymbolicObligation leaked")]
fn leak_detection_panics_in_debug() {
let _obj = SymbolicObligation::new_for_test(6, test_object_id(), 1);
}
#[test]
fn fulfillment_progress_zero_total() {
let progress = FulfillmentProgress::new(0);
assert!(progress.is_complete());
assert!((progress.percent() - 1.0).abs() < f64::EPSILON);
assert_eq!(progress.remaining(), 0);
}
#[test]
fn fulfillment_progress_snapshot() {
let progress = FulfillmentProgress::new(10);
progress.add(7);
let snap = progress.snapshot();
assert_eq!(snap.total, 10);
assert_eq!(snap.fulfilled, 7);
assert!(!snap.complete);
assert!((snap.percent - 0.7).abs() < f64::EPSILON);
}
#[test]
fn fulfillment_progress_saturates_on_overflow() {
let progress = FulfillmentProgress::new(u32::MAX);
progress.add(u32::MAX);
assert_eq!(progress.fulfilled(), u32::MAX);
progress.increment();
assert_eq!(progress.fulfilled(), u32::MAX);
assert!(progress.is_complete());
}
#[test]
fn fulfillment_progress_percent_clamps_at_one() {
let progress = FulfillmentProgress::new(3);
progress.add(5);
assert_eq!(progress.fulfilled(), 5);
assert!((progress.percent() - 1.0).abs() < f64::EPSILON);
}
#[test]
fn fulfillment_progress_snapshot_clamps_percent_from_captured_fulfillment() {
let progress = FulfillmentProgress::new(2);
progress.add(5);
let snapshot = progress.snapshot();
assert_eq!(snapshot.total, 2);
assert_eq!(snapshot.fulfilled, 5);
assert!(snapshot.complete);
assert!((snapshot.percent - 1.0).abs() < f64::EPSILON);
}
#[test]
fn obligation_kind_display() {
assert_eq!(SymbolicObligationKind::SendObject.as_str(), "send_object");
assert_eq!(
SymbolicObligationKind::AcknowledgeReceipt.as_str(),
"ack_receipt"
);
}
#[test]
fn obligation_state_properties() {
assert!(!SymbolicObligationState::Reserved.is_terminal());
assert!(!SymbolicObligationState::InProgress.is_terminal());
assert!(SymbolicObligationState::Committed.is_terminal());
assert!(SymbolicObligationState::Aborted.is_terminal());
assert!(SymbolicObligationState::Leaked.is_terminal());
assert!(SymbolicObligationState::Committed.is_success());
assert!(SymbolicObligationState::Aborted.is_success());
assert!(!SymbolicObligationState::Leaked.is_success());
assert!(SymbolicObligationState::Leaked.is_leaked());
assert!(!SymbolicObligationState::Committed.is_leaked());
}
#[test]
fn registry_create_and_lookup() {
let registry = SymbolicObligationRegistry::new();
let holder = test_task_id();
let region = test_region_id();
let object = test_object_id();
let params = test_params();
let obligation = registry.create_send_object(object, ¶ms, holder, region, Time::ZERO);
assert_eq!(registry.obligations_for_region(region).len(), 1);
assert_eq!(registry.obligations_for_task(holder).len(), 1);
assert_eq!(registry.obligations_for_object(object).len(), 1);
assert!(registry.has_pending_in_region(region));
let pending = registry.pending_in_region(region);
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].kind, SymbolicObligationKind::SendObject);
obligation.commit();
assert!(registry.obligations_for_region(region).is_empty());
assert!(registry.obligations_for_task(holder).is_empty());
assert!(registry.obligations_for_object(object).is_empty());
assert!(!registry.has_pending_in_region(region));
assert!(registry.pending_in_region(region).is_empty());
}
#[test]
fn registry_multiple_obligations() {
let registry = SymbolicObligationRegistry::new();
let holder = test_task_id();
let region = test_region_id();
let object = test_object_id();
let sym1 = SymbolId::new(object, 0, 0);
let sym2 = SymbolId::new(object, 0, 1);
let o1 = registry.create_send_symbol(sym1, holder, region, Time::ZERO);
let o2 = registry.create_send_symbol(sym2, holder, region, Time::ZERO);
assert_eq!(registry.obligations_for_object(object).len(), 2);
assert!(registry.has_pending_in_region(region));
o1.commit();
assert_eq!(registry.obligations_for_object(object), vec![o2.id()]);
assert!(registry.has_pending_in_region(region));
o2.abort();
assert!(registry.obligations_for_object(object).is_empty());
assert!(!registry.has_pending_in_region(region));
assert!(registry.pending_in_region(region).is_empty());
}
#[test]
fn registry_decode_obligation() {
let registry = SymbolicObligationRegistry::new();
let holder = test_task_id();
let region = test_region_id();
let object = test_object_id();
let obligation = registry.create_decode(object, 4, holder, region, Time::ZERO);
assert_eq!(obligation.kind(), SymbolicObligationKind::DecodeObject);
assert_eq!(obligation.progress().total, 4);
let pending = registry.pending_in_region(region);
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].state, SymbolicObligationState::Reserved);
obligation.fulfill_many(4);
let pending = registry.pending_in_region(region);
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].state, SymbolicObligationState::InProgress);
assert!(obligation.progress().complete);
obligation.commit();
assert!(!registry.has_pending_in_region(region));
}
#[test]
fn registry_obligations_for_task_preserve_same_slot_generations() {
let registry = SymbolicObligationRegistry::new();
let older = test_task_id_with_generation(5, 0);
let newer = test_task_id_with_generation(5, 1);
let region = test_region_id();
let object = test_object_id();
let params = test_params();
let older_obligation =
registry.create_send_object(object, ¶ms, older, region, Time::ZERO);
let newer_obligation = registry.create_acknowledge(object, 1, newer, region, Time::ZERO);
assert_eq!(
registry.obligations_for_task(older),
vec![older_obligation.id()]
);
assert_eq!(
registry.obligations_for_task(newer),
vec![newer_obligation.id()]
);
older_obligation.abort();
newer_obligation.abort();
}
#[test]
fn registry_obligations_for_region_preserve_same_slot_generations() {
let registry = SymbolicObligationRegistry::new();
let holder = test_task_id();
let older = test_region_id_with_generation(7, 0);
let newer = test_region_id_with_generation(7, 1);
let object = test_object_id();
let params = test_params();
let older_obligation =
registry.create_send_object(object, ¶ms, holder, older, Time::ZERO);
let newer_obligation = registry.create_acknowledge(object, 1, holder, newer, Time::ZERO);
assert_eq!(
registry.obligations_for_region(older),
vec![older_obligation.id()]
);
assert_eq!(
registry.obligations_for_region(newer),
vec![newer_obligation.id()]
);
older_obligation.abort();
newer_obligation.abort();
}
#[test]
fn registry_acknowledge_obligation() {
let registry = SymbolicObligationRegistry::new();
let holder = test_task_id();
let region = test_region_id();
let object = test_object_id();
let obligation = registry.create_acknowledge(object, 8, holder, region, Time::ZERO);
assert_eq!(
obligation.kind(),
SymbolicObligationKind::AcknowledgeReceipt
);
assert_eq!(obligation.progress().total, 8);
obligation.abort();
assert!(!registry.has_pending_in_region(region));
}
#[test]
#[should_panic(expected = "fulfilled symbol mismatch")]
fn send_symbol_obligation_rejects_wrong_symbol_progress() {
let object = test_object_id();
let expected = SymbolId::new(object, 0, 0);
let wrong = SymbolId::new(object, 0, 1);
let obligation = SymbolicObligation::new_send_symbol(
ObligationId::from_arena(ArenaIndex::new(99, 0)),
expected,
test_task_id(),
test_region_id(),
Time::ZERO,
None,
);
obligation.fulfill_one(wrong);
}
#[test]
#[should_panic(expected = "fulfilled symbol object mismatch")]
fn send_object_obligation_rejects_wrong_object_progress() {
let obligation = SymbolicObligation::new_send_object(
ObligationId::from_arena(ArenaIndex::new(100, 0)),
test_object_id(),
&test_params(),
test_task_id(),
test_region_id(),
Time::ZERO,
None,
);
let wrong_object_symbol = SymbolId::new(ObjectId::new_for_test(99), 0, 0);
obligation.fulfill_one(wrong_object_symbol);
}
#[test]
#[should_panic(expected = "fulfill_many is invalid for single-symbol obligations")]
fn send_symbol_obligation_rejects_bulk_progress_without_symbol_identity() {
let obligation = SymbolicObligation::new_send_symbol(
ObligationId::from_arena(ArenaIndex::new(101, 0)),
SymbolId::new(test_object_id(), 0, 0),
test_task_id(),
test_region_id(),
Time::ZERO,
None,
);
obligation.fulfill_many(1);
}
#[test]
#[should_panic(expected = "symbolic obligation id overflow")]
fn registry_panics_on_obligation_id_overflow() {
let registry = SymbolicObligationRegistry::new();
registry
.next_id
.store(u64::from(u32::MAX) + 1, Ordering::Relaxed);
let _ = registry.create_decode(
test_object_id(),
1,
test_task_id(),
test_region_id(),
Time::ZERO,
);
}
#[test]
fn symbolic_obligation_kind_debug_clone_copy_eq() {
let a = SymbolicObligationKind::SendObject;
let b = a; let c = a;
assert_eq!(a, b);
assert_eq!(a, c);
assert_ne!(a, SymbolicObligationKind::DecodeObject);
let dbg = format!("{a:?}");
assert!(dbg.contains("SendObject"));
}
#[test]
fn symbolic_obligation_state_debug_clone_copy_eq() {
let a = SymbolicObligationState::Committed;
let b = a; let c = a;
assert_eq!(a, b);
assert_eq!(a, c);
assert_ne!(a, SymbolicObligationState::Aborted);
assert_ne!(a, SymbolicObligationState::Leaked);
let dbg = format!("{a:?}");
assert!(dbg.contains("Committed"));
}
}