1pub mod alerts;
2pub mod api;
3pub mod auth;
4pub mod backup_scheduler;
5pub(crate) mod canary;
6pub mod cleanup_scheduler;
7pub mod cluster_api;
8pub(crate) mod cluster_handlers;
9pub mod cluster_state;
10pub mod deploy_history;
11pub mod health;
12pub(crate) mod instance;
13pub mod metrics;
14pub(crate) mod operations;
15pub mod proto;
16pub mod raft;
17pub mod reconciler;
18pub mod routes;
19pub mod scheduler;
20pub mod state;
21pub mod stats;
22pub mod store;
23pub mod topo_sort;
24pub mod watchdog;
25pub mod webhook;
26pub mod webhook_invocations;
27pub mod ws_handler;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31
32use orca_core::config::ClusterConfig;
33use orca_core::runtime::Runtime;
34use orca_core::types::WorkloadStatus;
35use tracing::info;
36
37use crate::state::{AppState, InstanceState, SharedRouteTable, SharedWasmTriggers};
38
39pub async fn run_server(
45 cluster_config: ClusterConfig,
46 container_runtime: Arc<dyn Runtime>,
47 wasm_runtime: Option<Arc<dyn Runtime>>,
48 route_table: SharedRouteTable,
49 wasm_triggers: SharedWasmTriggers,
50) -> anyhow::Result<()> {
51 run_server_with_acme(
52 cluster_config,
53 container_runtime,
54 wasm_runtime,
55 route_table,
56 wasm_triggers,
57 None,
58 None,
59 )
60 .await
61}
62
63pub async fn run_server_with_acme(
65 cluster_config: ClusterConfig,
66 container_runtime: Arc<dyn Runtime>,
67 wasm_runtime: Option<Arc<dyn Runtime>>,
68 route_table: SharedRouteTable,
69 wasm_triggers: SharedWasmTriggers,
70 acme_manager: Option<orca_proxy::acme::AcmeManager>,
71 cert_resolver: Option<orca_proxy::SharedCertResolver>,
72) -> anyhow::Result<()> {
73 let mut app_state = AppState::new(
74 cluster_config.clone(),
75 container_runtime,
76 wasm_runtime,
77 route_table,
78 wasm_triggers,
79 );
80 if let (Some(acme), Some(resolver)) = (acme_manager, cert_resolver) {
81 app_state = app_state.with_acme(acme, resolver);
82 }
83
84 if let Some(engine) = alerts::try_build_alert_engine(cluster_config.ai.as_ref()) {
85 app_state = app_state.with_alerts(engine);
86 }
87
88 let store_path = dirs_next::home_dir()
90 .unwrap_or_else(|| ".".into())
91 .join(".orca/cluster.db");
92 match store::ClusterStore::open(&store_path) {
93 Ok(s) => {
94 info!("Persistent store opened at {}", store_path.display());
95 app_state = app_state.with_store(Arc::new(s));
96 }
97 Err(e) => {
98 tracing::warn!("Failed to open store at {}: {e}", store_path.display());
99 }
100 }
101
102 let state = Arc::new(app_state);
103
104 if let Some(store) = &state.store {
106 match store.get_all_services() {
107 Ok(services) if !services.is_empty() => {
108 info!("Restoring {} persisted services", services.len());
109 for config in services.values() {
110 if let Err(e) = restore_or_reconcile(&state, config).await {
111 tracing::warn!(service = %config.name, "Failed to restore: {e}");
112 }
113 }
114 }
115 Ok(_) => {}
116 Err(e) => tracing::warn!("Failed to load persisted services: {e}"),
117 }
118 }
119
120 register_master_node(&state, cluster_config.cluster.api_port).await;
122 spawn_master_heartbeat(state.clone());
123
124 watchdog::spawn_watchdog(state.clone());
126 health::spawn_health_checker(state.clone());
127 stats::spawn_stats_collector(state.clone());
128
129 if let Some(backup_cfg) = cluster_config.backup.clone()
131 && backup_scheduler::spawn_backup_scheduler(backup_cfg, state.clone()).is_some()
132 {
133 info!("Backup scheduler started");
134 }
135
136 if let Some(cleanup_cfg) = cluster_config.cleanup.clone()
137 && cleanup_scheduler::spawn_cleanup_scheduler(cleanup_cfg, state.clone()).is_some()
138 {
139 info!("Cleanup scheduler started");
140 }
141
142 if alerts::spawn_alert_monitor(state.clone()).is_some() {
143 info!("AI alert monitor started");
144 }
145
146 let app = api::router(state.clone());
147
148 let addr = format!("0.0.0.0:{}", cluster_config.cluster.api_port);
149 let listener = tokio::net::TcpListener::bind(&addr).await?;
150 info!("API server listening on {addr}");
151
152 axum::serve(listener, app)
153 .with_graceful_shutdown(shutdown_signal())
154 .await?;
155
156 Ok(())
157}
158
159async fn restore_or_reconcile(
163 state: &AppState,
164 config: &orca_core::config::ServiceConfig,
165) -> anyhow::Result<()> {
166 if config
171 .placement
172 .as_ref()
173 .and_then(|p| p.node.as_ref())
174 .is_some()
175 {
176 let desired = match &config.replicas {
177 orca_core::types::Replicas::Fixed(n) => *n,
178 orca_core::types::Replicas::Auto => 1,
179 };
180 let mut services = state.services.write().await;
181 let svc_state = services
182 .entry(config.name.clone())
183 .or_insert_with(|| state::ServiceState::from_config(config.clone()));
184 svc_state.config = config.clone();
185 svc_state.desired_replicas = desired;
186 info!(service = %config.name, "Registered remote service placeholder");
187 return Ok(());
188 }
189
190 let cr = state
192 .container_runtime
193 .as_any()
194 .downcast_ref::<orca_agent::docker::ContainerRuntime>();
195
196 if let Some(container_rt) = cr {
197 let existing = container_rt.find_existing(&config.name).await?;
198 if !existing.is_empty() {
199 info!(
200 service = %config.name,
201 count = existing.len(),
202 "Re-attached to existing containers, skipping reconciliation"
203 );
204 populate_state_from_existing(state, config, existing).await;
205 return Ok(());
206 }
207 }
208
209 reconciler::reconcile_service(state, config).await
210}
211
212async fn populate_state_from_existing(
214 state: &AppState,
215 config: &orca_core::config::ServiceConfig,
216 handles: Vec<orca_core::runtime::WorkloadHandle>,
217) {
218 let initial_health = if config.health.is_some() || config.liveness.is_some() {
221 orca_core::types::HealthState::Healthy
222 } else {
223 orca_core::types::HealthState::NoCheck
224 };
225
226 let runtime = state.container_runtime.as_ref();
228 let mut instances: Vec<InstanceState> = Vec::new();
229 for handle in handles {
230 let mut host_port = if let Some(p) = config.port {
234 runtime.resolve_host_port(&handle, p).await.ok().flatten()
235 } else {
236 None
237 };
238 if host_port.is_none() {
239 host_port = handle
240 .metadata
241 .get("host_port")
242 .and_then(|p| p.parse::<u16>().ok());
243 }
244 info!(
245 service = %config.name,
246 runtime_id = %&handle.runtime_id[..12],
247 ?host_port,
248 "Restored container instance"
249 );
250 instances.push(InstanceState {
251 handle,
252 status: WorkloadStatus::Running,
253 host_port,
254 container_address: None,
255 health: initial_health,
256 is_canary: false,
257 started_at: std::time::Instant::now(),
258 });
259 }
260
261 let desired = match &config.replicas {
262 orca_core::types::Replicas::Fixed(n) => *n,
263 orca_core::types::Replicas::Auto => 1,
264 };
265
266 let mut services = state.services.write().await;
267 let svc_state = services
268 .entry(config.name.clone())
269 .or_insert_with(|| state::ServiceState::from_config(config.clone()));
270 svc_state.instances = instances;
271 svc_state.desired_replicas = desired;
272 drop(services);
273
274 match config.runtime {
276 orca_core::types::RuntimeKind::Container => {
277 routes::update_container_routes(state, config).await;
278 }
279 orca_core::types::RuntimeKind::Wasm => {
280 routes::update_wasm_triggers(state, config).await;
281 }
282 }
283}
284
/// Derives the master's node id by hashing the machine's host name.
///
/// The name is taken from the `HOSTNAME` environment variable, then
/// `COMPUTERNAME`, and finally the literal `"orca-master"` when neither is
/// set (or neither is valid Unicode). The value is hashed with
/// [`std::hash::DefaultHasher`].
///
/// NOTE(review): the standard library does not guarantee `DefaultHasher`'s
/// algorithm across Rust releases, so the id should presumably not be
/// persisted or compared across differently built binaries — confirm.
fn master_node_id() -> u64 {
    use std::hash::{Hash, Hasher};

    const FALLBACK_NAME: &str = "orca-master";

    // First readable variable wins, in this order.
    let machine_name: String = ["HOSTNAME", "COMPUTERNAME"]
        .iter()
        .find_map(|key| std::env::var(key).ok())
        .unwrap_or_else(|| FALLBACK_NAME.to_string());

    let mut digest = std::hash::DefaultHasher::new();
    machine_name.hash(&mut digest);
    digest.finish()
}
295
296async fn register_master_node(state: &state::AppState, api_port: u16) {
298 let node_id = master_node_id();
299 let mut labels = HashMap::new();
300 labels.insert("role".to_string(), "master".to_string());
301 let node = state::RegisteredNode {
302 node_id,
303 address: format!("localhost:{api_port}"),
304 labels,
305 last_heartbeat: chrono::Utc::now(),
306 drain: false,
307 cpu_percent: 0.0,
308 memory_bytes: 0,
309 memory_total: 0,
310 disk_used: 0,
311 disk_total: 0,
312 net_rx: 0,
313 net_tx: 0,
314 };
315 let mut nodes = state.registered_nodes.write().await;
316 nodes.insert(node_id, node);
317 info!(node_id, "Master node self-registered");
318}
319
320fn spawn_master_heartbeat(state: Arc<state::AppState>) {
328 const STALE_AFTER: chrono::Duration = chrono::Duration::seconds(60);
329 let node_id = master_node_id();
330 let collector = Arc::new(orca_agent::host_stats::HostStatsCollector::new());
331 tokio::spawn(async move {
332 loop {
333 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
334 let sample = collector.sample();
335 let now = chrono::Utc::now();
336 let mut nodes = state.registered_nodes.write().await;
337 if let Some(node) = nodes.get_mut(&node_id) {
338 node.last_heartbeat = now;
339 node.cpu_percent = sample.cpu_percent;
340 node.memory_bytes = sample.memory_bytes;
341 node.memory_total = sample.memory_total;
342 node.disk_used = sample.disk_used;
343 node.disk_total = sample.disk_total;
344 node.net_rx = sample.net_rx;
345 node.net_tx = sample.net_tx;
346 }
347 nodes.retain(|id, node| {
348 if *id == node_id {
349 return true;
350 }
351 let age = now - node.last_heartbeat;
352 age < STALE_AFTER
353 });
354 }
355 });
356}
357
358async fn shutdown_signal() {
359 tokio::signal::ctrl_c()
360 .await
361 .expect("failed to install ctrl+c handler");
362 info!("Shutdown signal received");
363}