use std::sync::Arc;
use serde_json::Value as JsonValue;
use tracing::info;
use vti_common::telemetry::{SharedTelemetrySink, TelemetryEvent, TelemetryKind};
use vta_sdk::protocol::services::validate_service_url;
use crate::auth::AuthClaims;
use crate::config::AppConfig;
use crate::operations::did_webvh::{UpdateDidWebvhError, UpdateDidWebvhOptions, update_did_webvh};
use crate::operations::protocol::document::DocumentPatchError;
use crate::operations::protocol::invariant::{
CurrentServices, ProposedOp, would_violate_last_service,
};
use crate::operations::protocol::preconditions::ProtocolPreconditionError;
use crate::operations::protocol::snapshot::{self, ServiceConfigSnapshot, ServiceKind};
use crate::operations::protocol::{OpContext, PROTOCOL_LOCK};
use crate::store::KeyspaceHandle;
use tokio::sync::RwLock;
use vta_sdk::error::VtaError;
/// Per-service hooks consumed by the generic enable/update/disable helpers in this module.
///
/// Every item is an associated constant or an associated function without a receiver,
/// so implementors can be unit marker types (see `RestService` and `WebauthnService`).
pub(crate) trait ServiceLifecycle {
/// Human-readable service name; used as the prefix of the log messages emitted by
/// `run_enable` ("{} enabled") and `run_update` ("{} URL updated").
const LABEL: &'static str;
/// Service discriminant; passed to `ProposedOp::disable` by `check_disable_preconditions`.
const KIND: ServiceKind;
/// Telemetry kind recorded by `run_enable` after a successful enable.
const ENABLE_TELEMETRY: TelemetryKind;
/// Telemetry kind recorded by `run_update` after a successful URL update.
const UPDATE_TELEMETRY: TelemetryKind;
/// Returns whether this service is flagged as enabled in `cfg`.
fn config_enabled(cfg: &AppConfig) -> bool;
/// Returns the URL of this service's entry in `doc`, or `None` when the document
/// has no such entry.
fn current_service_url(doc: &JsonValue) -> Option<String>;
/// Consumes `doc` and returns it patched so that this service points at `url`.
///
/// # Errors
/// Returns `DocumentPatchError` when the document cannot be patched.
fn with_service(doc: JsonValue, url: &str) -> Result<JsonValue, DocumentPatchError>;
/// Consumes `doc` and returns it with this service's entry removed.
fn without_service(doc: JsonValue) -> JsonValue;
/// Snapshot value describing "service was disabled"; written by `run_enable`
/// before the document is published.
fn snapshot_disabled() -> ServiceConfigSnapshot;
/// Snapshot value describing "service was enabled at `prior_url`"; written by
/// `run_update` before the document is published.
fn snapshot_enabled(prior_url: String) -> ServiceConfigSnapshot;
}
/// Error contract shared by the enable and update flows (`run_enable`, `run_update`).
///
/// The `From` supertraits let those flows propagate precondition, document-patch and
/// WebVH-update failures with `?` / `E::from`; the constructors below wrap failures
/// that only carry a message.
pub(crate) trait ServiceMutationError:
Sized + From<ProtocolPreconditionError> + From<DocumentPatchError> + From<UpdateDidWebvhError>
{
/// Builds the error for a service URL rejected by `validate_service_url`.
fn validation(msg: String) -> Self;
/// Builds the error for a caller that failed the super-admin check.
fn auth(msg: String) -> Self;
/// Builds the error for a storage failure (used for failed snapshot writes).
fn storage(msg: String) -> Self;
}
/// Additional error constructors required by the enable flow (`run_enable`).
pub(crate) trait EnableMutationError: ServiceMutationError {
/// Error returned when the service is already flagged as enabled in the config,
/// or its entry is already present in the current document.
fn already_enabled() -> Self;
/// Error returned when the caller-supplied `persist_enabled` callback fails;
/// `msg` is the callback's error string.
fn config_persistence(msg: String) -> Self;
}
/// Additional error constructor required by the update flow (`run_update`).
pub(crate) trait UpdateMutationError: ServiceMutationError {
/// Error returned when the service is not flagged as enabled in the config,
/// or the current document has no entry for it.
fn not_present() -> Self;
}
/// Error contract for the disable flow (see `check_disable_preconditions`).
///
/// Unlike the enable/update error traits this one does not extend
/// `ServiceMutationError`; it lists its `From` conversions directly and additionally
/// requires `From<VtaError>`.
// NOTE(review): `From<VtaError>` is presumably what converts the
// `would_violate_last_service` failure — confirm against that function's signature.
pub(crate) trait DisableMutationError:
Sized
+ From<ProtocolPreconditionError>
+ From<DocumentPatchError>
+ From<UpdateDidWebvhError>
+ From<VtaError>
{
/// Error returned when the service is not flagged as enabled in the config,
/// or the current document has no entry for it.
fn not_present() -> Self;
}
/// Successful outcome of `run_enable` / `run_update`.
pub(crate) struct ServiceMutationOk {
/// Version id reported by the WebVH update that published the patched document.
pub new_version_id: String,
/// The service URL as returned (stringified) by `validate_service_url`.
pub canonical_url: String,
/// DID of the VTA whose document was updated (taken from the loaded doc state).
pub vta_did: String,
/// `serverless` flag copied from the WebVH update result.
pub serverless: bool,
/// URL the service had before the change: `None` for enable, `Some` for update.
pub prior_url: Option<String>,
}
pub(crate) use super::ServiceOpDeps;
/// Publishes `patched` as the new document for the DID identified by `scid`.
///
/// Delegates to `update_did_webvh`, setting only the `document` option (all other
/// options keep their defaults) and passing `vta_did` and `channel` through.
///
/// # Errors
/// Any `UpdateDidWebvhError` from the update is converted into `E` via `From`.
pub(crate) async fn publish_patch<E: From<UpdateDidWebvhError>>(
    deps: &ServiceOpDeps<'_>,
    auth: &AuthClaims,
    scid: &str,
    vta_did: &str,
    patched: JsonValue,
    channel: &str,
) -> Result<crate::operations::did_webvh::UpdateDidWebvhResult, E> {
    let webvh_deps = deps.webvh();
    let options = UpdateDidWebvhOptions {
        document: Some(patched),
        ..Default::default()
    };

    let outcome =
        update_did_webvh(&webvh_deps, auth, scid, options, Some(vta_did), channel).await;

    match outcome {
        Ok(result) => Ok(result),
        Err(err) => Err(E::from(err)),
    }
}
/// Verifies that service `S` may be enabled and returns the loaded document state.
///
/// The service must be off in the config *and* absent from the current document.
///
/// # Errors
/// * `E::already_enabled()` when the config flag is set or the document already
///   carries an entry for the service.
/// * Whatever `load_vta_doc_state` fails with, converted into `E` by `?`.
pub(crate) async fn check_enable_preconditions<S, E>(
    config: &Arc<RwLock<AppConfig>>,
    webvh_ks: &KeyspaceHandle,
) -> Result<crate::operations::protocol::preconditions::VtaDocState, E>
where
    S: ServiceLifecycle,
    E: EnableMutationError,
{
    // Scope the read guard so it is released before the document state is loaded.
    let flagged_in_config = {
        let guard = config.read().await;
        S::config_enabled(&guard)
    };
    if flagged_in_config {
        return Err(E::already_enabled());
    }

    let doc_state =
        crate::operations::protocol::preconditions::load_vta_doc_state(config, webvh_ks).await?;

    match S::current_service_url(&doc_state.current_doc) {
        Some(_) => Err(E::already_enabled()),
        None => Ok(doc_state),
    }
}
/// Verifies that service `S` may have its URL updated.
///
/// Returns the loaded document state together with the URL the service currently
/// has in that document.
///
/// # Errors
/// * `E::not_present()` when the config flag is off or the document has no entry
///   for the service.
/// * Whatever `load_vta_doc_state` fails with, converted into `E` by `?`.
pub(crate) async fn check_update_preconditions<S, E>(
    config: &Arc<RwLock<AppConfig>>,
    webvh_ks: &KeyspaceHandle,
) -> Result<
    (
        crate::operations::protocol::preconditions::VtaDocState,
        String,
    ),
    E,
>
where
    S: ServiceLifecycle,
    E: UpdateMutationError,
{
    // Scope the read guard so it is released before the document state is loaded.
    let flagged_in_config = {
        let guard = config.read().await;
        S::config_enabled(&guard)
    };
    if !flagged_in_config {
        return Err(E::not_present());
    }

    let doc_state =
        crate::operations::protocol::preconditions::load_vta_doc_state(config, webvh_ks).await?;

    let Some(existing_url) = S::current_service_url(&doc_state.current_doc) else {
        return Err(E::not_present());
    };
    Ok((doc_state, existing_url))
}
/// Verifies that service `S` may be disabled.
///
/// Returns the loaded document state together with the URL the service currently
/// has in that document.
///
/// # Errors
/// * `E::not_present()` when the config flag is off or the document has no entry
///   for the service.
/// * The error from `would_violate_last_service`, converted into `E` by `?`, when
///   that check rejects disabling `S::KIND` given the current service flags.
/// * Whatever `load_vta_doc_state` fails with, converted into `E` by `?`.
pub(crate) async fn check_disable_preconditions<S, E>(
    config: &Arc<RwLock<AppConfig>>,
    webvh_ks: &KeyspaceHandle,
) -> Result<
    (
        crate::operations::protocol::preconditions::VtaDocState,
        String,
    ),
    E,
>
where
    S: ServiceLifecycle,
    E: DisableMutationError,
{
    // Copy the three service flags out while the read guard is held, then drop it.
    let service_flags = {
        let cfg = config.read().await;
        if !S::config_enabled(&cfg) {
            return Err(E::not_present());
        }
        let services = &cfg.services;
        (services.rest, services.didcomm, services.webauthn)
    };

    let (rest, didcomm, webauthn) = service_flags;
    would_violate_last_service(
        &CurrentServices::new(rest, didcomm, webauthn),
        ProposedOp::disable(S::KIND),
    )?;

    let doc_state =
        crate::operations::protocol::preconditions::load_vta_doc_state(config, webvh_ks).await?;

    let Some(existing_url) = S::current_service_url(&doc_state.current_doc) else {
        return Err(E::not_present());
    };
    Ok((doc_state, existing_url))
}
/// Enables service `S` at `url`.
///
/// Steps, in order:
/// 1. Require the caller to be a super admin.
/// 2. Validate `url` and keep its canonical string form.
/// 3. Take `PROTOCOL_LOCK` (held until the function returns).
/// 4. Check the enable preconditions and load the current document state.
/// 5. Write the "disabled" snapshot, patch the document, and publish it.
/// 6. Invoke `persist_enabled` so the caller can persist the config change.
/// 7. Record telemetry and log the outcome.
///
/// # Parameters
/// * `deps` – shared handles (config, keyspaces, telemetry sink, WebVH deps).
/// * `auth` – claims of the caller; must pass `require_super_admin`.
/// * `url` – requested service URL (validated by `validate_service_url`).
/// * `ctx` – operation context; only used to tag telemetry.
/// * `channel` – channel label forwarded to the WebVH update, telemetry and logs.
/// * `persist_enabled` – callback run after a successful publish.
///
/// # Errors
/// * `E::auth` – caller is not a super admin.
/// * `E::validation` – `url` was rejected.
/// * `E::already_enabled` / precondition errors – see `check_enable_preconditions`.
/// * `E::storage` – the snapshot could not be written.
/// * `E::from(DocumentPatchError)` / `E::from(UpdateDidWebvhError)` – patching or
///   publishing the document failed.
/// * `E::config_persistence` – `persist_enabled` failed (the document has already
///   been published at that point).
pub(crate) async fn run_enable<S, E>(
    deps: &ServiceOpDeps<'_>,
    auth: &AuthClaims,
    url: &str,
    ctx: OpContext,
    channel: &str,
    persist_enabled: impl AsyncFnOnce() -> Result<(), String>,
) -> Result<ServiceMutationOk, E>
where
    S: ServiceLifecycle,
    E: EnableMutationError,
{
    auth.require_super_admin()
        .map_err(|e| E::auth(e.to_string()))?;

    // Validate before taking `PROTOCOL_LOCK`: validation only looks at `url`, so a
    // malformed request should fail immediately instead of queueing behind other
    // holders of the lock. Error precedence (auth, then validation) is unchanged.
    let canonical_url = validate_service_url(url)
        .map_err(|e| E::validation(e.to_string()))?
        .to_string();

    let _guard = PROTOCOL_LOCK.lock().await;

    let state = check_enable_preconditions::<S, E>(deps.config, deps.webvh_ks).await?;

    // Record the pre-change state ("disabled") before anything is published.
    snapshot::write(deps.snapshot_ks, S::snapshot_disabled())
        .await
        .map_err(|e| E::storage(format!("snapshot write: {e}")))?;

    let patched = S::with_service(state.current_doc, &canonical_url)?;
    let update_result =
        publish_patch::<E>(deps, auth, &state.scid, &state.vta_did, patched, channel).await?;

    persist_enabled().await.map_err(E::config_persistence)?;

    emit_telemetry(
        deps.telemetry,
        S::ENABLE_TELEMETRY,
        channel,
        &update_result.new_version_id,
        &canonical_url,
        None,
        ctx,
    )
    .await;
    info!(
        channel,
        url = %canonical_url,
        new_version_id = %update_result.new_version_id,
        vta_did = %state.vta_did,
        "{} enabled",
        S::LABEL,
    );

    Ok(ServiceMutationOk {
        new_version_id: update_result.new_version_id,
        canonical_url,
        vta_did: state.vta_did,
        serverless: update_result.serverless,
        prior_url: None,
    })
}
/// Changes the URL of the already-enabled service `S` to `url`.
///
/// Steps, in order:
/// 1. Require the caller to be a super admin.
/// 2. Validate `url` and keep its canonical string form.
/// 3. Take `PROTOCOL_LOCK` (held until the function returns).
/// 4. Check the update preconditions; this also yields the service's current URL.
/// 5. Write an "enabled at prior URL" snapshot, patch the document, and publish it.
/// 6. Record telemetry and log the outcome.
///
/// # Parameters
/// * `deps` – shared handles (config, keyspaces, telemetry sink, WebVH deps).
/// * `auth` – claims of the caller; must pass `require_super_admin`.
/// * `url` – requested new service URL (validated by `validate_service_url`).
/// * `ctx` – operation context; only used to tag telemetry.
/// * `channel` – channel label forwarded to the WebVH update, telemetry and logs.
///
/// # Errors
/// * `E::auth` – caller is not a super admin.
/// * `E::validation` – `url` was rejected.
/// * `E::not_present` / precondition errors – see `check_update_preconditions`.
/// * `E::storage` – the snapshot could not be written.
/// * `E::from(DocumentPatchError)` / `E::from(UpdateDidWebvhError)` – patching or
///   publishing the document failed.
pub(crate) async fn run_update<S, E>(
    deps: &ServiceOpDeps<'_>,
    auth: &AuthClaims,
    url: &str,
    ctx: OpContext,
    channel: &str,
) -> Result<ServiceMutationOk, E>
where
    S: ServiceLifecycle,
    E: UpdateMutationError,
{
    auth.require_super_admin()
        .map_err(|e| E::auth(e.to_string()))?;

    // Validate before taking `PROTOCOL_LOCK`: validation only looks at `url`, so a
    // malformed request should fail immediately instead of queueing behind other
    // holders of the lock. Error precedence (auth, then validation) is unchanged.
    let canonical_url = validate_service_url(url)
        .map_err(|e| E::validation(e.to_string()))?
        .to_string();

    let _guard = PROTOCOL_LOCK.lock().await;

    let (state, prior_url) = check_update_preconditions::<S, E>(deps.config, deps.webvh_ks).await?;

    // Record the pre-change state (enabled at the old URL) before anything is
    // published; `prior_url` is cloned because it is still needed below.
    snapshot::write(deps.snapshot_ks, S::snapshot_enabled(prior_url.clone()))
        .await
        .map_err(|e| E::storage(format!("snapshot write: {e}")))?;

    let patched = S::with_service(state.current_doc, &canonical_url)?;
    let update_result =
        publish_patch::<E>(deps, auth, &state.scid, &state.vta_did, patched, channel).await?;

    emit_telemetry(
        deps.telemetry,
        S::UPDATE_TELEMETRY,
        channel,
        &update_result.new_version_id,
        &canonical_url,
        Some(&prior_url),
        ctx,
    )
    .await;
    info!(
        channel,
        prior_url = %prior_url,
        url = %canonical_url,
        new_version_id = %update_result.new_version_id,
        vta_did = %state.vta_did,
        "{} URL updated",
        S::LABEL,
    );

    Ok(ServiceMutationOk {
        new_version_id: update_result.new_version_id,
        canonical_url,
        vta_did: state.vta_did,
        serverless: update_result.serverless,
        prior_url: Some(prior_url),
    })
}
/// Builds and records one telemetry event for a service mutation.
///
/// The event always carries `channel`, `new_version_id` and `url`. `prior_url` is
/// attached only when provided, and `triggered_by` only when
/// `ctx.telemetry_triggered_by()` yields a tag.
///
/// Recording is best-effort: the value returned by `telemetry.record` is discarded.
async fn emit_telemetry(
    telemetry: &SharedTelemetrySink,
    kind: TelemetryKind,
    channel: &str,
    new_version_id: &str,
    url: &str,
    prior_url: Option<&str>,
    ctx: OpContext,
) {
    let base = TelemetryEvent::new(kind)
        .with_field("channel", JsonValue::from(channel))
        .with_field("new_version_id", JsonValue::from(new_version_id))
        .with_field("url", JsonValue::from(url));

    let with_prior = match prior_url {
        Some(prior) => base.with_field("prior_url", JsonValue::from(prior)),
        None => base,
    };

    let complete = match ctx.telemetry_triggered_by() {
        Some(tag) => with_prior.with_field("triggered_by", JsonValue::from(tag)),
        None => with_prior,
    };

    let _ = telemetry.record(complete).await;
}
/// Marker type selecting the REST service for the generic lifecycle helpers.
pub(crate) struct RestService;

/// Wires the REST-specific config flag, document helpers, snapshot variants and
/// telemetry kinds into [`ServiceLifecycle`].
impl ServiceLifecycle for RestService {
    const LABEL: &'static str = "REST";
    const KIND: ServiceKind = ServiceKind::Rest;
    const ENABLE_TELEMETRY: TelemetryKind = TelemetryKind::ServicesRestEnable;
    const UPDATE_TELEMETRY: TelemetryKind = TelemetryKind::ServicesRestUpdate;

    /// Reads the `services.rest` flag.
    fn config_enabled(cfg: &AppConfig) -> bool {
        let services = &cfg.services;
        services.rest
    }

    /// URL of the REST service entry found by `current_rest_service`, if any.
    fn current_service_url(doc: &JsonValue) -> Option<String> {
        let entry = crate::operations::protocol::document::current_rest_service(doc)?;
        Some(entry.url)
    }

    /// Delegates to `with_rest_service`.
    fn with_service(doc: JsonValue, url: &str) -> Result<JsonValue, DocumentPatchError> {
        let patched = crate::operations::protocol::document::with_rest_service(doc, url);
        patched
    }

    /// Delegates to `without_rest_service`.
    fn without_service(doc: JsonValue) -> JsonValue {
        let stripped = crate::operations::protocol::document::without_rest_service(doc);
        stripped
    }

    /// `Rest(Disabled)` snapshot.
    fn snapshot_disabled() -> ServiceConfigSnapshot {
        let inner = crate::operations::protocol::snapshot::RestSnapshot::Disabled;
        ServiceConfigSnapshot::Rest(inner)
    }

    /// `Rest(Enabled { url })` snapshot carrying `prior_url`.
    fn snapshot_enabled(prior_url: String) -> ServiceConfigSnapshot {
        let inner =
            crate::operations::protocol::snapshot::RestSnapshot::Enabled { url: prior_url };
        ServiceConfigSnapshot::Rest(inner)
    }
}
/// Marker type selecting the WebAuthn service for the generic lifecycle helpers.
pub(crate) struct WebauthnService;

/// Wires the WebAuthn-specific config flag, document helpers, snapshot variants and
/// telemetry kinds into [`ServiceLifecycle`].
impl ServiceLifecycle for WebauthnService {
    const LABEL: &'static str = "WebAuthn";
    const KIND: ServiceKind = ServiceKind::Webauthn;
    const ENABLE_TELEMETRY: TelemetryKind = TelemetryKind::ServicesWebauthnEnable;
    const UPDATE_TELEMETRY: TelemetryKind = TelemetryKind::ServicesWebauthnUpdate;

    /// Reads the `services.webauthn` flag.
    fn config_enabled(cfg: &AppConfig) -> bool {
        let services = &cfg.services;
        services.webauthn
    }

    /// URL of the WebAuthn service entry found by `current_webauthn_service`, if any.
    fn current_service_url(doc: &JsonValue) -> Option<String> {
        let entry = crate::operations::protocol::document::current_webauthn_service(doc)?;
        Some(entry.url)
    }

    /// Delegates to `with_webauthn_service`.
    fn with_service(doc: JsonValue, url: &str) -> Result<JsonValue, DocumentPatchError> {
        let patched = crate::operations::protocol::document::with_webauthn_service(doc, url);
        patched
    }

    /// Delegates to `without_webauthn_service`.
    fn without_service(doc: JsonValue) -> JsonValue {
        let stripped = crate::operations::protocol::document::without_webauthn_service(doc);
        stripped
    }

    /// `Webauthn(Disabled)` snapshot.
    fn snapshot_disabled() -> ServiceConfigSnapshot {
        let inner = crate::operations::protocol::snapshot::WebauthnSnapshot::Disabled;
        ServiceConfigSnapshot::Webauthn(inner)
    }

    /// `Webauthn(Enabled { url })` snapshot carrying `prior_url`.
    fn snapshot_enabled(prior_url: String) -> ServiceConfigSnapshot {
        let inner =
            crate::operations::protocol::snapshot::WebauthnSnapshot::Enabled { url: prior_url };
        ServiceConfigSnapshot::Webauthn(inner)
    }
}