1use std::net::SocketAddr;
2use std::sync::Arc;
3
4use crate::engine::Engine;
5use crate::error::Result;
6use crate::http::{build_router, run_nudge_loop, AppState};
7use crate::monitor::{ci, doc};
8
9pub async fn run(db_path: &str, addr: SocketAddr) -> Result<()> {
12 run_with_atheneum(db_path, addr, None).await
13}
14
15pub async fn run_with_atheneum(
17 db_path: &str,
18 addr: SocketAddr,
19 #[allow(unused_variables)] atheneum_path: Option<String>,
20) -> Result<()> {
21 let engine = Engine::open(db_path)?;
22 #[cfg(feature = "atheneum")]
23 let state = Arc::new(AppState::new(engine)?.with_atheneum(atheneum_path));
24 #[cfg(not(feature = "atheneum"))]
25 let state = Arc::new(AppState::new(engine)?);
26
27 let nudge_state = state.clone();
29 tokio::spawn(async move {
30 run_nudge_loop(nudge_state).await;
31 });
32
33 let purge_state = state.clone();
35 tokio::spawn(async move {
36 loop {
37 tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
38 let state_fb = purge_state.clone();
39 let (events_purged, deliveries_purged, agents_purged, circuits_evicted) =
40 tokio::task::spawn_blocking(move || {
41 let engine = state_fb.engine.lock();
42 let ep = state_fb
43 .event_bus
44 .purge_old_events(engine.graph())
45 .unwrap_or(0);
46 let dp = state_fb
47 .delivery_tracker
48 .purge_deliveries(engine.graph())
49 .unwrap_or(0);
50 let ap = state_fb.agent_registry.purge_retired(24).unwrap_or(0);
51 let ce = state_fb.circuit_breaker.evict_stale();
52 (ep, dp, ap, ce)
53 })
54 .await
55 .unwrap_or((0, 0, 0, 0));
56 if events_purged > 0 {
57 eprintln!("purged {} events older than 24h", events_purged);
58 }
59 if deliveries_purged > 0 {
60 eprintln!(
61 "purged {} delivery records older than 24h",
62 deliveries_purged
63 );
64 }
65 if agents_purged > 0 {
66 eprintln!("purged {} agents offline >24h", agents_purged);
67 }
68 if circuits_evicted > 0 {
69 eprintln!("evicted {} stale circuit breaker entries", circuits_evicted);
70 }
71 }
72 });
73
74 if let Ok(ci_cfg) = std::env::var("ENVOY_CI_MONITOR") {
75 let parts: Vec<&str> = ci_cfg.splitn(3, ',').collect();
76 if parts.len() == 3 {
77 let project = parts[0].to_string();
78 let owner_repo = parts[1].to_string();
79 let interval: u64 = parts[2].parse().unwrap_or(60);
80 let ci_state = state.clone();
81 tokio::spawn(async move {
82 ci::run_ci_monitor(ci_state, project, owner_repo, interval).await;
83 });
84 } else {
85 eprintln!("ENVOY_CI_MONITOR format: project,owner/repo,interval_secs");
86 }
87 }
88
89 if let Ok(doc_cfg) = std::env::var("ENVOY_DOC_MONITOR") {
90 let parts: Vec<&str> = doc_cfg.splitn(3, ',').collect();
91 if parts.len() == 3 {
92 let project = parts[0].to_string();
93 let repo_path = parts[1].to_string();
94 let interval: u64 = parts[2].parse().unwrap_or(300);
95 let doc_state = state.clone();
96 tokio::spawn(async move {
97 doc::run_doc_monitor(doc_state, project, repo_path, interval).await;
98 });
99 } else {
100 eprintln!("ENVOY_DOC_MONITOR format: project,repo_path,interval_secs");
101 }
102 }
103
104 let app = build_router(state).into_make_service_with_connect_info::<std::net::SocketAddr>();
105 let listener = tokio::net::TcpListener::bind(addr)
106 .await
107 .map_err(|e| crate::error::EnvoyError::WsError(format!("failed to bind {addr}: {e}")))?;
108
109 println!("envoy server listening on {addr}, db={db_path}");
110 axum::serve(listener, app)
111 .with_graceful_shutdown(shutdown_signal())
112 .await
113 .map_err(|e| crate::error::EnvoyError::WsError(format!("server error: {e}")))?;
114
115 Ok(())
116}
117
118async fn shutdown_signal() {
119 tokio::signal::ctrl_c()
120 .await
121 .expect("failed to install CTRL+C handler");
122 eprintln!("received shutdown signal, draining...");
123}