use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{ChildStderr, ChildStdout};
use tokio::sync::{RwLock, mpsc};
use crate::sdk::{LogLine, LogStream};
use super::graph::ServiceId;
#[derive(Debug)]
pub struct LogBuffer {
lines: VecDeque<LogLine>,
max_lines: usize,
}
impl LogBuffer {
pub fn new(max_lines: usize) -> Self {
Self {
lines: VecDeque::with_capacity(max_lines.min(1000)),
max_lines,
}
}
pub fn push(&mut self, line: LogLine) {
if self.lines.len() >= self.max_lines {
self.lines.pop_front();
}
self.lines.push_back(line);
}
pub fn all(&self) -> impl Iterator<Item = &LogLine> {
self.lines.iter()
}
pub fn last_n(&self, n: usize) -> impl Iterator<Item = &LogLine> {
let skip = self.lines.len().saturating_sub(n);
self.lines.iter().skip(skip)
}
pub fn len(&self) -> usize {
self.lines.len()
}
pub fn is_empty(&self) -> bool {
self.lines.is_empty()
}
pub fn clear(&mut self) {
self.lines.clear();
}
}
pub type LogBuffers = Arc<RwLock<HashMap<ServiceId, LogBuffer>>>;
pub fn new_log_buffers() -> LogBuffers {
Arc::new(RwLock::new(HashMap::new()))
}
fn now_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
pub async fn read_stdout(
service_id: ServiceId,
service_name: String,
stdout: ChildStdout,
log_buffers: LogBuffers,
log_shipper_tx: Option<mpsc::Sender<LogLine>>,
) {
read_stream(
service_id,
service_name,
BufReader::new(stdout),
LogStream::Stdout,
log_buffers,
log_shipper_tx,
)
.await;
}
pub async fn read_stderr(
service_id: ServiceId,
service_name: String,
stderr: ChildStderr,
log_buffers: LogBuffers,
log_shipper_tx: Option<mpsc::Sender<LogLine>>,
) {
read_stream(
service_id,
service_name,
BufReader::new(stderr),
LogStream::Stderr,
log_buffers,
log_shipper_tx,
)
.await;
}
async fn read_stream<R>(
service_id: ServiceId,
service_name: String,
reader: BufReader<R>,
stream: LogStream,
log_buffers: LogBuffers,
log_shipper_tx: Option<mpsc::Sender<LogLine>>,
) where
R: tokio::io::AsyncRead + Unpin,
{
let mut lines = reader.lines();
while let Ok(Some(content)) = lines.next_line().await {
let log_line = LogLine {
timestamp_ms: now_ms(),
service: service_name.clone(),
stream,
content,
};
{
let mut buffers = log_buffers.write().await;
let buffer = buffers
.entry(service_id)
.or_insert_with(|| LogBuffer::new(1000));
buffer.push(log_line.clone());
}
if let Some(tx) = &log_shipper_tx {
let _ = tx.try_send(log_line);
}
}
}
pub async fn get_logs(
log_buffers: &LogBuffers,
service_id: ServiceId,
lines: Option<usize>,
) -> Vec<LogLine> {
let buffers = log_buffers.read().await;
if let Some(buffer) = buffers.get(&service_id) {
match lines {
Some(n) => buffer.last_n(n).cloned().collect(),
None => buffer.all().cloned().collect(),
}
} else {
vec![]
}
}
pub async fn init_buffer(log_buffers: &LogBuffers, service_id: ServiceId, max_lines: usize) {
let mut buffers = log_buffers.write().await;
buffers
.entry(service_id)
.or_insert_with(|| LogBuffer::new(max_lines));
}
pub async fn clear_logs(log_buffers: &LogBuffers, service_id: ServiceId) {
let mut buffers = log_buffers.write().await;
if let Some(buffer) = buffers.get_mut(&service_id) {
buffer.clear();
}
}
pub async fn remove_buffer(log_buffers: &LogBuffers, service_id: ServiceId) {
let mut buffers = log_buffers.write().await;
buffers.remove(&service_id);
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_log_buffer_push() {
let mut buffer = LogBuffer::new(3);
for i in 0..5 {
buffer.push(LogLine {
timestamp_ms: i as u64,
service: "test".to_string(),
stream: LogStream::Stdout,
content: format!("line {}", i),
});
}
assert_eq!(buffer.len(), 3);
let lines: Vec<_> = buffer.all().collect();
assert_eq!(lines[0].content, "line 2");
assert_eq!(lines[1].content, "line 3");
assert_eq!(lines[2].content, "line 4");
}
#[test]
fn test_log_buffer_last_n() {
let mut buffer = LogBuffer::new(10);
for i in 0..5 {
buffer.push(LogLine {
timestamp_ms: i as u64,
service: "test".to_string(),
stream: LogStream::Stdout,
content: format!("line {}", i),
});
}
let last_two: Vec<_> = buffer.last_n(2).collect();
assert_eq!(last_two.len(), 2);
assert_eq!(last_two[0].content, "line 3");
assert_eq!(last_two[1].content, "line 4");
}
#[test]
fn test_log_buffer_empty() {
let buffer = LogBuffer::new(10);
assert!(buffer.is_empty());
assert_eq!(buffer.len(), 0);
}
#[tokio::test]
async fn test_get_logs_nonexistent() {
let buffers = new_log_buffers();
let logs = get_logs(&buffers, ServiceId::new(999), None).await;
assert!(logs.is_empty());
}
#[tokio::test]
async fn test_init_and_clear_buffer() {
let buffers = new_log_buffers();
let id = ServiceId::new(1);
init_buffer(&buffers, id, 100).await;
{
let mut b = buffers.write().await;
let buffer = b.get_mut(&id).unwrap();
buffer.push(LogLine {
timestamp_ms: 0,
service: "test".to_string(),
stream: LogStream::Stdout,
content: "hello".to_string(),
});
}
let logs = get_logs(&buffers, id, None).await;
assert_eq!(logs.len(), 1);
clear_logs(&buffers, id).await;
let logs = get_logs(&buffers, id, None).await;
assert!(logs.is_empty());
}
}