// zinit 0.3.8
//
// Process supervisor with dependency management
// Documentation
//! Syslog receiver for capturing logs from traditional daemons.
//!
//! Binds to `/dev/log` Unix socket and routes syslog messages to the
//! appropriate service's log buffer based on the program tag.

use std::path::PathBuf;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use tokio::net::UnixDatagram;
use tokio::sync::RwLock;

use crate::sdk::{LogLine, LogStream};

use super::graph::ServiceGraph;
use super::log::LogBuffers;

/// Parsed syslog message (RFC 3164).
///
/// Produced by [`parse_syslog`]. Only the parts needed for routing are kept;
/// any timestamp or hostname in the raw line is discarded during parsing.
///
/// The type is comparable (`PartialEq`/`Eq`) so parsed results can be checked
/// with `assert_eq!` or deduplicated by callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyslogMessage {
    /// Syslog priority (facility * 8 + severity).
    ///
    /// `None` when the line carried no `<PRI>` prefix or the prefix did not
    /// parse as a `u8`.
    pub priority: Option<u8>,
    /// Program tag (e.g., "sshd").
    pub tag: String,
    /// Process ID if present, i.e. the number inside `tag[pid]`.
    ///
    /// `None` when there were no brackets or their content was not a `u32`.
    pub pid: Option<u32>,
    /// Message content: everything after the `": "` that follows the tag.
    pub message: String,
}

/// Parse one raw syslog line (RFC 3164 style) into a [`SyslogMessage`].
///
/// Accepted shapes:
/// - full: `<PRI>TIMESTAMP HOSTNAME TAG[PID]: MESSAGE`
/// - simplified: `TAG[PID]: MESSAGE` (the `[PID]` part is optional)
///
/// Examples:
/// - `<38>Jan  9 17:30:00 mos sshd[529]: Connection from 10.0.0.1`
/// - `sshd[529]: Connection from 10.0.0.1`
///
/// Leading and trailing whitespace of `raw` is ignored. Returns `None` when
/// `parse_tag_and_message` cannot split the remainder into tag and message.
pub fn parse_syslog(raw: &str) -> Option<SyslogMessage> {
    let trimmed = raw.trim();

    // A leading `<...>` is consumed as the PRI field. Its content becomes the
    // priority only if it parses as a u8; the prefix is stripped either way.
    // Without a closing `>` nothing is stripped.
    let (priority, rest) = match trimmed.strip_prefix('<').and_then(|s| s.split_once('>')) {
        Some((pri_text, remainder)) => (pri_text.parse::<u8>().ok(), remainder),
        None => (None, trimmed),
    };

    // Timestamp and hostname (if any) are not parsed explicitly; the helper
    // locates the `TAG[PID]:` / `TAG:` pattern and ignores what precedes it.
    parse_tag_and_message(rest).map(|(tag, pid, message)| SyslogMessage {
        priority,
        tag,
        pid,
        message,
    })
}

/// Extract tag, optional PID, and message from the input.
/// Looks for patterns like: `sshd[529]: message` or `sshd: message`
fn parse_tag_and_message(input: &str) -> Option<(String, Option<u32>, String)> {
    // Find the colon that separates tag from message
    let colon_pos = input.find(": ")?;

    let tag_part = &input[..colon_pos];
    let message = input[colon_pos + 2..].to_string();

    // Check for PID in brackets: tag[pid]
    if let Some(bracket_start) = tag_part.rfind('[')
        && let Some(bracket_end) = tag_part.rfind(']')
        && bracket_end > bracket_start
    {
        let tag = extract_tag(&tag_part[..bracket_start]);
        let pid_str = &tag_part[bracket_start + 1..bracket_end];
        let pid = pid_str.parse::<u32>().ok();
        return Some((tag, pid, message));
    }

    // No PID, just tag
    let tag = extract_tag(tag_part);
    Some((tag, None, message))
}

/// Extract the actual tag from potentially prefixed input.
///
/// Handles cases like "Jan  9 17:30:00 hostname tag" -> "tag" by keeping only
/// the last whitespace-separated token. When `input` contains no token at all
/// (empty or whitespace only) it is returned unchanged.
fn extract_tag(input: &str) -> String {
    // `next_back` reads the final token directly instead of walking every
    // token from the front as `last()` would.
    let tag = match input.split_whitespace().next_back() {
        Some(token) => token,
        None => input,
    };
    tag.to_string()
}

/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}

/// Syslog receiver that captures messages from `/dev/log`.
///
/// The socket location is whatever path is passed to `new`; `/dev/log` is the
/// value returned by `default_socket_path`.
pub struct SyslogReceiver {
    /// Filesystem path the Unix datagram socket is bound to by `run`.
    socket_path: PathBuf,
    /// Shared service graph, read to resolve a message's tag to a service id.
    graph: Arc<RwLock<ServiceGraph>>,
    /// Log buffers looked up by service id; matched messages are pushed here.
    log_buffers: LogBuffers,
}

impl SyslogReceiver {
    /// Create a new syslog receiver.
    ///
    /// This only stores its arguments; the socket is not created until
    /// [`run`](Self::run) is awaited.
    ///
    /// * `socket_path` - where the datagram socket will be bound.
    /// * `graph` - shared service graph used to resolve a tag to a service.
    /// * `log_buffers` - buffers that matched messages are appended to.
    pub fn new(
        socket_path: PathBuf,
        graph: Arc<RwLock<ServiceGraph>>,
        log_buffers: LogBuffers,
    ) -> Self {
        Self {
            socket_path,
            graph,
            log_buffers,
        }
    }

    /// Run the syslog receiver loop.
    ///
    /// This binds to the socket path and continuously receives syslog messages,
    /// routing them to the appropriate service's log buffer.
    ///
    /// Any existing file at the socket path is removed first, the parent
    /// directory is created if missing, and the socket is made world-writable
    /// (mode 0666).
    ///
    /// # Errors
    ///
    /// Returns the I/O error if the parent directory cannot be created, the
    /// socket cannot be bound, or its permissions cannot be changed. Receive
    /// errors are logged and retried after 100 ms, so the loop itself never
    /// returns.
    pub async fn run(&self) -> std::io::Result<()> {
        // Remove stale socket if it exists. Best effort: if removal fails for
        // a reason that matters, the bind below reports it.
        let _ = std::fs::remove_file(&self.socket_path);

        // Ensure parent directory exists
        if let Some(parent) = self.socket_path.parent() {
            std::fs::create_dir_all(parent)?;
        }

        // Bind the Unix datagram socket
        let socket = UnixDatagram::bind(&self.socket_path)?;

        // Set permissions so all processes can write (0666)
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            std::fs::set_permissions(&self.socket_path, std::fs::Permissions::from_mode(0o666))?;
        }

        tracing::info!(path = %self.socket_path.display(), "syslog receiver listening");

        // Receive loop; the buffer is reused for every datagram.
        let mut buf = [0u8; 8192];
        loop {
            match socket.recv(&mut buf).await {
                Ok(n) => {
                    // Syslog payloads are not guaranteed to be UTF-8. Decode
                    // lossily (invalid sequences become U+FFFD) so a stray
                    // byte does not cause the whole line to be dropped.
                    let msg = String::from_utf8_lossy(&buf[..n]);
                    self.handle_message(&msg).await;
                }
                Err(e) => {
                    tracing::error!(error = %e, "syslog recv error");
                    // Brief pause before retrying
                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                }
            }
        }
    }

    /// Handle a single syslog message.
    ///
    /// Parses `raw`, resolves its tag to a service via the graph and, when
    /// that service has a log buffer, appends a [`LogLine`] stamped with the
    /// current time. Unparseable lines, unknown tags and services without a
    /// buffer are only reported through `tracing`; nothing is stored for them.
    async fn handle_message(&self, raw: &str) {
        let Some(parsed) = parse_syslog(raw) else {
            tracing::trace!(raw = %raw, "failed to parse syslog message");
            return;
        };

        // Try to match the tag to a known service. The read guard is scoped
        // so it is released before the buffers lock is taken.
        let service_id = {
            let graph = self.graph.read().await;
            graph.get_by_name(&parsed.tag)
        };

        let Some(id) = service_id else {
            // Unmatched message - log at debug level, don't store
            tracing::debug!(
                tag = %parsed.tag,
                message = %parsed.message,
                "unmatched syslog message"
            );
            return;
        };

        let log_line = LogLine {
            timestamp_ms: now_ms(),
            service: parsed.tag.clone(),
            stream: LogStream::Syslog,
            content: parsed.message,
        };

        let mut buffers = self.log_buffers.write().await;
        let stored = match buffers.get_mut(&id) {
            Some(buffer) => {
                buffer.push(log_line);
                true
            }
            None => false,
        };
        // Release the write lock before logging.
        drop(buffers);

        if stored {
            tracing::trace!(
                service = %parsed.tag,
                "routed syslog message"
            );
        } else {
            // The service is known to the graph but has no buffer, so the
            // line was not kept; say so instead of claiming it was routed.
            tracing::debug!(
                service = %parsed.tag,
                "no log buffer for syslog service, message dropped"
            );
        }
    }
}

/// Decide whether the syslog receiver should be started.
///
/// The receiver is wanted when this process is PID 1 or `pid1_mode` is set.
/// Setting the environment variable `ZINIT_SYSLOG` to exactly `0` overrides
/// both and disables it; any other value, or an unset variable, has no effect.
pub fn should_enable_syslog(pid1_mode: bool) -> bool {
    // Explicit opt-out via the environment wins over everything else.
    let explicitly_disabled = matches!(std::env::var("ZINIT_SYSLOG").as_deref(), Ok("0"));
    if explicitly_disabled {
        return false;
    }

    pid1_mode || std::process::id() == 1
}

/// Socket location used when none is configured: `/dev/log`.
pub fn default_socket_path() -> PathBuf {
    let path: PathBuf = "/dev/log".into();
    path
}

/// Unit tests for the syslog line parser and the enable switch.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_simple_message() {
        let msg = parse_syslog("sshd[529]: Connection from 10.0.0.1").unwrap();
        assert_eq!(msg.tag, "sshd");
        assert_eq!(msg.pid, Some(529));
        assert_eq!(msg.message, "Connection from 10.0.0.1");
        assert_eq!(msg.priority, None);
    }

    #[test]
    fn test_parse_with_priority() {
        let msg = parse_syslog("<38>sshd[529]: Connection from 10.0.0.1").unwrap();
        assert_eq!(msg.priority, Some(38));
        assert_eq!(msg.tag, "sshd");
        assert_eq!(msg.pid, Some(529));
        assert_eq!(msg.message, "Connection from 10.0.0.1");
    }

    #[test]
    fn test_parse_full_rfc3164() {
        let msg =
            parse_syslog("<38>Jan  9 17:30:00 mos sshd[529]: Connection from 10.0.0.1").unwrap();
        assert_eq!(msg.priority, Some(38));
        assert_eq!(msg.tag, "sshd");
        assert_eq!(msg.pid, Some(529));
        assert_eq!(msg.message, "Connection from 10.0.0.1");
    }

    #[test]
    fn test_parse_iso_timestamp() {
        let msg = parse_syslog("<14>2026-01-09T17:30:00 mos myapp[7]: started").unwrap();
        assert_eq!(msg.priority, Some(14));
        assert_eq!(msg.tag, "myapp");
        assert_eq!(msg.pid, Some(7));
        assert_eq!(msg.message, "started");
    }

    #[test]
    fn test_parse_no_pid() {
        let msg = parse_syslog("kernel: some kernel message").unwrap();
        assert_eq!(msg.tag, "kernel");
        assert_eq!(msg.pid, None);
        assert_eq!(msg.message, "some kernel message");
    }

    #[test]
    fn test_parse_non_numeric_pid() {
        let msg = parse_syslog("app[worker]: hello").unwrap();
        assert_eq!(msg.tag, "app");
        assert_eq!(msg.pid, None);
        assert_eq!(msg.message, "hello");
    }

    #[test]
    fn test_parse_with_timestamp_no_priority() {
        let msg = parse_syslog("Jan  9 17:30:00 hostname myapp[123]: hello world").unwrap();
        assert_eq!(msg.tag, "myapp");
        assert_eq!(msg.pid, Some(123));
        assert_eq!(msg.message, "hello world");
    }

    #[test]
    fn test_parse_invalid() {
        assert!(parse_syslog("").is_none());
        assert!(parse_syslog("no colon here").is_none());
    }

    #[test]
    fn test_should_enable_syslog() {
        // The outcome depends on ambient process state (the ZINIT_SYSLOG
        // variable and our own PID), so derive the expectation from it rather
        // than assuming a clean environment and a non-init process.
        let disabled = matches!(std::env::var("ZINIT_SYSLOG").as_deref(), Ok("0"));
        let is_pid1 = std::process::id() == 1;

        // Not in PID 1 mode: enabled only when actually running as PID 1.
        assert_eq!(should_enable_syslog(false), !disabled && is_pid1);

        // In PID 1 mode: enabled unless explicitly switched off.
        assert_eq!(should_enable_syslog(true), !disabled);
    }
}