use affinidi_did_resolver_cache_sdk::{
DIDCacheClient, DIDMethod, ResolveResponse,
networking::{WSRequest, WSResponse, WSResponseError, WSResponseType},
};
use axum::{
extract::{
State, WebSocketUpgrade,
ws::{Message, WebSocket},
},
response::IntoResponse,
};
use tokio::select;
use tracing::{Instrument, debug, info, span, warn};
use crate::{
SharedData,
handlers::{fetch_webvh_log, resolve_with_timeout},
};
/// Builds the WebSocket success payload for a resolved DID.
///
/// When the resolved method is `DIDMethod::WEBVH`, the DID log and witness log
/// are obtained from `fetch_webvh_log`; for every other method both are `None`.
///
/// The result is always the `WSResponseType::Response` variant.
async fn build_response(response: ResolveResponse) -> WSResponseType {
    let (did_log, did_witness_log) = if response.method == DIDMethod::WEBVH {
        fetch_webvh_log(&response.did).await
    } else {
        (None, None)
    };

    // `response` is owned and not used after this point, so its fields are
    // moved into the payload instead of cloning the DID.
    WSResponseType::Response(Box::new(WSResponse {
        did: response.did,
        hash: response.did_hash,
        document: response.doc,
        did_log,
        did_witness_log,
    }))
}
/// Serializes `message` to JSON and sends it as a text frame on `socket`.
///
/// Returns `true` when the frame was sent. Returns `false` when either the
/// serialization or the send failed; both failures are logged at `warn` level.
async fn send_response(socket: &mut WebSocket, message: &WSResponseType) -> bool {
    // Guard: nothing can be sent if the payload cannot be encoded.
    let payload = match serde_json::to_string(message) {
        Err(e) => {
            warn!("ws: failed to serialize response, closing connection: {e:?}");
            return false;
        }
        Ok(json) => json,
    };

    // Guard: report a transport failure to the caller.
    if let Err(e) = socket.send(Message::Text(payload.into())).await {
        warn!("ws: Error sending response: {e:?}");
        return false;
    }

    debug!("Sent response: {message:?}");
    true
}
async fn resolve_and_respond(socket: &mut WebSocket, state: &SharedData, did: String) -> bool {
match resolve_with_timeout(&state.resolver, state.resolve_timeout, &did).await {
Ok(response) => {
{
let mut stats = state.stats().await;
stats.increment_resolver_success();
if response.cache_hit {
stats.increment_cache_hit();
}
stats.increment_did_method_success(response.method.clone());
}
debug!(
"resolved DID: ({}) cache_hit?({})",
response.did, response.cache_hit
);
let message = build_response(response).await;
send_response(socket, &message).await
}
Err(e) => {
let hash = DIDCacheClient::hash_did(&did);
warn!("Couldn't resolve DID: ({did}) Reason: {e}");
state.stats().await.increment_resolver_error();
let message = WSResponseType::Error(WSResponseError {
did,
hash,
error: e.to_string(),
});
send_response(socket, &message).await
}
}
}
/// HTTP entry point that upgrades the request to a WebSocket connection.
///
/// The upgraded socket and the shared state are handed to `handle_socket`.
/// The `websocket_handler` span created here wraps only the `on_upgrade`
/// call itself; `handle_socket` opens its own span.
pub async fn websocket_handler(
    ws: WebSocketUpgrade,
    State(state): State<SharedData>,
) -> impl IntoResponse {
    let upgrade_span = span!(tracing::Level::DEBUG, "websocket_handler");

    let upgrade = async move {
        ws.on_upgrade(move |socket| handle_socket(socket, state))
    };

    upgrade.instrument(upgrade_span).await
}
/// Serves one WebSocket connection until it ends.
///
/// Every text or binary frame is parsed as a JSON `WSRequest` and answered
/// through `resolve_and_respond`. Ping and pong frames are ignored. The loop
/// stops when the peer sends a close frame, the stream yields `None`, a frame
/// cannot be parsed, or a reply cannot be delivered. The opened/closed
/// counters in the shared stats are updated before and after the loop.
async fn handle_socket(mut socket: WebSocket, state: SharedData) {
    let conn_span = span!(tracing::Level::DEBUG, "handle_socket");

    let session = async move {
        state.stats().await.increment_ws_opened();
        info!("Websocket connection established");

        loop {
            select! {
                incoming = socket.recv() => {
                    // `None` means the stream has ended.
                    let Some(frame) = incoming else {
                        debug!("Received None, closing connection");
                        break;
                    };

                    // NOTE(review): a receive error only skips this iteration;
                    // presumably the next `recv` reports the closed stream —
                    // confirm against the WebSocket implementation.
                    let frame = match frame {
                        Ok(frame) => frame,
                        Err(err) => {
                            warn!("Error receiving message: {:?}", err);
                            continue;
                        }
                    };

                    // Text and binary frames carry the same JSON request;
                    // only the decoding entry point differs.
                    let parsed: Result<WSRequest, serde_json::Error> = match frame {
                        Message::Text(msg) => {
                            debug!("ws: Received text message: {:?}", msg);
                            serde_json::from_str(&msg)
                        }
                        Message::Binary(msg) => {
                            debug!("ws: Received binary message: {:?}", msg);
                            serde_json::from_slice(msg.as_ref())
                        }
                        Message::Ping(_) | Message::Pong(_) => continue,
                        Message::Close(_) => {
                            debug!("Received close message, closing connection");
                            break;
                        }
                    };

                    match parsed {
                        Ok(request) => {
                            if !resolve_and_respond(&mut socket, &state, request.did).await {
                                break;
                            }
                        }
                        Err(e) => {
                            warn!("ws: Error parsing message: {:?}", e);
                            break;
                        }
                    }
                }
            }
        }

        state.stats().await.increment_ws_closed();
        info!("Websocket connection closed");
    };

    session.instrument(conn_span).await
}