use std::sync::Arc;
use std::time::Duration;
use affinidi_did_resolver_cache_sdk::DIDCacheClient;
use chrono::Utc;
use serde_json::Value as JsonValue;
use thiserror::Error;
use tokio::sync::RwLock;
use tracing::info;
use vti_common::config::MessagingConfig;
use vti_common::seed_store::SeedStore;
use vti_common::telemetry::{SharedTelemetrySink, TelemetryEvent, TelemetryKind};
use crate::auth::AuthClaims;
use crate::config::AppConfig;
use crate::didcomm_bridge::DIDCommBridge;
use crate::error::AppError;
use crate::messaging::drain_sweeper::DrainSweeper;
use crate::messaging::handshake::{
HandshakeError, HandshakeOptions, ListenerProver, mediator_handshake,
};
use crate::messaging::registry::{MediatorBinding, MediatorListenerRegistry, RegistryError};
use crate::operations::did_webvh::{UpdateDidWebvhError, UpdateDidWebvhOptions, update_did_webvh};
use crate::operations::protocol::document::{
DocumentPatchError, current_didcomm_service, with_didcomm_service,
};
use crate::operations::protocol::{OpContext, PROTOCOL_LOCK};
use crate::store::KeyspaceHandle;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MigrateAuditKind {
Forward,
Rollback,
}
impl MigrateAuditKind {
fn as_str(self) -> &'static str {
match self {
Self::Forward => "forward",
Self::Rollback => "rollback",
}
}
}
#[derive(Debug, Clone)]
pub struct UpdateDidcommParams {
pub new_mediator_did: String,
pub drain_ttl: Duration,
pub force: bool,
pub handshake_timeout: Duration,
pub audit_kind: MigrateAuditKind,
pub transport: crate::operations::protocol::disable_didcomm::DisableTransport,
}
#[derive(Debug, Clone)]
pub struct UpdateDidcommResult {
pub new_version_id: String,
pub prior_mediator_did: String,
pub active_mediator_did: String,
pub active_mediator_endpoint: String,
pub drains_until: chrono::DateTime<chrono::Utc>,
pub vta_did: String,
pub serverless: bool,
}
#[derive(Debug, Error)]
pub enum UpdateDidcommError {
#[error(
"DIDComm is not currently enabled. Use `pnm services didcomm enable --mediator-did <did>` first."
)]
DidcommNotEnabled,
#[error("new mediator `{0}` is already the active mediator — nothing to migrate")]
SameAsActive(String),
#[error(
"new mediator `{0}` is currently in drain state. \
Either run `pnm services didcomm drain cancel --mediator-did {0}` first, \
or use `pnm services didcomm rollback` to fail-forward to a state where `{0}` is active again."
)]
AlreadyDraining(String),
#[error("drain ttl {requested}s outside allowed range [{min}s, {max}s]")]
DrainTtlOutOfBounds { min: u64, max: u64, requested: u64 },
#[error("VTA DID is not configured — run `vta setup` first")]
VtaDidNotConfigured,
#[error("VTA DID `{0}` has no webvh record")]
VtaDidRecordMissing(String),
#[error("VTA DID `{0}` has no published log")]
VtaDidLogMissing(String),
#[error("VTA DID log is empty")]
EmptyLog,
#[error(
"DIDComm is enabled but the VTA's DID document has no `#vta-didcomm` service entry — \
on-disk state is inconsistent (re-run setup)"
)]
NoActiveMediator,
#[error(transparent)]
Handshake(#[from] HandshakeError),
#[error("DID document patch failed: {0}")]
DocumentPatch(#[from] DocumentPatchError),
#[error("WebVH update failed: {0}")]
WebVHUpdate(#[from] UpdateDidWebvhError),
#[error("config persistence failed: {0}")]
ConfigPersistence(String),
#[error(transparent)]
Registry(#[from] RegistryError),
#[error("auth: {0}")]
Auth(String),
#[error("storage error: {0}")]
Storage(String),
}
impl From<AppError> for UpdateDidcommError {
fn from(value: AppError) -> Self {
Self::Storage(value.to_string())
}
}
impl From<crate::operations::protocol::preconditions::ProtocolPreconditionError>
for UpdateDidcommError
{
fn from(value: crate::operations::protocol::preconditions::ProtocolPreconditionError) -> Self {
use crate::operations::protocol::preconditions::ProtocolPreconditionError as E;
match value {
E::VtaDidNotConfigured => Self::VtaDidNotConfigured,
E::VtaDidRecordMissing(s) => Self::VtaDidRecordMissing(s),
E::VtaDidLogMissing(s) => Self::VtaDidLogMissing(s),
E::EmptyLog => Self::EmptyLog,
E::Storage(s) | E::DocumentParse(s) => Self::Storage(s),
}
}
}
#[allow(clippy::too_many_arguments)]
pub async fn update_didcomm(
config: &Arc<RwLock<AppConfig>>,
keys_ks: &KeyspaceHandle,
imported_ks: &KeyspaceHandle,
contexts_ks: &KeyspaceHandle,
webvh_ks: &KeyspaceHandle,
audit_ks: &KeyspaceHandle,
drains_ks: &KeyspaceHandle,
snapshot_ks: &KeyspaceHandle,
_service_state_ks: &KeyspaceHandle,
seed_store: &dyn SeedStore,
did_resolver: &DIDCacheClient,
didcomm_bridge: &Arc<DIDCommBridge>,
registry: &MediatorListenerRegistry,
sweeper: &DrainSweeper,
telemetry: &SharedTelemetrySink,
prover: &(dyn ListenerProver + Send + Sync),
auth: &AuthClaims,
params: UpdateDidcommParams,
ctx: OpContext,
webvh_auth_locks: &crate::operations::did_webvh::WebvhAuthLocks,
channel: &str,
) -> Result<UpdateDidcommResult, UpdateDidcommError> {
auth.require_super_admin()
.map_err(|e| UpdateDidcommError::Auth(e.to_string()))?;
let _guard = PROTOCOL_LOCK.lock().await;
crate::operations::protocol::validate_drain_ttl(params.transport, params.drain_ttl).map_err(
|e| UpdateDidcommError::DrainTtlOutOfBounds {
min: e.min,
max: e.max,
requested: e.requested,
},
)?;
let (vta_did, scid, current_doc, prior_mediator) =
read_preconditions(config, registry, webvh_ks, ¶ms).await?;
use crate::operations::protocol::snapshot::{self, DidcommSnapshot, ServiceConfigSnapshot};
snapshot::write(
snapshot_ks,
ServiceConfigSnapshot::Didcomm(DidcommSnapshot::Enabled {
mediator_did: prior_mediator.clone(),
routing_keys: vec![],
}),
)
.await
.map_err(|e| UpdateDidcommError::Storage(format!("snapshot write: {e}")))?;
let resolved = mediator_handshake(
did_resolver,
prover,
telemetry,
¶ms.new_mediator_did,
&vta_did,
HandshakeOptions {
timeout: params.handshake_timeout,
force: params.force,
},
)
.await?;
let patched = with_didcomm_service(current_doc, &resolved.mediator_did)?;
let update_result = update_did_webvh(
keys_ks,
imported_ks,
contexts_ks,
webvh_ks,
audit_ks,
seed_store,
auth,
&scid,
UpdateDidWebvhOptions {
document: Some(patched),
..Default::default()
},
did_resolver,
didcomm_bridge,
Some(vta_did.as_str()),
webvh_auth_locks,
channel,
)
.await?;
persist_new_mediator(config, &resolved.mediator_did, &resolved.endpoint).await?;
registry
.record_activate(MediatorBinding {
mediator_did: resolved.mediator_did.clone(),
endpoint: resolved.endpoint.clone(),
})
.await;
let deadline = Utc::now()
+ chrono::Duration::from_std(params.drain_ttl).map_err(|e| {
UpdateDidcommError::ConfigPersistence(format!("drain TTL out of range: {e}"))
})?;
let prior_endpoint = best_effort_endpoint(did_resolver, &prior_mediator).await;
registry
.record_drain_persisted(drains_ks, &prior_mediator, prior_endpoint, deadline)
.await?;
sweeper.arm(&prior_mediator, deadline).await;
let mut event = TelemetryEvent::new(TelemetryKind::ServicesDidcommUpdate)
.with_mediator(&resolved.mediator_did)
.with_field("from", JsonValue::from(prior_mediator.clone()))
.with_field("audit_kind", JsonValue::from(params.audit_kind.as_str()))
.with_field(
"new_version_id",
JsonValue::from(update_result.new_version_id.clone()),
)
.with_field(
"drain_ttl_secs",
JsonValue::from(params.drain_ttl.as_secs()),
);
if let Some(tag) = ctx.telemetry_triggered_by() {
event = event.with_field("triggered_by", JsonValue::from(tag));
}
let _ = telemetry.record(event).await;
info!(
channel,
from = %prior_mediator,
to = %resolved.mediator_did,
new_version_id = %update_result.new_version_id,
audit_kind = params.audit_kind.as_str(),
"mediator migrated"
);
Ok(UpdateDidcommResult {
new_version_id: update_result.new_version_id,
prior_mediator_did: prior_mediator,
active_mediator_did: resolved.mediator_did,
active_mediator_endpoint: resolved.endpoint,
drains_until: deadline,
vta_did,
serverless: update_result.serverless,
})
}
async fn read_preconditions(
config: &Arc<RwLock<AppConfig>>,
registry: &MediatorListenerRegistry,
webvh_ks: &KeyspaceHandle,
params: &UpdateDidcommParams,
) -> Result<(String, String, JsonValue, String), UpdateDidcommError> {
{
let cfg = config.read().await;
if !cfg.services.didcomm {
return Err(UpdateDidcommError::DidcommNotEnabled);
}
}
let state = super::preconditions::load_vta_doc_state(config, webvh_ks).await?;
let prior_mediator = current_didcomm_service(&state.current_doc)
.map(|s| s.mediator_did)
.ok_or(UpdateDidcommError::NoActiveMediator)?;
if prior_mediator == params.new_mediator_did {
return Err(UpdateDidcommError::SameAsActive(prior_mediator));
}
if params.audit_kind == MigrateAuditKind::Forward
&& registry
.drain_deadline(¶ms.new_mediator_did)
.await
.is_some()
{
return Err(UpdateDidcommError::AlreadyDraining(
params.new_mediator_did.clone(),
));
}
Ok((state.vta_did, state.scid, state.current_doc, prior_mediator))
}
async fn persist_new_mediator(
config: &Arc<RwLock<AppConfig>>,
mediator_did: &str,
mediator_endpoint: &str,
) -> Result<(), UpdateDidcommError> {
let (contents, path) = {
let mut cfg = config.write().await;
cfg.messaging = Some(MessagingConfig {
mediator_url: mediator_endpoint.to_string(),
mediator_did: mediator_did.to_string(),
mediator_host: None,
});
let contents = toml::to_string_pretty(&*cfg)
.map_err(|e| UpdateDidcommError::ConfigPersistence(e.to_string()))?;
let path = cfg.config_path.clone();
(contents, path)
};
std::fs::write(&path, contents)
.map_err(|e| UpdateDidcommError::ConfigPersistence(e.to_string()))?;
Ok(())
}
async fn best_effort_endpoint(resolver: &DIDCacheClient, mediator_did: &str) -> String {
match crate::messaging::handshake::resolve_mediator(resolver, mediator_did).await {
Ok(r) => r.endpoint,
Err(_) => String::new(),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::AppConfig;
use crate::keys::seed_store::PlaintextSeedStore;
use crate::messaging::handshake::AlwaysOkProver;
use crate::operations::protocol::snapshot;
use crate::store::Store;
use vti_common::telemetry::RingBufferTelemetry;
fn fresh_config(tmpdir: &std::path::Path, didcomm: bool) -> Arc<RwLock<AppConfig>> {
use crate::test_support::test_app_config;
let mut cfg = test_app_config(tmpdir.into());
cfg.services.rest = true;
cfg.services.didcomm = didcomm;
cfg.vta_did = Some("did:webvh:scid123:host:vta".into());
cfg.config_path = tmpdir.join("config.toml");
Arc::new(RwLock::new(cfg))
}
fn registry() -> (
Arc<DIDCommBridge>,
Arc<MediatorListenerRegistry>,
SharedTelemetrySink,
) {
let bridge = Arc::new(DIDCommBridge::placeholder());
let sink: SharedTelemetrySink = Arc::new(RingBufferTelemetry::with_capacity(64));
let registry = Arc::new(MediatorListenerRegistry::new(Arc::clone(&sink)));
(bridge, registry, sink)
}
fn sweeper_for(
registry: Arc<MediatorListenerRegistry>,
drains_ks: KeyspaceHandle,
) -> Arc<DrainSweeper> {
let (tx, _rx) = crate::messaging::drain_sweeper::teardown_channel(8);
Arc::new(DrainSweeper::new(registry, drains_ks, tx))
}
async fn empty_keyspace(name: &str) -> (tempfile::TempDir, KeyspaceHandle) {
use vti_common::config::StoreConfig as VtiStoreConfig;
let dir = tempfile::tempdir().unwrap();
let store = Store::open(&VtiStoreConfig {
data_dir: dir.path().into(),
})
.unwrap();
let ks = store.keyspace(name).unwrap();
(dir, ks)
}
fn super_admin() -> AuthClaims {
AuthClaims::unsafe_local_cli_super_admin("test")
}
fn dummy_seed(dir: &std::path::Path) -> Arc<dyn SeedStore> {
Arc::new(PlaintextSeedStore::new(dir))
}
async fn resolver() -> DIDCacheClient {
DIDCacheClient::new(
affinidi_did_resolver_cache_sdk::config::DIDCacheConfigBuilder::default().build(),
)
.await
.unwrap()
}
fn forward_params(new_mediator: &str) -> UpdateDidcommParams {
UpdateDidcommParams {
new_mediator_did: new_mediator.into(),
drain_ttl: Duration::from_secs(3600),
force: false,
handshake_timeout: Duration::from_secs(1),
audit_kind: MigrateAuditKind::Forward,
transport: crate::operations::protocol::disable_didcomm::DisableTransport::Rest,
}
}
#[tokio::test]
async fn refuses_when_didcomm_not_enabled() {
let dir = tempfile::tempdir().unwrap();
let config = fresh_config(dir.path(), false);
let (bridge, reg, sink) = registry();
let (_d1, keys_ks) = empty_keyspace("keys").await;
let (_dimp, imported_ks) = empty_keyspace("imported_secrets").await;
let (_d2, contexts_ks) = empty_keyspace("contexts").await;
let (_d3, webvh_ks) = empty_keyspace("webvh").await;
let (_d4, audit_ks) = empty_keyspace("audit").await;
let (_d5, drains_ks) = empty_keyspace("drains").await;
let (_d6, snapshot_ks) = empty_keyspace(snapshot::KEYSPACE_NAME).await;
let (_d_svc_state, service_state_ks) = empty_keyspace("service_state").await;
let resolver = resolver().await;
let prover = AlwaysOkProver;
let seed = dummy_seed(dir.path());
let err = update_didcomm(
&config,
&keys_ks,
&imported_ks,
&contexts_ks,
&webvh_ks,
&audit_ks,
&drains_ks,
&snapshot_ks,
&service_state_ks,
&*seed,
&resolver,
&bridge,
®,
&sweeper_for(Arc::clone(®), drains_ks.clone()),
&sink,
&prover,
&super_admin(),
forward_params("did:m:B"),
OpContext::Direct,
&crate::operations::did_webvh::WebvhAuthLocks::new(),
"test",
)
.await
.unwrap_err();
assert!(matches!(err, UpdateDidcommError::DidcommNotEnabled));
}
#[tokio::test]
async fn refuses_when_no_vta_did() {
let dir = tempfile::tempdir().unwrap();
let config = fresh_config(dir.path(), true);
config.write().await.vta_did = None;
let (bridge, reg, sink) = registry();
let (_d1, keys_ks) = empty_keyspace("keys").await;
let (_dimp, imported_ks) = empty_keyspace("imported_secrets").await;
let (_d2, contexts_ks) = empty_keyspace("contexts").await;
let (_d3, webvh_ks) = empty_keyspace("webvh").await;
let (_d4, audit_ks) = empty_keyspace("audit").await;
let (_d5, drains_ks) = empty_keyspace("drains").await;
let (_d6, snapshot_ks) = empty_keyspace(snapshot::KEYSPACE_NAME).await;
let (_d_svc_state, service_state_ks) = empty_keyspace("service_state").await;
let resolver = resolver().await;
let prover = AlwaysOkProver;
let seed = dummy_seed(dir.path());
let err = update_didcomm(
&config,
&keys_ks,
&imported_ks,
&contexts_ks,
&webvh_ks,
&audit_ks,
&drains_ks,
&snapshot_ks,
&service_state_ks,
&*seed,
&resolver,
&bridge,
®,
&sweeper_for(Arc::clone(®), drains_ks.clone()),
&sink,
&prover,
&super_admin(),
forward_params("did:m:B"),
OpContext::Direct,
&crate::operations::did_webvh::WebvhAuthLocks::new(),
"test",
)
.await
.unwrap_err();
assert!(matches!(err, UpdateDidcommError::VtaDidNotConfigured));
}
#[tokio::test]
async fn refuses_migrate_to_draining_mediator() {
let dir = tempfile::tempdir().unwrap();
let config = fresh_config(dir.path(), true);
let (bridge, reg, sink) = registry();
let (_d1, keys_ks) = empty_keyspace("keys").await;
let (_dimp, imported_ks) = empty_keyspace("imported_secrets").await;
let (_d2, contexts_ks) = empty_keyspace("contexts").await;
let (_d3, webvh_ks) = empty_keyspace("webvh").await;
let (_d4, audit_ks) = empty_keyspace("audit").await;
let (_d5, drains_ks) = empty_keyspace("drains").await;
let (_d6, snapshot_ks) = empty_keyspace(snapshot::KEYSPACE_NAME).await;
let (_d_svc_state, service_state_ks) = empty_keyspace("service_state").await;
let resolver = resolver().await;
let prover = AlwaysOkProver;
let seed = dummy_seed(dir.path());
reg.record_activate(MediatorBinding {
mediator_did: "did:m:A".into(),
endpoint: "wss://A".into(),
})
.await;
reg.record_activate(MediatorBinding {
mediator_did: "did:m:placeholder".into(),
endpoint: "wss://placeholder".into(),
})
.await;
reg.record_drain_persisted(
&drains_ks,
"did:m:B",
"wss://B".into(),
Utc::now() + chrono::Duration::seconds(3600),
)
.await
.unwrap();
let err = update_didcomm(
&config,
&keys_ks,
&imported_ks,
&contexts_ks,
&webvh_ks,
&audit_ks,
&drains_ks,
&snapshot_ks,
&service_state_ks,
&*seed,
&resolver,
&bridge,
®,
&sweeper_for(Arc::clone(®), drains_ks.clone()),
&sink,
&prover,
&super_admin(),
forward_params("did:m:B"),
OpContext::Direct,
&crate::operations::did_webvh::WebvhAuthLocks::new(),
"test",
)
.await
.unwrap_err();
assert!(
matches!(err, UpdateDidcommError::VtaDidRecordMissing(_))
|| matches!(err, UpdateDidcommError::AlreadyDraining(_)),
"expected refusal, got {err:?}"
);
}
}