use memchr::{memchr_iter, memchr3_iter};
use memmap2::Mmap;
#[cfg(unix)]
use memmap2::{Advice, UncheckedAdvice};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{fs::File, io};
use tokio::{
io::{AsyncReadExt, AsyncSeekExt},
spawn,
sync::{oneshot, watch},
task::spawn_blocking,
};
/// Predicate deciding whether a raw line (without its trailing `'\n'`)
/// should be visible; `Send + Sync` so it can run on rayon worker threads.
pub type VisibilityPredicate = Box<dyn Fn(&[u8]) -> bool + Send + Sync>;
/// Outcome of a background file load.
pub struct FileLoadResult {
    /// Fully indexed reader over the (possibly ANSI-stripped) contents.
    pub reader: FileReader,
    /// Line indices that passed the optional visibility predicate, in
    /// ascending order; `None` when no predicate was supplied.
    pub precomputed_visible: Option<Vec<usize>>,
}
/// Handle returned immediately by [`FileReader::load`] while indexing
/// continues on a blocking task.
pub struct FileLoadHandle {
    /// Progress in `0.0..=1.0`, updated as chunks are scanned.
    pub progress_rx: watch::Receiver<f64>,
    /// Resolves with the finished [`FileLoadResult`], an I/O error, or
    /// `ErrorKind::Interrupted` on cancellation.
    pub result_rx: oneshot::Receiver<io::Result<FileLoadResult>>,
    /// File size in bytes at the time the load started.
    pub total_bytes: u64,
}
/// Backing bytes for a [`FileReader`]: either a read-only memory map of the
/// original file, or an owned buffer (used when ANSI/CR stripping rewrote
/// the contents, or when data arrived from a stream rather than a file).
#[derive(Clone)]
enum Storage {
    Mmap(std::sync::Arc<Mmap>),
    Bytes(std::sync::Arc<Vec<u8>>),
}
impl Storage {
    /// Borrow the underlying bytes regardless of backing.
    fn as_bytes(&self) -> &[u8] {
        match self {
            Storage::Mmap(m) => m.as_ref(),
            Storage::Bytes(v) => v.as_slice(),
        }
    }
}
/// Cheap-to-clone, random-access view over a log's lines.
#[derive(Clone)]
pub struct FileReader {
    storage: Storage,
    // Byte offset of the start of each line; always begins with 0 and may
    // end with an offset equal to the data length when the data ends in
    // '\n' (line_count() compensates for that trailing entry).
    line_starts: std::sync::Arc<Vec<usize>>,
}
impl FileReader {
    /// Open `path`, memory-map it, and index line starts in one pass.
    ///
    /// If an ESC or CR byte is found the contents are instead copied,
    /// ANSI/CR-stripped, and held as owned bytes; otherwise the file is
    /// re-mapped with random-access advice for viewport reads.
    pub fn new(path: &str) -> io::Result<Self> {
        let file = File::open(path)?;
        // SAFETY: read-only map. As with any mmap, external truncation of the
        // file while mapped is UB — NOTE(review): this assumes log files are
        // append-only or stable while open; confirm for all call sites.
        let scan_mmap = unsafe { Mmap::map(&file)? };
        let len = scan_mmap.len();
        #[cfg(unix)]
        let _ = scan_mmap.advise(Advice::Sequential);
        // starts[0] is the implicit first line at offset 0.
        let mut starts = vec![0usize];
        let mut has_ansi = false;
        // Single pass over '\n', ESC, and '\r'; anything but '\n' switches us
        // to the slower stripping path below.
        for pos in memchr3_iter(b'\n', b'\x1b', b'\r', &scan_mmap) {
            if scan_mmap[pos] == b'\n' {
                let next = pos + 1;
                if next <= len {
                    starts.push(next);
                }
            } else {
                has_ansi = true;
                break;
            }
        }
        if has_ansi {
            // Escape/CR bytes present: keep a stripped owned copy instead of
            // the raw map (offsets in `starts` would not match stripped text).
            let (stripped, line_starts) = strip_ansi_and_index(&scan_mmap);
            return Ok(FileReader {
                storage: Storage::Bytes(std::sync::Arc::new(stripped)),
                line_starts: std::sync::Arc::new(line_starts),
            });
        }
        drop(scan_mmap);
        // Fresh map with random-access advice for subsequent line reads.
        let access_mmap = unsafe { Mmap::map(&file)? };
        #[cfg(unix)]
        let _ = access_mmap.advise(Advice::Random);
        Ok(FileReader {
            storage: Storage::Mmap(std::sync::Arc::new(access_mmap)),
            line_starts: std::sync::Arc::new(starts),
        })
    }
pub fn from_bytes(data: Vec<u8>) -> Self {
let mut starts = vec![0usize];
let mut has_ansi = false;
for pos in memchr3_iter(b'\n', b'\x1b', b'\r', &data) {
if data[pos] == b'\n' {
let next = pos + 1;
if next <= data.len() {
starts.push(next);
}
} else {
has_ansi = true;
break;
}
}
if has_ansi {
let (stripped, line_starts) = strip_ansi_and_index(&data);
return FileReader {
storage: Storage::Bytes(std::sync::Arc::new(stripped)),
line_starts: std::sync::Arc::new(line_starts),
};
}
FileReader {
storage: Storage::Bytes(std::sync::Arc::new(data)),
line_starts: std::sync::Arc::new(starts),
}
}
    /// Quick preview: read at most `preview_bytes` from the end of `path`.
    ///
    /// When the read starts mid-file, everything up to and including the
    /// first newline is dropped so the preview begins on a line boundary
    /// (an empty reader results if the window contains no newline at all).
    pub async fn from_file_tail(path: &str, preview_bytes: u64) -> io::Result<Self> {
        let mut file = tokio::fs::File::open(path).await?;
        let total_len = file.metadata().await?.len();
        let offset = total_len.saturating_sub(preview_bytes);
        file.seek(io::SeekFrom::Start(offset)).await?;
        let read_len = (total_len - offset) as usize;
        let mut buf = vec![0u8; read_len];
        file.read_exact(&mut buf).await?;
        // offset == 0 means we hold the whole file, so line 0 is complete
        // and nothing needs to be skipped.
        let start = if offset > 0 {
            buf.iter()
                .position(|&b| b == b'\n')
                .map(|p| p + 1)
                .unwrap_or(buf.len())
        } else {
            0
        };
        Ok(Self::from_bytes(buf[start..].to_vec()))
    }
    /// Quick preview: read at most `preview_bytes` from the start of `path`,
    /// truncated to the last complete line.
    ///
    /// A window containing no newline at all yields an empty reader rather
    /// than a partial line (mirrors the tail preview's whole-lines-only rule).
    pub async fn from_file_head(path: &str, preview_bytes: u64) -> io::Result<Self> {
        let mut file = tokio::fs::File::open(path).await?;
        let total_len = file.metadata().await?.len();
        let read_len = total_len.min(preview_bytes) as usize;
        let mut buf = vec![0u8; read_len];
        file.read_exact(&mut buf).await?;
        // Keep only whole lines; drop the trailing partial one.
        if let Some(last_nl) = buf.iter().rposition(|&b| b == b'\n') {
            buf.truncate(last_nl + 1);
        } else {
            buf.clear();
        }
        Ok(Self::from_bytes(buf))
    }
    /// Stream stdin into a watch channel.
    ///
    /// A background task accumulates input and publishes a snapshot of all
    /// complete lines roughly once per second; the final (possibly partial)
    /// line is flushed when stdin reaches EOF or errors. Receivers always
    /// observe the full accumulated buffer, not deltas.
    /// NOTE(review): unlike `spawn_process_stream`, snapshots here are not
    /// ANSI-stripped — presumably `from_bytes` strips on ingest; confirm.
    pub async fn stream_stdin() -> watch::Receiver<Vec<u8>> {
        let (snapshot_tx, snapshot_rx) = watch::channel(Vec::<u8>::new());
        spawn(async move {
            use std::time::Duration;
            use tokio::io::AsyncReadExt;
            let mut stdin = tokio::io::stdin();
            // Bytes already published (complete lines only).
            let mut accumulated: Vec<u8> = Vec::new();
            // Bytes read but not yet terminated by a newline.
            let mut partial: Vec<u8> = Vec::new();
            let mut buf = vec![0u8; 4096];
            let mut interval = tokio::time::interval(Duration::from_secs(1));
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            // Consume the immediate first tick so publishing starts after 1s.
            interval.tick().await;
            loop {
                tokio::select! {
                    result = stdin.read(&mut buf) => {
                        match result {
                            Ok(0) | Err(_) => {
                                // EOF or error: flush whatever is left, even
                                // without a trailing newline, then exit.
                                accumulated.extend_from_slice(&partial);
                                let _ = snapshot_tx.send(accumulated);
                                return;
                            }
                            Ok(n) => partial.extend_from_slice(&buf[..n]),
                        }
                    }
                    _ = interval.tick() => {
                        // Publish up to and including the last complete line.
                        if let Some(last_nl) = partial.iter().rposition(|&b| b == b'\n') {
                            accumulated.extend_from_slice(&partial[..=last_nl]);
                            partial.drain(..=last_nl);
                            let _ = snapshot_tx.send(accumulated.clone());
                        }
                    }
                }
            }
        });
        snapshot_rx
    }
pub async fn load(
path: String,
predicate: Option<VisibilityPredicate>,
tail: bool,
cancel: Arc<AtomicBool>,
) -> io::Result<FileLoadHandle> {
let total_bytes = std::fs::metadata(&path)?.len();
let (progress_tx, progress_rx) = watch::channel(0.0_f64);
let (result_tx, result_rx) = oneshot::channel();
spawn_blocking(move || {
let result =
Self::index_chunked(&path, total_bytes, progress_tx, predicate, tail, &cancel);
let _ = result_tx.send(result);
});
Ok(FileLoadHandle {
progress_rx,
result_rx,
total_bytes,
})
}
    /// Blocking worker behind [`FileReader::load`]: mmap `path`, index line
    /// starts in parallel chunks, fall back to an ANSI-stripping pass when
    /// escape/CR bytes are found, then optionally precompute visible lines.
    fn index_chunked(
        path: &str,
        _total_bytes: u64,
        progress_tx: watch::Sender<f64>,
        predicate: Option<VisibilityPredicate>,
        tail: bool,
        cancel: &AtomicBool,
    ) -> io::Result<FileLoadResult> {
        use rayon::prelude::*;
        use std::sync::atomic::AtomicUsize;
        let file = File::open(path)?;
        // SAFETY: read-only map; same external-modification caveat as new().
        let scan_mmap = unsafe { Mmap::map(&file)? };
        let len = scan_mmap.len();
        let num_threads = rayon::current_num_threads().max(1);
        // At least 4 MiB per chunk so small files don't pay rayon overhead.
        let chunk_size = len.div_ceil(num_threads).max(4 * 1024 * 1024);
        let bytes_done = AtomicUsize::new(0);
        // Per chunk: (saw ESC/CR, absolute line-start offsets in that chunk).
        let chunk_results: Vec<(bool, Vec<usize>)> = scan_mmap
            .par_chunks(chunk_size)
            .enumerate()
            .map(|(chunk_idx, chunk)| {
                // Cancellation: return a dummy; the caller checks the flag
                // again after collect and discards everything.
                if cancel.load(Ordering::Relaxed) {
                    return (false, vec![]);
                }
                let chunk_start = chunk_idx * chunk_size;
                let mut has_ansi = false;
                let mut local_starts: Vec<usize> = Vec::new();
                // One pass over '\n', ESC, '\r'; any ESC/CR means the file
                // needs the slower stripping path, so stop scanning early.
                for pos in memchr3_iter(b'\n', b'\x1b', b'\r', chunk) {
                    match chunk[pos] {
                        b'\n' => {
                            let next = chunk_start + pos + 1;
                            if next <= len {
                                local_starts.push(next);
                            }
                        }
                        _ => {
                            has_ansi = true;
                            break;
                        }
                    }
                }
                // Progress = cumulative bytes scanned / total length.
                let done = bytes_done.fetch_add(chunk.len(), Ordering::Relaxed) + chunk.len();
                if len > 0 {
                    let _ = progress_tx.send(done as f64 / len as f64);
                }
                (has_ansi, local_starts)
            })
            .collect();
        if cancel.load(Ordering::Relaxed) {
            return Err(io::Error::new(io::ErrorKind::Interrupted, "load cancelled"));
        }
        let has_ansi = chunk_results.iter().any(|(a, _)| *a);
        let reader = if has_ansi {
            // Escape bytes present: rewrite the whole buffer without them and
            // re-index the stripped copy (raw offsets would not line up).
            let (stripped, line_starts) = strip_ansi_and_index(&scan_mmap);
            drop(scan_mmap);
            let _ = progress_tx.send(1.0);
            FileReader {
                storage: Storage::Bytes(std::sync::Arc::new(stripped)),
                line_starts: std::sync::Arc::new(line_starts),
            }
        } else {
            // par_chunks preserves chunk order, so concatenating the sorted
            // per-chunk offsets yields a sorted list; prepend the implicit
            // first-line start at 0.
            let total_starts: usize = chunk_results.iter().map(|(_, v)| v.len()).sum();
            let mut starts = Vec::with_capacity(1 + total_starts);
            starts.push(0usize);
            for (_, local) in chunk_results {
                starts.extend(local);
            }
            drop(scan_mmap);
            // Fresh map with random-access advice for viewport reads.
            let access_mmap = unsafe { Mmap::map(&file)? };
            #[cfg(unix)]
            let _ = access_mmap.advise(Advice::Random);
            FileReader {
                storage: Storage::Mmap(std::sync::Arc::new(access_mmap)),
                line_starts: std::sync::Arc::new(starts),
            }
        };
        let precomputed_visible = predicate.map(|pred| {
            let count = reader.line_count();
            if tail {
                // Tail mode: scan newest-first, then restore ascending order.
                let mut visible: Vec<usize> = (0..count)
                    .rev()
                    .filter(|&i| pred(reader.get_line(i)))
                    .collect();
                visible.reverse();
                visible
            } else {
                // rayon's collect preserves the source order, so the result
                // is ascending without an explicit sort.
                use rayon::prelude::*;
                (0..count)
                    .into_par_iter()
                    .filter(|&i| pred(reader.get_line(i)))
                    .collect()
            }
        });
        if precomputed_visible.is_some()
            && let Storage::Mmap(ref m) = reader.storage
        {
            // The filter pass touched every page; tell the kernel it may
            // evict them from the page cache.
            #[cfg(unix)]
            let _ = unsafe { (**m).unchecked_advise(UncheckedAdvice::DontNeed) };
        }
        Ok(FileLoadResult {
            reader,
            precomputed_visible,
        })
    }
pub fn line_count(&self) -> usize {
let data = self.storage.as_bytes();
if data.is_empty() {
return 0;
}
let n = self.line_starts.len();
if n > 0 && self.line_starts[n - 1] == data.len() {
n - 1
} else {
n
}
}
pub fn get_line(&self, idx: usize) -> &[u8] {
let data = self.storage.as_bytes();
let start = self.line_starts[idx];
let end = if idx + 1 < self.line_starts.len() {
let next = self.line_starts[idx + 1];
if next > 0 && data.get(next - 1) == Some(&b'\n') {
next - 1
} else {
next
}
} else {
data.len()
};
&data[start..end]
}
    /// Hint the kernel to prefetch the pages backing lines
    /// `first_line..=last_line`. No-op for owned-buffer storage.
    #[cfg(unix)]
    pub fn advise_viewport(&self, first_line: usize, last_line: usize) {
        if let Storage::Mmap(ref m) = self.storage {
            let len = m.len();
            // Out-of-range line indices clamp to the bounds of the map.
            let start = self.line_starts.get(first_line).copied().unwrap_or(0);
            let end = self
                .line_starts
                .get(last_line + 1)
                .copied()
                .unwrap_or(len)
                .min(len);
            if end > start {
                let _ = m.advise_range(Advice::WillNeed, start, end - start);
            }
        }
    }
    /// Iterate over `(line_index, line_bytes)` pairs in ascending order.
    pub fn iter(&self) -> impl Iterator<Item = (usize, &[u8])> {
        (0..self.line_count()).map(move |i| (i, self.get_line(i)))
    }
    /// Append raw bytes to the buffer and extend the line index in place.
    ///
    /// An mmap backing is converted to an owned buffer on first append.
    /// The new bytes are indexed as-is — NOTE(review): no ANSI/CR stripping
    /// happens here; the streaming callers in this file strip before
    /// sending, so confirm no caller passes raw escape sequences.
    pub fn append_bytes(&mut self, new_data: &[u8]) {
        if new_data.is_empty() {
            return;
        }
        // Swap the storage out so we can take ownership of the buffer; the
        // placeholder is a cheap empty Vec.
        let old_storage = std::mem::replace(
            &mut self.storage,
            Storage::Bytes(std::sync::Arc::new(Vec::new())),
        );
        let mut data: Vec<u8> = match old_storage {
            // Reuse the allocation when we hold the only Arc reference;
            // otherwise fall back to a clone.
            Storage::Bytes(v) => std::sync::Arc::try_unwrap(v).unwrap_or_else(|arc| (*arc).clone()),
            Storage::Mmap(m) => m.to_vec(),
        };
        let offset = data.len();
        data.extend_from_slice(new_data);
        // Only scan the newly appended region for line starts.
        let starts = std::sync::Arc::make_mut(&mut self.line_starts);
        for pos in memchr_iter(b'\n', &data[offset..]) {
            let abs = offset + pos + 1;
            if abs <= data.len() {
                starts.push(abs);
            }
        }
        self.storage = Storage::Bytes(std::sync::Arc::new(data));
    }
    /// Spawn `program` with `args` and publish its combined stdout/stderr
    /// through a watch channel, ANSI/CR-stripped.
    ///
    /// Two reader tasks forward raw pipe chunks into an mpsc queue; an
    /// aggregator publishes the accumulated complete lines every 500 ms and
    /// flushes the remainder once both pipes close.
    /// NOTE(review): the child handle is dropped without `wait()`, so the
    /// exit status is discarded — confirm callers don't need it.
    pub async fn spawn_process_stream(
        program: &str,
        args: &[&str],
    ) -> io::Result<watch::Receiver<Vec<u8>>> {
        use tokio::process::Command;
        let mut child = Command::new(program)
            .args(args)
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .stdin(std::process::Stdio::null())
            .spawn()?;
        let stdout = child.stdout.take();
        let stderr = child.stderr.take();
        let (tx, rx) = watch::channel(Vec::<u8>::new());
        let (line_tx, mut line_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(64);
        if let Some(mut out) = stdout {
            let sender = line_tx.clone();
            spawn(async move {
                use tokio::io::AsyncReadExt;
                let mut buf = vec![0u8; 4096];
                loop {
                    match out.read(&mut buf).await {
                        // EOF or read error ends this forwarder task.
                        Ok(0) | Err(_) => break,
                        Ok(n) => {
                            // Receiver gone: aggregator exited, stop reading.
                            if sender.send(buf[..n].to_vec()).await.is_err() {
                                break;
                            }
                        }
                    }
                }
            });
        }
        if let Some(mut err) = stderr {
            let sender = line_tx.clone();
            spawn(async move {
                use tokio::io::AsyncReadExt;
                let mut buf = vec![0u8; 4096];
                loop {
                    match err.read(&mut buf).await {
                        Ok(0) | Err(_) => break,
                        Ok(n) => {
                            if sender.send(buf[..n].to_vec()).await.is_err() {
                                break;
                            }
                        }
                    }
                }
            });
        }
        // Drop our own sender so line_rx.recv() yields None once both
        // forwarder tasks have finished.
        drop(line_tx);
        spawn(async move {
            use std::time::Duration;
            // Published, already-stripped bytes.
            let mut accumulated: Vec<u8> = Vec::new();
            // Raw bytes not yet terminated by a newline.
            let mut partial: Vec<u8> = Vec::new();
            let mut interval = tokio::time::interval(Duration::from_millis(500));
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            // Swallow the immediate first tick.
            interval.tick().await;
            loop {
                tokio::select! {
                    chunk = line_rx.recv() => {
                        match chunk {
                            Some(data) => partial.extend_from_slice(&data),
                            None => {
                                // Both pipes closed: flush the tail, even
                                // without a trailing newline, then exit.
                                accumulated.extend_from_slice(&strip_ansi_escapes(&partial));
                                let _ = tx.send(accumulated);
                                return;
                            }
                        }
                    }
                    _ = interval.tick() => {
                        // Publish up to the last complete line, stripped.
                        if let Some(last_nl) = partial.iter().rposition(|&b| b == b'\n') {
                            let chunk = &partial[..=last_nl];
                            accumulated.extend_from_slice(&strip_ansi_escapes(chunk));
                            partial.drain(..=last_nl);
                            let _ = tx.send(accumulated.clone());
                        }
                    }
                }
            }
        });
        Ok(rx)
    }
    /// Poll `path` every 500 ms for growth past `initial_offset` and publish
    /// each newly appended, ANSI/CR-stripped chunk through a watch channel.
    ///
    /// Unlike the stdin/process streams this sends only the delta since the
    /// last poll, not an accumulated buffer. A shrinking file resets the
    /// read offset to its new size. Per-poll I/O errors are silently ignored
    /// and the next tick retries — deliberate best-effort behavior.
    pub async fn spawn_file_watcher(path: String, initial_offset: u64) -> watch::Receiver<Vec<u8>> {
        let (tx, rx) = watch::channel(Vec::<u8>::new());
        tokio::spawn(async move {
            use tokio::time::MissedTickBehavior;
            let mut last_offset = initial_offset;
            let mut interval = tokio::time::interval(std::time::Duration::from_millis(500));
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            // Consume the immediate first tick so the first poll waits 500 ms.
            interval.tick().await;
            loop {
                interval.tick().await;
                let path_clone = path.clone();
                // File I/O runs on the blocking pool; `last_offset` is Copy,
                // so the closure captures the current value by value.
                let result = tokio::task::spawn_blocking(move || -> io::Result<(u64, Vec<u8>)> {
                    use std::io::{Read, Seek};
                    let current_size = std::fs::metadata(&path_clone)?.len();
                    // Nothing new (or the file shrank): report size only.
                    if current_size <= last_offset {
                        return Ok((current_size, vec![]));
                    }
                    let mut file = File::open(&path_clone)?;
                    file.seek(std::io::SeekFrom::Start(last_offset))?;
                    let new_len = (current_size - last_offset) as usize;
                    let mut buf = vec![0u8; new_len];
                    file.read_exact(&mut buf)?;
                    Ok((current_size, buf))
                })
                .await;
                if let Ok(Ok((new_size, buf))) = result {
                    if new_size < last_offset {
                        // Truncated/rotated in place: restart from the end.
                        last_offset = new_size;
                    } else if !buf.is_empty() {
                        last_offset = new_size;
                        let stripped = strip_ansi_escapes(&buf);
                        // All receivers dropped: stop watching.
                        if tx.send(stripped).is_err() {
                            break;
                        }
                    }
                }
            }
        });
        rx
    }
}
/// Remove ANSI escape sequences (CSI, OSC, and lone two-byte ESC sequences)
/// plus carriage returns from `input`, returning the cleaned bytes.
///
/// Truncated sequences at the end of the input are dropped silently.
fn strip_ansi_escapes(input: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(input.len());
    let mut rest = input;
    while let Some((&byte, tail)) = rest.split_first() {
        match byte {
            b'\x1b' => {
                // An ESC with nothing after it: drop it and stop.
                let Some((&kind, after)) = tail.split_first() else {
                    break;
                };
                rest = match kind {
                    // CSI: skip parameter/intermediate bytes up to and
                    // including the final byte in 0x40..=0x7E; a truncated
                    // sequence consumes the remainder of the input.
                    b'[' => match after.iter().position(|b| (0x40..=0x7E).contains(b)) {
                        Some(fin) => &after[fin + 1..],
                        None => &[],
                    },
                    // OSC: skip until BEL or the two-byte ST (ESC \).
                    b']' => {
                        let mut j = 0;
                        loop {
                            match after.get(j) {
                                None => break &[],
                                Some(&b'\x07') => break &after[j + 1..],
                                Some(&b'\x1b') if after.get(j + 1) == Some(&b'\\') => {
                                    break &after[j + 2..];
                                }
                                Some(_) => j += 1,
                            }
                        }
                    }
                    // Any other escape: drop ESC and the byte that follows.
                    _ => after,
                };
            }
            // Carriage returns are dropped outright (CRLF becomes LF).
            b'\r' => rest = tail,
            other => {
                out.push(other);
                rest = tail;
            }
        }
    }
    out
}
fn strip_ansi_and_index(input: &[u8]) -> (Vec<u8>, Vec<usize>) {
let mut out = Vec::with_capacity(input.len());
let mut starts = vec![0usize];
let mut i = 0;
while i < input.len() {
match input[i] {
b'\x1b' => {
i += 1;
if i >= input.len() {
break;
}
match input[i] {
b'[' => {
i += 1;
while i < input.len() {
let b = input[i];
i += 1;
if (0x40..=0x7E).contains(&b) {
break;
}
}
}
b']' => {
i += 1;
while i < input.len() {
let b = input[i];
i += 1;
if b == b'\x07' {
break;
}
if b == b'\x1b' && i < input.len() && input[i] == b'\\' {
i += 1;
break;
}
}
}
_ => {
i += 1; }
}
}
b'\r' => {
i += 1; }
b'\n' => {
out.push(b'\n');
i += 1;
starts.push(out.len()); }
b => {
out.push(b);
i += 1;
}
}
}
(out, starts)
}
/// Test-only reference implementation: line-start offsets of `data`
/// (always includes 0; includes `data.len()` when the data ends in '\n').
#[cfg(test)]
fn compute_line_starts(data: &[u8]) -> Vec<usize> {
    // The previous `pos < data.len()` guard was dead code: every match
    // position is in bounds by construction, so `pos + 1 <= data.len()`
    // always holds. A plain byte scan is fine for a test helper.
    std::iter::once(0)
        .chain(
            data.iter()
                .enumerate()
                .filter_map(|(i, &b)| (b == b'\n').then_some(i + 1)),
        )
        .collect()
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::NamedTempFile;
fn make(content: &[u8]) -> FileReader {
FileReader::from_bytes(content.to_vec())
}
fn make_tmp(lines: &[&str]) -> NamedTempFile {
let mut f = NamedTempFile::new().unwrap();
for line in lines {
writeln!(f, "{}", line).unwrap();
}
f
}
#[tokio::test]
async fn test_load_no_predicate_no_precomputed_visible() {
let f = make_tmp(&["line1", "line2"]);
let path = f.path().to_str().unwrap().to_string();
let handle = FileReader::load(path, None, false, Arc::new(AtomicBool::new(false)))
.await
.unwrap();
let result = handle.result_rx.await.unwrap().unwrap();
assert!(result.precomputed_visible.is_none());
assert_eq!(result.reader.line_count(), 2);
}
#[tokio::test]
async fn test_load_predicate_forward_filters_correctly() {
let f = make_tmp(&["ERROR: bad", "INFO: ok", "ERROR: also bad"]);
let path = f.path().to_str().unwrap().to_string();
let pred: Box<dyn Fn(&[u8]) -> bool + Send + Sync> =
Box::new(|line: &[u8]| line.starts_with(b"ERROR"));
let handle = FileReader::load(path, Some(pred), false, Arc::new(AtomicBool::new(false)))
.await
.unwrap();
let result = handle.result_rx.await.unwrap().unwrap();
assert_eq!(result.precomputed_visible, Some(vec![0, 2]));
}
#[tokio::test]
async fn test_load_predicate_tail_result_is_ascending() {
let f = make_tmp(&["ERROR: first", "INFO: skip", "ERROR: last"]);
let path = f.path().to_str().unwrap().to_string();
let pred: Box<dyn Fn(&[u8]) -> bool + Send + Sync> =
Box::new(|line: &[u8]| line.starts_with(b"ERROR"));
let handle = FileReader::load(path, Some(pred), true, Arc::new(AtomicBool::new(false)))
.await
.unwrap();
let result = handle.result_rx.await.unwrap().unwrap();
let visible = result.precomputed_visible.unwrap();
assert_eq!(visible, vec![0, 2]);
assert!(visible.windows(2).all(|w| w[0] < w[1]));
}
#[tokio::test]
async fn test_load_predicate_ansi_file_indices_correct() {
let mut f = NamedTempFile::new().unwrap();
writeln!(f, "\x1b[32mERROR\x1b[0m: red").unwrap(); writeln!(f, "\x1b[32mINFO\x1b[0m: green").unwrap(); writeln!(f, "\x1b[31mERROR\x1b[0m: also red").unwrap(); let path = f.path().to_str().unwrap().to_string();
let pred: Box<dyn Fn(&[u8]) -> bool + Send + Sync> =
Box::new(|line: &[u8]| line.starts_with(b"ERROR"));
let handle = FileReader::load(path, Some(pred), false, Arc::new(AtomicBool::new(false)))
.await
.unwrap();
let result = handle.result_rx.await.unwrap().unwrap();
assert_eq!(result.precomputed_visible, Some(vec![0, 2]));
assert_eq!(result.reader.get_line(0), b"ERROR: red");
assert_eq!(result.reader.get_line(2), b"ERROR: also red");
}
#[tokio::test]
async fn test_load_predicate_tail_all_match() {
let f = make_tmp(&["a", "b", "c"]);
let path = f.path().to_str().unwrap().to_string();
let pred: Box<dyn Fn(&[u8]) -> bool + Send + Sync> = Box::new(|_| true);
let handle = FileReader::load(path, Some(pred), true, Arc::new(AtomicBool::new(false)))
.await
.unwrap();
let result = handle.result_rx.await.unwrap().unwrap();
assert_eq!(result.precomputed_visible, Some(vec![0, 1, 2]));
}
#[tokio::test]
async fn test_load_predicate_none_match() {
let f = make_tmp(&["INFO: ok", "DEBUG: verbose"]);
let path = f.path().to_str().unwrap().to_string();
let pred: Box<dyn Fn(&[u8]) -> bool + Send + Sync> =
Box::new(|line: &[u8]| line.starts_with(b"ERROR"));
let handle = FileReader::load(path, Some(pred), false, Arc::new(AtomicBool::new(false)))
.await
.unwrap();
let result = handle.result_rx.await.unwrap().unwrap();
assert_eq!(result.precomputed_visible, Some(vec![]));
}
#[test]
fn test_empty_file() {
let r = make(b"");
assert_eq!(r.line_count(), 0);
}
#[test]
fn test_single_line_no_newline() {
let r = make(b"hello");
assert_eq!(r.line_count(), 1);
assert_eq!(r.get_line(0), b"hello");
}
#[test]
fn test_single_line_with_newline() {
let r = make(b"hello\n");
assert_eq!(r.line_count(), 1);
assert_eq!(r.get_line(0), b"hello");
}
#[test]
fn test_multiple_lines() {
let r = make(b"line1\nline2\nline3\n");
assert_eq!(r.line_count(), 3);
assert_eq!(r.get_line(0), b"line1");
assert_eq!(r.get_line(1), b"line2");
assert_eq!(r.get_line(2), b"line3");
}
#[test]
fn test_multiple_lines_no_trailing_newline() {
let r = make(b"line1\nline2\nline3");
assert_eq!(r.line_count(), 3);
assert_eq!(r.get_line(0), b"line1");
assert_eq!(r.get_line(1), b"line2");
assert_eq!(r.get_line(2), b"line3");
}
#[test]
fn test_iter() {
let r = make(b"a\nb\nc\n");
let collected: Vec<(usize, &[u8])> = r.iter().collect();
assert_eq!(collected.len(), 3);
assert_eq!(collected[0], (0, b"a".as_ref()));
assert_eq!(collected[1], (1, b"b".as_ref()));
assert_eq!(collected[2], (2, b"c".as_ref()));
}
#[test]
fn test_file_reader_from_path() {
let mut f = NamedTempFile::new().unwrap();
writeln!(f, "[2024-07-24T10:00:00Z] INFO myhost: line 1").unwrap();
writeln!(f, "[2024-07-24T10:01:00Z] DEBUG myhost: line 2").unwrap();
let path = f.path().to_str().unwrap();
let reader = FileReader::new(path).unwrap();
assert_eq!(reader.line_count(), 2);
let l0 = std::str::from_utf8(reader.get_line(0)).unwrap();
assert!(l0.contains("INFO"));
let l1 = std::str::from_utf8(reader.get_line(1)).unwrap();
assert!(l1.contains("DEBUG"));
}
#[test]
fn test_empty_lines_in_content() {
let r = make(b"first\n\nthird\n");
assert_eq!(r.line_count(), 3);
assert_eq!(r.get_line(0), b"first");
assert_eq!(r.get_line(1), b"");
assert_eq!(r.get_line(2), b"third");
}
#[test]
fn test_strip_ansi_csi_color_codes() {
let r = make(b"\x1b[32m INFO\x1b[0m message\n");
assert_eq!(r.line_count(), 1);
assert_eq!(r.get_line(0), b" INFO message");
}
#[test]
fn test_strip_carriage_return() {
let r = make(b"line1\r\nline2\r\n");
assert_eq!(r.line_count(), 2);
assert_eq!(r.get_line(0), b"line1");
assert_eq!(r.get_line(1), b"line2");
}
#[test]
fn test_strip_ansi_real_log_line() {
let input = b"\x1b[2m2026-02-20T15:06:28Z\x1b[0m \x1b[32m INFO\x1b[0m \x1b[2mtodo_app\x1b[0m: message\n";
let r = make(input);
assert_eq!(r.line_count(), 1);
assert_eq!(
r.get_line(0),
b"2026-02-20T15:06:28Z INFO todo_app: message"
);
}
#[test]
fn test_no_ansi_unchanged() {
let r = make(b"plain log line\n");
assert_eq!(r.line_count(), 1);
assert_eq!(r.get_line(0), b"plain log line");
}
#[test]
fn test_append_bytes_basic() {
let mut r = make(b"line1\nline2\n");
assert_eq!(r.line_count(), 2);
r.append_bytes(b"line3\nline4\n");
assert_eq!(r.line_count(), 4);
assert_eq!(r.get_line(2), b"line3");
assert_eq!(r.get_line(3), b"line4");
}
#[test]
fn test_append_bytes_extends_partial_last_line() {
let mut r = make(b"partial");
assert_eq!(r.line_count(), 1);
assert_eq!(r.get_line(0), b"partial");
r.append_bytes(b"ly done\nnext\n");
assert_eq!(r.line_count(), 2);
assert_eq!(r.get_line(0), b"partially done");
assert_eq!(r.get_line(1), b"next");
}
#[test]
fn test_append_bytes_empty_is_noop() {
let mut r = make(b"line1\n");
r.append_bytes(b"");
assert_eq!(r.line_count(), 1);
}
#[test]
fn test_strip_osc_terminated_by_bel() {
let input = b"\x1b]0;my title\x07rest of line\n";
let r = make(input);
assert_eq!(r.line_count(), 1);
assert_eq!(r.get_line(0), b"rest of line");
}
#[test]
fn test_strip_osc_terminated_by_st() {
let input = b"\x1b]0;my title\x1b\\rest\n";
let r = make(input);
assert_eq!(r.line_count(), 1);
assert_eq!(r.get_line(0), b"rest");
}
#[test]
fn test_strip_osc_mixed_with_csi() {
let input = b"\x1b]0;title\x07\x1b[32mGREEN\x1b[0m\n";
let r = make(input);
assert_eq!(r.get_line(0), b"GREEN");
}
#[test]
fn test_strip_two_byte_esc_sequence() {
let input = b"before\x1bMafter\n";
let r = make(input);
assert_eq!(r.get_line(0), b"beforeafter");
}
#[test]
fn test_strip_multiple_two_byte_esc() {
let input = b"\x1b=\x1b>hello\n";
let r = make(input);
assert_eq!(r.get_line(0), b"hello");
}
#[test]
fn test_strip_esc_at_end_of_input() {
let out = strip_ansi_escapes(b"hello\x1b");
assert_eq!(out, b"hello");
}
#[test]
fn test_strip_truncated_csi() {
let out = strip_ansi_escapes(b"hi\x1b[31");
assert_eq!(out, b"hi");
}
#[test]
fn test_strip_empty_input() {
let out = strip_ansi_escapes(b"");
assert!(out.is_empty());
}
#[test]
fn test_strip_only_escapes() {
let out = strip_ansi_escapes(b"\x1b[32m\x1b[0m\r");
assert!(out.is_empty());
}
#[test]
fn test_strip_complex_csi_with_params() {
let input = b"\x1b[38;5;196mred text\x1b[0m\n";
let r = make(input);
assert_eq!(r.get_line(0), b"red text");
}
#[test]
fn test_strip_cr_only_lines() {
let out = strip_ansi_escapes(b"hello\rworld");
assert_eq!(out, b"helloworld");
}
#[test]
fn test_only_newlines() {
let r = make(b"\n\n\n");
assert_eq!(r.line_count(), 3);
assert_eq!(r.get_line(0), b"");
assert_eq!(r.get_line(1), b"");
assert_eq!(r.get_line(2), b"");
}
#[test]
fn test_single_newline() {
let r = make(b"\n");
assert_eq!(r.line_count(), 1);
assert_eq!(r.get_line(0), b"");
}
#[test]
fn test_large_number_of_lines() {
let mut data = Vec::new();
for i in 0..10_000 {
data.extend_from_slice(format!("line {i}\n").as_bytes());
}
let r = make(&data);
assert_eq!(r.line_count(), 10_000);
assert_eq!(r.get_line(0), b"line 0");
assert_eq!(r.get_line(9_999), b"line 9999");
}
#[test]
fn test_long_single_line() {
let line = vec![b'x'; 100_000];
let r = make(&line);
assert_eq!(r.line_count(), 1);
assert_eq!(r.get_line(0).len(), 100_000);
}
#[test]
fn test_binary_content_no_newlines() {
let data: Vec<u8> = (0..=255).collect();
let r = make(&data);
assert!(r.line_count() >= 1);
}
#[test]
fn test_append_bytes_multiple_times() {
let mut r = make(b"a\n");
r.append_bytes(b"b\n");
r.append_bytes(b"c\n");
r.append_bytes(b"d\n");
assert_eq!(r.line_count(), 4);
assert_eq!(r.get_line(0), b"a");
assert_eq!(r.get_line(1), b"b");
assert_eq!(r.get_line(2), b"c");
assert_eq!(r.get_line(3), b"d");
}
#[test]
fn test_append_bytes_no_newline_then_newline() {
let mut r = make(b"start");
assert_eq!(r.line_count(), 1);
assert_eq!(r.get_line(0), b"start");
r.append_bytes(b" middle");
assert_eq!(r.line_count(), 1);
assert_eq!(r.get_line(0), b"start middle");
r.append_bytes(b" end\nnew\n");
assert_eq!(r.line_count(), 2);
assert_eq!(r.get_line(0), b"start middle end");
assert_eq!(r.get_line(1), b"new");
}
#[test]
fn test_append_to_empty() {
let mut r = make(b"");
assert_eq!(r.line_count(), 0);
r.append_bytes(b"hello\n");
assert_eq!(r.line_count(), 1);
assert_eq!(r.get_line(0), b"hello");
}
#[test]
fn test_file_reader_from_path_with_ansi() {
let mut f = NamedTempFile::new().unwrap();
f.write_all(b"\x1b[32mgreen\x1b[0m\nplain\n").unwrap();
let path = f.path().to_str().unwrap();
let reader = FileReader::new(path).unwrap();
assert_eq!(reader.line_count(), 2);
assert_eq!(reader.get_line(0), b"green");
assert_eq!(reader.get_line(1), b"plain");
}
#[test]
fn test_file_reader_from_path_with_crlf() {
let mut f = NamedTempFile::new().unwrap();
f.write_all(b"line1\r\nline2\r\n").unwrap();
let path = f.path().to_str().unwrap();
let reader = FileReader::new(path).unwrap();
assert_eq!(reader.line_count(), 2);
assert_eq!(reader.get_line(0), b"line1");
assert_eq!(reader.get_line(1), b"line2");
}
#[test]
fn test_file_reader_nonexistent_path() {
let result = FileReader::new("/tmp/nonexistent_logana_test_file.log");
assert!(result.is_err());
}
#[test]
fn test_compute_line_starts_empty() {
let starts = compute_line_starts(b"");
assert_eq!(starts, vec![0]);
}
#[test]
fn test_compute_line_starts_no_newline() {
let starts = compute_line_starts(b"hello");
assert_eq!(starts, vec![0]);
}
#[test]
fn test_compute_line_starts_one_newline() {
let starts = compute_line_starts(b"hello\n");
assert_eq!(starts, vec![0, 6]);
}
#[test]
fn test_compute_line_starts_multiple() {
let starts = compute_line_starts(b"ab\ncd\nef\n");
assert_eq!(starts, vec![0, 3, 6, 9]);
}
#[test]
fn test_compute_line_starts_consecutive_newlines() {
let starts = compute_line_starts(b"\n\n\n");
assert_eq!(starts, vec![0, 1, 2, 3]);
}
#[tokio::test]
async fn test_load_basic() {
let mut f = NamedTempFile::new().unwrap();
writeln!(f, "line 1").unwrap();
writeln!(f, "line 2").unwrap();
writeln!(f, "line 3").unwrap();
let path = f.path().to_str().unwrap().to_string();
let handle = FileReader::load(path, None, false, Arc::new(AtomicBool::new(false)))
.await
.unwrap();
assert!(handle.total_bytes > 0);
let result = handle.result_rx.await.unwrap().unwrap();
assert_eq!(result.reader.line_count(), 3);
assert_eq!(result.reader.get_line(0), b"line 1");
}
#[tokio::test]
async fn test_load_progress_reaches_one() {
let mut f = NamedTempFile::new().unwrap();
for i in 0..100 {
writeln!(f, "line {i}").unwrap();
}
let path = f.path().to_str().unwrap().to_string();
let handle = FileReader::load(path, None, false, Arc::new(AtomicBool::new(false)))
.await
.unwrap();
let result = handle.result_rx.await.unwrap().unwrap();
assert_eq!(result.reader.line_count(), 100);
let progress = *handle.progress_rx.borrow();
assert!((progress - 1.0).abs() < f64::EPSILON);
}
#[tokio::test]
async fn test_load_with_ansi() {
let mut f = NamedTempFile::new().unwrap();
f.write_all(b"\x1b[31mred\x1b[0m\nplain\n").unwrap();
let path = f.path().to_str().unwrap().to_string();
let handle = FileReader::load(path, None, false, Arc::new(AtomicBool::new(false)))
.await
.unwrap();
let result = handle.result_rx.await.unwrap().unwrap();
assert_eq!(result.reader.line_count(), 2);
assert_eq!(result.reader.get_line(0), b"red");
assert_eq!(result.reader.get_line(1), b"plain");
}
#[tokio::test]
async fn test_load_nonexistent() {
let result = FileReader::load(
"/tmp/nonexistent_logana_load_test.log".to_string(),
None,
false,
Arc::new(AtomicBool::new(false)),
)
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_load_empty_file() {
let f = NamedTempFile::new().unwrap();
let path = f.path().to_str().unwrap().to_string();
let handle = FileReader::load(path, None, false, Arc::new(AtomicBool::new(false)))
.await
.unwrap();
let result = handle.result_rx.await.unwrap().unwrap();
assert_eq!(result.reader.line_count(), 0);
}
#[tokio::test]
async fn test_load_cancel_returns_error() {
let f = make_tmp(&["line1", "line2", "line3"]);
let path = f.path().to_str().unwrap().to_string();
let cancel = Arc::new(AtomicBool::new(true)); let handle = FileReader::load(path, None, false, cancel).await.unwrap();
let result = handle.result_rx.await.unwrap();
assert!(result.is_err());
assert_eq!(
result.err().unwrap().kind(),
std::io::ErrorKind::Interrupted
);
}
#[tokio::test]
async fn test_spawn_file_watcher_detects_new_data() {
use std::io::{Seek, SeekFrom};
use tokio::time::{Duration, sleep};
let mut f = NamedTempFile::new().unwrap();
write!(f, "initial\n").unwrap();
f.flush().unwrap();
let initial_size = f.as_file().metadata().unwrap().len();
let path = f.path().to_str().unwrap().to_string();
let mut rx = FileReader::spawn_file_watcher(path, initial_size).await;
f.seek(SeekFrom::End(0)).unwrap();
write!(f, "appended\n").unwrap();
f.flush().unwrap();
sleep(Duration::from_millis(1500)).await;
let data = rx.borrow_and_update().clone();
let text = String::from_utf8_lossy(&data);
assert!(
text.contains("appended"),
"watcher should detect appended data, got: {text}"
);
}
#[test]
fn test_iter_empty() {
let r = make(b"");
let collected: Vec<_> = r.iter().collect();
assert!(collected.is_empty());
}
#[test]
fn test_iter_single_no_newline() {
let r = make(b"only");
let collected: Vec<_> = r.iter().collect();
assert_eq!(collected, vec![(0, b"only".as_ref())]);
}
#[tokio::test]
async fn test_spawn_process_stream_basic() {
let mut rx = FileReader::spawn_process_stream("echo", &["hello world"])
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
let data = rx.borrow_and_update().clone();
let text = String::from_utf8_lossy(&data);
assert!(
text.contains("hello world"),
"stdout should be captured, got: {text}"
);
}
#[tokio::test]
async fn test_spawn_process_stream_stderr() {
let mut rx = FileReader::spawn_process_stream("sh", &["-c", "echo error_output >&2"])
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
let data = rx.borrow_and_update().clone();
let text = String::from_utf8_lossy(&data);
assert!(
text.contains("error_output"),
"stderr should be captured, got: {text}"
);
}
#[tokio::test]
async fn test_spawn_process_stream_strips_ansi() {
let mut rx = FileReader::spawn_process_stream("printf", &["\x1b[31mred text\x1b[0m\n"])
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
let data = rx.borrow_and_update().clone();
let text = String::from_utf8_lossy(&data);
assert!(
text.contains("red text"),
"should contain stripped text, got: {text}"
);
assert!(
!text.contains("\x1b["),
"ANSI codes should be stripped, got: {text}"
);
}
fn strip_bytes(input: &[u8]) -> Vec<u8> {
strip_ansi_and_index(input).0
}
fn index_bytes(input: &[u8]) -> Vec<usize> {
strip_ansi_and_index(input).1
}
#[test]
fn test_strip_ansi_and_index_plain() {
let input = b"hello\nworld\n";
assert_eq!(strip_bytes(input), input);
assert_eq!(index_bytes(input), compute_line_starts(input));
}
#[test]
fn test_strip_ansi_and_index_csi() {
let input = b"\x1b[32mgreen\x1b[0m\nplain\n";
let expected_bytes = b"green\nplain\n";
let (out, starts) = strip_ansi_and_index(input);
assert_eq!(out, expected_bytes);
assert_eq!(starts, compute_line_starts(expected_bytes));
}
#[test]
fn test_strip_ansi_and_index_osc_bel() {
let input = b"\x1b]0;title\x07line\n";
let expected_bytes = b"line\n";
let (out, starts) = strip_ansi_and_index(input);
assert_eq!(out, expected_bytes);
assert_eq!(starts, compute_line_starts(expected_bytes));
}
#[test]
fn test_strip_ansi_and_index_osc_string_terminator() {
let input = b"\x1b]0;title\x1b\\line\n";
let expected_bytes = b"line\n";
let (out, starts) = strip_ansi_and_index(input);
assert_eq!(out, expected_bytes);
assert_eq!(starts, compute_line_starts(expected_bytes));
}
#[test]
fn test_strip_ansi_and_index_two_byte_esc() {
let input = b"\x1b=text\n";
let expected_bytes = b"text\n";
let (out, starts) = strip_ansi_and_index(input);
assert_eq!(out, expected_bytes);
assert_eq!(starts, compute_line_starts(expected_bytes));
}
#[test]
fn test_strip_ansi_and_index_cr_stripped() {
    // CRLF line endings are normalized: the CR bytes are stripped and only
    // the LF remains.
    let (out, starts) = strip_ansi_and_index(b"line1\r\nline2\r\n");
    let want: &[u8] = b"line1\nline2\n";
    assert_eq!(out, want);
    assert_eq!(starts, compute_line_starts(want));
}
#[test]
fn test_strip_ansi_and_index_multiline_ansi() {
    // The combined strip+index pass must agree with running the two
    // reference passes (strip, then index) separately.
    let input: &[u8] = b"\x1b[32mfoo\x1b[0m\n\x1b[34mbar\x1b[0m\nbaz\n";
    let reference = strip_ansi_escapes(input);
    let (out, starts) = strip_ansi_and_index(input);
    assert_eq!(out, reference);
    assert_eq!(starts, compute_line_starts(&reference));
}
#[test]
fn test_strip_ansi_and_index_no_trailing_newline() {
    // Input whose last line lacks a terminating newline must still strip and
    // index consistently with the reference implementations.
    let input: &[u8] = b"\x1b[1mhello\x1b[0m";
    let reference = strip_ansi_escapes(input);
    let (out, starts) = strip_ansi_and_index(input);
    assert_eq!(out, reference);
    assert_eq!(starts, compute_line_starts(&reference));
}
#[test]
fn test_strip_ansi_and_index_esc_at_end() {
    // A lone ESC as the final byte (truncated sequence) must not panic and
    // must match the reference strip implementation.
    let input: &[u8] = b"text\n\x1b";
    let reference = strip_ansi_escapes(input);
    let (out, starts) = strip_ansi_and_index(input);
    assert_eq!(out, reference);
    assert_eq!(starts, compute_line_starts(&reference));
}
#[test]
fn test_strip_ansi_and_index_empty() {
    // Empty input yields no bytes but still exactly one line start at 0,
    // matching the invariant that a reader always has at least one line.
    let (out, starts) = strip_ansi_and_index(b"");
    assert!(out.is_empty());
    assert_eq!(starts, [0usize]);
}
#[tokio::test]
async fn test_from_file_tail_returns_last_lines() {
    // Write 1000 numbered lines, then preview only the last 512 bytes.
    let mut tmp = NamedTempFile::new().unwrap();
    for i in 0..1000usize {
        writeln!(tmp, "line {i}").unwrap();
    }
    tmp.flush().unwrap();
    let reader = FileReader::from_file_tail(tmp.path().to_str().unwrap(), 512)
        .await
        .unwrap();
    let count = reader.line_count();
    assert!(count > 0, "should have at least one line");
    // The final line of the preview must be the final line of the file.
    assert_eq!(reader.get_line(count - 1), b"line 999");
}
#[tokio::test]
async fn test_from_file_tail_all_lines_complete() {
    let mut tmp = NamedTempFile::new().unwrap();
    for i in 0..500usize {
        writeln!(tmp, "entry {i} data").unwrap();
    }
    tmp.flush().unwrap();
    let reader = FileReader::from_file_tail(tmp.path().to_str().unwrap(), 1024)
        .await
        .unwrap();
    // Every line in the tail preview must begin at a real line boundary; a
    // line missing the common prefix means a partial line leaked through.
    for idx in 0..reader.line_count() {
        let line = reader.get_line(idx);
        assert!(
            line.starts_with(b"entry "),
            "partial line leaked: {:?}",
            std::str::from_utf8(line)
        );
    }
}
#[tokio::test]
async fn test_from_file_tail_small_file_fits_in_preview() {
    // A file smaller than the preview budget is returned whole.
    let mut tmp = NamedTempFile::new().unwrap();
    writeln!(tmp, "only line").unwrap();
    tmp.flush().unwrap();
    let reader = FileReader::from_file_tail(tmp.path().to_str().unwrap(), 64 * 1024)
        .await
        .unwrap();
    assert_eq!(reader.line_count(), 1);
    assert_eq!(reader.get_line(0), b"only line");
}
#[tokio::test]
async fn test_from_file_tail_nonexistent_returns_error() {
    // A missing path must surface as Err rather than panicking.
    assert!(
        FileReader::from_file_tail("/tmp/logana_no_such_file_tail.log", 1024)
            .await
            .is_err()
    );
}
#[tokio::test]
async fn test_from_file_head_returns_first_lines() {
    // Write 1000 numbered lines, then preview only the first 512 bytes.
    let mut tmp = NamedTempFile::new().unwrap();
    for i in 0..1000usize {
        writeln!(tmp, "line {i}").unwrap();
    }
    tmp.flush().unwrap();
    let reader = FileReader::from_file_head(tmp.path().to_str().unwrap(), 512)
        .await
        .unwrap();
    assert!(reader.line_count() > 0, "should have at least one line");
    // The first line of the preview must be the first line of the file.
    assert_eq!(reader.get_line(0), b"line 0");
}
#[tokio::test]
async fn test_from_file_head_all_lines_complete() {
    let mut tmp = NamedTempFile::new().unwrap();
    for i in 0..500usize {
        writeln!(tmp, "entry {i} data").unwrap();
    }
    tmp.flush().unwrap();
    let reader = FileReader::from_file_head(tmp.path().to_str().unwrap(), 1024)
        .await
        .unwrap();
    // Every line in the head preview must begin at a real line boundary; a
    // line missing the common prefix means a partial line leaked through.
    for idx in 0..reader.line_count() {
        let line = reader.get_line(idx);
        assert!(
            line.starts_with(b"entry "),
            "partial line leaked: {:?}",
            std::str::from_utf8(line)
        );
    }
}
#[tokio::test]
async fn test_from_file_head_small_file_fits_in_preview() {
    // A file smaller than the preview budget is returned whole.
    let mut tmp = NamedTempFile::new().unwrap();
    writeln!(tmp, "only line").unwrap();
    tmp.flush().unwrap();
    let reader = FileReader::from_file_head(tmp.path().to_str().unwrap(), 64 * 1024)
        .await
        .unwrap();
    assert_eq!(reader.line_count(), 1);
    assert_eq!(reader.get_line(0), b"only line");
}
#[tokio::test]
async fn test_from_file_head_nonexistent_returns_error() {
    // A missing path must surface as Err rather than panicking.
    assert!(
        FileReader::from_file_head("/tmp/logana_no_such_file_head.log", 1024)
            .await
            .is_err()
    );
}
#[test]
fn test_append_bytes_on_file_backed_reader() {
    // Start from a file-backed (mmap) reader and verify that in-memory
    // appends become visible alongside the original file contents.
    let mut tmp = NamedTempFile::new().unwrap();
    tmp.write_all(b"mmap line\n").unwrap();
    tmp.flush().unwrap();
    let mut reader = FileReader::new(tmp.path().to_str().unwrap()).unwrap();
    assert_eq!(reader.line_count(), 1);
    assert_eq!(reader.get_line(0), b"mmap line");
    reader.append_bytes(b"appended\n");
    // Both the original mmap-backed line and the appended line are readable.
    assert_eq!(reader.line_count(), 2);
    assert_eq!(reader.get_line(0), b"mmap line");
    assert_eq!(reader.get_line(1), b"appended");
}
#[tokio::test]
async fn test_spawn_file_watcher_truncation() {
    // Verify the file watcher recovers when the watched file is truncated
    // and then rewritten with fresh content.
    use std::io::{Seek, SeekFrom};
    use tokio::time::{Duration, sleep};
    let mut f = NamedTempFile::new().unwrap();
    writeln!(f, "original data that is fairly long").unwrap();
    f.flush().unwrap();
    let initial_size = f.as_file().metadata().unwrap().len();
    let path = f.path().to_str().unwrap().to_string();
    let mut rx = FileReader::spawn_file_watcher(path.clone(), initial_size).await;
    f.as_file().set_len(0).unwrap();
    // Give the watcher a poll cycle to observe the truncated (empty) file
    // before the replacement content appears.
    sleep(Duration::from_millis(1500)).await;
    f.seek(SeekFrom::Start(0)).unwrap();
    writeln!(f, "after truncation").unwrap();
    f.flush().unwrap();
    // Poll until the watcher reports the post-truncation data instead of a
    // single fixed sleep: less flaky on a loaded machine and faster when the
    // watcher is prompt. Bounded by a hard deadline so a broken watcher
    // still fails the test rather than hanging.
    let deadline = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        let data = rx.borrow_and_update().clone();
        let text = String::from_utf8_lossy(&data);
        if text.contains("after truncation") {
            break;
        }
        assert!(
            std::time::Instant::now() < deadline,
            "watcher should detect data after truncation, got: {text}"
        );
        sleep(Duration::from_millis(100)).await;
    }
}
/// Build a temp file of roughly `target_bytes` by repeating `line`.
/// Returns the file handle and the number of lines written (at least 1).
fn make_large_tmp(line: &str, target_bytes: usize) -> (NamedTempFile, usize) {
    let row = format!("{line}\n");
    let count = std::cmp::max(target_bytes / row.len(), 1);
    let mut tmp = NamedTempFile::new().unwrap();
    for _ in 0..count {
        tmp.write_all(row.as_bytes()).unwrap();
    }
    tmp.flush().unwrap();
    (tmp, count)
}
#[tokio::test]
async fn test_load_large_file_line_count_correct() {
    // A ~6 MiB file exercises the chunked loading path end to end.
    let line = "hello world this is a reasonably long log line for testing";
    let (tmp, expected_lines) = make_large_tmp(line, 6 * 1024 * 1024);
    let handle = FileReader::load(
        tmp.path().to_str().unwrap().to_string(),
        None,
        false,
        Arc::new(AtomicBool::new(false)),
    )
    .await
    .unwrap();
    let result = handle.result_rx.await.unwrap().unwrap();
    assert_eq!(result.reader.line_count(), expected_lines);
    // Both the first and the last line must be intact.
    assert_eq!(result.reader.get_line(0), line.as_bytes());
    assert_eq!(result.reader.get_line(expected_lines - 1), line.as_bytes());
}
#[tokio::test]
async fn test_load_large_file_matches_reference_implementation() {
    let line = "2024-01-15T10:00:00Z INFO service: request processed id=42 dur=3ms";
    let (tmp, total) = make_large_tmp(line, 6 * 1024 * 1024);
    let handle = FileReader::load(
        tmp.path().to_str().unwrap().to_string(),
        None,
        false,
        Arc::new(AtomicBool::new(false)),
    )
    .await
    .unwrap();
    let result = handle.result_rx.await.unwrap().unwrap();
    assert_eq!(result.reader.line_count(), total);
    // Spot-check lines spread across the file rather than scanning all of it.
    let probes = [0, total / 4, total / 2, 3 * total / 4, total - 1];
    for idx in probes {
        assert_eq!(
            result.reader.get_line(idx),
            line.as_bytes(),
            "line {idx} mismatch"
        );
    }
}
#[tokio::test]
async fn test_load_large_file_predicate_correct() {
    // Alternate EVEN/ODD lines and filter for EVEN via the visibility
    // predicate passed to load().
    let mut tmp = NamedTempFile::new().unwrap();
    let total = 200_000usize;
    for i in 0..total {
        let tag = if i % 2 == 0 { "EVEN" } else { "ODD" };
        writeln!(tmp, "{tag} line {i}").unwrap();
    }
    tmp.flush().unwrap();
    let pred: VisibilityPredicate = Box::new(|line| line.starts_with(b"EVEN"));
    let handle = FileReader::load(
        tmp.path().to_str().unwrap().to_string(),
        Some(pred),
        false,
        Arc::new(AtomicBool::new(false)),
    )
    .await
    .unwrap();
    let result = handle.result_rx.await.unwrap().unwrap();
    let visible = result.precomputed_visible.unwrap();
    // Exactly half of the lines satisfy the predicate.
    assert_eq!(visible.len(), total / 2);
    // The first and last 100 visible indices must point at EVEN lines.
    let head = visible.iter().take(100);
    let tail = visible.iter().rev().take(100);
    for &idx in head.chain(tail) {
        assert!(
            result.reader.get_line(idx).starts_with(b"EVEN"),
            "index {idx} should be an EVEN line"
        );
    }
    // Visible indices must be strictly increasing.
    assert!(visible.windows(2).all(|pair| pair[0] < pair[1]));
}
#[tokio::test]
async fn test_load_newline_at_chunk_boundary() {
    // Lines of exactly 64 bytes (63 'A's plus '\n') place a newline
    // precisely on the 4 MiB chunk boundary; loading must not split,
    // duplicate, or drop a line there.
    const BOUNDARY: usize = 4 * 1024 * 1024;
    let filler = "A".repeat(63);
    let lines_to_boundary = BOUNDARY / 64;
    let mut tmp = NamedTempFile::new().unwrap();
    for _ in 0..lines_to_boundary {
        writeln!(tmp, "{filler}").unwrap();
    }
    for i in 0..10 {
        writeln!(tmp, "extra{i}").unwrap();
    }
    tmp.flush().unwrap();
    let handle = FileReader::load(
        tmp.path().to_str().unwrap().to_string(),
        None,
        false,
        Arc::new(AtomicBool::new(false)),
    )
    .await
    .unwrap();
    let result = handle.result_rx.await.unwrap().unwrap();
    let expected = lines_to_boundary + 10;
    assert_eq!(result.reader.line_count(), expected);
    assert_eq!(result.reader.get_line(0), filler.as_bytes());
    // The first line after the boundary must be the first "extra" line.
    assert_eq!(result.reader.get_line(lines_to_boundary), b"extra0");
    assert_eq!(result.reader.get_line(expected - 1), b"extra9");
}
}