1use std::collections::{HashMap, VecDeque};
4use std::sync::Arc;
5
6use chrono::{DateTime, Duration, Utc};
7use tokio::sync::RwLock;
8
9use orca_core::config::{ClusterConfig, ServiceConfig};
10use orca_core::runtime::{Runtime, WorkloadHandle};
11use orca_core::types::{HealthState, Replicas, WorkloadStatus};
12
13use crate::stats::ContainerStats;
14use crate::webhook::WebhookStore;
15
16pub use orca_proxy::{RouteTarget, SharedWasmTriggers, WasmTrigger};
17
/// Routing table shared behind `Arc<tokio::sync::RwLock<…>>`, mapping a `String`
/// key to the list of [`RouteTarget`]s registered for it.
///
/// NOTE(review): the key is presumably a host/domain name — confirm against the
/// proxy code that reads this table.
pub type SharedRouteTable = Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>;
20
/// Shared server state: configuration, workload runtimes, and the mutable
/// registries (services, nodes, listeners, sessions, …) used by the server.
///
/// Mutable registries are wrapped in `tokio::sync::RwLock` so they can be
/// locked from async code. Build with [`AppState::new`]; optional subsystems
/// (store, alerts, ACME) are attached with the `with_*` builder methods.
pub struct AppState {
    /// Cluster configuration this state was constructed from.
    pub cluster_config: ClusterConfig,
    /// Runtime used for container workloads.
    pub container_runtime: Arc<dyn Runtime>,
    /// Runtime for WebAssembly workloads; `None` when none was supplied.
    pub wasm_runtime: Option<Arc<dyn Runtime>>,
    /// Per-service runtime state, keyed by service name (presumably
    /// `ServiceConfig::name` — confirm at insertion sites).
    pub services: RwLock<HashMap<String, ServiceState>>,
    /// Routing table shared with the proxy.
    pub route_table: SharedRouteTable,
    /// WASM trigger registry shared with the proxy.
    pub wasm_triggers: SharedWasmTriggers,
    /// Registered nodes, keyed by a `u64` id (presumably `RegisteredNode::node_id`).
    pub registered_nodes: RwLock<HashMap<u64, RegisteredNode>>,
    /// Webhook registry (created by `crate::webhook::new_store()` in `new`).
    pub webhooks: WebhookStore,
    /// API tokens, copied from `cluster_config.api_tokens` in [`AppState::new`].
    pub api_tokens: Vec<String>,
    /// Queued JSON commands per `u64` key (presumably a node id — confirm who drains them).
    pub pending_commands: RwLock<HashMap<u64, Vec<serde_json::Value>>>,
    /// History of deploys.
    pub deploy_history: RwLock<crate::deploy_history::DeployHistory>,
    /// ACME certificate manager; `None` unless set via `with_acme`.
    pub acme_manager: Option<orca_proxy::acme::AcmeManager>,
    /// Certificate resolver; set together with `acme_manager` by `with_acme`.
    pub cert_resolver: Option<orca_proxy::SharedCertResolver>,
    /// Container stats keyed by `String` (presumably a container/instance name — confirm).
    pub container_stats: RwLock<HashMap<String, ContainerStats>>,
    /// Optional cluster store; `None` unless set via `with_store`.
    pub store: Option<Arc<crate::store::ClusterStore>>,
    /// Senders for WebSocket-connected agents, keyed by `u64` (presumably node id).
    pub ws_agents: RwLock<HashMap<u64, crate::ws_handler::AgentSender>>,
    /// Channels waiting for log output, keyed by `String` (presumably a request id).
    /// NOTE(review): meaning of the `bool` in `(String, bool)` is not visible here — confirm.
    pub log_listeners: RwLock<HashMap<String, tokio::sync::mpsc::Sender<(String, bool)>>>,
    /// Channels waiting for backup status reports, keyed by `String` (presumably a request id).
    pub backup_listeners: RwLock<
        HashMap<String, tokio::sync::mpsc::Sender<orca_core::ws_types::BackupStatusReportData>>,
    >,
    /// Channels waiting for network status reports, keyed by `String` (presumably a request id).
    pub network_listeners: RwLock<
        HashMap<String, tokio::sync::mpsc::Sender<orca_core::ws_types::NetworkStatusReportData>>,
    >,
    /// Most recent backup result per `u64` key (presumably node id).
    pub last_backup_results: RwLock<HashMap<u64, LastBackupResult>>,
    /// Most recent backup result for the master itself, if any (presumably — confirm).
    pub master_last_backup_result: RwLock<Option<LastBackupResult>>,
    /// Webhook invocation records per `String` key (presumably webhook id).
    /// Nothing in this struct bounds the deque length; callers must trim it.
    pub webhook_invocations: RwLock<
        HashMap<String, std::collections::VecDeque<orca_core::api_types::WebhookInvocation>>,
    >,
    /// Byte channels for exec sessions, keyed by `String` (presumably a session id).
    pub exec_sessions: RwLock<HashMap<String, tokio::sync::mpsc::Sender<Vec<u8>>>>,
    /// One-shot senders that resolve an in-flight deploy with `Ok(())` or an
    /// error message, keyed by `String` (presumably a deploy id).
    pub pending_deploys: RwLock<HashMap<String, tokio::sync::oneshot::Sender<Result<(), String>>>>,
    /// Optional alert engine; `None` unless set via `with_alerts`.
    pub alerts: Option<crate::alerts::SharedAlertEngine>,
    /// Restart/failure event logs per service name, written by
    /// `record_instance_restart` / `record_instance_failure`.
    pub instance_events: RwLock<HashMap<String, InstanceEventLog>>,
}
103
/// Restart and failure timestamps for one service (`AppState::instance_events`
/// keys these logs by the service name passed to the `record_instance_*` methods).
///
/// `record_restart` / `record_failure` append at the back and drop entries older
/// than [`InstanceEventLog::WINDOW`] from the front.
#[derive(Debug, Default)]
pub struct InstanceEventLog {
    /// Restart times; oldest first as long as they are appended with a non-decreasing clock.
    pub restarts: VecDeque<DateTime<Utc>>,
    /// Failure times; oldest first as long as they are appended with a non-decreasing clock.
    pub failures: VecDeque<DateTime<Utc>>,
}
116
117impl InstanceEventLog {
118 pub const WINDOW: Duration = Duration::hours(24);
121
122 pub fn record_restart(&mut self, now: DateTime<Utc>) {
123 self.restarts.push_back(now);
124 prune_older_than(&mut self.restarts, now - Self::WINDOW);
125 }
126
127 pub fn record_failure(&mut self, now: DateTime<Utc>) {
128 self.failures.push_back(now);
129 prune_older_than(&mut self.failures, now - Self::WINDOW);
130 }
131
132 pub fn failures_in(&self, now: DateTime<Utc>, window: Duration) -> u64 {
134 let cutoff = now - window;
135 self.failures.iter().filter(|t| **t >= cutoff).count() as u64
136 }
137
138 pub fn restarts_in(&self, now: DateTime<Utc>, window: Duration) -> u32 {
140 let cutoff = now - window;
141 self.restarts.iter().filter(|t| **t >= cutoff).count() as u32
142 }
143}
144
145fn prune_older_than(deque: &mut VecDeque<DateTime<Utc>>, cutoff: DateTime<Utc>) {
146 while let Some(front) = deque.front() {
147 if *front < cutoff {
148 deque.pop_front();
149 } else {
150 break;
151 }
152 }
153}
154
155pub use orca_core::api_types::LastBackupResult;
156
/// A node registered with this server (stored in `AppState::registered_nodes`).
///
/// Serde-serializable. Every `#[serde(default)]` field falls back to its
/// `Default` value (`false`, `0`, or `0.0`) when absent from the input, so
/// payloads that omit those fields still deserialize.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RegisteredNode {
    /// Numeric node identifier.
    pub node_id: u64,
    /// Address of the node (format not validated here).
    pub address: String,
    /// Free-form key/value labels.
    pub labels: HashMap<String, String>,
    /// Time of the last heartbeat, in UTC.
    pub last_heartbeat: chrono::DateTime<chrono::Utc>,
    /// Drain flag (presumably excludes the node from new placements — confirm in the scheduler).
    #[serde(default)]
    pub drain: bool,
    /// CPU usage percentage (range/normalization not enforced here).
    #[serde(default)]
    pub cpu_percent: f64,
    /// Memory figure in bytes (presumably current usage).
    #[serde(default)]
    pub memory_bytes: u64,
    /// Total memory (presumably bytes, matching `memory_bytes`).
    #[serde(default)]
    pub memory_total: u64,
    /// Disk used (units presumably bytes — confirm with the reporting agent).
    #[serde(default)]
    pub disk_used: u64,
    /// Disk total (same units as `disk_used`).
    #[serde(default)]
    pub disk_total: u64,
    /// Network received counter. NOTE(review): cumulative total vs. rate is not visible here.
    #[serde(default)]
    pub net_rx: u64,
    /// Network transmitted counter; same caveat as `net_rx`.
    #[serde(default)]
    pub net_tx: u64,
}
189
/// Runtime view of one service: its configuration, the replica count it should
/// run, and its current instances.
#[derive(Debug)]
pub struct ServiceState {
    /// Configuration the service was created from.
    pub config: ServiceConfig,
    /// Target replica count (`ServiceState::from_config` derives it from `config.replicas`).
    pub desired_replicas: u32,
    /// Current instances of this service.
    pub instances: Vec<InstanceState>,
}
200
/// State of a single running (or previously started) instance of a service.
#[derive(Debug)]
pub struct InstanceState {
    /// Runtime handle identifying the workload.
    pub handle: WorkloadHandle,
    /// Last known workload status.
    pub status: WorkloadStatus,
    /// Host port mapped to the instance, if any.
    pub host_port: Option<u16>,
    /// Address of the container, if known.
    pub container_address: Option<String>,
    /// Health-check state.
    pub health: HealthState,
    /// Whether this instance is a canary (presumably part of a canary rollout — confirm).
    pub is_canary: bool,
    /// Monotonic (`std::time::Instant`) start time, suitable for uptime/elapsed checks.
    pub started_at: std::time::Instant,
}
219
220impl AppState {
221 pub fn new(
223 cluster_config: ClusterConfig,
224 container_runtime: Arc<dyn Runtime>,
225 wasm_runtime: Option<Arc<dyn Runtime>>,
226 route_table: SharedRouteTable,
227 wasm_triggers: SharedWasmTriggers,
228 ) -> Self {
229 let api_tokens = cluster_config.api_tokens.clone();
230 Self {
231 cluster_config,
232 container_runtime,
233 wasm_runtime,
234 services: RwLock::new(HashMap::new()),
235 route_table,
236 wasm_triggers,
237 registered_nodes: RwLock::new(HashMap::new()),
238 pending_commands: RwLock::new(HashMap::new()),
239 webhooks: crate::webhook::new_store(),
240 api_tokens,
241 deploy_history: RwLock::new(crate::deploy_history::DeployHistory::new()),
242 acme_manager: None,
243 cert_resolver: None,
244 container_stats: RwLock::new(HashMap::new()),
245 store: None,
246 ws_agents: RwLock::new(HashMap::new()),
247 log_listeners: RwLock::new(HashMap::new()),
248 backup_listeners: RwLock::new(HashMap::new()),
249 network_listeners: RwLock::new(HashMap::new()),
250 last_backup_results: RwLock::new(HashMap::new()),
251 master_last_backup_result: RwLock::new(None),
252 webhook_invocations: RwLock::new(HashMap::new()),
253 exec_sessions: RwLock::new(HashMap::new()),
254 pending_deploys: RwLock::new(HashMap::new()),
255 alerts: None,
256 instance_events: RwLock::new(HashMap::new()),
257 }
258 }
259
260 pub async fn record_instance_restart(&self, service: &str) {
263 let mut log = self.instance_events.write().await;
264 log.entry(service.to_string())
265 .or_default()
266 .record_restart(Utc::now());
267 }
268
269 pub async fn record_instance_failure(&self, service: &str) {
272 let mut log = self.instance_events.write().await;
273 log.entry(service.to_string())
274 .or_default()
275 .record_failure(Utc::now());
276 }
277
278 pub fn with_store(mut self, store: Arc<crate::store::ClusterStore>) -> Self {
280 self.store = Some(store);
281 self
282 }
283
284 pub fn with_alerts(mut self, engine: crate::alerts::SharedAlertEngine) -> Self {
287 self.alerts = Some(engine);
288 self
289 }
290
291 pub fn with_acme(
293 mut self,
294 manager: orca_proxy::acme::AcmeManager,
295 resolver: orca_proxy::SharedCertResolver,
296 ) -> Self {
297 self.acme_manager = Some(manager);
298 self.cert_resolver = Some(resolver);
299 self
300 }
301}
302
303impl ServiceState {
304 pub fn from_config(config: ServiceConfig) -> Self {
306 let desired_replicas = match &config.replicas {
307 Replicas::Fixed(n) => *n,
308 Replicas::Auto => 1,
309 };
310 Self {
311 config,
312 desired_replicas,
313 instances: Vec::new(),
314 }
315 }
316
317 pub fn running_count(&self) -> u32 {
319 self.instances
320 .iter()
321 .filter(|i| i.status == WorkloadStatus::Running)
322 .count() as u32
323 }
324}
325
// Unit tests for `ServiceState` construction/counting and `InstanceEventLog`
// windowed counting and pruning.
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    use orca_core::config::ServiceConfig;
    use orca_core::runtime::WorkloadHandle;
    use orca_core::types::{Replicas, WorkloadStatus};

    /// A `ServiceConfig` with only name, image, port and `replicas` set; every
    /// other field is empty/`None`/default.
    fn minimal_config(replicas: Replicas) -> ServiceConfig {
        ServiceConfig {
            name: "test-svc".to_string(),
            project: None,
            runtime: Default::default(),
            image: Some("nginx:latest".to_string()),
            module: None,
            replicas,
            port: Some(8080),
            domain: None,
            health: None,
            readiness: None,
            liveness: None,
            env: HashMap::new(),
            resources: None,
            volume: None,
            deploy: None,
            placement: None,
            network: None,
            aliases: vec![],
            mounts: vec![],
            routes: vec![],
            host_port: None,
            triggers: Vec::new(),
            assets: None,
            build: None,
            tls_cert: None,
            tls_key: None,
            internal: false,
            depends_on: vec![],
            cmd: vec![],
            extra_ports: vec![],
            strip_prefix: None,
            pull_policy: Default::default(),
            backup: None,
        }
    }

    /// An `InstanceState` with the given status and placeholder handle/fields.
    fn make_instance(status: WorkloadStatus) -> InstanceState {
        InstanceState {
            handle: WorkloadHandle {
                runtime_id: "test-id".to_string(),
                name: "test-instance".to_string(),
                metadata: HashMap::new(),
            },
            status,
            host_port: None,
            container_address: None,
            health: HealthState::Unknown,
            is_canary: false,
            started_at: std::time::Instant::now(),
        }
    }

    /// `Replicas::Fixed(n)` is copied into `desired_replicas`.
    #[test]
    fn from_config_fixed_sets_desired_replicas() {
        let state = ServiceState::from_config(minimal_config(Replicas::Fixed(3)));
        assert_eq!(state.desired_replicas, 3);
    }

    /// `Replicas::Auto` starts with a single desired replica.
    #[test]
    fn from_config_auto_defaults_to_one() {
        let state = ServiceState::from_config(minimal_config(Replicas::Auto));
        assert_eq!(state.desired_replicas, 1);
    }

    /// Only `Running` instances are counted; `Stopped`/`Failed` are ignored.
    #[test]
    fn running_count_with_mixed_statuses() {
        let mut state = ServiceState::from_config(minimal_config(Replicas::Fixed(4)));
        state.instances = vec![
            make_instance(WorkloadStatus::Running),
            make_instance(WorkloadStatus::Stopped),
            make_instance(WorkloadStatus::Running),
            make_instance(WorkloadStatus::Failed),
        ];
        assert_eq!(state.running_count(), 2);
    }

    /// Counting honours the caller's window and ignores entries before the
    /// cutoff. Entries are pushed directly, bypassing pruning, so the 48h-old
    /// restart is still present and must be filtered out by the count itself.
    #[test]
    fn instance_event_log_counts_inside_window() {
        let now = Utc::now();
        let mut log = InstanceEventLog::default();
        log.failures.push_back(now - Duration::minutes(30));
        log.failures.push_back(now - Duration::minutes(45));
        log.failures.push_back(now - Duration::hours(2));
        log.restarts.push_back(now - Duration::hours(48));
        log.restarts.push_back(now - Duration::hours(20));
        log.restarts.push_back(now - Duration::minutes(10));

        assert_eq!(log.failures_in(now, Duration::hours(1)), 2);
        assert_eq!(log.failures_in(now, Duration::hours(24)), 3);
        assert_eq!(log.restarts_in(now, Duration::hours(1)), 1);
        assert_eq!(log.restarts_in(now, Duration::hours(24)), 2);
    }

    /// Appending via `record_restart` drops front entries older than `WINDOW` (24h).
    #[test]
    fn instance_event_log_prunes_on_append() {
        let now = Utc::now();
        let mut log = InstanceEventLog::default();
        log.restarts.push_back(now - Duration::hours(48));
        log.restarts.push_back(now - Duration::hours(20));
        log.record_restart(now);
        assert_eq!(
            log.restarts.len(),
            2,
            "the 48h-old entry must be pruned when a fresh `now` arrives"
        );
    }
}
447}