use crate::obligation::graded::{AbortedProof, CommittedProof, LeaseKind, ObligationToken};
use crate::types::{RegionId, TaskId, Time};
use crate::util::{DetBuildHasher, DetHashMap};
use std::fmt;
use std::sync::Arc;
/// Marker trait for types that may be stored behind a [`RegistryHandle`].
///
/// It declares no methods; an implementor only has to be `Send + Sync + 'static`.
pub trait RegistryCap: Send + Sync + 'static {}
/// Cloneable handle around a shared, type-erased [`RegistryCap`] implementation.
#[derive(Clone)]
pub struct RegistryHandle {
/// Shared pointer to the underlying capability object.
inner: Arc<dyn RegistryCap>,
}
impl RegistryHandle {
#[must_use]
pub fn new(inner: Arc<dyn RegistryCap>) -> Self {
Self { inner }
}
#[must_use]
pub fn as_arc(&self) -> Arc<dyn RegistryCap> {
Arc::clone(&self.inner)
}
}
impl fmt::Debug for RegistryHandle {
    /// Prints a fixed placeholder for `inner`, since the trait object itself
    /// is not required to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("RegistryHandle");
        builder.field("inner", &format_args!("Arc<dyn RegistryCap>(..)"));
        builder.finish()
    }
}
/// A held name together with the obligation token that tracks its resolution.
#[derive(Debug)]
pub struct NameLease {
/// The leased name.
name: String,
/// Task recorded as the holder of the name.
holder: TaskId,
/// Region recorded for the holder.
region: RegionId,
/// Timestamp supplied when the lease was created.
acquired_at: Time,
/// Outstanding obligation token; `None` once the lease was released or aborted.
token: Option<ObligationToken<LeaseKind>>,
}
impl NameLease {
#[must_use]
pub fn new(
name: impl Into<String>,
holder: TaskId,
region: RegionId,
acquired_at: Time,
) -> Self {
let name = name.into();
let token = ObligationToken::reserve(format!("name_lease:{name}"));
Self {
name,
holder,
region,
acquired_at,
token: Some(token),
}
}
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
#[must_use]
pub fn holder(&self) -> TaskId {
self.holder
}
#[must_use]
pub fn region(&self) -> RegionId {
self.region
}
#[must_use]
pub fn acquired_at(&self) -> Time {
self.acquired_at
}
#[must_use]
pub fn is_active(&self) -> bool {
self.token.is_some()
}
pub fn release(&mut self) -> Result<CommittedProof<LeaseKind>, NameLeaseError> {
let token = self.token.take().ok_or(NameLeaseError::AlreadyResolved)?;
Ok(token.commit())
}
pub fn abort(&mut self) -> Result<AbortedProof<LeaseKind>, NameLeaseError> {
let token = self.token.take().ok_or(NameLeaseError::AlreadyResolved)?;
Ok(token.abort())
}
}
/// A pending reservation of a name, created by `NameRegistry::reserve`.
#[derive(Debug)]
pub struct NamePermit {
/// The reserved name.
name: String,
/// Task that made the reservation.
holder: TaskId,
/// Region recorded for the reserving task.
region: RegionId,
/// Timestamp supplied when the reservation was made.
reserved_at: Time,
/// Identifier the registry compares against its pending entry on commit/cancel.
permit_id: u64,
/// Outstanding obligation token; `None` once the permit was committed or aborted.
token: Option<ObligationToken<LeaseKind>>,
}
impl NamePermit {
#[must_use]
fn new(
name: impl Into<String>,
holder: TaskId,
region: RegionId,
reserved_at: Time,
permit_id: u64,
) -> Self {
let name = name.into();
let token = ObligationToken::reserve(format!("name_permit:{name}"));
Self {
name,
holder,
region,
reserved_at,
permit_id,
token: Some(token),
}
}
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
#[must_use]
pub fn holder(&self) -> TaskId {
self.holder
}
#[must_use]
pub fn region(&self) -> RegionId {
self.region
}
#[must_use]
pub fn reserved_at(&self) -> Time {
self.reserved_at
}
#[must_use]
pub fn is_pending(&self) -> bool {
self.token.is_some()
}
#[must_use]
fn permit_id(&self) -> u64 {
self.permit_id
}
fn commit(mut self) -> NameLease {
let token = self
.token
.take()
.expect("NamePermit::commit called on already-resolved permit");
NameLease {
name: self.name,
holder: self.holder,
region: self.region,
acquired_at: self.reserved_at,
token: Some(token),
}
}
pub fn abort(&mut self) -> Result<AbortedProof<LeaseKind>, NameLeaseError> {
let token = self.token.take().ok_or(NameLeaseError::AlreadyResolved)?;
Ok(token.abort())
}
}
/// What `NameRegistry::register_with_policy` does when the name is already
/// held by an active lease or a pending permit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NameCollisionPolicy {
/// Return `NameLeaseError::NameTaken`.
Fail,
/// Remove the existing entry and register the new holder in its place.
Replace,
/// Queue the caller as a waiter for the name.
Wait {
/// Latest time at which the waiter may still be granted the name; a
/// deadline earlier than `now` is rejected with `WaitBudgetExceeded`.
deadline: Time,
},
}
/// Successful result of `NameRegistry::register_with_policy`.
#[derive(Debug)]
pub enum NameCollisionOutcome {
/// The name was free and has been registered to the caller.
Registered {
/// Lease for the newly registered name.
lease: NameLease,
},
/// An existing entry was removed and the caller now holds the name.
Replaced {
/// Lease for the new holder.
lease: NameLease,
/// Holder recorded in the entry that was displaced.
displaced_holder: TaskId,
/// Region recorded in the entry that was displaced.
displaced_region: RegionId,
},
/// The caller was appended to the name's waiter queue; no lease is returned.
Enqueued,
}
/// Errors returned by lease, permit and registry operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NameLeaseError {
/// The lease or permit no longer holds its obligation token.
AlreadyResolved,
/// The name is held by an active lease or a pending permit.
NameTaken {
/// The contested name.
name: String,
/// Holder recorded in the existing entry.
current_holder: TaskId,
},
/// No matching entry exists for the name.
NotFound {
/// The name that was looked up.
name: String,
},
/// A `Wait` policy was requested with a deadline earlier than `now`.
WaitBudgetExceeded {
/// The name the caller tried to wait for.
name: String,
},
/// The presented permit or lease does not match the registry's entry
/// (holder, region, or identity/timestamp differ).
PermissionDenied {
/// The name whose entry did not match.
name: String,
},
}
impl fmt::Display for NameLeaseError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::AlreadyResolved => write!(f, "name lease already resolved"),
Self::NameTaken {
name,
current_holder,
} => {
write!(f, "name '{name}' already held by {current_holder}")
}
Self::NotFound { name } => write!(f, "name '{name}' not found"),
Self::WaitBudgetExceeded { name } => {
write!(f, "wait budget exceeded for name '{name}'")
}
Self::PermissionDenied { name } => {
write!(f, "permit identity mismatch for name '{name}'")
}
}
}
}
// Uses the default `Error` methods; no underlying source error is exposed.
impl std::error::Error for NameLeaseError {}
/// Registry of names, tracking active leases, pending reservations, waiters
/// and ownership watchers.
#[derive(Debug)]
pub struct NameRegistry {
/// Active (visible) name registrations, keyed by name.
leases: DetHashMap<String, NameEntry>,
/// Reserved-but-not-committed names, keyed by name.
pending: DetHashMap<String, NameEntry>,
/// Per-name FIFO queues of tasks waiting for the name.
waiters: DetHashMap<String, std::collections::VecDeque<WaiterEntry>>,
/// Leases created for waiters, held until `take_granted` drains them.
granted: Vec<GrantedLease>,
/// Watcher records keyed by watch reference.
watchers_by_ref: DetHashMap<NameWatchRef, NameWatcher>,
/// Index: watched name -> watch references.
watchers_by_name: DetHashMap<String, Vec<NameWatchRef>>,
/// Index: watcher's region -> watch references.
watchers_by_region: DetHashMap<RegionId, Vec<NameWatchRef>>,
/// Buffered ownership notifications, drained by `take_name_notifications`.
notifications: Vec<NameOwnershipNotification>,
/// Next value to hand out as a `NameWatchRef`.
next_watch_ref: u64,
/// Next value to hand out as a permit id.
next_permit_id: u64,
}
/// Registry-side record for one name, used for both active and pending entries.
#[derive(Debug, Clone, PartialEq, Eq)]
struct NameEntry {
/// Task that holds (or has reserved) the name.
holder: TaskId,
/// Region recorded for that task.
region: RegionId,
/// Time passed to the call that created the entry.
acquired_at: Time,
/// Permit id for entries created by `reserve`; `0` for direct registrations
/// and waiter grants.
identity_nonce: u64,
}
/// One queued request for a name that was taken at the time of the request.
#[derive(Debug)]
struct WaiterEntry {
/// Task waiting for the name.
holder: TaskId,
/// Region recorded for the waiting task.
region: RegionId,
/// The waiter is dropped once `now` is later than this deadline.
deadline: Time,
}
/// A lease the registry created for a waiter, returned by `take_granted`.
#[derive(Debug)]
pub struct GrantedLease {
/// Name that was granted.
pub name: String,
/// Lease created for the waiter that received the name.
pub lease: NameLease,
}
/// Opaque reference identifying one name-watch registration.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct NameWatchRef(u64);

impl NameWatchRef {
    /// Returns the raw numeric identifier behind this reference.
    #[must_use]
    pub fn id(self) -> u64 {
        let Self(raw) = self;
        raw
    }
}
/// Kind of ownership change reported to watchers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum NameOwnershipKind {
/// The name became held by `holder`.
Acquired,
/// The name stopped being held by `holder`.
Released,
}
/// One buffered notification addressed to a single watcher.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NameOwnershipNotification {
/// Watch registration this notification is for.
pub watch_ref: NameWatchRef,
/// Task that registered the watch.
pub watcher: TaskId,
/// Region recorded for the watching task.
pub watcher_region: RegionId,
/// Name whose ownership changed.
pub name: String,
/// Task that acquired or released the name.
pub holder: TaskId,
/// Region recorded for that holder.
pub region: RegionId,
/// Whether the name was acquired or released.
pub kind: NameOwnershipKind,
}
/// Registry-side record of one watch registration.
#[derive(Debug, Clone)]
struct NameWatcher {
/// Reference handed back to the watcher.
watch_ref: NameWatchRef,
/// Task that registered the watch.
watcher: TaskId,
/// Region recorded for the watching task.
watcher_region: RegionId,
/// Name being watched.
name: String,
}
impl NameRegistry {
/// Creates an empty registry.
///
/// Every map uses `DetBuildHasher` and is pre-sized with a small initial
/// capacity; the watch-ref and permit-id counters both start at 1.
#[must_use]
pub fn new() -> Self {
Self {
leases: DetHashMap::with_capacity_and_hasher(32, DetBuildHasher),
pending: DetHashMap::with_capacity_and_hasher(16, DetBuildHasher),
waiters: DetHashMap::with_capacity_and_hasher(16, DetBuildHasher),
granted: Vec::with_capacity(8),
watchers_by_ref: DetHashMap::with_capacity_and_hasher(16, DetBuildHasher),
watchers_by_name: DetHashMap::with_capacity_and_hasher(16, DetBuildHasher),
watchers_by_region: DetHashMap::with_capacity_and_hasher(8, DetBuildHasher),
notifications: Vec::with_capacity(8),
// Permit ids start at 1; entries not created through `reserve` use nonce 0.
next_watch_ref: 1,
next_permit_id: 1,
}
}
/// Registers `watcher` for ownership changes of `name` and returns the new
/// watch reference.
///
/// # Panics
/// If the watch-reference counter would overflow `u64`.
pub fn watch_name(
    &mut self,
    name: impl Into<String>,
    watcher: TaskId,
    watcher_region: RegionId,
) -> NameWatchRef {
    let name: String = name.into();

    let watch_ref = NameWatchRef(self.next_watch_ref);
    self.next_watch_ref = self
        .next_watch_ref
        .checked_add(1)
        .expect("watch ref overflow");

    // Update both secondary indexes first, then store the record itself.
    self.watchers_by_name
        .entry(name.clone())
        .or_default()
        .push(watch_ref);
    self.watchers_by_region
        .entry(watcher_region)
        .or_default()
        .push(watch_ref);
    self.watchers_by_ref.insert(
        watch_ref,
        NameWatcher {
            watch_ref,
            watcher,
            watcher_region,
            name,
        },
    );

    watch_ref
}
/// Removes a watch registration from the record map and both indexes.
///
/// Returns `false` when `watch_ref` is not registered, `true` otherwise.
pub fn unwatch_name(&mut self, watch_ref: NameWatchRef) -> bool {
    let record = match self.watchers_by_ref.remove(&watch_ref) {
        Some(record) => record,
        None => return false,
    };

    // Drop the ref from the by-name index; discard the bucket once it is empty.
    let name_bucket_empty = self
        .watchers_by_name
        .get_mut(&record.name)
        .map_or(false, |bucket| {
            bucket.retain(|candidate| *candidate != watch_ref);
            bucket.is_empty()
        });
    if name_bucket_empty {
        self.watchers_by_name.remove(&record.name);
    }

    // Same for the by-region index.
    let region_bucket_empty = self
        .watchers_by_region
        .get_mut(&record.watcher_region)
        .map_or(false, |bucket| {
            bucket.retain(|candidate| *candidate != watch_ref);
            bucket.is_empty()
        });
    if region_bucket_empty {
        self.watchers_by_region.remove(&record.watcher_region);
    }

    true
}
pub fn cleanup_name_watchers_region(&mut self, region: RegionId) -> Vec<NameWatchRef> {
let Some(refs) = self.watchers_by_region.remove(®ion) else {
return Vec::new();
};
let mut removed = Vec::with_capacity(refs.len());
for watch_ref in refs {
if let Some(record) = self.watchers_by_ref.remove(&watch_ref) {
if let Some(name_refs) = self.watchers_by_name.get_mut(&record.name) {
name_refs.retain(|r| *r != watch_ref);
if name_refs.is_empty() {
self.watchers_by_name.remove(&record.name);
}
}
removed.push(watch_ref);
}
}
removed.sort();
removed
}
/// Removes every watch registered by `task`.
///
/// Returns the removed watch references in ascending order.
pub fn cleanup_name_watchers_task(&mut self, task: TaskId) -> Vec<NameWatchRef> {
    let mut removed: Vec<NameWatchRef> = Vec::new();
    for (watch_ref, record) in &self.watchers_by_ref {
        if record.watcher == task {
            removed.push(*watch_ref);
        }
    }
    // Sort before unwatching so the removal order does not depend on map order.
    removed.sort();
    for &watch_ref in &removed {
        let _ = self.unwatch_name(watch_ref);
    }
    removed
}
/// Number of currently registered watches (one per watch reference).
#[must_use]
pub fn name_watcher_count(&self) -> usize {
self.watchers_by_ref.len()
}
/// Returns all buffered ownership notifications and leaves the buffer empty.
pub fn take_name_notifications(&mut self) -> Vec<NameOwnershipNotification> {
std::mem::take(&mut self.notifications)
}
/// Buffers one notification per watcher of `name`, ordered by ascending
/// watch reference. Does nothing when the name has no watchers.
fn emit_name_change(
    &mut self,
    name: &str,
    holder: TaskId,
    region: RegionId,
    kind: NameOwnershipKind,
) {
    let mut targets = match self.watchers_by_name.get(name) {
        Some(refs) => refs.clone(),
        None => return,
    };
    targets.sort();
    self.notifications.reserve(targets.len());

    let records = &self.watchers_by_ref;
    // Refs without a backing watcher record are skipped.
    self.notifications.extend(
        targets
            .iter()
            .filter_map(|watch_ref| records.get(watch_ref))
            .map(|record| NameOwnershipNotification {
                watch_ref: record.watch_ref,
                watcher: record.watcher,
                watcher_region: record.watcher_region,
                name: name.to_string(),
                holder,
                region,
                kind,
            }),
    );
}
/// Registers `name` for `holder` and returns the lease.
///
/// Watchers of the name receive an `Acquired` notification.
///
/// # Errors
/// [`NameLeaseError::NameTaken`] if the name has an active lease or a
/// pending permit; the active lease is checked first.
pub fn register(
    &mut self,
    name: impl Into<String>,
    holder: TaskId,
    region: RegionId,
    now: Time,
) -> Result<NameLease, NameLeaseError> {
    let name: String = name.into();

    let occupant = self
        .leases
        .get(&name)
        .or_else(|| self.pending.get(&name))
        .map(|entry| entry.holder);
    if let Some(current_holder) = occupant {
        return Err(NameLeaseError::NameTaken {
            name,
            current_holder,
        });
    }

    let entry = NameEntry {
        holder,
        region,
        acquired_at: now,
        // Direct registrations carry no permit identity.
        identity_nonce: 0,
    };
    self.leases.insert(name.clone(), entry);
    self.emit_name_change(&name, holder, region, NameOwnershipKind::Acquired);
    Ok(NameLease::new(name, holder, region, now))
}
/// Reserves `name` for `holder` without making it visible to lookups.
///
/// The reservation is stored as a pending entry tagged with a fresh permit
/// id, and no notification is emitted.
///
/// # Errors
/// [`NameLeaseError::NameTaken`] if the name has an active lease or a
/// pending permit; the active lease is checked first.
///
/// # Panics
/// If the permit-id counter would overflow `u64`.
pub fn reserve(
    &mut self,
    name: impl Into<String>,
    holder: TaskId,
    region: RegionId,
    now: Time,
) -> Result<NamePermit, NameLeaseError> {
    let name: String = name.into();

    let occupant = self
        .leases
        .get(&name)
        .or_else(|| self.pending.get(&name))
        .map(|entry| entry.holder);
    if let Some(current_holder) = occupant {
        return Err(NameLeaseError::NameTaken {
            name,
            current_holder,
        });
    }

    let permit_id = self.next_permit_id;
    self.next_permit_id = permit_id
        .checked_add(1)
        .expect("permit identity overflow");

    let entry = NameEntry {
        holder,
        region,
        acquired_at: now,
        identity_nonce: permit_id,
    };
    self.pending.insert(name.clone(), entry);
    Ok(NamePermit::new(name, holder, region, now, permit_id))
}
/// Turns a pending reservation into an active lease.
///
/// On success the pending entry moves to the active map, watchers receive
/// an `Acquired` notification, and the permit becomes a [`NameLease`].
///
/// # Errors
/// * [`NameLeaseError::AlreadyResolved`] — the permit no longer holds a token.
/// * [`NameLeaseError::NotFound`] — no pending entry for the name; the permit
///   is aborted.
/// * [`NameLeaseError::PermissionDenied`] — holder, region or permit id differ
///   from the pending entry; the entry is kept and the permit is aborted.
pub fn commit_permit(&mut self, mut permit: NamePermit) -> Result<NameLease, NameLeaseError> {
    if !permit.is_pending() {
        return Err(NameLeaseError::AlreadyResolved);
    }

    let name = permit.name().to_string();
    let entry = match self.pending.remove(&name) {
        Some(entry) => entry,
        None => {
            let _ = permit.abort();
            return Err(NameLeaseError::NotFound { name });
        }
    };

    let same_identity = permit.holder() == entry.holder
        && permit.region() == entry.region
        && permit.permit_id() == entry.identity_nonce;
    if !same_identity {
        // Not this permit's reservation: put the entry back untouched.
        self.pending.insert(name.clone(), entry);
        let _ = permit.abort();
        return Err(NameLeaseError::PermissionDenied { name });
    }

    let (holder, region) = (entry.holder, entry.region);
    self.leases.insert(name, entry);
    self.emit_name_change(permit.name(), holder, region, NameOwnershipKind::Acquired);
    Ok(permit.commit())
}
/// Drops the pending reservation belonging to `permit`, then offers the name
/// to the first eligible waiter.
///
/// The permit's own token is not touched here.
///
/// # Errors
/// * [`NameLeaseError::NotFound`] — no pending entry for the permit's name.
/// * [`NameLeaseError::PermissionDenied`] — holder, region or permit id differ
///   from the pending entry (the entry is left in place).
pub fn cancel_permit(&mut self, permit: &NamePermit, now: Time) -> Result<(), NameLeaseError> {
    let name = permit.name();

    match self.pending.get(name) {
        None => {
            return Err(NameLeaseError::NotFound {
                name: name.to_string(),
            });
        }
        Some(entry) => {
            let same_identity = permit.holder() == entry.holder
                && permit.region() == entry.region
                && permit.permit_id() == entry.identity_nonce;
            if !same_identity {
                return Err(NameLeaseError::PermissionDenied {
                    name: name.to_string(),
                });
            }
        }
    }

    let removed = self.pending.remove(name);
    debug_assert!(
        removed.is_some(),
        "pending entry disappeared after permit identity check"
    );
    self.try_grant_to_first_waiter(name, now);
    Ok(())
}
/// Registers `name` for `holder`, resolving a collision according to `policy`.
///
/// * Name free: registers it and returns `Registered`.
/// * `Fail`: returns `NameTaken`.
/// * `Replace`: removes the existing active or pending entry and registers
///   the caller. A `Released` notification is emitted only when an *active*
///   lease was displaced, followed by `Acquired` for the new holder.
/// * `Wait`: appends the caller to the name's waiter queue and returns
///   `Enqueued`.
///
/// # Errors
/// * [`NameLeaseError::NameTaken`] — collision under `Fail`.
/// * [`NameLeaseError::WaitBudgetExceeded`] — `Wait` with `deadline < now`.
pub fn register_with_policy(
    &mut self,
    name: impl Into<String>,
    holder: TaskId,
    region: RegionId,
    now: Time,
    policy: NameCollisionPolicy,
) -> Result<NameCollisionOutcome, NameLeaseError> {
    let name: String = name.into();

    // Active leases take precedence over pending permits when reporting the
    // current occupant.
    let occupant = self
        .leases
        .get(&name)
        .or_else(|| self.pending.get(&name))
        .map(|entry| (entry.holder, entry.region));

    let Some((current_holder, current_region)) = occupant else {
        self.leases.insert(
            name.clone(),
            NameEntry {
                holder,
                region,
                acquired_at: now,
                identity_nonce: 0,
            },
        );
        self.emit_name_change(&name, holder, region, NameOwnershipKind::Acquired);
        // `name` is not needed afterwards, so move it rather than copy it again.
        let lease = NameLease::new(name, holder, region, now);
        return Ok(NameCollisionOutcome::Registered { lease });
    };

    match policy {
        NameCollisionPolicy::Fail => Err(NameLeaseError::NameTaken {
            name,
            current_holder,
        }),
        NameCollisionPolicy::Replace => {
            let was_active = self.leases.remove(&name).is_some();
            self.pending.remove(&name);
            // A pending permit was never announced, so only an active lease
            // produces a `Released` notification.
            if was_active {
                self.emit_name_change(
                    &name,
                    current_holder,
                    current_region,
                    NameOwnershipKind::Released,
                );
            }
            self.leases.insert(
                name.clone(),
                NameEntry {
                    holder,
                    region,
                    acquired_at: now,
                    identity_nonce: 0,
                },
            );
            self.emit_name_change(&name, holder, region, NameOwnershipKind::Acquired);
            let lease = NameLease::new(name, holder, region, now);
            Ok(NameCollisionOutcome::Replaced {
                lease,
                displaced_holder: current_holder,
                displaced_region: current_region,
            })
        }
        NameCollisionPolicy::Wait { deadline } => {
            if deadline < now {
                return Err(NameLeaseError::WaitBudgetExceeded { name });
            }
            self.waiters
                .entry(name)
                .or_default()
                .push_back(WaiterEntry {
                    holder,
                    region,
                    deadline,
                });
            Ok(NameCollisionOutcome::Enqueued)
        }
    }
}
/// Removes the active lease entry for `name` and notifies watchers with
/// `Released`. Waiters for the name are not granted anything.
///
/// # Errors
/// [`NameLeaseError::NotFound`] if the name has no active lease.
pub fn unregister(&mut self, name: &str) -> Result<(), NameLeaseError> {
    let Some(entry) = self.leases.remove(name) else {
        return Err(NameLeaseError::NotFound {
            name: name.to_string(),
        });
    };
    self.emit_name_change(
        name,
        entry.holder,
        entry.region,
        NameOwnershipKind::Released,
    );
    Ok(())
}
pub fn unregister_and_grant(&mut self, name: &str, now: Time) -> Result<(), NameLeaseError> {
let Some(entry) = self.leases.remove(name) else {
return Err(NameLeaseError::NotFound {
name: name.to_string(),
});
};
self.emit_name_change(
name,
entry.holder,
entry.region,
NameOwnershipKind::Released,
);
self.try_grant_to_first_waiter(name, now);
Ok(())
}
/// Like `unregister_and_grant`, but only when `lease` matches the registry's
/// active entry (same holder, region and acquisition time).
///
/// # Errors
/// * [`NameLeaseError::NotFound`] — the lease's name has no active entry.
/// * [`NameLeaseError::PermissionDenied`] — the entry does not match `lease`.
pub fn unregister_owned_and_grant(
    &mut self,
    lease: &NameLease,
    now: Time,
) -> Result<(), NameLeaseError> {
    let name = lease.name();

    let owns_entry = match self.leases.get(name) {
        None => {
            return Err(NameLeaseError::NotFound {
                name: name.to_string(),
            });
        }
        Some(entry) => {
            entry.holder == lease.holder()
                && entry.region == lease.region()
                && entry.acquired_at == lease.acquired_at()
        }
    };
    if !owns_entry {
        return Err(NameLeaseError::PermissionDenied {
            name: name.to_string(),
        });
    }

    self.unregister_and_grant(name, now)
}
fn try_grant_to_first_waiter(&mut self, name: &str, now: Time) {
let Some(queue) = self.waiters.get_mut(name) else {
return;
};
queue.retain(|w| w.deadline >= now);
if queue.is_empty() {
self.waiters.remove(name);
return;
}
let waiter = queue
.pop_front()
.expect("queue was verified to be non-empty");
if queue.is_empty() {
self.waiters.remove(name);
}
self.leases.insert(
name.to_string(),
NameEntry {
holder: waiter.holder,
region: waiter.region,
acquired_at: now,
identity_nonce: 0,
},
);
self.emit_name_change(
name,
waiter.holder,
waiter.region,
NameOwnershipKind::Acquired,
);
let lease = NameLease::new(name, waiter.holder, waiter.region, now);
self.granted.push(GrantedLease {
name: name.to_string(),
lease,
});
}
/// Returns all leases granted to waiters since the last call and leaves the
/// internal list empty.
pub fn take_granted(&mut self) -> Vec<GrantedLease> {
std::mem::take(&mut self.granted)
}
/// Discards every waiter whose deadline is earlier than `now` and returns
/// how many were removed. Queues left empty are dropped from the map.
pub fn drain_expired_waiters(&mut self, now: Time) -> usize {
    let mut expired = 0;
    for queue in self.waiters.values_mut() {
        let before = queue.len();
        queue.retain(|waiter| waiter.deadline >= now);
        expired += before - queue.len();
    }
    self.waiters.retain(|_, queue| !queue.is_empty());
    expired
}
/// Total number of queued waiters across all names.
#[must_use]
pub fn waiter_count(&self) -> usize {
    self.waiters
        .values()
        .fold(0, |total, queue| total + queue.len())
}
/// Returns the holder of the active lease for `name`, if any.
/// Pending reservations are not consulted.
#[must_use]
pub fn whereis(&self, name: &str) -> Option<TaskId> {
    let entry = self.leases.get(name)?;
    Some(entry.holder)
}
/// `true` when `name` has an active lease; pending reservations do not count.
#[must_use]
pub fn is_registered(&self, name: &str) -> bool {
self.leases.contains_key(name)
}
/// Names that currently have an active lease, sorted ascending.
#[must_use]
pub fn registered_names(&self) -> Vec<&str> {
    let mut names: Vec<&str> = Vec::with_capacity(self.leases.len());
    names.extend(self.leases.keys().map(String::as_str));
    names.sort_unstable();
    names
}
/// Number of active leases (pending reservations are not counted).
#[must_use]
pub fn len(&self) -> usize {
self.leases.len()
}
/// `true` when there are no active leases (pending reservations are ignored).
#[must_use]
pub fn is_empty(&self) -> bool {
self.leases.is_empty()
}
/// Same as [`Self::cleanup_region_at`] with `Time::ZERO` passed as `now`.
pub fn cleanup_region(&mut self, region: RegionId) -> Vec<String> {
    self.cleanup_region_at(region, Time::ZERO)
}

/// Removes everything owned by `region`: its watchers, active leases,
/// pending permits, undelivered granted leases and queued waiters.
///
/// Freed names are then offered to remaining waiters using `now`.
/// Returns the removed active and pending names, sorted ascending.
pub fn cleanup_region_at(&mut self, region: RegionId, now: Time) -> Vec<String> {
    // Watchers go first so the closing region gets no notifications from
    // its own cleanup.
    let _removed_watchers = self.cleanup_name_watchers_region(region);
    self.cleanup_owned_by(now, |_, owner_region| owner_region == region)
}

/// Same as [`Self::cleanup_task_at`] with `Time::ZERO` passed as `now`.
pub fn cleanup_task(&mut self, task: TaskId) -> Vec<String> {
    self.cleanup_task_at(task, Time::ZERO)
}

/// Removes everything owned by `task`: its watchers, active leases,
/// pending permits, undelivered granted leases and queued waiters.
///
/// Freed names are then offered to remaining waiters using `now`.
/// Returns the removed active and pending names, sorted ascending.
pub fn cleanup_task_at(&mut self, task: TaskId, now: Time) -> Vec<String> {
    // Watchers go first so the dead task gets no notifications from its
    // own cleanup.
    let _removed_watchers = self.cleanup_name_watchers_task(task);
    self.cleanup_owned_by(now, |holder, _| holder == task)
}

/// Shared body of the region/task cleanups.
///
/// `is_target(holder, region)` decides whether an entry, granted lease or
/// waiter belongs to the owner being cleaned up.
fn cleanup_owned_by(
    &mut self,
    now: Time,
    is_target: impl Fn(TaskId, RegionId) -> bool,
) -> Vec<String> {
    // Active leases to drop, sorted by name so notifications are emitted in
    // an order independent of map iteration.
    let mut active_removed: Vec<(String, TaskId, RegionId)> = self
        .leases
        .iter()
        .filter(|(_, entry)| is_target(entry.holder, entry.region))
        .map(|(name, entry)| (name.clone(), entry.holder, entry.region))
        .collect();
    active_removed.sort_by(|a, b| a.0.cmp(&b.0));

    // Every name to free: the active ones plus matching pending permits.
    let mut to_remove: Vec<String> = active_removed
        .iter()
        .map(|(name, _, _)| name.clone())
        .chain(
            self.pending
                .iter()
                .filter(|(_, entry)| is_target(entry.holder, entry.region))
                .map(|(name, _)| name.clone()),
        )
        .collect();
    to_remove.sort();

    for name in &to_remove {
        self.leases.remove(name);
        self.pending.remove(name);
    }

    // Only active leases were ever announced, so only they emit `Released`.
    for (name, holder, region) in &active_removed {
        self.emit_name_change(name, *holder, *region, NameOwnershipKind::Released);
    }

    // Granted-but-untaken leases of the target are aborted and discarded.
    self.granted.retain_mut(|granted| {
        if is_target(granted.lease.holder(), granted.lease.region()) {
            let _ = granted.lease.abort();
            false
        } else {
            true
        }
    });

    // The target must not be granted anything below, so drop its waiters.
    for queue in self.waiters.values_mut() {
        queue.retain(|waiter| !is_target(waiter.holder, waiter.region));
    }
    self.waiters.retain(|_, queue| !queue.is_empty());

    for name in &to_remove {
        self.try_grant_to_first_waiter(name, now);
    }
    to_remove
}
}
impl Default for NameRegistry {
fn default() -> Self {
Self::new()
}
}
// Lets a `NameRegistry` be used where a `RegistryCap` is required.
impl RegistryCap for NameRegistry {}
// A `parking_lot::Mutex` around any `RegistryCap` is itself a `RegistryCap`.
impl<T: RegistryCap> RegistryCap for parking_lot::Mutex<T> {}
/// Event records describing registry activity.
///
/// NOTE(review): nothing in this part of the file constructs these outside of
/// tests; the variant descriptions below are read off the variant and field
/// names — confirm against the emitting code.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RegistryEvent {
/// A name was registered to `holder` in `region`.
NameRegistered {
name: String,
holder: TaskId,
region: RegionId,
},
/// A name held by `holder` was released.
NameReleased {
name: String,
holder: TaskId,
},
/// A name lease held by `holder` was aborted, with a free-form reason.
NameAborted {
name: String,
holder: TaskId,
reason: String,
},
/// A region cleanup affected `count` items.
RegionCleanup {
region: RegionId,
count: usize,
},
/// A task cleanup affected `count` items.
TaskCleanup {
task: TaskId,
count: usize,
},
/// A name was reserved for `holder` in `region`.
NameReserved {
name: String,
holder: TaskId,
region: RegionId,
},
/// A permit held by `holder` was committed.
NamePermitCommitted {
name: String,
holder: TaskId,
},
/// A permit held by `holder` was aborted, with a free-form reason.
NamePermitAborted {
name: String,
holder: TaskId,
reason: String,
},
/// `new_holder` took over a name from `displaced_holder`.
NameReplaced {
name: String,
new_holder: TaskId,
displaced_holder: TaskId,
},
/// `holder` was queued as a waiter for the name until `deadline`.
WaiterEnqueued {
name: String,
holder: TaskId,
deadline: Time,
},
/// A queued waiter (`holder`) was granted the name.
WaiterGranted {
name: String,
holder: TaskId,
},
}
#[cfg(test)]
mod tests {
use super::*;
use crate::util::ArenaIndex;
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
fn tid(n: u32) -> TaskId {
TaskId::from_arena(ArenaIndex::new(n, 0))
}
fn rid(n: u32) -> RegionId {
RegionId::from_arena(ArenaIndex::new(n, 0))
}
#[test]
fn name_lease_lifecycle() {
init_test("name_lease_lifecycle");
let mut lease = NameLease::new("my_server", tid(1), rid(0), Time::from_secs(1));
assert_eq!(lease.name(), "my_server");
assert_eq!(lease.holder(), tid(1));
assert_eq!(lease.region(), rid(0));
assert_eq!(lease.acquired_at(), Time::from_secs(1));
assert!(lease.is_active());
let proof = lease.release().unwrap();
assert!(!lease.is_active());
let _ = proof;
crate::test_complete!("name_lease_lifecycle");
}
#[test]
fn name_lease_abort() {
init_test("name_lease_abort");
let mut lease = NameLease::new("worker", tid(2), rid(0), Time::ZERO);
assert!(lease.is_active());
let proof = lease.abort().unwrap();
assert!(!lease.is_active());
let _ = proof;
crate::test_complete!("name_lease_abort");
}
#[test]
fn name_lease_double_resolve_errors() {
init_test("name_lease_double_resolve_errors");
let mut lease = NameLease::new("svc", tid(1), rid(0), Time::ZERO);
lease.release().unwrap();
assert!(matches!(
lease.release(),
Err(NameLeaseError::AlreadyResolved)
));
assert!(matches!(
lease.abort(),
Err(NameLeaseError::AlreadyResolved)
));
crate::test_complete!("name_lease_double_resolve_errors");
}
#[test]
fn registry_register_and_whereis() {
init_test("registry_register_and_whereis");
let mut reg = NameRegistry::new();
assert!(reg.is_empty());
let mut lease = reg
.register("my_server", tid(1), rid(0), Time::from_secs(1))
.unwrap();
assert_eq!(reg.len(), 1);
assert_eq!(reg.whereis("my_server"), Some(tid(1)));
assert!(reg.is_registered("my_server"));
assert_eq!(reg.whereis("unknown"), None);
lease.release().unwrap();
crate::test_complete!("registry_register_and_whereis");
}
#[test]
fn registry_reserve_commit_makes_visible() {
init_test("registry_reserve_commit_makes_visible");
let mut reg = NameRegistry::new();
let permit = reg
.reserve("svc", tid(1), rid(0), Time::from_secs(1))
.expect("reserve ok");
assert_eq!(reg.whereis("svc"), None);
assert!(!reg.is_registered("svc"));
let mut lease = reg.commit_permit(permit).expect("commit ok");
assert_eq!(reg.whereis("svc"), Some(tid(1)));
assert!(reg.is_registered("svc"));
lease.release().unwrap();
crate::test_complete!("registry_reserve_commit_makes_visible");
}
#[test]
fn registry_reserve_abort_releases_name() {
init_test("registry_reserve_abort_releases_name");
let mut reg = NameRegistry::new();
let mut permit = reg
.reserve("svc", tid(1), rid(0), Time::from_secs(1))
.expect("reserve ok");
permit.abort().unwrap();
reg.cancel_permit(&permit, Time::from_secs(1))
.expect("cancel permit");
let mut lease = reg
.register("svc", tid(2), rid(0), Time::from_secs(2))
.unwrap();
assert_eq!(reg.whereis("svc"), Some(tid(2)));
lease.release().unwrap();
crate::test_complete!("registry_reserve_abort_releases_name");
}
#[test]
fn registry_reserve_blocks_register() {
init_test("registry_reserve_blocks_register");
let mut reg = NameRegistry::new();
let mut permit = reg
.reserve("svc", tid(1), rid(0), Time::ZERO)
.expect("reserve ok");
let err = reg.register("svc", tid(2), rid(0), Time::ZERO).unwrap_err();
assert_eq!(
err,
NameLeaseError::NameTaken {
name: "svc".into(),
current_holder: tid(1),
}
);
permit.abort().unwrap();
reg.cancel_permit(&permit, Time::ZERO)
.expect("cancel permit");
crate::test_complete!("registry_reserve_blocks_register");
}
#[test]
fn registry_cleanup_region_removes_pending_permits() {
init_test("registry_cleanup_region_removes_pending_permits");
let mut reg = NameRegistry::new();
let mut permit = reg
.reserve("svc", tid(1), rid(1), Time::ZERO)
.expect("reserve ok");
let removed = reg.cleanup_region(rid(1));
assert_eq!(removed, vec!["svc"]);
permit.abort().unwrap();
let mut lease = reg
.register("svc", tid(2), rid(0), Time::from_secs(1))
.unwrap();
lease.release().unwrap();
crate::test_complete!("registry_cleanup_region_removes_pending_permits");
}
#[test]
fn registry_name_taken() {
init_test("registry_name_taken");
let mut reg = NameRegistry::new();
let mut lease = reg
.register("singleton", tid(1), rid(0), Time::ZERO)
.unwrap();
let err = reg
.register("singleton", tid(2), rid(0), Time::ZERO)
.unwrap_err();
assert_eq!(
err,
NameLeaseError::NameTaken {
name: "singleton".into(),
current_holder: tid(1),
}
);
lease.release().unwrap();
crate::test_complete!("registry_name_taken");
}
#[test]
fn registry_unregister() {
init_test("registry_unregister");
let mut reg = NameRegistry::new();
let mut lease = reg.register("temp", tid(1), rid(0), Time::ZERO).unwrap();
reg.unregister("temp").unwrap();
assert!(!reg.is_registered("temp"));
assert!(reg.is_empty());
assert_eq!(
reg.unregister("unknown"),
Err(NameLeaseError::NotFound {
name: "unknown".into()
})
);
lease.release().unwrap();
crate::test_complete!("registry_unregister");
}
#[test]
fn registry_registered_names_sorted() {
init_test("registry_registered_names_sorted");
let mut reg = NameRegistry::new();
let mut l1 = reg.register("zebra", tid(1), rid(0), Time::ZERO).unwrap();
let mut l2 = reg.register("alpha", tid(2), rid(0), Time::ZERO).unwrap();
let mut l3 = reg.register("middle", tid(3), rid(0), Time::ZERO).unwrap();
assert_eq!(reg.registered_names(), vec!["alpha", "middle", "zebra"]);
l1.release().unwrap();
l2.release().unwrap();
l3.release().unwrap();
crate::test_complete!("registry_registered_names_sorted");
}
#[test]
fn registry_cleanup_region() {
init_test("registry_cleanup_region");
let mut reg = NameRegistry::new();
let mut l1 = reg.register("svc_a", tid(1), rid(1), Time::ZERO).unwrap();
let mut l2 = reg.register("svc_b", tid(2), rid(1), Time::ZERO).unwrap();
let mut l3 = reg.register("svc_c", tid(3), rid(2), Time::ZERO).unwrap();
assert_eq!(reg.len(), 3);
let removed = reg.cleanup_region(rid(1));
assert_eq!(removed, vec!["svc_a", "svc_b"]); assert_eq!(reg.len(), 1);
assert!(reg.is_registered("svc_c"));
assert!(!reg.is_registered("svc_a"));
l1.abort().unwrap();
l2.abort().unwrap();
l3.release().unwrap();
crate::test_complete!("registry_cleanup_region");
}
#[test]
fn registry_cleanup_task() {
init_test("registry_cleanup_task");
let mut reg = NameRegistry::new();
let mut l1 = reg.register("name_a", tid(5), rid(0), Time::ZERO).unwrap();
let mut l2 = reg.register("name_b", tid(5), rid(0), Time::ZERO).unwrap();
let mut l3 = reg.register("name_c", tid(6), rid(0), Time::ZERO).unwrap();
let removed = reg.cleanup_task(tid(5));
assert_eq!(removed, vec!["name_a", "name_b"]);
assert_eq!(reg.len(), 1);
l1.abort().unwrap();
l2.abort().unwrap();
l3.release().unwrap();
crate::test_complete!("registry_cleanup_task");
}
#[test]
fn registry_cleanup_region_empty() {
init_test("registry_cleanup_region_empty");
let mut reg = NameRegistry::new();
let removed = reg.cleanup_region(rid(99));
assert!(removed.is_empty());
crate::test_complete!("registry_cleanup_region_empty");
}
#[test]
fn registry_re_register_after_unregister() {
init_test("registry_re_register_after_unregister");
let mut reg = NameRegistry::new();
let mut l1 = reg
.register("reusable", tid(1), rid(0), Time::ZERO)
.unwrap();
reg.unregister("reusable").unwrap();
l1.release().unwrap();
let mut l2 = reg
.register("reusable", tid(2), rid(0), Time::from_secs(1))
.unwrap();
assert_eq!(reg.whereis("reusable"), Some(tid(2)));
l2.release().unwrap();
crate::test_complete!("registry_re_register_after_unregister");
}
#[test]
fn name_lease_error_display() {
init_test("name_lease_error_display");
let err = NameLeaseError::AlreadyResolved;
assert_eq!(err.to_string(), "name lease already resolved");
let err = NameLeaseError::NameTaken {
name: "foo".into(),
current_holder: tid(42),
};
assert!(err.to_string().contains("foo"));
assert!(err.to_string().contains("42"));
let err = NameLeaseError::NotFound { name: "bar".into() };
assert!(err.to_string().contains("bar"));
crate::test_complete!("name_lease_error_display");
}
#[test]
fn registry_event_variants() {
init_test("registry_event_variants");
let _registered = RegistryEvent::NameRegistered {
name: "svc".into(),
holder: tid(1),
region: rid(0),
};
let _released = RegistryEvent::NameReleased {
name: "svc".into(),
holder: tid(1),
};
let _aborted = RegistryEvent::NameAborted {
name: "svc".into(),
holder: tid(1),
reason: "task cancelled".into(),
};
let _region_cleanup = RegistryEvent::RegionCleanup {
region: rid(0),
count: 3,
};
let _task_cleanup = RegistryEvent::TaskCleanup {
task: tid(1),
count: 2,
};
let _reserved = RegistryEvent::NameReserved {
name: "svc".into(),
holder: tid(1),
region: rid(0),
};
let _committed = RegistryEvent::NamePermitCommitted {
name: "svc".into(),
holder: tid(1),
};
let _permit_aborted = RegistryEvent::NamePermitAborted {
name: "svc".into(),
holder: tid(1),
reason: "setup failed".into(),
};
let _replaced = RegistryEvent::NameReplaced {
name: "svc".into(),
new_holder: tid(2),
displaced_holder: tid(1),
};
let _waiter_enqueued = RegistryEvent::WaiterEnqueued {
name: "svc".into(),
holder: tid(2),
deadline: Time::from_secs(60),
};
let _waiter_granted = RegistryEvent::WaiterGranted {
name: "svc".into(),
holder: tid(2),
};
crate::test_complete!("registry_event_variants");
}
#[test]
fn registry_default_is_empty() {
init_test("registry_default_is_empty");
let reg = NameRegistry::default();
assert!(reg.is_empty());
assert_eq!(reg.len(), 0);
crate::test_complete!("registry_default_is_empty");
}
/// A single watcher receives `Acquired` on register and `Released` on
/// unregister, in that order, and the buffer is empty after being drained.
#[test]
fn name_watch_emits_acquire_and_release() {
    init_test("name_watch_emits_acquire_and_release");
    let mut reg = NameRegistry::new();
    let watch_ref = reg.watch_name("svc", tid(50), rid(9));
    assert_eq!(reg.name_watcher_count(), 1);
    let mut lease = reg
        .register("svc", tid(1), rid(1), Time::from_secs(10))
        .unwrap();
    reg.unregister("svc").unwrap();
    lease.release().unwrap();
    let notifications = reg.take_name_notifications();
    assert_eq!(notifications.len(), 2);
    let acquired = &notifications[0];
    assert_eq!(acquired.watch_ref, watch_ref);
    assert_eq!(acquired.watcher, tid(50));
    assert_eq!(acquired.watcher_region, rid(9));
    assert_eq!(acquired.name, "svc");
    assert_eq!(acquired.holder, tid(1));
    assert_eq!(acquired.region, rid(1));
    assert_eq!(acquired.kind, NameOwnershipKind::Acquired);
    let released = &notifications[1];
    assert_eq!(released.watch_ref, watch_ref);
    assert_eq!(released.watcher, tid(50));
    assert_eq!(released.watcher_region, rid(9));
    assert_eq!(released.name, "svc");
    assert_eq!(released.holder, tid(1));
    assert_eq!(released.region, rid(1));
    assert_eq!(released.kind, NameOwnershipKind::Released);
    // Draining a second time yields nothing.
    assert!(reg.take_name_notifications().is_empty());
    crate::test_complete!("name_watch_emits_acquire_and_release");
}
#[test]
fn name_watch_multiple_watchers_ordered_by_ref() {
init_test("name_watch_multiple_watchers_ordered_by_ref");
let mut reg = NameRegistry::new();
let w1 = reg.watch_name("svc", tid(10), rid(7));
let w2 = reg.watch_name("svc", tid(11), rid(7));
let w3 = reg.watch_name("svc", tid(12), rid(8));
let mut lease = reg.register("svc", tid(1), rid(1), Time::ZERO).unwrap();
let notifications = reg.take_name_notifications();
assert_eq!(notifications.len(), 3);
let refs: Vec<NameWatchRef> = notifications.iter().map(|n| n.watch_ref).collect();
assert_eq!(refs, vec![w1, w2, w3]);
assert!(
notifications
.iter()
.all(|n| n.kind == NameOwnershipKind::Acquired)
);
lease.release().unwrap();
crate::test_complete!("name_watch_multiple_watchers_ordered_by_ref");
}
#[test]
fn name_watch_region_cleanup_suppresses_release_notifications() {
    init_test("name_watch_region_cleanup_suppresses_release_notifications");
    let mut registry = NameRegistry::new();
    let name = "svc";

    // One watcher sits in region 1 (cleaned up below), the other in region 2.
    let _region_one_watch = registry.watch_name(name, tid(10), rid(1));
    let region_two_watch = registry.watch_name(name, tid(11), rid(2));

    let mut svc_lease = registry.register(name, tid(1), rid(1), Time::ZERO).unwrap();

    // The registration is expected to queue one notification per watcher.
    let on_register = registry.take_name_notifications();
    assert_eq!(on_register.len(), 2);

    // Dropping region 1's watchers must leave exactly one watcher behind.
    let dropped = registry.cleanup_name_watchers_region(rid(1));
    assert_eq!(dropped.len(), 1);
    assert_eq!(registry.name_watcher_count(), 1);

    registry.unregister(name).unwrap();
    svc_lease.release().unwrap();

    // Only the region-2 watcher should hear about the release.
    let on_release = registry.take_name_notifications();
    assert_eq!(on_release.len(), 1);
    let only = &on_release[0];
    assert_eq!(only.watch_ref, region_two_watch);
    assert_eq!(only.kind, NameOwnershipKind::Released);

    crate::test_complete!("name_watch_region_cleanup_suppresses_release_notifications");
}
#[test]
fn name_watch_task_cleanup_removes_only_dead_task_watchers() {
    init_test("name_watch_task_cleanup_removes_only_dead_task_watchers");
    let mut registry = NameRegistry::new();
    let name = "svc";

    // Two watchers in the same region, owned by different tasks.
    let _dead_task_watch = registry.watch_name(name, tid(10), rid(1));
    let surviving_watch = registry.watch_name(name, tid(11), rid(1));
    assert_eq!(registry.name_watcher_count(), 2);

    // Task 10 holds no names, so nothing is reported as removed, yet the
    // watcher count must drop by one.
    let removed_names = registry.cleanup_task(tid(10));
    assert!(removed_names.is_empty());
    assert_eq!(registry.name_watcher_count(), 1);

    let mut svc_lease = registry.register(name, tid(1), rid(9), Time::ZERO).unwrap();
    registry.unregister(name).unwrap();
    svc_lease.release().unwrap();

    // Two notifications are expected, all addressed to the surviving watcher.
    let events = registry.take_name_notifications();
    assert_eq!(events.len(), 2);
    assert!(events.iter().all(|event| event.watch_ref == surviving_watch));

    crate::test_complete!("name_watch_task_cleanup_removes_only_dead_task_watchers");
}
#[test]
fn name_watch_replace_emits_release_then_acquire() {
    init_test("name_watch_replace_emits_release_then_acquire");
    let mut registry = NameRegistry::new();
    let name = "svc";

    let watcher = registry.watch_name(name, tid(42), rid(9));
    let mut displaced_lease = registry.register(name, tid(1), rid(1), Time::ZERO).unwrap();
    // Discard whatever the initial registration queued so that only the
    // replacement's notifications are inspected below.
    registry.take_name_notifications();

    let replace_outcome = registry
        .register_with_policy(
            name,
            tid(2),
            rid(2),
            Time::from_secs(5),
            NameCollisionPolicy::Replace,
        )
        .unwrap();
    let mut replacement_lease = match replace_outcome {
        NameCollisionOutcome::Replaced { lease, .. } => lease,
        other => panic!("expected Replaced outcome, got {other:?}"),
    };

    // Expect exactly: Released (old holder) followed by Acquired (new holder).
    let events = registry.take_name_notifications();
    assert_eq!(events.len(), 2);
    let (released, acquired) = (&events[0], &events[1]);
    assert_eq!(released.watch_ref, watcher);
    assert_eq!(released.kind, NameOwnershipKind::Released);
    assert_eq!(released.holder, tid(1));
    assert_eq!(acquired.watch_ref, watcher);
    assert_eq!(acquired.kind, NameOwnershipKind::Acquired);
    assert_eq!(acquired.holder, tid(2));

    displaced_lease.abort().unwrap();
    replacement_lease.release().unwrap();
    crate::test_complete!("name_watch_replace_emits_release_then_acquire");
}
// Field-less stand-in type used by the tests to obtain an `Arc<dyn RegistryCap>`.
struct DummyRegistry;
// Empty marker impl: `RegistryCap` declares no methods, so nothing to provide.
impl RegistryCap for DummyRegistry {}
#[test]
fn registry_handle_basic() {
    init_test("registry_handle_basic");

    // Build the handle from an explicitly typed capability object.
    let cap: Arc<dyn RegistryCap> = Arc::new(DummyRegistry);
    let handle = RegistryHandle::new(cap);

    // Exercise the accessor and `Clone`; the results themselves are unused.
    let _shared = handle.as_arc();
    let _duplicate = handle.clone();

    // The Debug rendering must mention the type name.
    let rendered = format!("{handle:?}");
    assert!(rendered.contains("RegistryHandle"));

    crate::test_complete!("registry_handle_basic");
}
#[test]
fn conformance_no_stale_names_after_task_crash() {
    init_test("conformance_no_stale_names_after_task_crash");
    let mut registry = NameRegistry::new();
    let crashed = tid(1);
    let survivor = tid(2);

    // The crashing task owns three names across two regions; a second task
    // owns one unrelated name.
    let mut lease_a = registry
        .register("svc_a", crashed, rid(0), Time::ZERO)
        .unwrap();
    let mut lease_b = registry
        .register("svc_b", crashed, rid(0), Time::from_secs(1))
        .unwrap();
    let mut lease_c = registry
        .register("svc_c", crashed, rid(1), Time::from_secs(2))
        .unwrap();
    let mut lease_other = registry
        .register("other", survivor, rid(0), Time::from_secs(3))
        .unwrap();
    assert_eq!(registry.len(), 4);

    let removed = registry.cleanup_task(crashed);
    assert_eq!(removed, vec!["svc_a", "svc_b", "svc_c"]);
    assert_eq!(registry.len(), 1);

    // None of the crashed task's names may still resolve.
    assert_eq!(registry.whereis("svc_a"), None, "stale name svc_a after crash");
    assert_eq!(registry.whereis("svc_b"), None, "stale name svc_b after crash");
    assert_eq!(registry.whereis("svc_c"), None, "stale name svc_c after crash");
    assert_eq!(
        registry.whereis("other"),
        Some(survivor),
        "surviving name lost"
    );
    assert_eq!(registry.registered_names(), vec!["other"]);

    // Resolve every lease: abort the removed ones, release the survivor.
    lease_a.abort().unwrap();
    lease_b.abort().unwrap();
    lease_c.abort().unwrap();
    lease_other.release().unwrap();
    crate::test_complete!("conformance_no_stale_names_after_task_crash");
}
#[test]
fn conformance_no_stale_names_after_region_stop() {
    init_test("conformance_no_stale_names_after_region_stop");
    let mut registry = NameRegistry::new();
    let stopped = rid(1);

    // Three names live in the region that will be stopped ...
    let mut db = registry.register("db", tid(10), stopped, Time::ZERO).unwrap();
    let mut cache = registry
        .register("cache", tid(11), stopped, Time::from_secs(1))
        .unwrap();
    let mut worker = registry
        .register("worker", tid(12), stopped, Time::from_secs(2))
        .unwrap();
    // ... and two live in other regions.
    let mut api = registry
        .register("api", tid(20), rid(2), Time::from_secs(3))
        .unwrap();
    let mut logger = registry
        .register("logger", tid(30), rid(3), Time::from_secs(4))
        .unwrap();
    assert_eq!(registry.len(), 5);

    // The removed names are expected in sorted order, not registration order.
    let removed = registry.cleanup_region(stopped);
    assert_eq!(removed, vec!["cache", "db", "worker"]);
    assert_eq!(registry.len(), 2);

    for name in &["cache", "db", "worker"] {
        assert_eq!(
            registry.whereis(name),
            None,
            "stale name '{name}' after region stop"
        );
        assert!(!registry.is_registered(name));
    }

    assert_eq!(registry.whereis("api"), Some(tid(20)));
    assert_eq!(registry.whereis("logger"), Some(tid(30)));
    assert_eq!(registry.registered_names(), vec!["api", "logger"]);

    // Resolve every lease: abort the removed ones, release the survivors.
    db.abort().unwrap();
    cache.abort().unwrap();
    worker.abort().unwrap();
    api.release().unwrap();
    logger.release().unwrap();
    crate::test_complete!("conformance_no_stale_names_after_region_stop");
}
#[test]
fn conformance_deterministic_winner_simultaneous_register() {
    init_test("conformance_deterministic_winner_simultaneous_register");
    let mut registry = NameRegistry::new();
    let winning_task = tid(99);

    // The error every later registrant is expected to receive.
    let taken_by_winner = || NameLeaseError::NameTaken {
        name: "singleton".into(),
        current_holder: winning_task,
    };

    let mut winner = registry
        .register("singleton", winning_task, rid(0), Time::ZERO)
        .unwrap();

    // Same timestamp, lower task id: still loses to the first registrant.
    let first_loss = registry
        .register("singleton", tid(1), rid(0), Time::ZERO)
        .unwrap_err();
    assert_eq!(
        first_loss,
        taken_by_winner(),
        "loser must see the correct holder"
    );

    // Later timestamp, different region: same result.
    let second_loss = registry
        .register("singleton", tid(50), rid(1), Time::from_secs(1))
        .unwrap_err();
    assert_eq!(
        second_loss,
        taken_by_winner(),
        "second loser must also see the original holder"
    );

    assert_eq!(registry.len(), 1);
    assert_eq!(registry.whereis("singleton"), Some(winning_task));

    winner.release().unwrap();
    crate::test_complete!("conformance_deterministic_winner_simultaneous_register");
}
#[test]
fn conformance_register_winner_stable_across_trials() {
    init_test("conformance_register_winner_stable_across_trials");
    let name = "stable_name";

    // Repeat the same two-registrant race on a fresh registry each time; the
    // first registrant must win every time.
    for trial in 0..20 {
        let mut registry = NameRegistry::new();
        let mut winning_lease = registry.register(name, tid(7), rid(0), Time::ZERO).unwrap();

        let rejection = registry
            .register(name, tid(3), rid(0), Time::ZERO)
            .unwrap_err();
        let expected = NameLeaseError::NameTaken {
            name: "stable_name".into(),
            current_holder: tid(7),
        };
        assert_eq!(rejection, expected, "trial {trial}: winner must be tid(7)");

        winning_lease.release().unwrap();
    }

    crate::test_complete!("conformance_register_winner_stable_across_trials");
}
#[test]
fn conformance_lease_abort_on_cancellation() {
    use crate::obligation::graded::Resolution;

    init_test("conformance_lease_abort_on_cancellation");
    let mut registry = NameRegistry::new();
    let name = "cancellable";

    let mut held = registry.register(name, tid(1), rid(0), Time::ZERO).unwrap();
    assert!(held.is_active());

    // Remove the registry entry first, then abort the lease itself.
    registry.unregister(name).unwrap();
    assert!(!registry.is_registered(name));

    let abort_proof = held.abort().unwrap();
    assert!(!held.is_active());

    let outcome = abort_proof.into_resolved_proof();
    assert_eq!(
        outcome.resolution(),
        Resolution::Abort,
        "abort proof must show Abort resolution"
    );

    // A second abort on the same lease must be rejected.
    assert_eq!(held.abort().unwrap_err(), NameLeaseError::AlreadyResolved);
    crate::test_complete!("conformance_lease_abort_on_cancellation");
}
#[test]
fn conformance_region_cancel_aborts_all_leases() {
    init_test("conformance_region_cancel_aborts_all_leases");
    let mut registry = NameRegistry::new();
    let cancelled = rid(5);

    // Three leases in the region to be cancelled, one in an unrelated region.
    let mut lease_a = registry.register("a", tid(1), cancelled, Time::ZERO).unwrap();
    let mut lease_b = registry
        .register("b", tid(2), cancelled, Time::from_secs(1))
        .unwrap();
    let mut lease_c = registry
        .register("c", tid(3), cancelled, Time::from_secs(2))
        .unwrap();
    let mut lease_d = registry
        .register("d", tid(4), rid(99), Time::from_secs(3))
        .unwrap();

    let removed = registry.cleanup_region(cancelled);
    assert_eq!(removed.len(), 3);

    // Registry cleanup does not resolve the caller-held leases; each one is
    // still active until aborted here.
    for (lease, name) in [(&mut lease_a, "a"), (&mut lease_b, "b"), (&mut lease_c, "c")] {
        assert!(
            lease.is_active(),
            "lease '{name}' should still be active pre-abort"
        );
        let _proof = lease.abort().unwrap();
        assert!(!lease.is_active());
    }

    assert_eq!(registry.len(), 1);
    assert_eq!(registry.whereis("d"), Some(tid(4)));

    lease_d.release().unwrap();
    crate::test_complete!("conformance_region_cancel_aborts_all_leases");
}
#[test]
fn conformance_event_ordering_stable_across_seeds() {
    /// Builds the fixed event list used as the reference sequence: three
    /// registrations, one region cleanup, then three aborts.
    fn expected_events() -> Vec<RegistryEvent> {
        let registered = |name: &str, holder| RegistryEvent::NameRegistered {
            name: name.into(),
            holder,
            region: rid(0),
        };
        let aborted = |name: &str, holder| RegistryEvent::NameAborted {
            name: name.into(),
            holder,
            reason: "region cleanup".into(),
        };
        vec![
            registered("b", tid(2)),
            registered("a", tid(1)),
            registered("c", tid(3)),
            RegistryEvent::RegionCleanup {
                region: rid(0),
                count: 3,
            },
            aborted("a", tid(1)),
            aborted("b", tid(2)),
            aborted("c", tid(3)),
        ]
    }

    init_test("conformance_event_ordering_stable_across_seeds");

    // Rebuilding the sequence must always yield the same value.
    let canonical = expected_events();
    for trial in 0..10 {
        let events = expected_events();
        assert_eq!(
            events, canonical,
            "trial {trial}: event sequence diverged from canonical"
        );
    }

    // Register out of alphabetical order, then check cleanup reports sorted names.
    let mut registry = NameRegistry::new();
    let region = rid(0);
    let mut lease_b = registry.register("b", tid(2), region, Time::ZERO).unwrap();
    let mut lease_a = registry
        .register("a", tid(1), region, Time::from_secs(1))
        .unwrap();
    let mut lease_c = registry
        .register("c", tid(3), region, Time::from_secs(2))
        .unwrap();

    let removed = registry.cleanup_region(region);
    assert_eq!(
        removed,
        vec!["a", "b", "c"],
        "cleanup must return sorted names"
    );

    lease_b.abort().unwrap();
    lease_a.abort().unwrap();
    lease_c.abort().unwrap();
    crate::test_complete!("conformance_event_ordering_stable_across_seeds");
}
#[test]
fn conformance_cleanup_task_deterministic_order() {
    init_test("conformance_cleanup_task_deterministic_order");
    let mut registry = NameRegistry::new();
    let owner = tid(1);
    let region = rid(0);

    // Register in reverse alphabetical order on purpose.
    let mut last = registry
        .register("z_last", owner, region, Time::ZERO)
        .unwrap();
    let mut middle = registry
        .register("m_mid", owner, region, Time::from_secs(1))
        .unwrap();
    let mut first = registry
        .register("a_first", owner, region, Time::from_secs(2))
        .unwrap();

    let removed = registry.cleanup_task(owner);
    assert_eq!(
        removed,
        vec!["a_first", "m_mid", "z_last"],
        "cleanup_task must return names in sorted order"
    );

    last.abort().unwrap();
    middle.abort().unwrap();
    first.abort().unwrap();
    crate::test_complete!("conformance_cleanup_task_deterministic_order");
}
#[test]
fn conformance_re_register_after_crash_clean() {
    init_test("conformance_re_register_after_crash_clean");
    let mut registry = NameRegistry::new();
    let name = "primary_db";
    let crashed = tid(10);
    let successor = tid(20);

    // First owner registers, then is cleaned up as if it had crashed.
    let mut crashed_lease = registry.register(name, crashed, rid(0), Time::ZERO).unwrap();
    let removed = registry.cleanup_task(crashed);
    assert_eq!(removed, vec!["primary_db"]);
    crashed_lease.abort().unwrap();

    // A different task in a different region can now take the same name.
    let mut successor_lease = registry
        .register(name, successor, rid(1), Time::from_secs(10))
        .unwrap();
    assert_eq!(registry.len(), 1);
    assert_eq!(registry.whereis(name), Some(successor));
    assert_eq!(successor_lease.holder(), successor);
    assert_eq!(successor_lease.region(), rid(1));
    assert_eq!(successor_lease.acquired_at(), Time::from_secs(10));

    // Cleaning up the old task again must not touch the new registration.
    let removed_again = registry.cleanup_task(crashed);
    assert!(removed_again.is_empty(), "old task must have no entries");

    successor_lease.release().unwrap();
    crate::test_complete!("conformance_re_register_after_crash_clean");
}
// Mixes registration, region cleanup, unregistration and re-registration, then
// checks that `len`, `registered_names`, `is_registered` and `whereis` agree
// and that `registered_names` is sorted.
#[test]
fn conformance_registry_invariant_under_churn() {
init_test("conformance_registry_invariant_under_churn");
let mut reg = NameRegistry::new();
let mut active_leases: Vec<NameLease> = Vec::new();
// Ten names, one task each, spread across regions 0, 1 and 2 via `i % 3`.
for i in 0..10 {
let name = format!("svc_{i:03}");
let lease = reg
.register(&name, tid(i), rid(i % 3), Time::from_secs(u64::from(i)))
.unwrap();
active_leases.push(lease);
}
assert_eq!(reg.len(), 10);
// Drop everything in region 1 and abort the matching caller-held leases.
let removed = reg.cleanup_region(rid(1));
for name in &removed {
if let Some(lease) = active_leases.iter_mut().find(|l| l.name() == name.as_str()) {
lease.abort().unwrap();
}
}
// Explicitly unregister one region-0 name and release its lease.
reg.unregister("svc_000").unwrap();
if let Some(lease) = active_leases.iter_mut().find(|l| l.name() == "svc_000") {
lease.release().unwrap();
}
// "svc_001" was in region 1 (1 % 3 == 1), so it was removed above and can
// be registered again by a different task.
let new_lease = reg
.register("svc_001", tid(100), rid(5), Time::from_secs(100))
.unwrap();
active_leases.push(new_lease);
let names = reg.registered_names();
assert_eq!(
reg.len(),
names.len(),
"len() and registered_names().len() must agree"
);
// Every listed name must be visible through both lookup APIs.
for name in &names {
assert!(
reg.is_registered(name),
"name '{name}' in registered_names but is_registered returns false"
);
assert!(
reg.whereis(name).is_some(),
"name '{name}' in registered_names but whereis returns None"
);
}
// Adjacent pairs must be in non-decreasing order.
for window in names.windows(2) {
assert!(
window[0] <= window[1],
"registered_names not sorted: '{}' > '{}'",
window[0],
window[1]
);
}
// Resolve whatever leases are still active; the abort result is
// deliberately ignored here.
for lease in &mut active_leases {
if lease.is_active() {
let _ = lease.abort();
}
}
crate::test_complete!("conformance_registry_invariant_under_churn");
}
#[test]
fn conformance_linearity_proofs() {
    use crate::obligation::graded::Resolution;

    init_test("conformance_linearity_proofs");

    // Releasing a lease yields a proof that resolves as Commit.
    let mut to_release = NameLease::new("committed", tid(1), rid(0), Time::ZERO);
    let commit_proof = to_release.release().unwrap();
    assert_eq!(
        commit_proof.into_resolved_proof().resolution(),
        Resolution::Commit,
        "release must produce Commit proof"
    );

    // Aborting a lease yields a proof that resolves as Abort.
    let mut to_abort = NameLease::new("aborted", tid(2), rid(0), Time::ZERO);
    let abort_proof = to_abort.abort().unwrap();
    assert_eq!(
        abort_proof.into_resolved_proof().resolution(),
        Resolution::Abort,
        "abort must produce Abort proof"
    );

    crate::test_complete!("conformance_linearity_proofs");
}
#[test]
fn conformance_cross_region_isolation() {
    init_test("conformance_cross_region_isolation");
    let mut registry = NameRegistry::new();
    let owner = tid(1);

    // The same task owns one name in each of two regions.
    let mut region_one_lease = registry
        .register("r1_name", owner, rid(1), Time::ZERO)
        .unwrap();
    let mut region_two_lease = registry
        .register("r2_name", owner, rid(2), Time::from_secs(1))
        .unwrap();

    // Cleaning up region 1 must leave the region-2 name untouched.
    let removed = registry.cleanup_region(rid(1));
    assert_eq!(removed, vec!["r1_name"]);
    assert_eq!(registry.len(), 1);
    assert_eq!(registry.whereis("r2_name"), Some(owner));
    assert!(registry.is_registered("r2_name"));
    assert!(!registry.is_registered("r1_name"));

    region_one_lease.abort().unwrap();
    region_two_lease.release().unwrap();
    crate::test_complete!("conformance_cross_region_isolation");
}
#[test]
fn conformance_cleanup_task_removes_pending_permits() {
    init_test("conformance_cleanup_task_removes_pending_permits");
    let mut registry = NameRegistry::new();
    let owner = tid(1);

    // One committed registration and one pending reservation, same task.
    let mut active_lease = registry.register("active", owner, rid(0), Time::ZERO).unwrap();
    let mut pending_permit = registry
        .reserve("pending_name", owner, rid(0), Time::from_secs(1))
        .expect("reserve ok");

    // Task cleanup must report both the lease name and the reserved name.
    let removed = registry.cleanup_task(owner);
    assert_eq!(removed, vec!["active", "pending_name"]);
    assert_eq!(registry.len(), 0);

    // The previously reserved name is available to another task.
    let mut successor_lease = registry
        .register("pending_name", tid(2), rid(0), Time::from_secs(2))
        .unwrap();

    active_lease.abort().unwrap();
    pending_permit.abort().unwrap();
    successor_lease.release().unwrap();
    crate::test_complete!("conformance_cleanup_task_removes_pending_permits");
}
#[test]
fn conformance_double_reserve_blocked() {
    init_test("conformance_double_reserve_blocked");
    let mut registry = NameRegistry::new();
    let name = "singleton";

    let mut first_permit = registry
        .reserve(name, tid(1), rid(0), Time::ZERO)
        .expect("first reserve ok");

    // A second reservation for the same name must be refused and must name
    // the first reserver as the current holder.
    let rejection = registry
        .reserve(name, tid(2), rid(0), Time::ZERO)
        .unwrap_err();
    let expected = NameLeaseError::NameTaken {
        name: "singleton".into(),
        current_holder: tid(1),
    };
    assert_eq!(rejection, expected);

    // Resolve the permit, then remove its registry entry.
    first_permit.abort().unwrap();
    registry
        .cancel_permit(&first_permit, Time::ZERO)
        .expect("cancel permit");
    crate::test_complete!("conformance_double_reserve_blocked");
}
#[test]
fn permit_accessors() {
    init_test("permit_accessors");
    let mut registry = NameRegistry::new();
    let reserved_at = Time::from_secs(42);

    let mut reservation = registry
        .reserve("my_svc", tid(7), rid(3), reserved_at)
        .expect("reserve ok");

    // Each accessor must echo the value passed to `reserve`.
    assert_eq!(reservation.name(), "my_svc");
    assert_eq!(reservation.holder(), tid(7));
    assert_eq!(reservation.region(), rid(3));
    assert_eq!(reservation.reserved_at(), Time::from_secs(42));

    // `is_pending` flips once the permit is aborted.
    assert!(reservation.is_pending());
    reservation.abort().unwrap();
    assert!(!reservation.is_pending());

    registry
        .cancel_permit(&reservation, reserved_at)
        .expect("cancel permit");
    crate::test_complete!("permit_accessors");
}
#[test]
fn conformance_permit_commit_transfers_token() {
    use crate::obligation::graded::Resolution;

    init_test("conformance_permit_commit_transfers_token");
    let mut registry = NameRegistry::new();

    let reservation = registry
        .reserve("transfer", tid(1), rid(0), Time::from_secs(5))
        .expect("reserve ok");

    // Committing consumes the permit and hands back a lease carrying the
    // same identity and the reservation time as its acquisition time.
    let mut committed = registry.commit_permit(reservation).expect("commit ok");
    assert_eq!(committed.name(), "transfer");
    assert_eq!(committed.holder(), tid(1));
    assert_eq!(committed.region(), rid(0));
    assert_eq!(committed.acquired_at(), Time::from_secs(5));
    assert!(committed.is_active());

    let release_proof = committed.release().unwrap();
    assert_eq!(
        release_proof.into_resolved_proof().resolution(),
        Resolution::Commit,
    );
    crate::test_complete!("conformance_permit_commit_transfers_token");
}
#[test]
fn conformance_permit_abort_proof() {
    use crate::obligation::graded::Resolution;

    init_test("conformance_permit_abort_proof");
    let mut registry = NameRegistry::new();

    let mut reservation = registry
        .reserve("abortable", tid(1), rid(0), Time::ZERO)
        .expect("reserve ok");

    // The first abort yields a proof that resolves as Abort.
    let abort_proof = reservation.abort().unwrap();
    assert_eq!(
        abort_proof.into_resolved_proof().resolution(),
        Resolution::Abort,
    );

    // A second abort on the same permit is rejected.
    assert_eq!(
        reservation.abort().unwrap_err(),
        NameLeaseError::AlreadyResolved
    );

    registry
        .cancel_permit(&reservation, Time::ZERO)
        .expect("cancel permit");
    crate::test_complete!("conformance_permit_abort_proof");
}
#[test]
fn conformance_lease_blocks_reserve() {
    init_test("conformance_lease_blocks_reserve");
    let mut registry = NameRegistry::new();
    let name = "taken";

    let mut holder_lease = registry.register(name, tid(1), rid(0), Time::ZERO).unwrap();

    // Reserving a name that is already registered must fail and report the
    // registered holder.
    let rejection = registry
        .reserve(name, tid(2), rid(0), Time::ZERO)
        .unwrap_err();
    let expected = NameLeaseError::NameTaken {
        name: "taken".into(),
        current_holder: tid(1),
    };
    assert_eq!(rejection, expected);

    holder_lease.release().unwrap();
    crate::test_complete!("conformance_lease_blocks_reserve");
}
#[test]
fn conformance_commit_after_cancel_fails() {
    init_test("conformance_commit_after_cancel_fails");
    let mut registry = NameRegistry::new();

    let reservation = registry
        .reserve("ephemeral", tid(1), rid(0), Time::ZERO)
        .expect("reserve ok");

    // Cancel the registry entry while the permit is still held ...
    registry
        .cancel_permit(&reservation, Time::ZERO)
        .expect("cancel permit");

    // ... so the later commit must find no entry to commit.
    let failure = registry.commit_permit(reservation).unwrap_err();
    let expected = NameLeaseError::NotFound {
        name: "ephemeral".into(),
    };
    assert_eq!(failure, expected);

    crate::test_complete!("conformance_commit_after_cancel_fails");
}
#[test]
fn commit_permit_rejects_stale_same_identity_replay() {
    init_test("commit_permit_rejects_stale_same_identity_replay");
    let mut registry = NameRegistry::new();
    let name = "svc";

    // Reserve, cancel, then reserve again with the same name/task/region/time.
    let cancelled_permit = registry
        .reserve(name, tid(1), rid(0), Time::ZERO)
        .expect("reserve ok");
    registry
        .cancel_permit(&cancelled_permit, Time::ZERO)
        .expect("cancel stale permit entry");
    let live_permit = registry
        .reserve(name, tid(1), rid(0), Time::ZERO)
        .expect("reserve again ok");

    // Committing the cancelled permit must be denied and must not register
    // the name.
    let denial = registry.commit_permit(cancelled_permit).unwrap_err();
    assert_eq!(
        denial,
        NameLeaseError::PermissionDenied { name: "svc".into() }
    );
    assert_eq!(registry.whereis(name), None);

    // The live permit still commits normally.
    let mut committed = registry.commit_permit(live_permit).expect("fresh commit ok");
    assert_eq!(registry.whereis(name), Some(tid(1)));

    registry.unregister(name).unwrap();
    committed.release().unwrap();
    crate::test_complete!("commit_permit_rejects_stale_same_identity_replay");
}
#[test]
fn cancel_permit_rejects_stale_same_identity_replay() {
    init_test("cancel_permit_rejects_stale_same_identity_replay");
    let mut registry = NameRegistry::new();
    let name = "svc";

    // Reserve, abort and cancel the first permit.
    let mut cancelled_permit = registry
        .reserve(name, tid(1), rid(0), Time::ZERO)
        .expect("reserve ok");
    cancelled_permit.abort().expect("abort stale permit");
    registry
        .cancel_permit(&cancelled_permit, Time::ZERO)
        .expect("cancel original permit entry");

    // Reserve again with the same name/task/region/time.
    let live_permit = registry
        .reserve(name, tid(1), rid(0), Time::ZERO)
        .expect("reserve again ok");

    // Replaying the cancel with the old permit must be denied.
    let denial = registry
        .cancel_permit(&cancelled_permit, Time::from_secs(1))
        .unwrap_err();
    assert_eq!(
        denial,
        NameLeaseError::PermissionDenied { name: "svc".into() }
    );

    // The live permit is unaffected and still commits.
    let mut committed = registry.commit_permit(live_permit).expect("fresh commit ok");
    assert_eq!(registry.whereis(name), Some(tid(1)));

    registry.unregister(name).unwrap();
    committed.release().unwrap();
    crate::test_complete!("cancel_permit_rejects_stale_same_identity_replay");
}
// Committing a permit that was already aborted must fail with
// `AlreadyResolved` and leave the pending reservation in the registry.
#[test]
fn commit_permit_rejects_aborted_permit_without_mutating_registry() {
init_test("commit_permit_rejects_aborted_permit_without_mutating_registry");
let mut reg = NameRegistry::new();
let mut permit = reg
.reserve("svc", tid(1), rid(0), Time::ZERO)
.expect("reserve ok");
// Resolve the permit before handing it to `commit_permit`.
permit.abort().expect("abort permit");
let err = reg.commit_permit(permit).unwrap_err();
assert_eq!(err, NameLeaseError::AlreadyResolved);
// The name is not registered ...
assert_eq!(reg.whereis("svc"), None);
// ... but the reservation still blocks other tasks and still names task 1.
assert_eq!(
reg.reserve("svc", tid(2), rid(0), Time::from_secs(1))
.unwrap_err(),
NameLeaseError::NameTaken {
name: "svc".into(),
current_holder: tid(1),
}
);
// The original permit was consumed by `commit_permit`, so a stand-in permit
// is built by hand to cancel the leftover entry.
// NOTE(review): the trailing `1` presumably matches the permit id the
// registry assigned to the first reservation; confirm against `reserve`.
let mut cleanup = NamePermit::new("svc", tid(1), rid(0), Time::ZERO, 1);
reg.cancel_permit(&cleanup, Time::from_secs(1))
.expect("cleanup pending entry");
cleanup.abort().expect("resolve cleanup permit");
// With the entry gone, another task can reserve and commit the name.
let replacement = reg
.reserve("svc", tid(2), rid(0), Time::from_secs(2))
.expect("reserve after cleanup");
let mut lease = reg.commit_permit(replacement).expect("commit replacement");
reg.unregister("svc").expect("unregister replacement");
lease.release().expect("release replacement");
crate::test_complete!("commit_permit_rejects_aborted_permit_without_mutating_registry");
}
#[test]
fn collision_fail_rejects_duplicate() {
    init_test("collision_fail_rejects_duplicate");
    let mut registry = NameRegistry::new();
    let name = "singleton";

    let mut holder_lease = registry.register(name, tid(1), rid(0), Time::ZERO).unwrap();

    // With the `Fail` policy a collision is reported as `NameTaken`.
    let policy = NameCollisionPolicy::Fail;
    let rejection = registry
        .register_with_policy(name, tid(2), rid(0), Time::from_secs(1), policy)
        .unwrap_err();
    let expected = NameLeaseError::NameTaken {
        name: "singleton".into(),
        current_holder: tid(1),
    };
    assert_eq!(rejection, expected);
    assert_eq!(registry.len(), 1);

    holder_lease.release().unwrap();
    crate::test_complete!("collision_fail_rejects_duplicate");
}
#[test]
fn collision_fail_succeeds_when_no_collision() {
    init_test("collision_fail_succeeds_when_no_collision");
    let mut registry = NameRegistry::new();
    let name = "fresh";

    // No prior holder, so the `Fail` policy behaves like a plain registration.
    let policy = NameCollisionPolicy::Fail;
    let outcome = registry
        .register_with_policy(name, tid(1), rid(0), Time::ZERO, policy)
        .unwrap();
    let mut fresh_lease = match outcome {
        NameCollisionOutcome::Registered { lease } => lease,
        other => panic!("expected Registered, got {other:?}"),
    };
    assert_eq!(registry.whereis(name), Some(tid(1)));

    fresh_lease.release().unwrap();
    crate::test_complete!("collision_fail_succeeds_when_no_collision");
}
#[test]
fn collision_replace_displaces_old_holder() {
    init_test("collision_replace_displaces_old_holder");
    let mut registry = NameRegistry::new();
    let name = "svc";

    let mut displaced_lease = registry.register(name, tid(1), rid(0), Time::ZERO).unwrap();

    let policy = NameCollisionPolicy::Replace;
    let outcome = registry
        .register_with_policy(name, tid(2), rid(1), Time::from_secs(5), policy)
        .unwrap();

    // The outcome must identify the task and region that were displaced.
    let mut replacement_lease = match outcome {
        NameCollisionOutcome::Replaced {
            lease,
            displaced_holder,
            displaced_region,
        } => {
            assert_eq!(displaced_holder, tid(1));
            assert_eq!(displaced_region, rid(0));
            lease
        }
        other => panic!("expected Replaced, got {other:?}"),
    };

    assert_eq!(registry.whereis(name), Some(tid(2)));
    assert_eq!(registry.len(), 1);

    displaced_lease.abort().unwrap();
    replacement_lease.release().unwrap();
    crate::test_complete!("collision_replace_displaces_old_holder");
}
#[test]
fn collision_replace_on_free_name_registers_normally() {
    init_test("collision_replace_on_free_name_registers_normally");
    let mut registry = NameRegistry::new();
    let name = "svc";

    // Nothing to replace: the outcome must be `Registered`, not `Replaced`.
    let policy = NameCollisionPolicy::Replace;
    let outcome = registry
        .register_with_policy(name, tid(1), rid(0), Time::ZERO, policy)
        .unwrap();
    let mut fresh_lease = match outcome {
        NameCollisionOutcome::Registered { lease } => lease,
        other => panic!("expected Registered, got {other:?}"),
    };
    assert_eq!(registry.whereis(name), Some(tid(1)));

    fresh_lease.release().unwrap();
    crate::test_complete!("collision_replace_on_free_name_registers_normally");
}
#[test]
fn collision_wait_enqueues_waiter() {
    init_test("collision_wait_enqueues_waiter");
    let mut registry = NameRegistry::new();
    let name = "svc";

    let mut holder_lease = registry.register(name, tid(1), rid(0), Time::ZERO).unwrap();

    // A colliding registration under the `Wait` policy is queued, not refused.
    let policy = NameCollisionPolicy::Wait {
        deadline: Time::from_secs(60),
    };
    let outcome = registry
        .register_with_policy(name, tid(2), rid(0), Time::from_secs(1), policy)
        .unwrap();
    assert!(matches!(outcome, NameCollisionOutcome::Enqueued));

    // One waiter is queued and the original holder still owns the name.
    assert_eq!(registry.waiter_count(), 1);
    assert_eq!(registry.whereis(name), Some(tid(1)));

    holder_lease.abort().unwrap();
    crate::test_complete!("collision_wait_enqueues_waiter");
}
#[test]
fn collision_wait_grants_on_unregister() {
    init_test("collision_wait_grants_on_unregister");
    let mut registry = NameRegistry::new();
    let name = "svc";

    let mut holder_lease = registry.register(name, tid(1), rid(0), Time::ZERO).unwrap();

    let policy = NameCollisionPolicy::Wait {
        deadline: Time::from_secs(60),
    };
    let outcome = registry
        .register_with_policy(name, tid(2), rid(0), Time::from_secs(1), policy)
        .unwrap();
    assert!(matches!(outcome, NameCollisionOutcome::Enqueued));

    // Unregistering at t=10 (before the waiter's deadline) hands the name on.
    registry
        .unregister_and_grant(name, Time::from_secs(10))
        .unwrap();
    holder_lease.release().unwrap();
    assert_eq!(registry.waiter_count(), 0);
    assert_eq!(registry.whereis(name), Some(tid(2)));

    // The granted lease is collected from the registry's grant queue.
    let grants = registry.take_granted();
    assert_eq!(grants.len(), 1);
    assert_eq!(grants[0].name, "svc");
    let mut waiter_lease = grants.into_iter().next().unwrap().lease;
    assert_eq!(waiter_lease.holder(), tid(2));

    waiter_lease.release().unwrap();
    crate::test_complete!("collision_wait_grants_on_unregister");
}
/// `unregister_owned_and_grant` must refuse a lease that does not match the
/// registry's current entry, and accept the lease that does.
#[test]
fn unregister_owned_and_grant_rejects_stale_lease_identity() {
    init_test("unregister_owned_and_grant_rejects_stale_lease_identity");
    let mut reg = NameRegistry::new();

    // The lease the registry actually issued (acquired at t=10).
    let mut current = reg
        .register("svc", tid(1), rid(0), Time::from_secs(10))
        .unwrap();

    // A hand-built lease with the same name, holder and region but a
    // different acquisition time (t=5); it was never issued by `reg`.
    let mut stale = NameLease::new("svc", tid(1), rid(0), Time::from_secs(5));

    // The stale lease is rejected and the name stays registered.
    assert_eq!(
        reg.unregister_owned_and_grant(&stale, Time::from_secs(12)),
        Err(NameLeaseError::PermissionDenied {
            name: "svc".to_string(),
        }),
    );
    assert_eq!(reg.whereis("svc"), Some(tid(1)));

    // The genuine lease is accepted. The argument here was previously the
    // mis-encoded token `¤t`; it must be a borrow of `current`.
    reg.unregister_owned_and_grant(&current, Time::from_secs(12))
        .unwrap();

    // Resolve both leases so no obligation is left outstanding.
    current.release().unwrap();
    stale.abort().unwrap();
    crate::test_complete!("unregister_owned_and_grant_rejects_stale_lease_identity");
}
#[test]
fn collision_wait_expired_waiter_not_granted() {
    init_test("collision_wait_expired_waiter_not_granted");
    let mut registry = NameRegistry::new();
    let name = "svc";

    let mut holder_lease = registry.register(name, tid(1), rid(0), Time::ZERO).unwrap();

    // The waiter's deadline (t=5) falls before the unregister time (t=10).
    let short_wait = NameCollisionPolicy::Wait {
        deadline: Time::from_secs(5),
    };
    registry
        .register_with_policy(name, tid(2), rid(0), Time::from_secs(1), short_wait)
        .unwrap();

    registry
        .unregister_and_grant(name, Time::from_secs(10))
        .unwrap();
    holder_lease.release().unwrap();

    // The waiter is gone, the name is free, and nothing was granted.
    assert_eq!(registry.waiter_count(), 0);
    assert_eq!(registry.whereis(name), None);
    let grants = registry.take_granted();
    assert!(grants.is_empty());

    crate::test_complete!("collision_wait_expired_waiter_not_granted");
}
#[test]
fn collision_wait_rejects_already_expired_budget() {
    init_test("collision_wait_rejects_already_expired_budget");
    let mut registry = NameRegistry::new();
    let name = "svc";

    let mut holder_lease = registry.register(name, tid(1), rid(0), Time::ZERO).unwrap();

    // The request arrives at t=10 with a deadline of t=5, i.e. already past.
    let expired_wait = NameCollisionPolicy::Wait {
        deadline: Time::from_secs(5),
    };
    let rejection = registry
        .register_with_policy(name, tid(2), rid(0), Time::from_secs(10), expired_wait)
        .unwrap_err();
    assert_eq!(
        rejection,
        NameLeaseError::WaitBudgetExceeded { name: "svc".into() }
    );

    // Nothing was queued and the holder is unchanged.
    assert_eq!(registry.waiter_count(), 0);
    assert_eq!(registry.whereis(name), Some(tid(1)));

    registry.unregister(name).unwrap();
    holder_lease.release().unwrap();
    crate::test_complete!("collision_wait_rejects_already_expired_budget");
}
#[test]
fn collision_wait_fifo_ordering() {
    init_test("collision_wait_fifo_ordering");
    let mut registry = NameRegistry::new();
    let name = "svc";
    // Both waiters use the same deadline, so only queue position differs.
    let wait_until_60 = || NameCollisionPolicy::Wait {
        deadline: Time::from_secs(60),
    };

    let mut holder_lease = registry.register(name, tid(1), rid(0), Time::ZERO).unwrap();

    // Task 2 queues first, task 3 second.
    registry
        .register_with_policy(name, tid(2), rid(0), Time::from_secs(1), wait_until_60())
        .unwrap();
    registry
        .register_with_policy(name, tid(3), rid(0), Time::from_secs(2), wait_until_60())
        .unwrap();
    assert_eq!(registry.waiter_count(), 2);

    // First hand-off goes to the earlier waiter.
    registry
        .unregister_and_grant(name, Time::from_secs(10))
        .unwrap();
    holder_lease.release().unwrap();
    assert_eq!(registry.whereis(name), Some(tid(2)));
    assert_eq!(registry.waiter_count(), 1);
    let mut first_grant = registry.take_granted().into_iter().next().unwrap().lease;

    // Second hand-off goes to the later waiter.
    registry
        .unregister_and_grant(name, Time::from_secs(20))
        .unwrap();
    first_grant.release().unwrap();
    assert_eq!(registry.whereis(name), Some(tid(3)));
    assert_eq!(registry.waiter_count(), 0);
    let mut second_grant = registry.take_granted().into_iter().next().unwrap().lease;

    second_grant.release().unwrap();
    crate::test_complete!("collision_wait_fifo_ordering");
}
#[test]
fn collision_wait_cleanup_region_removes_waiters() {
    init_test("collision_wait_cleanup_region_removes_waiters");
    let mut registry = NameRegistry::new();
    let name = "svc";
    let waiter_region = rid(1);

    let mut holder_lease = registry.register(name, tid(1), rid(0), Time::ZERO).unwrap();

    // The waiter lives in a different region from the holder.
    let policy = NameCollisionPolicy::Wait {
        deadline: Time::from_secs(60),
    };
    registry
        .register_with_policy(name, tid(2), waiter_region, Time::from_secs(1), policy)
        .unwrap();
    assert_eq!(registry.waiter_count(), 1);

    // Cleaning up the waiter's region must drop it from the queue.
    registry.cleanup_region(waiter_region);
    assert_eq!(registry.waiter_count(), 0);

    holder_lease.abort().unwrap();
    crate::test_complete!("collision_wait_cleanup_region_removes_waiters");
}
#[test]
fn collision_wait_cleanup_task_removes_waiters() {
    init_test("collision_wait_cleanup_task_removes_waiters");
    let mut registry = NameRegistry::new();
    let name = "svc";
    let waiting_task = tid(2);

    let mut holder_lease = registry.register(name, tid(1), rid(0), Time::ZERO).unwrap();

    let policy = NameCollisionPolicy::Wait {
        deadline: Time::from_secs(60),
    };
    registry
        .register_with_policy(name, waiting_task, rid(0), Time::from_secs(1), policy)
        .unwrap();
    assert_eq!(registry.waiter_count(), 1);

    // Cleaning up the waiting task must drop it from the queue.
    registry.cleanup_task(waiting_task);
    assert_eq!(registry.waiter_count(), 0);

    holder_lease.abort().unwrap();
    crate::test_complete!("collision_wait_cleanup_task_removes_waiters");
}
#[test]
fn cleanup_region_aborts_granted_lease_obligation() {
    init_test("cleanup_region_aborts_granted_lease_obligation");
    let mut registry = NameRegistry::new();
    let name = "svc";
    let waiter_region = rid(1);

    let mut holder_lease = registry.register(name, tid(1), rid(0), Time::ZERO).unwrap();

    let policy = NameCollisionPolicy::Wait {
        deadline: Time::from_secs(60),
    };
    registry
        .register_with_policy(name, tid(2), waiter_region, Time::from_secs(1), policy)
        .unwrap();

    // Hand the name to the waiter but do not collect the granted lease yet.
    registry
        .unregister_and_grant(name, Time::from_secs(5))
        .unwrap();
    holder_lease.release().unwrap();
    assert_eq!(registry.whereis(name), Some(tid(2)));

    // After the waiter's region is cleaned up there must be no grant left to
    // collect.
    registry.cleanup_region(waiter_region);
    let grants = registry.take_granted();
    assert!(grants.is_empty());

    // Drop the registry explicitly before the test is marked complete.
    drop(registry);
    crate::test_complete!("cleanup_region_aborts_granted_lease_obligation");
}
#[test]
fn cleanup_task_aborts_granted_lease_obligation() {
    init_test("cleanup_task_aborts_granted_lease_obligation");
    let mut registry = NameRegistry::new();
    let name = "svc";
    let waiting_task = tid(2);

    let mut holder_lease = registry.register(name, tid(1), rid(0), Time::ZERO).unwrap();

    let policy = NameCollisionPolicy::Wait {
        deadline: Time::from_secs(60),
    };
    registry
        .register_with_policy(name, waiting_task, rid(0), Time::from_secs(1), policy)
        .unwrap();

    // Hand the name to the waiter but do not collect the granted lease yet.
    registry
        .unregister_and_grant(name, Time::from_secs(5))
        .unwrap();
    holder_lease.release().unwrap();
    assert_eq!(registry.whereis(name), Some(waiting_task));

    // After the waiting task is cleaned up there must be no grant left to
    // collect.
    registry.cleanup_task(waiting_task);
    let grants = registry.take_granted();
    assert!(grants.is_empty());

    // Drop the registry explicitly before the test is marked complete.
    drop(registry);
    crate::test_complete!("cleanup_task_aborts_granted_lease_obligation");
}
#[test]
fn collision_drain_expired_waiters() {
    init_test("collision_drain_expired_waiters");
    let mut registry = NameRegistry::new();
    let name = "svc";

    let mut holder_lease = registry.register(name, tid(1), rid(0), Time::ZERO).unwrap();

    // One waiter expires at t=10, the other at t=100.
    let short_wait = NameCollisionPolicy::Wait {
        deadline: Time::from_secs(10),
    };
    registry
        .register_with_policy(name, tid(2), rid(0), Time::from_secs(1), short_wait)
        .unwrap();
    let long_wait = NameCollisionPolicy::Wait {
        deadline: Time::from_secs(100),
    };
    registry
        .register_with_policy(name, tid(3), rid(0), Time::from_secs(2), long_wait)
        .unwrap();
    assert_eq!(registry.waiter_count(), 2);

    // Draining at t=50 must remove only the waiter whose deadline has passed.
    let drained = registry.drain_expired_waiters(Time::from_secs(50));
    assert_eq!(drained, 1);
    assert_eq!(registry.waiter_count(), 1);

    holder_lease.abort().unwrap();
    crate::test_complete!("collision_drain_expired_waiters");
}
#[test]
fn collision_replace_displaces_pending_permit() {
    init_test("collision_replace_displaces_pending_permit");
    let mut registry = NameRegistry::new();
    let name = "svc";

    // The name is only reserved, not registered, when the replacement arrives.
    let mut pending_permit = registry
        .reserve(name, tid(1), rid(0), Time::ZERO)
        .expect("reserve ok");

    let policy = NameCollisionPolicy::Replace;
    let outcome = registry
        .register_with_policy(name, tid(2), rid(0), Time::from_secs(1), policy)
        .unwrap();

    // The reserver is reported as the displaced holder.
    let mut replacement_lease = match outcome {
        NameCollisionOutcome::Replaced {
            lease,
            displaced_holder,
            ..
        } => {
            assert_eq!(displaced_holder, tid(1));
            lease
        }
        other => panic!("expected Replaced, got {other:?}"),
    };
    assert_eq!(registry.whereis(name), Some(tid(2)));

    pending_permit.abort().unwrap();
    replacement_lease.release().unwrap();
    crate::test_complete!("collision_replace_displaces_pending_permit");
}
#[test]
fn conformance_policy_fail_equivalent_to_register() {
    init_test("conformance_policy_fail_equivalent_to_register");
    let mut registry = NameRegistry::new();
    let name = "svc";

    // Take the name through the policy API with `Fail`.
    let outcome = registry
        .register_with_policy(name, tid(1), rid(0), Time::ZERO, NameCollisionPolicy::Fail)
        .unwrap();
    let mut holder_lease = match outcome {
        NameCollisionOutcome::Registered { lease } => lease,
        other => panic!("expected Registered, got {other:?}"),
    };

    // Collide once through each API.
    let via_policy = registry
        .register_with_policy(name, tid(2), rid(0), Time::ZERO, NameCollisionPolicy::Fail)
        .unwrap_err();
    let via_register = registry
        .register(name, tid(3), rid(0), Time::ZERO)
        .unwrap_err();

    // Both must be `NameTaken` and must agree on who currently holds the name.
    match (&via_policy, &via_register) {
        (
            NameLeaseError::NameTaken {
                current_holder: policy_holder,
                ..
            },
            NameLeaseError::NameTaken {
                current_holder: register_holder,
                ..
            },
        ) => assert_eq!(policy_holder, register_holder),
        _ => panic!("expected NameTaken from both"),
    }

    holder_lease.release().unwrap();
    crate::test_complete!("conformance_policy_fail_equivalent_to_register");
}
/// A lease obtained through the `Replace` policy is a fully usable lease.
///
/// Every accessor on the replacement lease is checked against the arguments
/// of the replacing registration. The lease is then released and the
/// resulting proof is expected to resolve as `Commit`.
#[test]
fn conformance_replace_lease_is_valid() {
    use crate::obligation::graded::Resolution;

    init_test("conformance_replace_lease_is_valid");
    let mut registry = NameRegistry::new();
    let mut displaced_lease = registry
        .register("svc", tid(1), rid(0), Time::ZERO)
        .unwrap();

    // Arguments of the replacing registration, reused as expected values below.
    let (new_holder, new_region, replaced_at) = (tid(2), rid(1), Time::from_secs(5));
    let replace_outcome = registry
        .register_with_policy(
            "svc",
            new_holder,
            new_region,
            replaced_at,
            NameCollisionPolicy::Replace,
        )
        .unwrap();
    let mut replacement = match replace_outcome {
        NameCollisionOutcome::Replaced { lease, .. } => lease,
        other => panic!("expected Replaced, got {other:?}"),
    };

    assert_eq!(replacement.name(), "svc");
    assert_eq!(replacement.holder(), new_holder);
    assert_eq!(replacement.region(), new_region);
    assert_eq!(replacement.acquired_at(), replaced_at);
    assert!(replacement.is_active());

    let resolved = replacement.release().unwrap().into_resolved_proof();
    assert_eq!(resolved.resolution(), Resolution::Commit);

    // The displaced holder's lease still carries its own token; resolve it too.
    displaced_lease.abort().unwrap();
    crate::test_complete!("conformance_replace_lease_is_valid");
}
/// The rendered `WaitBudgetExceeded` error mentions both the name and the word "budget".
#[test]
fn wait_budget_exceeded_display() {
    init_test("wait_budget_exceeded_display");
    let error = NameLeaseError::WaitBudgetExceeded { name: "svc".into() };
    // Render once and check both fragments against the same text.
    let rendered = error.to_string();
    assert!(rendered.contains("svc"));
    assert!(rendered.contains("budget"));
    crate::test_complete!("wait_budget_exceeded_display");
}
/// Cleaning up the holder's region hands the name to a waiter from a different region.
///
/// Task 1 holds `"svc"` in region 0 and task 2 waits for it from region 1.
/// After `cleanup_region_at` on region 0 the wait queue must be empty, the
/// name must resolve to task 2, and exactly one grant for `"svc"` must be
/// available from `take_granted`.
#[test]
fn cleanup_region_grants_to_cross_region_waiter() {
    init_test("cleanup_region_grants_to_cross_region_waiter");
    let mut registry = NameRegistry::new();
    let holder_region = rid(0);
    let waiter_region = rid(1);
    let waiter = tid(2);

    let mut holder_lease = registry
        .register("svc", tid(1), holder_region, Time::ZERO)
        .unwrap();
    let wait_policy = NameCollisionPolicy::Wait {
        deadline: Time::from_secs(60),
    };
    registry
        .register_with_policy("svc", waiter, waiter_region, Time::from_secs(1), wait_policy)
        .unwrap();
    assert_eq!(registry.waiter_count(), 1);

    registry.cleanup_region_at(holder_region, Time::from_secs(2));
    // The test still owns the original lease value; resolve its token.
    holder_lease.abort().unwrap();

    assert_eq!(registry.waiter_count(), 0);
    assert_eq!(registry.whereis("svc"), Some(waiter));

    let grants = registry.take_granted();
    assert_eq!(grants.len(), 1);
    assert_eq!(grants[0].name, "svc");
    let mut waiter_lease = grants.into_iter().next().unwrap().lease;
    waiter_lease.release().unwrap();
    crate::test_complete!("cleanup_region_grants_to_cross_region_waiter");
}
/// Cleaning up the holding task hands the name to a waiting task.
///
/// Task 1 holds `"svc"` and task 2 waits for it in the same region. After
/// `cleanup_task_at` on task 1 the wait queue must be empty, the name must
/// resolve to task 2, and exactly one grant must be available from
/// `take_granted`. The grant's name is checked as well, matching the
/// region-cleanup variants of this test.
#[test]
fn cleanup_task_grants_to_other_task_waiter() {
    init_test("cleanup_task_grants_to_other_task_waiter");
    let mut reg = NameRegistry::new();
    let holder = tid(1);
    let waiter = tid(2);

    let mut lease = reg.register("svc", holder, rid(0), Time::ZERO).unwrap();
    reg.register_with_policy(
        "svc",
        waiter,
        rid(0),
        Time::from_secs(1),
        NameCollisionPolicy::Wait {
            deadline: Time::from_secs(60),
        },
    )
    .unwrap();
    assert_eq!(reg.waiter_count(), 1);

    reg.cleanup_task_at(holder, Time::from_secs(2));
    // The test still owns the original lease value; resolve its token.
    lease.abort().unwrap();

    assert_eq!(reg.waiter_count(), 0);
    assert_eq!(reg.whereis("svc"), Some(waiter));

    let granted = reg.take_granted();
    assert_eq!(granted.len(), 1);
    // Verify the grant is for the contested name, not merely that one exists.
    assert_eq!(granted[0].name, "svc");
    let mut granted_lease = granted.into_iter().next().unwrap().lease;
    granted_lease.release().unwrap();
    crate::test_complete!("cleanup_task_grants_to_other_task_waiter");
}
/// Cleaning up a region that only holds a pending permit still grants the name to a waiter.
///
/// Task 1 reserves `"svc"` in region 0 without registering it; task 2 queues
/// for the name from region 1 and must be reported as `Enqueued`. After
/// `cleanup_region_at` on region 0 the queue must be empty, the name must
/// resolve to task 2, and one grant for `"svc"` must be pending.
#[test]
fn cleanup_region_grants_waiter_for_pending_permit() {
    init_test("cleanup_region_grants_waiter_for_pending_permit");
    let mut registry = NameRegistry::new();
    let permit_region = rid(0);
    let waiter = tid(2);

    let mut pending_permit = registry
        .reserve("svc", tid(1), permit_region, Time::ZERO)
        .expect("reserve ok");

    let wait_policy = NameCollisionPolicy::Wait {
        deadline: Time::from_secs(60),
    };
    let enqueue_outcome = registry
        .register_with_policy("svc", waiter, rid(1), Time::from_secs(1), wait_policy)
        .unwrap();
    assert!(matches!(enqueue_outcome, NameCollisionOutcome::Enqueued));
    assert_eq!(registry.waiter_count(), 1);

    registry.cleanup_region_at(permit_region, Time::from_secs(2));
    // The test still owns the permit value; resolve its token.
    pending_permit.abort().unwrap();

    assert_eq!(registry.waiter_count(), 0);
    assert_eq!(registry.whereis("svc"), Some(waiter));

    let grants = registry.take_granted();
    assert_eq!(grants.len(), 1);
    assert_eq!(grants[0].name, "svc");
    let mut waiter_lease = grants.into_iter().next().unwrap().lease;
    waiter_lease.release().unwrap();
    crate::test_complete!("cleanup_region_grants_waiter_for_pending_permit");
}
/// Cleaning up a task that only holds a pending permit still grants the name to a waiter.
///
/// Task 1 reserves `"svc"` without registering it; task 2 queues for the name
/// and must be reported as `Enqueued`. After `cleanup_task_at` on task 1 the
/// queue must be empty, the name must resolve to task 2, and exactly one
/// grant must be pending. The grant's name is checked as well, matching the
/// region-cleanup variants of this test.
#[test]
fn cleanup_task_grants_waiter_for_pending_permit() {
    init_test("cleanup_task_grants_waiter_for_pending_permit");
    let mut reg = NameRegistry::new();
    let reserver = tid(1);
    let waiter = tid(2);

    let mut permit = reg
        .reserve("svc", reserver, rid(0), Time::ZERO)
        .expect("reserve ok");
    let outcome = reg
        .register_with_policy(
            "svc",
            waiter,
            rid(0),
            Time::from_secs(1),
            NameCollisionPolicy::Wait {
                deadline: Time::from_secs(60),
            },
        )
        .unwrap();
    assert!(matches!(outcome, NameCollisionOutcome::Enqueued));
    assert_eq!(reg.waiter_count(), 1);

    reg.cleanup_task_at(reserver, Time::from_secs(2));
    // The test still owns the permit value; resolve its token.
    permit.abort().unwrap();

    assert_eq!(reg.waiter_count(), 0);
    assert_eq!(reg.whereis("svc"), Some(waiter));

    let granted = reg.take_granted();
    assert_eq!(granted.len(), 1);
    // Verify the grant is for the contested name, not merely that one exists.
    assert_eq!(granted[0].name, "svc");
    let mut granted_lease = granted.into_iter().next().unwrap().lease;
    granted_lease.release().unwrap();
    crate::test_complete!("cleanup_task_grants_waiter_for_pending_permit");
}
}