use bollard::Docker;
use bollard::models::EventMessageTypeEnum;
use bollard::service::EventMessage; use bollard::system::EventsOptions;
use futures_util::stream::StreamExt;
use tokio::sync::mpsc;
use crate::types::{Result, AppError, DockerEvent, EventSender, SystemEvent};
use log::{info, error, warn};
use bollard::container::InspectContainerOptions;
use std::collections::HashMap;
use std::net::IpAddr;
/// Listens for Docker container lifecycle events and forwards them to the
/// rest of the application as `SystemEvent::Docker` messages.
pub struct DockerMonitor {
/// Docker client used both for the event stream and for container inspection.
docker: Docker,
/// Sending half of the channel on which translated events are published.
event_tx: mpsc::Sender<SystemEvent>,
}
impl DockerMonitor {
pub fn new(event_tx: mpsc::Sender<SystemEvent>) -> Result<Self> {
let docker = Docker::connect_with_unix_defaults()
.map_err(|e| AppError::DockerError(format!("Failed to connect to Docker socket: {}", e)))?;
info!("Successfully connected to Docker daemon.");
Ok(Self { docker, event_tx })
}
pub async fn start(self) -> Result<()> {
info!("Starting Docker event listener...");
let mut filters = HashMap::new();
filters.insert("type".to_string(), vec!["container".to_string()]);
filters.insert("event".to_string(), vec!["start".to_string(), "stop".to_string(), "die".to_string()]);
let options = EventsOptions::<String> {
since: None,
until: None,
filters,
};
let mut event_stream = self.docker.events(Some(options));
while let Some(event_result) = event_stream.next().await {
match event_result {
Ok(event) => {
if let Err(e) = self.handle_event(event).await {
warn!("Error handling Docker event: {}", e); }
}
Err(e) => {
error!("Error receiving Docker event stream: {}", e);
return Err(AppError::DockerError(format!("Docker event stream error: {}", e)));
}
}
}
warn!("Docker event stream ended unexpectedly.");
Ok(())
}
async fn handle_event(&self, event: EventMessage) -> Result<()> {
match (event.typ, event.action.as_deref()) {
(Some(EventMessageTypeEnum::CONTAINER), Some("start")) => {
if let Some(actor) = event.actor {
let container_id = actor.id.unwrap_or_else(|| "Unknown".to_string());
info!("Docker container started: {}", container_id);
let ip_address_str = match self.get_container_ip(&container_id).await {
Ok(ip) => ip,
Err(e) => {
warn!("Failed to inspect container {} for IP: {}", container_id, e);
None
}
};
let ip_address: Option<IpAddr> = ip_address_str
.clone()
.and_then(|ip_str| ip_str.parse().ok());
if ip_address.is_none() && ip_address_str.is_some() {
warn!("Failed to parse IP address string: {}", ip_address_str.unwrap());
}
self.event_tx.send(SystemEvent::Docker(crate::types::DockerEvent::ContainerStarted(container_id, ip_address))).await
.map_err(|e| AppError::MpscSendError(format!("Failed to send DockerEvent::ContainerStarted: {}", e)))?;
}
}
(Some(EventMessageTypeEnum::CONTAINER), Some("stop")) | (Some(EventMessageTypeEnum::CONTAINER), Some("die")) => {
if let Some(actor) = event.actor {
let container_id = actor.id.unwrap_or_else(|| "Unknown".to_string());
info!("Docker container stopped/died: {}", container_id);
self.event_tx.send(SystemEvent::Docker(crate::types::DockerEvent::ContainerStopped(container_id))).await
.map_err(|e| AppError::MpscSendError(format!("Failed to send DockerEvent::ContainerStopped: {}", e)))?;
}
}
_ => {
}
}
Ok(())
}
async fn get_container_ip(&self, container_id: &str) -> Result<Option<String>> {
info!("Inspecting container {} for IP address...", container_id);
let options = InspectContainerOptions { size: false };
match self.docker.inspect_container(container_id, Some(options)).await {
Ok(inspect_info) => {
if let Some(network_settings) = inspect_info.network_settings {
if let Some(ip) = network_settings.ip_address {
if !ip.is_empty() {
info!("Found default IP {} for container {}", ip, container_id);
return Ok(Some(ip));
}
}
if let Some(networks) = network_settings.networks {
for (network_name, network_data) in networks {
if let Some(ip) = network_data.ip_address {
if !ip.is_empty() {
info!("Found IP {} for container {} in network \"{}\"", ip, container_id, network_name);
return Ok(Some(ip));
}
}
}
}
}
warn!("No IP address found for container {} in inspect details.", container_id);
Ok(None) }
Err(e) => {
error!("Failed to inspect container {}: {}", container_id, e);
Err(AppError::DockerError(format!("Bollard inspect error: {}", e)))
}
}
}
}