1use std::collections::HashMap;
4use std::sync::Arc;
5
6use tokio::sync::RwLock;
7
8use orca_core::config::{ClusterConfig, ServiceConfig};
9use orca_core::runtime::{Runtime, WorkloadHandle};
10use orca_core::types::{HealthState, Replicas, WorkloadStatus};
11
12use crate::stats::ContainerStats;
13use crate::webhook::WebhookStore;
14
15pub use orca_proxy::{RouteTarget, SharedWasmTriggers, WasmTrigger};
16
/// Route table shared behind an `Arc<RwLock<..>>`: maps a string key to the list of
/// [`RouteTarget`]s registered for that key.
///
/// NOTE(review): the key is presumably a domain/host name — confirm against `orca_proxy`.
pub type SharedRouteTable = Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>;
19
/// Process-wide state container built by [`AppState::new`].
///
/// Mutable collections are wrapped in `tokio::sync::RwLock`; the optional integrations
/// (`store`, `acme_manager`, `cert_resolver`) start as `None` and are filled in by the
/// `with_*` builder methods.
pub struct AppState {
    /// Cluster configuration passed to [`AppState::new`].
    pub cluster_config: ClusterConfig,
    /// Runtime handle passed to [`AppState::new`]; by its name, the one used for containers.
    pub container_runtime: Arc<dyn Runtime>,
    /// Optional second runtime; by its name, the one used for Wasm workloads.
    pub wasm_runtime: Option<Arc<dyn Runtime>>,
    /// Per-service state. Starts empty.
    /// NOTE(review): the key is presumably the service name — confirm with callers.
    pub services: RwLock<HashMap<String, ServiceState>>,
    /// Shared route table handed in by the caller of [`AppState::new`].
    pub route_table: SharedRouteTable,
    /// Shared Wasm trigger collection handed in by the caller of [`AppState::new`].
    pub wasm_triggers: SharedWasmTriggers,
    /// Known nodes. Starts empty.
    /// NOTE(review): the `u64` key is presumably `RegisteredNode::node_id` — confirm.
    pub registered_nodes: RwLock<HashMap<u64, RegisteredNode>>,
    /// Webhook store, created via `crate::webhook::new_store()` in [`AppState::new`].
    pub webhooks: WebhookStore,
    /// Copy of `cluster_config.api_tokens` taken in [`AppState::new`].
    pub api_tokens: Vec<String>,
    /// JSON values queued per `u64` key. Starts empty.
    /// NOTE(review): presumably commands awaiting delivery to a node id — confirm.
    pub pending_commands: RwLock<HashMap<u64, Vec<serde_json::Value>>>,
    /// Deploy history, initialised with `DeployHistory::new()`.
    pub deploy_history: RwLock<crate::deploy_history::DeployHistory>,
    /// ACME manager; `None` until [`AppState::with_acme`] is called.
    pub acme_manager: Option<orca_proxy::acme::AcmeManager>,
    /// Certificate resolver; `None` until [`AppState::with_acme`] is called.
    pub cert_resolver: Option<orca_proxy::SharedCertResolver>,
    /// Container statistics per string key. Starts empty.
    /// NOTE(review): key meaning is not visible here — confirm with the stats collector.
    pub container_stats: RwLock<HashMap<String, ContainerStats>>,
    /// Cluster store; `None` until [`AppState::with_store`] is called.
    pub store: Option<Arc<crate::store::ClusterStore>>,
    /// Agent senders per `u64` key. Starts empty.
    /// NOTE(review): presumably one WebSocket sender per node id — confirm in `ws_handler`.
    pub ws_agents: RwLock<HashMap<u64, crate::ws_handler::AgentSender>>,
    /// Channel senders carrying `(String, bool)` items, per string key. Starts empty.
    /// NOTE(review): the meaning of the tuple elements is not visible here — confirm.
    pub log_listeners: RwLock<HashMap<String, tokio::sync::mpsc::Sender<(String, bool)>>>,
    /// Channel senders carrying `BackupStatusReportData`, per string key. Starts empty.
    pub backup_listeners: RwLock<
        HashMap<String, tokio::sync::mpsc::Sender<orca_core::ws_types::BackupStatusReportData>>,
    >,
    /// Channel senders carrying `NetworkStatusReportData`, per string key. Starts empty.
    pub network_listeners: RwLock<
        HashMap<String, tokio::sync::mpsc::Sender<orca_core::ws_types::NetworkStatusReportData>>,
    >,
    /// Most recent backup result per `u64` key. Starts empty.
    /// NOTE(review): the key is presumably a node id — confirm.
    pub last_backup_results: RwLock<HashMap<u64, LastBackupResult>>,
    /// Single optional backup result, initially `None`; by its name, the master's own.
    pub master_last_backup_result: RwLock<Option<LastBackupResult>>,
    /// Queue of `WebhookInvocation` records per string key. Starts empty.
    /// NOTE(review): any bound on the queue length is enforced elsewhere, if at all.
    pub webhook_invocations: RwLock<
        HashMap<String, std::collections::VecDeque<orca_core::api_types::WebhookInvocation>>,
    >,
    /// Channel senders carrying raw byte chunks, per string key. Starts empty.
    /// NOTE(review): the key is presumably an exec session id — confirm.
    pub exec_sessions: RwLock<HashMap<String, tokio::sync::mpsc::Sender<Vec<u8>>>>,
    /// One-shot senders carrying a `Result<(), String>`, per string key. Starts empty.
    /// NOTE(review): presumably used to report a deploy outcome to a waiter — confirm.
    pub pending_deploys: RwLock<HashMap<String, tokio::sync::oneshot::Sender<Result<(), String>>>>,
}
91
92pub use orca_core::api_types::LastBackupResult;
93
/// Serializable record describing one registered node.
///
/// Every field marked `#[serde(default)]` may be absent from serialized input; it then
/// deserializes to the type's default (`false` / `0` / `0.0`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RegisteredNode {
    /// Numeric identifier of the node.
    pub node_id: u64,
    /// Address of the node as a string (exact format is not visible in this file).
    pub address: String,
    /// String key/value labels attached to the node.
    pub labels: HashMap<String, String>,
    /// UTC timestamp of the last heartbeat, per the field name.
    pub last_heartbeat: chrono::DateTime<chrono::Utc>,
    /// Drain flag; defaults to `false` when missing from input.
    #[serde(default)]
    pub drain: bool,
    /// CPU usage figure; presumably a percentage — TODO confirm the scale (0–100 vs 0–1).
    #[serde(default)]
    pub cpu_percent: f64,
    /// Memory in use; presumably bytes, per the field name.
    #[serde(default)]
    pub memory_bytes: u64,
    /// Total memory; presumably in the same unit as `memory_bytes` — confirm.
    #[serde(default)]
    pub memory_total: u64,
    /// Disk space in use (unit not visible here).
    #[serde(default)]
    pub disk_used: u64,
    /// Total disk space (unit not visible here).
    #[serde(default)]
    pub disk_total: u64,
    /// Network receive counter (unit and reset semantics not visible here).
    #[serde(default)]
    pub net_rx: u64,
    /// Network transmit counter (unit and reset semantics not visible here).
    #[serde(default)]
    pub net_tx: u64,
}
126
/// State tracked for a single service: its configuration, the replica target and the
/// instances currently recorded for it.
#[derive(Debug)]
pub struct ServiceState {
    /// Configuration the state was created from.
    pub config: ServiceConfig,
    /// Replica target. `from_config` sets it to `n` for `Replicas::Fixed(n)` and to `1`
    /// for `Replicas::Auto`.
    pub desired_replicas: u32,
    /// Recorded instances; empty right after `from_config`.
    pub instances: Vec<InstanceState>,
}
137
/// State tracked for one instance of a service.
#[derive(Debug)]
pub struct InstanceState {
    /// Runtime handle identifying the workload.
    pub handle: WorkloadHandle,
    /// Current workload status; `ServiceState::running_count` counts instances whose
    /// status equals `WorkloadStatus::Running`.
    pub status: WorkloadStatus,
    /// Host port, if one is set.
    /// NOTE(review): presumably the port published on the host — confirm.
    pub host_port: Option<u16>,
    /// Container address, if known (format is not visible in this file).
    pub container_address: Option<String>,
    /// Health state of the instance.
    pub health: HealthState,
    /// Whether this instance is flagged as a canary.
    pub is_canary: bool,
    /// Monotonic timestamp; per the field name, when the instance was started.
    pub started_at: std::time::Instant,
}
156
157impl AppState {
158 pub fn new(
160 cluster_config: ClusterConfig,
161 container_runtime: Arc<dyn Runtime>,
162 wasm_runtime: Option<Arc<dyn Runtime>>,
163 route_table: SharedRouteTable,
164 wasm_triggers: SharedWasmTriggers,
165 ) -> Self {
166 let api_tokens = cluster_config.api_tokens.clone();
167 Self {
168 cluster_config,
169 container_runtime,
170 wasm_runtime,
171 services: RwLock::new(HashMap::new()),
172 route_table,
173 wasm_triggers,
174 registered_nodes: RwLock::new(HashMap::new()),
175 pending_commands: RwLock::new(HashMap::new()),
176 webhooks: crate::webhook::new_store(),
177 api_tokens,
178 deploy_history: RwLock::new(crate::deploy_history::DeployHistory::new()),
179 acme_manager: None,
180 cert_resolver: None,
181 container_stats: RwLock::new(HashMap::new()),
182 store: None,
183 ws_agents: RwLock::new(HashMap::new()),
184 log_listeners: RwLock::new(HashMap::new()),
185 backup_listeners: RwLock::new(HashMap::new()),
186 network_listeners: RwLock::new(HashMap::new()),
187 last_backup_results: RwLock::new(HashMap::new()),
188 master_last_backup_result: RwLock::new(None),
189 webhook_invocations: RwLock::new(HashMap::new()),
190 exec_sessions: RwLock::new(HashMap::new()),
191 pending_deploys: RwLock::new(HashMap::new()),
192 }
193 }
194
195 pub fn with_store(mut self, store: Arc<crate::store::ClusterStore>) -> Self {
197 self.store = Some(store);
198 self
199 }
200
201 pub fn with_acme(
203 mut self,
204 manager: orca_proxy::acme::AcmeManager,
205 resolver: orca_proxy::SharedCertResolver,
206 ) -> Self {
207 self.acme_manager = Some(manager);
208 self.cert_resolver = Some(resolver);
209 self
210 }
211}
212
213impl ServiceState {
214 pub fn from_config(config: ServiceConfig) -> Self {
216 let desired_replicas = match &config.replicas {
217 Replicas::Fixed(n) => *n,
218 Replicas::Auto => 1,
219 };
220 Self {
221 config,
222 desired_replicas,
223 instances: Vec::new(),
224 }
225 }
226
227 pub fn running_count(&self) -> u32 {
229 self.instances
230 .iter()
231 .filter(|i| i.status == WorkloadStatus::Running)
232 .count() as u32
233 }
234}
235
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    use orca_core::config::ServiceConfig;
    use orca_core::runtime::WorkloadHandle;
    use orca_core::types::{Replicas, WorkloadStatus};

    /// Builds a `ServiceConfig` with only the essentials populated and the given
    /// replica setting.
    fn minimal_config(replicas: Replicas) -> ServiceConfig {
        ServiceConfig {
            name: String::from("test-svc"),
            project: None,
            runtime: Default::default(),
            image: Some(String::from("nginx:latest")),
            module: None,
            replicas,
            port: Some(8080),
            domain: None,
            health: None,
            readiness: None,
            liveness: None,
            env: HashMap::new(),
            resources: None,
            volume: None,
            deploy: None,
            placement: None,
            network: None,
            aliases: Vec::new(),
            mounts: Vec::new(),
            routes: Vec::new(),
            host_port: None,
            triggers: Vec::new(),
            assets: None,
            build: None,
            tls_cert: None,
            tls_key: None,
            internal: false,
            depends_on: Vec::new(),
            cmd: Vec::new(),
            extra_ports: Vec::new(),
            strip_prefix: None,
            pull_policy: Default::default(),
            backup: None,
        }
    }

    /// Builds an `InstanceState` with placeholder identifiers and the given status.
    fn make_instance(status: WorkloadStatus) -> InstanceState {
        let handle = WorkloadHandle {
            runtime_id: String::from("test-id"),
            name: String::from("test-instance"),
            metadata: HashMap::new(),
        };
        InstanceState {
            handle,
            status,
            host_port: None,
            container_address: None,
            health: HealthState::Unknown,
            is_canary: false,
            started_at: std::time::Instant::now(),
        }
    }

    #[test]
    fn from_config_fixed_sets_desired_replicas() {
        let cfg = minimal_config(Replicas::Fixed(3));
        let svc = ServiceState::from_config(cfg);
        assert_eq!(svc.desired_replicas, 3);
    }

    #[test]
    fn from_config_auto_defaults_to_one() {
        let cfg = minimal_config(Replicas::Auto);
        let svc = ServiceState::from_config(cfg);
        assert_eq!(svc.desired_replicas, 1);
    }

    #[test]
    fn running_count_with_mixed_statuses() {
        let mut svc = ServiceState::from_config(minimal_config(Replicas::Fixed(4)));
        // Two of the four instances are running; the others must not be counted.
        svc.instances = vec![
            WorkloadStatus::Running,
            WorkloadStatus::Stopped,
            WorkloadStatus::Running,
            WorkloadStatus::Failed,
        ]
        .into_iter()
        .map(make_instance)
        .collect();
        assert_eq!(svc.running_count(), 2);
    }
}