use crate::logs::LogStreamer;
use crate::models::{LogEntry, LogLevel, LogSource, Service};
use anyhow::Result;
use async_trait::async_trait;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command as TokioCommand;
use tokio::sync::mpsc;
use tracing::{debug, error};
/// Log streamer that obtains log lines by running an external command.
///
/// The command is stored as a template; `stream_logs` substitutes the
/// `{name}`, `{host}`, `{port}`, `{pid}` and `{systemd_unit}` placeholders
/// with values from the target service before spawning it.
#[derive(Debug, Clone)]
pub struct ScriptLogStreamer {
    /// Command template, split on whitespace into program and arguments.
    command: String,
    /// Label placed in square brackets at the start of each entry's raw text.
    source_label: String,
}
impl ScriptLogStreamer {
    /// Builds a streamer that runs `command` to obtain log lines.
    ///
    /// `source_label` is the tag used when rendering each entry's raw text;
    /// when it is `None` the label falls back to `"Script"`.
    pub fn new(command: String, source_label: Option<String>) -> Self {
        let source_label = match source_label {
            Some(label) => label,
            None => String::from("Script"),
        };
        Self {
            command,
            source_label,
        }
    }
}
#[async_trait]
impl LogStreamer for ScriptLogStreamer {
async fn stream_logs(
&self,
service: &Service,
tx: mpsc::UnboundedSender<LogEntry>,
) -> Result<()> {
let command = self
.command
.replace("{name}", &service.name)
.replace("{host}", &service.host)
.replace("{port}", &service.port.to_string())
.replace("{pid}", &service.pid.map(|p| p.to_string()).unwrap_or_default())
.replace(
"{systemd_unit}",
&service
.systemd_unit
.as_ref()
.map(|s| s.as_str())
.unwrap_or(""),
);
let parts: Vec<&str> = command.trim().split_whitespace().collect();
if parts.is_empty() {
anyhow::bail!("Empty command");
}
let cmd = parts[0];
let args = &parts[1..];
debug!(
"Starting log streaming with command: {} {:?}",
cmd, args
);
let mut child = TokioCommand::new(cmd)
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|e| anyhow::anyhow!("Failed to spawn command: {}", e))?;
let stdout = child.stdout.take().ok_or_else(|| anyhow::anyhow!("No stdout"))?;
let stderr = child.stderr.take().ok_or_else(|| anyhow::anyhow!("No stderr"))?;
let tx_stdout = tx.clone();
let source_label = self.source_label.clone();
tokio::spawn(async move {
let reader = BufReader::new(stdout);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
let line_clone = line.clone();
let entry = LogEntry::new(LogLevel::Info, line, LogSource::Custom)
.with_raw(format!("[{}] {}", source_label, line_clone));
if tx_stdout.send(entry).is_err() {
break;
}
}
});
let tx_stderr = tx.clone();
let source_label = self.source_label.clone();
tokio::spawn(async move {
let reader = BufReader::new(stderr);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
let line_clone = line.clone();
let entry = LogEntry::new(LogLevel::Error, line, LogSource::Custom)
.with_raw(format!("[{}] {}", source_label, line_clone));
if tx_stderr.send(entry).is_err() {
break;
}
}
});
tokio::spawn(async move {
if let Err(e) = child.wait().await {
error!("Command process error: {}", e);
}
});
Ok(())
}
fn can_handle(&self, _service: &Service) -> bool {
true
}
fn source_type(&self) -> LogSource {
LogSource::Custom
}
}