Skip to main content

envoy/
server.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3
4use crate::engine::Engine;
5use crate::error::Result;
6use crate::http::{build_router, run_nudge_loop, AppState};
7use crate::monitor::{ci, doc};
8
9/// Run the envoy server. Opens (or creates) the database at `db_path`
10/// and starts the HTTP server on `addr`.
11pub async fn run(db_path: &str, addr: SocketAddr) -> Result<()> {
12    run_with_atheneum(db_path, addr, None).await
13}
14
15/// Run the envoy server with atheneum integration enabled.
16pub async fn run_with_atheneum(
17    db_path: &str,
18    addr: SocketAddr,
19    #[allow(unused_variables)] atheneum_path: Option<String>,
20) -> Result<()> {
21    let engine = Engine::open(db_path)?;
22    #[cfg(feature = "atheneum")]
23    let state = Arc::new(AppState::new(engine)?.with_atheneum(atheneum_path));
24    #[cfg(not(feature = "atheneum"))]
25    let state = Arc::new(AppState::new(engine)?);
26
27    // Spawn background nudge loop
28    let nudge_state = state.clone();
29    tokio::spawn(async move {
30        run_nudge_loop(nudge_state).await;
31    });
32
33    // Background event + delivery purge (every hour)
34    let purge_state = state.clone();
35    tokio::spawn(async move {
36        loop {
37            tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
38            let state_fb = purge_state.clone();
39            let (events_purged, deliveries_purged, agents_purged, circuits_evicted) =
40                tokio::task::spawn_blocking(move || {
41                    let engine = match state_fb.engine.lock() {
42                        Ok(g) => g,
43                        Err(e) => {
44                            eprintln!("purge: lock poisoned: {e}");
45                            return (0, 0, 0, 0);
46                        }
47                    };
48                    let ep = state_fb
49                        .event_bus
50                        .purge_old_events(engine.graph())
51                        .unwrap_or(0);
52                    let dp = state_fb
53                        .delivery_tracker
54                        .purge_deliveries(engine.graph())
55                        .unwrap_or(0);
56                    let ap = state_fb.agent_registry.purge_retired(24).unwrap_or(0);
57                    let ce = state_fb.circuit_breaker.evict_stale();
58                    (ep, dp, ap, ce)
59                })
60                .await
61                .unwrap_or((0, 0, 0, 0));
62            if events_purged > 0 {
63                eprintln!("purged {} events older than 24h", events_purged);
64            }
65            if deliveries_purged > 0 {
66                eprintln!(
67                    "purged {} delivery records older than 24h",
68                    deliveries_purged
69                );
70            }
71            if agents_purged > 0 {
72                eprintln!("purged {} agents offline >24h", agents_purged);
73            }
74            if circuits_evicted > 0 {
75                eprintln!("evicted {} stale circuit breaker entries", circuits_evicted);
76            }
77        }
78    });
79
80    if let Ok(ci_cfg) = std::env::var("ENVOY_CI_MONITOR") {
81        let parts: Vec<&str> = ci_cfg.splitn(3, ',').collect();
82        if parts.len() == 3 {
83            let project = parts[0].to_string();
84            let owner_repo = parts[1].to_string();
85            let interval: u64 = parts[2].parse().unwrap_or(60);
86            let ci_state = state.clone();
87            tokio::spawn(async move {
88                ci::run_ci_monitor(ci_state, project, owner_repo, interval).await;
89            });
90        } else {
91            eprintln!("ENVOY_CI_MONITOR format: project,owner/repo,interval_secs");
92        }
93    }
94
95    if let Ok(doc_cfg) = std::env::var("ENVOY_DOC_MONITOR") {
96        let parts: Vec<&str> = doc_cfg.splitn(3, ',').collect();
97        if parts.len() == 3 {
98            let project = parts[0].to_string();
99            let repo_path = parts[1].to_string();
100            let interval: u64 = parts[2].parse().unwrap_or(300);
101            let doc_state = state.clone();
102            tokio::spawn(async move {
103                doc::run_doc_monitor(doc_state, project, repo_path, interval).await;
104            });
105        } else {
106            eprintln!("ENVOY_DOC_MONITOR format: project,repo_path,interval_secs");
107        }
108    }
109
110    let app = build_router(state).into_make_service_with_connect_info::<std::net::SocketAddr>();
111    let listener = tokio::net::TcpListener::bind(addr)
112        .await
113        .map_err(|e| crate::error::EnvoyError::WsError(format!("failed to bind {addr}: {e}")))?;
114
115    println!("envoy server listening on {addr}, db={db_path}");
116    axum::serve(listener, app)
117        .with_graceful_shutdown(shutdown_signal())
118        .await
119        .map_err(|e| crate::error::EnvoyError::WsError(format!("server error: {e}")))?;
120
121    Ok(())
122}
123
124async fn shutdown_signal() {
125    tokio::signal::ctrl_c()
126        .await
127        .expect("failed to install CTRL+C handler");
128    eprintln!("received shutdown signal, draining...");
129}