use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
use std::net::IpAddr;
use std::path::PathBuf;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use xxhash_rust::xxh3::xxh3_64;
/// Maximum number of bytes accepted for a single log line (64 KiB), not
/// counting the terminating newline.
///
/// `read_line_bounded` reads at most `MAX_LINE_LEN + 1` bytes per line; a line
/// that fills that window without ending in `\n` is skipped with a warning
/// instead of being buffered in full.
pub(crate) const MAX_LINE_LEN: usize = 64 * 1024;
use crate::detect::date::DateParser;
use crate::detect::ignore::IgnoreList;
use crate::detect::matcher::JailMatcher;
/// One matched failure event produced by a log watcher and sent over the
/// failure channel.
#[derive(Debug, Clone)]
pub struct Failure {
/// Address taken from the matcher's result for the matching log line.
pub ip: IpAddr,
/// Identifier of the jail whose watcher produced this event.
pub jail_id: String,
/// Time of the event: the value the date parser extracted from the line, or
/// `chrono::Utc::now().timestamp()` when the line carried no parsable date.
/// NOTE(review): the fallback is Unix seconds; presumably the parser returns
/// the same unit — confirm against `DateParser::parse_line`.
pub timestamp: i64,
}
#[derive(Debug)]
struct FileIdentity {
#[cfg(unix)]
inode: u64,
size: u64,
first_line_hash: u64,
}
impl FileIdentity {
fn from_file(path: &PathBuf) -> Option<Self> {
let meta = std::fs::metadata(path).ok()?;
let size = meta.len();
#[cfg(unix)]
let inode = {
use std::os::unix::fs::MetadataExt;
meta.ino()
};
let first_line_hash = {
let file = std::fs::File::open(path).ok()?;
let mut reader = BufReader::new(file);
let mut bytes = Vec::new();
reader.read_until(b'\n', &mut bytes).ok()?;
xxh3_64(&bytes)
};
Some(Self {
#[cfg(unix)]
inode,
size,
first_line_hash,
})
}
fn is_rotated(&self, other: &FileIdentity) -> bool {
#[cfg(unix)]
if self.inode != other.inode {
return true;
}
if other.size < self.size {
return true;
}
self.first_line_hash != other.first_line_hash
}
}
pub async fn run(
jail_id: String,
log_path: PathBuf,
matcher: JailMatcher,
date_parser: DateParser,
ignore_list: IgnoreList,
tx: mpsc::Sender<Failure>,
cancel: CancellationToken,
) {
info!(jail = %jail_id, path = %log_path.display(), "watcher started");
let (line_tx, mut line_rx) = mpsc::channel::<Failure>(256);
let reader_jail = jail_id.clone();
let reader_cancel = cancel.clone();
let reader_handle = tokio::task::spawn_blocking(move || {
read_loop(
reader_jail,
log_path,
matcher,
date_parser,
ignore_list,
line_tx,
reader_cancel,
);
});
loop {
tokio::select! {
() = cancel.cancelled() => {
info!(jail = %jail_id, "watcher shutting down");
break;
}
failure = line_rx.recv() => {
match failure {
Some(f) => {
if tx.send(f).await.is_err() {
info!(jail = %jail_id, "channel closed, stopping watcher");
break;
}
}
None => break, }
}
}
}
let _ = reader_handle.await;
}
#[allow(clippy::needless_pass_by_value)]
fn read_loop(
jail_id: String,
log_path: PathBuf,
matcher: JailMatcher,
date_parser: DateParser,
ignore_list: IgnoreList,
tx: mpsc::Sender<Failure>,
cancel: CancellationToken,
) {
let poll_interval = std::time::Duration::from_millis(250);
let rotation_check_interval = std::time::Duration::from_secs(5);
let mut file = match open_at_end(&log_path) {
Ok(f) => f,
Err(e) => {
error!(jail = %jail_id, error = %e, "failed to open log file");
return;
}
};
let mut identity = FileIdentity::from_file(&log_path);
let mut last_rotation_check = std::time::Instant::now();
loop {
if cancel.is_cancelled() {
break;
}
if last_rotation_check.elapsed() >= rotation_check_interval {
if let Some(ref old_id) = identity
&& let Some(new_id) = FileIdentity::from_file(&log_path)
{
if old_id.is_rotated(&new_id) {
info!(jail = %jail_id, "log rotation detected, reopening");
match open_from_start(&log_path) {
Ok(f) => {
file = f;
identity = Some(new_id);
}
Err(e) => {
warn!(jail = %jail_id, error = %e, "failed to reopen after rotation");
}
}
} else {
identity = Some(new_id);
}
}
last_rotation_check = std::time::Instant::now();
}
let mut line = String::new();
loop {
line.clear();
match read_line_bounded(&mut file, &mut line, &jail_id) {
Ok(0) => break, Ok(_) => {
let trimmed = line.trim_end();
if let Some(m) = matcher.try_match(trimmed) {
if ignore_list.is_ignored(&m.ip) {
debug!(jail = %jail_id, ip = %m.ip, "ignored");
continue;
}
let timestamp = date_parser
.parse_line(trimmed)
.unwrap_or_else(|| chrono::Utc::now().timestamp());
let failure = Failure {
ip: m.ip,
jail_id: jail_id.clone(),
timestamp,
};
if tx.blocking_send(failure).is_err() {
return; }
}
}
Err(e) => {
warn!(jail = %jail_id, error = %e, "read error");
break;
}
}
}
std::thread::sleep(poll_interval);
}
}
/// Reads one line from `reader`, never buffering more than
/// `MAX_LINE_LEN + 1` bytes of it.
///
/// On success the line (newline included, invalid UTF-8 replaced lossily) is
/// appended to `buf` and the number of bytes read is returned.
///
/// * `Ok(0)` means no data is available (end of file); `buf` is untouched.
/// * A line longer than `MAX_LINE_LEN` bytes is skipped: a warning is logged
///   with `jail_id`, the rest of the line is discarded, `buf` is left
///   untouched and the non-zero number of buffered bytes is returned. The
///   caller therefore keeps reading instead of mistaking the skip for end of
///   file.
///
/// # Errors
/// Propagates any I/O error from the underlying reader.
fn read_line_bounded(
    reader: &mut BufReader<std::fs::File>,
    buf: &mut String,
    jail_id: &str,
) -> std::io::Result<usize> {
    // One extra byte tells "exactly MAX_LINE_LEN bytes plus newline" apart
    // from "longer than MAX_LINE_LEN".
    let limit = (MAX_LINE_LEN as u64) + 1;
    let mut byte_buf = Vec::new();
    let n = reader
        .by_ref()
        .take(limit)
        .read_until(b'\n', &mut byte_buf)?;
    if n == 0 {
        return Ok(0);
    }
    if byte_buf.len() > MAX_LINE_LEN && byte_buf.last() != Some(&b'\n') {
        warn!(jail = %jail_id, "skipping oversized log line (>{MAX_LINE_LEN} bytes)");
        drain_until_newline(reader)?;
        // Non-zero on purpose: data was consumed, this is not end of file.
        return Ok(n);
    }
    buf.push_str(&String::from_utf8_lossy(&byte_buf));
    Ok(n)
}
/// Discards bytes from `reader` up to and including the next `\n`, or up to
/// end of file when no newline is left.
///
/// # Errors
/// Propagates I/O errors from the reader. Reads that fail with
/// `ErrorKind::Interrupted` are retried, matching the behaviour of
/// `BufRead::read_until`.
fn drain_until_newline(reader: &mut BufReader<std::fs::File>) -> std::io::Result<()> {
    loop {
        let available = match reader.fill_buf() {
            Ok(bytes) => bytes,
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        if available.is_empty() {
            // End of file: nothing more to discard.
            return Ok(());
        }
        match available.iter().position(|&b| b == b'\n') {
            Some(pos) => {
                reader.consume(pos + 1);
                return Ok(());
            }
            None => {
                let len = available.len();
                reader.consume(len);
            }
        }
    }
}
/// Opens `path` for reading with the cursor placed at the current end of the
/// file, so only data appended afterwards is read.
///
/// # Errors
/// Returns the I/O error if the file cannot be opened or the seek fails.
fn open_at_end(path: &std::path::Path) -> std::io::Result<BufReader<std::fs::File>> {
    let mut file = std::fs::File::open(path)?;
    file.seek(SeekFrom::End(0))?;
    Ok(BufReader::new(file))
}
/// Opens `path` for reading from its first byte.
///
/// # Errors
/// Returns the I/O error if the file cannot be opened.
fn open_from_start(path: &std::path::Path) -> std::io::Result<BufReader<std::fs::File>> {
    std::fs::File::open(path).map(BufReader::new)
}
#[cfg(test)]
mod tests {
    // End-to-end tests: each test starts a real watcher on a temporary file,
    // writes log content to it and observes the failure channel.

    use std::io::Write;
    use std::path::PathBuf;
    use std::time::Duration;

    use tempfile::NamedTempFile;
    use tokio::sync::mpsc;
    use tokio::task::JoinHandle;
    use tokio_util::sync::CancellationToken;

    use crate::detect::date::{DateFormat, DateParser};
    use crate::detect::ignore::IgnoreList;
    use crate::detect::matcher::JailMatcher;
    use crate::detect::watcher::{Failure, MAX_LINE_LEN};

    const SSHD_FAILURE_LINE: &str =
        "Jan 15 10:30:00 server sshd[1234]: Failed password for root from 192.168.1.100 port 22";
    const SSHD_FAILURE_LINE2: &str =
        "Jan 15 10:31:00 server sshd[5678]: Failed password for admin from 10.0.0.42 port 22";
    const SSHD_ACCEPTED_LINE: &str =
        "Jan 15 10:30:00 server sshd[1234]: Accepted password for user from 10.0.0.1 port 22";

    /// Number of junk lines placed in front of (or between) valid lines.
    const INVALID_LINES_BEFORE_VALID: usize = 3;
    /// Budget that only fits if junk lines do not each cost a poll interval.
    const TIGHT_TIMEOUT_MS: u64 = 600;
    const TIGHT_TIMEOUT: Duration = Duration::from_millis(TIGHT_TIMEOUT_MS);
    /// Generous budget for tests that only check that an event arrives at all.
    const RELAXED_TIMEOUT: Duration = Duration::from_secs(2);
    /// How long the channel must stay silent in negative tests.
    const QUIET_PERIOD: Duration = Duration::from_millis(500);
    /// Time given to the watcher to open the file before content is written.
    const STARTUP_DELAY: Duration = Duration::from_millis(100);

    /// Cancellation token, task handle and failure receiver of a running watcher.
    type Watcher = (CancellationToken, JoinHandle<()>, mpsc::Receiver<Failure>);

    fn test_matcher() -> JailMatcher {
        JailMatcher::new(&[r"Failed password for .* from <HOST>".to_string()]).unwrap()
    }

    /// Starts a watcher for jail `"test"` on `path` with the given ignore list.
    fn spawn_watcher_with(path: PathBuf, ignore_list: IgnoreList) -> Watcher {
        let (tx, rx) = mpsc::channel(32);
        let cancel = CancellationToken::new();
        let cancel_clone = cancel.clone();
        let handle = tokio::spawn(async move {
            crate::detect::watcher::run(
                "test".to_string(),
                path,
                test_matcher(),
                DateParser::new(DateFormat::Syslog).unwrap(),
                ignore_list,
                tx,
                cancel_clone,
            )
            .await;
        });
        (cancel, handle, rx)
    }

    /// Starts a watcher on `path` that ignores no address.
    fn spawn_watcher(path: PathBuf) -> Watcher {
        spawn_watcher_with(path, IgnoreList::new(&[], false).unwrap())
    }

    async fn let_watcher_start() {
        tokio::time::sleep(STARTUP_DELAY).await;
    }

    /// Appends `line` plus a newline through the temp file's own handle.
    fn append_line(file: &mut NamedTempFile, line: &str) {
        writeln!(file, "{line}").unwrap();
        file.flush().unwrap();
    }

    /// Bytes of `line` followed by a newline.
    fn log_line(line: &str) -> Vec<u8> {
        let mut bytes = line.as_bytes().to_vec();
        bytes.push(b'\n');
        bytes
    }

    /// Waits for the next failure, panicking with `on_timeout` if none arrives
    /// within `within`.
    async fn expect_failure(
        rx: &mut mpsc::Receiver<Failure>,
        within: Duration,
        on_timeout: &str,
    ) -> Failure {
        tokio::time::timeout(within, rx.recv())
            .await
            .unwrap_or_else(|_| panic!("{on_timeout}"))
            .expect("channel closed unexpectedly")
    }

    /// Asserts that nothing is received on the channel for `within`.
    async fn expect_silence(rx: &mut mpsc::Receiver<Failure>, within: Duration, reason: &str) {
        let outcome = tokio::time::timeout(within, rx.recv()).await;
        assert!(outcome.is_err(), "{reason}");
    }

    async fn shutdown(cancel: CancellationToken, handle: JoinHandle<()>) {
        cancel.cancel();
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn detects_failure_in_appended_lines() {
        let mut tmpfile = NamedTempFile::new().unwrap();
        let (cancel, handle, mut rx) = spawn_watcher(tmpfile.path().to_path_buf());
        let_watcher_start().await;

        append_line(&mut tmpfile, SSHD_FAILURE_LINE);

        let failure =
            expect_failure(&mut rx, RELAXED_TIMEOUT, "timeout waiting for failure").await;
        assert_eq!(failure.ip.to_string(), "192.168.1.100");
        assert_eq!(failure.jail_id, "test");
        shutdown(cancel, handle).await;
    }

    #[tokio::test]
    async fn ignores_non_matching_lines() {
        let mut tmpfile = NamedTempFile::new().unwrap();
        let (cancel, handle, mut rx) = spawn_watcher(tmpfile.path().to_path_buf());
        let_watcher_start().await;

        append_line(&mut tmpfile, SSHD_ACCEPTED_LINE);

        expect_silence(&mut rx, QUIET_PERIOD, "should not have received a failure").await;
        shutdown(cancel, handle).await;
    }

    #[tokio::test]
    async fn ignores_allowlisted_ips() {
        let mut tmpfile = NamedTempFile::new().unwrap();
        let ignore_list = IgnoreList::new(&["192.168.1.0/24".to_string()], false).unwrap();
        let (cancel, handle, mut rx) =
            spawn_watcher_with(tmpfile.path().to_path_buf(), ignore_list);
        let_watcher_start().await;

        append_line(&mut tmpfile, SSHD_FAILURE_LINE);

        expect_silence(
            &mut rx,
            QUIET_PERIOD,
            "ignored IP should not produce a failure",
        )
        .await;
        shutdown(cancel, handle).await;
    }

    #[tokio::test]
    async fn test_read_line_bounded_valid_utf8() {
        let mut tmpfile = NamedTempFile::new().unwrap();
        let (cancel, handle, mut rx) = spawn_watcher(tmpfile.path().to_path_buf());
        let_watcher_start().await;

        append_line(&mut tmpfile, SSHD_FAILURE_LINE);

        let failure = expect_failure(
            &mut rx,
            RELAXED_TIMEOUT,
            "timeout — valid UTF-8 line produced no failure event",
        )
        .await;
        assert_eq!(failure.ip.to_string(), "192.168.1.100");
        shutdown(cancel, handle).await;
    }

    #[tokio::test]
    async fn test_read_line_bounded_invalid_utf8_continues() {
        let tmpfile = NamedTempFile::new().unwrap();
        let (cancel, handle, mut rx) = spawn_watcher(tmpfile.path().to_path_buf());
        let_watcher_start().await;

        let content = [
            b"invalid \xff\xfe bytes in this line\n".repeat(INVALID_LINES_BEFORE_VALID),
            log_line(SSHD_FAILURE_LINE),
        ]
        .concat();
        std::fs::write(tmpfile.path(), &content).unwrap();

        let failure = expect_failure(
            &mut rx,
            TIGHT_TIMEOUT,
            "timeout — watcher incurred sleep penalty per invalid UTF-8 line instead of continuing within same poll cycle",
        )
        .await;
        assert_eq!(failure.ip.to_string(), "192.168.1.100");
        shutdown(cancel, handle).await;
    }

    #[tokio::test]
    async fn test_read_line_bounded_mixed_valid_invalid_lines() {
        let tmpfile = NamedTempFile::new().unwrap();
        let (cancel, handle, mut rx) = spawn_watcher(tmpfile.path().to_path_buf());
        let_watcher_start().await;

        let content = [
            log_line(SSHD_FAILURE_LINE),
            b"garbage \xc3\x28 invalid sequence here\n".repeat(INVALID_LINES_BEFORE_VALID),
            log_line(SSHD_FAILURE_LINE2),
        ]
        .concat();
        std::fs::write(tmpfile.path(), &content).unwrap();

        let first = expect_failure(
            &mut rx,
            TIGHT_TIMEOUT,
            "timeout waiting for first failure — valid line 1 not processed in time",
        )
        .await;
        assert_eq!(
            first.ip.to_string(),
            "192.168.1.100",
            "first failure IP mismatch"
        );

        let second = expect_failure(
            &mut rx,
            TIGHT_TIMEOUT,
            "timeout waiting for second failure — watcher slept per invalid UTF-8 line",
        )
        .await;
        assert_eq!(
            second.ip.to_string(),
            "10.0.0.42",
            "second failure IP mismatch"
        );
        shutdown(cancel, handle).await;
    }

    #[tokio::test]
    async fn test_read_line_bounded_all_invalid_bytes() {
        let tmpfile = NamedTempFile::new().unwrap();
        let (cancel, handle, mut rx) = spawn_watcher(tmpfile.path().to_path_buf());
        let_watcher_start().await;

        let mut junk_line = vec![0xff_u8; 32];
        junk_line.push(b'\n');
        let content = [
            junk_line.repeat(INVALID_LINES_BEFORE_VALID),
            log_line(SSHD_FAILURE_LINE),
        ]
        .concat();
        std::fs::write(tmpfile.path(), &content).unwrap();

        let failure = expect_failure(
            &mut rx,
            TIGHT_TIMEOUT,
            "timeout — watcher slept per all-invalid-bytes line instead of processing in-cycle",
        )
        .await;
        assert_eq!(failure.ip.to_string(), "192.168.1.100");
        shutdown(cancel, handle).await;
    }

    #[tokio::test]
    async fn test_read_line_bounded_invalid_utf8_oversized() {
        let tmpfile = NamedTempFile::new().unwrap();
        let (cancel, handle, mut rx) = spawn_watcher(tmpfile.path().to_path_buf());
        let_watcher_start().await;

        let oversize_len = MAX_LINE_LEN + 256;
        let mut content = vec![0xff_u8; oversize_len];
        content.push(b'\n');
        content.extend(b"\xff\xfe short invalid\n".repeat(INVALID_LINES_BEFORE_VALID - 1));
        content.extend(log_line(SSHD_FAILURE_LINE));
        std::fs::write(tmpfile.path(), &content).unwrap();

        let failure = expect_failure(
            &mut rx,
            TIGHT_TIMEOUT,
            "timeout — watcher did not recover within one poll cycle after oversized invalid-UTF-8 line",
        )
        .await;
        assert_eq!(failure.ip.to_string(), "192.168.1.100");
        shutdown(cancel, handle).await;
    }

    #[tokio::test]
    async fn test_file_identity_invalid_utf8_first_line() {
        let tmpfile = NamedTempFile::new().unwrap();
        let path = tmpfile.path().to_path_buf();
        // The first line exists before the watcher starts, so it is only seen
        // by the file-identity snapshot, not by the tail itself.
        std::fs::write(&path, b"\xff\xfe this is the first line\n").unwrap();
        let (cancel, handle, mut rx) = spawn_watcher(path.clone());
        let_watcher_start().await;

        let mut appender = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        writeln!(appender, "{SSHD_FAILURE_LINE}").unwrap();
        appender.flush().unwrap();

        let failure = expect_failure(
            &mut rx,
            RELAXED_TIMEOUT,
            "timeout — watcher did not start correctly when first line has invalid UTF-8",
        )
        .await;
        assert_eq!(failure.ip.to_string(), "192.168.1.100");
        shutdown(cancel, handle).await;
    }

    #[tokio::test]
    async fn test_watcher_processes_after_invalid_utf8() {
        let tmpfile = NamedTempFile::new().unwrap();
        let (cancel, handle, mut rx) = spawn_watcher(tmpfile.path().to_path_buf());
        let_watcher_start().await;

        let content = [
            log_line(SSHD_FAILURE_LINE),
            b"corrupted log entry: \xed\xa0\x80\xed\xb0\x80\n".repeat(INVALID_LINES_BEFORE_VALID),
            log_line(SSHD_FAILURE_LINE2),
        ]
        .concat();
        std::fs::write(tmpfile.path(), &content).unwrap();

        let first = expect_failure(
            &mut rx,
            TIGHT_TIMEOUT,
            "timeout waiting for first failure event",
        )
        .await;
        assert_eq!(first.ip.to_string(), "192.168.1.100");

        let second = expect_failure(
            &mut rx,
            TIGHT_TIMEOUT,
            "timeout — invalid UTF-8 lines caused poll-cycle sleep penalty before second failure",
        )
        .await;
        assert_eq!(second.ip.to_string(), "10.0.0.42");
        shutdown(cancel, handle).await;
    }

    #[tokio::test]
    async fn test_invalid_utf8_does_not_match_pattern() {
        let tmpfile = NamedTempFile::new().unwrap();
        let (cancel, handle, mut rx) = spawn_watcher(tmpfile.path().to_path_buf());
        let_watcher_start().await;

        let garbage: Vec<u8> = vec![0xff, 0xfe, 0xfd, 0xfc, 0xfb, 0xfa, b'\n'];
        std::fs::write(tmpfile.path(), &garbage).unwrap();

        expect_silence(
            &mut rx,
            TIGHT_TIMEOUT,
            "replacement characters from invalid UTF-8 must not match any jail pattern",
        )
        .await;
        shutdown(cancel, handle).await;
    }
}