darpan 0.2.4

A Linux developer service monitoring utility with auto-detection, real-time health checks, and an interactive TUI for databases, APIs, Docker containers, and more.
Documentation
pub mod docker;
pub mod systemd;
pub mod process;
pub mod file;
pub mod export;

use crate::models::{LogEntry, LogSource, Service};
use anyhow::Result;
use std::collections::VecDeque;
use tokio::sync::mpsc;

/// Trait for log stream implementations.
///
/// Every implementor must be `Send + Sync`. `LogStreamManager` stores
/// implementors as `Box<dyn LogStreamer>` and drives `stream_logs` from
/// inside a spawned tokio task.
#[async_trait::async_trait]
pub trait LogStreamer: Send + Sync {
    /// Start streaming logs for a service.
    ///
    /// `tx` is the sending half of an unbounded channel of [`LogEntry`]
    /// values; the matching receiver is what
    /// `LogStreamManager::start_streaming` returns to its caller.
    ///
    /// # Errors
    ///
    /// Returns an error when streaming fails. `LogStreamManager` logs that
    /// error with `tracing::error!` and does not propagate it further.
    async fn stream_logs(
        &self,
        service: &Service,
        tx: mpsc::UnboundedSender<LogEntry>,
    ) -> Result<()>;

    /// Check if this streamer can handle the given service.
    ///
    /// `LogStreamManager::start_streaming` queries its streamers in order
    /// and uses the first one for which this returns `true`.
    fn can_handle(&self, service: &Service) -> bool;

    /// Get the log source type.
    ///
    /// `LogStreamManager::start_streaming` matches on this value to decide
    /// which concrete streamer to run for the service.
    fn source_type(&self) -> LogSource;
}

/// Manager for coordinating different log streamers
pub struct LogStreamManager {
    streamers: Vec<Box<dyn LogStreamer>>,
}

impl LogStreamManager {
    pub fn new() -> Self {
        let mut streamers: Vec<Box<dyn LogStreamer>> = Vec::new();
        
        // Add streamers in priority order
        streamers.push(Box::new(docker::DockerLogStreamer::new()));
        streamers.push(Box::new(systemd::SystemdLogStreamer::new()));
        streamers.push(Box::new(file::FileLogStreamer::new()));
        streamers.push(Box::new(process::ProcessLogStreamer::new()));
        
        Self { streamers }
    }

    /// Start streaming logs for a service
    /// Returns a receiver for log entries
    pub async fn start_streaming(
        &self,
        service: &Service,
    ) -> Result<mpsc::UnboundedReceiver<LogEntry>> {
        let (tx, rx) = mpsc::unbounded_channel();

        // Find the first streamer that can handle this service
        for streamer in &self.streamers {
            if streamer.can_handle(service) {
                let service_clone = service.clone();
                let tx_clone = tx.clone();
                
                // Spawn the streaming task based on source type
                match streamer.source_type() {
                    LogSource::Docker => {
                        let docker_streamer = docker::DockerLogStreamer::new();
                        tokio::spawn(async move {
                            if let Err(e) = docker_streamer.stream_logs(&service_clone, tx_clone).await {
                                tracing::error!("Docker log streaming error for {}: {}", service_clone.name, e);
                            }
                        });
                    }
                    LogSource::Systemd => {
                        let systemd_streamer = systemd::SystemdLogStreamer::new();
                        tokio::spawn(async move {
                            if let Err(e) = systemd_streamer.stream_logs(&service_clone, tx_clone).await {
                                tracing::error!("Systemd log streaming error for {}: {}", service_clone.name, e);
                            }
                        });
                    }
                    LogSource::File => {
                        let file_streamer = file::FileLogStreamer::new();
                        tokio::spawn(async move {
                            if let Err(e) = file_streamer.stream_logs(&service_clone, tx_clone).await {
                                tracing::error!("File log streaming error for {}: {}", service_clone.name, e);
                            }
                        });
                    }
                    LogSource::Process => {
                        let process_streamer = process::ProcessLogStreamer::new();
                        tokio::spawn(async move {
                            if let Err(e) = process_streamer.stream_logs(&service_clone, tx_clone).await {
                                tracing::error!("Process log streaming error for {}: {}", service_clone.name, e);
                            }
                        });
                    }
                }
                
                return Ok(rx);
            }
        }

        // No streamer found, return error
        anyhow::bail!("No log streamer available for service: {}", service.name)
    }
}

/// A bounded FIFO buffer for storing log entries.
///
/// The buffer keeps at most `max_size` entries. Once that limit is reached,
/// pushing a new entry evicts the oldest one.
pub struct LogBuffer {
    /// Stored entries, oldest at the front and newest at the back.
    entries: VecDeque<LogEntry>,
    /// Maximum number of entries retained at any time.
    max_size: usize,
}

impl LogBuffer {
    /// Upper bound on the number of slots reserved up front by [`LogBuffer::new`].
    ///
    /// Keeps construction cheap and panic-free for very large limits; the
    /// deque still grows on demand up to `max_size`.
    const MAX_PREALLOCATED_ENTRIES: usize = 1024;

    /// Creates an empty buffer that retains at most `max_size` entries.
    ///
    /// A `max_size` of zero yields a buffer that never stores anything.
    pub fn new(max_size: usize) -> Self {
        // Reserving `max_size` slots unconditionally would panic with a
        // capacity overflow (or allocate a huge block) for limits such as
        // `usize::MAX`, so the eager reservation is capped.
        let initial_capacity = max_size.min(Self::MAX_PREALLOCATED_ENTRIES);
        Self {
            entries: VecDeque::with_capacity(initial_capacity),
            max_size,
        }
    }

    /// Appends `entry`, evicting the oldest entry first if the buffer is full.
    ///
    /// When the buffer was created with a `max_size` of zero the entry is
    /// dropped, so the length never exceeds the configured limit.
    pub fn push(&mut self, entry: LogEntry) {
        if self.max_size == 0 {
            // Without this guard the eviction below would be a no-op on the
            // empty deque and the buffer would end up holding one entry.
            return;
        }
        if self.entries.len() >= self.max_size {
            self.entries.pop_front();
        }
        self.entries.push_back(entry);
    }

    /// Returns the stored entries, oldest first.
    pub fn entries(&self) -> &VecDeque<LogEntry> {
        &self.entries
    }

    /// Removes every stored entry; the configured limit is unchanged.
    pub fn clear(&mut self) {
        self.entries.clear();
    }

    /// Returns the number of entries currently stored.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` when no entries are stored.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}