pub mod disable_didcomm;
pub mod disable_rest;
pub mod disable_webauthn;
pub mod document;
pub mod drain_cancel;
pub mod enable_didcomm;
pub mod enable_rest;
pub mod enable_webauthn;
pub mod invariant;
pub mod list;
pub mod list_drain;
pub mod passkey_vm_cleanup;
pub mod preconditions;
pub mod report;
pub mod rollback_didcomm;
pub mod rollback_rest;
pub mod rollback_webauthn;
pub mod runtime_state;
pub mod snapshot;
pub mod update_didcomm;
pub mod update_rest;
pub mod update_webauthn;
/// Global async mutex with a unit payload.
///
/// It guards no data of its own; mutual exclusion comes purely from holding the
/// guard returned by `PROTOCOL_LOCK.lock().await`. Being a `tokio::sync::Mutex`,
/// the guard may be held across `.await` points.
///
/// NOTE(review): presumably acquired by the protocol operations in the submodules
/// of this module around their mutations — confirm at the call sites.
pub static PROTOCOL_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
/// Error describing a drain TTL that falls outside its permitted range.
///
/// All three fields are expressed in whole seconds, which is how the
/// [`Display`](std::fmt::Display) implementation renders them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DrainTtlBoundsError {
    /// Smallest accepted TTL, in seconds (inclusive).
    pub min: u64,
    /// Largest accepted TTL, in seconds (inclusive).
    pub max: u64,
    /// The TTL that was asked for, in seconds.
    pub requested: u64,
}

impl std::fmt::Display for DrainTtlBoundsError {
    /// Renders the requested value followed by the inclusive `[min, max]` range.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        out.write_fmt(format_args!(
            "drain ttl {}s outside allowed range [{}s, {}s]",
            self.requested, self.min, self.max
        ))
    }
}

// No underlying cause to expose, so the default `Error` methods suffice.
impl std::error::Error for DrainTtlBoundsError {}
/// Checks that a requested drain TTL lies within the bounds allowed for `transport`.
///
/// The comparison is made in whole seconds (`ttl.as_secs()`), so any sub-second
/// part of `ttl` is ignored. The lower bound is `MIN_DRAIN_TTL_OVER_DIDCOMM` for
/// `DisableTransport::Didcomm` and zero for `DisableTransport::Rest`; the upper
/// bound is `MAX_DRAIN_TTL` for both. Both bounds are inclusive.
///
/// # Errors
///
/// Returns [`DrainTtlBoundsError`] carrying the bounds and the requested value
/// (all in seconds) when `ttl` is below the minimum or above the maximum.
pub fn validate_drain_ttl(
    transport: crate::operations::protocol::disable_didcomm::DisableTransport,
    ttl: std::time::Duration,
) -> Result<(), DrainTtlBoundsError> {
    use crate::messaging::registry::MAX_DRAIN_TTL;
    use crate::operations::protocol::disable_didcomm::{
        DisableTransport, MIN_DRAIN_TTL_OVER_DIDCOMM,
    };

    let min: u64 = match transport {
        DisableTransport::Didcomm => MIN_DRAIN_TTL_OVER_DIDCOMM.as_secs(),
        DisableTransport::Rest => 0,
    };
    // `num_seconds()` yields a signed value; a plain `as u64` would wrap a negative
    // maximum into an enormous bound and silently disable the upper check. Clamp at
    // zero first so a misconfigured maximum fails closed, then convert losslessly.
    let max: u64 = MAX_DRAIN_TTL.num_seconds().max(0).unsigned_abs();
    let requested = ttl.as_secs();

    if (min..=max).contains(&requested) {
        Ok(())
    } else {
        Err(DrainTtlBoundsError {
            min,
            max,
            requested,
        })
    }
}
/// Describes what caused an operation to run, for telemetry purposes.
///
/// NOTE(review): the variant names suggest "invoked on its own" versus "invoked
/// as part of a rollback" — confirm against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpContext {
    /// Carries no telemetry trigger tag.
    Direct,
    /// Tagged as `"rollback"` in telemetry.
    Rollback,
}

impl OpContext {
    /// Returns the telemetry "triggered by" tag for this context, if it has one.
    ///
    /// `Rollback` maps to `Some("rollback")`; `Direct` maps to `None`.
    #[must_use]
    pub fn telemetry_triggered_by(self) -> Option<&'static str> {
        // Kept as an exhaustive match so a new variant forces a decision here.
        let tag = match self {
            Self::Rollback => "rollback",
            Self::Direct => return None,
        };
        Some(tag)
    }
}
// Unit tests for the module-level `PROTOCOL_LOCK`.
#[cfg(test)]
mod tests {
use super::PROTOCOL_LOCK;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
// Spawns two tasks that both take `PROTOCOL_LOCK` and verifies that they never
// overlap inside the locked region. A multi-threaded runtime is used so the two
// tasks can actually run in parallel if the lock fails to exclude them.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn protocol_lock_serializes_concurrent_mutations() {
// `in_section` counts tasks currently inside the locked region;
// `max_observed` records the highest value that counter ever reached.
let in_section = Arc::new(AtomicUsize::new(0));
let max_observed = Arc::new(AtomicUsize::new(0));
async fn critical(in_section: Arc<AtomicUsize>, max_observed: Arc<AtomicUsize>) {
// The guard lives until the end of this function, covering the sleep below.
let _guard = PROTOCOL_LOCK.lock().await;
// `fetch_add` returns the previous value, so `+ 1` gives the occupancy including us.
let n = in_section.fetch_add(1, Ordering::SeqCst) + 1;
max_observed.fetch_max(n, Ordering::SeqCst);
// Stay inside the locked region long enough for the other task to attempt entry.
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
in_section.fetch_sub(1, Ordering::SeqCst);
}
let a = tokio::spawn(critical(Arc::clone(&in_section), Arc::clone(&max_observed)));
let b = tokio::spawn(critical(Arc::clone(&in_section), Arc::clone(&max_observed)));
let (ra, rb) = tokio::join!(a, b);
// Surface a panic from either spawned task as a test failure.
ra.unwrap();
rb.unwrap();
// If the lock excluded correctly, occupancy never exceeded one.
assert_eq!(
max_observed.load(Ordering::SeqCst),
1,
"PROTOCOL_LOCK must serialize: at most one task in the critical section at a time"
);
}
}