darpan 0.2.5

Linux developer service monitoring utility with auto-detection, real-time health checks, and interactive TUI for databases, APIs, Docker containers, and more
Documentation
use crate::logs::LogStreamer;
use crate::models::{LogEntry, LogLevel, LogSource, Service};
use anyhow::Result;
use async_trait::async_trait;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command as TokioCommand;
use tokio::sync::mpsc;
use tracing::{debug, error};

/// Log streamer that executes a command and streams its output.
///
/// The command is a whitespace-separated template; when streaming starts the
/// placeholders `{name}`, `{host}`, `{port}`, `{pid}` and `{systemd_unit}` are
/// replaced with the corresponding fields of the service being monitored.
#[derive(Debug, Clone)]
pub struct ScriptLogStreamer {
    /// Command template to run, e.g. `journalctl -f -u {systemd_unit}`.
    command: String,
    /// Label placed in square brackets in front of every raw log line.
    source_label: String,
}

impl ScriptLogStreamer {
    /// Builds a streamer for the given command template.
    ///
    /// `source_label` is the tag attached to each raw log line; when it is
    /// `None` the label falls back to `"Script"`.
    pub fn new(command: String, source_label: Option<String>) -> Self {
        let source_label = match source_label {
            Some(label) => label,
            None => String::from("Script"),
        };

        Self {
            command,
            source_label,
        }
    }
}

#[async_trait]
impl LogStreamer for ScriptLogStreamer {
    async fn stream_logs(
        &self,
        service: &Service,
        tx: mpsc::UnboundedSender<LogEntry>,
    ) -> Result<()> {
        // Replace placeholders in command
        let command = self
            .command
            .replace("{name}", &service.name)
            .replace("{host}", &service.host)
            .replace("{port}", &service.port.to_string())
            .replace("{pid}", &service.pid.map(|p| p.to_string()).unwrap_or_default())
            .replace(
                "{systemd_unit}",
                &service
                    .systemd_unit
                    .as_ref()
                    .map(|s| s.as_str())
                    .unwrap_or(""),
            );

        // Parse command
        let parts: Vec<&str> = command.trim().split_whitespace().collect();
        if parts.is_empty() {
            anyhow::bail!("Empty command");
        }

        let cmd = parts[0];
        let args = &parts[1..];

        debug!(
            "Starting log streaming with command: {} {:?}",
            cmd, args
        );

        // Spawn command
        let mut child = TokioCommand::new(cmd)
            .args(args)
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .map_err(|e| anyhow::anyhow!("Failed to spawn command: {}", e))?;

        let stdout = child.stdout.take().ok_or_else(|| anyhow::anyhow!("No stdout"))?;
        let stderr = child.stderr.take().ok_or_else(|| anyhow::anyhow!("No stderr"))?;

        // Stream stdout
        let tx_stdout = tx.clone();
        let source_label = self.source_label.clone();
        tokio::spawn(async move {
            let reader = BufReader::new(stdout);
            let mut lines = reader.lines();

            while let Ok(Some(line)) = lines.next_line().await {
                let line_clone = line.clone();
                let entry = LogEntry::new(LogLevel::Info, line, LogSource::Custom)
                    .with_raw(format!("[{}] {}", source_label, line_clone));
                if tx_stdout.send(entry).is_err() {
                    break;
                }
            }
        });

        // Stream stderr
        let tx_stderr = tx.clone();
        let source_label = self.source_label.clone();
        tokio::spawn(async move {
            let reader = BufReader::new(stderr);
            let mut lines = reader.lines();

            while let Ok(Some(line)) = lines.next_line().await {
                let line_clone = line.clone();
                let entry = LogEntry::new(LogLevel::Error, line, LogSource::Custom)
                    .with_raw(format!("[{}] {}", source_label, line_clone));
                if tx_stderr.send(entry).is_err() {
                    break;
                }
            }
        });

        // Wait for process to finish (or continue streaming if it's a long-running command)
        tokio::spawn(async move {
            if let Err(e) = child.wait().await {
                error!("Command process error: {}", e);
            }
        });

        Ok(())
    }

    fn can_handle(&self, _service: &Service) -> bool {
        // Script streamers are configured per-service, so they handle all services
        // The actual matching is done by configuration
        true
    }

    fn source_type(&self) -> LogSource {
        LogSource::Custom
    }
}