darpan 0.2.5

Linux developer service monitoring utility with auto-detection, real-time health checks, and an interactive TUI for databases, APIs, Docker containers, and more.
Documentation
use crate::logs::LogStreamer;
use crate::models::{LogEntry, LogSource, Service};
use anyhow::Result;
use tokio::sync::mpsc;
use tracing::debug;

/// Registry for log streamers.
///
/// Holds an ordered list of `(id, streamer)` pairs. The order is the order in
/// which entries were added, and lookups walk the list front to back.
pub struct LogStreamerRegistry {
    /// Registered streamers, each paired with the identifier it was registered under.
    streamers: Vec<(String, Box<dyn LogStreamer>)>,
}

impl LogStreamerRegistry {
    pub fn new() -> Self {
        Self {
            streamers: Vec::new(),
        }
    }

    /// Register a log streamer with a unique identifier
    /// Streamers are tried in registration order (first match wins)
    pub fn register(&mut self, id: String, streamer: Box<dyn LogStreamer>) {
        debug!("Registering log streamer: {}", id);
        self.streamers.push((id, streamer));
    }

    /// Start streaming logs for a service using registered streamers
    pub async fn start_streaming(
        &self,
        service: &Service,
    ) -> Result<mpsc::UnboundedReceiver<LogEntry>> {
        let (tx, rx) = mpsc::unbounded_channel();

        // Try streamers in registration order
        for (id, streamer) in &self.streamers {
            if streamer.can_handle(service) {
                debug!("Using log streamer '{}' for service '{}'", id, service.name);
                let service_clone = service.clone();
                let tx_clone = tx.clone();
                
                // Clone the streamer's source type to determine how to spawn
                let source_type = streamer.source_type();
                
                // Spawn the streaming task based on source type
                // Note: We need to create new instances for each spawn since we can't clone trait objects
                match source_type {
                    LogSource::Docker => {
                        let docker_streamer = crate::logs::docker::DockerLogStreamer::new();
                        tokio::spawn(async move {
                            if let Err(e) = docker_streamer.stream_logs(&service_clone, tx_clone).await {
                                tracing::error!("Docker log streaming error for {}: {}", service_clone.name, e);
                            }
                        });
                    }
                    LogSource::Systemd => {
                        let systemd_streamer = crate::logs::systemd::SystemdLogStreamer::new();
                        tokio::spawn(async move {
                            if let Err(e) = systemd_streamer.stream_logs(&service_clone, tx_clone).await {
                                tracing::error!("Systemd log streaming error for {}: {}", service_clone.name, e);
                            }
                        });
                    }
                    LogSource::File => {
                        let file_streamer = crate::logs::file::FileLogStreamer::new();
                        tokio::spawn(async move {
                            if let Err(e) = file_streamer.stream_logs(&service_clone, tx_clone).await {
                                tracing::error!("File log streaming error for {}: {}", service_clone.name, e);
                            }
                        });
                    }
                    LogSource::Process => {
                        let process_streamer = crate::logs::process::ProcessLogStreamer::new();
                        tokio::spawn(async move {
                            if let Err(e) = process_streamer.stream_logs(&service_clone, tx_clone).await {
                                tracing::error!("Process log streaming error for {}: {}", service_clone.name, e);
                            }
                        });
                    }
                    LogSource::Custom => {
                        // For custom streamers, we need to call the trait method directly
                        // This requires cloning the service and channel
                        let streamer_id = id.clone();
                        tokio::spawn(async move {
                            // We can't easily call stream_logs here without cloning the streamer
                            // This is a limitation - custom streamers should handle their own spawning
                            tracing::warn!("Custom log streamer '{}' needs to handle spawning internally", streamer_id);
                        });
                    }
                }

                return Ok(rx);
            }
        }

        // No streamer found
        anyhow::bail!("No log streamer available for service: {}", service.name)
    }

    /// Get list of all registered streamer IDs
    pub fn registered_ids(&self) -> Vec<String> {
        self.streamers.iter().map(|(id, _)| id.clone()).collect()
    }
}

impl Default for LogStreamerRegistry {
    fn default() -> Self {
        Self::new()
    }
}