darpan 0.2.5

Linux developer service monitoring utility with auto-detection, real-time health checks, and interactive TUI for databases, APIs, Docker containers, and more
Documentation
use crate::logs::LogStreamer;
use crate::models::{LogEntry, LogLevel, LogSource, Service};
use anyhow::Result;
use async_trait::async_trait;
use std::path::{Path, PathBuf};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader};
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
use tracing::{debug, warn};

pub struct FileLogStreamer;

impl FileLogStreamer {
    pub fn new() -> Self {
        Self
    }

    /// Auto-detect log file paths for a service
    fn find_log_file(&self, service: &Service) -> Option<PathBuf> {
        // Priority 1: Explicit log file path
        if let Some(path) = &service.log_file_path {
            let path = PathBuf::from(path);
            if path.exists() {
                return Some(path);
            }
        }

        // Priority 2: Common log paths
        let service_name = service.name.to_lowercase().replace(" ", "-");
        let project_path = Path::new(&service.project_path);

        let potential_paths = vec![
            // Project-local logs
            project_path.join("logs").join(format!("{}.log", service_name)),
            project_path.join("log").join(format!("{}.log", service_name)),
            project_path.join(format!("{}.log", service_name)),
            
            // PM2 logs
            dirs::home_dir()?.join(".pm2/logs").join(format!("{}-out.log", service_name)),
            dirs::home_dir()?.join(".pm2/logs").join(format!("{}-error.log", service_name)),
            
            // System logs
            PathBuf::from(format!("/var/log/{}/{}.log", service_name, service_name)),
            PathBuf::from(format!("/var/log/{}.log", service_name)),
        ];

        for path in potential_paths {
            if path.exists() && path.is_file() {
                debug!("Found log file: {:?}", path);
                return Some(path);
            }
        }

        None
    }
}

#[async_trait]
impl LogStreamer for FileLogStreamer {
    async fn stream_logs(
        &self,
        service: &Service,
        tx: mpsc::UnboundedSender<LogEntry>,
    ) -> Result<()> {
        let log_path = self.find_log_file(service)
            .ok_or_else(|| anyhow::anyhow!("No log file found for service"))?;

        debug!("Starting file log stream for: {:?}", log_path);

        let mut file = File::open(&log_path).await?;
        
        // Seek to end and read last ~100 lines
        let file_len = file.metadata().await?.len();
        
        // Read last few KB to get recent lines
        let read_size = std::cmp::min(file_len, 50_000); // Read last 50KB
        let start_pos = file_len.saturating_sub(read_size);
        
        file.seek(std::io::SeekFrom::Start(start_pos)).await?;
        
        let mut reader = BufReader::new(file);
        let mut lines = vec![];
        let mut line_buffer = String::new();
        
        // Read initial lines
        while reader.read_line(&mut line_buffer).await? > 0 {
            if !line_buffer.trim().is_empty() {
                lines.push(line_buffer.clone());
            }
            line_buffer.clear();
        }
        
        // Keep only last 100 lines
        let start_idx = lines.len().saturating_sub(100);
        for line in &lines[start_idx..] {
            let entry = LogEntry::new(
                detect_log_level(line),
                line.trim().to_string(),
                LogSource::File,
            ).with_raw(line.clone());
            
            if tx.send(entry).is_err() {
                return Ok(());
            }
        }
        
        // Now tail the file for new lines
        let mut last_size = reader.get_ref().metadata().await?.len();
        
        loop {
            sleep(Duration::from_millis(500)).await;
            
            let current_size = reader.get_ref().metadata().await?.len();
            
            if current_size > last_size {
                // File grew, read new lines
                let mut new_line = String::new();
                while reader.read_line(&mut new_line).await? > 0 {
                    if !new_line.trim().is_empty() {
                        let entry = LogEntry::new(
                            detect_log_level(&new_line),
                            new_line.trim().to_string(),
                            LogSource::File,
                        ).with_raw(new_line.clone());
                        
                        if tx.send(entry).is_err() {
                            return Ok(());
                        }
                    }
                    new_line.clear();
                }
                last_size = current_size;
            } else if current_size < last_size {
                // File was truncated/rotated, reopen
                warn!("Log file was rotated, reopening: {:?}", log_path);
                file = File::open(&log_path).await?;
                reader = BufReader::new(file);
                last_size = reader.get_ref().metadata().await?.len();
            }
            
            if tx.is_closed() {
                break;
            }
        }

        debug!("File log stream ended for: {:?}", log_path);
        Ok(())
    }

    fn can_handle(&self, service: &Service) -> bool {
        self.find_log_file(service).is_some()
    }

    fn source_type(&self) -> LogSource {
        LogSource::File
    }
}

fn detect_log_level(message: &str) -> LogLevel {
    let lower = message.to_lowercase();
    
    if lower.contains("error") || lower.contains("fatal") || lower.contains("panic") {
        LogLevel::Error
    } else if lower.contains("warn") || lower.contains("warning") {
        LogLevel::Warn
    } else if lower.contains("debug") || lower.contains("trace") {
        LogLevel::Debug
    } else {
        LogLevel::Info
    }
}