darpan 0.2.5

Linux developer service monitoring utility with auto-detection, real-time health checks, and interactive TUI for databases, APIs, Docker containers, and more
Documentation
use crate::logs::LogStreamer;
use crate::models::{LogEntry, LogLevel, LogSource, Service};
use anyhow::Result;
use async_trait::async_trait;
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
use tracing::{debug, warn};

/// Log streamer for services that are only known by their process ID.
///
/// The type carries no state; all work happens in its `LogStreamer`
/// implementation.
#[derive(Debug, Default)]
pub struct ProcessLogStreamer;

impl ProcessLogStreamer {
    /// Creates a new streamer. Equivalent to `ProcessLogStreamer::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}

#[async_trait]
impl LogStreamer for ProcessLogStreamer {
    /// Forwards a bounded snapshot of a process's stdout to `tx`, then waits
    /// until the process exits or the receiver is dropped.
    ///
    /// The stdout of the process is reached through `/proc/<pid>/fd/1`. It is
    /// only read when that descriptor refers to a regular file; in every other
    /// case (pipe, terminal, socket, missing permissions) a single warning
    /// entry is sent instead. Only stdout is inspected; stderr is not read.
    ///
    /// At most the first 100 lines available in the file are forwarded. This
    /// is a one-shot read, not a continuous tail.
    ///
    /// A dropped receiver is treated as a normal shutdown and yields `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error when `service.pid` is `None`.
    async fn stream_logs(
        &self,
        service: &Service,
        tx: mpsc::UnboundedSender<LogEntry>,
    ) -> Result<()> {
        // Cap on how many lines the initial read forwards.
        const MAX_INITIAL_LINES: usize = 100;

        let pid = service
            .pid
            .ok_or_else(|| anyhow::anyhow!("No PID for service"))?;

        debug!("Starting process log stream for PID: {}", pid);

        let proc_dir = format!("/proc/{}", pid);
        let stdout_path = format!("{}/fd/1", proc_dir);

        // Opening /proc/<pid>/fd/1 re-opens whatever object the descriptor
        // points at. For a pipe or a terminal, reading from it would compete
        // with the real consumer for the data and could block indefinitely,
        // so the read is restricted to regular files.
        let stdout_is_regular_file = tokio::fs::metadata(&stdout_path)
            .await
            .map(|meta| meta.is_file())
            .unwrap_or(false);

        let stdout = if stdout_is_regular_file {
            File::open(&stdout_path).await.ok()
        } else {
            None
        };

        if let Some(file) = stdout {
            let mut reader = BufReader::new(file).lines();
            let mut line_count = 0;

            // Forward lines from the start of the file, up to the cap.
            while line_count < MAX_INITIAL_LINES {
                match reader.next_line().await {
                    Ok(Some(line)) => {
                        let entry = LogEntry::new(
                            detect_log_level(&line),
                            line.clone(),
                            LogSource::Process,
                        )
                        .with_raw(line);

                        // Receiver dropped: stop reading, the loop below exits.
                        if tx.send(entry).is_err() {
                            break;
                        }
                        line_count += 1;
                    }
                    Ok(None) => break,
                    Err(err) => {
                        // Surface the failure instead of silently ending the read.
                        warn!("Failed to read stdout of PID {}: {}", pid, err);
                        break;
                    }
                }
            }
        } else {
            warn!("Cannot read process stdout for PID {}: Permission denied or not available", pid);

            // Send a message indicating logs aren't available
            let entry = LogEntry::new(
                LogLevel::Warn,
                format!("Process logs not accessible for PID {}. Try running with sudo or check if the process writes to a log file.", pid),
                LogSource::Process,
            );

            // A send failure only means the receiver is gone, which the
            // liveness loop below detects right away; it is not an error.
            let _ = tx.send(entry);
        }

        // Stay alive while the process exists and someone is still listening.
        while !tx.is_closed() {
            if tokio::fs::metadata(&proc_dir).await.is_err() {
                debug!("Process {} no longer exists, stopping log stream", pid);
                break;
            }

            sleep(Duration::from_secs(1)).await;
        }

        debug!("Process log stream ended for PID: {}", pid);
        Ok(())
    }

    /// Returns `true` when the service has a PID and none of a container ID,
    /// a systemd unit, or a log file path.
    fn can_handle(&self, service: &Service) -> bool {
        service.pid.is_some()
            && service.container_id.is_none()
            && service.systemd_unit.is_none()
            && service.log_file_path.is_none()
    }

    /// Identifies entries produced by this streamer as `LogSource::Process`.
    fn source_type(&self) -> LogSource {
        LogSource::Process
    }
}

fn detect_log_level(message: &str) -> LogLevel {
    let lower = message.to_lowercase();
    
    if lower.contains("error") || lower.contains("fatal") || lower.contains("panic") {
        LogLevel::Error
    } else if lower.contains("warn") || lower.contains("warning") {
        LogLevel::Warn
    } else if lower.contains("debug") || lower.contains("trace") {
        LogLevel::Debug
    } else {
        LogLevel::Info
    }
}