Skip to main content

envoy/
server.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3
4use crate::engine::Engine;
5use crate::error::Result;
6use crate::http::{build_router, run_nudge_loop, AppState};
7use crate::monitor::{ci, doc};
8
9/// Run the envoy server. Opens (or creates) the database at `db_path`
10/// and starts the HTTP server on `addr`.
11pub async fn run(db_path: &str, addr: SocketAddr) -> Result<()> {
12    run_with_atheneum(db_path, addr, None).await
13}
14
15/// Run the envoy server with atheneum integration enabled.
16pub async fn run_with_atheneum(
17    db_path: &str,
18    addr: SocketAddr,
19    #[allow(unused_variables)] atheneum_path: Option<String>,
20) -> Result<()> {
21    let engine = Engine::open(db_path)?;
22    #[cfg(feature = "atheneum")]
23    let state = Arc::new(AppState::new(engine)?.with_atheneum(atheneum_path));
24    #[cfg(not(feature = "atheneum"))]
25    let state = Arc::new(AppState::new(engine)?);
26
27    // Spawn background nudge loop
28    let nudge_state = state.clone();
29    tokio::spawn(async move {
30        run_nudge_loop(nudge_state).await;
31    });
32
33    // Background event + delivery purge (every hour)
34    let purge_state = state.clone();
35    tokio::spawn(async move {
36        loop {
37            tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
38            let state_fb = purge_state.clone();
39            let (events_purged, deliveries_purged, agents_purged, circuits_evicted) =
40                tokio::task::spawn_blocking(move || {
41                    let engine = state_fb.engine.lock();
42                    let ep = state_fb
43                        .event_bus
44                        .purge_old_events(engine.graph())
45                        .unwrap_or(0);
46                    let dp = state_fb
47                        .delivery_tracker
48                        .purge_deliveries(engine.graph())
49                        .unwrap_or(0);
50                    let ap = state_fb.agent_registry.purge_retired(24).unwrap_or(0);
51                    let ce = state_fb.circuit_breaker.evict_stale();
52                    (ep, dp, ap, ce)
53                })
54                .await
55                .unwrap_or((0, 0, 0, 0));
56            if events_purged > 0 {
57                eprintln!("purged {} events older than 24h", events_purged);
58            }
59            if deliveries_purged > 0 {
60                eprintln!(
61                    "purged {} delivery records older than 24h",
62                    deliveries_purged
63                );
64            }
65            if agents_purged > 0 {
66                eprintln!("purged {} agents offline >24h", agents_purged);
67            }
68            if circuits_evicted > 0 {
69                eprintln!("evicted {} stale circuit breaker entries", circuits_evicted);
70            }
71        }
72    });
73
74    if let Ok(ci_cfg) = std::env::var("ENVOY_CI_MONITOR") {
75        let parts: Vec<&str> = ci_cfg.splitn(3, ',').collect();
76        if parts.len() == 3 {
77            let project = parts[0].to_string();
78            let owner_repo = parts[1].to_string();
79            let interval: u64 = parts[2].parse().unwrap_or(60);
80            let ci_state = state.clone();
81            tokio::spawn(async move {
82                ci::run_ci_monitor(ci_state, project, owner_repo, interval).await;
83            });
84        } else {
85            eprintln!("ENVOY_CI_MONITOR format: project,owner/repo,interval_secs");
86        }
87    }
88
89    if let Ok(doc_cfg) = std::env::var("ENVOY_DOC_MONITOR") {
90        let parts: Vec<&str> = doc_cfg.splitn(3, ',').collect();
91        if parts.len() == 3 {
92            let project = parts[0].to_string();
93            let repo_path = parts[1].to_string();
94            let interval: u64 = parts[2].parse().unwrap_or(300);
95            let doc_state = state.clone();
96            tokio::spawn(async move {
97                doc::run_doc_monitor(doc_state, project, repo_path, interval).await;
98            });
99        } else {
100            eprintln!("ENVOY_DOC_MONITOR format: project,repo_path,interval_secs");
101        }
102    }
103
104    let app = build_router(state).into_make_service_with_connect_info::<std::net::SocketAddr>();
105    let listener = tokio::net::TcpListener::bind(addr)
106        .await
107        .map_err(|e| crate::error::EnvoyError::WsError(format!("failed to bind {addr}: {e}")))?;
108
109    println!("envoy server listening on {addr}, db={db_path}");
110    axum::serve(listener, app)
111        .with_graceful_shutdown(shutdown_signal())
112        .await
113        .map_err(|e| crate::error::EnvoyError::WsError(format!("server error: {e}")))?;
114
115    Ok(())
116}
117
118async fn shutdown_signal() {
119    tokio::signal::ctrl_c()
120        .await
121        .expect("failed to install CTRL+C handler");
122    eprintln!("received shutdown signal, draining...");
123}