use std::sync::Arc;
use std::time::Duration;
use affinidi_messaging_didcomm::{Message, UnpackMetadata};
use affinidi_messaging_didcomm_service::{
DIDCommResponse, DIDCommService, DIDCommServiceConfig, DIDCommServiceError, Extension,
HandlerContext, ListenerConfig, ListenerEvent, MESSAGE_PICKUP_STATUS_TYPE, RestartPolicy,
RetryConfig, Router, TRUST_PING_TYPE, handler_fn, ignore_handler, trust_ping_handler,
};
use affinidi_tdk::common::profiles::TDKProfile;
use affinidi_tdk::secrets_resolver::{SecretsResolver, ThreadedSecretsResolver};
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use serde_json::json;
use vti_common::error::AppError;
use vta_sdk::protocols::credential_exchange::PRESENT as CREDENTIAL_PRESENT_TYPE;
use vta_sdk::protocols::credential_exchange::REQUEST as CREDENTIAL_REQUEST_TYPE;
use vta_sdk::protocols::credential_exchange::{
ISSUE as CREDENTIAL_ISSUE_TYPE, IssueBody, PresentBody, RequestBody,
};
use vta_sdk::protocols::join_requests::{
JOIN_REQUEST_ACCEPT_RECEIPT_TYPE, JOIN_REQUEST_ACCEPT_TYPE,
JOIN_REQUEST_MANIFEST_RESPONSE_TYPE, JOIN_REQUEST_MANIFEST_TYPE,
JOIN_REQUEST_STATUS_RESPONSE_TYPE, JOIN_REQUEST_STATUS_TYPE, JOIN_REQUEST_SUBMIT_RECEIPT_TYPE,
JOIN_REQUEST_SUBMIT_TYPE, JoinRequestAcceptBody, JoinRequestAcceptReceiptBody,
JoinRequestStatusBody, JoinRequestSubmitBody, JoinRequestSubmitReceiptBody,
MEMBER_SELF_REMOVE_RECEIPT_TYPE, MEMBER_SELF_REMOVE_TYPE, SelfRemoveBody,
SelfRemoveReceiptBody,
};
use vta_sdk::protocols::{PROBLEM_REPORT_TYPE, problem_report_codes as codes};
use crate::ceremony::remove_inner;
use crate::config::AppConfig;
use crate::join::JoinTransport;
use crate::join::submit_inner;
use crate::members::Disposition;
use crate::routes::join_requests::accept::accept_inner;
use crate::routes::join_requests::manifest::manifest_inner;
use crate::routes::join_requests::status::status_inner;
use crate::server::AppState;
pub async fn run_didcomm_service(
config: &AppConfig,
secrets_resolver: &Arc<ThreadedSecretsResolver>,
vtc_did: &str,
state: AppState,
shutdown_rx: &mut watch::Receiver<bool>,
) {
let mediator_did = match &config.messaging {
Some(m) => &m.mediator_did,
None => {
warn!("messaging not configured — inbound message handling disabled");
let _ = shutdown_rx.changed().await;
return;
}
};
let signing_id = format!("{vtc_did}#key-0");
let ka_id = format!("{vtc_did}#key-1");
let mut secrets = Vec::new();
if let Some(s) = secrets_resolver.get_secret(&signing_id).await {
secrets.push(s);
} else {
warn!("VTC signing secret not found — messaging disabled");
let _ = shutdown_rx.changed().await;
return;
}
if let Some(s) = secrets_resolver.get_secret(&ka_id).await {
secrets.push(s);
}
let profile = TDKProfile::new("VTC", vtc_did, Some(mediator_did), secrets);
let service_config = DIDCommServiceConfig {
listeners: vec![ListenerConfig {
id: "vtc-main".into(),
profile,
restart_policy: RestartPolicy::Always {
backoff: RetryConfig {
initial_delay_secs: 5,
max_delay_secs: 60,
},
},
..Default::default()
}],
};
let router = match Router::new()
.extension(state)
.route(TRUST_PING_TYPE, handler_fn(trust_ping_handler))
.and_then(|r| r.route(MESSAGE_PICKUP_STATUS_TYPE, handler_fn(ignore_handler)))
.and_then(|r| {
r.route(
JOIN_REQUEST_SUBMIT_TYPE,
handler_fn(join_request_submit_handler),
)
})
.and_then(|r| {
r.route(
JOIN_REQUEST_ACCEPT_TYPE,
handler_fn(join_request_accept_handler),
)
})
.and_then(|r| {
r.route(
JOIN_REQUEST_MANIFEST_TYPE,
handler_fn(join_request_manifest_handler),
)
})
.and_then(|r| {
r.route(
JOIN_REQUEST_STATUS_TYPE,
handler_fn(join_request_status_handler),
)
})
.and_then(|r| {
r.route(
MEMBER_SELF_REMOVE_TYPE,
handler_fn(member_self_remove_handler),
)
})
.and_then(|r| {
r.route(
CREDENTIAL_REQUEST_TYPE,
handler_fn(credential_request_handler),
)
})
.and_then(|r| {
r.route(
CREDENTIAL_PRESENT_TYPE,
handler_fn(credential_present_handler),
)
}) {
Ok(r) => r,
Err(e) => {
warn!("failed to build DIDComm router: {e}");
let _ = shutdown_rx.changed().await;
return;
}
};
let shutdown_token = CancellationToken::new();
let service = match DIDCommService::start(service_config, router, shutdown_token.clone()).await
{
Ok(s) => s,
Err(e) => {
warn!("failed to start DIDComm service: {e}");
let _ = shutdown_rx.changed().await;
return;
}
};
if let Err(e) = service
.wait_connected("vtc-main", Duration::from_secs(30))
.await
{
warn!("DIDComm listener not connected after 30s: {e}");
}
let mut event_rx = service.subscribe();
let event_task = tokio::spawn(async move {
loop {
match event_rx.recv().await {
Ok(ListenerEvent::Connected { listener_id }) => {
info!(listener = %listener_id, "DIDComm listener connected");
}
Ok(ListenerEvent::Disconnected { listener_id, error }) => {
warn!(
listener = %listener_id,
error = error.as_deref().unwrap_or("none"),
"DIDComm listener disconnected"
);
}
Ok(ListenerEvent::Restarting {
listener_id,
attempt,
delay,
}) => {
info!(
listener = %listener_id,
attempt,
delay_secs = delay.as_secs(),
"DIDComm listener restarting"
);
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
warn!(skipped = n, "DIDComm event logger lagged");
}
}
}
});
info!("DIDComm service started");
let _ = shutdown_rx.changed().await;
service.shutdown().await;
event_task.abort();
info!("DIDComm service stopped");
}
fn problem_report(thid: String, code: &str, comment: impl Into<String>) -> DIDCommResponse {
DIDCommResponse::new(PROBLEM_REPORT_TYPE, problem_report_body(code, comment)).thid(thid)
}
fn problem_report_body(code: &str, comment: impl Into<String>) -> serde_json::Value {
json!({ "code": code, "comment": comment.into() })
}
pub(crate) fn app_error_code(err: &AppError) -> &'static str {
match err {
AppError::Forbidden(_) | AppError::StepUpRequired(_) => codes::FORBIDDEN,
AppError::Unauthorized(_) | AppError::Authentication(_) => codes::UNAUTHORIZED,
AppError::NotFound(_) => codes::NOT_FOUND,
AppError::Conflict(_) | AppError::IdempotencyKeyConflict => codes::CONFLICT,
AppError::Validation(_)
| AppError::TrustTaskMalformed(_)
| AppError::TrustTaskMissing
| AppError::InvalidCursor => codes::BAD_REQUEST,
_ => codes::INTERNAL,
}
}
fn app_error_report(thid: String, err: &AppError) -> DIDCommResponse {
problem_report(thid, app_error_code(err), err.to_string())
}
async fn join_request_submit_handler(
message: Message,
meta: UnpackMetadata,
Extension(state): Extension<AppState>,
) -> Result<Option<DIDCommResponse>, DIDCommServiceError> {
let thid = message.id.clone();
let applicant_did = authenticated_sender_did(&message, &meta)?;
let body: JoinRequestSubmitBody = match serde_json::from_value(message.body.clone()) {
Ok(b) => b,
Err(e) => {
return Ok(Some(problem_report(
thid,
codes::BAD_REQUEST,
format!("malformed join-request body: {e}"),
)));
}
};
let outcome = match submit_inner(
&state,
applicant_did,
body.vp,
body.registry_consent,
body.extensions,
None,
JoinTransport::DIDComm,
)
.await
{
Ok(o) => o,
Err(e) => return Ok(Some(app_error_report(thid, &e))),
};
let receipt = JoinRequestSubmitReceiptBody {
request_id: outcome.request.id,
status: outcome.request.status.to_string(),
};
let body = serde_json::to_value(&receipt)
.map_err(|e| DIDCommServiceError::Internal(format!("receipt serialise: {e}")))?;
Ok(Some(
DIDCommResponse::new(JOIN_REQUEST_SUBMIT_RECEIPT_TYPE, body).thid(message.id),
))
}
async fn join_request_accept_handler(
message: Message,
meta: UnpackMetadata,
Extension(state): Extension<AppState>,
) -> Result<Option<DIDCommResponse>, DIDCommServiceError> {
let thid = message.id.clone();
let member_did = authenticated_sender_did(&message, &meta)?;
let body: JoinRequestAcceptBody = match serde_json::from_value(message.body.clone()) {
Ok(b) => b,
Err(e) => {
return Ok(Some(problem_report(
thid,
codes::BAD_REQUEST,
format!("malformed join-accept body: {e}"),
)));
}
};
let outcome = match accept_inner(
&state,
body.request_id,
member_did,
body.vmc_id,
body.vc,
None,
JoinTransport::DIDComm,
)
.await
{
Ok(o) => o,
Err(e) => return Ok(Some(app_error_report(thid, &e))),
};
let receipt = JoinRequestAcceptReceiptBody {
request_id: outcome.request_id,
status: "accepted".to_string(),
reciprocal_vc_id: outcome.reciprocal_vc_id,
};
let body = serde_json::to_value(&receipt)
.map_err(|e| DIDCommServiceError::Internal(format!("receipt serialise: {e}")))?;
Ok(Some(
DIDCommResponse::new(JOIN_REQUEST_ACCEPT_RECEIPT_TYPE, body).thid(message.id),
))
}
async fn join_request_manifest_handler(
_ctx: HandlerContext,
message: Message,
Extension(state): Extension<AppState>,
) -> Result<Option<DIDCommResponse>, DIDCommServiceError> {
let manifest = match manifest_inner(&state).await {
Ok(m) => m,
Err(e) => return Ok(Some(app_error_report(message.id.clone(), &e))),
};
let body = serde_json::to_value(&manifest)
.map_err(|e| DIDCommServiceError::Internal(format!("manifest serialise: {e}")))?;
Ok(Some(
DIDCommResponse::new(JOIN_REQUEST_MANIFEST_RESPONSE_TYPE, body).thid(message.id),
))
}
async fn join_request_status_handler(
message: Message,
meta: UnpackMetadata,
Extension(state): Extension<AppState>,
) -> Result<Option<DIDCommResponse>, DIDCommServiceError> {
let thid = message.id.clone();
let applicant_did = authenticated_sender_did(&message, &meta)?;
let body: JoinRequestStatusBody = match serde_json::from_value(message.body.clone()) {
Ok(b) => b,
Err(e) => {
return Ok(Some(problem_report(
thid,
codes::BAD_REQUEST,
format!("malformed status body: {e}"),
)));
}
};
let resp = match status_inner(&state, body.request_id, applicant_did, None).await {
Ok(r) => r,
Err(e) => return Ok(Some(app_error_report(thid, &e))),
};
let body = serde_json::to_value(&resp)
.map_err(|e| DIDCommServiceError::Internal(format!("status serialise: {e}")))?;
Ok(Some(
DIDCommResponse::new(JOIN_REQUEST_STATUS_RESPONSE_TYPE, body).thid(message.id),
))
}
async fn member_self_remove_handler(
message: Message,
meta: UnpackMetadata,
Extension(state): Extension<AppState>,
) -> Result<Option<DIDCommResponse>, DIDCommServiceError> {
let thid = message.id.clone();
let caller_did = authenticated_sender_did(&message, &meta)?;
let body: SelfRemoveBody = match serde_json::from_value(message.body.clone()) {
Ok(b) => b,
Err(e) => {
return Ok(Some(problem_report(
thid,
codes::BAD_REQUEST,
format!("malformed self-remove body: {e}"),
)));
}
};
let disposition = match body
.disposition
.as_deref()
.map(parse_disposition)
.transpose()
{
Ok(d) => d,
Err(e) => return Ok(Some(problem_report(thid, codes::BAD_REQUEST, e))),
};
let outcome =
match remove_inner(&state, &caller_did, &caller_did, disposition, String::new()).await {
Ok(o) => o,
Err(e) => return Ok(Some(app_error_report(thid, &e))),
};
let receipt = SelfRemoveReceiptBody {
did: outcome.did,
disposition: outcome.disposition,
removed: outcome.removed,
};
let body = serde_json::to_value(&receipt)
.map_err(|e| DIDCommServiceError::Internal(format!("receipt serialise: {e}")))?;
Ok(Some(
DIDCommResponse::new(MEMBER_SELF_REMOVE_RECEIPT_TYPE, body).thid(message.id),
))
}
async fn credential_request_handler(
_ctx: HandlerContext,
message: Message,
Extension(state): Extension<AppState>,
) -> Result<Option<DIDCommResponse>, DIDCommServiceError> {
let body: RequestBody = serde_json::from_value(message.body.clone())
.map_err(|e| DIDCommServiceError::Internal(format!("malformed credential request: {e}")))?;
let response = crate::credentials::redeem(
&state.join_requests_ks,
&body.credential_request,
chrono::Utc::now(),
)
.await
.map_err(|e| DIDCommServiceError::Internal(format!("credential issuance: {e}")))?;
let issue = IssueBody {
credential_response: Some(response),
sealed: None,
};
let issue_body = serde_json::to_value(&issue)
.map_err(|e| DIDCommServiceError::Internal(format!("issue serialise: {e}")))?;
Ok(Some(
DIDCommResponse::new(CREDENTIAL_ISSUE_TYPE, issue_body).thid(message.id),
))
}
async fn credential_present_handler(
_ctx: HandlerContext,
message: Message,
Extension(state): Extension<AppState>,
) -> Result<Option<DIDCommResponse>, DIDCommServiceError> {
let body: PresentBody = serde_json::from_value(message.body.clone())
.map_err(|e| DIDCommServiceError::Internal(format!("malformed present body: {e}")))?;
let thread_id = message.thid.clone().ok_or_else(|| {
DIDCommServiceError::Internal(
"present carries no thread id (thid) to correlate its challenge".into(),
)
})?;
let now = chrono::Utc::now();
let challenge =
crate::credentials::present_challenge::consume(&state.join_requests_ks, &thread_id, now)
.await
.map_err(|e| DIDCommServiceError::Internal(format!("present challenge: {e}")))?;
let outcome = crate::routes::join_requests::present::present_and_decide_join(
&state,
&body.vp_token,
&challenge.aud,
&challenge.nonce,
JoinTransport::DIDComm,
now,
)
.await
.map_err(|e| DIDCommServiceError::Internal(format!("present decision: {e}")))?;
if let Some(admit) = outcome.admit.as_deref() {
let holder_did = outcome.request.applicant_did.clone();
if let Err(e) =
crate::credentials::delivery::deliver_membership_credentials(&state, &holder_did, admit)
.await
{
warn!(holder = %holder_did, request = %outcome.request.id, error = %e, "membership-credential delivery failed; credential is issued and can be re-delivered");
} else {
info!(holder = %holder_did, request = %outcome.request.id, "delivered membership credentials to holder over DIDComm");
}
}
let status = serde_json::to_value(outcome.request.status)
.ok()
.and_then(|v| v.as_str().map(str::to_string))
.unwrap_or_default();
let receipt = JoinRequestSubmitReceiptBody {
request_id: outcome.request.id,
status,
};
let receipt_body = serde_json::to_value(&receipt)
.map_err(|e| DIDCommServiceError::Internal(format!("receipt serialise: {e}")))?;
Ok(Some(
DIDCommResponse::new(JOIN_REQUEST_SUBMIT_RECEIPT_TYPE, receipt_body).thid(message.id),
))
}
fn authenticated_sender_did(
message: &Message,
meta: &UnpackMetadata,
) -> Result<String, DIDCommServiceError> {
if !meta.authenticated || meta.anonymous_sender {
return Err(DIDCommServiceError::Internal(
"DIDComm message is not authcrypt-authenticated — sender cannot be trusted".into(),
));
}
let from = message.from.as_deref().ok_or_else(|| {
DIDCommServiceError::Internal(
"authenticated DIDComm message carries no `from` — sender cannot be trusted".into(),
)
})?;
let skid = meta.encrypted_from_kid.as_deref().ok_or_else(|| {
DIDCommServiceError::Internal(
"authenticated DIDComm message exposes no sender key id — sender cannot be trusted"
.into(),
)
})?;
let authenticated_did = skid.split_once('#').map(|(did, _)| did).unwrap_or(skid);
if authenticated_did != from {
return Err(DIDCommServiceError::Internal(format!(
"DIDComm sender spoofed: `from` ({from}) does not match the authcrypt sender key \
({authenticated_did})"
)));
}
Ok(from.to_string())
}
fn parse_disposition(s: &str) -> Result<Disposition, String> {
match s.to_ascii_lowercase().as_str() {
"purge" => Ok(Disposition::Purge),
"tombstone" => Ok(Disposition::Tombstone),
"historical" => Ok(Disposition::Historical),
"policydefault" => Ok(Disposition::PolicyDefault),
other => Err(format!(
"unknown disposition '{other}' (expected purge|tombstone|historical|policydefault)"
)),
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
fn msg(from: Option<&str>) -> Message {
let mut b = Message::build("id-1".to_string(), "test/1.0".to_string(), json!({}));
if let Some(f) = from {
b = b.from(f.to_string());
}
b.finalize()
}
fn meta(authenticated: bool, anonymous_sender: bool, skid: Option<&str>) -> UnpackMetadata {
UnpackMetadata {
encrypted: true,
authenticated,
anonymous_sender,
encrypted_from_kid: skid.map(str::to_string),
..Default::default()
}
}
const ALICE: &str = "did:key:z6MkAlice";
const ALICE_SKID: &str = "did:key:z6MkAlice#z6MkAlice";
#[test]
fn app_error_maps_to_typed_problem_report_codes() {
assert_eq!(
app_error_code(&AppError::Forbidden("nope".into())),
codes::FORBIDDEN
);
assert_eq!(
app_error_code(&AppError::Unauthorized("nope".into())),
codes::UNAUTHORIZED
);
assert_eq!(
app_error_code(&AppError::NotFound("nope".into())),
codes::NOT_FOUND
);
assert_eq!(
app_error_code(&AppError::Conflict("dup".into())),
codes::CONFLICT
);
assert_eq!(
app_error_code(&AppError::Validation("bad".into())),
codes::BAD_REQUEST
);
assert_eq!(
app_error_code(&AppError::Internal("boom".into())),
codes::INTERNAL
);
}
#[test]
fn problem_report_body_is_code_and_comment() {
let body = problem_report_body(codes::BAD_REQUEST, "malformed body");
let (code, comment) = vta_sdk::protocols::extract_problem_report(&body);
assert_eq!(code, codes::BAD_REQUEST);
assert_eq!(comment, "malformed body");
}
#[test]
fn accepts_authenticated_sender_whose_from_matches_the_authcrypt_key() {
let got = authenticated_sender_did(&msg(Some(ALICE)), &meta(true, false, Some(ALICE_SKID)))
.unwrap();
assert_eq!(got, ALICE);
}
#[test]
fn rejects_spoofed_from_authcrypted_with_a_different_key() {
let victim = "did:key:z6MkVictim";
let attacker_skid = "did:key:z6MkAttacker#z6MkAttacker";
let err =
authenticated_sender_did(&msg(Some(victim)), &meta(true, false, Some(attacker_skid)))
.unwrap_err();
assert!(
format!("{err}").contains("spoofed"),
"expected spoof rejection, got: {err}"
);
}
#[test]
fn rejects_unauthenticated_anoncrypt() {
let err =
authenticated_sender_did(&msg(Some(ALICE)), &meta(false, true, None)).unwrap_err();
assert!(
format!("{err}").contains("not authcrypt-authenticated"),
"expected auth rejection, got: {err}"
);
}
#[test]
fn rejects_authenticated_but_anonymous_sender() {
let err = authenticated_sender_did(&msg(Some(ALICE)), &meta(true, true, Some(ALICE_SKID)))
.unwrap_err();
assert!(
format!("{err}").contains("not authcrypt-authenticated"),
"expected auth rejection, got: {err}"
);
}
#[test]
fn rejects_authenticated_message_with_no_from() {
let err =
authenticated_sender_did(&msg(None), &meta(true, false, Some(ALICE_SKID))).unwrap_err();
assert!(
format!("{err}").contains("no `from`"),
"expected missing-from rejection, got: {err}"
);
}
#[test]
fn rejects_authenticated_message_with_no_sender_key_id() {
let err =
authenticated_sender_did(&msg(Some(ALICE)), &meta(true, false, None)).unwrap_err();
assert!(
format!("{err}").contains("no sender key id"),
"expected missing-skid rejection, got: {err}"
);
}
}