darpan 0.2.5

Linux developer service monitoring utility with auto-detection, real-time health checks, and interactive TUI for databases, APIs, Docker containers, and more
Documentation
use crate::logs::LogStreamer;
use crate::models::{LogEntry, LogLevel, LogSource, Service};
use anyhow::Result;
use async_trait::async_trait;
use chrono::Utc;
use serde::Deserialize;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc;
use tracing::{debug, warn};

pub struct SystemdLogStreamer;

impl SystemdLogStreamer {
    pub fn new() -> Self {
        Self
    }
}

#[async_trait]
impl LogStreamer for SystemdLogStreamer {
    async fn stream_logs(
        &self,
        service: &Service,
        tx: mpsc::UnboundedSender<LogEntry>,
    ) -> Result<()> {
        let unit = service.systemd_unit.as_ref()
            .ok_or_else(|| anyhow::anyhow!("No systemd unit for service"))?;

        debug!("Starting systemd log stream for unit: {}", unit);

        // Run journalctl with JSON output
        let mut child = Command::new("journalctl")
            .arg("-u")
            .arg(unit)
            .arg("-f")
            .arg("-n")
            .arg("100")
            .arg("--output=json")
            .stdout(std::process::Stdio::piped())
            .spawn()?;

        let stdout = child.stdout.take()
            .ok_or_else(|| anyhow::anyhow!("Failed to capture stdout"))?;

        let mut reader = BufReader::new(stdout).lines();

        while let Some(line) = reader.next_line().await? {
            match parse_journalctl_json(&line) {
                Ok(entry) => {
                    if tx.send(entry).is_err() {
                        debug!("Log receiver dropped, stopping systemd log stream");
                        break;
                    }
                }
                Err(e) => {
                    warn!("Failed to parse journalctl JSON: {}", e);
                }
            }
        }

        debug!("Systemd log stream ended for unit: {}", unit);
        Ok(())
    }

    fn can_handle(&self, service: &Service) -> bool {
        service.systemd_unit.is_some()
    }

    fn source_type(&self) -> LogSource {
        LogSource::Systemd
    }
}

#[derive(Deserialize)]
struct JournalEntry {
    #[serde(rename = "MESSAGE")]
    message: Option<String>,
    #[serde(rename = "PRIORITY")]
    priority: Option<String>,
    #[serde(rename = "__REALTIME_TIMESTAMP")]
    timestamp: Option<String>,
}

fn parse_journalctl_json(json: &str) -> Result<LogEntry> {
    let journal: JournalEntry = serde_json::from_str(json)?;

    let message = journal.message.unwrap_or_default();
    
    let priority = journal.priority
        .and_then(|p| p.parse::<i32>().ok())
        .unwrap_or(6);
    let level = LogLevel::from_priority(priority);

    // Parse microsecond timestamp
    let timestamp = if let Some(ts_str) = journal.timestamp {
        if let Ok(micros) = ts_str.parse::<i64>() {
            chrono::DateTime::from_timestamp_micros(micros)
                .unwrap_or_else(|| Utc::now())
        } else {
            Utc::now()
        }
    } else {
        Utc::now()
    };

    Ok(LogEntry::new(level, message.clone(), LogSource::Systemd)
        .with_timestamp(timestamp)
        .with_raw(message))
}