1use super::cap;
55use super::macaroon::{MacaroonToken, VerificationContext, VerificationError};
56use super::registry::RegistryHandle;
57use crate::combinator::select::SelectAll;
58use crate::evidence_sink::EvidenceSink;
59#[cfg(feature = "messaging-fabric")]
60use crate::messaging::capability::{
61 FabricCapability, FabricCapabilityGrant, FabricCapabilityGrantError, FabricCapabilityId,
62 FabricCapabilityRegistry, FabricCapabilityScope, GrantedFabricToken, PublishPermit,
63 SubjectFamilyTag, SubscribeToken,
64};
65#[cfg(feature = "messaging-fabric")]
66use crate::messaging::class::DeliveryClass;
67#[cfg(feature = "messaging-fabric")]
68use crate::messaging::ir::CapabilityTokenSchema;
69#[cfg(feature = "messaging-fabric")]
70use crate::messaging::subject::SubjectPattern;
71use crate::observability::{
72 DiagnosticContext, LogCollector, LogEntry, ObservabilityConfig, SpanId,
73};
74use crate::remote::RemoteCap;
75use crate::runtime::blocking_pool::BlockingPoolHandle;
76use crate::runtime::io_driver::IoDriverHandle;
77#[cfg(not(target_arch = "wasm32"))]
78use crate::runtime::io_driver::IoRegistration;
79#[cfg(not(target_arch = "wasm32"))]
80use crate::runtime::reactor::{Interest, Source};
81use crate::runtime::task_handle::JoinError;
82use crate::time::{TimerDriverHandle, timeout};
83use crate::trace::distributed::{LogicalClockHandle, LogicalTime};
84use crate::trace::{TraceBufferHandle, TraceEvent};
85use crate::tracing_compat::{debug, error, info, trace, warn};
86use crate::types::{
87 Budget, CancelKind, CancelReason, CxInner, RegionId, SystemPressure, TaskId, Time,
88};
89use crate::util::{EntropySource, OsEntropy};
90use std::cell::RefCell;
91use std::future::Future;
92use std::marker::PhantomData;
93use std::pin::Pin;
94use std::sync::Arc;
95use std::task::Waker;
96use std::time::Duration;
97
98type NamedFuture<T> = (&'static str, Pin<Box<dyn Future<Output = T> + Send>>);
99type NamedFutures<T> = Vec<NamedFuture<T>>;
100
101fn wall_clock_now() -> Time {
103 crate::time::wall_now()
104}
105
106fn noop_waker() -> Waker {
107 Waker::noop().clone()
108}
109
110#[derive(Debug, Clone)]
113struct CxHandles {
114 io_driver: Option<IoDriverHandle>,
115 io_cap: Option<Arc<dyn crate::io::IoCap>>,
116 timer_driver: Option<TimerDriverHandle>,
117 blocking_pool: Option<BlockingPoolHandle>,
118 entropy: Arc<dyn EntropySource>,
119 logical_clock: LogicalClockHandle,
120 remote_cap: Option<Arc<RemoteCap>>,
121 registry: Option<RegistryHandle>,
122 pressure: Option<Arc<SystemPressure>>,
123 evidence_sink: Option<Arc<dyn EvidenceSink>>,
124 macaroon: Option<Arc<MacaroonToken>>,
125 #[cfg(feature = "messaging-fabric")]
126 fabric_capabilities: Arc<FabricCapabilityRegistry>,
127}
128
129#[derive(Debug)]
150pub struct Cx<Caps = cap::All> {
151 pub(crate) inner: Arc<parking_lot::RwLock<CxInner>>,
152 observability: Arc<parking_lot::RwLock<ObservabilityState>>,
153 handles: Arc<CxHandles>,
154 _caps: PhantomData<fn() -> Caps>,
156}
157
158impl<Caps> Clone for Cx<Caps> {
161 #[inline]
162 fn clone(&self) -> Self {
163 Self {
164 inner: Arc::clone(&self.inner),
165 observability: Arc::clone(&self.observability),
166 handles: Arc::clone(&self.handles),
167 _caps: PhantomData,
168 }
169 }
170}
171
172#[derive(Debug, Clone)]
174pub struct ObservabilityState {
175 collector: Option<LogCollector>,
176 context: DiagnosticContext,
177 trace: Option<TraceBufferHandle>,
178 include_timestamps: bool,
179}
180
181impl ObservabilityState {
182 fn new(region: RegionId, task: TaskId) -> Self {
183 let context = DiagnosticContext::new()
184 .with_task_id(task)
185 .with_region_id(region)
186 .with_span_id(SpanId::new());
187 Self {
188 collector: None,
189 context,
190 trace: None,
191 include_timestamps: true,
192 }
193 }
194
195 pub(crate) fn new_with_config(
196 region: RegionId,
197 task: TaskId,
198 config: &ObservabilityConfig,
199 collector: Option<LogCollector>,
200 ) -> Self {
201 let context = config
202 .create_diagnostic_context()
203 .with_task_id(task)
204 .with_region_id(region)
205 .with_span_id(SpanId::new());
206 Self {
207 collector,
208 context,
209 trace: None,
210 include_timestamps: config.include_timestamps(),
211 }
212 }
213
214 fn derive_child(&self, region: RegionId, task: TaskId) -> Self {
215 let mut context = self.context.clone().fork();
216 context = context.with_task_id(task).with_region_id(region);
217 Self {
218 collector: self.collector.clone(),
219 context,
220 trace: self.trace.clone(),
221 include_timestamps: self.include_timestamps,
222 }
223 }
224}
225
226struct MaskGuard<'a> {
228 inner: &'a Arc<parking_lot::RwLock<CxInner>>,
229}
230
231impl Drop for MaskGuard<'_> {
232 fn drop(&mut self) {
235 let mut inner = self.inner.write();
236 inner.mask_depth = inner.mask_depth.saturating_sub(1);
237 }
238}
239
240type FullCx = Cx<cap::All>;
241
242thread_local! {
243 static CURRENT_CX: RefCell<Option<FullCx>> = const { RefCell::new(None) };
244}
245
246#[cfg_attr(feature = "test-internals", visibility::make(pub))]
248pub(crate) struct CurrentCxGuard {
249 prev: Option<FullCx>,
250 _not_send: std::marker::PhantomData<*mut ()>,
251}
252
253impl Drop for CurrentCxGuard {
254 fn drop(&mut self) {
255 let prev = self.prev.take();
256 let _ = CURRENT_CX.try_with(|slot| {
257 *slot.borrow_mut() = prev;
258 });
259 }
260}
261
262impl FullCx {
263 #[inline]
271 #[must_use]
272 pub fn current() -> Option<Self> {
273 CURRENT_CX
274 .try_with(|slot| slot.borrow().clone())
275 .unwrap_or(None)
276 }
277
278 #[inline]
280 #[must_use]
281 #[cfg_attr(feature = "test-internals", visibility::make(pub))]
282 pub(crate) fn set_current(cx: Option<Self>) -> CurrentCxGuard {
283 let prev = CURRENT_CX.with(|slot| {
284 let mut guard = slot.borrow_mut();
285 let prev = guard.take();
286 *guard = cx;
287 prev
288 });
289 CurrentCxGuard {
290 prev,
291 _not_send: std::marker::PhantomData,
292 }
293 }
294}
295
296impl<Caps> Cx<Caps> {
297 #[must_use]
299 #[allow(dead_code)]
300 #[cfg_attr(feature = "test-internals", visibility::make(pub))]
301 pub(crate) fn new(region: RegionId, task: TaskId, budget: Budget) -> Self {
302 Self::new_with_observability(region, task, budget, None, None, None)
303 }
304
305 #[allow(dead_code)] pub(crate) fn from_inner(inner: Arc<parking_lot::RwLock<CxInner>>) -> Self {
308 let (region, task) = {
309 let guard = inner.read();
310 (guard.region, guard.task)
311 };
312 Self {
313 inner,
314 observability: Arc::new(parking_lot::RwLock::new(ObservabilityState::new(
315 region, task,
316 ))),
317 handles: Arc::new(CxHandles {
318 io_driver: None,
319 io_cap: None,
320 timer_driver: None,
321 blocking_pool: None,
322 entropy: Arc::new(OsEntropy),
323 logical_clock: LogicalClockHandle::default(),
324 remote_cap: None,
325 registry: None,
326 pressure: None,
327 evidence_sink: None,
328 macaroon: None,
329 #[cfg(feature = "messaging-fabric")]
330 fabric_capabilities: Arc::new(FabricCapabilityRegistry::default()),
331 }),
332 _caps: PhantomData,
333 }
334 }
335
336 #[must_use]
338 #[cfg_attr(feature = "test-internals", visibility::make(pub))]
339 pub(crate) fn new_with_observability(
340 region: RegionId,
341 task: TaskId,
342 budget: Budget,
343 observability: Option<ObservabilityState>,
344 io_driver: Option<IoDriverHandle>,
345 entropy: Option<Arc<dyn EntropySource>>,
346 ) -> Self {
347 Self::new_with_io(
348 region,
349 task,
350 budget,
351 observability,
352 io_driver,
353 None,
354 entropy,
355 )
356 }
357
358 #[must_use]
360 #[cfg_attr(feature = "test-internals", visibility::make(pub))]
361 pub(crate) fn new_with_io(
362 region: RegionId,
363 task: TaskId,
364 budget: Budget,
365 observability: Option<ObservabilityState>,
366 io_driver: Option<IoDriverHandle>,
367 io_cap: Option<Arc<dyn crate::io::IoCap>>,
368 entropy: Option<Arc<dyn EntropySource>>,
369 ) -> Self {
370 Self::new_with_drivers(
371 region,
372 task,
373 budget,
374 observability,
375 io_driver,
376 io_cap,
377 None,
378 entropy,
379 )
380 }
381
382 #[must_use]
384 #[cfg_attr(feature = "test-internals", visibility::make(pub))]
385 #[allow(clippy::too_many_arguments)]
386 pub(crate) fn new_with_drivers(
387 region: RegionId,
388 task: TaskId,
389 budget: Budget,
390 observability: Option<ObservabilityState>,
391 io_driver: Option<IoDriverHandle>,
392 io_cap: Option<Arc<dyn crate::io::IoCap>>,
393 timer_driver: Option<TimerDriverHandle>,
394 entropy: Option<Arc<dyn EntropySource>>,
395 ) -> Self {
396 let inner = Arc::new(parking_lot::RwLock::new(CxInner::new(region, task, budget)));
397 let observability_state =
398 observability.unwrap_or_else(|| ObservabilityState::new(region, task));
399 let observability = Arc::new(parking_lot::RwLock::new(observability_state));
400 let entropy = entropy.unwrap_or_else(|| Arc::new(OsEntropy));
401
402 debug!(
403 task_id = ?task,
404 region_id = ?region,
405 budget_deadline = ?budget.deadline,
406 budget_poll_quota = budget.poll_quota,
407 budget_cost_quota = ?budget.cost_quota,
408 budget_priority = budget.priority,
409 budget_source = "cx_new",
410 "budget initialized for context"
411 );
412
413 Self {
414 inner,
415 observability,
416 handles: Arc::new(CxHandles {
417 io_driver,
418 io_cap,
419 timer_driver,
420 blocking_pool: None,
421 entropy,
422 logical_clock: LogicalClockHandle::default(),
423 remote_cap: None,
424 registry: None,
425 pressure: None,
426 evidence_sink: None,
427 macaroon: None,
428 #[cfg(feature = "messaging-fabric")]
429 fabric_capabilities: Arc::new(FabricCapabilityRegistry::default()),
430 }),
431 _caps: PhantomData,
432 }
433 }
434
435 #[inline]
437 #[must_use]
438 pub(crate) fn io_driver_handle(&self) -> Option<IoDriverHandle> {
439 self.handles.io_driver.clone()
440 }
441
442 #[inline]
444 #[must_use]
445 pub(crate) fn blocking_pool_handle(&self) -> Option<BlockingPoolHandle> {
446 self.handles.blocking_pool.clone()
447 }
448
449 #[must_use]
451 pub(crate) fn with_blocking_pool_handle(mut self, handle: Option<BlockingPoolHandle>) -> Self {
452 Arc::make_mut(&mut self.handles).blocking_pool = handle;
453 self
454 }
455
456 #[must_use]
458 pub(crate) fn with_logical_clock(mut self, clock: LogicalClockHandle) -> Self {
459 Arc::make_mut(&mut self.handles).logical_clock = clock;
460 self
461 }
462
463 #[must_use]
468 pub fn restrict<NewCaps>(&self) -> Cx<NewCaps>
469 where
470 NewCaps: cap::SubsetOf<Caps>,
471 {
472 self.retype()
473 }
474
475 #[inline]
477 #[must_use]
478 pub(crate) fn retype<NewCaps>(&self) -> Cx<NewCaps> {
479 Cx {
480 inner: self.inner.clone(),
481 observability: self.observability.clone(),
482 handles: self.handles.clone(),
483 _caps: PhantomData,
484 }
485 }
486
487 #[must_use]
492 pub(crate) fn with_registry_handle(mut self, registry: Option<RegistryHandle>) -> Self {
493 Arc::make_mut(&mut self.handles).registry = registry;
494 self
495 }
496
497 #[must_use]
501 pub fn with_remote_cap(mut self, cap: RemoteCap) -> Self {
502 Arc::make_mut(&mut self.handles).remote_cap = Some(Arc::new(cap));
503 self
504 }
505
506 #[must_use]
512 pub fn with_pressure(mut self, pressure: Arc<SystemPressure>) -> Self {
513 Arc::make_mut(&mut self.handles).pressure = Some(pressure);
514 self
515 }
516
517 #[must_use]
521 #[inline]
522 pub fn pressure(&self) -> Option<&SystemPressure> {
523 self.handles.pressure.as_deref()
524 }
525
526 #[allow(dead_code)]
532 #[must_use]
533 pub(crate) fn pressure_handle(&self) -> Option<Arc<SystemPressure>> {
534 self.handles.pressure.clone()
535 }
536
537 #[inline]
542 #[must_use]
543 pub(crate) fn remote_cap_handle(&self) -> Option<Arc<RemoteCap>> {
544 self.handles.remote_cap.clone()
545 }
546
547 #[must_use]
552 pub(crate) fn with_remote_cap_handle(mut self, cap: Option<Arc<RemoteCap>>) -> Self {
553 Arc::make_mut(&mut self.handles).remote_cap = cap;
554 self
555 }
556
557 #[inline]
559 #[must_use]
560 pub fn registry_handle(&self) -> Option<RegistryHandle> {
561 self.handles.registry.clone()
562 }
563
564 #[inline]
566 #[must_use]
567 pub fn has_registry(&self) -> bool {
568 self.handles.registry.is_some()
569 }
570
571 #[cfg(feature = "messaging-fabric")]
573 pub fn grant_fabric_capability(
574 &self,
575 capability: FabricCapability,
576 ) -> Result<FabricCapabilityGrant, FabricCapabilityGrantError> {
577 self.handles.fabric_capabilities.grant(capability)
578 }
579
580 #[cfg(feature = "messaging-fabric")]
582 #[must_use]
583 pub fn fabric_capabilities(&self) -> Vec<FabricCapabilityGrant> {
584 self.handles.fabric_capabilities.snapshot()
585 }
586
587 #[cfg(feature = "messaging-fabric")]
589 pub fn grant_publish_capability<S: SubjectFamilyTag>(
590 &self,
591 subject: SubjectPattern,
592 schema: &CapabilityTokenSchema,
593 delivery_class: DeliveryClass,
594 ) -> Result<GrantedFabricToken<PublishPermit<S>>, FabricCapabilityGrantError> {
595 let token = PublishPermit::<S>::authorize(schema, delivery_class)?;
596 let grant = self.grant_fabric_capability(FabricCapability::Publish { subject })?;
597 Ok(GrantedFabricToken::new(grant, token))
598 }
599
600 #[cfg(feature = "messaging-fabric")]
602 pub fn grant_subscribe_capability<S: SubjectFamilyTag>(
603 &self,
604 subject: SubjectPattern,
605 schema: &CapabilityTokenSchema,
606 delivery_class: DeliveryClass,
607 ) -> Result<GrantedFabricToken<SubscribeToken<S>>, FabricCapabilityGrantError> {
608 let token = SubscribeToken::<S>::authorize(schema, delivery_class)?;
609 let grant = self.grant_fabric_capability(FabricCapability::Subscribe { subject })?;
610 Ok(GrantedFabricToken::new(grant, token))
611 }
612
613 #[cfg(feature = "messaging-fabric")]
615 #[must_use]
616 pub fn check_fabric_capability(&self, capability: &FabricCapability) -> bool {
617 self.handles.fabric_capabilities.check(capability)
618 }
619
620 #[cfg(feature = "messaging-fabric")]
622 #[must_use]
623 pub fn revoke_fabric_capability(&self, id: FabricCapabilityId) -> Option<FabricCapability> {
624 self.handles.fabric_capabilities.revoke_by_id(id)
625 }
626
627 #[cfg(feature = "messaging-fabric")]
629 #[must_use]
630 pub fn revoke_fabric_capability_by_subject(&self, subject: &SubjectPattern) -> usize {
631 self.handles.fabric_capabilities.revoke_by_subject(subject)
632 }
633
634 #[cfg(feature = "messaging-fabric")]
636 #[must_use]
637 pub fn revoke_fabric_capability_scope(&self, scope: FabricCapabilityScope) -> usize {
638 self.handles.fabric_capabilities.revoke_scope(scope)
639 }
640
641 #[must_use]
643 pub fn with_evidence_sink(mut self, sink: Option<Arc<dyn EvidenceSink>>) -> Self {
644 Arc::make_mut(&mut self.handles).evidence_sink = sink;
645 self
646 }
647
648 #[inline]
650 #[must_use]
651 pub(crate) fn evidence_sink_handle(&self) -> Option<Arc<dyn EvidenceSink>> {
652 self.handles.evidence_sink.clone()
653 }
654
655 pub fn emit_evidence(&self, entry: &franken_evidence::EvidenceLedger) {
660 if let Some(ref sink) = self.handles.evidence_sink {
661 sink.emit(entry);
662 }
663 }
664
665 #[must_use]
675 pub fn with_macaroon(mut self, token: MacaroonToken) -> Self {
676 Arc::make_mut(&mut self.handles).macaroon = Some(Arc::new(token));
677 self
678 }
679
680 #[must_use]
682 #[allow(dead_code)] pub(crate) fn with_macaroon_handle(mut self, handle: Option<Arc<MacaroonToken>>) -> Self {
684 Arc::make_mut(&mut self.handles).macaroon = handle;
685 self
686 }
687
688 #[inline]
690 #[must_use]
691 pub fn macaroon(&self) -> Option<&MacaroonToken> {
692 self.handles.macaroon.as_deref()
693 }
694
695 #[inline]
697 #[must_use]
698 #[allow(dead_code)] pub(crate) fn macaroon_handle(&self) -> Option<Arc<MacaroonToken>> {
700 self.handles.macaroon.clone()
701 }
702
703 #[must_use]
711 pub fn attenuate(&self, predicate: super::macaroon::CaveatPredicate) -> Option<Self> {
712 let token = self.handles.macaroon.as_ref()?;
713 let attenuated = MacaroonToken::clone(token).add_caveat(predicate);
714
715 info!(
716 token_id = %attenuated.identifier(),
717 caveat_count = attenuated.caveat_count(),
718 "capability attenuated"
719 );
720
721 let mut cx = self.clone();
722 Arc::make_mut(&mut cx.handles).macaroon = Some(Arc::new(attenuated));
723 Some(cx)
724 }
725
726 #[must_use]
733 pub fn attenuate_time_limit(&self, deadline_ms: u64) -> Option<Self> {
734 self.attenuate(super::macaroon::CaveatPredicate::TimeBefore(deadline_ms))
735 }
736
737 #[must_use]
744 pub fn attenuate_scope(&self, pattern: impl Into<String>) -> Option<Self> {
745 self.attenuate(super::macaroon::CaveatPredicate::ResourceScope(
746 pattern.into(),
747 ))
748 }
749
750 #[must_use]
758 pub fn attenuate_rate_limit(&self, max_count: u32, window_secs: u32) -> Option<Self> {
759 self.attenuate(super::macaroon::CaveatPredicate::RateLimit {
760 max_count,
761 window_secs,
762 })
763 }
764
765 #[must_use]
772 pub fn attenuate_from_budget(&self) -> Option<Self> {
773 let _ = self.handles.macaroon.as_ref()?;
774 let budget = self.budget();
775 budget.deadline.map_or_else(
776 || Some(self.clone()),
777 |d| self.attenuate_time_limit(d.as_millis()),
778 )
779 }
780
781 pub fn verify_capability(
794 &self,
795 root_key: &crate::security::key::AuthKey,
796 context: &VerificationContext,
797 ) -> Result<(), VerificationError> {
798 let Some(token) = self.handles.macaroon.as_ref() else {
799 warn!(
801 task_id = ?self.task_id(),
802 region_id = ?self.region_id(),
803 "capability verification failed: no macaroon attached"
804 );
805 return Err(VerificationError::InvalidSignature);
806 };
807
808 let result = token.verify(root_key, context);
809
810 self.emit_macaroon_evidence(token, &result);
812
813 match &result {
814 Ok(()) => {
815 info!(
816 token_id = %token.identifier(),
817 caveats_checked = token.caveat_count(),
818 "macaroon verified successfully"
819 );
820 }
821 Err(VerificationError::InvalidSignature) => {
822 error!(
823 token_id = %token.identifier(),
824 "HMAC chain integrity violation — possible tampering"
825 );
826 }
827 #[allow(unused_variables)]
828 Err(VerificationError::CaveatFailed {
829 index,
830 predicate,
831 reason,
832 }) => {
833 info!(
834 token_id = %token.identifier(),
835 failed_at_caveat = index,
836 predicate = %predicate,
837 reason = %reason,
838 "macaroon verification failed"
839 );
840 }
841 #[allow(unused_variables)]
842 Err(VerificationError::MissingDischarge { index, identifier }) => {
843 info!(
844 token_id = %token.identifier(),
845 failed_at_caveat = index,
846 discharge_id = %identifier,
847 "missing discharge macaroon"
848 );
849 }
850 #[allow(unused_variables)]
851 Err(VerificationError::DischargeInvalid { index, identifier }) => {
852 info!(
853 token_id = %token.identifier(),
854 failed_at_caveat = index,
855 discharge_id = %identifier,
856 "discharge macaroon verification failed"
857 );
858 }
859 #[allow(unused_variables)]
860 Err(VerificationError::DischargeChainTooDeep { depth }) => {
861 info!(
862 token_id = %token.identifier(),
863 depth = %depth,
864 "discharge macaroon chain too deep"
865 );
866 }
867 }
868
869 result
870 }
871
872 fn emit_macaroon_evidence(
874 &self,
875 token: &MacaroonToken,
876 result: &Result<(), VerificationError>,
877 ) {
878 let Some(ref sink) = self.handles.evidence_sink else {
879 return;
880 };
881
882 let now_ms = wall_clock_now().as_millis();
883
884 let (action, loss) = match result {
885 Ok(()) => ("verify_success".to_string(), 0.0),
886 Err(VerificationError::InvalidSignature) => ("verify_fail_signature".to_string(), 1.0),
887 Err(VerificationError::CaveatFailed { index, .. }) => {
888 (format!("verify_fail_caveat_{index}"), 0.5)
889 }
890 Err(VerificationError::MissingDischarge { index, .. }) => {
891 (format!("verify_fail_missing_discharge_{index}"), 0.8)
892 }
893 Err(VerificationError::DischargeInvalid { index, .. }) => {
894 (format!("verify_fail_discharge_invalid_{index}"), 0.9)
895 }
896 Err(VerificationError::DischargeChainTooDeep { depth }) => {
897 (format!("verify_fail_discharge_chain_too_deep_{depth}"), 1.0)
898 }
899 };
900
901 let entry = franken_evidence::EvidenceLedger {
902 ts_unix_ms: now_ms,
903 component: "cx_macaroon".to_string(),
904 action: action.clone(),
905 posterior: vec![1.0],
906 expected_loss_by_action: std::collections::BTreeMap::from([(action, loss)]),
907 chosen_expected_loss: loss,
908 calibration_score: 1.0,
909 fallback_active: false,
910 #[allow(clippy::cast_precision_loss)]
911 top_features: vec![("caveat_count".to_string(), token.caveat_count() as f64)],
912 };
913 sink.emit(&entry);
914 }
915
916 #[inline]
918 #[must_use]
919 pub fn logical_now(&self) -> LogicalTime {
920 self.handles.logical_clock.now()
921 }
922
923 #[inline]
925 #[must_use]
926 pub(crate) fn logical_clock_handle(&self) -> LogicalClockHandle {
927 self.handles.logical_clock.clone()
928 }
929
930 #[inline]
932 #[must_use]
933 pub fn logical_tick(&self) -> LogicalTime {
934 self.handles.logical_clock.tick()
935 }
936
937 #[inline]
939 #[must_use]
940 pub fn logical_receive(&self, sender_time: &LogicalTime) -> LogicalTime {
941 self.handles.logical_clock.receive(sender_time)
942 }
943
944 #[inline]
959 #[must_use]
960 pub fn timer_driver(&self) -> Option<TimerDriverHandle>
961 where
962 Caps: cap::HasTime,
963 {
964 self.handles.timer_driver.clone()
965 }
966
967 #[inline]
972 #[must_use]
973 pub fn has_timer(&self) -> bool
974 where
975 Caps: cap::HasTime,
976 {
977 self.handles.timer_driver.is_some()
978 }
979
980 #[inline]
1005 #[must_use]
1006 pub fn io(&self) -> Option<&dyn crate::io::IoCap>
1007 where
1008 Caps: cap::HasIo,
1009 {
1010 self.handles.io_cap.as_ref().map(AsRef::as_ref)
1011 }
1012
1013 #[inline]
1020 #[allow(dead_code)]
1021 #[must_use]
1022 pub(crate) fn io_cap_handle(&self) -> Option<Arc<dyn crate::io::IoCap>> {
1023 self.handles.io_cap.clone()
1024 }
1025
1026 #[inline]
1030 #[must_use]
1031 pub fn has_io(&self) -> bool
1032 where
1033 Caps: cap::HasIo,
1034 {
1035 self.handles.io_cap.is_some()
1036 }
1037
1038 #[inline]
1044 #[must_use]
1045 pub fn fetch_cap(&self) -> Option<&dyn crate::io::FetchIoCap>
1046 where
1047 Caps: cap::HasIo,
1048 {
1049 self.handles.io_cap.as_ref().and_then(|cap| cap.fetch_cap())
1050 }
1051
1052 #[inline]
1054 #[must_use]
1055 pub fn has_fetch_cap(&self) -> bool
1056 where
1057 Caps: cap::HasIo,
1058 {
1059 self.fetch_cap().is_some()
1060 }
1061
1062 #[inline]
1075 #[must_use]
1076 pub fn remote(&self) -> Option<&RemoteCap>
1077 where
1078 Caps: cap::HasRemote,
1079 {
1080 self.handles.remote_cap.as_ref().map(AsRef::as_ref)
1081 }
1082
1083 #[inline]
1087 #[must_use]
1088 pub fn has_remote(&self) -> bool
1089 where
1090 Caps: cap::HasRemote,
1091 {
1092 self.handles.remote_cap.is_some()
1093 }
1094
1095 #[cfg(unix)]
1117 pub fn register_io<S: Source>(
1118 &self,
1119 source: &S,
1120 interest: Interest,
1121 ) -> std::io::Result<IoRegistration>
1122 where
1123 Caps: cap::HasIo,
1124 {
1125 let Some(driver) = self.io_driver_handle() else {
1126 return Err(std::io::Error::new(
1127 std::io::ErrorKind::NotConnected,
1128 "I/O driver not available",
1129 ));
1130 };
1131 driver.register(source, interest, noop_waker())
1132 }
1133
1134 #[inline]
1147 #[must_use]
1148 pub fn region_id(&self) -> RegionId {
1149 self.inner.read().region
1150 }
1151
1152 #[inline]
1165 #[must_use]
1166 pub fn task_id(&self) -> TaskId {
1167 self.inner.read().task
1168 }
1169
1170 #[inline]
1175 #[must_use]
1176 pub fn task_type(&self) -> Option<String> {
1177 self.inner.read().task_type.clone()
1178 }
1179
1180 pub fn set_task_type(&self, task_type: impl Into<String>) {
1185 let mut inner = self.inner.write();
1186 inner.task_type = Some(task_type.into());
1187 }
1188
1189 #[inline]
1211 #[must_use]
1212 pub fn budget(&self) -> Budget {
1213 self.inner.read().budget
1214 }
1215
1216 #[inline]
1240 #[must_use]
1241 pub fn is_cancel_requested(&self) -> bool {
1242 self.inner.read().cancel_requested
1243 }
1244
1245 #[allow(clippy::result_large_err)]
1286 pub fn checkpoint(&self) -> Result<(), crate::error::Error> {
1287 let checkpoint_time = self.current_checkpoint_time();
1288 let (
1290 cancel_requested,
1291 mask_depth,
1292 task,
1293 region,
1294 budget,
1295 budget_baseline,
1296 cancel_reason,
1297 budget_exhaustion,
1298 ) = {
1299 let mut inner = self.inner.write();
1300 inner.checkpoint_state.record_at(checkpoint_time);
1301 let budget_exhaustion = Self::checkpoint_budget_exhaustion(
1302 inner.region,
1303 inner.task,
1304 inner.budget,
1305 checkpoint_time,
1306 );
1307 if let Some((reason, _, _)) = &budget_exhaustion {
1308 inner.cancel_requested = true;
1309 inner
1310 .fast_cancel
1311 .store(true, std::sync::atomic::Ordering::Release);
1312 if let Some(existing) = &mut inner.cancel_reason {
1313 existing.strengthen(reason);
1314 } else {
1315 inner.cancel_reason = Some(reason.clone());
1316 }
1317 }
1318 if inner.cancel_requested && inner.mask_depth == 0 {
1319 inner.cancel_acknowledged = true;
1320 }
1321 (
1322 inner.cancel_requested,
1323 inner.mask_depth,
1324 inner.task,
1325 inner.region,
1326 inner.budget,
1327 inner.budget_baseline,
1328 inner.cancel_reason.clone(),
1329 budget_exhaustion.map(|(_, exhaustion_kind, deadline_remaining_ms)| {
1330 (exhaustion_kind, deadline_remaining_ms)
1331 }),
1332 )
1333 };
1334
1335 if let Some((exhaustion_kind, deadline_remaining_ms)) = budget_exhaustion {
1336 if let Some(ref sink) = self.handles.evidence_sink {
1337 crate::evidence_sink::emit_budget_evidence(
1338 sink.as_ref(),
1339 exhaustion_kind,
1340 budget.poll_quota,
1341 deadline_remaining_ms,
1342 );
1343 }
1344 }
1345
1346 if cancel_requested && mask_depth == 0 {
1348 if let Some(ref sink) = self.handles.evidence_sink {
1349 let kind_str = cancel_reason
1350 .as_ref()
1351 .map_or_else(|| "unknown".to_string(), |r| format!("{}", r.kind));
1352 crate::evidence_sink::emit_cancel_evidence(
1353 sink.as_ref(),
1354 &kind_str,
1355 budget.poll_quota,
1356 budget.priority,
1357 );
1358 }
1359 }
1360
1361 Self::check_cancel_from_values(
1362 cancel_requested,
1363 mask_depth,
1364 task,
1365 region,
1366 budget,
1367 budget_baseline,
1368 checkpoint_time,
1369 cancel_reason.as_ref(),
1370 )
1371 }
1372
1373 #[allow(clippy::result_large_err)]
1397 pub fn checkpoint_with(&self, msg: impl Into<String>) -> Result<(), crate::error::Error> {
1398 let checkpoint_time = self.current_checkpoint_time();
1399 let (
1401 cancel_requested,
1402 mask_depth,
1403 task,
1404 region,
1405 budget,
1406 budget_baseline,
1407 cancel_reason,
1408 budget_exhaustion,
1409 ) = {
1410 let mut inner = self.inner.write();
1411 inner
1412 .checkpoint_state
1413 .record_with_message_at(msg.into(), checkpoint_time);
1414 let budget_exhaustion = Self::checkpoint_budget_exhaustion(
1415 inner.region,
1416 inner.task,
1417 inner.budget,
1418 checkpoint_time,
1419 );
1420 if let Some((reason, _, _)) = &budget_exhaustion {
1421 inner.cancel_requested = true;
1422 inner
1423 .fast_cancel
1424 .store(true, std::sync::atomic::Ordering::Release);
1425 if let Some(existing) = &mut inner.cancel_reason {
1426 existing.strengthen(reason);
1427 } else {
1428 inner.cancel_reason = Some(reason.clone());
1429 }
1430 }
1431 if inner.cancel_requested && inner.mask_depth == 0 {
1432 inner.cancel_acknowledged = true;
1433 }
1434 (
1435 inner.cancel_requested,
1436 inner.mask_depth,
1437 inner.task,
1438 inner.region,
1439 inner.budget,
1440 inner.budget_baseline,
1441 inner.cancel_reason.clone(),
1442 budget_exhaustion.map(|(_, exhaustion_kind, deadline_remaining_ms)| {
1443 (exhaustion_kind, deadline_remaining_ms)
1444 }),
1445 )
1446 };
1447
1448 if let Some((exhaustion_kind, deadline_remaining_ms)) = budget_exhaustion {
1449 if let Some(ref sink) = self.handles.evidence_sink {
1450 crate::evidence_sink::emit_budget_evidence(
1451 sink.as_ref(),
1452 exhaustion_kind,
1453 budget.poll_quota,
1454 deadline_remaining_ms,
1455 );
1456 }
1457 }
1458
1459 if cancel_requested && mask_depth == 0 {
1461 if let Some(ref sink) = self.handles.evidence_sink {
1462 let kind_str = cancel_reason
1463 .as_ref()
1464 .map_or_else(|| "unknown".to_string(), |r| format!("{}", r.kind));
1465 crate::evidence_sink::emit_cancel_evidence(
1466 sink.as_ref(),
1467 &kind_str,
1468 budget.poll_quota,
1469 budget.priority,
1470 );
1471 }
1472 }
1473
1474 Self::check_cancel_from_values(
1475 cancel_requested,
1476 mask_depth,
1477 task,
1478 region,
1479 budget,
1480 budget_baseline,
1481 checkpoint_time,
1482 cancel_reason.as_ref(),
1483 )
1484 }
1485
1486 #[must_use]
1504 pub fn checkpoint_state(&self) -> crate::types::CheckpointState {
1505 self.inner.read().checkpoint_state.clone()
1506 }
1507
1508 #[must_use]
1511 pub fn now(&self) -> Time
1512 where
1513 Caps: cap::HasTime,
1514 {
1515 self.handles
1516 .timer_driver
1517 .as_ref()
1518 .map_or_else(wall_clock_now, TimerDriverHandle::now)
1519 }
1520
1521 #[inline]
1523 fn current_checkpoint_time(&self) -> Time {
1524 self.handles
1525 .timer_driver
1526 .as_ref()
1527 .map_or_else(wall_clock_now, TimerDriverHandle::now)
1528 }
1529
1530 #[inline]
1531 fn checkpoint_budget_exhaustion(
1532 region: RegionId,
1533 task: TaskId,
1534 budget: Budget,
1535 now: Time,
1536 ) -> Option<(CancelReason, &'static str, Option<u64>)> {
1537 let deadline_remaining_ms = budget
1538 .remaining_time(now)
1539 .map(Self::duration_millis_saturating);
1540
1541 let mut exhaustion = if budget.is_past_deadline(now) {
1542 Some((
1543 CancelReason::with_origin(CancelKind::Deadline, region, now).with_task(task),
1544 "time",
1545 deadline_remaining_ms,
1546 ))
1547 } else {
1548 None
1549 };
1550
1551 if budget.poll_quota == 0 {
1552 let candidate =
1553 CancelReason::with_origin(CancelKind::PollQuota, region, now).with_task(task);
1554 match &mut exhaustion {
1555 Some((existing, kind, _)) => {
1556 if existing.strengthen(&candidate) {
1557 *kind = "poll";
1558 }
1559 }
1560 None => exhaustion = Some((candidate, "poll", deadline_remaining_ms)),
1561 }
1562 }
1563
1564 if matches!(budget.cost_quota, Some(0)) {
1565 let candidate =
1566 CancelReason::with_origin(CancelKind::CostBudget, region, now).with_task(task);
1567 match &mut exhaustion {
1568 Some((existing, kind, _)) => {
1569 if existing.strengthen(&candidate) {
1570 *kind = "cost";
1571 }
1572 }
1573 None => exhaustion = Some((candidate, "cost", deadline_remaining_ms)),
1574 }
1575 }
1576
1577 exhaustion
1578 }
1579
1580 #[inline]
1581 fn checkpoint_budget_usage(
1582 budget: Budget,
1583 budget_baseline: Budget,
1584 now: Time,
1585 ) -> (Option<u32>, Option<u64>, Option<u64>) {
1586 let polls_used = if budget_baseline.poll_quota == u32::MAX {
1587 None
1588 } else {
1589 Some(budget_baseline.poll_quota.saturating_sub(budget.poll_quota))
1590 };
1591 let cost_used = match (budget_baseline.cost_quota, budget.cost_quota) {
1592 (Some(baseline), Some(remaining)) => Some(baseline.saturating_sub(remaining)),
1593 _ => None,
1594 };
1595 let time_remaining_ms = budget
1596 .remaining_time(now)
1597 .map(Self::duration_millis_saturating);
1598 (polls_used, cost_used, time_remaining_ms)
1599 }
1600
1601 #[inline]
1602 fn duration_millis_saturating(duration: Duration) -> u64 {
1603 u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
1604 }
1605
1606 #[allow(clippy::result_large_err)]
1608 #[allow(clippy::too_many_arguments)]
1609 fn check_cancel_from_values(
1610 cancel_requested: bool,
1611 mask_depth: u32,
1612 task: TaskId,
1613 region: RegionId,
1614 budget: Budget,
1615 budget_baseline: Budget,
1616 checkpoint_time: Time,
1617 cancel_reason: Option<&CancelReason>,
1618 ) -> Result<(), crate::error::Error> {
1619 let (polls_used, cost_used, time_remaining_ms) =
1620 Self::checkpoint_budget_usage(budget, budget_baseline, checkpoint_time);
1621
1622 let _ = (
1623 &task,
1624 ®ion,
1625 &budget,
1626 &budget_baseline,
1627 &polls_used,
1628 &cost_used,
1629 &time_remaining_ms,
1630 );
1631
1632 trace!(
1633 task_id = ?task,
1634 region_id = ?region,
1635 polls_used = ?polls_used,
1636 polls_remaining = budget.poll_quota,
1637 time_remaining_ms = ?time_remaining_ms,
1638 cost_used = ?cost_used,
1639 cost_remaining = ?budget.cost_quota,
1640 deadline = ?budget.deadline,
1641 cancel_reason = ?cancel_reason,
1642 cancel_requested,
1643 mask_depth,
1644 "checkpoint"
1645 );
1646
1647 if cancel_requested {
1648 if mask_depth == 0 {
1649 let cancel_reason_ref = cancel_reason.as_ref();
1650 let exhausted_resource = cancel_reason_ref
1651 .map_or_else(|| "unknown".to_string(), |r| format!("{:?}", r.kind));
1652 let _ = &exhausted_resource;
1653
1654 info!(
1655 task_id = ?task,
1656 region_id = ?region,
1657 exhausted_resource = %exhausted_resource,
1658 cancel_reason = ?cancel_reason,
1659 budget_deadline = ?budget.deadline,
1660 budget_poll_quota = budget.poll_quota,
1661 budget_cost_quota = ?budget.cost_quota,
1662 "cancel observed at checkpoint - task cancelled"
1663 );
1664
1665 trace!(
1666 task_id = ?task,
1667 region_id = ?region,
1668 cancel_reason = ?cancel_reason,
1669 cancel_kind = ?cancel_reason.as_ref().map(|r| r.kind),
1670 mask_depth,
1671 budget_deadline = ?budget.deadline,
1672 budget_poll_quota = budget.poll_quota,
1673 budget_cost_quota = ?budget.cost_quota,
1674 budget_priority = budget.priority,
1675 "cancel observed at checkpoint"
1676 );
1677 Err(crate::error::Error::new(crate::error::ErrorKind::Cancelled))
1678 } else {
1679 trace!(
1680 task_id = ?task,
1681 region_id = ?region,
1682 cancel_reason = ?cancel_reason,
1683 cancel_kind = ?cancel_reason.as_ref().map(|r| r.kind),
1684 mask_depth,
1685 "cancel observed but masked"
1686 );
1687 Ok(())
1688 }
1689 } else {
1690 Ok(())
1691 }
1692 }
1693
1694 pub fn masked<F, R>(&self, f: F) -> R
1731 where
1732 F: FnOnce() -> R,
1733 {
1734 {
1735 let mut inner = self.inner.write();
1736 assert!(
1737 inner.mask_depth < crate::types::task_context::MAX_MASK_DEPTH,
1738 "mask depth exceeded MAX_MASK_DEPTH ({}): this violates INV-MASK-BOUNDED \
1739 and prevents cancellation from ever being observed. \
1740 Reduce nesting of Cx::masked() sections.",
1741 crate::types::task_context::MAX_MASK_DEPTH,
1742 );
1743 inner.mask_depth += 1;
1744 }
1745
1746 let _guard = MaskGuard { inner: &self.inner };
1747 f()
1748 }
1749
1750 pub fn trace(&self, message: &str) {
1776 self.log(LogEntry::trace(message));
1777 let Some(trace) = self.trace_buffer() else {
1778 return;
1779 };
1780 let now = self
1781 .handles
1782 .timer_driver
1783 .as_ref()
1784 .map_or_else(wall_clock_now, TimerDriverHandle::now);
1785 let logical_time = self.logical_tick();
1786 trace.record_event(move |seq| {
1787 TraceEvent::user_trace(seq, now, message).with_logical_time(logical_time)
1788 });
1789 }
1790
1791 pub fn trace_with_fields(&self, message: &str, fields: &[(&str, &str)]) {
1806 let mut entry = LogEntry::trace(message);
1807 for &(k, v) in fields {
1808 entry = entry.with_field(k, v);
1809 }
1810 self.log(entry);
1811 let Some(trace) = self.trace_buffer() else {
1812 return;
1813 };
1814 let now = self
1815 .handles
1816 .timer_driver
1817 .as_ref()
1818 .map_or_else(wall_clock_now, TimerDriverHandle::now);
1819 let logical_time = self.logical_tick();
1820 trace.record_event(move |seq| {
1821 TraceEvent::user_trace(seq, now, message).with_logical_time(logical_time)
1822 });
1823 }
1824
1825 #[must_use]
1840 pub fn enter_span(&self, name: &str) -> SpanGuard<Caps> {
1841 let prev = self.diagnostic_context();
1842 let child = prev.fork().with_custom("span.name", name);
1843 self.set_diagnostic_context(child);
1844 self.log(LogEntry::debug(format!("span enter: {name}")).with_target("tracing"));
1845 SpanGuard {
1846 cx: self.clone(),
1847 prev,
1848 }
1849 }
1850
1851 pub fn set_request_id(&self, id: impl Into<String>) {
1856 let mut obs = self.observability.write();
1857 obs.context = obs.context.clone().with_custom("request_id", id);
1858 }
1859
1860 #[inline]
1862 #[must_use]
1863 pub fn request_id(&self) -> Option<String> {
1864 self.diagnostic_context()
1865 .custom("request_id")
1866 .map(String::from)
1867 }
1868
1869 pub fn log(&self, entry: LogEntry) {
1871 let obs = self.observability.read();
1872 let Some(collector) = obs.collector.clone() else {
1873 return;
1874 };
1875 let include_timestamps = obs.include_timestamps;
1876 let context = obs.context.clone();
1877 drop(obs);
1878 let mut entry = entry.with_context(&context);
1879 if include_timestamps && entry.timestamp() == Time::ZERO {
1880 let now = self
1881 .handles
1882 .timer_driver
1883 .as_ref()
1884 .map_or_else(wall_clock_now, TimerDriverHandle::now);
1885 entry = entry.with_timestamp(now);
1886 }
1887 collector.log(entry);
1888 }
1889
1890 #[must_use]
1892 pub fn diagnostic_context(&self) -> DiagnosticContext {
1893 self.observability.read().context.clone()
1894 }
1895
1896 pub fn set_diagnostic_context(&self, ctx: DiagnosticContext) {
1898 let mut obs = self.observability.write();
1899 obs.context = ctx;
1900 }
1901
1902 pub fn set_log_collector(&self, collector: LogCollector) {
1904 let mut obs = self.observability.write();
1905 obs.collector = Some(collector);
1906 }
1907
1908 #[inline]
1910 #[must_use]
1911 pub fn log_collector(&self) -> Option<LogCollector> {
1912 self.observability.read().collector.clone()
1913 }
1914
1915 pub fn set_trace_buffer(&self, trace: TraceBufferHandle) {
1917 let mut obs = self.observability.write();
1918 obs.trace = Some(trace);
1919 }
1920
1921 #[inline]
1923 #[must_use]
1924 pub fn trace_buffer(&self) -> Option<TraceBufferHandle> {
1925 self.observability.read().trace.clone()
1926 }
1927
1928 pub(crate) fn child_observability(&self, region: RegionId, task: TaskId) -> ObservabilityState {
1930 let obs = self.observability.read();
1931 obs.derive_child(region, task)
1932 }
1933
1934 #[inline]
1936 #[must_use]
1937 pub fn entropy(&self) -> &dyn EntropySource
1938 where
1939 Caps: cap::HasRandom,
1940 {
1941 self.handles.entropy.as_ref()
1942 }
1943
1944 pub(crate) fn child_entropy(&self, task: TaskId) -> Arc<dyn EntropySource> {
1946 self.handles.entropy.fork(task)
1947 }
1948
1949 #[inline]
1951 #[must_use]
1952 pub(crate) fn entropy_handle(&self) -> Arc<dyn EntropySource>
1953 where
1954 Caps: cap::HasRandom,
1955 {
1956 self.handles.entropy.clone()
1957 }
1958
1959 #[must_use]
1961 pub fn random_u64(&self) -> u64
1962 where
1963 Caps: cap::HasRandom,
1964 {
1965 let value = self.handles.entropy.next_u64();
1966 trace!(
1967 source = self.handles.entropy.source_id(),
1968 task_id = ?self.task_id(),
1969 value,
1970 "entropy_u64"
1971 );
1972 value
1973 }
1974
1975 pub fn random_bytes(&self, dest: &mut [u8])
1977 where
1978 Caps: cap::HasRandom,
1979 {
1980 self.handles.entropy.fill_bytes(dest);
1981 trace!(
1982 source = self.handles.entropy.source_id(),
1983 task_id = ?self.task_id(),
1984 len = dest.len(),
1985 "entropy_bytes"
1986 );
1987 }
1988
1989 #[must_use]
1991 pub fn random_usize(&self, bound: usize) -> usize
1992 where
1993 Caps: cap::HasRandom,
1994 {
1995 assert!(bound > 0, "bound must be non-zero");
1996 let bound_u64 = bound as u64;
1997 let threshold = u64::MAX - (u64::MAX % bound_u64);
1998 loop {
1999 let value = self.random_u64();
2000 if value < threshold {
2001 return (value % bound_u64) as usize;
2002 }
2003 }
2004 }
2005
2006 #[must_use]
2008 pub fn random_bool(&self) -> bool
2009 where
2010 Caps: cap::HasRandom,
2011 {
2012 self.random_u64() & 1 == 1
2013 }
2014
2015 #[must_use]
2017 #[allow(clippy::cast_precision_loss)]
2018 pub fn random_f64(&self) -> f64
2019 where
2020 Caps: cap::HasRandom,
2021 {
2022 (self.random_u64() >> 11) as f64 / (1u64 << 53) as f64
2023 }
2024
2025 pub fn shuffle<T>(&self, slice: &mut [T])
2027 where
2028 Caps: cap::HasRandom,
2029 {
2030 for i in (1..slice.len()).rev() {
2031 let j = self.random_usize(i + 1);
2032 slice.swap(i, j);
2033 }
2034 }
2035
2036 #[allow(dead_code)]
2038 pub(crate) fn set_cancel_internal(&self, value: bool) {
2039 let mut inner = self.inner.write();
2040 inner.cancel_requested = value;
2041 inner
2042 .fast_cancel
2043 .store(value, std::sync::atomic::Ordering::Release);
2044 if !value {
2045 inner.cancel_reason = None;
2046 }
2047 }
2048
2049 pub fn set_cancel_requested(&self, value: bool) {
2072 let waker = {
2073 let mut inner = self.inner.write();
2074 inner.cancel_requested = value;
2075 inner
2076 .fast_cancel
2077 .store(value, std::sync::atomic::Ordering::Release);
2078 if !value {
2079 inner.cancel_reason = None;
2080 None
2081 } else {
2082 inner.cancel_waker.clone()
2083 }
2084 };
2085 if let Some(waker) = waker {
2086 waker.wake();
2087 }
2088 }
2089
2090 pub fn cancel_with(&self, kind: CancelKind, message: Option<&'static str>) {
2126 let (region, task, waker) = {
2127 let mut inner = self.inner.write();
2128 let region = inner.region;
2129 let task = inner.task;
2130
2131 let mut reason = CancelReason::new(kind).with_region(region).with_task(task);
2132 if let Some(msg) = message {
2133 reason = reason.with_message(msg);
2134 }
2135
2136 inner.cancel_requested = true;
2137 inner
2138 .fast_cancel
2139 .store(true, std::sync::atomic::Ordering::Release);
2140 inner.cancel_reason = Some(reason);
2141 let waker = inner.cancel_waker.clone();
2142 drop(inner);
2143 (region, task, waker)
2144 };
2145
2146 if let Some(w) = waker {
2147 w.wake();
2148 }
2149
2150 debug!(
2151 task_id = ?task,
2152 region_id = ?region,
2153 cancel_kind = ?kind,
2154 cancel_message = message,
2155 "cancel initiated via cancel_with"
2156 );
2157 let _ = (region, task);
2158 }
2159
2160 pub fn cancel_fast(&self, kind: CancelKind) {
2186 let (region, waker) = {
2187 let mut inner = self.inner.write();
2188 let region = inner.region;
2189
2190 let reason = CancelReason::new(kind).with_region(region);
2192
2193 inner.cancel_requested = true;
2194 inner
2195 .fast_cancel
2196 .store(true, std::sync::atomic::Ordering::Release);
2197 inner.cancel_reason = Some(reason);
2198 let waker = inner.cancel_waker.clone();
2199 drop(inner);
2200 (region, waker)
2201 };
2202
2203 if let Some(w) = waker {
2204 w.wake();
2205 }
2206
2207 trace!(
2208 region_id = ?region,
2209 cancel_kind = ?kind,
2210 "cancel_fast initiated"
2211 );
2212 let _ = region;
2213 }
2214
2215 #[inline]
2236 #[must_use]
2237 pub fn cancel_reason(&self) -> Option<CancelReason> {
2238 let inner = self.inner.read();
2239 inner.cancel_reason.clone()
2240 }
2241
2242 pub fn cancel_chain(&self) -> impl Iterator<Item = CancelReason> {
2270 let cancel_reason = self.inner.read().cancel_reason.clone();
2271 std::iter::successors(cancel_reason, |r| r.cause.as_deref().cloned())
2272 }
2273
2274 #[must_use]
2303 pub fn root_cancel_cause(&self) -> Option<CancelReason> {
2304 let inner = self.inner.read();
2305 inner.cancel_reason.as_ref().map(|r| r.root_cause().clone())
2306 }
2307
2308 #[must_use]
2329 pub fn cancelled_by(&self, kind: CancelKind) -> bool {
2330 let inner = self.inner.read();
2331 inner.cancel_reason.as_ref().is_some_and(|r| r.kind == kind)
2332 }
2333
2334 #[must_use]
2360 pub fn any_cause_is(&self, kind: CancelKind) -> bool {
2361 let inner = self.inner.read();
2362 inner
2363 .cancel_reason
2364 .as_ref()
2365 .is_some_and(|r| r.any_cause_is(kind))
2366 }
2367
2368 pub fn set_cancel_reason(&self, reason: CancelReason) {
2391 let waker = {
2392 let mut inner = self.inner.write();
2393 inner.cancel_requested = true;
2394 inner
2395 .fast_cancel
2396 .store(true, std::sync::atomic::Ordering::Release);
2397 inner.cancel_reason = Some(reason);
2398 inner.cancel_waker.clone()
2399 };
2400 if let Some(w) = waker {
2401 w.wake();
2402 }
2403 }
2404
2405 pub async fn race<T>(
2422 &self,
2423 futures: Vec<Pin<Box<dyn Future<Output = T> + Send>>>,
2424 ) -> Result<T, JoinError> {
2425 if futures.is_empty() {
2426 return std::future::pending().await;
2427 }
2428 let (res, _) = SelectAll::new(futures)
2429 .await
2430 .map_err(|_| JoinError::PolledAfterCompletion)?;
2431 Ok(res)
2432 }
2433
2434 pub async fn race_named<T>(&self, futures: NamedFutures<T>) -> Result<T, JoinError> {
2444 let futures: Vec<_> = futures.into_iter().map(|(_, f)| f).collect();
2445 self.race(futures).await
2446 }
2447
2448 pub async fn race_timeout<T>(
2458 &self,
2459 duration: Duration,
2460 futures: Vec<Pin<Box<dyn Future<Output = T> + Send>>>,
2461 ) -> Result<T, JoinError>
2462 where
2463 Caps: cap::HasTime,
2464 {
2465 let race_fut = std::pin::pin!(self.race(futures));
2466 let now = self
2467 .handles
2468 .timer_driver
2469 .as_ref()
2470 .map_or_else(wall_clock_now, TimerDriverHandle::now);
2471 timeout(now, duration, race_fut)
2472 .await
2473 .unwrap_or_else(|_| Err(JoinError::Cancelled(CancelReason::timeout())))
2474 }
2475
2476 pub async fn race_timeout_named<T>(
2484 &self,
2485 duration: Duration,
2486 futures: NamedFutures<T>,
2487 ) -> Result<T, JoinError>
2488 where
2489 Caps: cap::HasTime,
2490 {
2491 let futures: Vec<_> = futures.into_iter().map(|(_, f)| f).collect();
2492 self.race_timeout(duration, futures).await
2493 }
2494
2495 #[must_use]
2521 pub fn scope(&self) -> crate::cx::Scope<'static> {
2522 let budget = self.budget();
2523 debug!(
2524 task_id = ?self.task_id(),
2525 region_id = ?self.region_id(),
2526 budget_deadline = ?budget.deadline,
2527 budget_poll_quota = budget.poll_quota,
2528 budget_cost_quota = ?budget.cost_quota,
2529 budget_priority = budget.priority,
2530 budget_source = "inherited",
2531 "scope budget inherited"
2532 );
2533 crate::cx::Scope::new(self.region_id(), budget)
2534 }
2535
2536 #[must_use]
2545 pub fn scope_with_budget(&self, budget: Budget) -> crate::cx::Scope<'static> {
2546 let parent_budget = self.budget();
2547 let deadline_tightened = match (parent_budget.deadline, budget.deadline) {
2548 (Some(parent), Some(child)) => child < parent,
2549 (None, Some(_)) => true,
2550 _ => false,
2551 };
2552 let poll_tightened = budget.poll_quota < parent_budget.poll_quota;
2553 let cost_tightened = match (parent_budget.cost_quota, budget.cost_quota) {
2554 (Some(parent), Some(child)) => child < parent,
2555 (None, Some(_)) => true,
2556 _ => false,
2557 };
2558 let priority_boosted = budget.priority > parent_budget.priority;
2559 let _ = (
2560 &deadline_tightened,
2561 &poll_tightened,
2562 &cost_tightened,
2563 &priority_boosted,
2564 );
2565
2566 let clamped_deadline = match (parent_budget.deadline, budget.deadline) {
2570 (Some(parent), Some(child)) => Some(if child < parent { child } else { parent }),
2571 (Some(parent), None) => Some(parent),
2572 (None, child) => child,
2573 };
2574 let clamped_poll_quota = budget.poll_quota.min(parent_budget.poll_quota);
2575 let clamped_cost_quota = match (parent_budget.cost_quota, budget.cost_quota) {
2576 (Some(parent), Some(child)) => Some(child.min(parent)),
2577 (Some(parent), None) => Some(parent),
2578 (None, child) => child,
2579 };
2580 let clamped = Budget {
2581 deadline: clamped_deadline,
2582 poll_quota: clamped_poll_quota,
2583 cost_quota: clamped_cost_quota,
2584 priority: budget.priority,
2585 };
2586
2587 debug!(
2588 task_id = ?self.task_id(),
2589 region_id = ?self.region_id(),
2590 parent_deadline = ?parent_budget.deadline,
2591 parent_poll_quota = parent_budget.poll_quota,
2592 parent_cost_quota = ?parent_budget.cost_quota,
2593 parent_priority = parent_budget.priority,
2594 budget_deadline = ?clamped.deadline,
2595 budget_poll_quota = clamped.poll_quota,
2596 budget_cost_quota = ?clamped.cost_quota,
2597 budget_priority = clamped.priority,
2598 deadline_tightened,
2599 poll_tightened,
2600 cost_tightened,
2601 priority_boosted,
2602 budget_source = "explicit",
2603 "scope budget set"
2604 );
2605 crate::cx::Scope::new(self.region_id(), clamped)
2606 }
2607}
2608
2609impl Cx<cap::All> {
2610 #[must_use]
2631 pub fn for_testing() -> Self {
2632 Self::new(
2633 RegionId::new_for_test(0, 0),
2634 TaskId::new_for_test(0, 0),
2635 Budget::INFINITE,
2636 )
2637 }
2638
2639 #[must_use]
2660 pub fn for_testing_with_budget(budget: Budget) -> Self {
2661 Self::new(
2662 RegionId::new_for_test(0, 0),
2663 TaskId::new_for_test(0, 0),
2664 budget,
2665 )
2666 }
2667
2668 #[must_use]
2687 pub fn for_testing_with_io() -> Self {
2688 Self::new_with_io(
2689 RegionId::new_for_test(0, 0),
2690 TaskId::new_for_test(0, 0),
2691 Budget::INFINITE,
2692 None,
2693 None,
2694 Some(Arc::new(crate::io::LabIoCap::new())),
2695 None,
2696 )
2697 }
2698
2699 #[must_use]
2704 pub fn for_request_with_budget(budget: Budget) -> Self {
2705 Self::new(RegionId::new_ephemeral(), TaskId::new_ephemeral(), budget)
2706 }
2707
2708 #[must_use]
2710 pub fn for_request() -> Self {
2711 Self::for_request_with_budget(Budget::INFINITE)
2712 }
2713
2714 #[must_use]
2723 pub fn for_testing_with_remote(cap: RemoteCap) -> Self {
2724 let mut cx = Self::for_testing();
2725 Arc::make_mut(&mut cx.handles).remote_cap = Some(Arc::new(cap));
2726 cx
2727 }
2728}
2729
2730pub struct SpanGuard<Caps = cap::All> {
2735 cx: Cx<Caps>,
2736 prev: DiagnosticContext,
2737}
2738
2739impl<Caps> Drop for SpanGuard<Caps> {
2740 fn drop(&mut self) {
2741 let name = self
2742 .cx
2743 .diagnostic_context()
2744 .custom("span.name")
2745 .unwrap_or("unknown")
2746 .to_owned();
2747 self.cx
2748 .log(LogEntry::debug(format!("span exit: {name}")).with_target("tracing"));
2749 self.cx.set_diagnostic_context(self.prev.clone());
2750 }
2751}
2752
2753#[cfg(test)]
2754mod tests {
2755 use super::*;
2756 use crate::cx::macaroon::CaveatPredicate;
2757 #[cfg(feature = "messaging-fabric")]
2758 use crate::messaging::capability::{CommandFamily, FabricCapability, FabricCapabilityScope};
2759 #[cfg(feature = "messaging-fabric")]
2760 use crate::messaging::class::DeliveryClass;
2761 #[cfg(feature = "messaging-fabric")]
2762 use crate::messaging::ir::{CapabilityPermission, CapabilityTokenSchema, SubjectFamily};
2763 #[cfg(feature = "messaging-fabric")]
2764 use crate::messaging::subject::SubjectPattern;
2765 use crate::trace::TraceBufferHandle;
2766 use crate::util::{ArenaIndex, DetEntropy};
2767 use std::sync::atomic::{AtomicU8, Ordering};
2768
2769 static CURRENT_CX_DTOR_STATE: AtomicU8 = AtomicU8::new(0);
2770
2771 thread_local! {
2772 static CURRENT_CX_DTOR_PROBE: CurrentCxDtorProbe = const { CurrentCxDtorProbe };
2773 }
2774
2775 struct CurrentCxDtorProbe;
2776
2777 impl Drop for CurrentCxDtorProbe {
2778 fn drop(&mut self) {
2779 let state = match CURRENT_CX.try_with(|slot| slot.borrow().clone()) {
2780 Ok(Some(_)) => 1,
2781 Ok(None) => 2,
2782 Err(_) => {
2783 if Cx::current().is_none() {
2784 3
2785 } else {
2786 4
2787 }
2788 }
2789 };
2790 CURRENT_CX_DTOR_STATE.store(state, Ordering::SeqCst);
2791 }
2792 }
2793
2794 fn test_cx() -> Cx {
2795 Cx::new(
2796 RegionId::from_arena(ArenaIndex::new(0, 0)),
2797 TaskId::from_arena(ArenaIndex::new(0, 0)),
2798 Budget::INFINITE,
2799 )
2800 }
2801
2802 fn test_cx_with_entropy(seed: u64) -> Cx {
2803 Cx::new_with_observability(
2804 RegionId::from_arena(ArenaIndex::new(0, 0)),
2805 TaskId::from_arena(ArenaIndex::new(0, 0)),
2806 Budget::INFINITE,
2807 None,
2808 None,
2809 Some(Arc::new(DetEntropy::new(seed))),
2810 )
2811 }
2812
2813 fn trace_message(event: &crate::trace::TraceEvent) -> &str {
2814 match &event.data {
2815 crate::trace::TraceData::Message(message) => message,
2816 other => panic!("expected user trace message, got {other:?}"),
2817 }
2818 }
2819
2820 #[cfg(feature = "messaging-fabric")]
2821 fn capability_schema(
2822 families: Vec<SubjectFamily>,
2823 permissions: Vec<CapabilityPermission>,
2824 ) -> CapabilityTokenSchema {
2825 CapabilityTokenSchema {
2826 name: "fabric.cx.demo".to_owned(),
2827 families,
2828 delivery_classes: vec![DeliveryClass::EphemeralInteractive],
2829 permissions,
2830 }
2831 }
2832
2833 #[test]
2834 fn io_not_available_by_default() {
2835 let cx = test_cx();
2836 assert!(!cx.has_io());
2837 assert!(cx.io().is_none());
2838 }
2839
2840 #[test]
2841 fn io_available_with_for_testing_with_io() {
2842 let cx: Cx = Cx::for_testing_with_io();
2843 assert!(cx.has_io());
2844 let io = cx.io().expect("should have io cap");
2845 assert!(!io.is_real_io());
2846 assert_eq!(io.name(), "lab");
2847 }
2848
2849 #[test]
2850 fn checkpoint_without_cancel() {
2851 let cx = test_cx();
2852 assert!(cx.checkpoint().is_ok());
2853 }
2854
2855 #[test]
2856 fn checkpoint_with_cancel() {
2857 let cx = test_cx();
2858 cx.set_cancel_requested(true);
2859 assert!(cx.checkpoint().is_err());
2860 }
2861
2862 #[test]
2863 fn masked_defers_cancel() {
2864 let cx = test_cx();
2865 cx.set_cancel_requested(true);
2866
2867 cx.masked(|| {
2868 assert!(
2869 cx.checkpoint().is_ok(),
2870 "checkpoint should succeed when masked"
2871 );
2872 });
2873
2874 assert!(
2875 cx.checkpoint().is_err(),
2876 "checkpoint should fail after unmasking"
2877 );
2878 }
2879
2880 #[test]
2881 fn trace_attaches_logical_time() {
2882 let cx = test_cx();
2883 let trace = TraceBufferHandle::new(8);
2884 cx.set_trace_buffer(trace.clone());
2885
2886 cx.trace("hello");
2887
2888 let events = trace.snapshot();
2889 let event = events.first().expect("trace event");
2890 assert!(event.logical_time.is_some());
2891 }
2892
2893 #[test]
2894 fn masked_panic_safety() {
2895 use std::panic::{AssertUnwindSafe, catch_unwind};
2896
2897 let cx = test_cx();
2898 cx.set_cancel_requested(true);
2899
2900 assert!(cx.checkpoint().is_err());
2902
2903 let cx_clone = cx.clone();
2905 let _ = catch_unwind(AssertUnwindSafe(|| {
2906 cx_clone.masked(|| {
2907 std::panic::resume_unwind(Box::new("oops"));
2910 });
2911 }));
2912
2913 assert!(
2916 cx.checkpoint().is_err(),
2917 "Cx remains masked after panic! mask_depth leaked."
2918 );
2919 }
2920
2921 #[test]
2922 fn current_returns_none_during_thread_local_teardown() {
2923 CURRENT_CX_DTOR_STATE.store(0, Ordering::SeqCst);
2924
2925 let join = std::thread::spawn(|| {
2926 CURRENT_CX_DTOR_PROBE.with(|_| {});
2929
2930 let cx = test_cx();
2931 let _guard = Cx::set_current(Some(cx));
2932 assert!(Cx::current().is_some(), "current cx should be installed");
2933 });
2934
2935 join.join()
2936 .expect("thread-local teardown should not panic when reading Cx");
2937 assert_eq!(
2938 CURRENT_CX_DTOR_STATE.load(Ordering::SeqCst),
2939 3,
2940 "Cx::current() should fail closed once CURRENT_CX is unavailable"
2941 );
2942 }
2943
2944 #[test]
2946 #[should_panic(expected = "MAX_MASK_DEPTH")]
2947 fn mask_depth_exceeds_bound_panics() {
2948 let cx = test_cx();
2949
2950 {
2954 let mut inner = cx.inner.write();
2955 inner.mask_depth = crate::types::task_context::MAX_MASK_DEPTH;
2956 }
2957 cx.masked(|| {});
2959 }
2960
2961 #[test]
2962 fn random_usize_in_range() {
2963 let cx = test_cx_with_entropy(123);
2964 for _ in 0..100 {
2965 let value = cx.random_usize(7);
2966 assert!(value < 7);
2967 }
2968 }
2969
2970 #[test]
2971 fn shuffle_deterministic() {
2972 let cx1 = test_cx_with_entropy(42);
2973 let cx2 = test_cx_with_entropy(42);
2974
2975 let mut a = [1, 2, 3, 4, 5, 6, 7, 8];
2976 let mut b = [1, 2, 3, 4, 5, 6, 7, 8];
2977
2978 cx1.shuffle(&mut a);
2979 cx2.shuffle(&mut b);
2980
2981 assert_eq!(a, b);
2982 }
2983
2984 #[test]
2985 fn random_f64_range() {
2986 let cx = test_cx_with_entropy(7);
2987 for _ in 0..100 {
2988 let value = cx.random_f64();
2989 assert!((0.0..1.0).contains(&value));
2990 }
2991 }
2992
2993 #[test]
2998 fn cancel_with_sets_reason() {
2999 let cx = test_cx();
3000 assert!(cx.cancel_reason().is_none());
3001
3002 cx.cancel_with(CancelKind::User, Some("manual stop"));
3003
3004 assert!(cx.is_cancel_requested());
3005 let reason = cx.cancel_reason().expect("should have reason");
3006 assert_eq!(reason.kind, CancelKind::User);
3007 assert_eq!(reason.message, Some("manual stop".to_string()));
3008 }
3009
3010 #[test]
3011 fn cancel_with_no_message() {
3012 let cx = test_cx();
3013 cx.cancel_with(CancelKind::Timeout, None);
3014
3015 let reason = cx.cancel_reason().expect("should have reason");
3016 assert_eq!(reason.kind, CancelKind::Timeout);
3017 assert!(reason.message.is_none());
3018 }
3019
3020 #[test]
3021 fn cancel_reason_returns_none_when_not_cancelled() {
3022 let cx = test_cx();
3023 assert!(cx.cancel_reason().is_none());
3024 }
3025
3026 #[test]
3027 fn cancel_chain_empty_when_not_cancelled() {
3028 let cx = test_cx();
3029 assert!(cx.cancel_chain().next().is_none());
3030 }
3031
3032 #[test]
3033 fn cancel_chain_traverses_causes() {
3034 let cx = test_cx();
3035
3036 let deadline = CancelReason::deadline();
3038 let parent1 = CancelReason::parent_cancelled().with_cause(deadline);
3039 let parent2 = CancelReason::parent_cancelled().with_cause(parent1);
3040
3041 cx.set_cancel_reason(parent2);
3042
3043 let chain: Vec<_> = cx.cancel_chain().collect();
3044 assert_eq!(chain.len(), 3);
3045 assert_eq!(chain[0].kind, CancelKind::ParentCancelled);
3046 assert_eq!(chain[1].kind, CancelKind::ParentCancelled);
3047 assert_eq!(chain[2].kind, CancelKind::Deadline);
3048 }
3049
3050 #[test]
3051 fn root_cancel_cause_returns_none_when_not_cancelled() {
3052 let cx = test_cx();
3053 assert!(cx.root_cancel_cause().is_none());
3054 }
3055
3056 #[test]
3057 fn root_cancel_cause_finds_root() {
3058 let cx = test_cx();
3059
3060 let timeout = CancelReason::timeout();
3062 let parent = CancelReason::parent_cancelled().with_cause(timeout);
3063
3064 cx.set_cancel_reason(parent);
3065
3066 let root = cx.root_cancel_cause().expect("should have root");
3067 assert_eq!(root.kind, CancelKind::Timeout);
3068 }
3069
3070 #[test]
3071 fn root_cancel_cause_with_no_chain() {
3072 let cx = test_cx();
3073 cx.cancel_with(CancelKind::Shutdown, None);
3074
3075 let root = cx.root_cancel_cause().expect("should have root");
3076 assert_eq!(root.kind, CancelKind::Shutdown);
3077 }
3078
3079 #[test]
3080 fn cancelled_by_checks_immediate_reason() {
3081 let cx = test_cx();
3082
3083 let deadline = CancelReason::deadline();
3085 let parent = CancelReason::parent_cancelled().with_cause(deadline);
3086
3087 cx.set_cancel_reason(parent);
3088
3089 assert!(cx.cancelled_by(CancelKind::ParentCancelled));
3091 assert!(!cx.cancelled_by(CancelKind::Deadline));
3093 }
3094
3095 #[test]
3096 fn cancelled_by_returns_false_when_not_cancelled() {
3097 let cx = test_cx();
3098 assert!(!cx.cancelled_by(CancelKind::User));
3099 }
3100
3101 #[test]
3102 fn any_cause_is_searches_chain() {
3103 let cx = test_cx();
3104
3105 let timeout = CancelReason::timeout();
3107 let parent1 = CancelReason::parent_cancelled().with_cause(timeout);
3108 let parent2 = CancelReason::parent_cancelled().with_cause(parent1);
3109
3110 cx.set_cancel_reason(parent2);
3111
3112 assert!(cx.any_cause_is(CancelKind::ParentCancelled));
3114 assert!(cx.any_cause_is(CancelKind::Timeout));
3115
3116 assert!(!cx.any_cause_is(CancelKind::Deadline));
3118 assert!(!cx.any_cause_is(CancelKind::Shutdown));
3119 }
3120
3121 #[test]
3122 fn any_cause_is_returns_false_when_not_cancelled() {
3123 let cx = test_cx();
3124 assert!(!cx.any_cause_is(CancelKind::Timeout));
3125 }
3126
3127 #[test]
3128 fn set_cancel_reason_sets_flag_and_reason() {
3129 let cx = test_cx();
3130 assert!(!cx.is_cancel_requested());
3131
3132 cx.set_cancel_reason(CancelReason::shutdown());
3133
3134 assert!(cx.is_cancel_requested());
3135 assert_eq!(
3136 cx.cancel_reason().expect("should have reason").kind,
3137 CancelKind::Shutdown
3138 );
3139 }
3140
3141 #[test]
3142 fn integration_realistic_usage() {
3143 let cx = test_cx();
3149
3150 let timeout_reason = CancelReason::timeout().with_message("request timeout");
3152 let child_reason = CancelReason::parent_cancelled().with_cause(timeout_reason);
3153
3154 cx.set_cancel_reason(child_reason);
3155
3156 assert!(cx.is_cancel_requested());
3158
3159 assert!(cx.cancelled_by(CancelKind::ParentCancelled));
3161
3162 if cx.any_cause_is(CancelKind::Timeout) {
3164 let root = cx.root_cancel_cause().unwrap();
3166 assert_eq!(root.kind, CancelKind::Timeout);
3167 assert_eq!(root.message, Some("request timeout".to_string()));
3168 }
3169
3170 let chain: Vec<_> = cx.cancel_chain().collect();
3172 assert_eq!(chain.len(), 2);
3173 assert_eq!(chain[0].kind, CancelKind::ParentCancelled);
3174 assert_eq!(chain[1].kind, CancelKind::Timeout);
3175 }
3176
3177 #[test]
3178 fn cancel_fast_sets_flag_and_reason() {
3179 let cx = test_cx();
3180 assert!(!cx.is_cancel_requested());
3181 assert!(cx.cancel_reason().is_none());
3182
3183 cx.cancel_fast(CancelKind::Shutdown);
3184
3185 assert!(cx.is_cancel_requested());
3186 let reason = cx.cancel_reason().expect("should have reason");
3187 assert_eq!(reason.kind, CancelKind::Shutdown);
3188 }
3189
3190 #[test]
3191 fn cancel_fast_no_cause_chain() {
3192 let cx = test_cx();
3194
3195 cx.cancel_fast(CancelKind::Timeout);
3196
3197 let reason = cx.cancel_reason().expect("should have reason");
3198 assert!(reason.cause.is_none());
3200 assert!(reason.message.is_none());
3202 assert!(!reason.truncated);
3204 }
3205
3206 #[test]
3207 fn cancel_fast_sets_region() {
3208 let cx = test_cx();
3209
3210 cx.cancel_fast(CancelKind::User);
3211
3212 let reason = cx.cancel_reason().expect("should have reason");
3213 let expected_region = RegionId::from_arena(ArenaIndex::new(0, 0));
3215 assert_eq!(reason.origin_region, expected_region);
3216 }
3217
3218 #[test]
3219 fn cancel_fast_minimal_allocation() {
3220 let cx = test_cx();
3222
3223 cx.cancel_fast(CancelKind::Deadline);
3224
3225 let reason = cx.cancel_reason().expect("should have reason");
3226 assert_eq!(reason.kind, CancelKind::Deadline);
3228 assert!(reason.message.is_none());
3229 assert!(reason.cause.is_none());
3230 assert!(!reason.truncated);
3231 assert!(reason.truncated_at_depth.is_none());
3232
3233 let cost = reason.estimated_memory_cost();
3235 assert!(
3237 cost < 200,
3238 "cancel_fast should have minimal memory cost, got {cost}"
3239 );
3240 }
3241
3242 #[test]
3247 fn checkpoint_records_progress() {
3248 let cx = test_cx();
3249
3250 let state = cx.checkpoint_state();
3252 assert!(state.last_checkpoint.is_none());
3253 assert!(state.last_message.is_none());
3254 assert_eq!(state.checkpoint_count, 0);
3255
3256 assert!(cx.checkpoint().is_ok());
3258 let state = cx.checkpoint_state();
3259 assert!(state.last_checkpoint.is_some());
3260 assert!(state.last_message.is_none());
3261 assert_eq!(state.checkpoint_count, 1);
3262
3263 assert!(cx.checkpoint().is_ok());
3265 let state = cx.checkpoint_state();
3266 assert_eq!(state.checkpoint_count, 2);
3267 }
3268
3269 #[test]
3270 fn checkpoint_with_records_message() {
3271 let cx = test_cx();
3272
3273 assert!(cx.checkpoint_with("processing step 1").is_ok());
3275 let state = cx.checkpoint_state();
3276 assert!(state.last_checkpoint.is_some());
3277 assert_eq!(state.last_message.as_deref(), Some("processing step 1"));
3278 assert_eq!(state.checkpoint_count, 1);
3279
3280 assert!(cx.checkpoint_with("processing step 2").is_ok());
3282 let state = cx.checkpoint_state();
3283 assert_eq!(state.last_message.as_deref(), Some("processing step 2"));
3284 assert_eq!(state.checkpoint_count, 2);
3285 }
3286
3287 #[test]
3288 fn checkpoint_clears_message() {
3289 let cx = test_cx();
3290
3291 assert!(cx.checkpoint_with("step 1").is_ok());
3293 assert_eq!(
3294 cx.checkpoint_state().last_message.as_deref(),
3295 Some("step 1")
3296 );
3297
3298 assert!(cx.checkpoint().is_ok());
3300 assert!(cx.checkpoint_state().last_message.is_none());
3301 }
3302
3303 #[test]
3304 fn checkpoint_with_checks_cancel() {
3305 let cx = test_cx();
3306 cx.set_cancel_requested(true);
3307
3308 assert!(cx.checkpoint_with("should fail").is_err());
3310
3311 let state = cx.checkpoint_state();
3313 assert_eq!(state.checkpoint_count, 1);
3314 assert_eq!(state.last_message.as_deref(), Some("should fail"));
3315 }
3316
3317 #[test]
3318 fn checkpoint_deadline_exhaustion_sets_cancel_reason() {
3319 let cx = Cx::for_testing_with_budget(Budget::new().with_deadline(Time::ZERO));
3320
3321 assert!(cx.checkpoint().is_err());
3322 let reason = cx
3323 .cancel_reason()
3324 .expect("deadline exhaustion must set reason");
3325 assert_eq!(reason.kind, CancelKind::Deadline);
3326 assert!(cx.is_cancel_requested());
3327 }
3328
3329 #[test]
3330 fn checkpoint_poll_budget_exhaustion_sets_cancel_reason() {
3331 let cx = Cx::for_testing_with_budget(Budget::new().with_poll_quota(0));
3332
3333 assert!(cx.checkpoint().is_err());
3334 let reason = cx
3335 .cancel_reason()
3336 .expect("poll quota exhaustion must set reason");
3337 assert_eq!(reason.kind, CancelKind::PollQuota);
3338 assert!(cx.is_cancel_requested());
3339 }
3340
3341 #[test]
3342 fn checkpoint_cost_budget_exhaustion_sets_cancel_reason() {
3343 let cx = Cx::for_testing_with_budget(Budget::new().with_cost_quota(0));
3344
3345 assert!(cx.checkpoint().is_err());
3346 let reason = cx
3347 .cancel_reason()
3348 .expect("cost budget exhaustion must set reason");
3349 assert_eq!(reason.kind, CancelKind::CostBudget);
3350 assert!(cx.is_cancel_requested());
3351 }
3352
3353 #[test]
3354 fn masked_checkpoint_defers_budget_exhaustion() {
3355 let cx = Cx::for_testing_with_budget(Budget::new().with_deadline(Time::ZERO));
3356
3357 cx.masked(|| {
3358 assert!(
3359 cx.checkpoint().is_ok(),
3360 "budget exhaustion should defer while masked"
3361 );
3362 });
3363
3364 let reason = cx
3365 .cancel_reason()
3366 .expect("masked checkpoint should still record exhaustion reason");
3367 assert_eq!(reason.kind, CancelKind::Deadline);
3368 assert!(
3369 cx.checkpoint().is_err(),
3370 "deadline exhaustion should be observed after unmasking"
3371 );
3372 }
3373
3374 #[test]
3375 fn checkpoint_budget_usage_reports_remaining_time_in_millis() {
3376 let budget = Budget::new()
3377 .with_deadline(Time::from_secs(10))
3378 .with_poll_quota(3)
3379 .with_cost_quota(7);
3380 let baseline = Budget::new()
3381 .with_deadline(Time::from_secs(20))
3382 .with_poll_quota(5)
3383 .with_cost_quota(11);
3384
3385 let (polls_used, cost_used, time_remaining_ms) =
3386 Cx::<cap::All>::checkpoint_budget_usage(budget, baseline, Time::from_secs(7));
3387
3388 assert_eq!(polls_used, Some(2));
3389 assert_eq!(cost_used, Some(4));
3390 assert_eq!(time_remaining_ms, Some(3_000));
3391 }
3392
3393 #[test]
3394 fn set_cancel_requested_wakes_registered_cancel_waker() {
3395 use std::sync::atomic::{AtomicUsize, Ordering};
3396 use std::task::Waker;
3397
3398 struct CountWaker(Arc<AtomicUsize>);
3399
3400 use std::task::Wake;
3401 impl Wake for CountWaker {
3402 fn wake(self: Arc<Self>) {
3403 self.0.fetch_add(1, Ordering::SeqCst);
3404 }
3405
3406 fn wake_by_ref(self: &Arc<Self>) {
3407 self.0.fetch_add(1, Ordering::SeqCst);
3408 }
3409 }
3410
3411 let cx = test_cx();
3412 let wakes = Arc::new(AtomicUsize::new(0));
3413 let waker = Waker::from(Arc::new(CountWaker(Arc::clone(&wakes))));
3414
3415 {
3416 let mut inner = cx.inner.write();
3417 inner.cancel_waker = Some(waker);
3418 }
3419
3420 cx.set_cancel_requested(true);
3421
3422 assert_eq!(
3423 wakes.load(Ordering::SeqCst),
3424 1,
3425 "set_cancel_requested(true) must wake the registered cancel waker"
3426 );
3427
3428 cx.set_cancel_requested(false);
3429
3430 assert_eq!(
3431 wakes.load(Ordering::SeqCst),
3432 1,
3433 "clearing cancellation must not spuriously wake the cancel waker"
3434 );
3435 }
3436
3437 #[test]
3438 fn checkpoint_state_is_snapshot() {
3439 let cx = test_cx();
3440
3441 let snapshot = cx.checkpoint_state();
3443 assert_eq!(snapshot.checkpoint_count, 0);
3444
3445 assert!(cx.checkpoint().is_ok());
3447 assert!(cx.checkpoint().is_ok());
3448
3449 assert_eq!(snapshot.checkpoint_count, 0);
3451
3452 assert_eq!(cx.checkpoint_state().checkpoint_count, 2);
3454 }
3455
3456 #[test]
3457 fn checkpoint_with_accepts_string_types() {
3458 let cx = test_cx();
3459
3460 assert!(cx.checkpoint_with("literal").is_ok());
3462
3463 assert!(cx.checkpoint_with(String::from("owned")).is_ok());
3465
3466 assert!(cx.checkpoint_with(format!("item {}", 42)).is_ok());
3468
3469 assert_eq!(cx.checkpoint_state().checkpoint_count, 3);
3470 }
3471
3472 fn test_root_key() -> crate::security::key::AuthKey {
3477 crate::security::key::AuthKey::from_seed(42)
3478 }
3479
3480 #[test]
3481 fn cx_no_macaroon_by_default() {
3482 let cx = test_cx();
3483 assert!(cx.macaroon().is_none());
3484 }
3485
3486 #[test]
3487 fn cx_with_macaroon_attaches_token() {
3488 let key = test_root_key();
3489 let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
3490 let cx = test_cx().with_macaroon(token);
3491
3492 let m = cx.macaroon().expect("should have macaroon");
3493 assert_eq!(m.identifier(), "spawn:r1");
3494 assert_eq!(m.location(), "cx/scheduler");
3495 }
3496
3497 #[test]
3498 fn cx_macaroon_survives_clone() {
3499 let key = test_root_key();
3500 let token = MacaroonToken::mint(&key, "io:net", "cx/io");
3501 let cx = test_cx().with_macaroon(token);
3502 let cx2 = cx.clone();
3503
3504 assert_eq!(
3505 cx.macaroon().unwrap().identifier(),
3506 cx2.macaroon().unwrap().identifier()
3507 );
3508 }
3509
3510 #[test]
3511 fn cx_macaroon_survives_restrict() {
3512 let key = test_root_key();
3513 let token = MacaroonToken::mint(&key, "all:cap", "cx/root");
3514 let cx: Cx<cap::All> = test_cx().with_macaroon(token);
3515 let narrow: Cx<cap::None> = cx.restrict();
3516
3517 assert_eq!(
3518 cx.macaroon().unwrap().identifier(),
3519 narrow.macaroon().unwrap().identifier()
3520 );
3521 }
3522
3523 #[test]
3524 fn cx_attenuate_adds_caveat() {
3525 let key = test_root_key();
3526 let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
3527 let cx = test_cx().with_macaroon(token);
3528
3529 let cx2 = cx
3530 .attenuate(CaveatPredicate::TimeBefore(5000))
3531 .expect("attenuate should succeed");
3532
3533 assert_eq!(cx.macaroon().unwrap().caveat_count(), 0);
3535 assert_eq!(cx2.macaroon().unwrap().caveat_count(), 1);
3537 assert_eq!(
3539 cx.macaroon().unwrap().identifier(),
3540 cx2.macaroon().unwrap().identifier()
3541 );
3542 }
3543
3544 #[test]
3545 fn cx_attenuate_returns_none_without_macaroon() {
3546 let cx = test_cx();
3547 assert!(cx.attenuate(CaveatPredicate::MaxUses(10)).is_none());
3548 }
3549
3550 #[test]
3551 fn cx_attenuate_from_budget_returns_none_without_macaroon() {
3552 let cx = test_cx();
3553 assert!(cx.attenuate_from_budget().is_none());
3554 }
3555
3556 #[test]
3557 fn cx_attenuate_from_budget_preserves_token_without_deadline() {
3558 let key = test_root_key();
3559 let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
3560 let cx = test_cx().with_macaroon(token);
3561
3562 let attenuated = cx
3563 .attenuate_from_budget()
3564 .expect("macaroon should still be present");
3565 assert_eq!(attenuated.macaroon().unwrap().caveat_count(), 0);
3566 assert_eq!(
3567 attenuated.macaroon().unwrap().identifier(),
3568 cx.macaroon().unwrap().identifier()
3569 );
3570 }
3571
3572 #[test]
3573 fn cx_attenuate_from_budget_adds_deadline_caveat() {
3574 let key = test_root_key();
3575 let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
3576 let budget = Budget::new().with_deadline(Time::from_millis(5_000));
3577 let cx = Cx::for_testing_with_budget(budget).with_macaroon(token);
3578
3579 let attenuated = cx
3580 .attenuate_from_budget()
3581 .expect("attenuation with deadline should succeed");
3582 assert_eq!(attenuated.macaroon().unwrap().caveat_count(), 1);
3583 }
3584
3585 #[test]
3586 fn cx_verify_capability_succeeds() {
3587 let key = test_root_key();
3588 let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
3589 let cx = test_cx().with_macaroon(token);
3590
3591 let ctx = VerificationContext::new().with_time(1000);
3592 assert!(cx.verify_capability(&key, &ctx).is_ok());
3593 }
3594
3595 #[test]
3596 fn cx_verify_capability_fails_wrong_key() {
3597 let key = test_root_key();
3598 let wrong_key = crate::security::key::AuthKey::from_seed(99);
3599 let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
3600 let cx = test_cx().with_macaroon(token);
3601
3602 let ctx = VerificationContext::new();
3603 let err = cx.verify_capability(&wrong_key, &ctx).unwrap_err();
3604 assert!(matches!(err, VerificationError::InvalidSignature));
3605 }
3606
3607 #[test]
3608 fn cx_verify_capability_fails_no_macaroon() {
3609 let key = test_root_key();
3610 let cx = test_cx();
3611
3612 let ctx = VerificationContext::new();
3613 let err = cx.verify_capability(&key, &ctx).unwrap_err();
3614 assert!(matches!(err, VerificationError::InvalidSignature));
3615 }
3616
3617 #[test]
3618 fn cx_verify_with_caveats() {
3619 let key = test_root_key();
3620 let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler")
3621 .add_caveat(CaveatPredicate::TimeBefore(5000))
3622 .add_caveat(CaveatPredicate::RegionScope(42));
3623
3624 let cx = test_cx().with_macaroon(token);
3625
3626 let ctx = VerificationContext::new().with_time(1000).with_region(42);
3628 assert!(cx.verify_capability(&key, &ctx).is_ok());
3629
3630 let ctx_expired = VerificationContext::new().with_time(6000).with_region(42);
3632 let err = cx.verify_capability(&key, &ctx_expired).unwrap_err();
3633 assert!(matches!(
3634 err,
3635 VerificationError::CaveatFailed { index: 0, .. }
3636 ));
3637
3638 let ctx_wrong_region = VerificationContext::new().with_time(1000).with_region(99);
3640 let err = cx.verify_capability(&key, &ctx_wrong_region).unwrap_err();
3641 assert!(matches!(
3642 err,
3643 VerificationError::CaveatFailed { index: 1, .. }
3644 ));
3645 }
3646
3647 #[test]
3648 fn cx_attenuate_then_verify() {
3649 let key = test_root_key();
3650 let token = MacaroonToken::mint(&key, "time:sleep", "cx/time");
3651 let cx = test_cx().with_macaroon(token);
3652
3653 let cx2 = cx.attenuate(CaveatPredicate::TimeBefore(3000)).unwrap();
3655
3656 let cx3 = cx2.attenuate(CaveatPredicate::MaxUses(5)).unwrap();
3658
3659 let ctx = VerificationContext::new().with_time(1000);
3661 assert!(cx.verify_capability(&key, &ctx).is_ok());
3662
3663 assert!(cx2.verify_capability(&key, &ctx).is_ok());
3665 let ctx_late = VerificationContext::new().with_time(4000);
3666 assert!(cx2.verify_capability(&key, &ctx_late).is_err());
3667
3668 let ctx_ok = VerificationContext::new().with_time(1000).with_use_count(3);
3670 assert!(cx3.verify_capability(&key, &ctx_ok).is_ok());
3671 let ctx_overuse = VerificationContext::new()
3672 .with_time(1000)
3673 .with_use_count(10);
3674 assert!(cx3.verify_capability(&key, &ctx_overuse).is_err());
3675 }
3676
3677 #[test]
3678 fn cx_verify_emits_evidence() {
3679 use crate::evidence_sink::CollectorSink;
3680
3681 let key = test_root_key();
3682 let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
3683 let sink = Arc::new(CollectorSink::new());
3684 let cx = test_cx()
3685 .with_macaroon(token)
3686 .with_evidence_sink(Some(sink.clone() as Arc<dyn EvidenceSink>));
3687
3688 let ctx = VerificationContext::new();
3689
3690 cx.verify_capability(&key, &ctx).unwrap();
3692 let entries = sink.entries();
3693 assert_eq!(entries.len(), 1);
3694 assert_eq!(entries[0].component, "cx_macaroon");
3695 assert_eq!(entries[0].action, "verify_success");
3696
3697 let wrong_key = crate::security::key::AuthKey::from_seed(99);
3699 let _ = cx.verify_capability(&wrong_key, &ctx);
3700 let entries = sink.entries();
3701 assert_eq!(entries.len(), 2);
3702 assert_eq!(entries[1].action, "verify_fail_signature");
3703 }
3704
3705 #[cfg(feature = "messaging-fabric")]
3706 #[test]
3707 fn cx_grant_publish_capability_mints_token_and_runtime_grant() {
3708 let cx = test_cx();
3709 let schema = capability_schema(
3710 vec![SubjectFamily::Command],
3711 vec![CapabilityPermission::Publish],
3712 );
3713
3714 let granted = cx
3715 .grant_publish_capability::<CommandFamily>(
3716 SubjectPattern::new("orders.>"),
3717 &schema,
3718 DeliveryClass::EphemeralInteractive,
3719 )
3720 .expect("publish capability should mint");
3721
3722 assert_eq!(granted.token().family(), SubjectFamily::Command);
3723 assert!(cx.check_fabric_capability(&FabricCapability::Publish {
3724 subject: SubjectPattern::new("orders.created"),
3725 }));
3726 assert!(!cx.check_fabric_capability(&FabricCapability::Publish {
3727 subject: SubjectPattern::new("payments.created"),
3728 }));
3729 assert_eq!(cx.fabric_capabilities().len(), 1);
3730 }
3731
3732 #[cfg(feature = "messaging-fabric")]
3733 #[test]
3734 fn cx_revoke_fabric_capabilities_by_id_and_scope_propagates_to_children() {
3735 let cx = test_cx();
3736 let child = cx.restrict::<cap::None>();
3737 let publish = cx
3738 .grant_fabric_capability(FabricCapability::Publish {
3739 subject: SubjectPattern::new("orders.>"),
3740 })
3741 .expect("publish grant");
3742 let subscribe = cx
3743 .grant_fabric_capability(FabricCapability::Subscribe {
3744 subject: SubjectPattern::new("orders.created"),
3745 })
3746 .expect("subscribe grant");
3747
3748 assert!(child.check_fabric_capability(&FabricCapability::Publish {
3749 subject: SubjectPattern::new("orders.created"),
3750 }));
3751 assert_eq!(
3752 child.revoke_fabric_capability_scope(FabricCapabilityScope::Publish),
3753 1
3754 );
3755 assert!(!cx.check_fabric_capability(&FabricCapability::Publish {
3756 subject: SubjectPattern::new("orders.created"),
3757 }));
3758 assert_eq!(
3759 cx.revoke_fabric_capability(subscribe.id()),
3760 Some(FabricCapability::Subscribe {
3761 subject: SubjectPattern::new("orders.created"),
3762 })
3763 );
3764 assert!(
3765 !child.check_fabric_capability(&FabricCapability::Subscribe {
3766 subject: SubjectPattern::new("orders.created"),
3767 })
3768 );
3769 assert_eq!(publish.id().raw(), 1);
3770 }
3771
3772 #[cfg(feature = "messaging-fabric")]
3773 #[test]
3774 fn cx_revoke_fabric_capability_by_subject_is_overlap_based() {
3775 let cx = test_cx();
3776 cx.grant_fabric_capability(FabricCapability::Publish {
3777 subject: SubjectPattern::new("orders.>"),
3778 })
3779 .expect("publish grant");
3780 cx.grant_fabric_capability(FabricCapability::Subscribe {
3781 subject: SubjectPattern::new("payments.>"),
3782 })
3783 .expect("subscribe grant");
3784
3785 assert_eq!(
3786 cx.revoke_fabric_capability_by_subject(&SubjectPattern::new("orders.created")),
3787 1
3788 );
3789 assert!(!cx.check_fabric_capability(&FabricCapability::Publish {
3790 subject: SubjectPattern::new("orders.created"),
3791 }));
3792 assert!(cx.check_fabric_capability(&FabricCapability::Subscribe {
3793 subject: SubjectPattern::new("payments.captured"),
3794 }));
3795 }
3796
3797 #[cfg(feature = "messaging-fabric")]
3798 #[test]
3799 fn cx_rejects_empty_stream_capability_names() {
3800 let cx = test_cx();
3801
3802 let error = cx
3803 .grant_fabric_capability(FabricCapability::ConsumeStream {
3804 stream: " ".to_owned(),
3805 })
3806 .expect_err("blank stream names must fail");
3807
3808 assert_eq!(error, FabricCapabilityGrantError::EmptyStreamName);
3809 }
3810
3811 #[test]
3819 fn mr_trace_parent_child_ordering() {
3820 let parent_cx = test_cx();
3821 let trace = TraceBufferHandle::new(16);
3822 parent_cx.set_trace_buffer(trace.clone());
3823
3824 parent_cx.trace("parent trace 1");
3826
3827 let child_cx = parent_cx.clone();
3829 child_cx.trace("child trace 1");
3830 child_cx.trace("child trace 2");
3831
3832 parent_cx.trace("parent trace 2");
3834
3835 let events = trace.snapshot();
3836 assert_eq!(events.len(), 4);
3837
3838 let times: Vec<_> = events
3840 .iter()
3841 .map(|e| e.logical_time.as_ref().expect("logical time"))
3842 .collect();
3843
3844 for i in 1..times.len() {
3848 assert!(
3849 times[i - 1] <= times[i],
3850 "Logical time should be monotonically increasing: {:?} > {:?}",
3851 times[i - 1],
3852 times[i]
3853 );
3854 }
3855 }
3856
3857 #[test]
3861 fn mr_trace_deterministic_interleaving() {
3862 let cx1 = test_cx_with_entropy(42);
3864 let trace1 = TraceBufferHandle::new(16);
3865 cx1.set_trace_buffer(trace1.clone());
3866
3867 for i in 0..5 {
3869 if cx1.random_usize(2) == 0 {
3870 cx1.trace(&format!("branch_a_{}", i));
3871 } else {
3872 cx1.trace(&format!("branch_b_{}", i));
3873 }
3874 }
3875
3876 let cx2 = test_cx_with_entropy(42);
3878 let trace2 = TraceBufferHandle::new(16);
3879 cx2.set_trace_buffer(trace2.clone());
3880
3881 for i in 0..5 {
3882 if cx2.random_usize(2) == 0 {
3883 cx2.trace(&format!("branch_a_{}", i));
3884 } else {
3885 cx2.trace(&format!("branch_b_{}", i));
3886 }
3887 }
3888
3889 let events1 = trace1.snapshot();
3890 let events2 = trace2.snapshot();
3891
3892 assert_eq!(
3894 events1.len(),
3895 events2.len(),
3896 "Trace count should be deterministic"
3897 );
3898
3899 for (i, (e1, e2)) in events1.iter().zip(events2.iter()).enumerate() {
3900 assert_eq!(
3903 trace_message(e1),
3904 trace_message(e2),
3905 "Trace message at index {} should be deterministic: '{}' vs '{}'",
3906 i,
3907 trace_message(e1),
3908 trace_message(e2)
3909 );
3910 }
3911 }
3912
3913 #[test]
3917 fn mr_trace_macaroon_causal_ordering() {
3918 use crate::cx::macaroon::{CaveatPredicate, MacaroonToken};
3919 use crate::security::key::AuthKey;
3920
3921 let key = AuthKey::from_seed(42);
3922 let token = MacaroonToken::mint(&key, "trace:emit", "cx/trace");
3923
3924 let root_cx = test_cx().with_macaroon(token);
3926 let trace = TraceBufferHandle::new(16);
3927 root_cx.set_trace_buffer(trace.clone());
3928
3929 root_cx.trace("root macaroon trace");
3930
3931 let attenuated_cx = root_cx
3933 .attenuate(CaveatPredicate::TimeBefore(5000))
3934 .expect("attenuation should succeed");
3935 attenuated_cx.trace("attenuated trace 1");
3936
3937 let further_attenuated_cx = attenuated_cx
3939 .attenuate(CaveatPredicate::MaxUses(10))
3940 .expect("further attenuation should succeed");
3941 further_attenuated_cx.trace("further attenuated trace");
3942
3943 attenuated_cx.trace("attenuated trace 2");
3945
3946 let events = trace.snapshot();
3947 assert_eq!(events.len(), 4);
3948
3949 let logical_times: Vec<_> = events
3951 .iter()
3952 .map(|e| e.logical_time.as_ref().expect("logical time"))
3953 .collect();
3954
3955 for i in 1..logical_times.len() {
3957 assert!(
3958 logical_times[i - 1] <= logical_times[i],
3959 "Macaroon attenuation should preserve causal ordering: tick {:?} > {:?}",
3960 logical_times[i - 1],
3961 logical_times[i]
3962 );
3963 }
3964 }
3965
3966 #[test]
3970 fn mr_trace_budget_exhaustion_idempotence() {
3971 use crate::types::Budget;
3972
3973 let budget = Budget::new().with_poll_quota(1);
3975 let cx = Cx::for_testing_with_budget(budget);
3976 let trace = TraceBufferHandle::new(16);
3977 cx.set_trace_buffer(trace.clone());
3978
3979 cx.trace("pre-exhaustion trace");
3981
3982 cx.trace("exhaustion trace 1");
3986 cx.trace("exhaustion trace 2"); cx.trace("exhaustion trace 3"); let events = trace.snapshot();
3990
3991 assert_eq!(events.len(), 4, "All traces should be recorded");
3994
3995 let mut logical_times: Vec<_> = events
3997 .iter()
3998 .map(|e| format!("{:?}", e.logical_time.as_ref().expect("logical time")))
3999 .collect();
4000 logical_times.sort_unstable();
4001 logical_times.dedup();
4002
4003 assert_eq!(
4004 logical_times.len(),
4005 4,
4006 "Logical time allocation should be idempotent (no duplicate times)"
4007 );
4008 }
4009
4010 #[test]
4014 fn mr_trace_clone_equivalence() {
4015 let original_cx = test_cx_with_entropy(123);
4016 let trace = TraceBufferHandle::new(16);
4017 original_cx.set_trace_buffer(trace.clone());
4018
4019 let cloned_cx = original_cx.clone();
4021
4022 original_cx.trace("original trace 1");
4024 cloned_cx.trace("cloned trace 1");
4025 original_cx.trace("original trace 2");
4026 cloned_cx.trace("cloned trace 2");
4027
4028 let events = trace.snapshot();
4029 assert_eq!(events.len(), 4, "Both contexts should write to same buffer");
4030
4031 let logical_times: Vec<_> = events
4033 .iter()
4034 .map(|e| e.logical_time.as_ref().expect("logical time"))
4035 .collect();
4036
4037 for i in 1..logical_times.len() {
4038 assert!(
4039 logical_times[i - 1] <= logical_times[i],
4040 "Clone should preserve logical time ordering: {:?} > {:?}",
4041 logical_times[i - 1],
4042 logical_times[i]
4043 );
4044 }
4045
4046 let val1 = original_cx.random_usize(100);
4048 let val2 = cloned_cx.random_usize(100);
4049
4050 assert_eq!(val1, val2, "Cloned context should share entropy state");
4052 }
4053
4054 #[test]
4057 fn mr_trace_composite_ordering() {
4058 use crate::cx::macaroon::{CaveatPredicate, MacaroonToken};
4059 use crate::security::key::AuthKey;
4060
4061 let key = AuthKey::from_seed(789);
4062 let token = MacaroonToken::mint(&key, "trace:composite", "cx/test");
4063
4064 let root_cx = test_cx_with_entropy(456).with_macaroon(token);
4066 let trace = TraceBufferHandle::new(32);
4067 root_cx.set_trace_buffer(trace.clone());
4068
4069 root_cx.trace("parent+macaroon trace");
4071
4072 let child_cx = root_cx.clone();
4074
4075 let attenuated_child = child_cx
4077 .attenuate(CaveatPredicate::TimeBefore(10000))
4078 .expect("attenuation should work");
4079
4080 attenuated_child.trace("child+attenuated trace");
4082
4083 for i in 0..3 {
4085 if root_cx.random_usize(2) == 0 {
4086 root_cx.trace(&format!("parent_branch_{}", i));
4087 } else {
4088 attenuated_child.trace(&format!("child_branch_{}", i));
4089 }
4090 }
4091
4092 let events = trace.snapshot();
4093 assert!(
4094 events.len() >= 5,
4095 "Composite test should produce multiple traces"
4096 );
4097
4098 let logical_times: Vec<_> = events
4102 .iter()
4103 .map(|e| e.logical_time.as_ref().expect("logical time"))
4104 .collect();
4105
4106 for i in 1..logical_times.len() {
4107 assert!(
4108 logical_times[i - 1] <= logical_times[i],
4109 "Composite trace ordering should preserve monotonicity: {:?} > {:?}",
4110 logical_times[i - 1],
4111 logical_times[i]
4112 );
4113 }
4114
4115 assert!(
4117 events.iter().all(|e| !trace_message(e).is_empty()),
4118 "All traces should have non-empty messages"
4119 );
4120
4121 let branch_traces = events
4123 .iter()
4124 .filter(|e| trace_message(e).contains("_branch_"))
4125 .count();
4126 assert_eq!(
4127 branch_traces, 3,
4128 "Deterministic branching should produce exactly 3 branch traces"
4129 );
4130 }
4131}