use std::io::SeekFrom;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use chrono::{DateTime, Local, Utc};
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use super::entry::{LineBuffer, LogEntry};
const DEFAULT_POLL_INTERVAL_MS: u64 = 50;
const MTIME_JUMP_THRESHOLD_SECS: u64 = 60;
const READ_BUFFER_CAPACITY: usize = 8 * 1024;
#[derive(Debug, thiserror::Error)]
pub enum TailerError {
#[error("I/O error on {path}: {source}", path = path.display())]
Io {
path: PathBuf,
source: std::io::Error,
},
}
#[derive(Debug, Clone)]
pub struct RotationInfo {
previous_file_size: u64,
detected_at: DateTime<Utc>,
}
impl RotationInfo {
pub fn previous_file_size(&self) -> u64 {
self.previous_file_size
}
pub fn detected_at(&self) -> DateTime<Utc> {
self.detected_at
}
}
pub struct FileTailer {
path: PathBuf,
file: tokio::fs::File,
offset: u64,
last_event_at: Option<DateTime<Utc>>,
line_buffer: LineBuffer,
partial_line: String,
read_buf: Vec<u8>,
poll_interval_ms: u64,
last_mtime: Option<SystemTime>,
last_rotation: Option<RotationInfo>,
}
impl FileTailer {
pub async fn open(path: &Path) -> Result<Self, TailerError> {
let file = tokio::fs::File::open(path)
.await
.map_err(|source| TailerError::Io {
path: path.to_path_buf(),
source,
})?;
let initial_mtime = tokio::fs::metadata(path)
.await
.ok()
.and_then(|m| m.modified().ok());
let mut tailer = Self {
path: path.to_path_buf(),
file,
offset: 0,
last_event_at: None,
line_buffer: LineBuffer::new(),
partial_line: String::new(),
read_buf: vec![0u8; READ_BUFFER_CAPACITY],
poll_interval_ms: DEFAULT_POLL_INTERVAL_MS,
last_mtime: initial_mtime,
last_rotation: None,
};
let end_pos =
tailer
.file
.seek(SeekFrom::End(0))
.await
.map_err(|source| TailerError::Io {
path: path.to_path_buf(),
source,
})?;
tailer.offset = end_pos;
::log::info!(
"opened log file for tailing: {} (offset: {end_pos})",
path.display()
);
Ok(tailer)
}
pub async fn open_from_start(path: &Path) -> Result<Self, TailerError> {
let file = tokio::fs::File::open(path)
.await
.map_err(|source| TailerError::Io {
path: path.to_path_buf(),
source,
})?;
let initial_mtime = tokio::fs::metadata(path)
.await
.ok()
.and_then(|m| m.modified().ok());
::log::info!("opened log file for reading from start: {}", path.display());
Ok(Self {
path: path.to_path_buf(),
file,
offset: 0,
last_event_at: None,
line_buffer: LineBuffer::new(),
partial_line: String::new(),
read_buf: vec![0u8; READ_BUFFER_CAPACITY],
poll_interval_ms: DEFAULT_POLL_INTERVAL_MS,
last_mtime: initial_mtime,
last_rotation: None,
})
}
pub fn set_poll_interval_ms(&mut self, ms: u64) {
self.poll_interval_ms = ms.max(10);
}
pub fn poll_interval_ms(&self) -> u64 {
self.poll_interval_ms
}
pub fn last_event_at(&self) -> Option<DateTime<Utc>> {
self.last_event_at
}
pub fn offset(&self) -> u64 {
self.offset
}
pub fn path(&self) -> &Path {
&self.path
}
pub fn take_rotation(&mut self) -> Option<RotationInfo> {
self.last_rotation.take()
}
async fn check_rotation(&mut self) -> Result<(), TailerError> {
if self.offset == 0 {
return Ok(());
}
let Ok(path_meta) = tokio::fs::metadata(&self.path).await else {
return Ok(()); };
let path_size = path_meta.len();
let path_mtime = path_meta.modified().ok();
let mut rotated = false;
if path_size < self.offset {
::log::info!(
"rotation detected: file size ({path_size}) < offset ({})",
self.offset,
);
rotated = true;
}
if !rotated {
if let (Some(current_mtime), Some(prev_mtime)) = (path_mtime, self.last_mtime) {
let elapsed = current_mtime.duration_since(prev_mtime).unwrap_or_default();
if elapsed.as_secs() > MTIME_JUMP_THRESHOLD_SECS && path_size <= self.offset {
::log::info!(
"rotation detected: mtime jumped {:.0}s without new data",
elapsed.as_secs_f64(),
);
rotated = true;
}
}
}
if rotated {
let previous_file_size = self.offset;
self.file =
tokio::fs::File::open(&self.path)
.await
.map_err(|source| TailerError::Io {
path: self.path.clone(),
source,
})?;
self.offset = 0;
self.partial_line.clear();
self.line_buffer.reset();
self.last_mtime = path_mtime;
self.last_rotation = Some(RotationInfo {
previous_file_size,
detected_at: Local::now().naive_local().and_utc(),
});
::log::info!(
"re-opened {} after rotation (old offset: {previous_file_size})",
self.path.display(),
);
} else if path_mtime.is_some() {
self.last_mtime = path_mtime;
}
Ok(())
}
pub async fn poll(&mut self) -> Result<Vec<LogEntry>, TailerError> {
self.check_rotation().await?;
let mut entries = Vec::new();
let mut total_bytes_read: u64 = 0;
loop {
let bytes_read =
self.file
.read(&mut self.read_buf)
.await
.map_err(|source| TailerError::Io {
path: self.path.clone(),
source,
})?;
if bytes_read == 0 {
break;
}
total_bytes_read += bytes_read as u64;
let chunk = String::from_utf8_lossy(&self.read_buf[..bytes_read]);
let text = if self.partial_line.is_empty() {
chunk.into_owned()
} else {
let mut combined = std::mem::take(&mut self.partial_line);
combined.push_str(&chunk);
combined
};
let mut lines_iter = text.split('\n').peekable();
while let Some(line) = lines_iter.next() {
if lines_iter.peek().is_none() {
if !line.is_empty() {
line.clone_into(&mut self.partial_line);
}
} else {
let clean = line.strip_suffix('\r').unwrap_or(line);
if let Some(entry) = self.line_buffer.push_line(clean) {
entries.push(entry);
}
}
}
}
if total_bytes_read > 0 {
self.offset += total_bytes_read;
self.last_event_at = Some(Utc::now());
::log::debug!(
"read {total_bytes_read} bytes from {} (new offset: {})",
self.path.display(),
self.offset
);
}
Ok(entries)
}
pub fn flush(&mut self) -> Vec<LogEntry> {
let mut entries = Vec::new();
if !self.partial_line.is_empty() {
let line = std::mem::take(&mut self.partial_line);
if let Some(entry) = self.line_buffer.push_line(&line) {
entries.push(entry);
}
}
if let Some(entry) = self.line_buffer.flush() {
entries.push(entry);
}
entries
}
pub async fn run(
&mut self,
entry_tx: tokio::sync::mpsc::Sender<LogEntry>,
mut shutdown: tokio::sync::watch::Receiver<bool>,
) -> Result<(), TailerError> {
let mut interval =
tokio::time::interval(std::time::Duration::from_millis(self.poll_interval_ms));
interval.tick().await;
loop {
tokio::select! {
_ = interval.tick() => {
let entries = self.poll().await?;
for entry in entries {
if entry_tx.send(entry).await.is_err() {
::log::info!("entry channel closed, stopping tailer");
return Ok(());
}
}
}
_ = shutdown.changed() => {
::log::info!("shutdown signal received, stopping tailer");
for entry in self.flush() {
let _ = entry_tx.send(entry).await;
}
return Ok(());
}
}
}
}
pub async fn run_once(&mut self) -> Result<Vec<LogEntry>, TailerError> {
let mut all_entries = Vec::new();
loop {
let entries = self.poll().await?;
if entries.is_empty() {
break;
}
all_entries.extend(entries);
}
all_entries.extend(self.flush());
::log::info!(
"one-shot read complete: {} entries from {}",
all_entries.len(),
self.path.display(),
);
Ok(all_entries)
}
}
impl std::fmt::Debug for FileTailer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FileTailer")
.field("path", &self.path)
.field("offset", &self.offset)
.field("last_event_at", &self.last_event_at)
.field("poll_interval_ms", &self.poll_interval_ms)
.finish_non_exhaustive()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::NamedTempFile;
type TestResult = Result<(), Box<dyn std::error::Error>>;
fn temp_log(content: &str) -> Result<NamedTempFile, std::io::Error> {
let mut f = NamedTempFile::new()?;
f.write_all(content.as_bytes())?;
f.flush()?;
Ok(f)
}
mod open {
use super::*;
#[tokio::test]
async fn test_open_seeks_to_end() -> TestResult {
let f = temp_log("existing content\n")?;
let tailer = FileTailer::open(f.path()).await?;
assert_eq!(tailer.offset(), "existing content\n".len() as u64);
Ok(())
}
#[tokio::test]
async fn test_open_last_event_at_is_none() -> TestResult {
let f = temp_log("")?;
let tailer = FileTailer::open(f.path()).await?;
assert!(tailer.last_event_at().is_none());
Ok(())
}
#[tokio::test]
async fn test_open_nonexistent_file_returns_error() {
let result = FileTailer::open(Path::new("/nonexistent/Player.log")).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_open_default_poll_interval() -> TestResult {
let f = temp_log("")?;
let tailer = FileTailer::open(f.path()).await?;
assert_eq!(tailer.poll_interval_ms(), DEFAULT_POLL_INTERVAL_MS);
Ok(())
}
#[tokio::test]
async fn test_open_path_preserved() -> TestResult {
let f = temp_log("")?;
let tailer = FileTailer::open(f.path()).await?;
assert_eq!(tailer.path(), f.path());
Ok(())
}
}
mod open_from_start {
use super::*;
#[tokio::test]
async fn test_open_from_start_offset_is_zero() -> TestResult {
let f = temp_log("existing content\n")?;
let tailer = FileTailer::open_from_start(f.path()).await?;
assert_eq!(tailer.offset(), 0);
Ok(())
}
#[tokio::test]
async fn test_open_from_start_reads_existing_content() -> TestResult {
let f = temp_log(
"[UnityCrossThreadLogger] Event1\n\
[UnityCrossThreadLogger] Event2\n",
)?;
let mut tailer = FileTailer::open_from_start(f.path()).await?;
let entries = tailer.poll().await?;
assert_eq!(entries.len(), 1);
assert!(entries[0].body.contains("Event1"));
Ok(())
}
}
mod run_once_tests {
use super::*;
#[tokio::test]
async fn test_run_once_reads_entire_file() -> TestResult {
let f = temp_log(
"[UnityCrossThreadLogger] Event1\n\
[UnityCrossThreadLogger] Event2\n\
[UnityCrossThreadLogger] Event3\n",
)?;
let mut tailer = FileTailer::open_from_start(f.path()).await?;
let entries = tailer.run_once().await?;
assert_eq!(entries.len(), 3);
assert!(entries[0].body.contains("Event1"));
assert!(entries[1].body.contains("Event2"));
assert!(entries[2].body.contains("Event3"));
Ok(())
}
#[tokio::test]
async fn test_run_once_empty_file_returns_empty() -> TestResult {
let f = temp_log("")?;
let mut tailer = FileTailer::open_from_start(f.path()).await?;
let entries = tailer.run_once().await?;
assert!(entries.is_empty());
Ok(())
}
#[tokio::test]
async fn test_run_once_single_entry_flushed() -> TestResult {
let f = temp_log("[UnityCrossThreadLogger] Only\n")?;
let mut tailer = FileTailer::open_from_start(f.path()).await?;
let entries = tailer.run_once().await?;
assert_eq!(entries.len(), 1);
assert!(entries[0].body.contains("Only"));
Ok(())
}
#[tokio::test]
async fn test_run_once_multiline_entry() -> TestResult {
let f = temp_log(
"[UnityCrossThreadLogger] Event1\n\
{\"key\": \"value\"}\n\
[UnityCrossThreadLogger] Event2\n",
)?;
let mut tailer = FileTailer::open_from_start(f.path()).await?;
let entries = tailer.run_once().await?;
assert_eq!(entries.len(), 2);
assert!(entries[0].body.contains("key"));
Ok(())
}
#[tokio::test]
async fn test_run_once_works_with_open_from_start() -> TestResult {
let f = temp_log(
"[UnityCrossThreadLogger] Event1\n\
[UnityCrossThreadLogger] Event2\n",
)?;
let mut tailer = FileTailer::open_from_start(f.path()).await?;
let entries = tailer.run_once().await?;
assert_eq!(entries.len(), 2);
Ok(())
}
#[tokio::test]
async fn test_run_once_handles_partial_last_line() -> TestResult {
let f = temp_log(
"[UnityCrossThreadLogger] Event1\n\
[UnityCrossThreadLogger] Event2",
)?;
let mut tailer = FileTailer::open_from_start(f.path()).await?;
let entries = tailer.run_once().await?;
assert_eq!(entries.len(), 2);
assert!(entries[0].body.contains("Event1"));
assert!(entries[1].body.contains("Event2"));
Ok(())
}
}
mod poll_tests {
use super::*;
#[tokio::test]
async fn test_poll_no_new_data_returns_empty() -> TestResult {
let f = temp_log("initial data\n")?;
let mut tailer = FileTailer::open(f.path()).await?;
let entries = tailer.poll().await?;
assert!(entries.is_empty());
Ok(())
}
#[tokio::test]
async fn test_poll_reads_new_data() -> TestResult {
let mut f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
writeln!(f, "[UnityCrossThreadLogger] Event1")?;
writeln!(f, "[UnityCrossThreadLogger] Event2")?;
f.flush()?;
let entries = tailer.poll().await?;
assert_eq!(entries.len(), 1);
assert!(entries[0].body.contains("Event1"));
Ok(())
}
#[tokio::test]
async fn test_poll_updates_offset() -> TestResult {
let mut f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
let initial_offset = tailer.offset();
writeln!(f, "new data")?;
f.flush()?;
tailer.poll().await?;
assert!(tailer.offset() > initial_offset);
Ok(())
}
#[tokio::test]
async fn test_poll_updates_last_event_at() -> TestResult {
let mut f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
assert!(tailer.last_event_at().is_none());
writeln!(f, "new data")?;
f.flush()?;
tailer.poll().await?;
assert!(tailer.last_event_at().is_some());
Ok(())
}
#[tokio::test]
async fn test_poll_does_not_update_last_event_at_on_no_data() -> TestResult {
let f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
tailer.poll().await?;
assert!(tailer.last_event_at().is_none());
Ok(())
}
#[tokio::test]
async fn test_poll_multiline_entry() -> TestResult {
let mut f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
writeln!(f, "[UnityCrossThreadLogger] Event1")?;
writeln!(f, "{{\"key\": \"value\"}}")?;
writeln!(f, "[UnityCrossThreadLogger] Event2")?;
f.flush()?;
let entries = tailer.poll().await?;
assert_eq!(entries.len(), 1);
assert!(entries[0].body.contains("Event1"));
assert!(entries[0].body.contains("{\"key\": \"value\"}"));
Ok(())
}
#[tokio::test]
async fn test_poll_incremental_reads() -> TestResult {
let mut f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
writeln!(f, "[UnityCrossThreadLogger] Event1")?;
f.flush()?;
let entries1 = tailer.poll().await?;
assert!(entries1.is_empty());
writeln!(f, "[Client GRE] Event2")?;
f.flush()?;
let entries2 = tailer.poll().await?;
assert_eq!(entries2.len(), 1);
assert!(entries2[0].body.contains("Event1"));
Ok(())
}
#[tokio::test]
async fn test_poll_handles_partial_lines() -> TestResult {
let mut f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
write!(f, "[UnityCrossThreadLogger] Partial")?;
f.flush()?;
let entries1 = tailer.poll().await?;
assert!(entries1.is_empty());
writeln!(f)?; writeln!(f, "[UnityCrossThreadLogger] Next")?;
f.flush()?;
let entries2 = tailer.poll().await?;
assert_eq!(entries2.len(), 1);
assert!(entries2[0].body.contains("Partial"));
Ok(())
}
#[tokio::test]
async fn test_poll_handles_crlf_line_endings() -> TestResult {
let mut f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
write!(
f,
"[UnityCrossThreadLogger] Event1\r\n\
[UnityCrossThreadLogger] Event2\r\n"
)?;
f.flush()?;
let entries = tailer.poll().await?;
assert_eq!(entries.len(), 1);
assert!(!entries[0].body.contains('\r'));
assert!(entries[0].body.contains("Event1"));
Ok(())
}
#[tokio::test]
async fn test_poll_only_reads_new_bytes() -> TestResult {
let mut f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
writeln!(f, "[UnityCrossThreadLogger] Event1")?;
writeln!(f, "[UnityCrossThreadLogger] Event2")?;
f.flush()?;
let entries1 = tailer.poll().await?;
assert_eq!(entries1.len(), 1);
writeln!(f, "[UnityCrossThreadLogger] Event3")?;
f.flush()?;
let entries2 = tailer.poll().await?;
assert_eq!(entries2.len(), 1);
assert!(entries2[0].body.contains("Event2"));
Ok(())
}
}
mod flush_tests {
use super::*;
#[tokio::test]
async fn test_flush_returns_remaining_entry() -> TestResult {
let mut f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
writeln!(f, "[UnityCrossThreadLogger] Final")?;
f.flush()?;
tailer.poll().await?;
let entries = tailer.flush();
assert_eq!(entries.len(), 1);
assert!(entries[0].body.contains("Final"));
Ok(())
}
#[tokio::test]
async fn test_flush_empty_returns_empty_vec() -> TestResult {
let f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
assert!(tailer.flush().is_empty());
Ok(())
}
#[tokio::test]
async fn test_flush_includes_partial_line() -> TestResult {
let mut f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
writeln!(f, "[UnityCrossThreadLogger] Event")?;
write!(f, "partial continuation")?;
f.flush()?;
tailer.poll().await?;
let entries = tailer.flush();
assert_eq!(entries.len(), 1);
assert!(entries[0].body.contains("Event"));
assert!(entries[0].body.contains("partial continuation"));
Ok(())
}
#[tokio::test]
async fn test_flush_partial_line_is_header_returns_both_entries() -> TestResult {
let mut f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
writeln!(f, "[UnityCrossThreadLogger] First")?;
write!(f, "[Client GRE] Second")?;
f.flush()?;
tailer.poll().await?;
let entries = tailer.flush();
assert_eq!(
entries.len(),
2,
"expected 2 entries, got {}: {entries:?}",
entries.len()
);
assert!(entries[0].body.contains("First"));
assert!(entries[1].body.contains("Second"));
Ok(())
}
}
mod poll_interval {
use super::*;
#[tokio::test]
async fn test_set_poll_interval() -> TestResult {
let f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
tailer.set_poll_interval_ms(100);
assert_eq!(tailer.poll_interval_ms(), 100);
Ok(())
}
#[tokio::test]
async fn test_set_poll_interval_clamps_to_minimum() -> TestResult {
let f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
tailer.set_poll_interval_ms(1);
assert_eq!(tailer.poll_interval_ms(), 10);
Ok(())
}
#[tokio::test]
async fn test_set_poll_interval_zero_clamps_to_minimum() -> TestResult {
let f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
tailer.set_poll_interval_ms(0);
assert_eq!(tailer.poll_interval_ms(), 10);
Ok(())
}
}
mod run_tests {
use super::*;
#[tokio::test]
async fn test_run_sends_entries_to_channel() -> TestResult {
let mut f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
tailer.set_poll_interval_ms(10);
let (entry_tx, mut entry_rx) = tokio::sync::mpsc::channel(16);
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
writeln!(f, "[UnityCrossThreadLogger] Event1")?;
writeln!(f, "[UnityCrossThreadLogger] Event2")?;
f.flush()?;
let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });
let entry =
tokio::time::timeout(std::time::Duration::from_secs(2), entry_rx.recv()).await?;
assert!(entry.is_some());
if let Some(e) = entry {
assert!(e.body.contains("Event1"));
}
let _ = shutdown_tx.send(true);
let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await?;
assert!(result.is_ok());
Ok(())
}
#[tokio::test]
async fn test_run_stops_on_shutdown() -> TestResult {
let f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
tailer.set_poll_interval_ms(10);
let (entry_tx, _entry_rx) = tokio::sync::mpsc::channel(16);
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });
let _ = shutdown_tx.send(true);
let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await?;
assert!(result.is_ok());
Ok(())
}
#[tokio::test]
async fn test_run_stops_when_receiver_dropped() -> TestResult {
let mut f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
tailer.set_poll_interval_ms(10);
let (entry_tx, entry_rx) = tokio::sync::mpsc::channel(16);
let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
writeln!(f, "[UnityCrossThreadLogger] Event1")?;
writeln!(f, "[UnityCrossThreadLogger] Event2")?;
f.flush()?;
drop(entry_rx);
let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });
let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await?;
assert!(result.is_ok());
Ok(())
}
#[tokio::test]
async fn test_run_continuous_data_stream() -> TestResult {
let mut f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
tailer.set_poll_interval_ms(10);
let (entry_tx, mut entry_rx) = tokio::sync::mpsc::channel(64);
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });
for i in 0..3 {
writeln!(f, "[UnityCrossThreadLogger] Event{i}")?;
f.flush()?;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
writeln!(f, "[UnityCrossThreadLogger] Final")?;
f.flush()?;
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let _ = shutdown_tx.send(true);
let result = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await?;
assert!(result.is_ok());
let mut received = Vec::new();
while let Ok(entry) = entry_rx.try_recv() {
received.push(entry);
}
assert!(
received.len() >= 2,
"expected at least 2 entries, got {}",
received.len()
);
Ok(())
}
}
mod rotation_tests {
use super::*;
fn replace_file_at_path(path: &Path, content: &str) -> Result<(), std::io::Error> {
std::fs::write(path, content.as_bytes())
}
#[tokio::test]
async fn test_rotation_detected_when_file_shrinks() -> TestResult {
let f = temp_log(
"[UnityCrossThreadLogger] Event1\n\
[UnityCrossThreadLogger] Event2\n",
)?;
let path = f.path().to_path_buf();
let mut tailer = FileTailer::open_from_start(&path).await?;
let _entries = tailer.run_once().await?;
assert!(tailer.offset() > 0);
replace_file_at_path(&path, "[UnityCrossThreadLogger] NewSession\n")?;
let mut tailer = FileTailer::open(&path).await?;
tailer.offset = 10_000;
let entries = tailer.poll().await?;
let rotation = tailer.take_rotation();
assert!(
rotation.is_some(),
"rotation should be detected when file shrinks"
);
if let Some(info) = rotation {
assert_eq!(info.previous_file_size(), 10_000);
}
assert!(tailer.offset() > 0, "should have read from new file");
assert!(tailer.take_rotation().is_none());
drop(entries);
Ok(())
}
#[tokio::test]
async fn test_rotation_resets_offset_to_zero_before_reading() -> TestResult {
let f = temp_log(
"[UnityCrossThreadLogger] OldEvent\n\
[UnityCrossThreadLogger] OldEvent2\n",
)?;
let path = f.path().to_path_buf();
let mut tailer = FileTailer::open(&path).await?;
tailer.offset = 50_000;
replace_file_at_path(
&path,
"[UnityCrossThreadLogger] NewA\n\
[UnityCrossThreadLogger] NewB\n",
)?;
let entries = tailer.poll().await?;
assert!(tailer.take_rotation().is_some());
assert_eq!(entries.len(), 1);
assert!(entries[0].body.contains("NewA"));
Ok(())
}
#[tokio::test]
async fn test_rotation_clears_partial_line_and_line_buffer() -> TestResult {
let f = temp_log("")?;
let path = f.path().to_path_buf();
let mut tailer = FileTailer::open(&path).await?;
std::fs::write(&path, "[UnityCrossThreadLogger] Partial")?;
tailer.poll().await?;
assert!(
!tailer.partial_line.is_empty(),
"partial_line should hold the incomplete line"
);
tailer.offset = 50_000;
replace_file_at_path(
&path,
"[UnityCrossThreadLogger] Fresh\n\
[UnityCrossThreadLogger] Fresh2\n",
)?;
let entries = tailer.poll().await?;
assert!(tailer.take_rotation().is_some());
assert_eq!(entries.len(), 1);
assert!(entries[0].body.contains("Fresh"));
assert!(!entries[0].body.contains("Partial"));
Ok(())
}
#[tokio::test]
async fn test_no_false_positive_on_normal_growth() -> TestResult {
let mut f = temp_log("")?;
let mut tailer = FileTailer::open(f.path()).await?;
writeln!(f, "[UnityCrossThreadLogger] Event1")?;
f.flush()?;
tailer.poll().await?;
assert!(
tailer.take_rotation().is_none(),
"no rotation on normal append"
);
writeln!(f, "[UnityCrossThreadLogger] Event2")?;
f.flush()?;
tailer.poll().await?;
assert!(
tailer.take_rotation().is_none(),
"no rotation on continued growth"
);
Ok(())
}
#[tokio::test]
async fn test_no_rotation_when_offset_is_zero() -> TestResult {
let f = temp_log("")?;
let tailer = FileTailer::open_from_start(f.path()).await?;
assert_eq!(tailer.offset(), 0);
Ok(())
}
#[tokio::test]
async fn test_rotation_emits_correct_previous_file_size() -> TestResult {
let f = temp_log("x".repeat(5000).as_str())?;
let path = f.path().to_path_buf();
let mut tailer = FileTailer::open(&path).await?;
assert_eq!(tailer.offset(), 5000);
replace_file_at_path(&path, "small\n")?;
tailer.poll().await?;
let rotation = tailer.take_rotation();
assert!(rotation.is_some());
if let Some(info) = rotation {
assert_eq!(info.previous_file_size(), 5000);
}
Ok(())
}
#[tokio::test]
async fn test_rotation_info_has_timestamp() -> TestResult {
let f = temp_log("x".repeat(1000).as_str())?;
let path = f.path().to_path_buf();
let mut tailer = FileTailer::open(&path).await?;
replace_file_at_path(&path, "y\n")?;
tailer.poll().await?;
let rotation = tailer.take_rotation();
assert!(rotation.is_some());
if let Some(info) = rotation {
let local_as_utc = Local::now().naive_local().and_utc();
let elapsed = local_as_utc - info.detected_at();
assert!(
elapsed.num_seconds() < 5,
"detected_at should be recent, got {elapsed}"
);
}
Ok(())
}
#[tokio::test]
async fn test_take_rotation_returns_none_after_first_call() -> TestResult {
let f = temp_log("x".repeat(1000).as_str())?;
let path = f.path().to_path_buf();
let mut tailer = FileTailer::open(&path).await?;
replace_file_at_path(&path, "y\n")?;
tailer.poll().await?;
assert!(tailer.take_rotation().is_some());
assert!(
tailer.take_rotation().is_none(),
"second take_rotation should return None"
);
Ok(())
}
}
mod debug_impl {
use super::*;
#[tokio::test]
async fn test_debug_does_not_expose_file_handle() -> TestResult {
let f = temp_log("")?;
let tailer = FileTailer::open(f.path()).await?;
let debug = format!("{tailer:?}");
assert!(debug.contains("FileTailer"));
assert!(debug.contains("offset"));
assert!(!debug.contains("read_buf"));
Ok(())
}
}
mod error_tests {
use super::*;
#[test]
fn test_tailer_error_display_includes_path() {
let err = TailerError::Io {
path: PathBuf::from("/test/Player.log"),
source: std::io::Error::new(std::io::ErrorKind::NotFound, "file not found"),
};
let msg = err.to_string();
assert!(msg.contains("/test/Player.log"));
assert!(msg.contains("file not found"));
}
#[test]
fn test_tailer_error_is_debug() {
let err = TailerError::Io {
path: PathBuf::from("/test"),
source: std::io::Error::other("test"),
};
let debug = format!("{err:?}");
assert!(debug.contains("Io"));
}
}
}