use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::Mutex as TokioMutex;
use tracing::info;
use zeroize::Zeroizing;
use crate::error::AppError;
use crate::keys::seed_store::SeedStore;
use crate::operations::internal_authority::InternalAuthority;
use crate::store::KeyspaceHandle;
use crate::webvh_auth::VtaSigningIdentityOwned;
use crate::webvh_client::{TokenData, WebvhClient};
use crate::webvh_store::{
WebvhServerAuthRecord, delete_server_auth, get_server_auth, store_server_auth,
};
use vta_sdk::did_key::decode_private_key_multibase;
use vta_sdk::webvh::WebvhServerRecord;
/// Safety margin, in seconds, subtracted from a cached access token's expiry
/// before it is compared with the current time. A token that would expire
/// within this window is treated as stale instead of being reused.
const ACCESS_TOKEN_REFRESH_SKEW_SECS: u64 = 30;
/// Registry of async mutexes keyed by server id.
///
/// The map lives behind an `Arc`, so cloning this value yields another handle
/// onto the same registry rather than an independent copy.
#[derive(Clone, Default)]
pub struct WebvhAuthLocks {
// Outer std mutex guards the map itself; each entry is a shareable async
// mutex handed out to callers.
inner: Arc<std::sync::Mutex<HashMap<String, Arc<TokioMutex<()>>>>>,
}
impl WebvhAuthLocks {
    /// Creates an empty lock registry.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the async mutex registered for `server_id`, creating and
    /// registering a new one the first time an id is seen.
    ///
    /// Every call with the same id returns a handle to the same mutex.
    ///
    /// # Panics
    ///
    /// Panics if the registry's internal mutex has been poisoned.
    pub fn lock_for(&self, server_id: &str) -> Arc<TokioMutex<()>> {
        let mut registry = self.inner.lock().expect("WebvhAuthLocks mutex poisoned");

        // Known id: hand back another handle to the existing mutex.
        if let Some(existing) = registry.get(server_id) {
            return Arc::clone(existing);
        }

        // First sighting of this id: register a new mutex and return it.
        let created = Arc::new(TokioMutex::new(()));
        registry.insert(server_id.to_string(), Arc::clone(&created));
        created
    }
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch.
fn unix_now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
/// Reports whether the access token in `record` can still be used at
/// `now_secs` (seconds since the Unix epoch).
///
/// The token counts as fresh only while `now_secs` is strictly before its
/// expiry minus `ACCESS_TOKEN_REFRESH_SKEW_SECS`. The subtraction saturates at
/// zero, so an expiry smaller than the skew is never fresh.
fn access_token_is_fresh(record: &WebvhServerAuthRecord, now_secs: u64) -> bool {
    let refresh_deadline = record
        .access_expires_at
        .saturating_sub(ACCESS_TOKEN_REFRESH_SKEW_SECS);
    now_secs < refresh_deadline
}
/// Borrowed bundle of the state `ensure_fresh_access_token` needs in addition
/// to the server record and the client.
pub struct AuthContext<'a> {
/// Keyspace handle passed to the cached-token read and write helpers.
pub webvh_ks: &'a KeyspaceHandle,
/// Identity whose `as_ref()` view is handed to the client's `refresh` and
/// `authenticate` calls.
pub identity: &'a VtaSigningIdentityOwned,
/// Registry that supplies the per-server lock held while tokens are handled.
pub locks: &'a WebvhAuthLocks,
}
/// Returns an access token for `server` and installs it on `client`.
///
/// The whole operation runs while holding the lock that `auth_ctx.locks`
/// hands out for `server.id`. The cached record is read from the keyspace
/// exactly once and then drives one of three paths:
///
/// 1. A cached record whose access token is still fresh is reused as-is.
/// 2. A cached but stale record has its refresh token passed to
///    `client.refresh`; the resulting tokens are persisted and returned.
/// 3. With no cached record, or when the refresh attempt fails with
///    `AppError::Authentication`, `client.authenticate` is called and the
///    resulting tokens are persisted and returned.
///
/// # Errors
///
/// Propagates failures from reading or writing the cached record, any refresh
/// error other than `AppError::Authentication`, and any error from the full
/// authentication call.
pub async fn ensure_fresh_access_token(
    auth_ctx: &AuthContext<'_>,
    server: &WebvhServerRecord,
    client: &mut WebvhClient,
) -> Result<String, AppError> {
    let lock = auth_ctx.locks.lock_for(&server.id);
    let _guard = lock.lock().await;

    // Single read of the cached record: a fresh one short-circuits, anything
    // else (stale record or none) is carried forward to the refresh path.
    let stale = match get_server_auth(auth_ctx.webvh_ks, &server.id).await? {
        Some(record) if access_token_is_fresh(&record, unix_now_secs()) => {
            let token = record.access_token.clone();
            client.set_access_token(token.clone());
            return Ok(token);
        }
        other => other,
    };

    let identity = auth_ctx.identity.as_ref();

    if let Some(stale) = stale {
        match client.refresh(&identity, &stale.refresh_token).await {
            Ok(tokens) => {
                let token = tokens.access_token.clone();
                persist_tokens(auth_ctx.webvh_ks, &server.id, &tokens).await?;
                client.set_access_token(token.clone());
                return Ok(token);
            }
            // A rejected refresh is not fatal: fall through to a full
            // authentication below.
            Err(AppError::Authentication(reason)) => {
                info!(
                    server_id = %server.id,
                    reason = %reason,
                    "webvh refresh rejected; falling back to full authenticate"
                );
            }
            Err(e) => return Err(e),
        }
    }

    let tokens = client.authenticate(&identity).await?;
    let token = tokens.access_token.clone();
    persist_tokens(auth_ctx.webvh_ks, &server.id, &tokens).await?;
    client.set_access_token(token.clone());
    Ok(token)
}
/// Removes the cached auth record stored for `server_id` in `webvh_ks`.
///
/// This function does not take the per-server lock from `WebvhAuthLocks`.
///
/// # Errors
///
/// Returns whatever error the underlying delete reports.
pub async fn invalidate_cached_token(
    webvh_ks: &KeyspaceHandle,
    server_id: &str,
) -> Result<(), AppError> {
    delete_server_auth(webvh_ks, server_id).await?;
    Ok(())
}
/// Builds a `WebvhServerAuthRecord` for `server_id` from `tokens`, writes it
/// to `webvh_ks`, and returns the stored record.
///
/// # Errors
///
/// Returns the store's error if the write fails; no record is returned then.
async fn persist_tokens(
    webvh_ks: &KeyspaceHandle,
    server_id: &str,
    tokens: &TokenData,
) -> Result<WebvhServerAuthRecord, AppError> {
    // The record owns its strings, so copy them out of the borrowed tokens.
    let access_token = tokens.access_token.clone();
    let refresh_token = tokens.refresh_token.clone();

    let stored = WebvhServerAuthRecord {
        server_id: server_id.to_owned(),
        access_token,
        access_expires_at: tokens.access_expires_at,
        refresh_token,
        refresh_expires_at: tokens.refresh_expires_at,
    };

    store_server_auth(webvh_ks, &stored).await?;
    Ok(stored)
}
/// Sends `log_content` for `mnemonic` to `server` over an authenticated
/// transport.
///
/// Loads the signing identity for `vta_did`, bundles it with `webvh_ks` and
/// `auth_locks` into an `AuthContext`, builds the transport with
/// `WebvhTransport::from_server_authenticated`, and then calls
/// `publish_did_authenticated` with `mnemonic`, `log_content` and `domain`.
///
/// # Errors
///
/// Propagates any failure from loading the identity, building the transport,
/// or the publish call.
// Every argument is forwarded to identity loading or transport construction,
// so the list is kept flat rather than bundled.
#[allow(clippy::too_many_arguments)]
pub async fn publish_log_to_server(
    keys_ks: &KeyspaceHandle,
    imported_ks: &KeyspaceHandle,
    audit_ks: &KeyspaceHandle,
    webvh_ks: &KeyspaceHandle,
    seed_store: &dyn SeedStore,
    did_resolver: &affinidi_did_resolver_cache_sdk::DIDCacheClient,
    didcomm_bridge: &Arc<crate::didcomm_bridge::DIDCommBridge>,
    auth_locks: &WebvhAuthLocks,
    vta_did: &str,
    server: &WebvhServerRecord,
    mnemonic: &str,
    log_content: &str,
    domain: Option<&str>,
) -> Result<(), AppError> {
    let signing_identity =
        load_vta_webvh_signing_identity(keys_ks, imported_ks, seed_store, audit_ks, vta_did)
            .await?;

    let ctx = AuthContext {
        webvh_ks,
        identity: &signing_identity,
        locks: auth_locks,
    };

    let mut authed_transport =
        super::WebvhTransport::from_server_authenticated(server, did_resolver, didcomm_bridge, &ctx)
            .await?;

    authed_transport
        .publish_did_authenticated(mnemonic, log_content, domain, &ctx, server)
        .await
}
/// Requests deletion of the entry for `mnemonic` on `server` over an
/// authenticated transport.
///
/// Loads the signing identity for `vta_did`, bundles it with `webvh_ks` and
/// `auth_locks` into an `AuthContext`, builds the transport with
/// `WebvhTransport::from_server_authenticated`, and then calls
/// `delete_did_authenticated` with `mnemonic` and `domain`.
///
/// # Errors
///
/// Propagates any failure from loading the identity, building the transport,
/// or the delete call.
// Every argument is forwarded to identity loading or transport construction,
// so the list is kept flat rather than bundled.
#[allow(clippy::too_many_arguments)]
pub async fn delete_log_on_server(
    keys_ks: &KeyspaceHandle,
    imported_ks: &KeyspaceHandle,
    audit_ks: &KeyspaceHandle,
    webvh_ks: &KeyspaceHandle,
    seed_store: &dyn SeedStore,
    did_resolver: &affinidi_did_resolver_cache_sdk::DIDCacheClient,
    didcomm_bridge: &Arc<crate::didcomm_bridge::DIDCommBridge>,
    auth_locks: &WebvhAuthLocks,
    vta_did: &str,
    server: &WebvhServerRecord,
    mnemonic: &str,
    domain: Option<&str>,
) -> Result<(), AppError> {
    let signing_identity =
        load_vta_webvh_signing_identity(keys_ks, imported_ks, seed_store, audit_ks, vta_did)
            .await?;

    let ctx = AuthContext {
        webvh_ks,
        identity: &signing_identity,
        locks: auth_locks,
    };

    let mut authed_transport =
        super::WebvhTransport::from_server_authenticated(server, did_resolver, didcomm_bridge, &ctx)
            .await?;

    authed_transport
        .delete_did_authenticated(mnemonic, domain, &ctx, server)
        .await
}
/// Submits `did_log` for `path` to `server` over an authenticated transport
/// and returns the server's `RequestUriResponse`.
///
/// Loads the signing identity for `vta_did`, bundles it with `webvh_ks` and
/// `auth_locks` into an `AuthContext`, builds the transport with
/// `WebvhTransport::from_server_authenticated`, and then calls
/// `register_did_atomic_authenticated` with `path`, `did_log`, `force` and
/// `domain`.
///
/// # Errors
///
/// Propagates any failure from loading the identity, building the transport,
/// or the register call.
// Every argument is forwarded to identity loading or transport construction,
// so the list is kept flat rather than bundled.
#[allow(clippy::too_many_arguments)]
pub async fn register_did_atomic_on_server(
    keys_ks: &KeyspaceHandle,
    imported_ks: &KeyspaceHandle,
    audit_ks: &KeyspaceHandle,
    webvh_ks: &KeyspaceHandle,
    seed_store: &dyn SeedStore,
    did_resolver: &affinidi_did_resolver_cache_sdk::DIDCacheClient,
    didcomm_bridge: &Arc<crate::didcomm_bridge::DIDCommBridge>,
    auth_locks: &WebvhAuthLocks,
    vta_did: &str,
    server: &WebvhServerRecord,
    path: &str,
    did_log: &str,
    force: bool,
    domain: Option<&str>,
) -> Result<crate::webvh_client::RequestUriResponse, AppError> {
    let signing_identity =
        load_vta_webvh_signing_identity(keys_ks, imported_ks, seed_store, audit_ks, vta_did)
            .await?;

    let ctx = AuthContext {
        webvh_ks,
        identity: &signing_identity,
        locks: auth_locks,
    };

    let mut authed_transport =
        super::WebvhTransport::from_server_authenticated(server, did_resolver, didcomm_bridge, &ctx)
            .await?;

    authed_transport
        .register_did_atomic_authenticated(path, did_log, force, domain, &ctx, server)
        .await
}
pub async fn load_vta_webvh_signing_identity(
keys_ks: &KeyspaceHandle,
imported_ks: &KeyspaceHandle,
seed_store: &dyn SeedStore,
audit_ks: &KeyspaceHandle,
vta_did: &str,
) -> Result<VtaSigningIdentityOwned, AppError> {
let signing_kid = format!("{vta_did}#key-0");
let authority = InternalAuthority::new("webvh-rest-auth");
let resp = crate::operations::keys::get_key_secret_internal(
keys_ks,
imported_ks,
seed_store,
audit_ks,
authority,
&signing_kid,
"webvh-rest-auth-internal",
)
.await?;
let bytes: [u8; 32] = decode_private_key_multibase(&resp.private_key_multibase)
.map_err(|e| AppError::Internal(format!("decode VTA signing key for daemon auth: {e}")))?;
Ok(VtaSigningIdentityOwned {
vta_did: vta_did.to_string(),
signing_kid,
private_key: Zeroizing::new(bytes),
})
}