use crate::logs::LogStreamer;
use crate::models::{LogEntry, LogLevel, LogSource, Service, ServiceType};
use anyhow::Result;
use async_trait::async_trait;
use bollard::container::LogsOptions;
use bollard::Docker;
use chrono::Utc;
use futures::StreamExt;
use tokio::sync::mpsc;
use tracing::{debug, error, warn};
pub struct DockerLogStreamer {
docker: Option<Docker>,
}
impl DockerLogStreamer {
pub fn new() -> Self {
let docker = Docker::connect_with_local_defaults().ok();
if docker.is_none() {
warn!("Docker not available for log streaming");
}
Self { docker }
}
}
#[async_trait]
impl LogStreamer for DockerLogStreamer {
async fn stream_logs(
&self,
service: &Service,
tx: mpsc::UnboundedSender<LogEntry>,
) -> Result<()> {
let docker = self.docker.as_ref()
.ok_or_else(|| anyhow::anyhow!("Docker not available"))?;
let container_id = service.container_id.as_ref()
.ok_or_else(|| anyhow::anyhow!("No container ID for service"))?;
debug!("Starting Docker log stream for container: {}", container_id);
let options = LogsOptions::<String> {
follow: true,
stdout: true,
stderr: true,
tail: "100".to_string(),
timestamps: true,
..Default::default()
};
let mut stream = docker.logs(container_id, Some(options));
while let Some(log_result) = stream.next().await {
match log_result {
Ok(log_output) => {
let message = log_output.to_string();
let (timestamp, clean_message) = parse_docker_timestamp(&message);
let level = match log_output {
bollard::container::LogOutput::StdErr { .. } => LogLevel::Error,
bollard::container::LogOutput::StdOut { .. } => {
detect_log_level(&clean_message)
}
_ => LogLevel::Info,
};
let entry = LogEntry::new(level, clean_message, LogSource::Docker)
.with_timestamp(timestamp)
.with_raw(message);
if tx.send(entry).is_err() {
debug!("Log receiver dropped, stopping Docker log stream");
break;
}
}
Err(e) => {
error!("Error reading Docker logs: {}", e);
break;
}
}
}
debug!("Docker log stream ended for container: {}", container_id);
Ok(())
}
fn can_handle(&self, service: &Service) -> bool {
self.docker.is_some()
&& matches!(service.service_type, ServiceType::DockerContainer)
&& service.container_id.is_some()
}
fn source_type(&self) -> LogSource {
LogSource::Docker
}
}
fn parse_docker_timestamp(message: &str) -> (chrono::DateTime<Utc>, String) {
if let Some(space_idx) = message.find(' ') {
let timestamp_str = &message[..space_idx];
if let Ok(timestamp) = chrono::DateTime::parse_from_rfc3339(timestamp_str) {
return (timestamp.with_timezone(&Utc), message[space_idx + 1..].to_string());
}
}
(Utc::now(), message.to_string())
}
fn detect_log_level(message: &str) -> LogLevel {
let lower = message.to_lowercase();
if lower.contains("error") || lower.contains("fatal") || lower.contains("panic") {
LogLevel::Error
} else if lower.contains("warn") || lower.contains("warning") {
LogLevel::Warn
} else if lower.contains("debug") || lower.contains("trace") {
LogLevel::Debug
} else {
LogLevel::Info
}
}