Skip to main content

koi_runtime/
lib.rs

//! Koi Runtime Adapter — container/service lifecycle integration.
//!
//! Watches runtime APIs (Docker, Podman, systemd, Incus, Kubernetes) for
//! lifecycle events and drives Koi capabilities: mDNS announce, DNS entry,
//! health check, proxy configuration.
//!
//! The adapter uses a trait-based backend system. Each runtime implements
//! [`RuntimeBackend`] to provide normalized lifecycle events and instance
//! metadata. The [`RuntimeCore`] facade orchestrates the mapping from
//! runtime events to Koi API calls.
11
12pub mod backend;
13pub mod docker;
14pub mod error;
15pub mod heuristics;
16pub mod http;
17pub mod instance;
18
19use std::collections::HashMap;
20use std::sync::Arc;
21
22use axum::Router;
23use koi_common::capability::CapabilityStatus;
24use tokio::sync::{broadcast, mpsc, Mutex};
25use tokio_util::sync::CancellationToken;
26
27pub use backend::{RuntimeBackend, RuntimeBackendKind, RuntimeEvent};
28pub use error::RuntimeError;
29pub use instance::{Instance, InstanceState, KoiMetadata, PortMapping};
30
/// Number of runtime events the broadcast channel is created with room for.
const BROADCAST_CHANNEL_CAPACITY: usize = 256;
33
34/// Configuration for the runtime adapter.
35#[derive(Debug, Clone)]
36pub struct RuntimeConfig {
37    /// Which backend to use.
38    pub backend_kind: RuntimeBackendKind,
39    /// Custom socket path (overrides default for the selected backend).
40    pub socket_path: Option<String>,
41}
42
43impl Default for RuntimeConfig {
44    fn default() -> Self {
45        Self {
46            backend_kind: RuntimeBackendKind::Auto,
47            socket_path: None,
48        }
49    }
50}
51
52// ── Internal state ──────────────────────────────────────────────────
53
54struct RuntimeState {
55    /// Tracked instances by runtime ID.
56    instances: Mutex<HashMap<String, Instance>>,
57    /// Backend name (set after connect).
58    backend_name: Mutex<Option<String>>,
59    /// Whether the watcher is active.
60    active: Mutex<bool>,
61    /// Event broadcast channel.
62    event_tx: broadcast::Sender<RuntimeEvent>,
63}
64
65// ── RuntimeCore facade ──────────────────────────────────────────────
66
67/// Runtime adapter domain facade.
68///
69/// Wraps the backend and tracked instance state, exposes commands,
70/// status, events, and HTTP routes.
71pub struct RuntimeCore {
72    state: Arc<RuntimeState>,
73    config: RuntimeConfig,
74}
75
76impl RuntimeCore {
77    /// Create a new RuntimeCore with the given configuration.
78    pub fn new(config: RuntimeConfig) -> Self {
79        Self {
80            state: Arc::new(RuntimeState {
81                instances: Mutex::new(HashMap::new()),
82                backend_name: Mutex::new(None),
83                active: Mutex::new(false),
84                event_tx: broadcast::channel(BROADCAST_CHANNEL_CAPACITY).0,
85            }),
86            config,
87        }
88    }
89
90    /// Build the HTTP router for this domain.
91    pub fn routes(&self) -> Router {
92        http::routes(Arc::new(RuntimeCore {
93            state: Arc::clone(&self.state),
94            config: self.config.clone(),
95        }))
96    }
97
98    /// Subscribe to runtime events.
99    pub fn subscribe(&self) -> broadcast::Receiver<RuntimeEvent> {
100        self.state.event_tx.subscribe()
101    }
102
103    /// Get current status.
104    pub async fn status(&self) -> http::RuntimeStatus {
105        let instances = self.state.instances.lock().await;
106        let backend = self.state.backend_name.lock().await;
107        let active = *self.state.active.lock().await;
108
109        http::RuntimeStatus {
110            active,
111            backend: backend.clone(),
112            instance_count: instances.len(),
113        }
114    }
115
116    /// List all tracked instances.
117    pub async fn list_instances(&self) -> Result<Vec<Instance>, RuntimeError> {
118        let instances = self.state.instances.lock().await;
119        Ok(instances.values().cloned().collect())
120    }
121
122    /// Start watching the runtime backend for lifecycle events.
123    ///
124    /// This spawns a background task that:
125    /// 1. Connects to the runtime backend
126    /// 2. Lists existing instances (reconciliation)
127    /// 3. Streams lifecycle events
128    /// 4. Updates tracked state and broadcasts events
129    ///
130    /// Returns immediately. The background task runs until the cancel token fires.
131    pub async fn start_watching(&self, cancel: CancellationToken) -> Result<(), RuntimeError> {
132        let mut backend = self.create_backend()?;
133
134        backend.connect().await?;
135
136        // Store backend name
137        *self.state.backend_name.lock().await = Some(backend.name().to_string());
138        *self.state.active.lock().await = true;
139
140        // Initial reconciliation: list all running instances
141        let existing = backend.list_instances().await?;
142        {
143            let mut instances = self.state.instances.lock().await;
144            for instance in &existing {
145                instances.insert(instance.id.clone(), instance.clone());
146            }
147        }
148
149        tracing::info!(
150            backend = backend.name(),
151            instances = existing.len(),
152            "Runtime adapter started, initial reconciliation complete"
153        );
154
155        // Broadcast initial instances as Started events
156        for instance in existing {
157            let _ = self.state.event_tx.send(RuntimeEvent::Started(instance));
158        }
159
160        // Spawn event watch loop
161        let state = Arc::clone(&self.state);
162        let (event_tx, mut event_rx) = mpsc::channel(256);
163
164        let watch_cancel = cancel.clone();
165        tokio::spawn(async move {
166            if let Err(e) = backend.watch(event_tx, watch_cancel).await {
167                tracing::error!(error = %e, "Runtime watch loop exited with error");
168            }
169            *state.active.lock().await = false;
170            tracing::info!("Runtime watch loop stopped");
171        });
172
173        // Spawn event processing loop
174        let state = Arc::clone(&self.state);
175        tokio::spawn(async move {
176            while let Some(event) = event_rx.recv().await {
177                match &event {
178                    RuntimeEvent::Started(instance) => {
179                        let mut instances = state.instances.lock().await;
180                        instances.insert(instance.id.clone(), instance.clone());
181                        tracing::debug!(
182                            name = %instance.name,
183                            id = %instance.id,
184                            "Instance tracked"
185                        );
186                    }
187                    RuntimeEvent::Stopped { id, name } => {
188                        let mut instances = state.instances.lock().await;
189                        instances.remove(id.as_str());
190                        tracing::debug!(name, id, "Instance untracked");
191                    }
192                    RuntimeEvent::Updated(instance) => {
193                        let mut instances = state.instances.lock().await;
194                        instances.insert(instance.id.clone(), instance.clone());
195                    }
196                    RuntimeEvent::BackendDisconnected { backend, reason } => {
197                        tracing::warn!(backend, reason, "Backend disconnected");
198                    }
199                    RuntimeEvent::BackendReconnected { backend } => {
200                        tracing::info!(backend, "Backend reconnected");
201                    }
202                }
203                // Broadcast to subscribers
204                let _ = state.event_tx.send(event);
205            }
206        });
207
208        Ok(())
209    }
210
211    /// Capability status for the unified status endpoint.
212    pub async fn capability_status(&self) -> CapabilityStatus {
213        let instances = self.state.instances.lock().await;
214        let backend = self.state.backend_name.lock().await;
215        let active = *self.state.active.lock().await;
216
217        CapabilityStatus {
218            name: "runtime".to_string(),
219            healthy: active,
220            summary: if active {
221                format!(
222                    "{}: {} instances",
223                    backend.as_deref().unwrap_or("none"),
224                    instances.len()
225                )
226            } else {
227                "inactive".to_string()
228            },
229        }
230    }
231
232    /// Create a backend based on the configured kind.
233    fn create_backend(&self) -> Result<Box<dyn RuntimeBackend>, RuntimeError> {
234        match self.config.backend_kind {
235            RuntimeBackendKind::Docker => {
236                let backend = if let Some(ref path) = self.config.socket_path {
237                    docker::DockerBackend::with_socket(path.clone())
238                } else {
239                    docker::DockerBackend::new()
240                };
241                Ok(Box::new(backend))
242            }
243            RuntimeBackendKind::Podman => {
244                let backend = if let Some(ref path) = self.config.socket_path {
245                    docker::DockerBackend::with_socket(path.clone())
246                } else {
247                    docker::DockerBackend::podman()
248                };
249                Ok(Box::new(backend))
250            }
251            RuntimeBackendKind::Auto => self.auto_detect_backend(),
252            RuntimeBackendKind::Systemd => Err(RuntimeError::BackendUnavailable(
253                "systemd backend not yet implemented".into(),
254            )),
255            RuntimeBackendKind::Incus => Err(RuntimeError::BackendUnavailable(
256                "incus backend not yet implemented".into(),
257            )),
258            RuntimeBackendKind::Kubernetes => Err(RuntimeError::BackendUnavailable(
259                "kubernetes backend not yet implemented".into(),
260            )),
261        }
262    }
263
264    /// Auto-detect the best available backend.
265    fn auto_detect_backend(&self) -> Result<Box<dyn RuntimeBackend>, RuntimeError> {
266        if docker::is_docker_available() {
267            tracing::info!("Auto-detected Docker runtime");
268            return Ok(Box::new(docker::DockerBackend::new()));
269        }
270
271        if docker::is_podman_available() {
272            tracing::info!("Auto-detected Podman runtime");
273            return Ok(Box::new(docker::DockerBackend::podman()));
274        }
275
276        Err(RuntimeError::BackendUnavailable(
277            "no supported runtime detected (checked: Docker, Podman)".into(),
278        ))
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    /// A core that was never started reports inactive, empty, and backend-less.
    #[tokio::test]
    async fn runtime_core_default_status_is_inactive() {
        let config = RuntimeConfig::default();
        let status = RuntimeCore::new(config).status().await;

        assert!(!status.active);
        assert_eq!(status.instance_count, 0);
        assert!(status.backend.is_none());
    }

    /// A core that was never started tracks no instances.
    #[tokio::test]
    async fn list_instances_empty_by_default() {
        let config = RuntimeConfig::default();
        let tracked = RuntimeCore::new(config).list_instances().await.unwrap();

        assert!(tracked.is_empty());
    }

    /// Backend kinds render as their lowercase names.
    #[test]
    fn auto_backend_kind_display() {
        let auto = RuntimeBackendKind::Auto.to_string();
        let docker = RuntimeBackendKind::Docker.to_string();

        assert_eq!(auto, "auto");
        assert_eq!(docker, "docker");
    }

    /// Loose parsing accepts known names and aliases and rejects unknown input.
    #[test]
    fn backend_kind_from_str() {
        let docker = RuntimeBackendKind::from_str_loose("docker");
        assert_eq!(docker, Some(RuntimeBackendKind::Docker));

        let kubernetes = RuntimeBackendKind::from_str_loose("k8s");
        assert_eq!(kubernetes, Some(RuntimeBackendKind::Kubernetes));

        let unknown = RuntimeBackendKind::from_str_loose("unknown");
        assert_eq!(unknown, None);
    }
}