use std::net::SocketAddr;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use crate::framing::{Framing, FramingError, detect_framing, read_frame};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Severity {
Emergency,
Alert,
Critical,
Error,
Warning,
Notice,
Informational,
Debug,
}
impl Severity {
fn from_loose(s: syslog_loose::SyslogSeverity) -> Self {
use syslog_loose::SyslogSeverity::*;
match s {
SEV_EMERG => Severity::Emergency,
SEV_ALERT => Severity::Alert,
SEV_CRIT => Severity::Critical,
SEV_ERR => Severity::Error,
SEV_WARNING => Severity::Warning,
SEV_NOTICE => Severity::Notice,
SEV_INFO => Severity::Informational,
SEV_DEBUG => Severity::Debug,
}
}
pub fn label(self) -> &'static str {
match self {
Severity::Emergency => "EMERG",
Severity::Alert => "ALERT",
Severity::Critical => "CRIT ",
Severity::Error => "ERROR",
Severity::Warning => "WARN ",
Severity::Notice => "NOTIC",
Severity::Informational => "INFO ",
Severity::Debug => "DEBUG",
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEvent {
pub timestamp: Option<String>,
pub hostname: Option<String>,
pub app_name: Option<String>,
pub proc_id: Option<String>,
pub msg_id: Option<String>,
pub severity: Option<Severity>,
pub message: String,
}
impl LogEvent {
pub fn from_frame(bytes: &[u8]) -> Option<Self> {
let s = std::str::from_utf8(bytes).ok()?;
let msg = syslog_loose::parse_message(s, syslog_loose::Variant::Either);
msg.severity?;
Some(Self {
timestamp: msg.timestamp.map(|t| t.to_rfc3339()),
hostname: msg.hostname.map(str::to_owned),
app_name: msg.appname.map(str::to_owned),
proc_id: msg.procid.map(|p| p.to_string()),
msg_id: msg.msgid.map(str::to_owned),
severity: msg.severity.map(Severity::from_loose),
message: msg.msg.to_owned(),
})
}
}
#[derive(Debug)]
pub struct LocalListener {
listener: TcpListener,
addr: SocketAddr,
}
impl LocalListener {
pub async fn bind(port: u16) -> Result<Self> {
let bind_addr: SocketAddr = ([127, 0, 0, 1], port).into();
let listener = TcpListener::bind(bind_addr)
.await
.with_context(|| format!("failed to bind syslog listener on {bind_addr}"))?;
let addr = listener
.local_addr()
.context("failed to read bound socket address")?;
Ok(Self { listener, addr })
}
pub async fn bind_all_interfaces(port: u16) -> Result<Self> {
let bind_addr: SocketAddr = ([0, 0, 0, 0], port).into();
let listener = TcpListener::bind(bind_addr)
.await
.with_context(|| format!("failed to bind syslog listener on {bind_addr}"))?;
let addr = listener
.local_addr()
.context("failed to read bound socket address")?;
Ok(Self { listener, addr })
}
pub fn local_addr(&self) -> SocketAddr {
self.addr
}
}
pub async fn run_receiver(
listener: LocalListener,
tx: mpsc::Sender<LogEvent>,
cancel: CancellationToken,
) -> Result<()> {
let LocalListener { listener, addr } = listener;
tracing::debug!(%addr, "syslog receiver: accept loop started");
loop {
tokio::select! {
_ = cancel.cancelled() => {
tracing::debug!("syslog receiver: cancelled, exiting accept loop");
return Ok(());
}
res = listener.accept() => {
match res {
Ok((stream, peer)) => {
let tx = tx.clone();
let cancel = cancel.clone();
tokio::spawn(async move {
if let Err(e) = handle_connection(stream, tx, cancel).await {
tracing::warn!(%peer, error = %e, "syslog connection ended with error");
}
});
}
Err(e) => {
tracing::warn!(error = %e, "accept failed");
}
}
}
}
}
}
pub fn spawn_receiver(
listener: LocalListener,
tx: mpsc::Sender<LogEvent>,
cancel: CancellationToken,
) -> (SocketAddr, tokio::task::JoinHandle<Result<()>>) {
let addr = listener.local_addr();
let handle = tokio::spawn(run_receiver(listener, tx, cancel));
(addr, handle)
}
async fn handle_connection(
stream: TcpStream,
tx: mpsc::Sender<LogEvent>,
cancel: CancellationToken,
) -> Result<()> {
let mut reader = BufReader::new(stream);
let framing = match detect_framing(&mut reader).await {
Ok(Some(f)) => f,
Ok(None) => return Ok(()),
Err(e) => {
tracing::warn!(error = %e, "rejecting connection: malformed first byte");
return Ok(());
}
};
tracing::debug!(?framing, "framing locked in for connection");
loop {
tokio::select! {
_ = cancel.cancelled() => return Ok(()),
frame = read_frame_resilient(&mut reader, framing) => {
match frame {
Some(bytes) => {
match LogEvent::from_frame(&bytes) {
Some(event) => {
if tx.send(event).await.is_err() {
return Ok(());
}
}
None => {
tracing::warn!(
bytes = bytes.len(),
"dropping unparsable syslog frame"
);
}
}
}
None => return Ok(()),
}
}
}
}
}
async fn read_frame_resilient<R>(reader: &mut R, framing: Framing) -> Option<Vec<u8>>
where
R: AsyncBufReadExt + Unpin,
{
match read_frame(reader, framing).await {
Ok(opt) => opt,
Err(FramingError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => None,
Err(e) => {
tracing::warn!(error = %e, "framing error, dropping connection");
None
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::AsyncWriteExt;
fn rfc5424_frame(message: &str) -> Vec<u8> {
let body = format!("<14>1 2026-05-09T10:00:00Z hostA app1 1234 ID47 - {message}");
let mut out = format!("{} ", body.len()).into_bytes();
out.extend_from_slice(body.as_bytes());
out
}
fn rfc5424_lf_frame(message: &str) -> Vec<u8> {
let body = format!("<14>1 2026-05-09T10:00:00Z hostA app1 1234 ID47 - {message}\n");
body.into_bytes()
}
#[tokio::test]
async fn parses_octet_counted_frames() {
let listener = LocalListener::bind(0).await.unwrap();
let addr = listener.local_addr();
let (tx, mut rx) = mpsc::channel(8);
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
let server = tokio::spawn(run_receiver(listener, tx, cancel_clone));
let mut s = TcpStream::connect(addr).await.unwrap();
let mut bytes = Vec::new();
bytes.extend(rfc5424_frame("hello"));
bytes.extend(rfc5424_frame("world"));
s.write_all(&bytes).await.unwrap();
s.shutdown().await.unwrap();
let e1 = rx.recv().await.unwrap();
let e2 = rx.recv().await.unwrap();
assert_eq!(e1.message, "hello");
assert_eq!(e2.message, "world");
assert_eq!(e1.severity, Some(Severity::Informational));
assert_eq!(e1.app_name.as_deref(), Some("app1"));
assert_eq!(e1.hostname.as_deref(), Some("hostA"));
cancel.cancel();
server.await.expect("server task panicked").unwrap();
}
#[tokio::test]
async fn parses_lf_terminated_frames() {
let listener = LocalListener::bind(0).await.unwrap();
let addr = listener.local_addr();
let (tx, mut rx) = mpsc::channel(8);
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
let server = tokio::spawn(run_receiver(listener, tx, cancel_clone));
let mut s = TcpStream::connect(addr).await.unwrap();
let mut bytes = Vec::new();
bytes.extend(rfc5424_lf_frame("alpha"));
bytes.extend(rfc5424_lf_frame("beta"));
s.write_all(&bytes).await.unwrap();
s.shutdown().await.unwrap();
let e1 = rx.recv().await.unwrap();
let e2 = rx.recv().await.unwrap();
assert_eq!(e1.message, "alpha");
assert_eq!(e2.message, "beta");
cancel.cancel();
server.await.expect("server task panicked").unwrap();
}
#[tokio::test]
async fn malformed_first_byte_drops_connection_only() {
let listener = LocalListener::bind(0).await.unwrap();
let addr = listener.local_addr();
let (tx, mut rx) = mpsc::channel(8);
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
let server = tokio::spawn(run_receiver(listener, tx, cancel_clone));
let mut bad = TcpStream::connect(addr).await.unwrap();
bad.write_all(b"garbage\n").await.unwrap();
bad.shutdown().await.unwrap();
let mut good = TcpStream::connect(addr).await.unwrap();
good.write_all(&rfc5424_lf_frame("after-bad"))
.await
.unwrap();
good.shutdown().await.unwrap();
let e = rx.recv().await.unwrap();
assert_eq!(e.message, "after-bad");
cancel.cancel();
server.await.expect("server task panicked").unwrap();
}
#[test]
fn unparsable_frame_returns_none() {
let evt = LogEvent::from_frame(b"this is not syslog at all");
assert!(evt.is_none());
}
}