1use std::collections::HashMap;
8
9use bollard::query_parameters::{EventsOptions, InspectContainerOptions, ListContainersOptions};
10use bollard::Docker;
11use chrono::Utc;
12use futures_util::StreamExt;
13use tokio::sync::mpsc;
14use tokio_util::sync::CancellationToken;
15
16use crate::backend::{RuntimeBackend, RuntimeEvent};
17use crate::error::RuntimeError;
18use crate::instance::{
19 ComposeInfo, Instance, InstanceState, KoiMetadata, PortMapping, PortProtocol,
20};
21
/// Runtime backend for a Docker-compatible API; one type serves both Docker and Podman.
pub struct DockerBackend {
    /// API client; `None` until `connect` has completed successfully.
    client: Option<Docker>,
    /// Explicit socket path to connect to. When `None`, `connect` picks the
    /// Podman socket lookup or the local Docker defaults instead.
    socket_path: Option<String>,
    /// Selects Podman mode: changes the reported backend name and which
    /// sockets `connect` probes when no explicit path is set.
    is_podman: bool,
}
28
29impl Default for DockerBackend {
30 fn default() -> Self {
31 Self::new()
32 }
33}
34
35impl DockerBackend {
36 pub fn new() -> Self {
38 Self {
39 client: None,
40 socket_path: None,
41 is_podman: false,
42 }
43 }
44
45 pub fn with_socket(path: String) -> Self {
47 Self {
48 client: None,
49 socket_path: Some(path),
50 is_podman: false,
51 }
52 }
53
54 pub fn podman() -> Self {
56 Self {
57 client: None,
58 socket_path: None,
59 is_podman: true,
60 }
61 }
62
63 fn client(&self) -> Result<&Docker, RuntimeError> {
64 self.client
65 .as_ref()
66 .ok_or_else(|| RuntimeError::Connection("Docker client not connected".into()))
67 }
68
69 async fn container_to_instance(
71 &self,
72 client: &Docker,
73 container_id: &str,
74 ) -> Result<Instance, RuntimeError> {
75 let info = client
76 .inspect_container(container_id, None::<InspectContainerOptions>)
77 .await
78 .map_err(|e| RuntimeError::Internal(format!("inspect {container_id}: {e}")))?;
79
80 let config = info.config.as_ref();
81 let labels = config
82 .and_then(|c| c.labels.as_ref())
83 .cloned()
84 .unwrap_or_default();
85
86 let name = info
87 .name
88 .as_deref()
89 .unwrap_or(container_id)
90 .trim_start_matches('/')
91 .to_string();
92
93 let compose = ComposeInfo::from_labels(&labels);
94 let effective_name = compose.effective_name(&name).to_string();
95
96 let ports = extract_port_mappings(&info);
97 let ips = extract_ips(&info);
98
99 let state = match info.state.as_ref().and_then(|s| s.status) {
100 Some(bollard::models::ContainerStateStatusEnum::RUNNING) => InstanceState::Running,
101 Some(bollard::models::ContainerStateStatusEnum::PAUSED) => InstanceState::Paused,
102 Some(bollard::models::ContainerStateStatusEnum::RESTARTING) => {
103 InstanceState::Restarting
104 }
105 _ => InstanceState::Stopped,
106 };
107
108 let image = config.and_then(|c| c.image.clone());
109
110 let env_vars: Vec<String> = config
112 .and_then(|c| c.env.as_ref())
113 .cloned()
114 .unwrap_or_default();
115
116 let koi_metadata = KoiMetadata::from_labels_and_env(&labels, &env_vars);
117
118 Ok(Instance {
119 id: info.id.unwrap_or_else(|| container_id.to_string()),
120 name: effective_name,
121 ports,
122 ips,
123 metadata: koi_metadata,
124 backend: if self.is_podman { "podman" } else { "docker" }.to_string(),
125 state,
126 discovered_at: Utc::now(),
127 image,
128 })
129 }
130}
131
132#[async_trait::async_trait]
133impl RuntimeBackend for DockerBackend {
134 fn name(&self) -> &'static str {
135 if self.is_podman {
136 "podman"
137 } else {
138 "docker"
139 }
140 }
141
142 async fn connect(&mut self) -> Result<(), RuntimeError> {
143 let client = if let Some(ref path) = self.socket_path {
144 Docker::connect_with_socket(path, 120, bollard::API_DEFAULT_VERSION)
145 .map_err(|e| RuntimeError::Connection(format!("socket {path}: {e}")))?
146 } else if self.is_podman {
147 #[cfg(unix)]
149 {
150 let uid = unsafe { libc::getuid() };
151 let user_socket = format!("/run/user/{uid}/podman/podman.sock");
152 if std::path::Path::new(&user_socket).exists() {
153 Docker::connect_with_socket(&user_socket, 120, bollard::API_DEFAULT_VERSION)
154 .map_err(|e| RuntimeError::Connection(format!("podman: {e}")))?
155 } else {
156 Docker::connect_with_socket(
157 "/run/podman/podman.sock",
158 120,
159 bollard::API_DEFAULT_VERSION,
160 )
161 .map_err(|e| RuntimeError::Connection(format!("podman: {e}")))?
162 }
163 }
164 #[cfg(not(unix))]
165 {
166 Docker::connect_with_local_defaults()
167 .map_err(|e| RuntimeError::Connection(format!("podman: {e}")))?
168 }
169 } else {
170 Docker::connect_with_local_defaults()
171 .map_err(|e| RuntimeError::Connection(format!("docker: {e}")))?
172 };
173
174 client
176 .ping()
177 .await
178 .map_err(|e| RuntimeError::Connection(format!("ping failed: {e}")))?;
179
180 let version = client
181 .version()
182 .await
183 .map_err(|e| RuntimeError::Connection(format!("version check: {e}")))?;
184
185 tracing::info!(
186 backend = self.name(),
187 api_version = ?version.api_version,
188 "Connected to runtime"
189 );
190
191 self.client = Some(client);
192 Ok(())
193 }
194
195 async fn list_instances(&self) -> Result<Vec<Instance>, RuntimeError> {
196 let client = self.client()?;
197
198 let opts = ListContainersOptions {
199 all: false, ..Default::default()
201 };
202
203 let containers = client
204 .list_containers(Some(opts))
205 .await
206 .map_err(|e| RuntimeError::Internal(format!("list containers: {e}")))?;
207
208 let mut instances = Vec::with_capacity(containers.len());
209 for container in &containers {
210 if let Some(ref id) = container.id {
211 match self.container_to_instance(client, id).await {
212 Ok(instance) => instances.push(instance),
213 Err(e) => {
214 tracing::warn!(id, error = %e, "Failed to inspect container, skipping");
215 }
216 }
217 }
218 }
219
220 Ok(instances)
221 }
222
223 async fn watch(
224 &self,
225 tx: mpsc::Sender<RuntimeEvent>,
226 cancel: CancellationToken,
227 ) -> Result<(), RuntimeError> {
228 let client = self.client()?;
229
230 let event_filters = HashMap::from([("type".to_string(), vec!["container".to_string()])]);
231 let opts = EventsOptions {
232 filters: Some(event_filters),
233 ..Default::default()
234 };
235
236 let mut stream = client.events(Some(opts));
237
238 loop {
239 tokio::select! {
240 _ = cancel.cancelled() => {
241 tracing::info!(backend = self.name(), "Watch cancelled");
242 break;
243 }
244 event = stream.next() => {
245 match event {
246 Some(Ok(ev)) => {
247 if let Err(e) = self.handle_docker_event(client, &tx, &ev).await {
248 tracing::warn!(error = %e, "Error handling Docker event");
249 }
250 }
251 Some(Err(e)) => {
252 let _ = tx.send(RuntimeEvent::BackendDisconnected {
253 backend: self.name().to_string(),
254 reason: e.to_string(),
255 }).await;
256 tracing::error!(error = %e, "Docker event stream error");
257 break;
258 }
259 None => {
260 tracing::info!("Docker event stream ended");
261 break;
262 }
263 }
264 }
265 }
266 }
267
268 Ok(())
269 }
270}
271
272impl DockerBackend {
273 async fn handle_docker_event(
274 &self,
275 client: &Docker,
276 tx: &mpsc::Sender<RuntimeEvent>,
277 event: &bollard::models::EventMessage,
278 ) -> Result<(), RuntimeError> {
279 let action = event.action.as_deref().unwrap_or("");
280 let actor = event.actor.as_ref();
281 let id = actor.and_then(|a| a.id.as_deref()).unwrap_or("");
282
283 if id.is_empty() {
284 return Ok(());
285 }
286
287 match action {
288 "start" => match self.container_to_instance(client, id).await {
289 Ok(instance) => {
290 tracing::info!(
291 name = %instance.name,
292 ports = ?instance.ports.len(),
293 backend = self.name(),
294 "Instance started"
295 );
296 let _ = tx.send(RuntimeEvent::Started(instance)).await;
297 }
298 Err(e) => {
299 tracing::warn!(id, error = %e, "Failed to inspect started container");
300 }
301 },
302 "die" | "stop" | "kill" | "destroy" => {
303 let name = actor
304 .and_then(|a| a.attributes.as_ref())
305 .and_then(|attrs| attrs.get("name"))
306 .cloned()
307 .unwrap_or_else(|| id.to_string());
308
309 tracing::info!(
310 name = %name,
311 action,
312 backend = self.name(),
313 "Instance stopped"
314 );
315 let _ = tx
316 .send(RuntimeEvent::Stopped {
317 id: id.to_string(),
318 name,
319 })
320 .await;
321 }
322 _ => {}
324 }
325
326 Ok(())
327 }
328}
329
330fn extract_port_mappings(info: &bollard::models::ContainerInspectResponse) -> Vec<PortMapping> {
332 let mut mappings = Vec::new();
333
334 let network_ports = info
335 .network_settings
336 .as_ref()
337 .and_then(|ns| ns.ports.as_ref());
338
339 if let Some(ports) = network_ports {
340 for (port_spec, bindings) in ports {
341 let Some(bindings) = bindings else { continue };
342
343 let (container_port, protocol) = parse_port_spec(port_spec);
345
346 for binding in bindings {
347 let host_port = binding
348 .host_port
349 .as_deref()
350 .and_then(|p| p.parse::<u16>().ok())
351 .unwrap_or(0);
352
353 if host_port == 0 {
354 continue;
355 }
356
357 let host_ip = binding.host_ip.as_deref().unwrap_or("0.0.0.0").to_string();
358
359 mappings.push(PortMapping {
360 host_port,
361 container_port,
362 protocol,
363 host_ip,
364 });
365 }
366 }
367 }
368
369 mappings
370}
371
372fn parse_port_spec(spec: &str) -> (u16, PortProtocol) {
374 let parts: Vec<&str> = spec.split('/').collect();
375 let port = parts
376 .first()
377 .and_then(|p| p.parse::<u16>().ok())
378 .unwrap_or(0);
379 let protocol = match parts.get(1) {
380 Some(&"udp") => PortProtocol::Udp,
381 _ => PortProtocol::Tcp,
382 };
383 (port, protocol)
384}
385
386fn extract_ips(info: &bollard::models::ContainerInspectResponse) -> Vec<String> {
388 let mut ips = Vec::new();
389
390 if let Some(ns) = &info.network_settings {
391 if let Some(ref networks) = ns.networks {
393 for network in networks.values() {
394 if let Some(ref ip) = network.ip_address {
395 if !ip.is_empty() && !ips.contains(ip) {
396 ips.push(ip.clone());
397 }
398 }
399 if let Some(ref ip6) = network.global_ipv6_address {
400 if !ip6.is_empty() && !ips.contains(ip6) {
401 ips.push(ip6.clone());
402 }
403 }
404 }
405 }
406 }
407
408 ips
409}
410
/// Cheap probe for whether Docker appears to be usable on this machine.
///
/// * Unix: `true` when `/var/run/docker.sock` exists. No connection is
///   attempted.
/// * Windows: `true` when running `docker info` exits successfully; failure
///   to launch the command counts as unavailable.
/// * Any other target: always `false`.
pub fn is_docker_available() -> bool {
    #[cfg(unix)]
    {
        std::path::Path::new("/var/run/docker.sock").exists()
    }
    #[cfg(windows)]
    {
        std::process::Command::new("docker")
            .arg("info")
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .status()
            .map(|s| s.success())
            .unwrap_or(false)
    }
    // Without this arm the body is empty on targets that are neither Unix nor
    // Windows, which does not type-check as `bool`.
    #[cfg(not(any(unix, windows)))]
    {
        false
    }
}
430
431pub fn is_podman_available() -> bool {
433 #[cfg(unix)]
434 {
435 let uid = unsafe { libc::getuid() };
436 let user_socket = format!("/run/user/{uid}/podman/podman.sock");
437 std::path::Path::new(&user_socket).exists()
438 || std::path::Path::new("/run/podman/podman.sock").exists()
439 }
440 #[cfg(not(unix))]
441 {
442 false
443 }
444}
445
#[cfg(test)]
mod tests {
    use super::*;

    /// An explicit `/tcp` suffix yields the port and the TCP protocol.
    #[test]
    fn parse_tcp_port_spec() {
        assert_eq!(parse_port_spec("80/tcp"), (80, PortProtocol::Tcp));
    }

    /// A `/udp` suffix selects the UDP protocol.
    #[test]
    fn parse_udp_port_spec() {
        assert_eq!(parse_port_spec("53/udp"), (53, PortProtocol::Udp));
    }

    /// A spec with no protocol segment falls back to TCP.
    #[test]
    fn parse_bare_port_defaults_to_tcp() {
        assert_eq!(parse_port_spec("443"), (443, PortProtocol::Tcp));
    }

    /// The reported backend name follows the mode chosen by the constructor.
    #[test]
    fn docker_backend_name() {
        assert_eq!(DockerBackend::new().name(), "docker");
        assert_eq!(DockerBackend::podman().name(), "podman");
    }
}