use std::path::PathBuf;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::net::UnixDatagram;
use tokio::sync::RwLock;
use crate::sdk::{LogLine, LogStream};
use super::graph::ServiceGraph;
use super::log::LogBuffers;
/// A syslog datagram split into the fields the receiver works with.
///
/// Produced by [`parse_syslog`]. The type is plain data, so it also derives
/// `PartialEq`/`Eq` to let callers and tests compare whole messages.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyslogMessage {
    /// Numeric value found between a leading `<` and `>`, if present and
    /// parseable as `u8`. Stored as-is; it is not split into sub-fields.
    pub priority: Option<u8>,
    /// Program tag: the last whitespace-separated token before the `": "`
    /// separator, without any `[pid]` suffix.
    pub tag: String,
    /// Process id taken from a `[pid]` suffix on the tag, when it is numeric.
    pub pid: Option<u32>,
    /// Everything after the first `": "` separator.
    pub message: String,
}
/// Parses one raw syslog line into a [`SyslogMessage`].
///
/// Surrounding whitespace is trimmed first. If the line starts with `<` and
/// contains a `>`, the text between them is consumed as the priority; when
/// that text is not a valid `u8` the prefix is still removed and `priority`
/// is `None`. The remainder is handed to `parse_tag_and_message`.
///
/// Returns `None` when the remainder cannot be split into a tag and a
/// message.
pub fn parse_syslog(raw: &str) -> Option<SyslogMessage> {
    let trimmed = raw.trim();

    // `<PRI>rest` -> (PRI, rest); anything else leaves the line untouched.
    let (priority, rest) = match trimmed
        .strip_prefix('<')
        .and_then(|after_open| after_open.split_once('>'))
    {
        Some((pri_text, rest)) => (pri_text.parse::<u8>().ok(), rest),
        None => (None, trimmed),
    };

    parse_tag_and_message(rest).map(|(tag, pid, message)| SyslogMessage {
        priority,
        tag,
        pid,
        message,
    })
}
/// Splits `header: message` into `(tag, pid, message)`.
///
/// The split happens at the first `": "`; input without that separator
/// yields `None`. Within the header, the tag is the last
/// whitespace-separated token, so a leading timestamp and hostname are
/// skipped.
///
/// A PID is recognised only as a bracketed suffix at the very end of the
/// header (`tag[123]`). Brackets elsewhere in the header are not treated as
/// a PID, so `"[host] app: msg"` yields the tag `app` rather than an empty
/// tag. A bracketed suffix that is not a valid `u32` is still stripped from
/// the tag and reported as `pid == None`.
fn parse_tag_and_message(input: &str) -> Option<(String, Option<u32>, String)> {
    let (header, message) = input.split_once(": ")?;

    let (name, pid) = match header
        .strip_suffix(']')
        .and_then(|head| head.rsplit_once('['))
    {
        Some((name, pid_text)) => (name, pid_text.parse::<u32>().ok()),
        None => (header, None),
    };

    Some((extract_tag(name), pid, message.to_string()))
}

/// Returns the last whitespace-separated token of `input`, or an empty
/// string when `input` is empty or contains only whitespace.
fn extract_tag(input: &str) -> String {
    input
        .split_whitespace()
        .next_back()
        .unwrap_or_default()
        .to_string()
}
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
/// Receives syslog datagrams on a Unix socket and files them into per-service
/// log buffers.
pub struct SyslogReceiver {
    /// Filesystem path the Unix datagram socket is bound to in `run`.
    socket_path: PathBuf,
    /// Shared service graph; read-locked to look a service up by the
    /// message's tag.
    graph: Arc<RwLock<ServiceGraph>>,
    /// Log buffers, write-locked and indexed by the id returned from the
    /// graph lookup.
    log_buffers: LogBuffers,
}
impl SyslogReceiver {
    /// Creates a receiver for `socket_path`.
    ///
    /// This only stores its arguments; the socket is created by [`run`](Self::run).
    pub fn new(
        socket_path: PathBuf,
        graph: Arc<RwLock<ServiceGraph>>,
        log_buffers: LogBuffers,
    ) -> Self {
        Self {
            socket_path,
            graph,
            log_buffers,
        }
    }

    /// Binds the datagram socket and processes incoming messages forever.
    ///
    /// Setup removes any existing file at the socket path (errors ignored),
    /// creates the parent directory, binds the socket and sets its mode to
    /// `0o666` (read/write for user, group and others).
    ///
    /// # Errors
    ///
    /// Returns the I/O error if creating the parent directory, binding the
    /// socket, or setting its permissions fails. Once the receive loop has
    /// started, receive errors are logged and retried, so the function does
    /// not return.
    pub async fn run(&self) -> std::io::Result<()> {
        // Best-effort cleanup: the file normally does not exist, and a real
        // problem will surface from `bind` below.
        let _ = std::fs::remove_file(&self.socket_path);
        if let Some(parent) = self.socket_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let socket = UnixDatagram::bind(&self.socket_path)?;
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            std::fs::set_permissions(&self.socket_path, std::fs::Permissions::from_mode(0o666))?;
        }
        tracing::info!(path = %self.socket_path.display(), "syslog receiver listening");

        // At most 8192 bytes of each datagram are read.
        let mut buf = [0u8; 8192];
        loop {
            match socket.recv(&mut buf).await {
                Ok(n) => {
                    // Decode lossily so a single invalid byte does not cause
                    // the whole log line to be silently discarded.
                    let text = String::from_utf8_lossy(&buf[..n]);
                    self.handle_message(&text).await;
                }
                Err(e) => {
                    tracing::error!(error = %e, "syslog recv error");
                    // Short pause before retrying; presumably to avoid a busy
                    // loop if the error persists.
                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                }
            }
        }
    }

    /// Parses one datagram and appends it to the matching service's buffer.
    ///
    /// The service is found by passing the parsed tag to
    /// `ServiceGraph::get_by_name`. The stored line is stamped with the
    /// receive time from `now_ms()`, not with any timestamp carried in the
    /// message. Messages that fail to parse, have no matching service, or
    /// whose service has no log buffer are logged and dropped.
    async fn handle_message(&self, raw: &str) {
        let parsed = match parse_syslog(raw) {
            Some(p) => p,
            None => {
                tracing::trace!(raw = %raw, "failed to parse syslog message");
                return;
            }
        };

        // Scope the read guard so it is released before the buffers are locked.
        let service_id = {
            let graph = self.graph.read().await;
            graph.get_by_name(&parsed.tag)
        };

        match service_id {
            Some(id) => {
                let log_line = LogLine {
                    timestamp_ms: now_ms(),
                    service: parsed.tag.clone(),
                    stream: LogStream::Syslog,
                    content: parsed.message,
                };

                let mut buffers = self.log_buffers.write().await;
                let stored = if let Some(buffer) = buffers.get_mut(&id) {
                    buffer.push(log_line);
                    true
                } else {
                    false
                };
                // Release the write lock before logging.
                drop(buffers);

                if stored {
                    tracing::trace!(
                        service = %parsed.tag,
                        "routed syslog message"
                    );
                } else {
                    tracing::debug!(
                        service = %parsed.tag,
                        "no log buffer for service, syslog message dropped"
                    );
                }
            }
            None => {
                tracing::debug!(
                    tag = %parsed.tag,
                    message = %parsed.message,
                    "unmatched syslog message"
                );
            }
        }
    }
}
/// Decides whether the syslog receiver should be started.
///
/// Setting the environment variable `ZINIT_SYSLOG` to exactly `0` disables
/// it unconditionally. Otherwise it is enabled when this process has PID 1
/// or when `pid1_mode` is `true`.
pub fn should_enable_syslog(pid1_mode: bool) -> bool {
    let opted_out = matches!(std::env::var("ZINIT_SYSLOG").as_deref(), Ok("0"));
    !opted_out && (std::process::id() == 1 || pid1_mode)
}
/// Returns the default socket path for the receiver: `/dev/log`.
pub fn default_socket_path() -> PathBuf {
    const DEFAULT_SOCKET: &str = "/dev/log";
    DEFAULT_SOCKET.into()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `raw`, failing the calling test if the parser rejects it.
    fn parse_ok(raw: &str) -> SyslogMessage {
        parse_syslog(raw).expect("input should parse as a syslog message")
    }

    /// Tag with PID and no priority prefix.
    #[test]
    fn test_parse_simple_message() {
        let SyslogMessage {
            priority,
            tag,
            pid,
            message,
        } = parse_ok("sshd[529]: Connection from 10.0.0.1");
        assert_eq!(tag, "sshd");
        assert_eq!(pid, Some(529));
        assert_eq!(message, "Connection from 10.0.0.1");
        assert_eq!(priority, None);
    }

    /// A `<PRI>` prefix directly followed by the tag.
    #[test]
    fn test_parse_with_priority() {
        let SyslogMessage {
            priority,
            tag,
            pid,
            message,
        } = parse_ok("<38>sshd[529]: Connection from 10.0.0.1");
        assert_eq!(priority, Some(38));
        assert_eq!(tag, "sshd");
        assert_eq!(pid, Some(529));
        assert_eq!(message, "Connection from 10.0.0.1");
    }

    /// Priority, timestamp and hostname ahead of the tag.
    #[test]
    fn test_parse_full_rfc3164() {
        let SyslogMessage {
            priority,
            tag,
            pid,
            message,
        } = parse_ok("<38>Jan 9 17:30:00 mos sshd[529]: Connection from 10.0.0.1");
        assert_eq!(priority, Some(38));
        assert_eq!(tag, "sshd");
        assert_eq!(pid, Some(529));
        assert_eq!(message, "Connection from 10.0.0.1");
    }

    /// A tag without a `[pid]` suffix.
    #[test]
    fn test_parse_no_pid() {
        let SyslogMessage {
            tag, pid, message, ..
        } = parse_ok("kernel: some kernel message");
        assert_eq!(tag, "kernel");
        assert_eq!(pid, None);
        assert_eq!(message, "some kernel message");
    }

    /// Timestamp and hostname present, but no `<PRI>` prefix.
    #[test]
    fn test_parse_with_timestamp_no_priority() {
        let SyslogMessage {
            tag, pid, message, ..
        } = parse_ok("Jan 9 17:30:00 hostname myapp[123]: hello world");
        assert_eq!(tag, "myapp");
        assert_eq!(pid, Some(123));
        assert_eq!(message, "hello world");
    }

    /// Input without a `": "` separator is rejected.
    #[test]
    fn test_parse_invalid() {
        for raw in ["", "no colon here"] {
            assert!(parse_syslog(raw).is_none());
        }
    }

    /// Depends on the ambient environment: expects `ZINIT_SYSLOG` not to be
    /// `0` and the test process not to be PID 1.
    #[test]
    fn test_should_enable_syslog() {
        let without_flag = should_enable_syslog(false);
        let with_flag = should_enable_syslog(true);
        assert!(!without_flag);
        assert!(with_flag);
    }
}