pub mod disable_didcomm;
pub mod disable_rest;
pub mod disable_webauthn;
pub mod document;
pub mod drain_cancel;
pub mod enable_didcomm;
pub mod enable_rest;
pub mod enable_webauthn;
pub mod invariant;
pub mod list;
pub mod list_drain;
pub mod passkey_vm_cleanup;
pub mod preconditions;
pub mod report;
pub mod rollback_didcomm;
pub mod rollback_rest;
pub mod rollback_webauthn;
pub mod runtime_state;
pub(crate) mod service_lifecycle;
pub mod snapshot;
pub mod update_didcomm;
pub mod update_rest;
pub mod update_webauthn;
/// Process-wide async mutex with a unit payload: it protects no data of its
/// own and exists purely so that holders of the guard run one at a time.
///
/// The unit test in this module asserts that at most one task is inside a
/// section guarded by this lock at any moment.
///
/// NOTE(review): presumably acquired by the protocol enable/disable/update/
/// rollback operations declared in this module — confirm at the call sites.
pub static PROTOCOL_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
/// Error describing a drain TTL request that lies outside an allowed range.
///
/// All three fields are rendered with an `s` suffix by the [`std::fmt::Display`]
/// implementation, i.e. they are treated as whole seconds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DrainTtlBoundsError {
    /// Lower end of the allowed range.
    pub min: u64,
    /// Upper end of the allowed range.
    pub max: u64,
    /// The value that was actually requested.
    pub requested: u64,
}

impl std::error::Error for DrainTtlBoundsError {}

impl std::fmt::Display for DrainTtlBoundsError {
    /// Writes `drain ttl <requested>s outside allowed range [<min>s, <max>s]`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The struct is `Copy`, so the fields can be pulled out by value.
        let Self {
            min,
            max,
            requested,
        } = *self;
        f.write_fmt(format_args!(
            "drain ttl {}s outside allowed range [{}s, {}s]",
            requested, min, max
        ))
    }
}
/// Checks that a requested drain TTL lies within the bounds allowed for
/// `transport`.
///
/// The lower bound depends on the transport: `MIN_DRAIN_TTL_OVER_DIDCOMM` for
/// `DisableTransport::Didcomm`, zero for `DisableTransport::Rest`. The upper
/// bound is always `MAX_DRAIN_TTL`. Both bounds are inclusive.
///
/// The comparison is made in whole seconds: any sub-second part of `ttl` is
/// dropped by `Duration::as_secs` before it is compared.
///
/// # Errors
///
/// Returns [`DrainTtlBoundsError`] carrying the effective `min`, `max` and the
/// `requested` value (all in seconds) when `ttl` is outside `[min, max]`.
pub fn validate_drain_ttl(
    transport: crate::operations::protocol::disable_didcomm::DisableTransport,
    ttl: std::time::Duration,
) -> Result<(), DrainTtlBoundsError> {
    use crate::messaging::registry::MAX_DRAIN_TTL;
    use crate::operations::protocol::disable_didcomm::{
        DisableTransport, MIN_DRAIN_TTL_OVER_DIDCOMM,
    };

    let min: u64 = match transport {
        DisableTransport::Didcomm => MIN_DRAIN_TTL_OVER_DIDCOMM.as_secs(),
        DisableTransport::Rest => 0,
    };
    // `num_seconds()` yields a signed count. Clamp at zero before converting so
    // a negative bound can never wrap around to an enormous `u64` and silently
    // disable the upper limit (which a bare `as u64` cast would do).
    let max: u64 = MAX_DRAIN_TTL.num_seconds().max(0).unsigned_abs();
    let requested = ttl.as_secs();

    if (min..=max).contains(&requested) {
        Ok(())
    } else {
        Err(DrainTtlBoundsError {
            min,
            max,
            requested,
        })
    }
}
/// Tags the context an operation runs in, which determines the
/// "triggered by" label reported through
/// [`OpContext::telemetry_triggered_by`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpContext {
    /// No trigger label is reported for this context.
    Direct,
    /// Reported with the trigger label `"rollback"`.
    Rollback,
}

impl OpContext {
    /// Returns the trigger label to attach to telemetry, if any.
    ///
    /// `Rollback` maps to `Some("rollback")`; `Direct` maps to `None`.
    #[must_use]
    pub fn telemetry_triggered_by(self) -> Option<&'static str> {
        // Kept as an exhaustive match so a new variant forces a decision here.
        match self {
            Self::Rollback => Some("rollback"),
            Self::Direct => None,
        }
    }
}
/// Bundle of borrowed dependencies handed to the service operations.
///
/// Every field is a reference with lifetime `'a`; the struct owns nothing.
/// It can be assembled from either an `AppState` or a `VtaState` (see the
/// `from_*` constructors) and narrowed to a `WebvhDeps` via `webvh()`.
pub struct ServiceOpDeps<'a> {
/// Shared application configuration behind an async read/write lock.
pub config: &'a std::sync::Arc<tokio::sync::RwLock<crate::config::AppConfig>>,
// Keyspace handles. NOTE(review): each field name presumably mirrors the
// keyspace it addresses — confirm against the store setup.
pub keys_ks: &'a crate::store::KeyspaceHandle,
pub imported_ks: &'a crate::store::KeyspaceHandle,
pub contexts_ks: &'a crate::store::KeyspaceHandle,
pub webvh_ks: &'a crate::store::KeyspaceHandle,
pub audit_ks: &'a crate::store::KeyspaceHandle,
pub snapshot_ks: &'a crate::store::KeyspaceHandle,
pub service_state_ks: &'a crate::store::KeyspaceHandle,
pub drains_ks: &'a crate::store::KeyspaceHandle,
/// Seed store, borrowed as a trait object.
pub seed_store: &'a dyn vti_common::seed_store::SeedStore,
/// DID resolver cache client.
pub did_resolver: &'a affinidi_did_resolver_cache_sdk::DIDCacheClient,
/// Shared DIDComm bridge.
pub didcomm_bridge: &'a std::sync::Arc<crate::didcomm_bridge::DIDCommBridge>,
/// Shared telemetry sink.
pub telemetry: &'a vti_common::telemetry::SharedTelemetrySink,
/// Lock set forwarded to `WebvhDeps::auth_locks` by `webvh()`.
pub webvh_auth_locks: &'a crate::operations::did_webvh::WebvhAuthLocks,
/// Mediator listener registry; only present with the `webvh` feature.
#[cfg(feature = "webvh")]
pub registry: &'a crate::messaging::registry::MediatorListenerRegistry,
/// Drain sweeper; only present with the `webvh` feature.
#[cfg(feature = "webvh")]
pub sweeper: &'a crate::messaging::drain_sweeper::DrainSweeper,
}
impl<'a> ServiceOpDeps<'a> {
    /// Builds the dependency bundle by borrowing from an `AppState`.
    ///
    /// The DID resolver is not taken from `s`; the caller supplies it
    /// separately. Only compiled when both the `webvh` and `didcomm` features
    /// are enabled.
    #[cfg(all(feature = "webvh", feature = "didcomm"))]
    pub fn from_app_state(
        s: &'a crate::server::AppState,
        did_resolver: &'a affinidi_did_resolver_cache_sdk::DIDCacheClient,
    ) -> Self {
        Self {
            // Caller-provided.
            did_resolver,

            // Configuration and shared services.
            config: &s.config,
            seed_store: &*s.seed_store,
            didcomm_bridge: &s.didcomm_bridge,
            telemetry: &s.telemetry,
            webvh_auth_locks: &s.webvh_auth_locks,
            registry: &s.mediator_registry,
            sweeper: &s.drain_sweeper,

            // Keyspace handles.
            keys_ks: &s.keys_ks,
            imported_ks: &s.imported_ks,
            contexts_ks: &s.contexts_ks,
            webvh_ks: &s.webvh_ks,
            audit_ks: &s.audit_ks,
            snapshot_ks: &s.snapshot_ks,
            service_state_ks: &s.service_state_ks,
            drains_ks: &s.drains_ks,
        }
    }

    /// Builds the dependency bundle by borrowing from a `VtaState`.
    ///
    /// Mirrors [`ServiceOpDeps::from_app_state`] field for field; the DID
    /// resolver is again supplied by the caller. Only compiled when both the
    /// `webvh` and `didcomm` features are enabled.
    #[cfg(all(feature = "webvh", feature = "didcomm"))]
    pub fn from_vta_state(
        s: &'a crate::messaging::router::VtaState,
        did_resolver: &'a affinidi_did_resolver_cache_sdk::DIDCacheClient,
    ) -> Self {
        Self {
            // Caller-provided.
            did_resolver,

            // Configuration and shared services.
            config: &s.config,
            seed_store: &*s.seed_store,
            didcomm_bridge: &s.didcomm_bridge,
            telemetry: &s.telemetry,
            webvh_auth_locks: &s.webvh_auth_locks,
            registry: &s.mediator_registry,
            sweeper: &s.drain_sweeper,

            // Keyspace handles.
            keys_ks: &s.keys_ks,
            imported_ks: &s.imported_ks,
            contexts_ks: &s.contexts_ks,
            webvh_ks: &s.webvh_ks,
            audit_ks: &s.audit_ks,
            snapshot_ks: &s.snapshot_ks,
            service_state_ks: &s.service_state_ks,
            drains_ks: &s.drains_ks,
        }
    }

    /// Projects this bundle onto the subset of dependencies carried by
    /// `WebvhDeps`, reusing the same `'a` borrows.
    pub fn webvh(&self) -> crate::operations::did_webvh::WebvhDeps<'a> {
        // Every field is a shared reference (`Copy`), so destructuring through
        // `*self` copies the `'a` references out without moving anything.
        let Self {
            keys_ks,
            imported_ks,
            contexts_ks,
            webvh_ks,
            audit_ks,
            seed_store,
            did_resolver,
            didcomm_bridge,
            webvh_auth_locks,
            ..
        } = *self;

        crate::operations::did_webvh::WebvhDeps {
            keys_ks,
            imported_ks,
            contexts_ks,
            webvh_ks,
            audit_ks,
            seed_store,
            did_resolver,
            didcomm_bridge,
            auth_locks: webvh_auth_locks,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::PROTOCOL_LOCK;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Spawns two tasks that each take `PROTOCOL_LOCK`, and verifies that the
    /// highest number of tasks ever seen inside the guarded section is one.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn protocol_lock_serializes_concurrent_mutations() {
        /// Takes the lock, records how many tasks are inside, lingers briefly
        /// so an overlapping task would be observed, then leaves.
        async fn critical(occupancy: Arc<AtomicUsize>, peak: Arc<AtomicUsize>) {
            let _guard = PROTOCOL_LOCK.lock().await;
            let occupants = occupancy.fetch_add(1, Ordering::SeqCst) + 1;
            peak.fetch_max(occupants, Ordering::SeqCst);
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
            occupancy.fetch_sub(1, Ordering::SeqCst);
        }

        let occupancy = Arc::new(AtomicUsize::new(0));
        let peak = Arc::new(AtomicUsize::new(0));

        // Collect eagerly so both tasks are spawned before either is awaited.
        let workers: Vec<_> = (0..2)
            .map(|_| tokio::spawn(critical(Arc::clone(&occupancy), Arc::clone(&peak))))
            .collect();
        for worker in workers {
            worker.await.unwrap();
        }

        assert_eq!(
            peak.load(Ordering::SeqCst),
            1,
            "PROTOCOL_LOCK must serialize: at most one task in the critical section at a time"
        );
    }
}