zinit 0.3.7

Process supervisor with dependency management
Documentation
//! Log buffering and streaming for service output.

use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{ChildStderr, ChildStdout};
use tokio::sync::{RwLock, mpsc};

use crate::sdk::{LogLine, LogStream};

use super::graph::ServiceId;

/// Ring buffer for log lines.
#[derive(Debug)]
pub struct LogBuffer {
    lines: VecDeque<LogLine>,
    max_lines: usize,
}

impl LogBuffer {
    /// Create a new log buffer with the specified capacity.
    pub fn new(max_lines: usize) -> Self {
        Self {
            lines: VecDeque::with_capacity(max_lines.min(1000)),
            max_lines,
        }
    }

    /// Push a log line to the buffer.
    pub fn push(&mut self, line: LogLine) {
        if self.lines.len() >= self.max_lines {
            self.lines.pop_front();
        }
        self.lines.push_back(line);
    }

    /// Get all log lines.
    pub fn all(&self) -> impl Iterator<Item = &LogLine> {
        self.lines.iter()
    }

    /// Get the last N log lines.
    pub fn last_n(&self, n: usize) -> impl Iterator<Item = &LogLine> {
        let skip = self.lines.len().saturating_sub(n);
        self.lines.iter().skip(skip)
    }

    /// Get the number of log lines.
    pub fn len(&self) -> usize {
        self.lines.len()
    }

    /// Check if the buffer is empty.
    pub fn is_empty(&self) -> bool {
        self.lines.is_empty()
    }

    /// Clear all log lines.
    pub fn clear(&mut self) {
        self.lines.clear();
    }
}

/// Type alias for the shared log buffer storage.
pub type LogBuffers = Arc<RwLock<HashMap<ServiceId, LogBuffer>>>;

/// Create a new shared log buffer storage.
pub fn new_log_buffers() -> LogBuffers {
    Arc::new(RwLock::new(HashMap::new()))
}

/// Get the current timestamp in milliseconds.
fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0)
}

/// Read stdout from a process and buffer the output.
pub async fn read_stdout(
    service_id: ServiceId,
    service_name: String,
    stdout: ChildStdout,
    log_buffers: LogBuffers,
    log_shipper_tx: Option<mpsc::Sender<LogLine>>,
) {
    read_stream(
        service_id,
        service_name,
        BufReader::new(stdout),
        LogStream::Stdout,
        log_buffers,
        log_shipper_tx,
    )
    .await;
}

/// Read stderr from a process and buffer the output.
pub async fn read_stderr(
    service_id: ServiceId,
    service_name: String,
    stderr: ChildStderr,
    log_buffers: LogBuffers,
    log_shipper_tx: Option<mpsc::Sender<LogLine>>,
) {
    read_stream(
        service_id,
        service_name,
        BufReader::new(stderr),
        LogStream::Stderr,
        log_buffers,
        log_shipper_tx,
    )
    .await;
}

/// Read from an async reader and buffer lines.
async fn read_stream<R>(
    service_id: ServiceId,
    service_name: String,
    reader: BufReader<R>,
    stream: LogStream,
    log_buffers: LogBuffers,
    log_shipper_tx: Option<mpsc::Sender<LogLine>>,
) where
    R: tokio::io::AsyncRead + Unpin,
{
    let mut lines = reader.lines();

    while let Ok(Some(content)) = lines.next_line().await {
        let log_line = LogLine {
            timestamp_ms: now_ms(),
            service: service_name.clone(),
            stream,
            content,
        };

        // Write to buffer
        {
            let mut buffers = log_buffers.write().await;
            let buffer = buffers
                .entry(service_id)
                .or_insert_with(|| LogBuffer::new(1000));
            buffer.push(log_line.clone());
        }

        // Send to shipper if configured (fire and forget)
        if let Some(tx) = &log_shipper_tx {
            let _ = tx.try_send(log_line);
        }
    }
}

/// Get logs for a service.
pub async fn get_logs(
    log_buffers: &LogBuffers,
    service_id: ServiceId,
    lines: Option<usize>,
) -> Vec<LogLine> {
    let buffers = log_buffers.read().await;
    if let Some(buffer) = buffers.get(&service_id) {
        match lines {
            Some(n) => buffer.last_n(n).cloned().collect(),
            None => buffer.all().cloned().collect(),
        }
    } else {
        vec![]
    }
}

/// Initialize a log buffer for a service.
pub async fn init_buffer(log_buffers: &LogBuffers, service_id: ServiceId, max_lines: usize) {
    let mut buffers = log_buffers.write().await;
    buffers
        .entry(service_id)
        .or_insert_with(|| LogBuffer::new(max_lines));
}

/// Clear logs for a service.
pub async fn clear_logs(log_buffers: &LogBuffers, service_id: ServiceId) {
    let mut buffers = log_buffers.write().await;
    if let Some(buffer) = buffers.get_mut(&service_id) {
        buffer.clear();
    }
}

/// Remove log buffer for a service.
pub async fn remove_buffer(log_buffers: &LogBuffers, service_id: ServiceId) {
    let mut buffers = log_buffers.write().await;
    buffers.remove(&service_id);
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_log_buffer_push() {
        let mut buffer = LogBuffer::new(3);

        for i in 0..5 {
            buffer.push(LogLine {
                timestamp_ms: i as u64,
                service: "test".to_string(),
                stream: LogStream::Stdout,
                content: format!("line {}", i),
            });
        }

        // Should only keep last 3
        assert_eq!(buffer.len(), 3);
        let lines: Vec<_> = buffer.all().collect();
        assert_eq!(lines[0].content, "line 2");
        assert_eq!(lines[1].content, "line 3");
        assert_eq!(lines[2].content, "line 4");
    }

    #[test]
    fn test_log_buffer_last_n() {
        let mut buffer = LogBuffer::new(10);

        for i in 0..5 {
            buffer.push(LogLine {
                timestamp_ms: i as u64,
                service: "test".to_string(),
                stream: LogStream::Stdout,
                content: format!("line {}", i),
            });
        }

        let last_two: Vec<_> = buffer.last_n(2).collect();
        assert_eq!(last_two.len(), 2);
        assert_eq!(last_two[0].content, "line 3");
        assert_eq!(last_two[1].content, "line 4");
    }

    #[test]
    fn test_log_buffer_empty() {
        let buffer = LogBuffer::new(10);
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);
    }

    #[tokio::test]
    async fn test_get_logs_nonexistent() {
        let buffers = new_log_buffers();
        let logs = get_logs(&buffers, ServiceId::new(999), None).await;
        assert!(logs.is_empty());
    }

    #[tokio::test]
    async fn test_init_and_clear_buffer() {
        let buffers = new_log_buffers();
        let id = ServiceId::new(1);

        init_buffer(&buffers, id, 100).await;

        {
            let mut b = buffers.write().await;
            let buffer = b.get_mut(&id).unwrap();
            buffer.push(LogLine {
                timestamp_ms: 0,
                service: "test".to_string(),
                stream: LogStream::Stdout,
                content: "hello".to_string(),
            });
        }

        let logs = get_logs(&buffers, id, None).await;
        assert_eq!(logs.len(), 1);

        clear_logs(&buffers, id).await;

        let logs = get_logs(&buffers, id, None).await;
        assert!(logs.is_empty());
    }
}