1use std::net::SocketAddr;
2use std::sync::Arc;
3
4use crate::engine::Engine;
5use crate::error::Result;
6use crate::http::{build_router, run_nudge_loop, AppState};
7use crate::monitor::{ci, doc};
8
9pub async fn run(db_path: &str, addr: SocketAddr) -> Result<()> {
12 run_with_atheneum(db_path, addr, None).await
13}
14
15pub async fn run_with_atheneum(
17 db_path: &str,
18 addr: SocketAddr,
19 #[allow(unused_variables)] atheneum_path: Option<String>,
20) -> Result<()> {
21 let engine = Engine::open(db_path)?;
22 #[cfg(feature = "atheneum")]
23 let state = Arc::new(AppState::new(engine)?.with_atheneum(atheneum_path));
24 #[cfg(not(feature = "atheneum"))]
25 let state = Arc::new(AppState::new(engine)?);
26
27 let nudge_state = state.clone();
29 tokio::spawn(async move {
30 run_nudge_loop(nudge_state).await;
31 });
32
33 let purge_state = state.clone();
35 tokio::spawn(async move {
36 loop {
37 tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
38 let state_fb = purge_state.clone();
39 let (events_purged, deliveries_purged, agents_purged, circuits_evicted) =
40 tokio::task::spawn_blocking(move || {
41 let engine = match state_fb.engine.lock() {
42 Ok(g) => g,
43 Err(e) => {
44 eprintln!("purge: lock poisoned: {e}");
45 return (0, 0, 0, 0);
46 }
47 };
48 let ep = state_fb
49 .event_bus
50 .purge_old_events(engine.graph())
51 .unwrap_or(0);
52 let dp = state_fb
53 .delivery_tracker
54 .purge_deliveries(engine.graph())
55 .unwrap_or(0);
56 let ap = state_fb.agent_registry.purge_retired(24).unwrap_or(0);
57 let ce = state_fb.circuit_breaker.evict_stale();
58 (ep, dp, ap, ce)
59 })
60 .await
61 .unwrap_or((0, 0, 0, 0));
62 if events_purged > 0 {
63 eprintln!("purged {} events older than 24h", events_purged);
64 }
65 if deliveries_purged > 0 {
66 eprintln!(
67 "purged {} delivery records older than 24h",
68 deliveries_purged
69 );
70 }
71 if agents_purged > 0 {
72 eprintln!("purged {} agents offline >24h", agents_purged);
73 }
74 if circuits_evicted > 0 {
75 eprintln!("evicted {} stale circuit breaker entries", circuits_evicted);
76 }
77 }
78 });
79
80 if let Ok(ci_cfg) = std::env::var("ENVOY_CI_MONITOR") {
81 let parts: Vec<&str> = ci_cfg.splitn(3, ',').collect();
82 if parts.len() == 3 {
83 let project = parts[0].to_string();
84 let owner_repo = parts[1].to_string();
85 let interval: u64 = parts[2].parse().unwrap_or(60);
86 let ci_state = state.clone();
87 tokio::spawn(async move {
88 ci::run_ci_monitor(ci_state, project, owner_repo, interval).await;
89 });
90 } else {
91 eprintln!("ENVOY_CI_MONITOR format: project,owner/repo,interval_secs");
92 }
93 }
94
95 if let Ok(doc_cfg) = std::env::var("ENVOY_DOC_MONITOR") {
96 let parts: Vec<&str> = doc_cfg.splitn(3, ',').collect();
97 if parts.len() == 3 {
98 let project = parts[0].to_string();
99 let repo_path = parts[1].to_string();
100 let interval: u64 = parts[2].parse().unwrap_or(300);
101 let doc_state = state.clone();
102 tokio::spawn(async move {
103 doc::run_doc_monitor(doc_state, project, repo_path, interval).await;
104 });
105 } else {
106 eprintln!("ENVOY_DOC_MONITOR format: project,repo_path,interval_secs");
107 }
108 }
109
110 let app = build_router(state).into_make_service_with_connect_info::<std::net::SocketAddr>();
111 let listener = tokio::net::TcpListener::bind(addr)
112 .await
113 .map_err(|e| crate::error::EnvoyError::WsError(format!("failed to bind {addr}: {e}")))?;
114
115 println!("envoy server listening on {addr}, db={db_path}");
116 axum::serve(listener, app)
117 .with_graceful_shutdown(shutdown_signal())
118 .await
119 .map_err(|e| crate::error::EnvoyError::WsError(format!("server error: {e}")))?;
120
121 Ok(())
122}
123
124async fn shutdown_signal() {
125 tokio::signal::ctrl_c()
126 .await
127 .expect("failed to install CTRL+C handler");
128 eprintln!("received shutdown signal, draining...");
129}