use super::cap;
use super::macaroon::{MacaroonToken, VerificationContext, VerificationError};
use super::registry::RegistryHandle;
use crate::combinator::select::SelectAll;
use crate::evidence_sink::EvidenceSink;
#[cfg(feature = "messaging-fabric")]
use crate::messaging::capability::{
FabricCapability, FabricCapabilityGrant, FabricCapabilityGrantError, FabricCapabilityId,
FabricCapabilityRegistry, FabricCapabilityScope, GrantedFabricToken, PublishPermit,
SubjectFamilyTag, SubscribeToken,
};
#[cfg(feature = "messaging-fabric")]
use crate::messaging::class::DeliveryClass;
#[cfg(feature = "messaging-fabric")]
use crate::messaging::ir::CapabilityTokenSchema;
#[cfg(feature = "messaging-fabric")]
use crate::messaging::subject::SubjectPattern;
use crate::observability::{
DiagnosticContext, LogCollector, LogEntry, ObservabilityConfig, SpanId,
};
use crate::remote::RemoteCap;
use crate::runtime::blocking_pool::BlockingPoolHandle;
use crate::runtime::io_driver::IoDriverHandle;
#[cfg(not(target_arch = "wasm32"))]
use crate::runtime::io_driver::IoRegistration;
#[cfg(not(target_arch = "wasm32"))]
use crate::runtime::reactor::{Interest, Source};
use crate::runtime::task_handle::JoinError;
use crate::time::{TimerDriverHandle, timeout};
use crate::trace::distributed::{LogicalClockHandle, LogicalTime};
use crate::trace::{TraceBufferHandle, TraceEvent};
use crate::tracing_compat::{debug, error, info, trace, warn};
use crate::types::{
Budget, CancelKind, CancelReason, CxInner, RegionId, SystemPressure, TaskId, Time,
};
use crate::util::{EntropySource, OsEntropy};
use std::cell::RefCell;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Waker;
use std::time::Duration;
/// A labelled boxed future; the `&'static str` name is for diagnostics
/// (consumer not visible in this chunk — likely the select combinators).
type NamedFuture<T> = (&'static str, Pin<Box<dyn Future<Output = T> + Send>>);
/// A batch of labelled futures.
type NamedFutures<T> = Vec<NamedFuture<T>>;
/// Current wall-clock time via the crate's time facade; used as the fallback
/// whenever a context has no timer driver attached.
fn wall_clock_now() -> Time {
    crate::time::wall_now()
}
/// An owned no-op `Waker` (clone of the shared static `Waker::noop()`), for
/// APIs that require a waker when no wakeup is wanted (see `register_io`).
fn noop_waker() -> Waker {
    Waker::noop().clone()
}
/// Immutable bundle of runtime service handles shared by a context and all of
/// its clones. Held behind `Arc`; the `with_*` builders update it
/// copy-on-write via `Arc::make_mut`.
#[derive(Debug, Clone)]
struct CxHandles {
    /// I/O driver for reactor registrations, when available.
    io_driver: Option<IoDriverHandle>,
    /// Object-safe I/O capability surface, when granted.
    io_cap: Option<Arc<dyn crate::io::IoCap>>,
    /// Timer driver; when absent, time queries fall back to the wall clock.
    timer_driver: Option<TimerDriverHandle>,
    /// Pool for offloading blocking work, when configured.
    blocking_pool: Option<BlockingPoolHandle>,
    /// Entropy source backing the `random_*` methods (defaults to `OsEntropy`).
    entropy: Arc<dyn EntropySource>,
    /// Logical clock handle (tick/receive semantics) for distributed tracing.
    logical_clock: LogicalClockHandle,
    /// Remote-execution capability, when granted.
    remote_cap: Option<Arc<RemoteCap>>,
    /// Service registry handle, when attached.
    registry: Option<RegistryHandle>,
    /// Shared system pressure gauge, when attached.
    pressure: Option<Arc<SystemPressure>>,
    /// Sink receiving evidence ledger entries, when attached.
    evidence_sink: Option<Arc<dyn EvidenceSink>>,
    /// Macaroon capability token carried by this context, if any.
    macaroon: Option<Arc<MacaroonToken>>,
    /// Registry of granted messaging-fabric capabilities.
    #[cfg(feature = "messaging-fabric")]
    fabric_capabilities: Arc<FabricCapabilityRegistry>,
}
/// Capability-scoped task context handed to runtime tasks.
///
/// `Caps` is a compile-time capability marker (defaults to `cap::All`);
/// capability-gated methods carry bounds such as `Caps: cap::HasTime`.
/// Cloning is cheap (shared `Arc` state); `restrict` narrows `Caps`.
#[derive(Debug)]
pub struct Cx<Caps = cap::All> {
    /// Mutable per-task state: ids, budget, cancellation, checkpoints.
    pub(crate) inner: Arc<parking_lot::RwLock<CxInner>>,
    /// Logging/tracing state shared with clones of this context.
    observability: Arc<parking_lot::RwLock<ObservabilityState>>,
    /// Immutable runtime service handles.
    handles: Arc<CxHandles>,
    // `fn() -> Caps` keeps the marker usable without owning a `Caps` value
    // (and without imposing auto-trait requirements on `Caps` itself).
    _caps: PhantomData<fn() -> Caps>,
}
// Manual `Clone`: a derive would add a spurious `Caps: Clone` bound, which is
// never needed because `Caps` only appears inside `PhantomData`.
impl<Caps> Clone for Cx<Caps> {
    #[inline]
    fn clone(&self) -> Self {
        // Cheap: three `Arc` refcount bumps, no deep copies.
        Self {
            inner: Arc::clone(&self.inner),
            observability: Arc::clone(&self.observability),
            handles: Arc::clone(&self.handles),
            _caps: PhantomData,
        }
    }
}
/// Per-context observability wiring: log collector, diagnostic context,
/// optional trace buffer, and timestamping policy.
#[derive(Debug, Clone)]
pub struct ObservabilityState {
    /// Destination for structured log entries; `None` disables logging.
    collector: Option<LogCollector>,
    /// Diagnostic fields (task/region/span ids, custom keys) stamped on logs.
    context: DiagnosticContext,
    /// Trace event buffer, when tracing is enabled for this context.
    trace: Option<TraceBufferHandle>,
    /// Whether `Cx::log` stamps entries that carry no timestamp.
    include_timestamps: bool,
}
impl ObservabilityState {
    /// Fresh state for a task: default diagnostics (task/region/span ids),
    /// no collector, no trace buffer, timestamps enabled.
    fn new(region: RegionId, task: TaskId) -> Self {
        Self {
            collector: None,
            context: DiagnosticContext::new()
                .with_task_id(task)
                .with_region_id(region)
                .with_span_id(SpanId::new()),
            trace: None,
            include_timestamps: true,
        }
    }
    /// State built from an `ObservabilityConfig`, with an optional collector.
    pub(crate) fn new_with_config(
        region: RegionId,
        task: TaskId,
        config: &ObservabilityConfig,
        collector: Option<LogCollector>,
    ) -> Self {
        let diag = config
            .create_diagnostic_context()
            .with_task_id(task)
            .with_region_id(region)
            .with_span_id(SpanId::new());
        Self {
            collector,
            context: diag,
            trace: None,
            include_timestamps: config.include_timestamps(),
        }
    }
    /// Child state for a spawned task: forked diagnostic context re-labelled
    /// with the child's ids; collector/trace/timestamp policy are inherited.
    fn derive_child(&self, region: RegionId, task: TaskId) -> Self {
        Self {
            collector: self.collector.clone(),
            context: self
                .context
                .clone()
                .fork()
                .with_task_id(task)
                .with_region_id(region),
            trace: self.trace.clone(),
            include_timestamps: self.include_timestamps,
        }
    }
}
/// RAII guard that decrements the context's cancellation mask depth on drop,
/// pairing the increment performed in `Cx::masked`.
struct MaskGuard<'a> {
    inner: &'a Arc<parking_lot::RwLock<CxInner>>,
}
impl Drop for MaskGuard<'_> {
    fn drop(&mut self) {
        let mut inner = self.inner.write();
        // saturating_sub: never underflow, even under unusual drop ordering.
        inner.mask_depth = inner.mask_depth.saturating_sub(1);
    }
}
/// A context carrying the full capability set.
type FullCx = Cx<cap::All>;
thread_local! {
    /// The context currently installed for this thread, if any; written by
    /// `FullCx::set_current`, read by `FullCx::current`.
    static CURRENT_CX: RefCell<Option<FullCx>> = const { RefCell::new(None) };
}
/// Guard returned by `FullCx::set_current`: restores the previously installed
/// context on drop. The `*mut ()` marker makes the guard `!Send`, pinning it
/// to the thread whose TLS slot it mutated.
#[cfg_attr(feature = "test-internals", visibility::make(pub))]
pub(crate) struct CurrentCxGuard {
    prev: Option<FullCx>,
    _not_send: std::marker::PhantomData<*mut ()>,
}
impl Drop for CurrentCxGuard {
    fn drop(&mut self) {
        let prev = self.prev.take();
        // try_with: tolerate TLS already destroyed during thread teardown.
        let _ = CURRENT_CX.try_with(|slot| {
            *slot.borrow_mut() = prev;
        });
    }
}
impl FullCx {
    /// Returns the context installed for the current thread, if any.
    #[inline]
    #[must_use]
    pub fn current() -> Option<Self> {
        // `try_with` (not `with`) tolerates TLS teardown during thread exit;
        // an inaccessible slot simply reads as "no current context".
        CURRENT_CX
            .try_with(|slot| slot.borrow().clone())
            .ok()
            .flatten()
    }
    /// Installs `cx` as the thread's current context, returning a guard that
    /// restores the previous one when dropped.
    #[inline]
    #[must_use]
    #[cfg_attr(feature = "test-internals", visibility::make(pub))]
    pub(crate) fn set_current(cx: Option<Self>) -> CurrentCxGuard {
        // `RefCell::replace` swaps the new value in and hands back the old.
        let prev = CURRENT_CX.with(|slot| slot.replace(cx));
        CurrentCxGuard {
            prev,
            _not_send: std::marker::PhantomData,
        }
    }
}
impl<Caps> Cx<Caps> {
/// Creates a context with default observability and no drivers attached.
#[must_use]
#[allow(dead_code)]
#[cfg_attr(feature = "test-internals", visibility::make(pub))]
pub(crate) fn new(region: RegionId, task: TaskId, budget: Budget) -> Self {
    Self::new_with_observability(region, task, budget, None, None, None)
}
/// Builds a context around an existing shared `CxInner`, with default
/// observability and no runtime service handles attached.
#[allow(dead_code)]
pub(crate) fn from_inner(inner: Arc<parking_lot::RwLock<CxInner>>) -> Self {
    // Snapshot the ids, then release the read lock before moving `inner`.
    let guard = inner.read();
    let (region, task) = (guard.region, guard.task);
    drop(guard);
    let observability = ObservabilityState::new(region, task);
    Self {
        inner,
        observability: Arc::new(parking_lot::RwLock::new(observability)),
        handles: Arc::new(CxHandles {
            io_driver: None,
            io_cap: None,
            timer_driver: None,
            blocking_pool: None,
            entropy: Arc::new(OsEntropy),
            logical_clock: LogicalClockHandle::default(),
            remote_cap: None,
            registry: None,
            pressure: None,
            evidence_sink: None,
            macaroon: None,
            #[cfg(feature = "messaging-fabric")]
            fabric_capabilities: Arc::new(FabricCapabilityRegistry::default()),
        }),
        _caps: PhantomData,
    }
}
/// Creates a context with optional observability state, I/O driver and
/// entropy source (no I/O capability, no timer driver).
#[must_use]
#[cfg_attr(feature = "test-internals", visibility::make(pub))]
pub(crate) fn new_with_observability(
    region: RegionId,
    task: TaskId,
    budget: Budget,
    observability: Option<ObservabilityState>,
    io_driver: Option<IoDriverHandle>,
    entropy: Option<Arc<dyn EntropySource>>,
) -> Self {
    Self::new_with_io(
        region,
        task,
        budget,
        observability,
        io_driver,
        None,
        entropy,
    )
}
/// Like [`Self::new_with_observability`] but also accepts an I/O capability
/// surface (still no timer driver; see [`Self::new_with_drivers`]).
#[must_use]
#[cfg_attr(feature = "test-internals", visibility::make(pub))]
pub(crate) fn new_with_io(
    region: RegionId,
    task: TaskId,
    budget: Budget,
    observability: Option<ObservabilityState>,
    io_driver: Option<IoDriverHandle>,
    io_cap: Option<Arc<dyn crate::io::IoCap>>,
    entropy: Option<Arc<dyn EntropySource>>,
) -> Self {
    Self::new_with_drivers(
        region,
        task,
        budget,
        observability,
        io_driver,
        io_cap,
        None,
        entropy,
    )
}
/// Primary constructor: wires up region/task identity, budget, optional
/// observability state, and optional I/O + timer drivers.
#[must_use]
#[cfg_attr(feature = "test-internals", visibility::make(pub))]
#[allow(clippy::too_many_arguments)]
pub(crate) fn new_with_drivers(
    region: RegionId,
    task: TaskId,
    budget: Budget,
    observability: Option<ObservabilityState>,
    io_driver: Option<IoDriverHandle>,
    io_cap: Option<Arc<dyn crate::io::IoCap>>,
    timer_driver: Option<TimerDriverHandle>,
    entropy: Option<Arc<dyn EntropySource>>,
) -> Self {
    debug!(
        task_id = ?task,
        region_id = ?region,
        budget_deadline = ?budget.deadline,
        budget_poll_quota = budget.poll_quota,
        budget_cost_quota = ?budget.cost_quota,
        budget_priority = budget.priority,
        budget_source = "cx_new",
        "budget initialized for context"
    );
    // Fall back to per-task defaults when no state/source was supplied.
    let obs_state = observability.unwrap_or_else(|| ObservabilityState::new(region, task));
    let entropy_source = entropy.unwrap_or_else(|| Arc::new(OsEntropy));
    Self {
        inner: Arc::new(parking_lot::RwLock::new(CxInner::new(region, task, budget))),
        observability: Arc::new(parking_lot::RwLock::new(obs_state)),
        handles: Arc::new(CxHandles {
            io_driver,
            io_cap,
            timer_driver,
            blocking_pool: None,
            entropy: entropy_source,
            logical_clock: LogicalClockHandle::default(),
            remote_cap: None,
            registry: None,
            pressure: None,
            evidence_sink: None,
            macaroon: None,
            #[cfg(feature = "messaging-fabric")]
            fabric_capabilities: Arc::new(FabricCapabilityRegistry::default()),
        }),
        _caps: PhantomData,
    }
}
/// Handle to the I/O driver, if one is attached.
#[inline]
#[must_use]
pub(crate) fn io_driver_handle(&self) -> Option<IoDriverHandle> {
    self.handles.io_driver.clone()
}
/// Handle to the blocking pool, if one is attached.
#[inline]
#[must_use]
pub(crate) fn blocking_pool_handle(&self) -> Option<BlockingPoolHandle> {
    self.handles.blocking_pool.clone()
}
/// Builder: replaces the blocking-pool handle (copy-on-write on `handles`).
#[must_use]
pub(crate) fn with_blocking_pool_handle(mut self, handle: Option<BlockingPoolHandle>) -> Self {
    Arc::make_mut(&mut self.handles).blocking_pool = handle;
    self
}
/// Builder: replaces the logical clock handle (copy-on-write on `handles`).
#[must_use]
pub(crate) fn with_logical_clock(mut self, clock: LogicalClockHandle) -> Self {
    Arc::make_mut(&mut self.handles).logical_clock = clock;
    self
}
/// Narrows this context to a smaller capability set.
///
/// Statically checked: `NewCaps` must be a subset of the current `Caps`,
/// so capabilities can only be dropped, never gained.
#[must_use]
pub fn restrict<NewCaps>(&self) -> Cx<NewCaps>
where
    NewCaps: cap::SubsetOf<Caps>,
{
    self.retype()
}
/// Unchecked capability relabel sharing all state. Crate-internal: public
/// callers must go through `restrict`, which enforces the subset bound.
#[inline]
#[must_use]
pub(crate) fn retype<NewCaps>(&self) -> Cx<NewCaps> {
    Cx {
        inner: self.inner.clone(),
        observability: self.observability.clone(),
        handles: self.handles.clone(),
        _caps: PhantomData,
    }
}
/// Builder: replaces the registry handle (copy-on-write on `handles`).
#[must_use]
pub(crate) fn with_registry_handle(mut self, registry: Option<RegistryHandle>) -> Self {
    Arc::make_mut(&mut self.handles).registry = registry;
    self
}
/// Builder: attaches a remote-execution capability.
#[must_use]
pub fn with_remote_cap(mut self, cap: RemoteCap) -> Self {
    Arc::make_mut(&mut self.handles).remote_cap = Some(Arc::new(cap));
    self
}
/// Builder: attaches a shared system pressure gauge.
#[must_use]
pub fn with_pressure(mut self, pressure: Arc<SystemPressure>) -> Self {
    Arc::make_mut(&mut self.handles).pressure = Some(pressure);
    self
}
/// Borrow of the attached system pressure gauge, if any.
#[must_use]
#[inline]
pub fn pressure(&self) -> Option<&SystemPressure> {
    self.handles.pressure.as_deref()
}
/// Owned handle to the system pressure gauge, if any.
#[allow(dead_code)]
#[must_use]
pub(crate) fn pressure_handle(&self) -> Option<Arc<SystemPressure>> {
    self.handles.pressure.clone()
}
/// Owned handle to the remote capability, if any.
#[inline]
#[must_use]
pub(crate) fn remote_cap_handle(&self) -> Option<Arc<RemoteCap>> {
    self.handles.remote_cap.clone()
}
/// Builder: replaces the remote-capability handle (may clear it with `None`).
#[must_use]
pub(crate) fn with_remote_cap_handle(mut self, cap: Option<Arc<RemoteCap>>) -> Self {
    Arc::make_mut(&mut self.handles).remote_cap = cap;
    self
}
/// Owned handle to the service registry, if any.
#[inline]
#[must_use]
pub fn registry_handle(&self) -> Option<RegistryHandle> {
    self.handles.registry.clone()
}
/// Whether a service registry is attached.
#[inline]
#[must_use]
pub fn has_registry(&self) -> bool {
    self.handles.registry.is_some()
}
#[cfg(feature = "messaging-fabric")]
pub fn grant_fabric_capability(
&self,
capability: FabricCapability,
) -> Result<FabricCapabilityGrant, FabricCapabilityGrantError> {
self.handles.fabric_capabilities.grant(capability)
}
#[cfg(feature = "messaging-fabric")]
#[must_use]
pub fn fabric_capabilities(&self) -> Vec<FabricCapabilityGrant> {
self.handles.fabric_capabilities.snapshot()
}
#[cfg(feature = "messaging-fabric")]
pub fn grant_publish_capability<S: SubjectFamilyTag>(
&self,
subject: SubjectPattern,
schema: &CapabilityTokenSchema,
delivery_class: DeliveryClass,
) -> Result<GrantedFabricToken<PublishPermit<S>>, FabricCapabilityGrantError> {
let token = PublishPermit::<S>::authorize(schema, delivery_class)?;
let grant = self.grant_fabric_capability(FabricCapability::Publish { subject })?;
Ok(GrantedFabricToken::new(grant, token))
}
#[cfg(feature = "messaging-fabric")]
pub fn grant_subscribe_capability<S: SubjectFamilyTag>(
&self,
subject: SubjectPattern,
schema: &CapabilityTokenSchema,
delivery_class: DeliveryClass,
) -> Result<GrantedFabricToken<SubscribeToken<S>>, FabricCapabilityGrantError> {
let token = SubscribeToken::<S>::authorize(schema, delivery_class)?;
let grant = self.grant_fabric_capability(FabricCapability::Subscribe { subject })?;
Ok(GrantedFabricToken::new(grant, token))
}
#[cfg(feature = "messaging-fabric")]
#[must_use]
pub fn check_fabric_capability(&self, capability: &FabricCapability) -> bool {
self.handles.fabric_capabilities.check(capability)
}
#[cfg(feature = "messaging-fabric")]
#[must_use]
pub fn revoke_fabric_capability(&self, id: FabricCapabilityId) -> Option<FabricCapability> {
self.handles.fabric_capabilities.revoke_by_id(id)
}
#[cfg(feature = "messaging-fabric")]
#[must_use]
pub fn revoke_fabric_capability_by_subject(&self, subject: &SubjectPattern) -> usize {
self.handles.fabric_capabilities.revoke_by_subject(subject)
}
#[cfg(feature = "messaging-fabric")]
#[must_use]
pub fn revoke_fabric_capability_scope(&self, scope: FabricCapabilityScope) -> usize {
self.handles.fabric_capabilities.revoke_scope(scope)
}
/// Builder: replaces the evidence sink (may clear it with `None`).
#[must_use]
pub fn with_evidence_sink(mut self, sink: Option<Arc<dyn EvidenceSink>>) -> Self {
    Arc::make_mut(&mut self.handles).evidence_sink = sink;
    self
}
/// Owned handle to the evidence sink, if any.
#[inline]
#[must_use]
pub(crate) fn evidence_sink_handle(&self) -> Option<Arc<dyn EvidenceSink>> {
    self.handles.evidence_sink.clone()
}
pub fn emit_evidence(&self, entry: &franken_evidence::EvidenceLedger) {
if let Some(ref sink) = self.handles.evidence_sink {
sink.emit(entry);
}
}
/// Builder: attaches a macaroon capability token.
#[must_use]
pub fn with_macaroon(mut self, token: MacaroonToken) -> Self {
    Arc::make_mut(&mut self.handles).macaroon = Some(Arc::new(token));
    self
}
/// Builder: replaces the macaroon handle (may clear it with `None`).
#[must_use]
#[allow(dead_code)] pub(crate) fn with_macaroon_handle(mut self, handle: Option<Arc<MacaroonToken>>) -> Self {
    Arc::make_mut(&mut self.handles).macaroon = handle;
    self
}
/// Borrow of the attached macaroon, if any.
#[inline]
#[must_use]
pub fn macaroon(&self) -> Option<&MacaroonToken> {
    self.handles.macaroon.as_deref()
}
/// Owned handle to the attached macaroon, if any.
#[inline]
#[must_use]
#[allow(dead_code)] pub(crate) fn macaroon_handle(&self) -> Option<Arc<MacaroonToken>> {
    self.handles.macaroon.clone()
}
/// Returns a clone of this context whose macaroon carries an additional
/// caveat, or `None` when no macaroon is attached. The original context's
/// token is left untouched (attenuation never widens authority).
#[must_use]
pub fn attenuate(&self, predicate: super::macaroon::CaveatPredicate) -> Option<Self> {
    let token = self.handles.macaroon.as_ref()?;
    let attenuated = MacaroonToken::clone(token).add_caveat(predicate);
    info!(
        token_id = %attenuated.identifier(),
        caveat_count = attenuated.caveat_count(),
        "capability attenuated"
    );
    let mut cx = self.clone();
    Arc::make_mut(&mut cx.handles).macaroon = Some(Arc::new(attenuated));
    Some(cx)
}
/// Attenuates with a time-before caveat (`deadline_ms` — unit assumed to be
/// milliseconds per the predicate name; confirm against `CaveatPredicate`).
#[must_use]
pub fn attenuate_time_limit(&self, deadline_ms: u64) -> Option<Self> {
    self.attenuate(super::macaroon::CaveatPredicate::TimeBefore(deadline_ms))
}
/// Attenuates with a resource-scope caveat matching `pattern`.
#[must_use]
pub fn attenuate_scope(&self, pattern: impl Into<String>) -> Option<Self> {
    self.attenuate(super::macaroon::CaveatPredicate::ResourceScope(
        pattern.into(),
    ))
}
/// Attenuates with a rate-limit caveat (`max_count` per `window_secs`).
#[must_use]
pub fn attenuate_rate_limit(&self, max_count: u32, window_secs: u32) -> Option<Self> {
    self.attenuate(super::macaroon::CaveatPredicate::RateLimit {
        max_count,
        window_secs,
    })
}
/// Derives a time caveat from the current budget deadline.
///
/// Returns `None` when no macaroon is attached. With a macaroon but no
/// deadline, returns an unmodified clone of this context.
#[must_use]
pub fn attenuate_from_budget(&self) -> Option<Self> {
    if self.handles.macaroon.is_none() {
        return None;
    }
    match self.budget().deadline {
        Some(deadline) => self.attenuate_time_limit(deadline.as_millis()),
        None => Some(self.clone()),
    }
}
/// Verifies the attached macaroon against `root_key` in `context`, emitting
/// evidence and structured logs for every outcome.
///
/// # Errors
/// Returns `InvalidSignature` when no macaroon is attached at all (absence
/// is reported the same as a bad signature), otherwise propagates the
/// token's own verification result.
pub fn verify_capability(
    &self,
    root_key: &crate::security::key::AuthKey,
    context: &VerificationContext,
) -> Result<(), VerificationError> {
    let Some(token) = self.handles.macaroon.as_ref() else {
        warn!(
            task_id = ?self.task_id(),
            region_id = ?self.region_id(),
            "capability verification failed: no macaroon attached"
        );
        return Err(VerificationError::InvalidSignature);
    };
    let result = token.verify(root_key, context);
    // Evidence is emitted for every outcome, success included.
    self.emit_macaroon_evidence(token, &result);
    match &result {
        Ok(()) => {
            info!(
                token_id = %token.identifier(),
                caveats_checked = token.caveat_count(),
                "macaroon verified successfully"
            );
        }
        // Signature failure is the only outcome logged at error level:
        // it indicates possible tampering rather than an expired caveat.
        Err(VerificationError::InvalidSignature) => {
            error!(
                token_id = %token.identifier(),
                "HMAC chain integrity violation — possible tampering"
            );
        }
        // The `unused_variables` allows below cover builds where the
        // logging macros compile to no-ops.
        #[allow(unused_variables)]
        Err(VerificationError::CaveatFailed {
            index,
            predicate,
            reason,
        }) => {
            info!(
                token_id = %token.identifier(),
                failed_at_caveat = index,
                predicate = %predicate,
                reason = %reason,
                "macaroon verification failed"
            );
        }
        #[allow(unused_variables)]
        Err(VerificationError::MissingDischarge { index, identifier }) => {
            info!(
                token_id = %token.identifier(),
                failed_at_caveat = index,
                discharge_id = %identifier,
                "missing discharge macaroon"
            );
        }
        #[allow(unused_variables)]
        Err(VerificationError::DischargeInvalid { index, identifier }) => {
            info!(
                token_id = %token.identifier(),
                failed_at_caveat = index,
                discharge_id = %identifier,
                "discharge macaroon verification failed"
            );
        }
        #[allow(unused_variables)]
        Err(VerificationError::DischargeChainTooDeep { depth }) => {
            info!(
                token_id = %token.identifier(),
                depth = %depth,
                "discharge macaroon chain too deep"
            );
        }
    }
    result
}
/// Emits an evidence ledger entry describing a macaroon verification
/// outcome; a no-op when no evidence sink is attached.
fn emit_macaroon_evidence(
    &self,
    token: &MacaroonToken,
    result: &Result<(), VerificationError>,
) {
    let Some(ref sink) = self.handles.evidence_sink else {
        return;
    };
    let now_ms = wall_clock_now().as_millis();
    // Map each outcome to an action label plus a heuristic loss weight
    // (0.0 = success … 1.0 = worst case).
    let (action, loss) = match result {
        Ok(()) => ("verify_success".to_string(), 0.0),
        Err(VerificationError::InvalidSignature) => ("verify_fail_signature".to_string(), 1.0),
        Err(VerificationError::CaveatFailed { index, .. }) => {
            (format!("verify_fail_caveat_{index}"), 0.5)
        }
        Err(VerificationError::MissingDischarge { index, .. }) => {
            (format!("verify_fail_missing_discharge_{index}"), 0.8)
        }
        Err(VerificationError::DischargeInvalid { index, .. }) => {
            (format!("verify_fail_discharge_invalid_{index}"), 0.9)
        }
        Err(VerificationError::DischargeChainTooDeep { depth }) => {
            (format!("verify_fail_discharge_chain_too_deep_{depth}"), 1.0)
        }
    };
    let entry = franken_evidence::EvidenceLedger {
        ts_unix_ms: now_ms,
        component: "cx_macaroon".to_string(),
        action: action.clone(),
        posterior: vec![1.0],
        expected_loss_by_action: std::collections::BTreeMap::from([(action, loss)]),
        chosen_expected_loss: loss,
        calibration_score: 1.0,
        fallback_active: false,
        #[allow(clippy::cast_precision_loss)]
        top_features: vec![("caveat_count".to_string(), token.caveat_count() as f64)],
    };
    sink.emit(&entry);
}
/// Current logical time without advancing the clock.
#[inline]
#[must_use]
pub fn logical_now(&self) -> LogicalTime {
    self.handles.logical_clock.now()
}
/// Owned handle to the logical clock.
#[inline]
#[must_use]
pub(crate) fn logical_clock_handle(&self) -> LogicalClockHandle {
    self.handles.logical_clock.clone()
}
/// Advances the logical clock one step and returns the new time.
#[inline]
#[must_use]
pub fn logical_tick(&self) -> LogicalTime {
    self.handles.logical_clock.tick()
}
/// Merges a remote sender's logical time into the local clock and returns
/// the resulting time.
#[inline]
#[must_use]
pub fn logical_receive(&self, sender_time: &LogicalTime) -> LogicalTime {
    self.handles.logical_clock.receive(sender_time)
}
/// Handle to the timer driver; requires the time capability.
#[inline]
#[must_use]
pub fn timer_driver(&self) -> Option<TimerDriverHandle>
where
    Caps: cap::HasTime,
{
    self.handles.timer_driver.clone()
}
/// Whether a timer driver is attached.
#[inline]
#[must_use]
pub fn has_timer(&self) -> bool
where
    Caps: cap::HasTime,
{
    self.handles.timer_driver.is_some()
}
/// Borrow of the I/O capability surface; requires the I/O capability.
#[inline]
#[must_use]
pub fn io(&self) -> Option<&dyn crate::io::IoCap>
where
    Caps: cap::HasIo,
{
    self.handles.io_cap.as_ref().map(AsRef::as_ref)
}
/// Owned handle to the I/O capability (crate-internal, not capability-gated).
#[inline]
#[allow(dead_code)]
#[must_use]
pub(crate) fn io_cap_handle(&self) -> Option<Arc<dyn crate::io::IoCap>> {
    self.handles.io_cap.clone()
}
/// Whether an I/O capability surface is attached.
#[inline]
#[must_use]
pub fn has_io(&self) -> bool
where
    Caps: cap::HasIo,
{
    self.handles.io_cap.is_some()
}
/// Fetch sub-capability of the I/O surface, when the surface provides one.
#[inline]
#[must_use]
pub fn fetch_cap(&self) -> Option<&dyn crate::io::FetchIoCap>
where
    Caps: cap::HasIo,
{
    self.handles.io_cap.as_ref().and_then(|cap| cap.fetch_cap())
}
/// Whether a fetch sub-capability is available.
#[inline]
#[must_use]
pub fn has_fetch_cap(&self) -> bool
where
    Caps: cap::HasIo,
{
    self.fetch_cap().is_some()
}
/// Borrow of the remote capability; requires the remote capability marker.
#[inline]
#[must_use]
pub fn remote(&self) -> Option<&RemoteCap>
where
    Caps: cap::HasRemote,
{
    self.handles.remote_cap.as_ref().map(AsRef::as_ref)
}
/// Whether a remote capability is attached.
#[inline]
#[must_use]
pub fn has_remote(&self) -> bool
where
    Caps: cap::HasRemote,
{
    self.handles.remote_cap.is_some()
}
/// Registers an I/O source with the runtime's reactor.
///
/// # Errors
/// Returns `ErrorKind::NotConnected` when no I/O driver is attached, or
/// whatever error the driver's registration produces.
#[cfg(unix)]
pub fn register_io<S: Source>(
    &self,
    source: &S,
    interest: Interest,
) -> std::io::Result<IoRegistration>
where
    Caps: cap::HasIo,
{
    match self.io_driver_handle() {
        // NOTE(review): a no-op waker is supplied at registration time —
        // presumably a real waker is installed when the registration is
        // polled; confirm against the driver's contract.
        Some(driver) => driver.register(source, interest, noop_waker()),
        None => Err(std::io::Error::new(
            std::io::ErrorKind::NotConnected,
            "I/O driver not available",
        )),
    }
}
/// The region this context belongs to.
#[inline]
#[must_use]
pub fn region_id(&self) -> RegionId {
    self.inner.read().region
}
/// The task this context belongs to.
#[inline]
#[must_use]
pub fn task_id(&self) -> TaskId {
    self.inner.read().task
}
/// Free-form task type label, if one has been set.
#[inline]
#[must_use]
pub fn task_type(&self) -> Option<String> {
    self.inner.read().task_type.clone()
}
/// Sets the free-form task type label.
pub fn set_task_type(&self, task_type: impl Into<String>) {
    let mut inner = self.inner.write();
    inner.task_type = Some(task_type.into());
}
/// Snapshot of the current budget (copied out of the lock).
#[inline]
#[must_use]
pub fn budget(&self) -> Budget {
    self.inner.read().budget
}
/// Whether cancellation has been requested for this context.
#[inline]
#[must_use]
pub fn is_cancel_requested(&self) -> bool {
    self.inner.read().cancel_requested
}
#[allow(clippy::result_large_err)]
pub fn checkpoint(&self) -> Result<(), crate::error::Error> {
let checkpoint_time = self.current_checkpoint_time();
let (
cancel_requested,
mask_depth,
task,
region,
budget,
budget_baseline,
cancel_reason,
budget_exhaustion,
) = {
let mut inner = self.inner.write();
inner.checkpoint_state.record_at(checkpoint_time);
let budget_exhaustion = Self::checkpoint_budget_exhaustion(
inner.region,
inner.task,
inner.budget,
checkpoint_time,
);
if let Some((reason, _, _)) = &budget_exhaustion {
inner.cancel_requested = true;
inner
.fast_cancel
.store(true, std::sync::atomic::Ordering::Release);
if let Some(existing) = &mut inner.cancel_reason {
existing.strengthen(reason);
} else {
inner.cancel_reason = Some(reason.clone());
}
}
if inner.cancel_requested && inner.mask_depth == 0 {
inner.cancel_acknowledged = true;
}
(
inner.cancel_requested,
inner.mask_depth,
inner.task,
inner.region,
inner.budget,
inner.budget_baseline,
inner.cancel_reason.clone(),
budget_exhaustion.map(|(_, exhaustion_kind, deadline_remaining_ms)| {
(exhaustion_kind, deadline_remaining_ms)
}),
)
};
if let Some((exhaustion_kind, deadline_remaining_ms)) = budget_exhaustion {
if let Some(ref sink) = self.handles.evidence_sink {
crate::evidence_sink::emit_budget_evidence(
sink.as_ref(),
exhaustion_kind,
budget.poll_quota,
deadline_remaining_ms,
);
}
}
if cancel_requested && mask_depth == 0 {
if let Some(ref sink) = self.handles.evidence_sink {
let kind_str = cancel_reason
.as_ref()
.map_or_else(|| "unknown".to_string(), |r| format!("{}", r.kind));
crate::evidence_sink::emit_cancel_evidence(
sink.as_ref(),
&kind_str,
budget.poll_quota,
budget.priority,
);
}
}
Self::check_cancel_from_values(
cancel_requested,
mask_depth,
task,
region,
budget,
budget_baseline,
checkpoint_time,
cancel_reason.as_ref(),
)
}
#[allow(clippy::result_large_err)]
pub fn checkpoint_with(&self, msg: impl Into<String>) -> Result<(), crate::error::Error> {
let checkpoint_time = self.current_checkpoint_time();
let (
cancel_requested,
mask_depth,
task,
region,
budget,
budget_baseline,
cancel_reason,
budget_exhaustion,
) = {
let mut inner = self.inner.write();
inner
.checkpoint_state
.record_with_message_at(msg.into(), checkpoint_time);
let budget_exhaustion = Self::checkpoint_budget_exhaustion(
inner.region,
inner.task,
inner.budget,
checkpoint_time,
);
if let Some((reason, _, _)) = &budget_exhaustion {
inner.cancel_requested = true;
inner
.fast_cancel
.store(true, std::sync::atomic::Ordering::Release);
if let Some(existing) = &mut inner.cancel_reason {
existing.strengthen(reason);
} else {
inner.cancel_reason = Some(reason.clone());
}
}
if inner.cancel_requested && inner.mask_depth == 0 {
inner.cancel_acknowledged = true;
}
(
inner.cancel_requested,
inner.mask_depth,
inner.task,
inner.region,
inner.budget,
inner.budget_baseline,
inner.cancel_reason.clone(),
budget_exhaustion.map(|(_, exhaustion_kind, deadline_remaining_ms)| {
(exhaustion_kind, deadline_remaining_ms)
}),
)
};
if let Some((exhaustion_kind, deadline_remaining_ms)) = budget_exhaustion {
if let Some(ref sink) = self.handles.evidence_sink {
crate::evidence_sink::emit_budget_evidence(
sink.as_ref(),
exhaustion_kind,
budget.poll_quota,
deadline_remaining_ms,
);
}
}
if cancel_requested && mask_depth == 0 {
if let Some(ref sink) = self.handles.evidence_sink {
let kind_str = cancel_reason
.as_ref()
.map_or_else(|| "unknown".to_string(), |r| format!("{}", r.kind));
crate::evidence_sink::emit_cancel_evidence(
sink.as_ref(),
&kind_str,
budget.poll_quota,
budget.priority,
);
}
}
Self::check_cancel_from_values(
cancel_requested,
mask_depth,
task,
region,
budget,
budget_baseline,
checkpoint_time,
cancel_reason.as_ref(),
)
}
/// Clone of the recorded checkpoint history/state.
#[must_use]
pub fn checkpoint_state(&self) -> crate::types::CheckpointState {
    self.inner.read().checkpoint_state.clone()
}
/// Current time: the timer driver's clock when one is attached, otherwise
/// the wall clock. Requires the time capability.
#[must_use]
pub fn now(&self) -> Time
where
    Caps: cap::HasTime,
{
    self.handles
        .timer_driver
        .as_ref()
        .map_or_else(wall_clock_now, TimerDriverHandle::now)
}
#[inline]
fn current_checkpoint_time(&self) -> Time {
self.handles
.timer_driver
.as_ref()
.map_or_else(wall_clock_now, TimerDriverHandle::now)
}
/// Determines whether the budget is exhausted at `now` and, if so, with what
/// reason.
///
/// Checks deadline, poll quota and cost quota in that order; when several
/// are exhausted, `CancelReason::strengthen` decides which one wins, and the
/// `&'static str` kind label ("time"/"poll"/"cost") tracks the winner for
/// evidence reporting. The third tuple element is the remaining deadline in
/// milliseconds, when a deadline exists.
#[inline]
fn checkpoint_budget_exhaustion(
    region: RegionId,
    task: TaskId,
    budget: Budget,
    now: Time,
) -> Option<(CancelReason, &'static str, Option<u64>)> {
    let deadline_remaining_ms = budget
        .remaining_time(now)
        .map(Self::duration_millis_saturating);
    let mut exhaustion = if budget.is_past_deadline(now) {
        Some((
            CancelReason::with_origin(CancelKind::Deadline, region, now).with_task(task),
            "time",
            deadline_remaining_ms,
        ))
    } else {
        None
    };
    if budget.poll_quota == 0 {
        let candidate =
            CancelReason::with_origin(CancelKind::PollQuota, region, now).with_task(task);
        match &mut exhaustion {
            Some((existing, kind, _)) => {
                // Only relabel when the candidate actually wins.
                if existing.strengthen(&candidate) {
                    *kind = "poll";
                }
            }
            None => exhaustion = Some((candidate, "poll", deadline_remaining_ms)),
        }
    }
    if matches!(budget.cost_quota, Some(0)) {
        let candidate =
            CancelReason::with_origin(CancelKind::CostBudget, region, now).with_task(task);
        match &mut exhaustion {
            Some((existing, kind, _)) => {
                if existing.strengthen(&candidate) {
                    *kind = "cost";
                }
            }
            None => exhaustion = Some((candidate, "cost", deadline_remaining_ms)),
        }
    }
    exhaustion
}
/// Computes `(polls used, cost used, time remaining ms)` relative to the
/// baseline budget. A baseline poll quota of `u32::MAX` is treated as
/// untracked and reported as `None`; cost usage needs both quotas present.
#[inline]
fn checkpoint_budget_usage(
    budget: Budget,
    budget_baseline: Budget,
    now: Time,
) -> (Option<u32>, Option<u64>, Option<u64>) {
    let polls_used = (budget_baseline.poll_quota != u32::MAX)
        .then(|| budget_baseline.poll_quota.saturating_sub(budget.poll_quota));
    let cost_used = budget_baseline
        .cost_quota
        .zip(budget.cost_quota)
        .map(|(baseline, remaining)| baseline.saturating_sub(remaining));
    let time_remaining_ms = budget
        .remaining_time(now)
        .map(Self::duration_millis_saturating);
    (polls_used, cost_used, time_remaining_ms)
}
/// Converts a `Duration` to whole milliseconds, saturating at `u64::MAX`
/// (`as_millis` returns `u128`, which can exceed `u64`).
#[inline]
fn duration_millis_saturating(duration: Duration) -> u64 {
    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
/// Pure tail of a checkpoint: logs the checkpoint snapshot and converts an
/// unmasked cancellation into `Err(Cancelled)`.
///
/// Takes plain values (copied out of the lock by the caller) so no lock is
/// held while logging.
///
/// # Errors
/// Returns `ErrorKind::Cancelled` iff `cancel_requested && mask_depth == 0`.
#[allow(clippy::result_large_err)]
#[allow(clippy::too_many_arguments)]
fn check_cancel_from_values(
    cancel_requested: bool,
    mask_depth: u32,
    task: TaskId,
    region: RegionId,
    budget: Budget,
    budget_baseline: Budget,
    checkpoint_time: Time,
    cancel_reason: Option<&CancelReason>,
) -> Result<(), crate::error::Error> {
    let (polls_used, cost_used, time_remaining_ms) =
        Self::checkpoint_budget_usage(budget, budget_baseline, checkpoint_time);
    // Touch every value so builds whose trace!/info! macros compile to
    // no-ops do not warn about unused variables.
    let _ = (
        &task,
        &region,
        &budget,
        &budget_baseline,
        &polls_used,
        &cost_used,
        &time_remaining_ms,
    );
    trace!(
        task_id = ?task,
        region_id = ?region,
        polls_used = ?polls_used,
        polls_remaining = budget.poll_quota,
        time_remaining_ms = ?time_remaining_ms,
        cost_used = ?cost_used,
        cost_remaining = ?budget.cost_quota,
        deadline = ?budget.deadline,
        cancel_reason = ?cancel_reason,
        cancel_requested,
        mask_depth,
        "checkpoint"
    );
    if cancel_requested {
        if mask_depth == 0 {
            let cancel_reason_ref = cancel_reason.as_ref();
            let exhausted_resource = cancel_reason_ref
                .map_or_else(|| "unknown".to_string(), |r| format!("{:?}", r.kind));
            let _ = &exhausted_resource;
            info!(
                task_id = ?task,
                region_id = ?region,
                exhausted_resource = %exhausted_resource,
                cancel_reason = ?cancel_reason,
                budget_deadline = ?budget.deadline,
                budget_poll_quota = budget.poll_quota,
                budget_cost_quota = ?budget.cost_quota,
                "cancel observed at checkpoint - task cancelled"
            );
            trace!(
                task_id = ?task,
                region_id = ?region,
                cancel_reason = ?cancel_reason,
                cancel_kind = ?cancel_reason.as_ref().map(|r| r.kind),
                mask_depth,
                budget_deadline = ?budget.deadline,
                budget_poll_quota = budget.poll_quota,
                budget_cost_quota = ?budget.cost_quota,
                budget_priority = budget.priority,
                "cancel observed at checkpoint"
            );
            Err(crate::error::Error::new(crate::error::ErrorKind::Cancelled))
        } else {
            // Masked: cancellation is noted but deferred until unmasked.
            trace!(
                task_id = ?task,
                region_id = ?region,
                cancel_reason = ?cancel_reason,
                cancel_kind = ?cancel_reason.as_ref().map(|r| r.kind),
                mask_depth,
                "cancel observed but masked"
            );
            Ok(())
        }
    } else {
        Ok(())
    }
}
/// Runs `f` with cancellation masked: checkpoints inside `f` observe but do
/// not act on cancellation. Masking nests up to `MAX_MASK_DEPTH`; the depth
/// is restored on all exits (including panics) by the `MaskGuard`.
///
/// # Panics
/// Panics when nesting would exceed `MAX_MASK_DEPTH`.
pub fn masked<F, R>(&self, f: F) -> R
where
    F: FnOnce() -> R,
{
    {
        let mut inner = self.inner.write();
        assert!(
            inner.mask_depth < crate::types::task_context::MAX_MASK_DEPTH,
            "mask depth exceeded MAX_MASK_DEPTH ({}): this violates INV-MASK-BOUNDED \
             and prevents cancellation from ever being observed. \
             Reduce nesting of Cx::masked() sections.",
            crate::types::task_context::MAX_MASK_DEPTH,
        );
        inner.mask_depth += 1;
    }
    // Guard decrements the depth on drop, even if `f` panics.
    let _guard = MaskGuard { inner: &self.inner };
    f()
}
pub fn trace(&self, message: &str) {
self.log(LogEntry::trace(message));
let Some(trace) = self.trace_buffer() else {
return;
};
let now = self
.handles
.timer_driver
.as_ref()
.map_or_else(wall_clock_now, TimerDriverHandle::now);
let logical_time = self.logical_tick();
trace.record_event(move |seq| {
TraceEvent::user_trace(seq, now, message).with_logical_time(logical_time)
});
}
pub fn trace_with_fields(&self, message: &str, fields: &[(&str, &str)]) {
let mut entry = LogEntry::trace(message);
for &(k, v) in fields {
entry = entry.with_field(k, v);
}
self.log(entry);
let Some(trace) = self.trace_buffer() else {
return;
};
let now = self
.handles
.timer_driver
.as_ref()
.map_or_else(wall_clock_now, TimerDriverHandle::now);
let logical_time = self.logical_tick();
trace.record_event(move |seq| {
TraceEvent::user_trace(seq, now, message).with_logical_time(logical_time)
});
}
/// Opens a named diagnostic span: forks the current diagnostic context with
/// a `span.name` field and installs it. The returned guard restores the
/// previous context on drop.
#[must_use]
pub fn enter_span(&self, name: &str) -> SpanGuard<Caps> {
    let prev = self.diagnostic_context();
    let child = prev.fork().with_custom("span.name", name);
    self.set_diagnostic_context(child);
    self.log(LogEntry::debug(format!("span enter: {name}")).with_target("tracing"));
    SpanGuard {
        cx: self.clone(),
        prev,
    }
}
/// Stamps a `request_id` field onto the diagnostic context.
pub fn set_request_id(&self, id: impl Into<String>) {
    let mut obs = self.observability.write();
    obs.context = obs.context.clone().with_custom("request_id", id);
}
/// The `request_id` previously set via [`Self::set_request_id`], if any.
#[inline]
#[must_use]
pub fn request_id(&self) -> Option<String> {
    self.diagnostic_context()
        .custom("request_id")
        .map(String::from)
}
/// Routes a log entry through the attached collector (no-op without one),
/// stamping the diagnostic context and — when enabled and the entry has no
/// timestamp — the current driver/wall time.
pub fn log(&self, entry: LogEntry) {
    let obs = self.observability.read();
    let Some(collector) = obs.collector.clone() else {
        return;
    };
    let include_timestamps = obs.include_timestamps;
    let context = obs.context.clone();
    // Release the observability lock before touching the timer driver or
    // the collector.
    drop(obs);
    let mut entry = entry.with_context(&context);
    // `Time::ZERO` marks "no timestamp set yet"; only then do we stamp.
    if include_timestamps && entry.timestamp() == Time::ZERO {
        let now = self
            .handles
            .timer_driver
            .as_ref()
            .map_or_else(wall_clock_now, TimerDriverHandle::now);
        entry = entry.with_timestamp(now);
    }
    collector.log(entry);
}
/// Clone of the current diagnostic context.
#[must_use]
pub fn diagnostic_context(&self) -> DiagnosticContext {
    self.observability.read().context.clone()
}
/// Replaces the diagnostic context wholesale.
pub fn set_diagnostic_context(&self, ctx: DiagnosticContext) {
    let mut obs = self.observability.write();
    obs.context = ctx;
}
/// Installs (or replaces) the log collector.
pub fn set_log_collector(&self, collector: LogCollector) {
    let mut obs = self.observability.write();
    obs.collector = Some(collector);
}
/// The current log collector, if one is installed.
#[inline]
#[must_use]
pub fn log_collector(&self) -> Option<LogCollector> {
    self.observability.read().collector.clone()
}
/// Installs (or replaces) the trace buffer.
pub fn set_trace_buffer(&self, trace: TraceBufferHandle) {
    let mut obs = self.observability.write();
    obs.trace = Some(trace);
}
/// The current trace buffer, if one is installed.
#[inline]
#[must_use]
pub fn trace_buffer(&self) -> Option<TraceBufferHandle> {
    self.observability.read().trace.clone()
}
/// Observability state for a child task (forked context, inherited wiring).
pub(crate) fn child_observability(&self, region: RegionId, task: TaskId) -> ObservabilityState {
    let obs = self.observability.read();
    obs.derive_child(region, task)
}
/// Borrow of the entropy source; requires the random capability.
#[inline]
#[must_use]
pub fn entropy(&self) -> &dyn EntropySource
where
    Caps: cap::HasRandom,
{
    self.handles.entropy.as_ref()
}
/// Entropy source for a child task, forked per task id (crate-internal,
/// so not capability-gated).
pub(crate) fn child_entropy(&self, task: TaskId) -> Arc<dyn EntropySource> {
    self.handles.entropy.fork(task)
}
/// Owned handle to the entropy source; requires the random capability.
#[inline]
#[must_use]
pub(crate) fn entropy_handle(&self) -> Arc<dyn EntropySource>
where
    Caps: cap::HasRandom,
{
    self.handles.entropy.clone()
}
#[must_use]
pub fn random_u64(&self) -> u64
where
    Caps: cap::HasRandom,
{
    // Draw from the entropy source, then trace the draw for diagnostics.
    let drawn = self.handles.entropy.next_u64();
    trace!(
        source = self.handles.entropy.source_id(),
        task_id = ?self.task_id(),
        value = drawn,
        "entropy_u64"
    );
    drawn
}
pub fn random_bytes(&self, dest: &mut [u8])
where
    Caps: cap::HasRandom,
{
    // Fill the caller's buffer in place, then record the draw size.
    let requested = dest.len();
    self.handles.entropy.fill_bytes(dest);
    trace!(
        source = self.handles.entropy.source_id(),
        task_id = ?self.task_id(),
        len = requested,
        "entropy_bytes"
    );
}
#[must_use]
pub fn random_usize(&self, bound: usize) -> usize
where
    Caps: cap::HasRandom,
{
    // Unbiased sampling in [0, bound) via rejection: discard draws from
    // the tail region of u64 that does not divide evenly by `bound`.
    assert!(bound > 0, "bound must be non-zero");
    let b = bound as u64;
    // Largest multiple of `b` not exceeding u64::MAX; draws at or above
    // it would bias the modulo and are re-drawn.
    let limit = u64::MAX - (u64::MAX % b);
    let mut draw = self.random_u64();
    while draw >= limit {
        draw = self.random_u64();
    }
    (draw % b) as usize
}
#[must_use]
pub fn random_bool(&self) -> bool
where
    Caps: cap::HasRandom,
{
    // Decide on the lowest bit of a fresh 64-bit draw.
    (self.random_u64() & 1) != 0
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn random_f64(&self) -> f64
where
    Caps: cap::HasRandom,
{
    // Keep the top 53 bits of a u64 draw and scale by 2^-53, yielding a
    // uniformly distributed value in [0, 1).
    let mantissa = self.random_u64() >> 11;
    mantissa as f64 / (1u64 << 53) as f64
}
pub fn shuffle<T>(&self, slice: &mut [T])
where
    Caps: cap::HasRandom,
{
    // Fisher-Yates: walk from the back, swapping each slot with a
    // uniformly chosen index at or below it.
    let mut i = slice.len();
    while i > 1 {
        i -= 1;
        let j = self.random_usize(i + 1);
        slice.swap(i, j);
    }
}
#[allow(dead_code)]
pub(crate) fn set_cancel_internal(&self, value: bool) {
    // Flip the cancellation flag (and its lock-free atomic mirror)
    // without waking any registered cancel waker.
    let mut guard = self.inner.write();
    guard.cancel_requested = value;
    guard
        .fast_cancel
        .store(value, std::sync::atomic::Ordering::Release);
    // Clearing cancellation also discards any stale recorded reason.
    if !value {
        guard.cancel_reason = None;
    }
}
pub fn set_cancel_requested(&self, value: bool) {
    // Update flags under the lock; wake the cancel waker only when the
    // flag is being raised, and never while the lock is held.
    let to_wake = {
        let mut guard = self.inner.write();
        guard.cancel_requested = value;
        guard
            .fast_cancel
            .store(value, std::sync::atomic::Ordering::Release);
        if value {
            guard.cancel_waker.clone()
        } else {
            // Clearing cancellation discards the recorded reason.
            guard.cancel_reason = None;
            None
        }
    };
    if let Some(w) = to_wake {
        w.wake();
    }
}
pub fn cancel_with(&self, kind: CancelKind, message: Option<&'static str>) {
    // Build the cancellation reason, raise both cancel flags, and grab
    // the waker — all under a single write lock.
    let mut guard = self.inner.write();
    let (region, task) = (guard.region, guard.task);
    let mut reason = CancelReason::new(kind).with_region(region).with_task(task);
    if let Some(msg) = message {
        reason = reason.with_message(msg);
    }
    guard.cancel_requested = true;
    guard
        .fast_cancel
        .store(true, std::sync::atomic::Ordering::Release);
    guard.cancel_reason = Some(reason);
    let waker = guard.cancel_waker.clone();
    drop(guard);
    // Wake only after the lock is released.
    if let Some(w) = waker {
        w.wake();
    }
    debug!(
        task_id = ?task,
        region_id = ?region,
        cancel_kind = ?kind,
        cancel_message = message,
        "cancel initiated via cancel_with"
    );
    // Keep the ids referenced even when `debug!` compiles to nothing.
    let _ = (region, task);
}
pub fn cancel_fast(&self, kind: CancelKind) {
    // Minimal cancellation path: the reason carries only kind + region
    // (no task, message, or cause chain).
    let mut guard = self.inner.write();
    let region = guard.region;
    guard.cancel_requested = true;
    guard
        .fast_cancel
        .store(true, std::sync::atomic::Ordering::Release);
    guard.cancel_reason = Some(CancelReason::new(kind).with_region(region));
    let waker = guard.cancel_waker.clone();
    drop(guard);
    // Wake only after the lock is released.
    if let Some(w) = waker {
        w.wake();
    }
    trace!(
        region_id = ?region,
        cancel_kind = ?kind,
        "cancel_fast initiated"
    );
    // Keep the id referenced even when `trace!` compiles to nothing.
    let _ = region;
}
#[inline]
#[must_use]
pub fn cancel_reason(&self) -> Option<CancelReason> {
    // Snapshot of the current cancellation reason, if any.
    self.inner.read().cancel_reason.clone()
}
pub fn cancel_chain(&self) -> impl Iterator<Item = CancelReason> {
    // Walk the cause chain from the immediate reason down to the root.
    let head = self.inner.read().cancel_reason.clone();
    std::iter::successors(head, |reason| reason.cause.as_deref().cloned())
}
#[must_use]
pub fn root_cancel_cause(&self) -> Option<CancelReason> {
    // The deepest cause in the chain, i.e. the original trigger.
    let guard = self.inner.read();
    guard
        .cancel_reason
        .as_ref()
        .map(|reason| reason.root_cause().clone())
}
#[must_use]
pub fn cancelled_by(&self, kind: CancelKind) -> bool {
let inner = self.inner.read();
inner.cancel_reason.as_ref().is_some_and(|r| r.kind == kind)
}
#[must_use]
pub fn any_cause_is(&self, kind: CancelKind) -> bool {
    // True when any link in the cause chain matches `kind`.
    let guard = self.inner.read();
    match guard.cancel_reason.as_ref() {
        Some(reason) => reason.any_cause_is(kind),
        None => false,
    }
}
pub fn set_cancel_reason(&self, reason: CancelReason) {
    // Record the reason, raise both cancel flags, then wake any
    // registered cancel waker outside the lock.
    let to_wake = {
        let mut guard = self.inner.write();
        guard.cancel_requested = true;
        guard
            .fast_cancel
            .store(true, std::sync::atomic::Ordering::Release);
        guard.cancel_reason = Some(reason);
        guard.cancel_waker.clone()
    };
    if let Some(w) = to_wake {
        w.wake();
    }
}
pub async fn race<T>(
    &self,
    futures: Vec<Pin<Box<dyn Future<Output = T> + Send>>>,
) -> Result<T, JoinError> {
    // Racing zero futures can never produce a winner: park forever.
    if futures.is_empty() {
        return std::future::pending().await;
    }
    // First future to complete wins; its index is discarded.
    match SelectAll::new(futures).await {
        Ok((winner, _idx)) => Ok(winner),
        Err(_) => Err(JoinError::PolledAfterCompletion),
    }
}
pub async fn race_named<T>(&self, futures: NamedFutures<T>) -> Result<T, JoinError> {
    // Labels are caller-side bookkeeping only; strip them and race.
    let unnamed: Vec<_> = futures.into_iter().map(|(_name, fut)| fut).collect();
    self.race(unnamed).await
}
pub async fn race_timeout<T>(
&self,
duration: Duration,
futures: Vec<Pin<Box<dyn Future<Output = T> + Send>>>,
) -> Result<T, JoinError>
where
Caps: cap::HasTime,
{
let race_fut = std::pin::pin!(self.race(futures));
let now = self
.handles
.timer_driver
.as_ref()
.map_or_else(wall_clock_now, TimerDriverHandle::now);
timeout(now, duration, race_fut)
.await
.unwrap_or_else(|_| Err(JoinError::Cancelled(CancelReason::timeout())))
}
pub async fn race_timeout_named<T>(
    &self,
    duration: Duration,
    futures: NamedFutures<T>,
) -> Result<T, JoinError>
where
    Caps: cap::HasTime,
{
    // Drop the labels and defer to the unnamed timed race.
    let unnamed: Vec<_> = futures.into_iter().map(|(_name, fut)| fut).collect();
    self.race_timeout(duration, unnamed).await
}
#[must_use]
pub fn scope(&self) -> crate::cx::Scope<'static> {
    // The child scope inherits this context's budget unchanged.
    let inherited = self.budget();
    debug!(
        task_id = ?self.task_id(),
        region_id = ?self.region_id(),
        budget_deadline = ?inherited.deadline,
        budget_poll_quota = inherited.poll_quota,
        budget_cost_quota = ?inherited.cost_quota,
        budget_priority = inherited.priority,
        budget_source = "inherited",
        "scope budget inherited"
    );
    crate::cx::Scope::new(self.region_id(), inherited)
}
#[must_use]
pub fn scope_with_budget(&self, budget: Budget) -> crate::cx::Scope<'static> {
    // Combine two optional limits, keeping the tighter (smaller) one; an
    // absent parent limit lets the child's value through unchanged.
    fn tighter<T: PartialOrd>(parent: Option<T>, child: Option<T>) -> Option<T> {
        match (parent, child) {
            (Some(p), Some(c)) => Some(if c < p { c } else { p }),
            (Some(p), None) => Some(p),
            (None, c) => c,
        }
    }
    let parent_budget = self.budget();
    // Diagnostics only: record whether the child narrowed each axis.
    let deadline_tightened = match (parent_budget.deadline, budget.deadline) {
        (Some(parent), Some(child)) => child < parent,
        (None, Some(_)) => true,
        _ => false,
    };
    let poll_tightened = budget.poll_quota < parent_budget.poll_quota;
    let cost_tightened = match (parent_budget.cost_quota, budget.cost_quota) {
        (Some(parent), Some(child)) => child < parent,
        (None, Some(_)) => true,
        _ => false,
    };
    let priority_boosted = budget.priority > parent_budget.priority;
    // Keep the flags referenced even when `debug!` compiles to nothing.
    let _ = (
        &deadline_tightened,
        &poll_tightened,
        &cost_tightened,
        &priority_boosted,
    );
    // The effective budget never exceeds the parent's on any axis;
    // priority alone is taken from the child as requested.
    let clamped = Budget {
        deadline: tighter(parent_budget.deadline, budget.deadline),
        poll_quota: budget.poll_quota.min(parent_budget.poll_quota),
        cost_quota: tighter(parent_budget.cost_quota, budget.cost_quota),
        priority: budget.priority,
    };
    debug!(
        task_id = ?self.task_id(),
        region_id = ?self.region_id(),
        parent_deadline = ?parent_budget.deadline,
        parent_poll_quota = parent_budget.poll_quota,
        parent_cost_quota = ?parent_budget.cost_quota,
        parent_priority = parent_budget.priority,
        budget_deadline = ?clamped.deadline,
        budget_poll_quota = clamped.poll_quota,
        budget_cost_quota = ?clamped.cost_quota,
        budget_priority = clamped.priority,
        deadline_tightened,
        poll_tightened,
        cost_tightened,
        priority_boosted,
        budget_source = "explicit",
        "scope budget set"
    );
    crate::cx::Scope::new(self.region_id(), clamped)
}
}
impl Cx<cap::All> {
    /// Fully-capable test context: zeroed test region/task ids and an
    /// infinite budget.
    #[must_use]
    pub fn for_testing() -> Self {
        Self::for_testing_with_budget(Budget::INFINITE)
    }
    /// Test context with zeroed test ids and an explicit budget.
    #[must_use]
    pub fn for_testing_with_budget(budget: Budget) -> Self {
        let region = RegionId::new_for_test(0, 0);
        let task = TaskId::new_for_test(0, 0);
        Self::new(region, task, budget)
    }
    /// Test context wired with the in-memory lab I/O capability.
    #[must_use]
    pub fn for_testing_with_io() -> Self {
        Self::new_with_io(
            RegionId::new_for_test(0, 0),
            TaskId::new_for_test(0, 0),
            Budget::INFINITE,
            None,
            None,
            Some(Arc::new(crate::io::LabIoCap::new())),
            None,
        )
    }
    /// Request-scoped context with fresh ephemeral ids and the given budget.
    #[must_use]
    pub fn for_request_with_budget(budget: Budget) -> Self {
        Self::new(RegionId::new_ephemeral(), TaskId::new_ephemeral(), budget)
    }
    /// Request-scoped context with no budget limits.
    #[must_use]
    pub fn for_request() -> Self {
        Self::for_request_with_budget(Budget::INFINITE)
    }
    /// Test context carrying the given remote capability.
    #[must_use]
    pub fn for_testing_with_remote(cap: RemoteCap) -> Self {
        let mut cx = Self::for_testing();
        // Copy-on-write: clone the shared handles only if aliased.
        Arc::make_mut(&mut cx.handles).remote_cap = Some(Arc::new(cap));
        cx
    }
}
/// Guard that, on drop, logs a "span exit" message and restores the
/// diagnostic context saved in `prev` (see the `Drop` impl).
pub struct SpanGuard<Caps = cap::All> {
    // Context whose diagnostic state is restored when the guard drops.
    cx: Cx<Caps>,
    // Diagnostic context that was active before the span was entered.
    prev: DiagnosticContext,
}
impl<Caps> Drop for SpanGuard<Caps> {
    fn drop(&mut self) {
        // Resolve the span's name for the exit message; fall back to
        // "unknown" when no name was recorded on the context.
        let span_name = match self.cx.diagnostic_context().custom("span.name") {
            Some(name) => name.to_owned(),
            None => "unknown".to_owned(),
        };
        let exit_entry =
            LogEntry::debug(format!("span exit: {span_name}")).with_target("tracing");
        self.cx.log(exit_entry);
        // Reinstate the diagnostic context that was active before entry.
        self.cx.set_diagnostic_context(self.prev.clone());
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cx::macaroon::CaveatPredicate;
#[cfg(feature = "messaging-fabric")]
use crate::messaging::capability::{CommandFamily, FabricCapability, FabricCapabilityScope};
#[cfg(feature = "messaging-fabric")]
use crate::messaging::class::DeliveryClass;
#[cfg(feature = "messaging-fabric")]
use crate::messaging::ir::{CapabilityPermission, CapabilityTokenSchema, SubjectFamily};
#[cfg(feature = "messaging-fabric")]
use crate::messaging::subject::SubjectPattern;
use crate::trace::TraceBufferHandle;
use crate::util::{ArenaIndex, DetEntropy};
use std::sync::atomic::{AtomicU8, Ordering};
static CURRENT_CX_DTOR_STATE: AtomicU8 = AtomicU8::new(0);
thread_local! {
static CURRENT_CX_DTOR_PROBE: CurrentCxDtorProbe = const { CurrentCxDtorProbe };
}
struct CurrentCxDtorProbe;
impl Drop for CurrentCxDtorProbe {
fn drop(&mut self) {
let state = match CURRENT_CX.try_with(|slot| slot.borrow().clone()) {
Ok(Some(_)) => 1,
Ok(None) => 2,
Err(_) => {
if Cx::current().is_none() {
3
} else {
4
}
}
};
CURRENT_CX_DTOR_STATE.store(state, Ordering::SeqCst);
}
}
fn test_cx() -> Cx {
Cx::new(
RegionId::from_arena(ArenaIndex::new(0, 0)),
TaskId::from_arena(ArenaIndex::new(0, 0)),
Budget::INFINITE,
)
}
fn test_cx_with_entropy(seed: u64) -> Cx {
Cx::new_with_observability(
RegionId::from_arena(ArenaIndex::new(0, 0)),
TaskId::from_arena(ArenaIndex::new(0, 0)),
Budget::INFINITE,
None,
None,
Some(Arc::new(DetEntropy::new(seed))),
)
}
fn trace_message(event: &crate::trace::TraceEvent) -> &str {
match &event.data {
crate::trace::TraceData::Message(message) => message,
other => panic!("expected user trace message, got {other:?}"),
}
}
#[cfg(feature = "messaging-fabric")]
fn capability_schema(
families: Vec<SubjectFamily>,
permissions: Vec<CapabilityPermission>,
) -> CapabilityTokenSchema {
CapabilityTokenSchema {
name: "fabric.cx.demo".to_owned(),
families,
delivery_classes: vec![DeliveryClass::EphemeralInteractive],
permissions,
}
}
#[test]
fn io_not_available_by_default() {
let cx = test_cx();
assert!(!cx.has_io());
assert!(cx.io().is_none());
}
#[test]
fn io_available_with_for_testing_with_io() {
let cx: Cx = Cx::for_testing_with_io();
assert!(cx.has_io());
let io = cx.io().expect("should have io cap");
assert!(!io.is_real_io());
assert_eq!(io.name(), "lab");
}
#[test]
fn checkpoint_without_cancel() {
let cx = test_cx();
assert!(cx.checkpoint().is_ok());
}
#[test]
fn checkpoint_with_cancel() {
let cx = test_cx();
cx.set_cancel_requested(true);
assert!(cx.checkpoint().is_err());
}
#[test]
fn masked_defers_cancel() {
let cx = test_cx();
cx.set_cancel_requested(true);
cx.masked(|| {
assert!(
cx.checkpoint().is_ok(),
"checkpoint should succeed when masked"
);
});
assert!(
cx.checkpoint().is_err(),
"checkpoint should fail after unmasking"
);
}
#[test]
fn trace_attaches_logical_time() {
let cx = test_cx();
let trace = TraceBufferHandle::new(8);
cx.set_trace_buffer(trace.clone());
cx.trace("hello");
let events = trace.snapshot();
let event = events.first().expect("trace event");
assert!(event.logical_time.is_some());
}
#[test]
fn masked_panic_safety() {
use std::panic::{AssertUnwindSafe, catch_unwind};
let cx = test_cx();
cx.set_cancel_requested(true);
assert!(cx.checkpoint().is_err());
let cx_clone = cx.clone();
let _ = catch_unwind(AssertUnwindSafe(|| {
cx_clone.masked(|| {
std::panic::resume_unwind(Box::new("oops"));
});
}));
assert!(
cx.checkpoint().is_err(),
"Cx remains masked after panic! mask_depth leaked."
);
}
#[test]
fn current_returns_none_during_thread_local_teardown() {
CURRENT_CX_DTOR_STATE.store(0, Ordering::SeqCst);
let join = std::thread::spawn(|| {
CURRENT_CX_DTOR_PROBE.with(|_| {});
let cx = test_cx();
let _guard = Cx::set_current(Some(cx));
assert!(Cx::current().is_some(), "current cx should be installed");
});
join.join()
.expect("thread-local teardown should not panic when reading Cx");
assert_eq!(
CURRENT_CX_DTOR_STATE.load(Ordering::SeqCst),
3,
"Cx::current() should fail closed once CURRENT_CX is unavailable"
);
}
#[test]
#[should_panic(expected = "MAX_MASK_DEPTH")]
fn mask_depth_exceeds_bound_panics() {
let cx = test_cx();
{
let mut inner = cx.inner.write();
inner.mask_depth = crate::types::task_context::MAX_MASK_DEPTH;
}
cx.masked(|| {});
}
#[test]
fn random_usize_in_range() {
let cx = test_cx_with_entropy(123);
for _ in 0..100 {
let value = cx.random_usize(7);
assert!(value < 7);
}
}
#[test]
fn shuffle_deterministic() {
let cx1 = test_cx_with_entropy(42);
let cx2 = test_cx_with_entropy(42);
let mut a = [1, 2, 3, 4, 5, 6, 7, 8];
let mut b = [1, 2, 3, 4, 5, 6, 7, 8];
cx1.shuffle(&mut a);
cx2.shuffle(&mut b);
assert_eq!(a, b);
}
#[test]
fn random_f64_range() {
let cx = test_cx_with_entropy(7);
for _ in 0..100 {
let value = cx.random_f64();
assert!((0.0..1.0).contains(&value));
}
}
#[test]
fn cancel_with_sets_reason() {
let cx = test_cx();
assert!(cx.cancel_reason().is_none());
cx.cancel_with(CancelKind::User, Some("manual stop"));
assert!(cx.is_cancel_requested());
let reason = cx.cancel_reason().expect("should have reason");
assert_eq!(reason.kind, CancelKind::User);
assert_eq!(reason.message, Some("manual stop".to_string()));
}
#[test]
fn cancel_with_no_message() {
let cx = test_cx();
cx.cancel_with(CancelKind::Timeout, None);
let reason = cx.cancel_reason().expect("should have reason");
assert_eq!(reason.kind, CancelKind::Timeout);
assert!(reason.message.is_none());
}
#[test]
fn cancel_reason_returns_none_when_not_cancelled() {
let cx = test_cx();
assert!(cx.cancel_reason().is_none());
}
#[test]
fn cancel_chain_empty_when_not_cancelled() {
let cx = test_cx();
assert!(cx.cancel_chain().next().is_none());
}
#[test]
fn cancel_chain_traverses_causes() {
let cx = test_cx();
let deadline = CancelReason::deadline();
let parent1 = CancelReason::parent_cancelled().with_cause(deadline);
let parent2 = CancelReason::parent_cancelled().with_cause(parent1);
cx.set_cancel_reason(parent2);
let chain: Vec<_> = cx.cancel_chain().collect();
assert_eq!(chain.len(), 3);
assert_eq!(chain[0].kind, CancelKind::ParentCancelled);
assert_eq!(chain[1].kind, CancelKind::ParentCancelled);
assert_eq!(chain[2].kind, CancelKind::Deadline);
}
#[test]
fn root_cancel_cause_returns_none_when_not_cancelled() {
let cx = test_cx();
assert!(cx.root_cancel_cause().is_none());
}
#[test]
fn root_cancel_cause_finds_root() {
let cx = test_cx();
let timeout = CancelReason::timeout();
let parent = CancelReason::parent_cancelled().with_cause(timeout);
cx.set_cancel_reason(parent);
let root = cx.root_cancel_cause().expect("should have root");
assert_eq!(root.kind, CancelKind::Timeout);
}
#[test]
fn root_cancel_cause_with_no_chain() {
let cx = test_cx();
cx.cancel_with(CancelKind::Shutdown, None);
let root = cx.root_cancel_cause().expect("should have root");
assert_eq!(root.kind, CancelKind::Shutdown);
}
#[test]
fn cancelled_by_checks_immediate_reason() {
let cx = test_cx();
let deadline = CancelReason::deadline();
let parent = CancelReason::parent_cancelled().with_cause(deadline);
cx.set_cancel_reason(parent);
assert!(cx.cancelled_by(CancelKind::ParentCancelled));
assert!(!cx.cancelled_by(CancelKind::Deadline));
}
#[test]
fn cancelled_by_returns_false_when_not_cancelled() {
let cx = test_cx();
assert!(!cx.cancelled_by(CancelKind::User));
}
#[test]
fn any_cause_is_searches_chain() {
let cx = test_cx();
let timeout = CancelReason::timeout();
let parent1 = CancelReason::parent_cancelled().with_cause(timeout);
let parent2 = CancelReason::parent_cancelled().with_cause(parent1);
cx.set_cancel_reason(parent2);
assert!(cx.any_cause_is(CancelKind::ParentCancelled));
assert!(cx.any_cause_is(CancelKind::Timeout));
assert!(!cx.any_cause_is(CancelKind::Deadline));
assert!(!cx.any_cause_is(CancelKind::Shutdown));
}
#[test]
fn any_cause_is_returns_false_when_not_cancelled() {
let cx = test_cx();
assert!(!cx.any_cause_is(CancelKind::Timeout));
}
#[test]
fn set_cancel_reason_sets_flag_and_reason() {
let cx = test_cx();
assert!(!cx.is_cancel_requested());
cx.set_cancel_reason(CancelReason::shutdown());
assert!(cx.is_cancel_requested());
assert_eq!(
cx.cancel_reason().expect("should have reason").kind,
CancelKind::Shutdown
);
}
#[test]
fn integration_realistic_usage() {
let cx = test_cx();
let timeout_reason = CancelReason::timeout().with_message("request timeout");
let child_reason = CancelReason::parent_cancelled().with_cause(timeout_reason);
cx.set_cancel_reason(child_reason);
assert!(cx.is_cancel_requested());
assert!(cx.cancelled_by(CancelKind::ParentCancelled));
if cx.any_cause_is(CancelKind::Timeout) {
let root = cx.root_cancel_cause().unwrap();
assert_eq!(root.kind, CancelKind::Timeout);
assert_eq!(root.message, Some("request timeout".to_string()));
}
let chain: Vec<_> = cx.cancel_chain().collect();
assert_eq!(chain.len(), 2);
assert_eq!(chain[0].kind, CancelKind::ParentCancelled);
assert_eq!(chain[1].kind, CancelKind::Timeout);
}
#[test]
fn cancel_fast_sets_flag_and_reason() {
let cx = test_cx();
assert!(!cx.is_cancel_requested());
assert!(cx.cancel_reason().is_none());
cx.cancel_fast(CancelKind::Shutdown);
assert!(cx.is_cancel_requested());
let reason = cx.cancel_reason().expect("should have reason");
assert_eq!(reason.kind, CancelKind::Shutdown);
}
#[test]
fn cancel_fast_no_cause_chain() {
let cx = test_cx();
cx.cancel_fast(CancelKind::Timeout);
let reason = cx.cancel_reason().expect("should have reason");
assert!(reason.cause.is_none());
assert!(reason.message.is_none());
assert!(!reason.truncated);
}
#[test]
fn cancel_fast_sets_region() {
let cx = test_cx();
cx.cancel_fast(CancelKind::User);
let reason = cx.cancel_reason().expect("should have reason");
let expected_region = RegionId::from_arena(ArenaIndex::new(0, 0));
assert_eq!(reason.origin_region, expected_region);
}
#[test]
fn cancel_fast_minimal_allocation() {
let cx = test_cx();
cx.cancel_fast(CancelKind::Deadline);
let reason = cx.cancel_reason().expect("should have reason");
assert_eq!(reason.kind, CancelKind::Deadline);
assert!(reason.message.is_none());
assert!(reason.cause.is_none());
assert!(!reason.truncated);
assert!(reason.truncated_at_depth.is_none());
let cost = reason.estimated_memory_cost();
assert!(
cost < 200,
"cancel_fast should have minimal memory cost, got {cost}"
);
}
#[test]
fn checkpoint_records_progress() {
let cx = test_cx();
let state = cx.checkpoint_state();
assert!(state.last_checkpoint.is_none());
assert!(state.last_message.is_none());
assert_eq!(state.checkpoint_count, 0);
assert!(cx.checkpoint().is_ok());
let state = cx.checkpoint_state();
assert!(state.last_checkpoint.is_some());
assert!(state.last_message.is_none());
assert_eq!(state.checkpoint_count, 1);
assert!(cx.checkpoint().is_ok());
let state = cx.checkpoint_state();
assert_eq!(state.checkpoint_count, 2);
}
#[test]
fn checkpoint_with_records_message() {
let cx = test_cx();
assert!(cx.checkpoint_with("processing step 1").is_ok());
let state = cx.checkpoint_state();
assert!(state.last_checkpoint.is_some());
assert_eq!(state.last_message.as_deref(), Some("processing step 1"));
assert_eq!(state.checkpoint_count, 1);
assert!(cx.checkpoint_with("processing step 2").is_ok());
let state = cx.checkpoint_state();
assert_eq!(state.last_message.as_deref(), Some("processing step 2"));
assert_eq!(state.checkpoint_count, 2);
}
#[test]
fn checkpoint_clears_message() {
let cx = test_cx();
assert!(cx.checkpoint_with("step 1").is_ok());
assert_eq!(
cx.checkpoint_state().last_message.as_deref(),
Some("step 1")
);
assert!(cx.checkpoint().is_ok());
assert!(cx.checkpoint_state().last_message.is_none());
}
#[test]
fn checkpoint_with_checks_cancel() {
let cx = test_cx();
cx.set_cancel_requested(true);
assert!(cx.checkpoint_with("should fail").is_err());
let state = cx.checkpoint_state();
assert_eq!(state.checkpoint_count, 1);
assert_eq!(state.last_message.as_deref(), Some("should fail"));
}
#[test]
fn checkpoint_deadline_exhaustion_sets_cancel_reason() {
let cx = Cx::for_testing_with_budget(Budget::new().with_deadline(Time::ZERO));
assert!(cx.checkpoint().is_err());
let reason = cx
.cancel_reason()
.expect("deadline exhaustion must set reason");
assert_eq!(reason.kind, CancelKind::Deadline);
assert!(cx.is_cancel_requested());
}
#[test]
fn checkpoint_poll_budget_exhaustion_sets_cancel_reason() {
let cx = Cx::for_testing_with_budget(Budget::new().with_poll_quota(0));
assert!(cx.checkpoint().is_err());
let reason = cx
.cancel_reason()
.expect("poll quota exhaustion must set reason");
assert_eq!(reason.kind, CancelKind::PollQuota);
assert!(cx.is_cancel_requested());
}
#[test]
fn checkpoint_cost_budget_exhaustion_sets_cancel_reason() {
let cx = Cx::for_testing_with_budget(Budget::new().with_cost_quota(0));
assert!(cx.checkpoint().is_err());
let reason = cx
.cancel_reason()
.expect("cost budget exhaustion must set reason");
assert_eq!(reason.kind, CancelKind::CostBudget);
assert!(cx.is_cancel_requested());
}
#[test]
fn masked_checkpoint_defers_budget_exhaustion() {
let cx = Cx::for_testing_with_budget(Budget::new().with_deadline(Time::ZERO));
cx.masked(|| {
assert!(
cx.checkpoint().is_ok(),
"budget exhaustion should defer while masked"
);
});
let reason = cx
.cancel_reason()
.expect("masked checkpoint should still record exhaustion reason");
assert_eq!(reason.kind, CancelKind::Deadline);
assert!(
cx.checkpoint().is_err(),
"deadline exhaustion should be observed after unmasking"
);
}
#[test]
fn checkpoint_budget_usage_reports_remaining_time_in_millis() {
let budget = Budget::new()
.with_deadline(Time::from_secs(10))
.with_poll_quota(3)
.with_cost_quota(7);
let baseline = Budget::new()
.with_deadline(Time::from_secs(20))
.with_poll_quota(5)
.with_cost_quota(11);
let (polls_used, cost_used, time_remaining_ms) =
Cx::<cap::All>::checkpoint_budget_usage(budget, baseline, Time::from_secs(7));
assert_eq!(polls_used, Some(2));
assert_eq!(cost_used, Some(4));
assert_eq!(time_remaining_ms, Some(3_000));
}
#[test]
fn set_cancel_requested_wakes_registered_cancel_waker() {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Waker;
struct CountWaker(Arc<AtomicUsize>);
use std::task::Wake;
impl Wake for CountWaker {
fn wake(self: Arc<Self>) {
self.0.fetch_add(1, Ordering::SeqCst);
}
fn wake_by_ref(self: &Arc<Self>) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
let cx = test_cx();
let wakes = Arc::new(AtomicUsize::new(0));
let waker = Waker::from(Arc::new(CountWaker(Arc::clone(&wakes))));
{
let mut inner = cx.inner.write();
inner.cancel_waker = Some(waker);
}
cx.set_cancel_requested(true);
assert_eq!(
wakes.load(Ordering::SeqCst),
1,
"set_cancel_requested(true) must wake the registered cancel waker"
);
cx.set_cancel_requested(false);
assert_eq!(
wakes.load(Ordering::SeqCst),
1,
"clearing cancellation must not spuriously wake the cancel waker"
);
}
#[test]
fn checkpoint_state_is_snapshot() {
let cx = test_cx();
let snapshot = cx.checkpoint_state();
assert_eq!(snapshot.checkpoint_count, 0);
assert!(cx.checkpoint().is_ok());
assert!(cx.checkpoint().is_ok());
assert_eq!(snapshot.checkpoint_count, 0);
assert_eq!(cx.checkpoint_state().checkpoint_count, 2);
}
#[test]
fn checkpoint_with_accepts_string_types() {
let cx = test_cx();
assert!(cx.checkpoint_with("literal").is_ok());
assert!(cx.checkpoint_with(String::from("owned")).is_ok());
assert!(cx.checkpoint_with(format!("item {}", 42)).is_ok());
assert_eq!(cx.checkpoint_state().checkpoint_count, 3);
}
fn test_root_key() -> crate::security::key::AuthKey {
crate::security::key::AuthKey::from_seed(42)
}
#[test]
fn cx_no_macaroon_by_default() {
let cx = test_cx();
assert!(cx.macaroon().is_none());
}
#[test]
fn cx_with_macaroon_attaches_token() {
let key = test_root_key();
let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
let cx = test_cx().with_macaroon(token);
let m = cx.macaroon().expect("should have macaroon");
assert_eq!(m.identifier(), "spawn:r1");
assert_eq!(m.location(), "cx/scheduler");
}
#[test]
fn cx_macaroon_survives_clone() {
let key = test_root_key();
let token = MacaroonToken::mint(&key, "io:net", "cx/io");
let cx = test_cx().with_macaroon(token);
let cx2 = cx.clone();
assert_eq!(
cx.macaroon().unwrap().identifier(),
cx2.macaroon().unwrap().identifier()
);
}
#[test]
fn cx_macaroon_survives_restrict() {
let key = test_root_key();
let token = MacaroonToken::mint(&key, "all:cap", "cx/root");
let cx: Cx<cap::All> = test_cx().with_macaroon(token);
let narrow: Cx<cap::None> = cx.restrict();
assert_eq!(
cx.macaroon().unwrap().identifier(),
narrow.macaroon().unwrap().identifier()
);
}
#[test]
fn cx_attenuate_adds_caveat() {
let key = test_root_key();
let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
let cx = test_cx().with_macaroon(token);
let cx2 = cx
.attenuate(CaveatPredicate::TimeBefore(5000))
.expect("attenuate should succeed");
assert_eq!(cx.macaroon().unwrap().caveat_count(), 0);
assert_eq!(cx2.macaroon().unwrap().caveat_count(), 1);
assert_eq!(
cx.macaroon().unwrap().identifier(),
cx2.macaroon().unwrap().identifier()
);
}
#[test]
fn cx_attenuate_returns_none_without_macaroon() {
let cx = test_cx();
assert!(cx.attenuate(CaveatPredicate::MaxUses(10)).is_none());
}
#[test]
fn cx_attenuate_from_budget_returns_none_without_macaroon() {
let cx = test_cx();
assert!(cx.attenuate_from_budget().is_none());
}
#[test]
fn cx_attenuate_from_budget_preserves_token_without_deadline() {
let key = test_root_key();
let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
let cx = test_cx().with_macaroon(token);
let attenuated = cx
.attenuate_from_budget()
.expect("macaroon should still be present");
assert_eq!(attenuated.macaroon().unwrap().caveat_count(), 0);
assert_eq!(
attenuated.macaroon().unwrap().identifier(),
cx.macaroon().unwrap().identifier()
);
}
#[test]
fn cx_attenuate_from_budget_adds_deadline_caveat() {
let key = test_root_key();
let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
let budget = Budget::new().with_deadline(Time::from_millis(5_000));
let cx = Cx::for_testing_with_budget(budget).with_macaroon(token);
let attenuated = cx
.attenuate_from_budget()
.expect("attenuation with deadline should succeed");
assert_eq!(attenuated.macaroon().unwrap().caveat_count(), 1);
}
#[test]
fn cx_verify_capability_succeeds() {
let key = test_root_key();
let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
let cx = test_cx().with_macaroon(token);
let ctx = VerificationContext::new().with_time(1000);
assert!(cx.verify_capability(&key, &ctx).is_ok());
}
#[test]
fn cx_verify_capability_fails_wrong_key() {
let key = test_root_key();
let wrong_key = crate::security::key::AuthKey::from_seed(99);
let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
let cx = test_cx().with_macaroon(token);
let ctx = VerificationContext::new();
let err = cx.verify_capability(&wrong_key, &ctx).unwrap_err();
assert!(matches!(err, VerificationError::InvalidSignature));
}
#[test]
fn cx_verify_capability_fails_no_macaroon() {
let key = test_root_key();
let cx = test_cx();
let ctx = VerificationContext::new();
let err = cx.verify_capability(&key, &ctx).unwrap_err();
assert!(matches!(err, VerificationError::InvalidSignature));
}
#[test]
fn cx_verify_with_caveats() {
let key = test_root_key();
let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler")
.add_caveat(CaveatPredicate::TimeBefore(5000))
.add_caveat(CaveatPredicate::RegionScope(42));
let cx = test_cx().with_macaroon(token);
let ctx = VerificationContext::new().with_time(1000).with_region(42);
assert!(cx.verify_capability(&key, &ctx).is_ok());
let ctx_expired = VerificationContext::new().with_time(6000).with_region(42);
let err = cx.verify_capability(&key, &ctx_expired).unwrap_err();
assert!(matches!(
err,
VerificationError::CaveatFailed { index: 0, .. }
));
let ctx_wrong_region = VerificationContext::new().with_time(1000).with_region(99);
let err = cx.verify_capability(&key, &ctx_wrong_region).unwrap_err();
assert!(matches!(
err,
VerificationError::CaveatFailed { index: 1, .. }
));
}
#[test]
fn cx_attenuate_then_verify() {
let key = test_root_key();
let token = MacaroonToken::mint(&key, "time:sleep", "cx/time");
let cx = test_cx().with_macaroon(token);
let cx2 = cx.attenuate(CaveatPredicate::TimeBefore(3000)).unwrap();
let cx3 = cx2.attenuate(CaveatPredicate::MaxUses(5)).unwrap();
let ctx = VerificationContext::new().with_time(1000);
assert!(cx.verify_capability(&key, &ctx).is_ok());
assert!(cx2.verify_capability(&key, &ctx).is_ok());
let ctx_late = VerificationContext::new().with_time(4000);
assert!(cx2.verify_capability(&key, &ctx_late).is_err());
let ctx_ok = VerificationContext::new().with_time(1000).with_use_count(3);
assert!(cx3.verify_capability(&key, &ctx_ok).is_ok());
let ctx_overuse = VerificationContext::new()
.with_time(1000)
.with_use_count(10);
assert!(cx3.verify_capability(&key, &ctx_overuse).is_err());
}
#[test]
fn cx_verify_emits_evidence() {
use crate::evidence_sink::CollectorSink;
let key = test_root_key();
let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
let sink = Arc::new(CollectorSink::new());
let cx = test_cx()
.with_macaroon(token)
.with_evidence_sink(Some(sink.clone() as Arc<dyn EvidenceSink>));
let ctx = VerificationContext::new();
cx.verify_capability(&key, &ctx).unwrap();
let entries = sink.entries();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].component, "cx_macaroon");
assert_eq!(entries[0].action, "verify_success");
let wrong_key = crate::security::key::AuthKey::from_seed(99);
let _ = cx.verify_capability(&wrong_key, &ctx);
let entries = sink.entries();
assert_eq!(entries.len(), 2);
assert_eq!(entries[1].action, "verify_fail_signature");
}
#[cfg(feature = "messaging-fabric")]
#[test]
fn cx_grant_publish_capability_mints_token_and_runtime_grant() {
let cx = test_cx();
let schema = capability_schema(
vec![SubjectFamily::Command],
vec![CapabilityPermission::Publish],
);
let granted = cx
.grant_publish_capability::<CommandFamily>(
SubjectPattern::new("orders.>"),
&schema,
DeliveryClass::EphemeralInteractive,
)
.expect("publish capability should mint");
assert_eq!(granted.token().family(), SubjectFamily::Command);
assert!(cx.check_fabric_capability(&FabricCapability::Publish {
subject: SubjectPattern::new("orders.created"),
}));
assert!(!cx.check_fabric_capability(&FabricCapability::Publish {
subject: SubjectPattern::new("payments.created"),
}));
assert_eq!(cx.fabric_capabilities().len(), 1);
}
#[cfg(feature = "messaging-fabric")]
#[test]
fn cx_revoke_fabric_capabilities_by_id_and_scope_propagates_to_children() {
    let cx = test_cx();
    let child = cx.restrict::<cap::None>();
    // Factory for the concrete subject used throughout the assertions.
    let orders_created = || SubjectPattern::new("orders.created");
    let publish = cx
        .grant_fabric_capability(FabricCapability::Publish {
            subject: SubjectPattern::new("orders.>"),
        })
        .expect("publish grant");
    let subscribe = cx
        .grant_fabric_capability(FabricCapability::Subscribe {
            subject: orders_created(),
        })
        .expect("subscribe grant");
    // Grants on the parent are visible through the restricted child.
    assert!(child.check_fabric_capability(&FabricCapability::Publish {
        subject: orders_created(),
    }));
    // Scope-wide revocation issued via the child removes the publish grant
    // for the parent as well.
    let scope_revoked = child.revoke_fabric_capability_scope(FabricCapabilityScope::Publish);
    assert_eq!(scope_revoked, 1);
    assert!(!cx.check_fabric_capability(&FabricCapability::Publish {
        subject: orders_created(),
    }));
    // Id-based revocation returns the removed capability...
    assert_eq!(
        cx.revoke_fabric_capability(subscribe.id()),
        Some(FabricCapability::Subscribe {
            subject: orders_created(),
        })
    );
    // ...and the child no longer observes it either.
    assert!(!child.check_fabric_capability(&FabricCapability::Subscribe {
        subject: orders_created(),
    }));
    assert_eq!(publish.id().raw(), 1);
}
#[cfg(feature = "messaging-fabric")]
#[test]
fn cx_revoke_fabric_capability_by_subject_is_overlap_based() {
    // Revocation by subject removes every grant whose pattern overlaps the
    // given subject, leaving non-overlapping grants untouched.
    let cx = test_cx();
    cx.grant_fabric_capability(FabricCapability::Publish {
        subject: SubjectPattern::new("orders.>"),
    })
    .expect("publish grant");
    cx.grant_fabric_capability(FabricCapability::Subscribe {
        subject: SubjectPattern::new("payments.>"),
    })
    .expect("subscribe grant");
    // "orders.created" overlaps only the "orders.>" publish grant.
    let removed = cx.revoke_fabric_capability_by_subject(&SubjectPattern::new("orders.created"));
    assert_eq!(removed, 1);
    assert!(!cx.check_fabric_capability(&FabricCapability::Publish {
        subject: SubjectPattern::new("orders.created"),
    }));
    // The payments subscription survives because it did not overlap.
    assert!(cx.check_fabric_capability(&FabricCapability::Subscribe {
        subject: SubjectPattern::new("payments.captured"),
    }));
}
#[cfg(feature = "messaging-fabric")]
#[test]
fn cx_rejects_empty_stream_capability_names() {
    // A stream name consisting solely of whitespace is treated as empty
    // and must be rejected at grant time.
    let cx = test_cx();
    let blank_stream = FabricCapability::ConsumeStream {
        stream: " ".to_owned(),
    };
    let error = cx
        .grant_fabric_capability(blank_stream)
        .expect_err("blank stream names must fail");
    assert_eq!(error, FabricCapabilityGrantError::EmptyStreamName);
}
#[test]
fn mr_trace_parent_child_ordering() {
    // Traces interleaved between a parent context and its clone must carry
    // monotonically non-decreasing logical timestamps.
    let parent_cx = test_cx();
    let trace = TraceBufferHandle::new(16);
    parent_cx.set_trace_buffer(trace.clone());
    parent_cx.trace("parent trace 1");
    let child_cx = parent_cx.clone();
    child_cx.trace("child trace 1");
    child_cx.trace("child trace 2");
    parent_cx.trace("parent trace 2");
    let events = trace.snapshot();
    assert_eq!(events.len(), 4);
    let times: Vec<_> = events
        .iter()
        .map(|e| e.logical_time.as_ref().expect("logical time"))
        .collect();
    // Check every adjacent pair instead of indexing by position.
    for pair in times.windows(2) {
        assert!(
            pair[0] <= pair[1],
            "Logical time should be monotonically increasing: {:?} > {:?}",
            pair[0],
            pair[1]
        );
    }
}
#[test]
fn mr_trace_deterministic_interleaving() {
    // Two runs seeded with the same entropy must take identical branches and
    // therefore emit identical trace streams. The driver loop was previously
    // duplicated verbatim for both runs; it is factored into one closure so
    // the two runs cannot drift apart.
    let run_seeded = |seed| {
        let cx = test_cx_with_entropy(seed);
        let trace = TraceBufferHandle::new(16);
        cx.set_trace_buffer(trace.clone());
        for i in 0..5 {
            if cx.random_usize(2) == 0 {
                cx.trace(&format!("branch_a_{}", i));
            } else {
                cx.trace(&format!("branch_b_{}", i));
            }
        }
        trace.snapshot()
    };
    let events1 = run_seeded(42);
    let events2 = run_seeded(42);
    assert_eq!(
        events1.len(),
        events2.len(),
        "Trace count should be deterministic"
    );
    for (i, (e1, e2)) in events1.iter().zip(events2.iter()).enumerate() {
        assert_eq!(
            trace_message(e1),
            trace_message(e2),
            "Trace message at index {} should be deterministic: '{}' vs '{}'",
            i,
            trace_message(e1),
            trace_message(e2)
        );
    }
}
#[test]
fn mr_trace_macaroon_causal_ordering() {
    use crate::cx::macaroon::{CaveatPredicate, MacaroonToken};
    use crate::security::key::AuthKey;
    // Attenuating a macaroon-bearing context (twice, in a chain) must not
    // break the logical-time ordering of traces emitted along that chain.
    let key = AuthKey::from_seed(42);
    let token = MacaroonToken::mint(&key, "trace:emit", "cx/trace");
    let root_cx = test_cx().with_macaroon(token);
    let trace = TraceBufferHandle::new(16);
    root_cx.set_trace_buffer(trace.clone());
    root_cx.trace("root macaroon trace");
    let attenuated_cx = root_cx
        .attenuate(CaveatPredicate::TimeBefore(5000))
        .expect("attenuation should succeed");
    attenuated_cx.trace("attenuated trace 1");
    let further_attenuated_cx = attenuated_cx
        .attenuate(CaveatPredicate::MaxUses(10))
        .expect("further attenuation should succeed");
    further_attenuated_cx.trace("further attenuated trace");
    attenuated_cx.trace("attenuated trace 2");
    let events = trace.snapshot();
    assert_eq!(events.len(), 4);
    let logical_times: Vec<_> = events
        .iter()
        .map(|e| e.logical_time.as_ref().expect("logical time"))
        .collect();
    // Adjacent-pair scan over the recorded timestamps.
    for pair in logical_times.windows(2) {
        assert!(
            pair[0] <= pair[1],
            "Macaroon attenuation should preserve causal ordering: tick {:?} > {:?}",
            pair[0],
            pair[1]
        );
    }
}
#[test]
fn mr_trace_budget_exhaustion_idempotence() {
    use crate::types::Budget;
    // Even after the poll quota (1) is exhausted, every trace call must
    // still be recorded and still receive a distinct logical timestamp.
    let tight_budget = Budget::new().with_poll_quota(1);
    let cx = Cx::for_testing_with_budget(tight_budget);
    let buffer = TraceBufferHandle::new(16);
    cx.set_trace_buffer(buffer.clone());
    for message in [
        "pre-exhaustion trace",
        "exhaustion trace 1",
        "exhaustion trace 2",
        "exhaustion trace 3",
    ] {
        cx.trace(message);
    }
    let recorded = buffer.snapshot();
    assert_eq!(recorded.len(), 4, "All traces should be recorded");
    // Sort + dedup removes nothing iff every logical time is unique.
    let mut stamps: Vec<String> = recorded
        .iter()
        .map(|e| format!("{:?}", e.logical_time.as_ref().expect("logical time")))
        .collect();
    stamps.sort_unstable();
    stamps.dedup();
    assert_eq!(
        stamps.len(),
        4,
        "Logical time allocation should be idempotent (no duplicate times)"
    );
}
#[test]
fn mr_trace_clone_equivalence() {
    // A cloned context shares the trace buffer, logical clock, and entropy
    // source with its original.
    let original_cx = test_cx_with_entropy(123);
    let trace = TraceBufferHandle::new(16);
    original_cx.set_trace_buffer(trace.clone());
    let cloned_cx = original_cx.clone();
    original_cx.trace("original trace 1");
    cloned_cx.trace("cloned trace 1");
    original_cx.trace("original trace 2");
    cloned_cx.trace("cloned trace 2");
    let events = trace.snapshot();
    assert_eq!(events.len(), 4, "Both contexts should write to same buffer");
    let logical_times: Vec<_> = events
        .iter()
        .map(|e| e.logical_time.as_ref().expect("logical time"))
        .collect();
    // Adjacent-pair monotonicity check across both writers.
    for pair in logical_times.windows(2) {
        assert!(
            pair[0] <= pair[1],
            "Clone should preserve logical time ordering: {:?} > {:?}",
            pair[0],
            pair[1]
        );
    }
    let val1 = original_cx.random_usize(100);
    let val2 = cloned_cx.random_usize(100);
    assert_eq!(val1, val2, "Cloned context should share entropy state");
}
#[test]
fn mr_trace_composite_ordering() {
    use crate::cx::macaroon::{CaveatPredicate, MacaroonToken};
    use crate::security::key::AuthKey;
    // Combine cloning, macaroon attenuation, and entropy-driven branching in
    // one run; the trace stream must stay ordered and well-formed.
    let key = AuthKey::from_seed(789);
    let token = MacaroonToken::mint(&key, "trace:composite", "cx/test");
    let root_cx = test_cx_with_entropy(456).with_macaroon(token);
    let trace = TraceBufferHandle::new(32);
    root_cx.set_trace_buffer(trace.clone());
    root_cx.trace("parent+macaroon trace");
    let child_cx = root_cx.clone();
    let attenuated_child = child_cx
        .attenuate(CaveatPredicate::TimeBefore(10000))
        .expect("attenuation should work");
    attenuated_child.trace("child+attenuated trace");
    // Entropy decides which context emits each of the three branch traces.
    for i in 0..3 {
        match root_cx.random_usize(2) {
            0 => root_cx.trace(&format!("parent_branch_{}", i)),
            _ => attenuated_child.trace(&format!("child_branch_{}", i)),
        }
    }
    let events = trace.snapshot();
    assert!(
        events.len() >= 5,
        "Composite test should produce multiple traces"
    );
    let logical_times: Vec<_> = events
        .iter()
        .map(|e| e.logical_time.as_ref().expect("logical time"))
        .collect();
    for pair in logical_times.windows(2) {
        assert!(
            pair[0] <= pair[1],
            "Composite trace ordering should preserve monotonicity: {:?} > {:?}",
            pair[0],
            pair[1]
        );
    }
    assert!(
        events.iter().all(|e| !trace_message(e).is_empty()),
        "All traces should have non-empty messages"
    );
    let branch_traces = events
        .iter()
        .filter(|e| trace_message(e).contains("_branch_"))
        .count();
    assert_eq!(
        branch_traces, 3,
        "Deterministic branching should produce exactly 3 branch traces"
    );
}
}