use std::sync::Arc;
use affinidi_did_resolver_cache_sdk::DIDCacheClient;
use chrono::Utc;
use didwebvh_rs::url::WebVHURL;
use thiserror::Error;
use tracing::info;
use crate::audit;
use crate::auth::AuthClaims;
use crate::didcomm_bridge::DIDCommBridge;
use crate::error::AppError;
use crate::store::KeyspaceHandle;
use crate::webvh_store;
use super::{RaceDetected, RecordSnapshot, WebvhTransport};
const SERVERLESS_MARKER: &str = "serverless";
#[derive(Debug, Clone)]
pub struct RegisterDidWithServerParams {
pub did: String,
pub server_id: String,
pub force: bool,
}
#[derive(Debug, Clone)]
pub struct RegisterDidWithServerResult {
pub did: String,
pub server_id: String,
pub log_entry_count: u32,
}
#[derive(Debug, Error)]
pub enum RegisterDidWithServerError {
#[error("auth: {0}")]
Auth(String),
#[error("DID not found: {0}")]
DidNotFound(String),
#[error(
"DID `{did}` is already managed by webvh server `{server_id}`. \
Re-pointing a server-managed DID at a different host is not supported \
— only serverless DIDs can be registered."
)]
AlreadyServerManaged { did: String, server_id: String },
#[error(
"webvh server `{0}` is not registered. \
Add it first with `pnm webvh add-server --id {0} --did <server-did>` (online) \
or `vta webvh add-server --id {0} --did <server-did>` (offline, daemon stopped)."
)]
ServerNotFound(String),
#[error("DID `{0}` has no published log on disk (cannot push to server)")]
LogMissing(String),
#[error("transport setup failed: {0}")]
Transport(String),
#[error("publish to server failed: {0}")]
Publish(String),
#[error("storage error: {0}")]
Storage(String),
#[error("could not derive URL from DID `{did}`: {reason}")]
DidUrlParse { did: String, reason: String },
#[error(
"DID was modified concurrently between read and write — re-run \
`pnm webvh register-did` after confirming no other operator \
is registering the same DID. Reason: {0}"
)]
Conflict(RaceDetected),
}
impl From<AppError> for RegisterDidWithServerError {
fn from(value: AppError) -> Self {
Self::Storage(value.to_string())
}
}
pub async fn register_did_with_server(
webvh_ks: &KeyspaceHandle,
audit_ks: &KeyspaceHandle,
auth: &AuthClaims,
did_resolver: &DIDCacheClient,
didcomm_bridge: &Arc<DIDCommBridge>,
params: RegisterDidWithServerParams,
channel: &str,
) -> Result<RegisterDidWithServerResult, RegisterDidWithServerError> {
auth.require_super_admin()
.map_err(|e| RegisterDidWithServerError::Auth(e.to_string()))?;
let mut record = webvh_store::get_did(webvh_ks, ¶ms.did)
.await?
.ok_or_else(|| RegisterDidWithServerError::DidNotFound(params.did.clone()))?;
let snapshot = RecordSnapshot::capture(&record);
if record.server_id != SERVERLESS_MARKER {
return Err(RegisterDidWithServerError::AlreadyServerManaged {
did: params.did.clone(),
server_id: record.server_id.clone(),
});
}
let server = webvh_store::get_server(webvh_ks, ¶ms.server_id)
.await?
.ok_or_else(|| RegisterDidWithServerError::ServerNotFound(params.server_id.clone()))?;
let did_log = webvh_store::get_did_log(webvh_ks, ¶ms.did)
.await?
.ok_or_else(|| RegisterDidWithServerError::LogMissing(params.did.clone()))?;
let transport = WebvhTransport::from_server(&server, did_resolver, didcomm_bridge)
.await
.map_err(|e| RegisterDidWithServerError::Transport(e.to_string()))?;
let parsed = WebVHURL::parse_did_url(¶ms.did).map_err(|e| {
RegisterDidWithServerError::DidUrlParse {
did: params.did.clone(),
reason: e.to_string(),
}
})?;
let request_path = parsed.path.trim_matches('/').to_string();
let response = transport
.register_did_atomic(&request_path, &did_log, params.force)
.await
.map_err(|e| RegisterDidWithServerError::Publish(e.to_string()))?;
let current = webvh_store::get_did(webvh_ks, ¶ms.did)
.await?
.ok_or_else(|| RegisterDidWithServerError::DidNotFound(params.did.clone()))?;
snapshot
.assert_unchanged(¤t)
.map_err(RegisterDidWithServerError::Conflict)?;
record.server_id = params.server_id.clone();
record.mnemonic = response.mnemonic;
record.updated_at = Utc::now();
let log_entry_count = record.log_entry_count;
webvh_store::store_did(webvh_ks, &record).await?;
let resource = format!("did:webvh:{}", record.scid);
if let Err(e) = audit::record(
audit_ks,
"did.register_server",
&auth.did,
Some(&resource),
"success",
Some(channel),
Some(&record.context_id),
)
.await
{
tracing::warn!(error = %e, "audit emission failed for did.register_server");
}
info!(
channel,
did = %record.did,
server_id = %record.server_id,
log_entry_count,
"did:webvh registered with server"
);
Ok(RegisterDidWithServerResult {
did: record.did,
server_id: record.server_id,
log_entry_count,
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::store::Store;
use crate::test_support::test_app_config;
use vta_sdk::webvh::{WebvhDidRecord, WebvhServerRecord};
use vti_common::config::StoreConfig as VtiStoreConfig;
async fn setup() -> (tempfile::TempDir, KeyspaceHandle, KeyspaceHandle) {
let dir = tempfile::tempdir().unwrap();
let store = Store::open(&VtiStoreConfig {
data_dir: dir.path().into(),
})
.unwrap();
let webvh_ks = store.keyspace("webvh").unwrap();
let audit_ks = store.keyspace("audit").unwrap();
let _ = test_app_config(dir.path().into());
(dir, webvh_ks, audit_ks)
}
fn serverless_record(did: &str) -> WebvhDidRecord {
let now = chrono::Utc::now();
WebvhDidRecord {
did: did.into(),
server_id: "serverless".into(),
mnemonic: "test-mnemonic".into(),
scid: "scid".into(),
context_id: "vta".into(),
portable: true,
log_entry_count: 1,
pre_rotation_count: 0,
next_fragment_id: 1,
created_at: now,
updated_at: now,
}
}
fn server_record(id: &str) -> WebvhServerRecord {
let now = chrono::Utc::now();
WebvhServerRecord {
id: id.into(),
did: format!("did:web:{id}.example"),
label: None,
access_token: None,
access_expires_at: None,
refresh_token: None,
created_at: now,
updated_at: now,
}
}
fn super_admin() -> AuthClaims {
AuthClaims::unsafe_local_cli_super_admin("test")
}
fn other_user() -> AuthClaims {
use vti_common::acl::Role;
AuthClaims {
did: "did:key:z6Mk-test".into(),
role: Role::Admin,
allowed_contexts: vec!["vta".into()],
}
}
async fn resolver() -> DIDCacheClient {
DIDCacheClient::new(
affinidi_did_resolver_cache_sdk::config::DIDCacheConfigBuilder::default().build(),
)
.await
.unwrap()
}
fn bridge() -> Arc<DIDCommBridge> {
Arc::new(DIDCommBridge::placeholder())
}
#[tokio::test]
async fn rejects_non_super_admin() {
let (_dir, webvh_ks, audit_ks) = setup().await;
let resolver = resolver().await;
let bridge = bridge();
let err = register_did_with_server(
&webvh_ks,
&audit_ks,
&other_user(),
&resolver,
&bridge,
RegisterDidWithServerParams {
did: "did:webvh:scid:host:vta".into(),
server_id: "primary".into(),
force: false,
},
"test",
)
.await
.unwrap_err();
assert!(matches!(err, RegisterDidWithServerError::Auth(_)));
}
#[tokio::test]
async fn rejects_when_did_not_found() {
let (_dir, webvh_ks, audit_ks) = setup().await;
let resolver = resolver().await;
let bridge = bridge();
let err = register_did_with_server(
&webvh_ks,
&audit_ks,
&super_admin(),
&resolver,
&bridge,
RegisterDidWithServerParams {
did: "did:webvh:nonexistent:host:vta".into(),
server_id: "primary".into(),
force: false,
},
"test",
)
.await
.unwrap_err();
assert!(matches!(err, RegisterDidWithServerError::DidNotFound(_)));
}
#[tokio::test]
async fn rejects_when_already_server_managed() {
let (_dir, webvh_ks, audit_ks) = setup().await;
let did = "did:webvh:scid:host:vta";
let mut rec = serverless_record(did);
rec.server_id = "existing-host".into();
webvh_store::store_did(&webvh_ks, &rec).await.unwrap();
let resolver = resolver().await;
let bridge = bridge();
let err = register_did_with_server(
&webvh_ks,
&audit_ks,
&super_admin(),
&resolver,
&bridge,
RegisterDidWithServerParams {
did: did.into(),
server_id: "primary".into(),
force: false,
},
"test",
)
.await
.unwrap_err();
assert!(matches!(
err,
RegisterDidWithServerError::AlreadyServerManaged { .. }
));
}
#[tokio::test]
async fn rejects_when_server_not_registered() {
let (_dir, webvh_ks, audit_ks) = setup().await;
let did = "did:webvh:scid:host:vta";
webvh_store::store_did(&webvh_ks, &serverless_record(did))
.await
.unwrap();
webvh_store::store_did_log(&webvh_ks, did, "{}\n")
.await
.unwrap();
let resolver = resolver().await;
let bridge = bridge();
let err = register_did_with_server(
&webvh_ks,
&audit_ks,
&super_admin(),
&resolver,
&bridge,
RegisterDidWithServerParams {
did: did.into(),
server_id: "missing-host".into(),
force: false,
},
"test",
)
.await
.unwrap_err();
assert!(matches!(err, RegisterDidWithServerError::ServerNotFound(_)));
}
#[tokio::test]
async fn rejects_when_log_missing() {
let (_dir, webvh_ks, audit_ks) = setup().await;
let did = "did:webvh:scid:host:vta";
webvh_store::store_did(&webvh_ks, &serverless_record(did))
.await
.unwrap();
webvh_store::store_server(&webvh_ks, &server_record("primary"))
.await
.unwrap();
let resolver = resolver().await;
let bridge = bridge();
let err = register_did_with_server(
&webvh_ks,
&audit_ks,
&super_admin(),
&resolver,
&bridge,
RegisterDidWithServerParams {
did: did.into(),
server_id: "primary".into(),
force: false,
},
"test",
)
.await
.unwrap_err();
assert!(matches!(err, RegisterDidWithServerError::LogMissing(_)));
}
#[test]
fn conflict_variant_carries_race_reason() {
use super::super::RaceDetected;
let err = RegisterDidWithServerError::Conflict(RaceDetected::ServerIdChanged {
did: "did:webvh:foo".into(),
expected: "serverless".into(),
current: "webvh-prod".into(),
});
let msg = err.to_string();
assert!(
msg.contains("modified concurrently"),
"user-facing wrapper text missing: {msg}"
);
assert!(msg.contains("serverless"), "race reason inlined: {msg}");
assert!(msg.contains("webvh-prod"), "race reason inlined: {msg}");
assert!(matches!(err, RegisterDidWithServerError::Conflict(_)));
}
}