use crate::{
SharedData,
config::init,
handlers::{application_routes, health_checker_handler},
statistics::{Statistics, statistics},
};
use affinidi_did_resolver_cache_sdk::{
DIDCacheClient, config::DIDCacheConfigBuilder, errors::DIDCacheError,
};
use axum::{Router, routing::get};
use http::Method;
use std::{env, net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use tower_http::{
cors::CorsLayer,
trace::{self, TraceLayer},
};
use tracing::{Level, event};
use tracing_subscriber::{filter, layer::SubscriberExt, reload, util::SubscriberInitExt};
/// Starts the DID cache server using the crate's default configuration path.
///
/// This is a convenience wrapper around [`start_with_config`] that passes
/// `crate::config::DEFAULT_CONFIG_PATH` as the configuration file location.
///
/// # Errors
///
/// Returns whatever error [`start_with_config`] returns.
pub async fn start() -> Result<(), DIDCacheError> {
    let default_path = crate::config::DEFAULT_CONFIG_PATH;
    start_with_config(default_path).await
}
/// Starts the DID cache HTTP server using the configuration file at `config_path`.
///
/// The function performs the following steps, in order:
///
/// 1. Initializes a `tracing` subscriber with a reloadable `INFO` level filter
///    (ANSI colours and the startup banner are enabled only when the `LOCAL`
///    environment variable is set).
/// 2. Loads the configuration via [`init`], handing it the filter reload handle.
/// 3. Builds the DID resolver cache client and the WebVH HTTP client
///    (10 second timeout, redirects disabled).
/// 4. Spawns the background statistics task.
/// 5. Serves the application routes plus `/did/healthchecker` until Ctrl-C is
///    received, then drains connections for up to 10 seconds.
///
/// Background tasks are always stopped before this function returns, whether
/// the server shut down gracefully or failed.
///
/// # Errors
///
/// * [`DIDCacheError::ConfigError`] if the configuration cannot be loaded, the
///   WebVH HTTP client cannot be built, or `listen_address` is not a valid
///   socket address.
/// * Any error returned by `DIDCacheClient::new`.
/// * [`DIDCacheError::TransportError`] if the HTTP server terminates with an error.
///
/// # Panics
///
/// NOTE(review): `SubscriberInitExt::init` is documented to panic when a global
/// tracing subscriber has already been installed, so calling this more than
/// once per process presumably panics — confirm whether that is acceptable.
pub async fn start_with_config(config_path: &str) -> Result<(), DIDCacheError> {
    // Wrap the level filter in a reload layer; the handle is passed to `init` below.
    let (filter_layer, reload_handle) = reload::Layer::new(filter::LevelFilter::INFO);
    let ansi = env::var("LOCAL").is_ok();
    tracing_subscriber::registry()
        .with(filter_layer)
        .with(tracing_subscriber::fmt::layer().with_ansi(ansi))
        .init();

    // The ASCII-art banner is only emitted for local (ANSI-enabled) runs.
    if ansi {
        event!(Level::INFO, "");
        event!(
            Level::INFO,
            r#" db 888888888888 888b 88 88888888ba, 88 88888888ba, ,ad8888ba, 88 "#
        );
        event!(
            Level::INFO,
            r#" d88b 88 8888b 88 88 `"8b 88 88 `"8b d8"' `"8b 88 "#
        );
        event!(
            Level::INFO,
            r#" d8'`8b 88 88 `8b 88 88 `8b 88 88 `8b d8' 88 "#
        );
        event!(
            Level::INFO,
            r#" d8' `8b 88 88 `8b 88 88 88 88 88 88 88 ,adPPYYba, ,adPPYba, 88,dPPYba, ,adPPYba, "#
        );
        event!(
            Level::INFO,
            r#" d8YaaaaY8b 88 88 `8b 88 88 88 88 88 88 88 "" `Y8 a8" "" 88P' "8a a8P_____88 "#
        );
        event!(
            Level::INFO,
            r#" d8""""""""8b 88 88 `8b 88 88 8P 88 88 8P Y8, ,adPPPPP88 8b 88 88 8PP""""""" "#
        );
        event!(
            Level::INFO,
            r#" d8' `8b 88 88 `8888 88 .a8P 88 88 .a8P Y8a. .a8P 88, ,88 "8a, ,aa 88 88 "8b, ,aa "#
        );
        event!(
            Level::INFO,
            r#" d8' `8b 88 88 `888 88888888Y"' 88 88888888Y"' `"Y8888Y"' `"8bbdP"Y8 `"Ybbd8"' 88 88 `"Ybbd8"' "#
        );
        event!(Level::INFO, "");
    }

    event!(Level::INFO, "[Loading Affinidi DID Cache configuration]");
    let config = init(config_path, Some(reload_handle))
        .map_err(|e| DIDCacheError::ConfigError(format!("Couldn't initialize DID Cache: {e}")))?;

    let cache_config = DIDCacheConfigBuilder::default()
        .with_cache_capacity(config.cache_capacity_count)
        .with_cache_ttl(config.cache_expire)
        .build();
    let resolver = DIDCacheClient::new(cache_config).await?;

    let webvh_client = reqwest::Client::builder()
        .timeout(Duration::from_secs(10))
        .redirect(reqwest::redirect::Policy::none())
        .build()
        .map_err(|e| {
            DIDCacheError::ConfigError(format!("Failed to build WebVH HTTP client: {e}"))
        })?;

    // Validate the listen address BEFORE spawning any background task, so that an
    // invalid address cannot return early and leave a detached task running.
    let listen_address = config
        .listen_address
        .parse::<SocketAddr>()
        .map_err(|e| {
            DIDCacheError::ConfigError(format!(
                "Invalid listen_address ({}): {e}",
                config.listen_address
            ))
        })?;

    let shared_state = SharedData {
        service_start_timestamp: chrono::Utc::now(),
        stats: Arc::new(Mutex::new(Statistics::default())),
        resolver,
        resolve_timeout: config.resolve_timeout,
        max_did_size: config.max_did_size,
        webvh_client,
    };

    // One token is shared by every background task; cancelling it asks them to stop.
    let shutdown = CancellationToken::new();

    let task_stats = shared_state.stats.clone();
    let task_cache = shared_state.resolver.get_cache();
    let stats_shutdown = shutdown.clone();
    let stats_handle = tokio::spawn(async move {
        if let Err(e) =
            statistics(config.statistics_interval, &task_stats, task_cache, stats_shutdown).await
        {
            event!(Level::ERROR, "Statistics task exited with error: {e}");
        }
    });

    let app: Router = application_routes(&shared_state, &config);
    let app = Router::new()
        .merge(app)
        .layer(
            CorsLayer::new()
                .allow_origin(tower_http::cors::Any)
                .allow_headers([http::header::CONTENT_TYPE])
                .allow_methods([Method::GET]),
        )
        .layer(
            TraceLayer::new_for_http()
                .make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO))
                .on_response(trace::DefaultOnResponse::new().level(Level::INFO)),
        )
        // NOTE(review): this route is registered after the `.layer(...)` calls; in axum
        // a layer only wraps routes added before it, so the health check presumably
        // bypasses the CORS and trace layers — confirm that this is intended.
        .route(
            "/did/healthchecker",
            get(health_checker_handler).with_state(shared_state),
        );

    let server_handle = axum_server::Handle::new();

    // Ctrl-C listener: cancels the background tasks and starts a graceful drain.
    let signal_task = {
        let server_handle = server_handle.clone();
        let shutdown = shutdown.clone();
        tokio::spawn(async move {
            match tokio::signal::ctrl_c().await {
                Ok(()) => event!(
                    Level::INFO,
                    "Shutdown signal received; draining connections"
                ),
                Err(e) => {
                    event!(Level::ERROR, "Failed to listen for shutdown signal: {e}");
                    return;
                }
            }
            shutdown.cancel();
            server_handle.graceful_shutdown(Some(Duration::from_secs(10)));
        })
    };

    // Keep the outcome instead of using `?` here: the teardown below must also run
    // when the server fails, otherwise the spawned tasks would be left detached.
    let serve_result = axum_server::bind(listen_address)
        .handle(server_handle)
        .serve(app.into_make_service_with_connect_info::<SocketAddr>())
        .await
        .map_err(|e| DIDCacheError::TransportError(format!("server error: {e}")));

    // Teardown, on both the success and the error path.
    // Aborting is a no-op if the signal task has already finished.
    signal_task.abort();
    shutdown.cancel();
    if let Err(e) = stats_handle.await {
        event!(Level::ERROR, "Statistics task terminated abnormally: {e}");
    }

    serve_result
}