Skip to main content

worldinterface_daemon/
server.rs

1//! Daemon startup, serving, and shutdown.
2
3use std::sync::{Arc, RwLock};
4
5use worldinterface_connector::connectors::default_registry;
6use worldinterface_host::EmbeddedHost;
7use worldinterface_http_trigger::WebhookRegistry;
8
9use crate::config::DaemonConfig;
10use crate::error::DaemonError;
11use crate::metrics::{PrometheusMetricsRecorder, WiMetricsRegistry};
12use crate::router::build_router;
13use crate::state::AppState;
14
15/// Run the daemon: bootstrap host, bind HTTP server, handle signals.
16pub async fn run(config: DaemonConfig) -> Result<(), DaemonError> {
17    tracing::info!("starting WorldInterface daemon");
18
19    // Create Prometheus metrics registry
20    let metrics_registry =
21        Arc::new(WiMetricsRegistry::new().map_err(|e| {
22            DaemonError::Config(format!("failed to create metrics registry: {}", e))
23        })?);
24
25    // Build host config with metrics recorder
26    let mut host_config = config.to_host_config();
27    host_config.metrics = Arc::new(PrometheusMetricsRecorder::new(Arc::clone(&metrics_registry)));
28
29    let registry = default_registry();
30    let host = EmbeddedHost::start(host_config, registry).await?;
31
32    tracing::info!("host started successfully");
33
34    // Load webhook registrations from ContextStore
35    let webhook_registry = WebhookRegistry::load_from_store(host.context_store())
36        .map_err(|e| DaemonError::Config(format!("failed to load webhooks: {}", e)))?;
37    tracing::info!(count = webhook_registry.len(), "loaded webhook registrations");
38
39    // Build application state and router
40    let state = Arc::new(AppState {
41        host,
42        webhook_registry: RwLock::new(webhook_registry),
43        metrics: metrics_registry,
44    });
45    let router = build_router(Arc::clone(&state));
46
47    // Bind TCP listener
48    let listener =
49        tokio::net::TcpListener::bind(&config.bind_address).await.map_err(DaemonError::Bind)?;
50    tracing::info!(address = %config.bind_address, "listening");
51
52    // Serve with graceful shutdown on SIGINT/SIGTERM
53    axum::serve(listener, router)
54        .with_graceful_shutdown(shutdown_signal())
55        .await
56        .map_err(DaemonError::Serve)?;
57
58    tracing::info!("HTTP server stopped, shutting down host");
59
60    // Shut down the embedded host
61    match Arc::try_unwrap(state) {
62        Ok(app_state) => {
63            app_state.host.shutdown().await?;
64        }
65        Err(_) => {
66            tracing::warn!("could not take ownership of host for graceful shutdown");
67        }
68    }
69
70    tracing::info!("daemon shut down");
71    Ok(())
72}
73
74/// Wait for SIGINT or SIGTERM.
75async fn shutdown_signal() {
76    let ctrl_c = tokio::signal::ctrl_c();
77    #[cfg(unix)]
78    {
79        let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
80            .expect("failed to install SIGTERM handler");
81        tokio::select! {
82            _ = ctrl_c => { tracing::info!("received SIGINT"); }
83            _ = sigterm.recv() => { tracing::info!("received SIGTERM"); }
84        }
85    }
86    #[cfg(not(unix))]
87    {
88        ctrl_c.await.expect("failed to listen for ctrl-c");
89        tracing::info!("received SIGINT");
90    }
91}