// darpan 0.2.0
//
// Linux developer service monitoring utility with auto-detection, real-time health checks, and an interactive TUI for databases, APIs, Docker containers, and more.
// Documentation
use crate::logs::LogStreamer;
use crate::models::{LogEntry, LogLevel, LogSource, Service, ServiceType};
use anyhow::Result;
use async_trait::async_trait;
use bollard::container::LogsOptions;
use bollard::Docker;
use chrono::Utc;
use futures::StreamExt;
use tokio::sync::mpsc;
use tracing::{debug, error, warn};

/// Log streamer backed by a Docker client.
///
/// The client is optional: [`DockerLogStreamer::new`] stores `None` when the
/// connection attempt fails, and the streamer then refuses to handle any
/// service instead of panicking.
pub struct DockerLogStreamer {
    /// Docker client handle, or `None` when connecting with the local
    /// defaults failed at construction time.
    docker: Option<Docker>,
}

impl DockerLogStreamer {
    /// Creates a streamer, attempting to connect to Docker with the local
    /// default settings.
    ///
    /// Construction never fails: if the connection attempt returns an error,
    /// a warning is logged and the streamer is created without a client.
    pub fn new() -> Self {
        match Docker::connect_with_local_defaults() {
            Ok(client) => Self {
                docker: Some(client),
            },
            // The connection error itself is intentionally discarded; only
            // the fact that Docker is unavailable is reported.
            Err(_) => {
                warn!("Docker not available for log streaming");
                Self { docker: None }
            }
        }
    }
}

#[async_trait]
impl LogStreamer for DockerLogStreamer {
    /// Follows the log output of the service's container and forwards every
    /// frame to `tx` as a [`LogEntry`].
    ///
    /// The stream is opened with `follow`, `stdout`, `stderr` and
    /// `timestamps` enabled and `tail` set to `"100"`. Each frame is split
    /// into a timestamp and message by `parse_docker_timestamp`; the
    /// unmodified text is attached as the entry's raw payload.
    ///
    /// Level assignment: stderr frames are always `LogLevel::Error`
    /// (regardless of their content), stdout frames are classified by
    /// `detect_log_level`, and any other frame kind is `LogLevel::Info`.
    ///
    /// # Errors
    ///
    /// Returns an error if no Docker client is available or the service has
    /// no container ID. A read error on the stream is logged and ends the
    /// loop, but the function still returns `Ok(())`; the same applies when
    /// the receiving side of `tx` has been dropped.
    async fn stream_logs(
        &self,
        service: &Service,
        tx: mpsc::UnboundedSender<LogEntry>,
    ) -> Result<()> {
        let client = match self.docker.as_ref() {
            Some(client) => client,
            None => return Err(anyhow::anyhow!("Docker not available")),
        };

        let container_id = match service.container_id.as_ref() {
            Some(id) => id,
            None => return Err(anyhow::anyhow!("No container ID for service")),
        };

        debug!("Starting Docker log stream for container: {}", container_id);

        let mut frames = client.logs(
            container_id,
            Some(LogsOptions::<String> {
                follow: true,
                stdout: true,
                stderr: true,
                tail: "100".to_string(),
                timestamps: true,
                ..Default::default()
            }),
        );

        loop {
            // Stream exhaustion and read errors both terminate the loop;
            // only the error case is logged.
            let frame = match frames.next().await {
                Some(Ok(frame)) => frame,
                Some(Err(e)) => {
                    error!("Error reading Docker logs: {}", e);
                    break;
                }
                None => break,
            };

            let raw = frame.to_string();

            // Separate the leading timestamp (if any) from the message text.
            let (timestamp, text) = parse_docker_timestamp(&raw);

            // The output channel decides the level first; message content is
            // only inspected for stdout frames.
            let level = match frame {
                bollard::container::LogOutput::StdErr { .. } => LogLevel::Error,
                bollard::container::LogOutput::StdOut { .. } => detect_log_level(&text),
                _ => LogLevel::Info,
            };

            let entry = LogEntry::new(level, text, LogSource::Docker)
                .with_timestamp(timestamp)
                .with_raw(raw);

            // A failed send means the receiver is gone, so there is nobody
            // left to stream to.
            if tx.send(entry).is_err() {
                debug!("Log receiver dropped, stopping Docker log stream");
                break;
            }
        }

        debug!("Docker log stream ended for container: {}", container_id);
        Ok(())
    }

    /// Returns `true` only when a Docker client is available, the service is
    /// a `ServiceType::DockerContainer`, and it carries a container ID.
    fn can_handle(&self, service: &Service) -> bool {
        if self.docker.is_none() {
            return false;
        }

        let is_container = matches!(service.service_type, ServiceType::DockerContainer);
        is_container && service.container_id.is_some()
    }

    /// Identifies entries produced by this streamer as `LogSource::Docker`.
    fn source_type(&self) -> LogSource {
        LogSource::Docker
    }
}

/// Splits a Docker log line into its timestamp and the remaining message.
///
/// Expected shape: `"2024-01-20T12:30:45.123456789Z message"`. The text
/// before the first space is parsed as an RFC 3339 timestamp and converted to
/// UTC; everything after that space is returned as the message.
///
/// If the line contains no space, or the leading token is not a valid
/// RFC 3339 timestamp, the current time and the whole, unmodified line are
/// returned instead.
fn parse_docker_timestamp(message: &str) -> (chrono::DateTime<Utc>, String) {
    message
        .split_once(' ')
        .and_then(|(stamp, rest)| {
            chrono::DateTime::parse_from_rfc3339(stamp)
                .ok()
                .map(|parsed| (parsed.with_timezone(&Utc), rest.to_string()))
        })
        .unwrap_or_else(|| (Utc::now(), message.to_string()))
}

/// Detect log level from message content
fn detect_log_level(message: &str) -> LogLevel {
    let lower = message.to_lowercase();
    
    if lower.contains("error") || lower.contains("fatal") || lower.contains("panic") {
        LogLevel::Error
    } else if lower.contains("warn") || lower.contains("warning") {
        LogLevel::Warn
    } else if lower.contains("debug") || lower.contains("trace") {
        LogLevel::Debug
    } else {
        LogLevel::Info
    }
}