use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use crate::detect::date::DateParser;
use crate::detect::ignore::IgnoreList;
use crate::detect::matcher::JailMatcher;
use crate::detect::watcher::{Failure, MAX_LINE_LEN};
/// Follows the systemd journal for one jail and forwards matching lines as [`Failure`]s.
///
/// Spawns `journalctl --follow --no-pager --output=short --lines=0`, with every entry of
/// `journalmatch` appended as an extra argument, and reads its stdout one bounded line at a
/// time. Each non-empty line is handed to `process_line`, which sends a [`Failure`] on
/// `failure_tx` when the line matches and the address is not ignored.
///
/// The function returns when:
/// - `journalctl` cannot be spawned or its stdout handle is missing (logged as an error),
/// - `cancel` is triggered (the child is killed),
/// - the journal stream reaches end-of-file, or
/// - reading from the stream fails (the child is killed).
pub async fn run(
    jail_id: String,
    journalmatch: Vec<String>,
    matcher: JailMatcher,
    date_parser: DateParser,
    ignore_list: IgnoreList,
    failure_tx: mpsc::Sender<Failure>,
    cancel: CancellationToken,
) {
    info!(jail = %jail_id, "journal watcher started");

    let mut cmd = Command::new("journalctl");
    cmd.arg("--follow")
        .arg("--no-pager")
        .arg("--output=short")
        .arg("--lines=0")
        .args(&journalmatch)
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::null())
        // If this task is dropped before reaching the explicit kill/wait below, do not leave a
        // follower process running in the background.
        .kill_on_drop(true);

    let mut child = match cmd.spawn() {
        Ok(c) => c,
        Err(e) => {
            error!(
                jail = %jail_id,
                error = %e,
                "jail configured with log_backend = \"systemd\" but journalctl is not available on this system — install systemd-journald, or switch this jail to log_backend = \"file\" with a log_path",
            );
            return;
        }
    };

    let Some(stdout) = child.stdout.take() else {
        error!(jail = %jail_id, "journalctl stdout not available");
        return;
    };

    let mut reader = BufReader::new(stdout);
    let mut line_buf = String::new();

    loop {
        line_buf.clear();
        tokio::select! {
            () = cancel.cancelled() => {
                info!(jail = %jail_id, "journal watcher shutting down");
                // Best effort: the child may already have exited.
                let _ = child.kill().await;
                break;
            }
            result = read_line_bounded(&mut reader, &mut line_buf, &jail_id) => {
                match result {
                    Ok(0) => {
                        warn!(jail = %jail_id, "journalctl stream ended");
                        break;
                    }
                    Ok(_) => {
                        // Oversized lines come back with an empty buffer and are skipped here
                        // together with genuinely blank lines.
                        let text = line_buf.trim_end();
                        if text.is_empty() {
                            continue;
                        }
                        process_line(
                            text,
                            &jail_id,
                            &matcher,
                            &date_parser,
                            &ignore_list,
                            &failure_tx,
                        ).await;
                    }
                    Err(e) => {
                        error!(jail = %jail_id, error = %e, "error reading journal");
                        // `journalctl --follow` keeps running after we stop reading; without
                        // killing it the `wait()` below could block forever, outside the
                        // reach of `cancel`.
                        let _ = child.kill().await;
                        break;
                    }
                }
            }
        }
    }

    // Reap the child so it does not linger as a zombie.
    let _ = child.wait().await;
    info!(jail = %jail_id, "journal watcher stopped");
}
/// Turns a single journal line into a [`Failure`] and sends it on `failure_tx`.
///
/// Nothing is sent when the line does not match `matcher` or when the matched address is on
/// `ignore_list`. The timestamp comes from `date_parser`; if the line carries no parsable date
/// the current UTC time is used instead. A closed channel is logged as a warning.
async fn process_line(
    line: &str,
    jail_id: &str,
    matcher: &JailMatcher,
    date_parser: &DateParser,
    ignore_list: &IgnoreList,
    failure_tx: &mpsc::Sender<Failure>,
) {
    let hit = match matcher.try_match(line) {
        Some(hit) => hit,
        None => return,
    };

    if ignore_list.is_ignored(&hit.ip) {
        return;
    }

    // Fall back to "now" when the line has no usable timestamp.
    let timestamp = match date_parser.parse_line(line) {
        Some(parsed) => parsed,
        None => chrono::Utc::now().timestamp(),
    };

    let outcome = failure_tx
        .send(Failure {
            ip: hit.ip,
            jail_id: jail_id.to_string(),
            timestamp,
        })
        .await;

    if outcome.is_err() {
        warn!(jail = %jail_id, "failure channel closed");
    }
}
/// Reads one `\n`-terminated line from `reader` and appends it (newline included) to `buf`,
/// never buffering more than `MAX_LINE_LEN` bytes.
///
/// Bytes are decoded once per line, with invalid UTF-8 replaced by U+FFFD. Decoding the whole
/// line at once, rather than chunk by chunk, keeps a multi-byte character that straddles two
/// reader buffers intact.
///
/// # Returns
/// - `Ok(0)` at end-of-file when no bytes were read.
/// - `Ok(n)` with `n <= MAX_LINE_LEN` for a normal line, or for a trailing partial line at
///   end-of-file; `n` is the number of bytes consumed.
/// - `Ok(n)` with `n > MAX_LINE_LEN` for an oversized line. `buf` is cleared, a warning is
///   logged, and the rest of the line is discarded.
///
/// # Errors
/// Propagates any I/O error from the underlying reader.
async fn read_line_bounded<R: AsyncBufRead + Unpin>(
    reader: &mut R,
    buf: &mut String,
    jail_id: &str,
) -> std::io::Result<usize> {
    // Raw bytes of the current line collected from earlier chunks. Stays empty (and therefore
    // unallocated) in the common case where the whole line is in a single chunk.
    let mut pending: Vec<u8> = Vec::new();

    loop {
        let available = reader.fill_buf().await?;
        if available.is_empty() {
            // End of stream: hand back whatever partial line was collected.
            append_valid_utf8(buf, &pending);
            return Ok(pending.len());
        }

        if let Some(pos) = memchr_newline(available) {
            let to_take = pos + 1;
            let new_total = pending.len() + to_take;
            if new_total > MAX_LINE_LEN {
                warn!(jail = %jail_id, "skipping oversized journal line (>{MAX_LINE_LEN} bytes)");
                buf.clear();
            } else if let Some(slice) = available.get(..to_take) {
                if pending.is_empty() {
                    append_valid_utf8(buf, slice);
                } else {
                    pending.extend_from_slice(slice);
                    append_valid_utf8(buf, &pending);
                }
            }
            reader.consume(to_take);
            return Ok(new_total);
        }

        // No newline yet: keep the chunk if it still fits, otherwise drop the whole line.
        let chunk_len = available.len();
        if pending.len() + chunk_len > MAX_LINE_LEN {
            return skip_oversized(reader, buf, chunk_len, jail_id).await;
        }
        pending.extend_from_slice(available);
        reader.consume(chunk_len);
    }
}
/// Discards the remainder of a line that exceeded `MAX_LINE_LEN`.
///
/// Logs a warning, consumes the `chunk_len` bytes currently buffered in `reader`, empties
/// `buf`, and then drains the reader up to and including the next newline (or end-of-file).
/// On success the result is always `MAX_LINE_LEN + 1`, which tells the caller that a line was
/// read but was too long to keep.
async fn skip_oversized<R: AsyncBufRead + Unpin>(
    reader: &mut R,
    buf: &mut String,
    chunk_len: usize,
    jail_id: &str,
) -> std::io::Result<usize> {
    warn!(jail = %jail_id, "skipping oversized journal line (>{MAX_LINE_LEN} bytes)");

    reader.consume(chunk_len);
    buf.clear();

    drain_until_newline(reader)
        .await
        .map(|()| MAX_LINE_LEN + 1)
}
/// Consumes bytes from `reader` up to and including the next `\n`.
///
/// Stops early, without error, if the reader reaches end-of-file first.
async fn drain_until_newline<R: AsyncBufRead + Unpin>(reader: &mut R) -> std::io::Result<()> {
    loop {
        let chunk = reader.fill_buf().await?;
        if chunk.is_empty() {
            // End of stream before any newline showed up.
            return Ok(());
        }
        match memchr_newline(chunk) {
            Some(idx) => {
                reader.consume(idx + 1);
                return Ok(());
            }
            None => {
                let whole = chunk.len();
                reader.consume(whole);
            }
        }
    }
}
/// Returns the index of the first `\n` byte in `buf`, or `None` if there is none.
fn memchr_newline(buf: &[u8]) -> Option<usize> {
    for (idx, byte) in buf.iter().enumerate() {
        if *byte == b'\n' {
            return Some(idx);
        }
    }
    None
}
/// Appends `bytes` to `buf` as text, substituting U+FFFD for any invalid UTF-8 sequences.
fn append_valid_utf8(buf: &mut String, bytes: &[u8]) {
    buf.push_str(String::from_utf8_lossy(bytes).as_ref());
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads one line with the jail label "test" and unwraps the byte count.
    async fn read_one<R: AsyncBufRead + Unpin>(reader: &mut R, out: &mut String) -> usize {
        read_line_bounded(reader, out, "test")
            .await
            .unwrap()
    }

    /// Builds `len` 'x' bytes followed by a newline.
    fn long_line(len: usize) -> String {
        let mut text = "x".repeat(len);
        text.push('\n');
        text
    }

    // A short line that fits in one buffer is returned verbatim, newline included.
    #[tokio::test]
    async fn test_read_line_bounded_newline_in_single_chunk() {
        let data: &[u8] = b"hello\n";
        let mut reader = BufReader::new(data);
        let mut out = String::new();

        let read = read_one(&mut reader, &mut out).await;

        assert_eq!(read, 6);
        assert_eq!(out, "hello\n");
    }

    // Consecutive calls return consecutive lines.
    #[tokio::test]
    async fn test_read_line_bounded_two_lines() {
        let data: &[u8] = b"one\ntwo\n";
        let mut reader = BufReader::new(data);
        let mut out = String::new();

        let first = read_one(&mut reader, &mut out).await;
        assert_eq!(first, 4);
        assert_eq!(out, "one\n");

        out.clear();
        let second = read_one(&mut reader, &mut out).await;
        assert_eq!(second, 4);
        assert_eq!(out, "two\n");
    }

    // An empty stream reports zero bytes and leaves the buffer untouched.
    #[tokio::test]
    async fn test_read_line_bounded_eof_returns_zero() {
        let data: &[u8] = b"";
        let mut reader = BufReader::new(data);
        let mut out = String::new();

        let read = read_one(&mut reader, &mut out).await;

        assert_eq!(read, 0);
        assert_eq!(out, "");
    }

    // A trailing line without a newline is still returned at end-of-file.
    #[tokio::test]
    async fn test_read_line_bounded_partial_line_then_eof() {
        let data: &[u8] = b"no-newline";
        let mut reader = BufReader::new(data);
        let mut out = String::new();

        let read = read_one(&mut reader, &mut out).await;

        assert_eq!(read, 10);
        assert_eq!(out, "no-newline");
    }

    // A tiny reader buffer forces the line to be assembled from several chunks.
    #[tokio::test]
    async fn test_read_line_bounded_line_spans_multiple_chunks() {
        let data: &[u8] = b"helloworld\n";
        let mut reader = BufReader::with_capacity(4, data);
        let mut out = String::new();

        let read = read_one(&mut reader, &mut out).await;

        assert_eq!(read, 11);
        assert_eq!(out, "helloworld\n");
    }

    // Oversized line whose newline is visible in the first chunk: the real length is reported.
    #[tokio::test]
    async fn test_read_line_bounded_oversized_line_skipped() {
        let payload = long_line(MAX_LINE_LEN + 10).into_bytes();
        let mut reader = BufReader::with_capacity(MAX_LINE_LEN + 100, payload.as_slice());
        let mut out = String::new();

        let read = read_one(&mut reader, &mut out).await;

        assert_eq!(read, MAX_LINE_LEN + 11);
        assert_eq!(out, "", "oversized line must leave buf empty");
    }

    // Invalid UTF-8 bytes are replaced rather than rejected.
    #[tokio::test]
    async fn test_read_line_bounded_invalid_utf8_replaced() {
        let data: &[u8] = &[0xff, 0xfe, b'\n'];
        let mut reader = BufReader::new(data);
        let mut out = String::new();

        let read = read_one(&mut reader, &mut out).await;

        assert_eq!(read, 3);
        assert!(out.ends_with('\n'));
        assert!(out.contains('\u{FFFD}'));
    }

    // Oversized line spread over many chunks: the sentinel length MAX_LINE_LEN + 1 is reported.
    #[tokio::test]
    async fn test_read_line_bounded_skip_oversized_no_newline_in_first_chunk() {
        let payload = long_line(MAX_LINE_LEN + 8192).into_bytes();
        let mut reader = BufReader::with_capacity(4096, payload.as_slice());
        let mut out = String::new();

        let read = read_one(&mut reader, &mut out).await;

        assert_eq!(read, MAX_LINE_LEN + 1);
        assert_eq!(out, "", "oversized line must leave buf empty");
    }

    // After an oversized line is dropped, the following line is read normally.
    #[tokio::test]
    async fn test_read_line_bounded_recovers_after_oversized_line() {
        let mut text = long_line(MAX_LINE_LEN + 8192);
        text.push_str("next\n");
        let payload = text.into_bytes();
        let mut reader = BufReader::with_capacity(4096, payload.as_slice());
        let mut out = String::new();

        let first = read_one(&mut reader, &mut out).await;
        assert_eq!(first, MAX_LINE_LEN + 1);
        assert_eq!(out, "");

        out.clear();
        let second = read_one(&mut reader, &mut out).await;
        assert_eq!(second, 5);
        assert_eq!(out, "next\n");
    }

    // A bare newline counts as a one-byte line.
    #[tokio::test]
    async fn test_read_line_bounded_empty_line() {
        let data: &[u8] = b"\n";
        let mut reader = BufReader::new(data);
        let mut out = String::new();

        let read = read_one(&mut reader, &mut out).await;

        assert_eq!(read, 1);
        assert_eq!(out, "\n");
    }
}