use crate::logs::LogStreamer;
use crate::models::{LogEntry, LogLevel, LogSource, Service};
use anyhow::Result;
use async_trait::async_trait;
use chrono::Utc;
use serde::Deserialize;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc;
use tracing::{debug, warn};
/// Log streamer for services that are backed by a systemd unit.
///
/// The type holds no state, so it is a unit struct. `Default` is derived so
/// the type works with generic code that expects `T: Default`, and `Debug` so
/// it can appear in diagnostics.
#[derive(Debug, Default)]
pub struct SystemdLogStreamer;
impl SystemdLogStreamer {
pub fn new() -> Self {
Self
}
}
#[async_trait]
impl LogStreamer for SystemdLogStreamer {
    /// Follows the journal of the service's systemd unit and forwards every
    /// successfully parsed record to `tx`.
    ///
    /// Spawns `journalctl -u <unit> -f -n 100 --output=json` and reads its
    /// stdout line by line. Lines that fail to parse are logged with `warn!`
    /// and skipped. The loop ends when journalctl closes its stdout or when
    /// the receiving side of `tx` has been dropped.
    ///
    /// # Errors
    ///
    /// Returns an error if the service has no systemd unit, if `journalctl`
    /// cannot be spawned, if its stdout cannot be captured, or if reading
    /// from the pipe fails.
    async fn stream_logs(
        &self,
        service: &Service,
        tx: mpsc::UnboundedSender<LogEntry>,
    ) -> Result<()> {
        let unit = service
            .systemd_unit
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("No systemd unit for service"))?;
        debug!("Starting systemd log stream for unit: {}", unit);

        let mut child = Command::new("journalctl")
            .arg("-u")
            .arg(unit)
            .arg("-f")
            .arg("-n")
            .arg("100")
            .arg("--output=json")
            .stdout(std::process::Stdio::piped())
            // `journalctl -f` never exits by itself. Without this flag the
            // child would keep running after an early return (`?`) or after
            // this future is cancelled, because tokio does not kill children
            // on drop by default.
            .kill_on_drop(true)
            .spawn()?;
        let stdout = child
            .stdout
            .take()
            .ok_or_else(|| anyhow::anyhow!("Failed to capture stdout"))?;

        let mut reader = BufReader::new(stdout).lines();
        while let Some(line) = reader.next_line().await? {
            match parse_journalctl_json(&line) {
                Ok(entry) => {
                    if tx.send(entry).is_err() {
                        debug!("Log receiver dropped, stopping systemd log stream");
                        break;
                    }
                }
                Err(e) => {
                    warn!("Failed to parse journalctl JSON: {}", e);
                }
            }
        }

        // Terminate the follower and wait for it so no stray or zombie
        // process is left behind. Failure here is not fatal for the caller
        // (the process may already be gone), so it is only logged.
        if let Err(e) = child.kill().await {
            debug!("Failed to stop journalctl for unit {}: {}", unit, e);
        }

        debug!("Systemd log stream ended for unit: {}", unit);
        Ok(())
    }

    /// Returns `true` when the service has a systemd unit configured.
    fn can_handle(&self, service: &Service) -> bool {
        service.systemd_unit.is_some()
    }

    /// Identifies entries produced by this streamer as coming from systemd.
    fn source_type(&self) -> LogSource {
        LogSource::Systemd
    }
}
#[derive(Deserialize)]
struct JournalEntry {
#[serde(rename = "MESSAGE")]
message: Option<String>,
#[serde(rename = "PRIORITY")]
priority: Option<String>,
#[serde(rename = "__REALTIME_TIMESTAMP")]
timestamp: Option<String>,
}
fn parse_journalctl_json(json: &str) -> Result<LogEntry> {
let journal: JournalEntry = serde_json::from_str(json)?;
let message = journal.message.unwrap_or_default();
let priority = journal.priority
.and_then(|p| p.parse::<i32>().ok())
.unwrap_or(6);
let level = LogLevel::from_priority(priority);
let timestamp = if let Some(ts_str) = journal.timestamp {
if let Ok(micros) = ts_str.parse::<i64>() {
chrono::DateTime::from_timestamp_micros(micros)
.unwrap_or_else(|| Utc::now())
} else {
Utc::now()
}
} else {
Utc::now()
};
Ok(LogEntry::new(level, message.clone(), LogSource::Systemd)
.with_timestamp(timestamp)
.with_raw(message))
}