use std::sync::Arc;
use std::time::Duration;
use affinidi_messaging_didcomm::Message;
use affinidi_messaging_didcomm_service::{
DIDCommResponse, DIDCommService, DIDCommServiceConfig, DIDCommServiceError, Extension,
HandlerContext, ListenerConfig, ListenerEvent, MESSAGE_PICKUP_STATUS_TYPE, RestartPolicy,
RetryConfig, Router, TRUST_PING_TYPE, handler_fn, ignore_handler, trust_ping_handler,
};
use affinidi_tdk::common::profiles::TDKProfile;
use affinidi_tdk::secrets_resolver::{SecretsResolver, ThreadedSecretsResolver};
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use vta_sdk::protocols::credential_exchange::PRESENT as CREDENTIAL_PRESENT_TYPE;
use vta_sdk::protocols::credential_exchange::REQUEST as CREDENTIAL_REQUEST_TYPE;
use vta_sdk::protocols::credential_exchange::{
ISSUE as CREDENTIAL_ISSUE_TYPE, IssueBody, PresentBody, RequestBody,
};
use vta_sdk::protocols::join_requests::{
JOIN_REQUEST_ACCEPT_RECEIPT_TYPE, JOIN_REQUEST_ACCEPT_TYPE,
JOIN_REQUEST_MANIFEST_RESPONSE_TYPE, JOIN_REQUEST_MANIFEST_TYPE,
JOIN_REQUEST_STATUS_RESPONSE_TYPE, JOIN_REQUEST_STATUS_TYPE, JOIN_REQUEST_SUBMIT_RECEIPT_TYPE,
JOIN_REQUEST_SUBMIT_TYPE, JoinRequestAcceptBody, JoinRequestAcceptReceiptBody,
JoinRequestStatusBody, JoinRequestSubmitBody, JoinRequestSubmitReceiptBody,
MEMBER_SELF_REMOVE_RECEIPT_TYPE, MEMBER_SELF_REMOVE_TYPE, SelfRemoveBody,
SelfRemoveReceiptBody,
};
use crate::config::AppConfig;
use crate::join::JoinTransport;
use crate::members::Disposition;
use crate::routes::join_requests::accept::accept_inner;
use crate::routes::join_requests::manifest::manifest_inner;
use crate::routes::join_requests::status::status_inner;
use crate::routes::join_requests::submit::submit_inner;
use crate::routes::members::remove::remove_inner;
use crate::server::AppState;
pub async fn run_didcomm_service(
config: &AppConfig,
secrets_resolver: &Arc<ThreadedSecretsResolver>,
vtc_did: &str,
state: AppState,
shutdown_rx: &mut watch::Receiver<bool>,
) {
let mediator_did = match &config.messaging {
Some(m) => &m.mediator_did,
None => {
warn!("messaging not configured — inbound message handling disabled");
let _ = shutdown_rx.changed().await;
return;
}
};
let signing_id = format!("{vtc_did}#key-0");
let ka_id = format!("{vtc_did}#key-1");
let mut secrets = Vec::new();
if let Some(s) = secrets_resolver.get_secret(&signing_id).await {
secrets.push(s);
} else {
warn!("VTC signing secret not found — messaging disabled");
let _ = shutdown_rx.changed().await;
return;
}
if let Some(s) = secrets_resolver.get_secret(&ka_id).await {
secrets.push(s);
}
let profile = TDKProfile::new("VTC", vtc_did, Some(mediator_did), secrets);
let service_config = DIDCommServiceConfig {
listeners: vec![ListenerConfig {
id: "vtc-main".into(),
profile,
restart_policy: RestartPolicy::Always {
backoff: RetryConfig {
initial_delay_secs: 5,
max_delay_secs: 60,
},
},
..Default::default()
}],
};
let router = match Router::new()
.extension(state)
.route(TRUST_PING_TYPE, handler_fn(trust_ping_handler))
.and_then(|r| r.route(MESSAGE_PICKUP_STATUS_TYPE, handler_fn(ignore_handler)))
.and_then(|r| {
r.route(
JOIN_REQUEST_SUBMIT_TYPE,
handler_fn(join_request_submit_handler),
)
})
.and_then(|r| {
r.route(
JOIN_REQUEST_ACCEPT_TYPE,
handler_fn(join_request_accept_handler),
)
})
.and_then(|r| {
r.route(
JOIN_REQUEST_MANIFEST_TYPE,
handler_fn(join_request_manifest_handler),
)
})
.and_then(|r| {
r.route(
JOIN_REQUEST_STATUS_TYPE,
handler_fn(join_request_status_handler),
)
})
.and_then(|r| {
r.route(
MEMBER_SELF_REMOVE_TYPE,
handler_fn(member_self_remove_handler),
)
})
.and_then(|r| {
r.route(
CREDENTIAL_REQUEST_TYPE,
handler_fn(credential_request_handler),
)
})
.and_then(|r| {
r.route(
CREDENTIAL_PRESENT_TYPE,
handler_fn(credential_present_handler),
)
}) {
Ok(r) => r,
Err(e) => {
warn!("failed to build DIDComm router: {e}");
let _ = shutdown_rx.changed().await;
return;
}
};
let shutdown_token = CancellationToken::new();
let service = match DIDCommService::start(service_config, router, shutdown_token.clone()).await
{
Ok(s) => s,
Err(e) => {
warn!("failed to start DIDComm service: {e}");
let _ = shutdown_rx.changed().await;
return;
}
};
if let Err(e) = service
.wait_connected("vtc-main", Duration::from_secs(30))
.await
{
warn!("DIDComm listener not connected after 30s: {e}");
}
let mut event_rx = service.subscribe();
let event_task = tokio::spawn(async move {
loop {
match event_rx.recv().await {
Ok(ListenerEvent::Connected { listener_id }) => {
info!(listener = %listener_id, "DIDComm listener connected");
}
Ok(ListenerEvent::Disconnected { listener_id, error }) => {
warn!(
listener = %listener_id,
error = error.as_deref().unwrap_or("none"),
"DIDComm listener disconnected"
);
}
Ok(ListenerEvent::Restarting {
listener_id,
attempt,
delay,
}) => {
info!(
listener = %listener_id,
attempt,
delay_secs = delay.as_secs(),
"DIDComm listener restarting"
);
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
warn!(skipped = n, "DIDComm event logger lagged");
}
}
}
});
info!("DIDComm service started");
let _ = shutdown_rx.changed().await;
service.shutdown().await;
event_task.abort();
info!("DIDComm service stopped");
}
async fn join_request_submit_handler(
ctx: HandlerContext,
message: Message,
Extension(state): Extension<AppState>,
) -> Result<Option<DIDCommResponse>, DIDCommServiceError> {
let applicant_did = ctx.sender_did.clone().ok_or_else(|| {
DIDCommServiceError::Internal("join-request submit has no DIDComm sender".into())
})?;
let body: JoinRequestSubmitBody = serde_json::from_value(message.body.clone())
.map_err(|e| DIDCommServiceError::Internal(format!("malformed join-request body: {e}")))?;
let outcome = submit_inner(
&state,
applicant_did,
body.vp,
body.registry_consent,
body.extensions,
None,
JoinTransport::DIDComm,
)
.await
.map_err(|e| DIDCommServiceError::Internal(format!("submit failed: {e}")))?;
let receipt = JoinRequestSubmitReceiptBody {
request_id: outcome.request.id,
status: outcome.request.status.to_string(),
};
let body = serde_json::to_value(&receipt)
.map_err(|e| DIDCommServiceError::Internal(format!("receipt serialise: {e}")))?;
Ok(Some(
DIDCommResponse::new(JOIN_REQUEST_SUBMIT_RECEIPT_TYPE, body).thid(message.id),
))
}
async fn join_request_accept_handler(
ctx: HandlerContext,
message: Message,
Extension(state): Extension<AppState>,
) -> Result<Option<DIDCommResponse>, DIDCommServiceError> {
let member_did = ctx.sender_did.clone().ok_or_else(|| {
DIDCommServiceError::Internal("join-request accept has no DIDComm sender".into())
})?;
let body: JoinRequestAcceptBody = serde_json::from_value(message.body.clone())
.map_err(|e| DIDCommServiceError::Internal(format!("malformed join-accept body: {e}")))?;
let outcome = accept_inner(
&state,
body.request_id,
member_did,
body.vmc_id,
body.vc,
None,
JoinTransport::DIDComm,
)
.await
.map_err(|e| DIDCommServiceError::Internal(format!("accept failed: {e}")))?;
let receipt = JoinRequestAcceptReceiptBody {
request_id: outcome.request_id,
status: "accepted".to_string(),
reciprocal_vc_id: outcome.reciprocal_vc_id,
};
let body = serde_json::to_value(&receipt)
.map_err(|e| DIDCommServiceError::Internal(format!("receipt serialise: {e}")))?;
Ok(Some(
DIDCommResponse::new(JOIN_REQUEST_ACCEPT_RECEIPT_TYPE, body).thid(message.id),
))
}
async fn join_request_manifest_handler(
_ctx: HandlerContext,
message: Message,
Extension(state): Extension<AppState>,
) -> Result<Option<DIDCommResponse>, DIDCommServiceError> {
let manifest = manifest_inner(&state)
.await
.map_err(|e| DIDCommServiceError::Internal(format!("manifest failed: {e}")))?;
let body = serde_json::to_value(&manifest)
.map_err(|e| DIDCommServiceError::Internal(format!("manifest serialise: {e}")))?;
Ok(Some(
DIDCommResponse::new(JOIN_REQUEST_MANIFEST_RESPONSE_TYPE, body).thid(message.id),
))
}
async fn join_request_status_handler(
ctx: HandlerContext,
message: Message,
Extension(state): Extension<AppState>,
) -> Result<Option<DIDCommResponse>, DIDCommServiceError> {
let applicant_did = ctx.sender_did.clone().ok_or_else(|| {
DIDCommServiceError::Internal("join-request status has no DIDComm sender".into())
})?;
let body: JoinRequestStatusBody = serde_json::from_value(message.body.clone())
.map_err(|e| DIDCommServiceError::Internal(format!("malformed status body: {e}")))?;
let resp = status_inner(&state, body.request_id, applicant_did, None)
.await
.map_err(|e| DIDCommServiceError::Internal(format!("status failed: {e}")))?;
let body = serde_json::to_value(&resp)
.map_err(|e| DIDCommServiceError::Internal(format!("status serialise: {e}")))?;
Ok(Some(
DIDCommResponse::new(JOIN_REQUEST_STATUS_RESPONSE_TYPE, body).thid(message.id),
))
}
async fn member_self_remove_handler(
ctx: HandlerContext,
message: Message,
Extension(state): Extension<AppState>,
) -> Result<Option<DIDCommResponse>, DIDCommServiceError> {
let caller_did = ctx
.sender_did
.clone()
.ok_or_else(|| DIDCommServiceError::Internal("self-remove has no DIDComm sender".into()))?;
let body: SelfRemoveBody = serde_json::from_value(message.body.clone())
.map_err(|e| DIDCommServiceError::Internal(format!("malformed self-remove body: {e}")))?;
let disposition = body
.disposition
.as_deref()
.map(parse_disposition)
.transpose()
.map_err(DIDCommServiceError::Internal)?;
let outcome = remove_inner(&state, &caller_did, &caller_did, disposition, String::new())
.await
.map_err(|e| DIDCommServiceError::Internal(format!("self-remove: {e}")))?;
let receipt = SelfRemoveReceiptBody {
did: outcome.did,
disposition: outcome.disposition,
removed: outcome.removed,
};
let body = serde_json::to_value(&receipt)
.map_err(|e| DIDCommServiceError::Internal(format!("receipt serialise: {e}")))?;
Ok(Some(
DIDCommResponse::new(MEMBER_SELF_REMOVE_RECEIPT_TYPE, body).thid(message.id),
))
}
async fn credential_request_handler(
_ctx: HandlerContext,
message: Message,
Extension(state): Extension<AppState>,
) -> Result<Option<DIDCommResponse>, DIDCommServiceError> {
let body: RequestBody = serde_json::from_value(message.body.clone())
.map_err(|e| DIDCommServiceError::Internal(format!("malformed credential request: {e}")))?;
let response = crate::credentials::redeem(
&state.join_requests_ks,
&body.credential_request,
chrono::Utc::now(),
)
.await
.map_err(|e| DIDCommServiceError::Internal(format!("credential issuance: {e}")))?;
let issue = IssueBody {
credential_response: Some(response),
sealed: None,
};
let issue_body = serde_json::to_value(&issue)
.map_err(|e| DIDCommServiceError::Internal(format!("issue serialise: {e}")))?;
Ok(Some(
DIDCommResponse::new(CREDENTIAL_ISSUE_TYPE, issue_body).thid(message.id),
))
}
async fn credential_present_handler(
_ctx: HandlerContext,
message: Message,
Extension(state): Extension<AppState>,
) -> Result<Option<DIDCommResponse>, DIDCommServiceError> {
let body: PresentBody = serde_json::from_value(message.body.clone())
.map_err(|e| DIDCommServiceError::Internal(format!("malformed present body: {e}")))?;
let thread_id = message.thid.clone().ok_or_else(|| {
DIDCommServiceError::Internal(
"present carries no thread id (thid) to correlate its challenge".into(),
)
})?;
let now = chrono::Utc::now();
let challenge =
crate::credentials::present_challenge::consume(&state.join_requests_ks, &thread_id, now)
.await
.map_err(|e| DIDCommServiceError::Internal(format!("present challenge: {e}")))?;
let outcome = crate::routes::join_requests::present::present_and_decide_join(
&state,
&body.vp_token,
&challenge.aud,
&challenge.nonce,
JoinTransport::DIDComm,
now,
)
.await
.map_err(|e| DIDCommServiceError::Internal(format!("present decision: {e}")))?;
if let Some(admit) = outcome.admit.as_deref() {
let holder_did = outcome.request.applicant_did.clone();
if let Err(e) =
crate::credentials::delivery::deliver_membership_credentials(&state, &holder_did, admit)
.await
{
warn!(holder = %holder_did, request = %outcome.request.id, error = %e, "membership-credential delivery failed; credential is issued and can be re-delivered");
} else {
info!(holder = %holder_did, request = %outcome.request.id, "delivered membership credentials to holder over DIDComm");
}
}
let status = serde_json::to_value(outcome.request.status)
.ok()
.and_then(|v| v.as_str().map(str::to_string))
.unwrap_or_default();
let receipt = JoinRequestSubmitReceiptBody {
request_id: outcome.request.id,
status,
};
let receipt_body = serde_json::to_value(&receipt)
.map_err(|e| DIDCommServiceError::Internal(format!("receipt serialise: {e}")))?;
Ok(Some(
DIDCommResponse::new(JOIN_REQUEST_SUBMIT_RECEIPT_TYPE, receipt_body).thid(message.id),
))
}
fn parse_disposition(s: &str) -> Result<Disposition, String> {
match s.to_ascii_lowercase().as_str() {
"purge" => Ok(Disposition::Purge),
"tombstone" => Ok(Disposition::Tombstone),
"historical" => Ok(Disposition::Historical),
"policydefault" => Ok(Disposition::PolicyDefault),
other => Err(format!(
"unknown disposition '{other}' (expected purge|tombstone|historical|policydefault)"
)),
}
}