use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{ChildStdout, Command};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use crate::detect::date::DateParser;
use crate::detect::ignore::IgnoreList;
use crate::detect::matcher::JailMatcher;
use crate::detect::watcher::{Failure, MAX_LINE_LEN};
/// Follows the systemd journal for one jail and forwards matching lines as failures.
///
/// Spawns `journalctl --follow --no-pager --output=short --lines=0`, with every entry of
/// `journalmatch` appended as an additional argument, and reads its stdout line by line.
/// Each non-empty line (trailing whitespace removed) is handed to `process_line`, which
/// sends a [`Failure`] on `failure_tx` when the line qualifies.
///
/// The function returns when `cancel` fires, when journalctl closes its stdout, when reading
/// fails, or right away if journalctl cannot be spawned. Problems are logged, never returned.
pub async fn run(
    jail_id: String,
    journalmatch: Vec<String>,
    matcher: JailMatcher,
    date_parser: DateParser,
    ignore_list: IgnoreList,
    failure_tx: mpsc::Sender<Failure>,
    cancel: CancellationToken,
) {
    info!(jail = %jail_id, "journal watcher started");

    let mut cmd = Command::new("journalctl");
    cmd.arg("--follow")
        .arg("--no-pager")
        .arg("--output=short")
        .arg("--lines=0")
        .args(&journalmatch)
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::null())
        // Do not leave journalctl behind if this future is dropped before the cleanup below.
        .kill_on_drop(true);

    let mut child = match cmd.spawn() {
        Ok(c) => c,
        Err(e) => {
            error!(jail = %jail_id, error = %e, "failed to spawn journalctl");
            return;
        }
    };
    let stdout = match child.stdout.take() {
        Some(s) => s,
        None => {
            error!(jail = %jail_id, "journalctl stdout not available");
            return;
        }
    };

    let mut reader = BufReader::new(stdout);
    // One buffer reused for every line to avoid a fresh allocation per journal entry.
    let mut line_buf = String::new();
    loop {
        line_buf.clear();
        tokio::select! {
            _ = cancel.cancelled() => {
                info!(jail = %jail_id, "journal watcher shutting down");
                let _ = child.kill().await;
                break;
            }
            result = read_line_bounded(&mut reader, &mut line_buf, &jail_id) => {
                match result {
                    Ok(0) => {
                        warn!(jail = %jail_id, "journalctl stream ended");
                        break;
                    }
                    Ok(_) => {
                        // Oversized lines come back as an empty buffer and are skipped here.
                        let text = line_buf.trim_end();
                        if !text.is_empty() {
                            process_line(
                                text,
                                &jail_id,
                                &matcher,
                                &date_parser,
                                &ignore_list,
                                &failure_tx,
                            )
                            .await;
                        }
                    }
                    Err(e) => {
                        error!(jail = %jail_id, error = %e, "error reading journal");
                        // journalctl was started with --follow and may still be running;
                        // kill it so the wait below cannot block indefinitely.
                        let _ = child.kill().await;
                        break;
                    }
                }
            }
        }
    }

    // Reap the child; the outcome is irrelevant at this point.
    let _ = child.wait().await;
    info!(jail = %jail_id, "journal watcher stopped");
}
/// Evaluates a single journal line and reports it as a failure when it qualifies.
///
/// Nothing is sent when `matcher` does not match the line or when the matched address is on
/// `ignore_list`. The failure's timestamp is whatever `date_parser` extracts from the line;
/// if it yields nothing, the current UTC time from `chrono` is used instead. A closed
/// failure channel is logged as a warning and otherwise ignored.
async fn process_line(
    line: &str,
    jail_id: &str,
    matcher: &JailMatcher,
    date_parser: &DateParser,
    ignore_list: &IgnoreList,
    failure_tx: &mpsc::Sender<Failure>,
) {
    // Bail out unless the line matches and the offending address is not ignored.
    let hit = match matcher.try_match(line) {
        Some(hit) if !ignore_list.is_ignored(&hit.ip) => hit,
        _ => return,
    };

    let timestamp = match date_parser.parse_line(line) {
        Some(parsed) => parsed,
        None => chrono::Utc::now().timestamp(),
    };

    let delivery = failure_tx
        .send(Failure {
            ip: hit.ip,
            jail_id: jail_id.to_string(),
            timestamp,
        })
        .await;
    if delivery.is_err() {
        warn!(jail = %jail_id, "failure channel closed");
    }
}
/// Reads the next journal line from `reader` into `buf` without ever holding more than
/// [`MAX_LINE_LEN`] bytes of it.
///
/// On success the decoded line, trailing newline included, is appended to `buf`; invalid
/// UTF-8 is replaced lossily. The return value is the number of bytes consumed for the line:
///
/// * `Ok(0)` — end of stream, nothing was read.
/// * `Ok(n)` with `buf` extended — a complete line, or a final unterminated line at end of
///   stream.
/// * `Ok(n)` with `buf` empty — the line exceeded `MAX_LINE_LEN`, was discarded and a
///   warning was logged.
///
/// # Errors
///
/// Propagates any I/O error reported by the underlying reader.
async fn read_line_bounded(
    reader: &mut BufReader<ChildStdout>,
    buf: &mut String,
    jail_id: &str,
) -> std::io::Result<usize> {
    // Raw bytes of a line that spans several buffer fills. They are decoded in one go once
    // the line is complete, so a multi-byte UTF-8 sequence that straddles two fills is not
    // torn apart into replacement characters.
    let mut pending: Vec<u8> = Vec::new();
    loop {
        let available = reader.fill_buf().await?;
        if available.is_empty() {
            // End of stream: hand back whatever partial line was collected so far.
            append_valid_utf8(buf, &pending);
            return Ok(pending.len());
        }
        if let Some(pos) = memchr_newline(available) {
            return Ok(consume_up_to_newline(reader, buf, &pending, pos, jail_id));
        }
        let chunk_len = available.len();
        if pending.len() + chunk_len > MAX_LINE_LEN {
            return skip_oversized(reader, buf, chunk_len, jail_id).await;
        }
        pending.extend_from_slice(available);
        reader.consume(chunk_len);
    }
}

/// Completes a line whose terminating newline sits at index `pos` of `reader`'s current
/// buffer.
///
/// `pending` holds the bytes of the same line that were already consumed from earlier buffer
/// fills. The bytes up to and including the newline are always consumed from `reader`. If
/// the whole line fits within [`MAX_LINE_LEN`] it is decoded and appended to `buf`;
/// otherwise `buf` is cleared and a warning is logged. Returns the total length of the line
/// in bytes, newline included.
///
/// The bytes are read through `reader.buffer()` rather than passed in as a slice, because a
/// slice obtained from `fill_buf` keeps `reader` borrowed and could not be passed alongside
/// the mutable reference.
fn consume_up_to_newline(
    reader: &mut BufReader<ChildStdout>,
    buf: &mut String,
    pending: &[u8],
    pos: usize,
    jail_id: &str,
) -> usize {
    let to_take = pos + 1; // include the newline itself
    let line_len = pending.len() + to_take;
    if line_len > MAX_LINE_LEN {
        warn!(jail = %jail_id, "skipping oversized journal line (>{MAX_LINE_LEN} bytes)");
        buf.clear();
    } else if pending.is_empty() {
        // Common case: the whole line is in the current buffer, decode it without copying.
        append_valid_utf8(buf, &reader.buffer()[..to_take]);
    } else {
        let mut line = Vec::with_capacity(line_len);
        line.extend_from_slice(pending);
        line.extend_from_slice(&reader.buffer()[..to_take]);
        append_valid_utf8(buf, &line);
    }
    reader.consume(to_take);
    line_len
}
/// Discards the remainder of a line that has grown past [`MAX_LINE_LEN`].
///
/// Logs a warning, empties `buf`, drops the `chunk_len` bytes currently buffered in `reader`
/// and then keeps discarding input up to and including the next newline (or end of stream).
///
/// Always returns `MAX_LINE_LEN + 1` on success, regardless of how many bytes were actually
/// skipped.
///
/// # Errors
///
/// Propagates any I/O error hit while draining the rest of the line.
async fn skip_oversized(
    reader: &mut BufReader<ChildStdout>,
    buf: &mut String,
    chunk_len: usize,
    jail_id: &str,
) -> std::io::Result<usize> {
    warn!(jail = %jail_id, "skipping oversized journal line (>{MAX_LINE_LEN} bytes)");
    buf.clear();
    reader.consume(chunk_len);
    drain_until_newline(reader)
        .await
        .map(|()| MAX_LINE_LEN + 1)
}
/// Throws away input from `reader` up to and including the next newline.
///
/// Stops early, without error, if the stream ends before a newline is seen.
///
/// # Errors
///
/// Propagates any I/O error reported while refilling the buffer.
async fn drain_until_newline(reader: &mut BufReader<ChildStdout>) -> std::io::Result<()> {
    loop {
        let chunk = reader.fill_buf().await?;
        if chunk.is_empty() {
            // End of stream: nothing left to discard.
            return Ok(());
        }
        match memchr_newline(chunk) {
            Some(idx) => {
                reader.consume(idx + 1);
                return Ok(());
            }
            None => {
                // No newline yet: drop the whole chunk and keep looking.
                let discarded = chunk.len();
                reader.consume(discarded);
            }
        }
    }
}
/// Returns the index of the first `\n` byte in `buf`, or `None` if there is none.
fn memchr_newline(buf: &[u8]) -> Option<usize> {
    for (idx, byte) in buf.iter().enumerate() {
        if *byte == b'\n' {
            return Some(idx);
        }
    }
    None
}
/// Appends `bytes` to `buf` as text, decoding lossily.
///
/// Invalid UTF-8 sequences are replaced with U+FFFD rather than rejected. Each call decodes
/// its input independently, so a multi-byte character split across two calls is treated as
/// invalid on both sides.
fn append_valid_utf8(buf: &mut String, bytes: &[u8]) {
    *buf += String::from_utf8_lossy(bytes).as_ref();
}