use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;
#[cfg(feature = "native")]
use asupersync::types::Time as NativeTime;
#[cfg(feature = "native")]
use asupersync::types::{CancelKind as NativeCancelKind, CancelReason as NativeCancelReason};
#[cfg(feature = "native")]
use asupersync::{Budget as NativeBudget, Cx as NativeCx};
#[cfg(not(feature = "native"))]
mod native_cx_shim {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NativeCancelKind {
User,
Timeout,
Deadline,
PollQuota,
CostBudget,
FailFast,
RaceLost,
ParentCancelled,
Shutdown,
LinkedExit,
ResourceUnavailable,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeCancelReason {
pub kind: NativeCancelKind,
}
impl NativeCancelReason {
#[must_use]
pub const fn timeout() -> Self {
Self {
kind: NativeCancelKind::Timeout,
}
}
#[must_use]
pub fn user(_message: impl Into<String>) -> Self {
Self {
kind: NativeCancelKind::User,
}
}
#[must_use]
pub const fn parent_cancelled() -> Self {
Self {
kind: NativeCancelKind::ParentCancelled,
}
}
#[must_use]
pub const fn resource_unavailable() -> Self {
Self {
kind: NativeCancelKind::ResourceUnavailable,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NativeCheckpointError;
#[derive(Debug, Default)]
struct NativeCxInner {
cancel_requested: AtomicBool,
cancel_reason: Mutex<Option<NativeCancelReason>>,
}
#[derive(Debug, Clone, Default)]
pub struct NativeCx {
inner: Arc<NativeCxInner>,
}
impl NativeCx {
#[must_use]
pub fn for_testing() -> Self {
Self::default()
}
pub fn set_cancel_requested(&self, requested: bool) {
self.inner
.cancel_requested
.store(requested, Ordering::Release);
if !requested {
*self
.inner
.cancel_reason
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = None;
}
}
pub fn set_cancel_reason(&self, reason: NativeCancelReason) {
*self
.inner
.cancel_reason
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(reason);
self.inner.cancel_requested.store(true, Ordering::Release);
}
#[must_use]
pub fn is_cancel_requested(&self) -> bool {
self.inner.cancel_requested.load(Ordering::Acquire)
}
#[must_use]
pub fn cancel_reason(&self) -> Option<NativeCancelReason> {
self.inner
.cancel_reason
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
pub fn checkpoint(&self) -> std::result::Result<(), NativeCheckpointError> {
if self.is_cancel_requested() {
Err(NativeCheckpointError)
} else {
Ok(())
}
}
}
}
#[cfg(not(feature = "native"))]
use native_cx_shim::NativeCx;
use crate::eprocess::{EProcessDecision, EProcessOracle, EProcessSnapshot};
pub const SQLITE_INTERRUPT: i32 = 9;
pub const MAX_MASK_DEPTH: u32 = 64;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CancelState {
Created,
Running,
CancelRequested,
Cancelling,
Finalizing,
Completed,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CancelReason {
Timeout = 0,
UserInterrupt = 1,
RegionClose = 2,
Abort = 3,
}
pub mod cap {
mod sealed {
pub trait Sealed {}
pub struct Bit<const V: bool>;
pub trait Le {}
impl Le for (Bit<false>, Bit<false>) {}
impl Le for (Bit<false>, Bit<true>) {}
impl Le for (Bit<true>, Bit<true>) {}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct CapSet<
const SPAWN: bool,
const TIME: bool,
const RANDOM: bool,
const IO: bool,
const REMOTE: bool,
>;
impl<
const SPAWN: bool,
const TIME: bool,
const RANDOM: bool,
const IO: bool,
const REMOTE: bool,
> sealed::Sealed for CapSet<SPAWN, TIME, RANDOM, IO, REMOTE>
{
}
pub type All = CapSet<true, true, true, true, true>;
pub type None = CapSet<false, false, false, false, false>;
pub trait SubsetOf<Super>: sealed::Sealed {}
impl<
const S_SPAWN: bool,
const S_TIME: bool,
const S_RANDOM: bool,
const S_IO: bool,
const S_REMOTE: bool,
const P_SPAWN: bool,
const P_TIME: bool,
const P_RANDOM: bool,
const P_IO: bool,
const P_REMOTE: bool,
> SubsetOf<CapSet<P_SPAWN, P_TIME, P_RANDOM, P_IO, P_REMOTE>>
for CapSet<S_SPAWN, S_TIME, S_RANDOM, S_IO, S_REMOTE>
where
(sealed::Bit<S_SPAWN>, sealed::Bit<P_SPAWN>): sealed::Le,
(sealed::Bit<S_TIME>, sealed::Bit<P_TIME>): sealed::Le,
(sealed::Bit<S_RANDOM>, sealed::Bit<P_RANDOM>): sealed::Le,
(sealed::Bit<S_IO>, sealed::Bit<P_IO>): sealed::Le,
(sealed::Bit<S_REMOTE>, sealed::Bit<P_REMOTE>): sealed::Le,
{
}
pub trait HasSpawn: sealed::Sealed {}
impl<const TIME: bool, const RANDOM: bool, const IO: bool, const REMOTE: bool> HasSpawn
for CapSet<true, TIME, RANDOM, IO, REMOTE>
{
}
pub trait HasTime: sealed::Sealed {}
impl<const SPAWN: bool, const RANDOM: bool, const IO: bool, const REMOTE: bool> HasTime
for CapSet<SPAWN, true, RANDOM, IO, REMOTE>
{
}
pub trait HasRandom: sealed::Sealed {}
impl<const SPAWN: bool, const TIME: bool, const IO: bool, const REMOTE: bool> HasRandom
for CapSet<SPAWN, TIME, true, IO, REMOTE>
{
}
pub trait HasIo: sealed::Sealed {}
impl<const SPAWN: bool, const TIME: bool, const RANDOM: bool, const REMOTE: bool> HasIo
for CapSet<SPAWN, TIME, RANDOM, true, REMOTE>
{
}
pub trait HasRemote: sealed::Sealed {}
impl<const SPAWN: bool, const TIME: bool, const RANDOM: bool, const IO: bool> HasRemote
for CapSet<SPAWN, TIME, RANDOM, IO, true>
{
}
}
pub type FullCaps = cap::All;
pub type StorageCaps = cap::CapSet<false, true, false, true, false>;
pub type ComputeCaps = cap::None;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Budget {
pub deadline: Option<Duration>,
pub poll_quota: u32,
pub cost_quota: Option<u64>,
pub priority: u8,
}
impl Budget {
pub const INFINITE: Self = Self {
deadline: None,
poll_quota: u32::MAX,
cost_quota: None,
priority: 0,
};
pub const MINIMAL: Self = Self {
deadline: None,
poll_quota: 100,
cost_quota: None,
priority: 0,
};
#[must_use]
pub const fn with_deadline(self, deadline: Duration) -> Self {
Self {
deadline: Some(deadline),
..self
}
}
#[must_use]
pub const fn with_priority(self, priority: u8) -> Self {
Self { priority, ..self }
}
#[must_use]
pub const fn with_poll_quota(self, poll_quota: u32) -> Self {
Self { poll_quota, ..self }
}
#[must_use]
pub const fn with_cost_quota(self, cost_quota: u64) -> Self {
Self {
cost_quota: Some(cost_quota),
..self
}
}
#[must_use]
pub fn meet(self, other: Self) -> Self {
Self {
deadline: match (self.deadline, other.deadline) {
(Some(a), Some(b)) => Some(a.min(b)),
(Some(a), None) => Some(a),
(None, Some(b)) => Some(b),
(None, None) => None,
},
poll_quota: self.poll_quota.min(other.poll_quota),
cost_quota: match (self.cost_quota, other.cost_quota) {
(Some(a), Some(b)) => Some(a.min(b)),
(Some(a), None) => Some(a),
(None, Some(b)) => Some(b),
(None, None) => None,
},
priority: self.priority.max(other.priority),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorKind {
Cancelled,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Error {
kind: ErrorKind,
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.kind {
ErrorKind::Cancelled => write!(f, "operation cancelled"),
}
}
}
impl std::error::Error for Error {}
impl Error {
#[must_use]
pub const fn cancelled() -> Self {
Self {
kind: ErrorKind::Cancelled,
}
}
#[must_use]
pub const fn kind(&self) -> ErrorKind {
self.kind
}
#[must_use]
pub const fn sqlite_error_code(&self) -> i32 {
match self.kind {
ErrorKind::Cancelled => SQLITE_INTERRUPT,
}
}
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
struct CxInner {
cancel_requested: AtomicBool,
cancel_state: Mutex<CancelState>,
cancel_reason: Mutex<Option<CancelReason>>,
mask_depth: AtomicU32,
children: Mutex<Vec<Weak<Self>>>,
last_checkpoint_msg: Mutex<Option<String>>,
last_eprocess_decision: Mutex<Option<EProcessDecision>>,
eprocess_oracle: std::sync::OnceLock<Arc<EProcessOracle>>,
#[cfg(feature = "native")]
attached_native_cx: Mutex<Option<NativeCx>>,
#[cfg(feature = "native")]
fallback_native_cx: std::sync::OnceLock<NativeCx>,
unix_millis: AtomicU64,
}
impl CxInner {
fn new() -> Self {
Self {
cancel_requested: AtomicBool::new(false),
cancel_state: Mutex::new(CancelState::Created),
cancel_reason: Mutex::new(None),
mask_depth: AtomicU32::new(0),
children: Mutex::new(Vec::new()),
last_checkpoint_msg: Mutex::new(None),
last_eprocess_decision: Mutex::new(None),
eprocess_oracle: std::sync::OnceLock::new(),
#[cfg(feature = "native")]
attached_native_cx: Mutex::new(None),
#[cfg(feature = "native")]
fallback_native_cx: std::sync::OnceLock::new(),
unix_millis: AtomicU64::new(0),
}
}
}
#[cfg(feature = "native")]
#[must_use]
fn local_reason_to_native(reason: CancelReason) -> NativeCancelReason {
match reason {
CancelReason::Timeout => NativeCancelReason::timeout(),
CancelReason::UserInterrupt => NativeCancelReason::user("sqlite interrupt"),
CancelReason::RegionClose => NativeCancelReason::parent_cancelled(),
CancelReason::Abort => NativeCancelReason::resource_unavailable(),
}
}
#[cfg(feature = "native")]
#[must_use]
fn native_reason_to_local(reason: &NativeCancelReason) -> CancelReason {
match reason.kind {
NativeCancelKind::User => CancelReason::UserInterrupt,
NativeCancelKind::Timeout
| NativeCancelKind::Deadline
| NativeCancelKind::PollQuota
| NativeCancelKind::CostBudget => CancelReason::Timeout,
NativeCancelKind::FailFast
| NativeCancelKind::RaceLost
| NativeCancelKind::ParentCancelled
| NativeCancelKind::Shutdown
| NativeCancelKind::LinkedExit => CancelReason::RegionClose,
NativeCancelKind::ResourceUnavailable => CancelReason::Abort,
}
}
#[cfg(feature = "native")]
fn sync_native_cx_cancel(inner: &CxInner, reason: CancelReason) {
let attached_native = inner
.attached_native_cx
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.as_ref()
.cloned();
if let Some(native) = attached_native {
native.set_cancel_reason(local_reason_to_native(reason));
}
if let Some(native) = inner.fallback_native_cx.get() {
native.set_cancel_reason(local_reason_to_native(reason));
}
}
#[cfg(feature = "native")]
#[must_use]
#[allow(dead_code)]
fn native_budget_from_local(budget: Budget) -> NativeBudget {
let mut native_budget = NativeBudget::new()
.with_poll_quota(budget.poll_quota)
.with_priority(budget.priority);
if let Some(cost_quota) = budget.cost_quota {
native_budget = native_budget.with_cost_quota(cost_quota);
}
if let Some(deadline) = budget.deadline {
native_budget = native_budget.with_deadline(local_deadline_to_native_time(deadline));
}
native_budget
}
#[cfg(feature = "native")]
#[must_use]
#[allow(dead_code)]
fn wall_clock_now_since_epoch() -> Duration {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
}
#[cfg(feature = "native")]
#[must_use]
#[allow(dead_code)]
fn local_deadline_to_native_time(deadline: Duration) -> NativeTime {
let absolute_deadline = wall_clock_now_since_epoch()
.checked_add(deadline)
.unwrap_or(Duration::MAX);
let nanos = u64::try_from(absolute_deadline.as_nanos()).unwrap_or(u64::MAX);
NativeTime::from_nanos(nanos)
}
fn propagate_cancel(inner: &CxInner, reason: CancelReason) {
inner.cancel_requested.store(true, Ordering::Release);
{
let mut r = inner
.cancel_reason
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
match *r {
Some(existing) if existing >= reason => {}
_ => *r = Some(reason),
}
}
{
let mut state = inner
.cancel_state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if matches!(*state, CancelState::Created | CancelState::Running) {
*state = CancelState::CancelRequested;
}
}
#[cfg(feature = "native")]
sync_native_cx_cancel(inner, reason);
let children: Vec<Arc<CxInner>> = {
let mut guard = inner
.children
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
guard.retain(|child| child.strong_count() > 0);
guard.iter().filter_map(Weak::upgrade).collect()
};
for child in &children {
propagate_cancel(child, reason);
}
}
#[derive(Debug)]
pub struct Cx<Caps: cap::SubsetOf<cap::All> = FullCaps> {
inner: Arc<CxInner>,
budget: Budget,
trace_id: u64,
decision_id: u64,
policy_id: u64,
_caps: PhantomData<fn() -> Caps>,
}
impl<Caps: cap::SubsetOf<cap::All>> Clone for Cx<Caps> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
budget: self.budget,
trace_id: self.trace_id,
decision_id: self.decision_id,
policy_id: self.policy_id,
_caps: PhantomData,
}
}
}
impl Default for Cx<FullCaps> {
fn default() -> Self {
Self::new()
}
}
impl Cx<FullCaps> {
#[must_use]
pub fn new() -> Self {
Self::with_budget(Budget::INFINITE)
}
}
impl<Caps: cap::SubsetOf<cap::All>> Cx<Caps> {
#[cfg(feature = "native")]
#[must_use]
#[allow(dead_code)]
fn effective_native_cx(&self) -> NativeCx {
let attached_native = self
.inner
.attached_native_cx
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.as_ref()
.cloned();
if let Some(native) = attached_native {
return native;
}
self.inner
.fallback_native_cx
.get_or_init(|| {
let native =
NativeCx::for_request_with_budget(native_budget_from_local(self.budget));
if let Some(reason) = self.cancel_reason() {
native.set_cancel_reason(local_reason_to_native(reason));
} else if self.is_cancel_requested() {
native.set_cancel_requested(true);
}
native
})
.clone()
}
#[cfg(feature = "native")]
#[must_use]
fn native_cx_for_checkpoint(&self) -> Option<NativeCx> {
let attached_native = self
.inner
.attached_native_cx
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.as_ref()
.cloned();
attached_native.or_else(|| self.inner.fallback_native_cx.get().cloned())
}
#[must_use]
pub fn with_budget(budget: Budget) -> Self {
Self {
inner: Arc::new(CxInner::new()),
budget,
trace_id: 0,
decision_id: 0,
policy_id: 0,
_caps: PhantomData,
}
}
#[must_use]
pub fn budget(&self) -> Budget {
self.budget
}
#[must_use]
pub fn trace_id(&self) -> u64 {
self.trace_id
}
#[must_use]
pub fn decision_id(&self) -> u64 {
self.decision_id
}
#[must_use]
pub fn policy_id(&self) -> u64 {
self.policy_id
}
#[must_use]
pub fn with_trace_context(mut self, trace_id: u64, decision_id: u64, policy_id: u64) -> Self {
self.trace_id = trace_id;
self.decision_id = decision_id;
self.policy_id = policy_id;
self
}
#[must_use]
pub fn with_decision_id(mut self, decision_id: u64) -> Self {
self.decision_id = decision_id;
self
}
#[must_use]
pub fn with_policy_id(mut self, policy_id: u64) -> Self {
self.policy_id = policy_id;
self
}
#[must_use]
pub fn scope_with_budget(&self, child: Budget) -> Self {
Self {
inner: Arc::clone(&self.inner),
budget: self.budget.meet(child),
trace_id: self.trace_id,
decision_id: self.decision_id,
policy_id: self.policy_id,
_caps: PhantomData,
}
}
#[must_use]
pub fn cleanup_scope(&self) -> Self {
self.scope_with_budget(Budget::MINIMAL)
}
#[must_use]
pub fn restrict<NewCaps>(&self) -> Cx<NewCaps>
where
NewCaps: cap::SubsetOf<cap::All> + cap::SubsetOf<Caps>,
{
self.retype()
}
#[must_use]
fn retype<NewCaps>(&self) -> Cx<NewCaps>
where
NewCaps: cap::SubsetOf<cap::All>,
{
Cx {
inner: Arc::clone(&self.inner),
budget: self.budget,
trace_id: self.trace_id,
decision_id: self.decision_id,
policy_id: self.policy_id,
_caps: PhantomData,
}
}
#[must_use]
pub fn is_cancel_requested(&self) -> bool {
self.inner.cancel_requested.load(Ordering::Acquire)
}
pub fn cancel(&self) {
self.cancel_with_reason(CancelReason::UserInterrupt);
}
pub fn cancel_with_reason(&self, reason: CancelReason) {
propagate_cancel(&self.inner, reason);
}
#[must_use]
pub fn cancel_state(&self) -> CancelState {
*self
.inner
.cancel_state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
#[must_use]
pub fn cancel_reason(&self) -> Option<CancelReason> {
*self
.inner
.cancel_reason
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
pub fn transition_to_running(&self) {
let mut state = self
.inner
.cancel_state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if *state == CancelState::Created {
*state = CancelState::Running;
}
}
pub fn transition_to_finalizing(&self) {
let mut state = self
.inner
.cancel_state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if *state == CancelState::Cancelling {
*state = CancelState::Finalizing;
}
}
pub fn transition_to_completed(&self) {
let mut state = self
.inner
.cancel_state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if matches!(*state, CancelState::Finalizing | CancelState::Running) {
*state = CancelState::Completed;
}
}
pub fn set_eprocess_oracle(&self, oracle: Arc<EProcessOracle>) {
let _ = self.inner.eprocess_oracle.set(oracle);
}
pub fn clear_eprocess_oracle(&self) {
}
#[cfg(feature = "native")]
pub fn set_native_cx(&self, native_cx: NativeCx) {
if let Some(reason) = self.cancel_reason() {
native_cx.set_cancel_reason(local_reason_to_native(reason));
} else if self.is_cancel_requested() {
native_cx.set_cancel_requested(true);
}
*self
.inner
.attached_native_cx
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(native_cx);
}
#[cfg(not(feature = "native"))]
pub fn set_native_cx<T>(&self, _native_cx: T) {}
#[cfg(feature = "native")]
#[must_use]
pub fn attached_native_cx(&self) -> Option<NativeCx> {
self.inner
.attached_native_cx
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
#[cfg(not(feature = "native"))]
#[must_use]
pub fn attached_native_cx(&self) -> Option<NativeCx> {
None
}
#[cfg(feature = "native")]
pub fn clear_native_cx(&self) {
*self
.inner
.attached_native_cx
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = None;
}
#[cfg(not(feature = "native"))]
pub fn clear_native_cx(&self) {}
#[must_use]
fn maybe_cancel_via_eprocess(&self) -> bool {
let Some(oracle) = self.inner.eprocess_oracle.get() else {
return false;
};
let decision = oracle.decision(self.budget.priority);
self.record_eprocess_decision(decision.clone());
tracing::debug!(
target: "fsqlite::cx",
event = "eprocess_checkpoint",
trace_id = self.trace_id,
decision_id = self.decision_id,
policy_id = self.policy_id,
priority = decision.priority,
evalue = decision.snapshot.evalue,
threshold = decision.snapshot.rejection_threshold,
observations = decision.snapshot.observations,
priority_threshold = decision.snapshot.priority_threshold,
should_shed = decision.should_shed,
signal = ?decision.snapshot.last_signal
);
if decision.should_shed {
tracing::info!(
target: "fsqlite::cx",
event = "eprocess_shedding_triggered",
trace_id = self.trace_id,
decision_id = self.decision_id,
policy_id = self.policy_id,
priority = decision.priority,
evalue = decision.snapshot.evalue,
threshold = decision.snapshot.rejection_threshold,
signal = ?decision.snapshot.last_signal
);
self.cancel_with_reason(CancelReason::Abort);
return true;
}
false
}
#[cfg(feature = "native")]
#[must_use]
fn maybe_cancel_via_native_cx(&self, masked: bool) -> bool {
let Some(native) = self.native_cx_for_checkpoint() else {
return false;
};
if masked {
if native.is_cancel_requested() {
let reason = native
.cancel_reason()
.as_ref()
.map_or(CancelReason::Timeout, native_reason_to_local);
self.cancel_with_reason(reason);
return true;
}
return false;
}
if native.checkpoint().is_err() {
let reason = native
.cancel_reason()
.as_ref()
.map_or(CancelReason::Timeout, native_reason_to_local);
self.cancel_with_reason(reason);
return true;
}
false
}
pub fn checkpoint(&self) -> Result<()> {
let cancel_requested = self.inner.cancel_requested.load(Ordering::Acquire);
if !cancel_requested {
if !self.maybe_cancel_via_eprocess() {
#[cfg(feature = "native")]
{
let masked = self.inner.mask_depth.load(Ordering::Acquire) > 0;
if !self.maybe_cancel_via_native_cx(masked) {
return Ok(());
}
}
#[cfg(not(feature = "native"))]
{
return Ok(());
}
}
}
let masked = self.inner.mask_depth.load(Ordering::Acquire) > 0;
if masked {
return Ok(());
}
{
let mut state = self
.inner
.cancel_state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if *state == CancelState::CancelRequested {
*state = CancelState::Cancelling;
}
}
Err(Error::cancelled())
}
pub fn checkpoint_with(&self, msg: impl Into<String>) -> Result<()> {
{
let mut guard = self
.inner
.last_checkpoint_msg
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
*guard = Some(msg.into());
}
self.checkpoint()
}
#[must_use]
pub fn last_checkpoint_message(&self) -> Option<String> {
self.inner
.last_checkpoint_msg
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
#[must_use]
pub fn last_eprocess_decision(&self) -> Option<EProcessDecision> {
self.inner
.last_eprocess_decision
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
#[must_use]
pub fn last_eprocess_snapshot(&self) -> Option<EProcessSnapshot> {
self.last_eprocess_decision()
.map(|decision| decision.snapshot)
}
fn record_eprocess_decision(&self, decision: EProcessDecision) {
*self
.inner
.last_eprocess_decision
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(decision);
}
#[must_use]
pub fn masked(&self) -> MaskGuard<'_> {
let prev = self.inner.mask_depth.fetch_add(1, Ordering::AcqRel);
if prev >= MAX_MASK_DEPTH {
self.inner.mask_depth.fetch_sub(1, Ordering::Release);
assert!(
prev < MAX_MASK_DEPTH,
"MAX_MASK_DEPTH ({MAX_MASK_DEPTH}) exceeded: mask nesting depth would be {}",
prev + 1
);
}
MaskGuard { inner: &self.inner }
}
#[must_use]
pub fn mask_depth(&self) -> u32 {
self.inner.mask_depth.load(Ordering::Acquire)
}
pub fn commit_section<R>(
&self,
poll_quota: u32,
body: impl FnOnce(&CommitCtx) -> R,
finalizer: impl FnOnce(),
) -> R {
struct FinGuard<G: FnOnce()>(Option<G>);
impl<G: FnOnce()> Drop for FinGuard<G> {
fn drop(&mut self) {
if let Some(f) = self.0.take() {
f();
}
}
}
let _mask = self.masked();
let _fin = FinGuard(Some(finalizer));
let ctx = CommitCtx::new(poll_quota);
body(&ctx)
}
#[must_use]
pub fn create_child(&self) -> Self {
let mut child = Self::with_budget(self.budget);
child.trace_id = self.trace_id;
child.decision_id = self.decision_id;
child.policy_id = self.policy_id;
if let Some(oracle) = self.inner.eprocess_oracle.get().cloned() {
child.set_eprocess_oracle(oracle);
}
#[cfg(feature = "native")]
if let Some(native_cx) = self.attached_native_cx() {
child.set_native_cx(native_cx);
}
{
let mut children = self
.inner
.children
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
children.push(Arc::downgrade(&child.inner));
}
if let Some(reason) = self.cancel_reason() {
child.cancel_with_reason(reason);
} else if self.is_cancel_requested() {
child.cancel();
}
child
}
pub fn set_unix_millis_for_testing(&self, millis: u64)
where
Caps: cap::HasTime,
{
self.inner.unix_millis.store(millis, Ordering::Release);
}
#[must_use]
pub fn current_time_julian_day(&self) -> f64
where
Caps: cap::HasTime,
{
let millis = self.inner.unix_millis.load(Ordering::Acquire);
#[allow(clippy::cast_precision_loss)]
let secs = (millis as f64) / 1000.0;
2_440_587.5 + (secs / 86_400.0)
}
}
#[derive(Debug)]
pub struct MaskGuard<'a> {
inner: &'a CxInner,
}
impl Drop for MaskGuard<'_> {
fn drop(&mut self) {
self.inner.mask_depth.fetch_sub(1, Ordering::Release);
}
}
#[derive(Debug)]
pub struct CommitCtx {
poll_remaining: AtomicU32,
}
impl CommitCtx {
fn new(poll_quota: u32) -> Self {
Self {
poll_remaining: AtomicU32::new(poll_quota),
}
}
#[must_use]
pub fn poll_remaining(&self) -> u32 {
self.poll_remaining.load(Ordering::Acquire)
}
pub fn tick(&self) -> bool {
let prev = self.poll_remaining.load(Ordering::Acquire);
if prev == 0 {
return false;
}
self.poll_remaining.fetch_sub(1, Ordering::AcqRel);
true
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::eprocess::{EProcessConfig, EProcessSignal};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Weak};
#[test]
fn test_cx_checkpoint_observes_cancellation() {
let cx = Cx::new();
assert!(cx.checkpoint().is_ok());
cx.cancel();
let err = cx.checkpoint().unwrap_err();
assert_eq!(err.kind(), ErrorKind::Cancelled);
assert_eq!(err.sqlite_error_code(), SQLITE_INTERRUPT);
}
#[test]
fn test_cx_capability_narrowing_compiles() {
let cx = Cx::<FullCaps>::new();
let _compute = cx.restrict::<ComputeCaps>();
let _storage = cx.restrict::<StorageCaps>();
}
#[test]
fn test_cx_budget_meet_tightens() {
let parent = Budget::INFINITE.with_deadline(Duration::from_millis(100));
let child = Budget::INFINITE.with_deadline(Duration::from_millis(200));
let effective = parent.meet(child);
assert_eq!(effective.deadline, Some(Duration::from_millis(100)));
}
#[test]
fn test_cx_budget_priority_join() {
let parent = Budget::INFINITE.with_priority(2);
let child = Budget::INFINITE.with_priority(5);
let effective = parent.meet(child);
assert_eq!(effective.priority, 5);
}
#[test]
fn test_cx_scope_with_budget_cannot_loosen() {
let cx =
Cx::<FullCaps>::with_budget(Budget::INFINITE.with_deadline(Duration::from_millis(50)));
let child = Budget::INFINITE.with_deadline(Duration::from_millis(100));
let scoped = cx.scope_with_budget(child);
assert_eq!(scoped.budget().deadline, Some(Duration::from_millis(50)));
}
#[test]
fn test_cx_checkpoint_with_message_records_message() {
let cx = Cx::new();
assert!(cx.checkpoint_with("vdbe pc=5").is_ok());
assert_eq!(cx.last_checkpoint_message().as_deref(), Some("vdbe pc=5"));
}
#[test]
fn test_cx_cleanup_uses_minimal_budget() {
let cx = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_poll_quota(10_000));
let cleanup = cx.cleanup_scope();
assert_eq!(cleanup.budget(), Budget::MINIMAL);
}
#[test]
fn test_cx_restrict_storage_to_compute() {
let cx = Cx::<FullCaps>::new();
let storage = cx.restrict::<StorageCaps>();
let _compute = storage.restrict::<ComputeCaps>();
}
#[test]
fn test_cx_restrict_is_zero_cost() {
assert_eq!(
std::mem::size_of::<Cx<FullCaps>>(),
std::mem::size_of::<Cx<ComputeCaps>>()
);
}
#[test]
fn test_budget_mixed_lattice() {
let a = Budget {
deadline: Some(Duration::from_millis(100)),
poll_quota: 500,
cost_quota: Some(1000),
priority: 2,
};
let b = Budget {
deadline: Some(Duration::from_millis(200)),
poll_quota: 300,
cost_quota: Some(2000),
priority: 5,
};
let m = a.meet(b);
assert_eq!(m.deadline, Some(Duration::from_millis(100)));
assert_eq!(m.poll_quota, 300);
assert_eq!(m.cost_quota, Some(1000));
assert_eq!(m.priority, 5);
}
#[test]
fn test_budget_meet_commutative() {
let a = Budget {
deadline: Some(Duration::from_millis(50)),
poll_quota: 400,
cost_quota: Some(800),
priority: 3,
};
let b = Budget {
deadline: Some(Duration::from_millis(150)),
poll_quota: 200,
cost_quota: None,
priority: 7,
};
assert_eq!(a.meet(b), b.meet(a));
}
#[test]
fn test_budget_meet_associative() {
let a = Budget::INFINITE
.with_deadline(Duration::from_millis(50))
.with_poll_quota(100)
.with_priority(1);
let b = Budget::INFINITE
.with_deadline(Duration::from_millis(150))
.with_poll_quota(200)
.with_priority(5);
let c = Budget::INFINITE
.with_deadline(Duration::from_millis(75))
.with_poll_quota(50)
.with_priority(3);
assert_eq!(a.meet(b).meet(c), a.meet(b.meet(c)));
}
#[test]
fn test_budget_minimal_is_stricter_than_normal() {
let normal = Budget::INFINITE.with_poll_quota(10_000);
let effective = normal.meet(Budget::MINIMAL);
assert_eq!(effective.poll_quota, Budget::MINIMAL.poll_quota);
}
#[test]
fn test_cx_cancel_shared_across_clones() {
let cx1 = Cx::<FullCaps>::new();
let cx2 = cx1.clone();
assert!(!cx2.is_cancel_requested());
cx1.cancel();
assert!(cx2.is_cancel_requested());
assert!(cx2.checkpoint().is_err());
}
#[test]
fn test_cx_cancel_shared_across_restrict() {
let cx = Cx::<FullCaps>::new();
let compute = cx.restrict::<ComputeCaps>();
cx.cancel();
assert!(compute.checkpoint().is_err());
}
#[test]
fn test_cx_current_time_julian_day() {
let cx = Cx::<FullCaps>::new();
cx.set_unix_millis_for_testing(0);
let jd = cx.current_time_julian_day();
assert!((jd - 2_440_587.5).abs() < 1e-10);
cx.set_unix_millis_for_testing(86_400_000);
let jd = cx.current_time_julian_day();
assert!((jd - 2_440_588.5).abs() < 1e-10);
}
#[test]
fn test_capset_is_zero_sized() {
assert_eq!(std::mem::size_of::<cap::All>(), 0);
assert_eq!(std::mem::size_of::<cap::None>(), 0);
assert_eq!(
std::mem::size_of::<cap::CapSet<true, false, true, false, true>>(),
0
);
}
#[test]
fn test_cx_checkpoint_not_cancelled() {
let cx = Cx::new();
assert!(cx.checkpoint().is_ok());
assert!(cx.checkpoint_with("still going").is_ok());
}
#[test]
fn test_cx_checkpoint_maps_to_sqlite_interrupt() {
let cx = Cx::new();
cx.cancel();
let err = cx.checkpoint().unwrap_err();
assert_eq!(err.sqlite_error_code(), SQLITE_INTERRUPT);
}
#[test]
fn test_cx_checkpoint_eprocess_sheds_low_priority_context() {
let cx = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_priority(3));
let oracle = Arc::new(EProcessOracle::new(
EProcessConfig {
p0: 0.1,
lambda: 5.0,
alpha: 0.05,
max_evalue: 1e12,
},
1,
));
let signal = EProcessSignal::new(1.0, 1.0, 1.0);
oracle.observe_signal(signal);
oracle.observe_signal(signal);
cx.set_eprocess_oracle(oracle);
let err = cx.checkpoint().unwrap_err();
assert_eq!(err.kind(), ErrorKind::Cancelled);
assert_eq!(cx.cancel_reason(), Some(CancelReason::Abort));
let decision = cx
.last_eprocess_decision()
.expect("checkpoint should record an e-process decision");
assert!(decision.should_shed);
assert_eq!(decision.snapshot.last_signal, Some(signal));
}
#[test]
fn test_cx_checkpoint_eprocess_respects_priority_threshold() {
let cx = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_priority(1));
let oracle = Arc::new(EProcessOracle::new(
EProcessConfig {
p0: 0.1,
lambda: 5.0,
alpha: 0.05,
max_evalue: 1e12,
},
1,
));
let signal = EProcessSignal::new(1.0, 1.0, 1.0);
oracle.observe_signal(signal);
oracle.observe_signal(signal);
cx.set_eprocess_oracle(oracle);
assert!(cx.checkpoint().is_ok());
assert!(!cx.is_cancel_requested());
let decision = cx
.last_eprocess_decision()
.expect("checkpoint should still record non-shedding decisions");
assert!(!decision.should_shed);
assert_eq!(decision.priority, 1);
assert_eq!(decision.snapshot.last_signal, Some(signal));
}
#[test]
fn test_cx_checkpoint_eprocess_preserves_masking_semantics() {
let cx = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_priority(3));
let oracle = Arc::new(EProcessOracle::new(
EProcessConfig {
p0: 0.1,
lambda: 5.0,
alpha: 0.05,
max_evalue: 1e12,
},
1,
));
let signal = EProcessSignal::new(1.0, 1.0, 1.0);
oracle.observe_signal(signal);
oracle.observe_signal(signal);
cx.set_eprocess_oracle(oracle);
{
let _mask = cx.masked();
assert!(cx.checkpoint().is_ok());
assert!(cx.is_cancel_requested());
assert_eq!(cx.cancel_state(), CancelState::CancelRequested);
assert_eq!(
cx.last_eprocess_snapshot()
.expect("checkpoint should record the masked decision")
.last_signal,
Some(signal)
);
}
let err = cx.checkpoint().unwrap_err();
assert_eq!(err.kind(), ErrorKind::Cancelled);
}
#[test]
fn test_create_child_inherits_eprocess_oracle() {
let parent = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_priority(3));
let oracle = Arc::new(EProcessOracle::new(
EProcessConfig {
p0: 0.1,
lambda: 5.0,
alpha: 0.05,
max_evalue: 1e12,
},
1,
));
let signal = EProcessSignal::new(1.0, 1.0, 1.0);
oracle.observe_signal(signal);
oracle.observe_signal(signal);
parent.set_eprocess_oracle(oracle);
let child = parent.create_child();
let err = child.checkpoint().unwrap_err();
assert_eq!(err.kind(), ErrorKind::Cancelled);
assert_eq!(child.cancel_reason(), Some(CancelReason::Abort));
assert_eq!(
child
.last_eprocess_snapshot()
.expect("child checkpoint should record inherited oracle decision")
.last_signal,
Some(signal)
);
}
#[test]
fn test_create_child_inherits_preexisting_parent_cancellation() {
let parent = Cx::<FullCaps>::new();
parent.cancel_with_reason(CancelReason::RegionClose);
let child = parent.create_child();
assert_eq!(child.cancel_reason(), Some(CancelReason::RegionClose));
assert_eq!(child.cancel_state(), CancelState::CancelRequested);
let err = child.checkpoint().unwrap_err();
assert_eq!(err.kind(), ErrorKind::Cancelled);
}
#[cfg(feature = "native")]
#[test]
fn test_cx_checkpoint_native_cx_cancellation_maps_reason() {
let cx = Cx::<FullCaps>::new();
let native = NativeCx::for_testing();
cx.set_native_cx(native.clone());
native.set_cancel_reason(NativeCancelReason::timeout());
let err = cx.checkpoint().unwrap_err();
assert_eq!(err.kind(), ErrorKind::Cancelled);
assert_eq!(cx.cancel_reason(), Some(CancelReason::Timeout));
}
#[cfg(feature = "native")]
#[test]
fn test_cx_cancel_reason_propagates_to_native_cx() {
let cx = Cx::<FullCaps>::new();
let native = NativeCx::for_testing();
cx.set_native_cx(native.clone());
cx.cancel_with_reason(CancelReason::RegionClose);
let reason = native
.cancel_reason()
.expect("native cancel reason must be set");
assert_eq!(reason.kind, NativeCancelKind::ParentCancelled);
}
#[cfg(feature = "native")]
#[test]
fn test_cx_checkpoint_native_cx_respects_local_masking() {
let cx = Cx::<FullCaps>::new();
let native = NativeCx::for_testing();
cx.set_native_cx(native.clone());
native.set_cancel_reason(NativeCancelReason::user("cancel"));
{
let _mask = cx.masked();
assert!(cx.checkpoint().is_ok());
assert!(cx.is_cancel_requested());
assert_eq!(cx.cancel_state(), CancelState::CancelRequested);
}
let err = cx.checkpoint().unwrap_err();
assert_eq!(err.kind(), ErrorKind::Cancelled);
}
#[cfg(feature = "native")]
#[test]
fn test_cx_effective_native_cx_uses_fallback_without_marking_explicit_attachment() {
let cx = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_priority(7));
assert!(cx.attached_native_cx().is_none());
let native = cx.effective_native_cx();
assert!(cx.attached_native_cx().is_none());
assert!(native.checkpoint().is_ok());
}
#[cfg(feature = "native")]
#[test]
fn test_cx_checkpoint_without_native_context_does_not_create_fallback() {
let cx = Cx::<FullCaps>::new();
assert!(cx.inner.fallback_native_cx.get().is_none());
assert!(cx.checkpoint().is_ok());
assert!(cx.inner.fallback_native_cx.get().is_none());
}
#[cfg(feature = "native")]
#[test]
fn test_cx_set_native_cx_replaces_fallback_context() {
let cx = Cx::<FullCaps>::new();
let _ = cx.effective_native_cx();
let replacement = NativeCx::for_testing();
cx.set_native_cx(replacement.clone());
replacement.set_cancel_reason(NativeCancelReason::timeout());
let err = cx.checkpoint().unwrap_err();
assert_eq!(err.kind(), ErrorKind::Cancelled);
assert_eq!(cx.cancel_reason(), Some(CancelReason::Timeout));
}
#[cfg(feature = "native")]
#[test]
fn test_create_child_copies_preexisting_cancellation_into_fallback_native_cx() {
let parent = Cx::<FullCaps>::new();
parent.cancel_with_reason(CancelReason::RegionClose);
let child = parent.create_child();
let reason = child
.effective_native_cx()
.cancel_reason()
.expect("fallback native cx should mirror inherited cancellation");
assert_eq!(reason.kind, NativeCancelKind::ParentCancelled);
}
#[cfg(feature = "native")]
#[test]
fn test_create_child_inherits_explicit_native_cx_attachment() {
let parent = Cx::<FullCaps>::new();
let native = NativeCx::for_testing();
parent.set_native_cx(native.clone());
let child = parent.create_child();
assert!(child.attached_native_cx().is_some());
native.set_cancel_reason(NativeCancelReason::timeout());
let err = child
.checkpoint()
.expect_err("child should observe inherited native cancel");
assert_eq!(err.kind(), ErrorKind::Cancelled);
assert_eq!(child.cancel_reason(), Some(CancelReason::Timeout));
}
#[test]
fn test_budget_infinite_is_identity_for_meet() {
let budget = Budget {
deadline: Some(Duration::from_millis(42)),
poll_quota: 500,
cost_quota: Some(1000),
priority: 7,
};
assert_eq!(budget.meet(Budget::INFINITE), budget);
assert_eq!(Budget::INFINITE.meet(budget), budget);
}
#[test]
fn test_budget_none_constraints_propagate() {
let a = Budget {
deadline: None,
poll_quota: u32::MAX,
cost_quota: None,
priority: 0,
};
let b = Budget {
deadline: Some(Duration::from_millis(50)),
poll_quota: 100,
cost_quota: Some(500),
priority: 3,
};
let m = a.meet(b);
assert_eq!(m.deadline, Some(Duration::from_millis(50)));
assert_eq!(m.poll_quota, 100);
assert_eq!(m.cost_quota, Some(500));
assert_eq!(m.priority, 3);
}
#[test]
fn test_cx_scope_budget_chains() {
let cx = Cx::<FullCaps>::with_budget(
Budget::INFINITE
.with_deadline(Duration::from_millis(100))
.with_poll_quota(1000),
);
let s1 = cx.scope_with_budget(Budget::INFINITE.with_deadline(Duration::from_millis(50)));
assert_eq!(s1.budget().deadline, Some(Duration::from_millis(50)));
assert_eq!(s1.budget().poll_quota, 1000);
let s2 = s1.scope_with_budget(Budget::INFINITE.with_poll_quota(200));
assert_eq!(s2.budget().deadline, Some(Duration::from_millis(50)));
assert_eq!(s2.budget().poll_quota, 200);
}
fn collect_rs_files(dir: &Path, out: &mut Vec<PathBuf>) -> std::io::Result<()> {
for entry in std::fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
collect_rs_files(&path, out)?;
} else if path.extension().is_some_and(|ext| ext == "rs") {
out.push(path);
}
}
Ok(())
}
fn scan_file_outside_cfg_test_items(src: &str, patterns: &[&str]) -> Vec<(usize, String)> {
let mut hits = Vec::new();
let mut brace_depth: i32 = 0;
let mut pending_cfg_test = false;
let mut pending_attr_paren_depth: i32 = 0;
let mut skip_until_depth: Option<i32> = None;
for (idx, line) in src.lines().enumerate() {
let trimmed = line.trim_start();
let paren_delta = i32::try_from(line.matches('(').count()).unwrap_or(i32::MAX)
- i32::try_from(line.matches(')').count()).unwrap_or(i32::MAX);
if skip_until_depth.is_none() {
if trimmed.starts_with("#[cfg(test)]") && trimmed.contains('{') {
pending_cfg_test = false;
pending_attr_paren_depth = 0;
skip_until_depth = Some(brace_depth);
} else if trimmed.contains("fn test_") && trimmed.contains('{') {
skip_until_depth = Some(brace_depth);
} else if trimmed.starts_with("#[cfg(test)]") {
pending_cfg_test = true;
pending_attr_paren_depth = 0;
} else if pending_cfg_test {
if trimmed.starts_with("#[") || pending_attr_paren_depth > 0 {
pending_attr_paren_depth =
pending_attr_paren_depth.saturating_add(paren_delta);
} else if trimmed.is_empty() || trimmed.starts_with("//") {
} else if trimmed.contains('{') {
pending_cfg_test = false;
pending_attr_paren_depth = 0;
skip_until_depth = Some(brace_depth);
} else {
pending_cfg_test = false;
pending_attr_paren_depth = 0;
}
} else {
for &pat in patterns {
if line.contains(pat) {
hits.push((idx + 1, pat.to_string()));
}
}
}
}
let opens = i32::try_from(line.matches('{').count()).unwrap_or(i32::MAX);
let closes = i32::try_from(line.matches('}').count()).unwrap_or(i32::MAX);
brace_depth = brace_depth.saturating_add(opens).saturating_sub(closes);
if let Some(until) = skip_until_depth {
if brace_depth <= until {
skip_until_depth = None;
}
}
}
hits
}
#[test]
fn test_scan_file_outside_cfg_test_items_skips_cfg_test_functions_and_modules() {
let src = r"
fn production_path() {
let _ = Cx::new();
}
#[cfg(test)]
fn test_only_helper() {
let _ = Cx::new();
}
#[cfg(test)]
mod tests {
fn nested_test_helper() {
let _ = Cx::default();
}
}
";
let hits = scan_file_outside_cfg_test_items(src, &["Cx::new(", "Cx::default("]);
assert_eq!(hits, vec![(3, "Cx::new(".to_string())]);
}
#[test]
fn test_no_direct_cx_constructors_in_runtime_production_code() {
let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let repo_root = manifest_dir
.parent()
.and_then(Path::parent)
.expect("fsqlite-types manifest dir must be crates/<name>");
let crates_dir = repo_root.join("crates");
let runtime_crates = [
"fsqlite-core",
"fsqlite-vdbe",
"fsqlite-btree",
"fsqlite-pager",
"fsqlite-wal",
"fsqlite-mvcc",
];
let forbidden = ["Cx::new(", "Cx::default("];
let mut violations: Vec<String> = Vec::new();
let mut crate_dirs: Vec<PathBuf> = Vec::new();
for entry in std::fs::read_dir(&crates_dir).expect("read crates/ dir") {
let entry = entry.expect("read crates/ entry");
let path = entry.path();
if path.is_dir() {
crate_dirs.push(path);
}
}
for crate_dir in crate_dirs {
let crate_name = crate_dir
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("<unknown>");
if !runtime_crates.contains(&crate_name) {
continue;
}
let src_dir = crate_dir.join("src");
if !src_dir.is_dir() {
continue;
}
let mut files = Vec::new();
collect_rs_files(&src_dir, &mut files).expect("collect rs files");
for file in files {
if file
.file_name()
.and_then(|name| name.to_str())
.is_some_and(|name| name.contains("test"))
{
continue;
}
let src = std::fs::read_to_string(&file).expect("read file");
let rel_path = file.strip_prefix(repo_root).unwrap_or(&file);
for (line, pat) in scan_file_outside_cfg_test_items(&src, &forbidden) {
let line_text = src.lines().nth(line - 1).unwrap_or("").trim();
let allowed_detached_root_constructor = rel_path
== Path::new("crates/fsqlite-core/src/connection.rs")
&& pat == "Cx::new("
&& line_text.contains("Cx::new().with_trace_context(");
if allowed_detached_root_constructor {
continue;
}
violations.push(format!(
"{crate_name}:{path}:{line} uses forbidden `{pat}` outside cfg(test) code: {line_text}",
path = rel_path.display()
));
}
}
}
assert!(
violations.is_empty(),
"direct `Cx::new()` / `Cx::default()` production-path violations:\n{}",
violations.join("\n")
);
}
#[test]
fn test_ambient_authority_audit_gate() {
let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let repo_root = manifest_dir
.parent()
.and_then(Path::parent)
.expect("fsqlite-types manifest dir must be crates/<name>");
let crates_dir = repo_root.join("crates");
let always_forbidden = [
"SystemTime::now(",
"Instant::now(",
"thread_rng(",
"getrandom",
"std::net::",
"std::thread::spawn",
"tokio::spawn",
];
let non_vfs_forbidden = ["std::fs::"];
let exempt_crates = [
"fsqlite-harness",
"fsqlite-cli",
"fsqlite-e2e",
"fsqlite-observability",
"fsqlite-core",
"fsqlite-vdbe",
"fsqlite-mvcc",
"fsqlite-parser",
"fsqlite-planner",
"fsqlite-wal",
"fsqlite-vfs",
];
let mut violations: Vec<String> = Vec::new();
let mut crate_dirs: Vec<PathBuf> = Vec::new();
for entry in std::fs::read_dir(&crates_dir).expect("read crates/ dir") {
let entry = entry.expect("read crates/ entry");
let path = entry.path();
if path.is_dir() {
crate_dirs.push(path);
}
}
for crate_dir in crate_dirs {
let crate_name = crate_dir
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("<unknown>");
if exempt_crates.contains(&crate_name) {
continue;
}
let src_dir = crate_dir.join("src");
if !src_dir.is_dir() {
continue;
}
let mut files = Vec::new();
collect_rs_files(&src_dir, &mut files).expect("collect rs files");
for file in files {
let src = std::fs::read_to_string(&file).expect("read file");
for (line, pat) in scan_file_outside_cfg_test_items(&src, &always_forbidden) {
violations.push(format!(
"{crate_name}:{path}:{line} uses forbidden `{pat}`",
path = file.display()
));
}
if crate_name != "fsqlite-vfs" {
for (line, pat) in scan_file_outside_cfg_test_items(&src, &non_vfs_forbidden) {
violations.push(format!(
"{crate_name}:{path}:{line} uses forbidden `{pat}` (non-vfs crate)",
path = file.display()
));
}
}
}
}
assert!(
violations.is_empty(),
"ambient authority violations (outside cfg(test) modules):\n{}",
violations.join("\n")
);
}
const BEAD_ID: &str = "bd-samf";
#[test]
fn test_cancel_state_machine_all_transitions() {
let cx = Cx::<FullCaps>::new();
assert_eq!(
cx.cancel_state(),
CancelState::Created,
"bead_id={BEAD_ID} initial_state"
);
cx.transition_to_running();
assert_eq!(
cx.cancel_state(),
CancelState::Running,
"bead_id={BEAD_ID} after_start"
);
cx.cancel_with_reason(CancelReason::UserInterrupt);
assert_eq!(
cx.cancel_state(),
CancelState::CancelRequested,
"bead_id={BEAD_ID} after_cancel"
);
let err = cx.checkpoint();
assert!(err.is_err(), "bead_id={BEAD_ID} checkpoint_returns_err");
assert_eq!(
cx.cancel_state(),
CancelState::Cancelling,
"bead_id={BEAD_ID} after_checkpoint_observation"
);
cx.transition_to_finalizing();
assert_eq!(
cx.cancel_state(),
CancelState::Finalizing,
"bead_id={BEAD_ID} after_finalize_start"
);
cx.transition_to_completed();
assert_eq!(
cx.cancel_state(),
CancelState::Completed,
"bead_id={BEAD_ID} after_complete"
);
}
#[test]
fn test_cancel_propagates_to_children() {
let parent = Cx::<FullCaps>::new();
parent.transition_to_running();
let child1 = parent.create_child();
child1.transition_to_running();
let child2 = parent.create_child();
child2.transition_to_running();
let child3 = parent.create_child();
child3.transition_to_running();
assert!(!child1.is_cancel_requested());
assert!(!child2.is_cancel_requested());
assert!(!child3.is_cancel_requested());
parent.cancel_with_reason(CancelReason::RegionClose);
assert!(
child1.is_cancel_requested(),
"bead_id={BEAD_ID} child1_cancelled"
);
assert!(
child2.is_cancel_requested(),
"bead_id={BEAD_ID} child2_cancelled"
);
assert!(
child3.is_cancel_requested(),
"bead_id={BEAD_ID} child3_cancelled"
);
assert_eq!(child1.cancel_state(), CancelState::CancelRequested);
assert_eq!(child2.cancel_state(), CancelState::CancelRequested);
assert_eq!(child3.cancel_state(), CancelState::CancelRequested);
assert_eq!(child1.cancel_reason(), Some(CancelReason::RegionClose));
}
#[test]
fn test_dropped_children_are_pruned_from_parent_links() {
let parent = Cx::<FullCaps>::new();
let live_child = parent.create_child();
let dropped_child = parent.create_child();
drop(dropped_child);
parent.cancel_with_reason(CancelReason::RegionClose);
let live_count = {
let children = parent
.inner
.children
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
children.iter().filter_map(Weak::upgrade).count()
};
assert_eq!(live_count, 1, "only the live child should remain linked");
assert!(live_child.is_cancel_requested());
}
#[test]
fn test_cancel_idempotent_strongest_wins() {
let cx = Cx::<FullCaps>::new();
cx.transition_to_running();
cx.cancel_with_reason(CancelReason::Timeout);
assert_eq!(
cx.cancel_reason(),
Some(CancelReason::Timeout),
"bead_id={BEAD_ID} first_reason"
);
cx.cancel_with_reason(CancelReason::Abort);
assert_eq!(
cx.cancel_reason(),
Some(CancelReason::Abort),
"bead_id={BEAD_ID} upgraded_reason"
);
cx.cancel_with_reason(CancelReason::UserInterrupt);
assert_eq!(
cx.cancel_reason(),
Some(CancelReason::Abort),
"bead_id={BEAD_ID} reason_stays_strongest"
);
}
#[test]
fn test_losers_drain_on_race() {
use std::sync::atomic::AtomicBool;
let loser_cx = Cx::<FullCaps>::new();
loser_cx.transition_to_running();
let obligation_resolved = Arc::new(AtomicBool::new(false));
let ob_clone = Arc::clone(&obligation_resolved);
loser_cx.cancel_with_reason(CancelReason::RegionClose);
assert!(loser_cx.checkpoint().is_err());
assert_eq!(loser_cx.cancel_state(), CancelState::Cancelling);
ob_clone.store(true, Ordering::Release);
loser_cx.transition_to_finalizing();
loser_cx.transition_to_completed();
assert!(
obligation_resolved.load(Ordering::Acquire),
"bead_id={BEAD_ID} loser_obligation_resolved"
);
assert_eq!(
loser_cx.cancel_state(),
CancelState::Completed,
"bead_id={BEAD_ID} loser_drained"
);
}
#[test]
fn test_vdbe_checkpoint_cancel_observed_at_next_opcode() {
let cx = Cx::<FullCaps>::new();
cx.transition_to_running();
let mut last_executed = 0u32;
for opcode in 0..100u32 {
if cx.checkpoint_with(format!("vdbe pc={opcode}")).is_err() {
last_executed = opcode;
break;
}
last_executed = opcode;
if opcode == 50 {
cx.cancel_with_reason(CancelReason::UserInterrupt);
}
}
assert_eq!(
last_executed, 51,
"bead_id={BEAD_ID} cancel_observed_at_opcode_51"
);
}
#[test]
fn test_btree_checkpoint_cancel_within_one_node() {
let cx = Cx::<FullCaps>::new();
cx.transition_to_running();
let nodes = ["root", "internal_l", "internal_r", "leaf_a", "leaf_b"];
let cancel_at = 2; let mut observed_at = None;
for (i, node) in nodes.iter().enumerate() {
if cx.checkpoint_with(format!("btree node={node}")).is_err() {
observed_at = Some(i);
break;
}
if i == cancel_at {
cx.cancel_with_reason(CancelReason::UserInterrupt);
}
}
assert_eq!(
observed_at,
Some(cancel_at + 1),
"bead_id={BEAD_ID} btree_cancel_within_one_node"
);
}
#[test]
fn test_masked_section_defers_cancel() {
let cx = Cx::<FullCaps>::new();
cx.transition_to_running();
cx.cancel_with_reason(CancelReason::UserInterrupt);
assert!(cx.is_cancel_requested());
{
let _guard = cx.masked();
assert_eq!(cx.mask_depth(), 1);
assert!(
cx.checkpoint().is_ok(),
"bead_id={BEAD_ID} checkpoint_ok_while_masked"
);
{
let _inner = cx.masked();
assert_eq!(cx.mask_depth(), 2);
assert!(cx.checkpoint().is_ok());
}
assert_eq!(cx.mask_depth(), 1);
}
assert_eq!(cx.mask_depth(), 0);
assert!(
cx.checkpoint().is_err(),
"bead_id={BEAD_ID} checkpoint_err_after_mask_exit"
);
}
#[test]
#[should_panic(expected = "MAX_MASK_DEPTH")]
#[allow(clippy::collection_is_never_read)]
fn test_max_mask_depth_exceeded_panics() {
let cx = Cx::<FullCaps>::new();
let mut guards = Vec::new();
for _ in 0..MAX_MASK_DEPTH {
guards.push(cx.masked());
}
let _overflow = cx.masked();
}
#[test]
fn test_commit_section_completes_under_cancel() {
let cx = Cx::<FullCaps>::new();
cx.transition_to_running();
let ops_completed = Arc::new(AtomicU32::new(0));
let finalizer_ran = Arc::new(AtomicBool::new(false));
let ops = Arc::clone(&ops_completed);
let fin = Arc::clone(&finalizer_ran);
cx.commit_section(
10,
|ctx| {
assert!(ctx.tick());
ops.fetch_add(1, Ordering::Release);
cx.cancel_with_reason(CancelReason::UserInterrupt);
assert!(ctx.tick());
ops.fetch_add(1, Ordering::Release);
assert!(
cx.checkpoint().is_ok(),
"bead_id={BEAD_ID} masked_during_commit"
);
assert!(ctx.tick());
ops.fetch_add(1, Ordering::Release);
},
move || {
fin.store(true, Ordering::Release);
},
);
assert_eq!(
ops_completed.load(Ordering::Acquire),
3,
"bead_id={BEAD_ID} all_ops_completed"
);
assert!(
finalizer_ran.load(Ordering::Acquire),
"bead_id={BEAD_ID} finalizer_ran"
);
assert!(cx.checkpoint().is_err());
}
#[test]
fn test_commit_section_enforces_poll_quota() {
let cx = Cx::<FullCaps>::new();
cx.transition_to_running();
let ticks_succeeded = Arc::new(AtomicU32::new(0));
let ts = Arc::clone(&ticks_succeeded);
cx.commit_section(
3,
|ctx| {
assert_eq!(ctx.poll_remaining(), 3);
for _ in 0..5 {
if ctx.tick() {
ts.fetch_add(1, Ordering::Release);
}
}
},
|| {},
);
assert_eq!(
ticks_succeeded.load(Ordering::Acquire),
3,
"bead_id={BEAD_ID} poll_quota_enforced"
);
}
#[test]
fn test_cancel_unaware_hot_loop_detected() {
let cx = Cx::<FullCaps>::new();
cx.transition_to_running();
let deadline = 100u32;
let mut iterations_without_checkpoint = 0u32;
let mut detected_unaware = false;
cx.cancel_with_reason(CancelReason::UserInterrupt);
for _i in 0..200u32 {
iterations_without_checkpoint += 1;
if iterations_without_checkpoint >= deadline {
detected_unaware = true;
break;
}
}
assert!(
detected_unaware,
"bead_id={BEAD_ID} cancel_unaware_loop_detected"
);
let cx2 = Cx::<FullCaps>::new();
cx2.transition_to_running();
cx2.cancel_with_reason(CancelReason::UserInterrupt);
let mut compliant_iters = 0u32;
for _ in 0..200u32 {
if cx2.checkpoint().is_err() {
break;
}
compliant_iters += 1;
}
assert_eq!(
compliant_iters, 0,
"bead_id={BEAD_ID} compliant_loop_exits_immediately"
);
}
#[test]
fn test_write_coordinator_commit_section() {
let cx = Cx::<FullCaps>::new();
cx.transition_to_running();
let proof_published = Arc::new(AtomicBool::new(false));
let marker_published = Arc::new(AtomicBool::new(false));
let reservation_released = Arc::new(AtomicBool::new(false));
let proof = Arc::clone(&proof_published);
let marker = Arc::clone(&marker_published);
let release = Arc::clone(&reservation_released);
cx.commit_section(
10,
|ctx| {
assert!(ctx.tick());
cx.cancel_with_reason(CancelReason::RegionClose);
assert!(ctx.tick());
proof.store(true, Ordering::Release);
assert!(cx.checkpoint().is_ok());
assert!(ctx.tick());
marker.store(true, Ordering::Release);
},
move || {
release.store(true, Ordering::Release);
},
);
assert!(
proof_published.load(Ordering::Acquire),
"bead_id={BEAD_ID} proof_published"
);
assert!(
marker_published.load(Ordering::Acquire),
"bead_id={BEAD_ID} marker_published"
);
assert!(
reservation_released.load(Ordering::Acquire),
"bead_id={BEAD_ID} reservation_released"
);
assert!(cx.checkpoint().is_err());
}
#[test]
fn test_trace_ids_default_to_zero() {
let cx = Cx::<FullCaps>::new();
assert_eq!(cx.trace_id(), 0);
assert_eq!(cx.decision_id(), 0);
assert_eq!(cx.policy_id(), 0);
}
#[test]
fn test_with_trace_context_sets_all_ids() {
let cx = Cx::<FullCaps>::new().with_trace_context(42, 99, 7);
assert_eq!(cx.trace_id(), 42);
assert_eq!(cx.decision_id(), 99);
assert_eq!(cx.policy_id(), 7);
}
#[test]
fn test_with_decision_id_preserves_other_ids() {
let cx = Cx::<FullCaps>::new()
.with_trace_context(10, 20, 30)
.with_decision_id(55);
assert_eq!(cx.trace_id(), 10);
assert_eq!(cx.decision_id(), 55);
assert_eq!(cx.policy_id(), 30);
}
#[test]
fn test_with_policy_id_preserves_other_ids() {
let cx = Cx::<FullCaps>::new()
.with_trace_context(100, 200, 300)
.with_policy_id(88);
assert_eq!(cx.trace_id(), 100);
assert_eq!(cx.decision_id(), 200);
assert_eq!(cx.policy_id(), 88);
}
#[test]
#[allow(clippy::redundant_clone)]
fn test_clone_propagates_trace_ids() {
let cx = Cx::<FullCaps>::new().with_trace_context(1, 2, 3);
let cloned = cx.clone();
assert_eq!(cloned.trace_id(), 1);
assert_eq!(cloned.decision_id(), 2);
assert_eq!(cloned.policy_id(), 3);
}
#[test]
fn test_restrict_propagates_trace_ids() {
let cx = Cx::<FullCaps>::new();
let compute = cx.restrict::<ComputeCaps>();
assert_eq!(compute.trace_id(), 0);
assert_eq!(compute.decision_id(), 0);
assert_eq!(compute.policy_id(), 0);
}
#[test]
fn test_scope_with_budget_propagates_trace_ids() {
let cx = Cx::<FullCaps>::new().with_trace_context(5, 6, 7);
let scoped = cx.scope_with_budget(Budget::MINIMAL);
assert_eq!(scoped.trace_id(), 5);
assert_eq!(scoped.decision_id(), 6);
assert_eq!(scoped.policy_id(), 7);
assert_eq!(scoped.budget().poll_quota, Budget::MINIMAL.poll_quota);
}
#[test]
fn test_cleanup_scope_propagates_trace_ids() {
let cx = Cx::<FullCaps>::new().with_trace_context(11, 22, 33);
let cleanup = cx.cleanup_scope();
assert_eq!(cleanup.trace_id(), 11);
assert_eq!(cleanup.decision_id(), 22);
assert_eq!(cleanup.policy_id(), 33);
}
#[test]
fn test_create_child_propagates_trace_ids() {
let parent = Cx::<FullCaps>::new().with_trace_context(50, 60, 70);
let child = parent.create_child();
assert_eq!(child.trace_id(), 50);
assert_eq!(child.decision_id(), 60);
assert_eq!(child.policy_id(), 70);
parent.cancel();
assert!(parent.is_cancel_requested());
assert!(child.is_cancel_requested()); }
#[test]
fn test_trace_ids_independent_across_children() {
let parent = Cx::<FullCaps>::new().with_trace_context(1, 2, 3);
let child1 = parent.create_child().with_decision_id(100);
let child2 = parent.create_child().with_decision_id(200);
assert_eq!(child1.trace_id(), 1);
assert_eq!(child2.trace_id(), 1);
assert_eq!(child1.decision_id(), 100);
assert_eq!(child2.decision_id(), 200);
assert_eq!(parent.decision_id(), 2);
}
#[test]
fn test_with_budget_starts_at_zero_trace_ids() {
let cx = Cx::<FullCaps>::with_budget(Budget::MINIMAL);
assert_eq!(cx.trace_id(), 0);
assert_eq!(cx.decision_id(), 0);
assert_eq!(cx.policy_id(), 0);
}
}