use crate::logs::LogStreamer;
use crate::models::{LogEntry, LogLevel, LogSource, Service};
use anyhow::Result;
use async_trait::async_trait;
use std::path::{Path, PathBuf};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader};
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
use tracing::{debug, warn};
/// Log streamer that reads a service's output from a log file on disk.
///
/// The type is stateless: the log file is looked up again on every call that
/// needs it.
pub struct FileLogStreamer;
impl FileLogStreamer {
    /// Creates a new file-based log streamer.
    pub fn new() -> Self {
        Self
    }

    /// Locates the log file for `service`, returning `None` when nothing is found.
    ///
    /// The explicitly configured `log_file_path` wins when it points at an
    /// existing regular file. Otherwise a list of conventional locations is
    /// probed in order, using the service name lower-cased with spaces replaced
    /// by dashes:
    ///
    /// 1. `<project>/logs/<name>.log`, `<project>/log/<name>.log`, `<project>/<name>.log`
    /// 2. `~/.pm2/logs/<name>-out.log`, `~/.pm2/logs/<name>-error.log`
    /// 3. `/var/log/<name>/<name>.log`, `/var/log/<name>.log`
    fn find_log_file(&self, service: &Service) -> Option<PathBuf> {
        if let Some(path) = &service.log_file_path {
            let path = PathBuf::from(path);
            // Require a regular file: a directory "exists" too, but cannot be tailed.
            if path.is_file() {
                return Some(path);
            }
        }

        let service_name = service.name.to_lowercase().replace(" ", "-");
        let project_path = Path::new(&service.project_path);
        let file_name = format!("{}.log", service_name);

        let mut candidates = vec![
            project_path.join("logs").join(&file_name),
            project_path.join("log").join(&file_name),
            project_path.join(&file_name),
        ];

        // The home directory is optional. When it cannot be resolved only the
        // pm2 candidates are skipped; the remaining locations are still probed.
        if let Some(home) = dirs::home_dir() {
            let pm2_logs = home.join(".pm2").join("logs");
            candidates.push(pm2_logs.join(format!("{}-out.log", service_name)));
            candidates.push(pm2_logs.join(format!("{}-error.log", service_name)));
        }

        candidates.push(PathBuf::from(format!(
            "/var/log/{}/{}.log",
            service_name, service_name
        )));
        candidates.push(PathBuf::from(format!("/var/log/{}.log", service_name)));

        let found = candidates.into_iter().find(|candidate| candidate.is_file());
        if let Some(path) = &found {
            debug!("Found log file: {:?}", path);
        }
        found
    }
}
#[async_trait]
impl LogStreamer for FileLogStreamer {
async fn stream_logs(
&self,
service: &Service,
tx: mpsc::UnboundedSender<LogEntry>,
) -> Result<()> {
let log_path = self.find_log_file(service)
.ok_or_else(|| anyhow::anyhow!("No log file found for service"))?;
debug!("Starting file log stream for: {:?}", log_path);
let mut file = File::open(&log_path).await?;
let file_len = file.metadata().await?.len();
let read_size = std::cmp::min(file_len, 50_000); let start_pos = file_len.saturating_sub(read_size);
file.seek(std::io::SeekFrom::Start(start_pos)).await?;
let mut reader = BufReader::new(file);
let mut lines = vec![];
let mut line_buffer = String::new();
while reader.read_line(&mut line_buffer).await? > 0 {
if !line_buffer.trim().is_empty() {
lines.push(line_buffer.clone());
}
line_buffer.clear();
}
let start_idx = lines.len().saturating_sub(100);
for line in &lines[start_idx..] {
let entry = LogEntry::new(
detect_log_level(line),
line.trim().to_string(),
LogSource::File,
).with_raw(line.clone());
if tx.send(entry).is_err() {
return Ok(());
}
}
let mut last_size = reader.get_ref().metadata().await?.len();
loop {
sleep(Duration::from_millis(500)).await;
let current_size = reader.get_ref().metadata().await?.len();
if current_size > last_size {
let mut new_line = String::new();
while reader.read_line(&mut new_line).await? > 0 {
if !new_line.trim().is_empty() {
let entry = LogEntry::new(
detect_log_level(&new_line),
new_line.trim().to_string(),
LogSource::File,
).with_raw(new_line.clone());
if tx.send(entry).is_err() {
return Ok(());
}
}
new_line.clear();
}
last_size = current_size;
} else if current_size < last_size {
warn!("Log file was rotated, reopening: {:?}", log_path);
file = File::open(&log_path).await?;
reader = BufReader::new(file);
last_size = reader.get_ref().metadata().await?.len();
}
if tx.is_closed() {
break;
}
}
debug!("File log stream ended for: {:?}", log_path);
Ok(())
}
fn can_handle(&self, service: &Service) -> bool {
self.find_log_file(service).is_some()
}
fn source_type(&self) -> LogSource {
LogSource::File
}
}
fn detect_log_level(message: &str) -> LogLevel {
let lower = message.to_lowercase();
if lower.contains("error") || lower.contains("fatal") || lower.contains("panic") {
LogLevel::Error
} else if lower.contains("warn") || lower.contains("warning") {
LogLevel::Warn
} else if lower.contains("debug") || lower.contains("trace") {
LogLevel::Debug
} else {
LogLevel::Info
}
}