pub mod docker;
pub mod systemd;
pub mod process;
pub mod file;
pub mod export;
use crate::models::{LogEntry, LogSource, Service};
use anyhow::Result;
use std::collections::VecDeque;
use tokio::sync::mpsc;
#[async_trait::async_trait]
pub trait LogStreamer: Send + Sync {
async fn stream_logs(
&self,
service: &Service,
tx: mpsc::UnboundedSender<LogEntry>,
) -> Result<()>;
fn can_handle(&self, service: &Service) -> bool;
fn source_type(&self) -> LogSource;
}
pub struct LogStreamManager {
streamers: Vec<Box<dyn LogStreamer>>,
}
impl LogStreamManager {
pub fn new() -> Self {
let mut streamers: Vec<Box<dyn LogStreamer>> = Vec::new();
streamers.push(Box::new(docker::DockerLogStreamer::new()));
streamers.push(Box::new(systemd::SystemdLogStreamer::new()));
streamers.push(Box::new(file::FileLogStreamer::new()));
streamers.push(Box::new(process::ProcessLogStreamer::new()));
Self { streamers }
}
pub async fn start_streaming(
&self,
service: &Service,
) -> Result<mpsc::UnboundedReceiver<LogEntry>> {
let (tx, rx) = mpsc::unbounded_channel();
for streamer in &self.streamers {
if streamer.can_handle(service) {
let service_clone = service.clone();
let tx_clone = tx.clone();
match streamer.source_type() {
LogSource::Docker => {
let docker_streamer = docker::DockerLogStreamer::new();
tokio::spawn(async move {
if let Err(e) = docker_streamer.stream_logs(&service_clone, tx_clone).await {
tracing::error!("Docker log streaming error for {}: {}", service_clone.name, e);
}
});
}
LogSource::Systemd => {
let systemd_streamer = systemd::SystemdLogStreamer::new();
tokio::spawn(async move {
if let Err(e) = systemd_streamer.stream_logs(&service_clone, tx_clone).await {
tracing::error!("Systemd log streaming error for {}: {}", service_clone.name, e);
}
});
}
LogSource::File => {
let file_streamer = file::FileLogStreamer::new();
tokio::spawn(async move {
if let Err(e) = file_streamer.stream_logs(&service_clone, tx_clone).await {
tracing::error!("File log streaming error for {}: {}", service_clone.name, e);
}
});
}
LogSource::Process => {
let process_streamer = process::ProcessLogStreamer::new();
tokio::spawn(async move {
if let Err(e) = process_streamer.stream_logs(&service_clone, tx_clone).await {
tracing::error!("Process log streaming error for {}: {}", service_clone.name, e);
}
});
}
}
return Ok(rx);
}
}
anyhow::bail!("No log streamer available for service: {}", service.name)
}
}
pub struct LogBuffer {
entries: VecDeque<LogEntry>,
max_size: usize,
}
impl LogBuffer {
pub fn new(max_size: usize) -> Self {
Self {
entries: VecDeque::with_capacity(max_size),
max_size,
}
}
pub fn push(&mut self, entry: LogEntry) {
if self.entries.len() >= self.max_size {
self.entries.pop_front();
}
self.entries.push_back(entry);
}
pub fn entries(&self) -> &VecDeque<LogEntry> {
&self.entries
}
pub fn clear(&mut self) {
self.entries.clear();
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
}