Skip to main content

tess/
source.rs

1use std::borrow::Cow;
2use std::fs::File;
3use std::io::{Read, Seek, SeekFrom};
4use std::ops::Range;
5use std::path::{Path, PathBuf};
6use std::sync::{Arc, Mutex, atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}};
7use std::time::SystemTime;
8
9use crate::prettify::{self, PrettifyMode};
10
11pub trait Source: Send + Sync {
12    fn len(&self) -> usize;
13    fn is_empty(&self) -> bool { self.len() == 0 }
14    fn bytes(&self, range: Range<usize>) -> Cow<'_, [u8]>;
15    fn is_complete(&self) -> bool;
16    /// Read any new bytes that have become available since the last call.
17    /// Default no-op for static sources. Streaming sources override.
18    fn pump(&self) {}
19    /// Monotonic counter that bumps whenever the source's *content* has been
20    /// replaced wholesale (as opposed to merely appended to). Append-style
21    /// sources keep this at 0; `LiveFileSource` increments it on each detected
22    /// rewrite so the event loop knows to rebuild the line index instead of
23    /// just folding in new bytes.
24    fn revision(&self) -> u64 { 0 }
25
26    /// Current prettify mode if this source has a `TransformingSource` wrapper.
27    /// `None` means the source is not capable of prettification (e.g. a plain
28    /// `FileSource`). `Some(Off)` means wrapped but currently raw.
29    fn prettify_mode(&self) -> Option<PrettifyMode> { None }
30
31    /// Status-line label for the active prettify state, e.g. `"json"` or
32    /// `"json:err"`. `None` when there's no wrapper or the mode is `Off`
33    /// without an error.
34    fn prettify_label(&self) -> Option<String> { None }
35
36    /// Switch to a specific prettify mode. No-op for sources without a wrapper.
37    /// Bumps `revision()` so callers know to rebuild the line index.
38    fn set_prettify_mode(&self, _mode: PrettifyMode) {}
39
40    /// Flip between the current mode and the last active (non-Off) mode.
41    /// No-op if the source is not wrapped, or if it has never had an active
42    /// mode (i.e. only ever been raw).
43    fn toggle_prettify(&self) {}
44
45    /// Re-run byte-based content detection and apply the result. Used by the
46    /// interactive `-Pa` ("auto") sub-command. No-op without a wrapper.
47    fn redetect_prettify(&self) {}
48}
49
50/// Find the byte offset such that `bytes[offset..]` is exactly the last `n`
51/// logical lines of `src` (lines delimited by `\n`; a trailing `\n` at EOF is
52/// not its own line). Returns 0 when the source has fewer than `n` lines, or
53/// `src.len()` when `n == 0`.
54///
55/// Reverse-scans in 64 KiB chunks so a 10 GB file stays cheap — only the
56/// trailing pages need to be touched.
57pub fn find_tail_offset(src: &dyn Source, n: usize) -> usize {
58    let total = src.len();
59    if n == 0 || total == 0 {
60        return total;
61    }
62    // Don't count a trailing newline as a "line break to step over": it
63    // terminates the last line, it's not a separator before another line.
64    let mut end = total;
65    if end > 0 && src.bytes((end - 1)..end)[0] == b'\n' {
66        end -= 1;
67    }
68
69    let chunk_size: usize = 64 * 1024;
70    let mut count = 0usize;
71    let mut pos = end;
72    while pos > 0 {
73        let chunk_start = pos.saturating_sub(chunk_size);
74        let bytes = src.bytes(chunk_start..pos);
75        for i in (0..bytes.len()).rev() {
76            if bytes[i] == b'\n' {
77                count += 1;
78                if count == n {
79                    return chunk_start + i + 1;
80                }
81            }
82        }
83        pos = chunk_start;
84    }
85    0
86}
87
88pub struct FileSource {
89    mmap: Option<memmap2::Mmap>,
90    fallback_buf: Option<Vec<u8>>,
91    initial_size: usize,
92    appended_len: AtomicUsize,
93    streaming: Mutex<StreamingState>,
94}
95
96struct StreamingState {
97    file: File,
98    appended: Vec<u8>,
99}
100
101impl std::fmt::Debug for FileSource {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        f.debug_struct("FileSource").finish()
104    }
105}
106
107impl FileSource {
108    pub fn open(path: &Path) -> std::io::Result<Self> {
109        let file = File::open(path)?;
110        let metadata = file.metadata()?;
111        if !metadata.is_file() {
112            return Err(std::io::Error::new(
113                std::io::ErrorKind::InvalidInput,
114                "not a regular file",
115            ));
116        }
117        let initial_size = metadata.len() as usize;
118        let (mmap, fallback_buf) = if initial_size == 0 {
119            (None, Some(Vec::new()))
120        } else {
121            // SAFETY: file remains open as long as Mmap exists; we never modify the file.
122            match unsafe { memmap2::Mmap::map(&file) } {
123                Ok(m) => (Some(m), None),
124                Err(_) => {
125                    let mut buf = Vec::new();
126                    let mut f = File::open(path)?;
127                    f.read_to_end(&mut buf)?;
128                    (None, Some(buf))
129                }
130            }
131        };
132        // Separate handle for streaming reads. Seeked past the initial content
133        // so subsequent reads only return bytes appended after open.
134        let mut stream_file = File::open(path)?;
135        stream_file.seek(SeekFrom::Start(initial_size as u64))?;
136        Ok(Self {
137            mmap,
138            fallback_buf,
139            initial_size,
140            appended_len: AtomicUsize::new(0),
141            streaming: Mutex::new(StreamingState {
142                file: stream_file,
143                appended: Vec::new(),
144            }),
145        })
146    }
147
148    fn static_bytes(&self) -> &[u8] {
149        if let Some(m) = &self.mmap {
150            &m[..]
151        } else if let Some(b) = &self.fallback_buf {
152            &b[..]
153        } else {
154            &[]
155        }
156    }
157}
158
159impl Source for FileSource {
160    fn len(&self) -> usize {
161        self.initial_size + self.appended_len.load(Ordering::Acquire)
162    }
163
164    fn bytes(&self, range: Range<usize>) -> Cow<'_, [u8]> {
165        let static_bytes = self.static_bytes();
166        if range.end <= self.initial_size {
167            return Cow::Borrowed(&static_bytes[range]);
168        }
169        let stream = self.streaming.lock().unwrap();
170        let total = self.initial_size + stream.appended.len();
171        let start = range.start.min(total);
172        let end = range.end.min(total);
173        if start >= self.initial_size {
174            let off = start - self.initial_size;
175            let off_end = end - self.initial_size;
176            Cow::Owned(stream.appended[off..off_end].to_vec())
177        } else {
178            let mut v = Vec::with_capacity(end - start);
179            v.extend_from_slice(&static_bytes[start..self.initial_size]);
180            v.extend_from_slice(&stream.appended[..end - self.initial_size]);
181            Cow::Owned(v)
182        }
183    }
184
185    fn is_complete(&self) -> bool { true }
186
187    fn pump(&self) {
188        let mut stream = self.streaming.lock().unwrap();
189        let mut tmp = [0u8; 8192];
190        loop {
191            match stream.file.read(&mut tmp) {
192                Ok(0) => break,
193                Ok(n) => stream.appended.extend_from_slice(&tmp[..n]),
194                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
195                Err(_) => break,
196            }
197        }
198        let new_len = stream.appended.len();
199        self.appended_len.store(new_len, Ordering::Release);
200    }
201}
202
203/// A test/utility source whose contents can be appended at runtime.
204pub struct MockSource {
205    buf: Arc<Mutex<Vec<u8>>>,
206    complete: Arc<AtomicBool>,
207}
208
209impl Default for MockSource {
210    fn default() -> Self {
211        Self::new()
212    }
213}
214
215impl MockSource {
216    pub fn new() -> Self {
217        Self {
218            buf: Arc::new(Mutex::new(Vec::new())),
219            complete: Arc::new(AtomicBool::new(false)),
220        }
221    }
222
223    pub fn append(&self, more: &[u8]) {
224        self.buf.lock().unwrap().extend_from_slice(more);
225    }
226
227    pub fn finish(&self) {
228        self.complete.store(true, Ordering::SeqCst);
229    }
230}
231
232impl Source for MockSource {
233    fn len(&self) -> usize { self.buf.lock().unwrap().len() }
234    fn bytes(&self, range: Range<usize>) -> Cow<'_, [u8]> {
235        Cow::Owned(self.buf.lock().unwrap()[range].to_vec())
236    }
237    fn is_complete(&self) -> bool { self.complete.load(Ordering::SeqCst) }
238}
239
240/// A file source that watches for *whole-file* rewrites. Unlike `FileSource`
241/// (which only picks up appended bytes via a streaming handle), `LiveFileSource`
242/// re-reads the entire file when its `(mtime, size, ino)` signature changes,
243/// swaps the buffer in place, and bumps a revision counter so callers know to
244/// rebuild any per-line state. Intended for source-file-sized inputs being
245/// rewritten by an editor or AI agent.
246pub struct LiveFileSource {
247    path: PathBuf,
248    state: Mutex<LiveState>,
249    revision: AtomicU64,
250}
251
252struct LiveState {
253    bytes: Vec<u8>,
254    signature: FileSignature,
255}
256
257#[derive(Clone, Copy, PartialEq, Eq, Debug)]
258struct FileSignature {
259    mtime: Option<SystemTime>,
260    size: u64,
261    ino: u64,
262}
263
264impl FileSignature {
265    fn read(path: &Path) -> std::io::Result<Self> {
266        let md = std::fs::metadata(path)?;
267        Ok(Self {
268            mtime: md.modified().ok(),
269            size: md.len(),
270            #[cfg(unix)]
271            ino: {
272                use std::os::unix::fs::MetadataExt;
273                md.ino()
274            },
275            #[cfg(not(unix))]
276            ino: 0,
277        })
278    }
279}
280
281impl std::fmt::Debug for LiveFileSource {
282    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
283        f.debug_struct("LiveFileSource").field("path", &self.path).finish()
284    }
285}
286
287impl LiveFileSource {
288    pub fn open(path: &Path) -> std::io::Result<Self> {
289        let md = std::fs::metadata(path)?;
290        if !md.is_file() {
291            return Err(std::io::Error::new(
292                std::io::ErrorKind::InvalidInput,
293                "not a regular file",
294            ));
295        }
296        let bytes = std::fs::read(path)?;
297        // Re-stat after read so the signature reflects the bytes we actually
298        // captured (concurrent writers won't tear our buffer, but the mtime/size
299        // we record should match what we read).
300        let signature = FileSignature::read(path)?;
301        Ok(Self {
302            path: path.to_path_buf(),
303            state: Mutex::new(LiveState { bytes, signature }),
304            revision: AtomicU64::new(0),
305        })
306    }
307}
308
309impl Source for LiveFileSource {
310    fn len(&self) -> usize { self.state.lock().unwrap().bytes.len() }
311
312    fn bytes(&self, range: Range<usize>) -> Cow<'_, [u8]> {
313        let s = self.state.lock().unwrap();
314        let end = range.end.min(s.bytes.len());
315        let start = range.start.min(end);
316        Cow::Owned(s.bytes[start..end].to_vec())
317    }
318
319    /// Live sources are never "complete" — the file may be rewritten at any
320    /// time. The status line picks this up as the `+` suffix on totals.
321    fn is_complete(&self) -> bool { false }
322
323    fn pump(&self) {
324        // Cheap stat to detect a change. If the file vanished, leave the
325        // current buffer in place and try again next tick.
326        let new_sig = match FileSignature::read(&self.path) {
327            Ok(sig) => sig,
328            Err(_) => return,
329        };
330        let mut s = self.state.lock().unwrap();
331        if new_sig == s.signature {
332            return;
333        }
334        let new_bytes = match std::fs::read(&self.path) {
335            Ok(b) => b,
336            Err(_) => return,
337        };
338        // Re-read signature after the read so we reflect what was actually loaded.
339        let post_sig = FileSignature::read(&self.path).unwrap_or(new_sig);
340        s.bytes = new_bytes;
341        s.signature = post_sig;
342        drop(s);
343        self.revision.fetch_add(1, Ordering::AcqRel);
344    }
345
346    fn revision(&self) -> u64 { self.revision.load(Ordering::Acquire) }
347}
348
349/// A source that wraps another source and applies a `PrettifyMode` transform
350/// to its bytes. Toggling the mode bumps `revision()` so the event loop
351/// rebuilds the line index. Falls back to passing through the inner bytes if
352/// the transform fails to parse, surfacing the error via `last_error()`.
353pub struct TransformingSource {
354    inner: Box<dyn Source>,
355    state: Mutex<TransformState>,
356    revision: AtomicU64,
357}
358
359struct TransformState {
360    mode: PrettifyMode,
361    /// Most recent non-Off mode. Used by `toggle_prettify` to flip back to
362    /// what the user was looking at most recently. Stays at the original
363    /// detected/explicit mode at startup, even if the user toggles to Off.
364    last_active: PrettifyMode,
365    /// Whatever the consumer should see — either the raw inner bytes or the
366    /// successfully transformed bytes. Always populated.
367    cached: Vec<u8>,
368    /// Set when the last attempted transform failed to parse. Cleared on a
369    /// successful transform or on switching to `Off`.
370    last_error: Option<String>,
371}
372
373impl std::fmt::Debug for TransformingSource {
374    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
375        f.debug_struct("TransformingSource").finish()
376    }
377}
378
379impl TransformingSource {
380    /// Build a wrapping source. The inner is read fully via its current bytes
381    /// view; the transform runs once. If the transform fails, the cache holds
382    /// the raw inner bytes and `last_error()` returns the parse error.
383    pub fn wrap(inner: Box<dyn Source>, mode: PrettifyMode) -> Self {
384        let raw = inner.bytes(0..inner.len()).to_vec();
385        let (cached, last_error) = run_transform(mode, &raw);
386        let last_active = if mode.is_active() { mode } else { PrettifyMode::Off };
387        Self {
388            inner,
389            state: Mutex::new(TransformState { mode, last_active, cached, last_error }),
390            revision: AtomicU64::new(0),
391        }
392    }
393
394    pub fn mode(&self) -> PrettifyMode {
395        self.state.lock().unwrap().mode
396    }
397
398    /// Currently surfaced parse error, if any. Cleared by a successful
399    /// `set_mode` or by switching to `Off`.
400    pub fn last_error(&self) -> Option<String> {
401        self.state.lock().unwrap().last_error.clone()
402    }
403
404    /// Internal apply-mode helper. Re-reads inner bytes, runs the transform,
405    /// updates the cache, bumps revision. Tracks `last_active` so toggle
406    /// can flip back to a non-Off mode.
407    fn apply_mode(&self, mode: PrettifyMode) {
408        let raw = self.inner.bytes(0..self.inner.len()).to_vec();
409        let (cached, last_error) = run_transform(mode, &raw);
410        let mut s = self.state.lock().unwrap();
411        s.mode = mode;
412        if mode.is_active() && last_error.is_none() {
413            s.last_active = mode;
414        }
415        s.cached = cached;
416        s.last_error = last_error;
417        drop(s);
418        self.revision.fetch_add(1, Ordering::AcqRel);
419    }
420}
421
422fn run_transform(mode: PrettifyMode, raw: &[u8]) -> (Vec<u8>, Option<String>) {
423    match prettify::prettify(mode, raw) {
424        Ok(out) => (out, None),
425        Err(e) => (raw.to_vec(), Some(e)),
426    }
427}
428
429impl Source for TransformingSource {
430    fn len(&self) -> usize { self.state.lock().unwrap().cached.len() }
431
432    fn bytes(&self, range: Range<usize>) -> Cow<'_, [u8]> {
433        let s = self.state.lock().unwrap();
434        let end = range.end.min(s.cached.len());
435        let start = range.start.min(end);
436        Cow::Owned(s.cached[start..end].to_vec())
437    }
438
439    fn is_complete(&self) -> bool { self.inner.is_complete() }
440
441    fn pump(&self) { self.inner.pump(); }
442
443    fn revision(&self) -> u64 { self.revision.load(Ordering::Acquire) }
444
445    fn prettify_mode(&self) -> Option<PrettifyMode> {
446        Some(self.state.lock().unwrap().mode)
447    }
448
449    fn prettify_label(&self) -> Option<String> {
450        let s = self.state.lock().unwrap();
451        if s.last_error.is_some() {
452            // Whatever mode we tried last; show "<mode>:err".
453            let lbl = s.mode.label();
454            let lbl = if lbl.is_empty() { s.last_active.label() } else { lbl };
455            Some(format!("{lbl}:err"))
456        } else if s.mode.is_active() {
457            Some(s.mode.label().to_string())
458        } else {
459            None
460        }
461    }
462
463    fn set_prettify_mode(&self, mode: PrettifyMode) {
464        self.apply_mode(mode);
465    }
466
467    fn toggle_prettify(&self) {
468        let target = {
469            let s = self.state.lock().unwrap();
470            if s.mode.is_active() {
471                PrettifyMode::Off
472            } else if s.last_active.is_active() {
473                s.last_active
474            } else {
475                // Never had an active mode (started raw and never flipped on).
476                // Try a one-shot detection from the inner bytes; if nothing
477                // matches, leave state alone.
478                drop(s);
479                self.redetect_prettify();
480                return;
481            }
482        };
483        self.apply_mode(target);
484    }
485
486    fn redetect_prettify(&self) {
487        let raw = self.inner.bytes(0..self.inner.len()).to_vec();
488        let detected = crate::prettify::detect_from_bytes(&raw);
489        if let Some(mode) = detected {
490            self.apply_mode(mode);
491        }
492        // Undetected: leave state alone (caller will see same label).
493    }
494}
495
496pub struct StdinSource {
497    inner: StdinInner,
498}
499
500enum StdinInner {
501    Static(Vec<u8>),
502    Streaming {
503        buf: Arc<Mutex<Vec<u8>>>,
504        len_cache: Arc<AtomicUsize>,
505        complete: Arc<AtomicBool>,
506    },
507}
508
509impl StdinSource {
510    /// Read all of stdin into a buffer synchronously. After this returns,
511    /// stdin (fd 0) is at EOF; the caller is responsible for redirecting fd 0
512    /// to /dev/tty before entering raw mode if interactive input is needed.
513    pub fn read_all() -> std::io::Result<Self> {
514        let mut bytes = Vec::new();
515        std::io::stdin().lock().read_to_end(&mut bytes)?;
516        Ok(Self { inner: StdinInner::Static(bytes) })
517    }
518
519    /// Duplicate fd 0 onto a private fd, then spawn a thread that reads from
520    /// it into a shared buffer. Caller can safely `dup2(/dev/tty, 0)` afterwards
521    /// without disturbing the thread — it reads from the duplicated fd, not
522    /// from `STDIN_FILENO`.
523    #[cfg(unix)]
524    pub fn spawn_streaming() -> std::io::Result<Self> {
525        use std::os::unix::io::FromRawFd;
526        let cloned_fd = unsafe { libc::dup(libc::STDIN_FILENO) };
527        if cloned_fd < 0 {
528            return Err(std::io::Error::last_os_error());
529        }
530        // SAFETY: cloned_fd is now owned by the File; closed on Drop.
531        let mut file = unsafe { File::from_raw_fd(cloned_fd) };
532
533        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
534        let len_cache = Arc::new(AtomicUsize::new(0));
535        let complete = Arc::new(AtomicBool::new(false));
536        let buf_w = Arc::clone(&buf);
537        let len_w = Arc::clone(&len_cache);
538        let complete_w = Arc::clone(&complete);
539        std::thread::spawn(move || {
540            let mut tmp = [0u8; 8192];
541            loop {
542                match file.read(&mut tmp) {
543                    Ok(0) => break,
544                    Ok(n) => {
545                        let mut b = buf_w.lock().unwrap();
546                        b.extend_from_slice(&tmp[..n]);
547                        len_w.store(b.len(), Ordering::Release);
548                    }
549                    Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
550                    Err(_) => break,
551                }
552            }
553            complete_w.store(true, Ordering::SeqCst);
554        });
555        Ok(Self { inner: StdinInner::Streaming { buf, len_cache, complete } })
556    }
557}
558
559impl Source for StdinSource {
560    fn len(&self) -> usize {
561        match &self.inner {
562            StdinInner::Static(v) => v.len(),
563            StdinInner::Streaming { len_cache, .. } => len_cache.load(Ordering::Acquire),
564        }
565    }
566    fn bytes(&self, range: Range<usize>) -> Cow<'_, [u8]> {
567        match &self.inner {
568            StdinInner::Static(v) => Cow::Borrowed(&v[range]),
569            StdinInner::Streaming { buf, .. } => Cow::Owned(buf.lock().unwrap()[range].to_vec()),
570        }
571    }
572    fn is_complete(&self) -> bool {
573        match &self.inner {
574            StdinInner::Static(_) => true,
575            StdinInner::Streaming { complete, .. } => complete.load(Ordering::Acquire),
576        }
577    }
578}
579
// Unit tests for the sources in this file. File-backed tests use the
// `tempfile` crate; the live-source tests sleep briefly before rewriting so
// the file's mtime has a chance to differ from the one captured at open.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    // A freshly opened file exposes its full content and reports complete.
    #[test]
    fn file_source_reads_temp_file() {
        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        tmp.write_all(b"hello world").unwrap();
        let src = FileSource::open(tmp.path()).unwrap();
        assert_eq!(src.len(), 11);
        assert_eq!(&*src.bytes(0..5), b"hello");
        assert_eq!(&*src.bytes(6..11), b"world");
        assert!(src.is_complete());
    }

    // Opening an empty file succeeds and yields a zero-length source.
    #[test]
    fn file_source_empty_file() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let src = FileSource::open(tmp.path()).unwrap();
        assert_eq!(src.len(), 0);
    }

    // Opening a directory is rejected with `InvalidInput`.
    #[test]
    fn file_source_directory_errors() {
        let dir = tempfile::tempdir().unwrap();
        let err = FileSource::open(dir.path()).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput);
    }

    // Bytes appended after open become visible only after `pump`, and ranges
    // may fall in the static region, the appended region, or straddle both.
    #[test]
    fn file_source_pump_picks_up_appended_bytes() {
        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        tmp.write_all(b"first").unwrap();
        tmp.flush().unwrap();
        let src = FileSource::open(tmp.path()).unwrap();
        assert_eq!(src.len(), 5);
        // Append more bytes to the underlying file.
        tmp.write_all(b" second").unwrap();
        tmp.flush().unwrap();
        // Before pump, len() reflects only what we knew at open.
        assert_eq!(src.len(), 5);
        src.pump();
        assert_eq!(src.len(), 12);
        // Borrowed range entirely in the original mmap.
        assert_eq!(&*src.bytes(0..5), b"first");
        // Range entirely in the appended region.
        assert_eq!(&*src.bytes(5..12), b" second");
        // Range straddling the boundary (3..10 = 7 bytes of "first second").
        assert_eq!(&*src.bytes(3..10), b"st seco");
    }

    // Asking for zero lines returns the end of the source.
    #[test]
    fn find_tail_offset_zero_lines_returns_total() {
        let m = MockSource::new();
        m.append(b"a\nb\nc\n");
        assert_eq!(find_tail_offset(&m, 0), 6);
    }

    // An empty source always yields offset 0.
    #[test]
    fn find_tail_offset_empty_source() {
        let m = MockSource::new();
        assert_eq!(find_tail_offset(&m, 5), 0);
    }

    // Requesting more lines than exist returns the start of the source.
    #[test]
    fn find_tail_offset_fewer_lines_than_n_returns_zero() {
        let m = MockSource::new();
        m.append(b"a\nb\nc\n");  // 3 lines
        assert_eq!(find_tail_offset(&m, 10), 0);
    }

    // The trailing newline terminates the last line; it is not its own line.
    #[test]
    fn find_tail_offset_last_one_with_trailing_newline() {
        let m = MockSource::new();
        m.append(b"alpha\nbeta\ngamma\n");  // 3 lines
        // gamma starts at byte 11.
        assert_eq!(find_tail_offset(&m, 1), 11);
    }

    // Two lines from the end of a newline-terminated source.
    #[test]
    fn find_tail_offset_last_two_with_trailing_newline() {
        let m = MockSource::new();
        m.append(b"alpha\nbeta\ngamma\n");
        // beta starts at byte 6.
        assert_eq!(find_tail_offset(&m, 2), 6);
    }

    // An unterminated final line still counts as a line.
    #[test]
    fn find_tail_offset_last_one_no_trailing_newline() {
        let m = MockSource::new();
        m.append(b"alpha\nbeta\ngamma");  // last line not terminated
        // gamma starts at byte 11.
        assert_eq!(find_tail_offset(&m, 1), 11);
    }

    // Requesting exactly as many lines as exist returns the start.
    #[test]
    fn find_tail_offset_exactly_n_lines_returns_zero() {
        let m = MockSource::new();
        m.append(b"a\nb\nc\n");  // 3 lines exactly
        assert_eq!(find_tail_offset(&m, 3), 0);
    }

    // A live source loads the file at open, starts at revision 0, and never
    // reports complete.
    #[test]
    fn live_source_reads_initial_content() {
        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        tmp.write_all(b"alpha\nbeta\n").unwrap();
        tmp.flush().unwrap();
        let src = LiveFileSource::open(tmp.path()).unwrap();
        assert_eq!(src.len(), 11);
        assert_eq!(&*src.bytes(0..11), b"alpha\nbeta\n");
        assert_eq!(src.revision(), 0);
        assert!(!src.is_complete()); // live sources are always "growing"
    }

    // Rewriting the file in place is picked up by the next `pump`.
    #[test]
    fn live_source_pump_picks_up_rewritten_content() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        std::fs::write(tmp.path(), b"first\n").unwrap();
        let src = LiveFileSource::open(tmp.path()).unwrap();
        assert_eq!(src.len(), 6);
        assert_eq!(src.revision(), 0);

        // Rewrite the file with different (longer) content. mtime should
        // bump; signatures differ; pump() picks it up.
        std::thread::sleep(std::time::Duration::from_millis(20));
        std::fs::write(tmp.path(), b"second longer line\n").unwrap();
        src.pump();
        assert_eq!(src.len(), 19);
        assert_eq!(&*src.bytes(0..19), b"second longer line\n");
        assert_eq!(src.revision(), 1);
    }

    // Pumping an unchanged file must leave the revision alone.
    #[test]
    fn live_source_pump_no_change_does_not_bump_revision() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        std::fs::write(tmp.path(), b"stable\n").unwrap();
        let src = LiveFileSource::open(tmp.path()).unwrap();
        let r0 = src.revision();
        src.pump();
        src.pump();
        src.pump();
        assert_eq!(src.revision(), r0);
    }

    // A rewrite that makes the file shorter replaces the buffer wholesale.
    #[test]
    fn live_source_handles_file_shrink() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        std::fs::write(tmp.path(), b"longer initial content\n").unwrap();
        let src = LiveFileSource::open(tmp.path()).unwrap();
        assert!(src.len() > 5);
        std::thread::sleep(std::time::Duration::from_millis(20));
        std::fs::write(tmp.path(), b"x\n").unwrap();
        src.pump();
        assert_eq!(src.len(), 2);
        assert_eq!(&*src.bytes(0..2), b"x\n");
        assert_eq!(src.revision(), 1);
    }

    // Replacing the file via write-then-rename is followed by path.
    #[test]
    fn live_source_handles_atomic_rename() {
        // Most editors write atomically: write tmp file, rename over original.
        // The inode changes; mtime/size likely change. pump() must follow.
        let dir = tempfile::tempdir().unwrap();
        let target = dir.path().join("file.txt");
        std::fs::write(&target, b"original\n").unwrap();
        let src = LiveFileSource::open(&target).unwrap();
        assert_eq!(&*src.bytes(0..9), b"original\n");

        std::thread::sleep(std::time::Duration::from_millis(20));
        let staging = dir.path().join("file.txt.tmp");
        std::fs::write(&staging, b"renamed in\n").unwrap();
        std::fs::rename(&staging, &target).unwrap();
        src.pump();
        assert_eq!(src.len(), 11);
        assert_eq!(&*src.bytes(0..11), b"renamed in\n");
        assert_eq!(src.revision(), 1);
    }

    // End-to-end: a revision bump followed by a fresh `LineIndex` yields the
    // line structure of the rewritten file.
    #[test]
    fn live_source_rebuild_flow_against_line_index() {
        use crate::line_index::LineIndex;

        let tmp = tempfile::NamedTempFile::new().unwrap();
        std::fs::write(tmp.path(), b"a\nb\nc\n").unwrap();
        let src = LiveFileSource::open(tmp.path()).unwrap();

        let mut idx = LineIndex::new();
        idx.notice_new_bytes(&src);
        assert_eq!(idx.line_count(), 3);
        let r0 = src.revision();

        // Rewrite to a 5-line file.
        std::thread::sleep(std::time::Duration::from_millis(20));
        std::fs::write(tmp.path(), b"one\ntwo\nthree\nfour\nfive\n").unwrap();
        src.pump();
        assert_ne!(src.revision(), r0, "revision must bump on rewrite");

        // Caller's job to rebuild the index — mirror what app::rebuild_after_replace does.
        idx = LineIndex::new();
        idx.notice_new_bytes(&src);
        assert_eq!(idx.line_count(), 5);
        assert_eq!(&*src.bytes(idx.line_range(2, &src)), b"three");
    }

    // In `Off` mode the wrapper is a pure pass-through at revision 0.
    #[test]
    fn transforming_source_passes_through_when_off() {
        let inner = MockSource::new();
        inner.append(b"hello\nworld\n");
        let t = TransformingSource::wrap(Box::new(inner), PrettifyMode::Off);
        assert_eq!(&*t.bytes(0..t.len()), b"hello\nworld\n");
        assert!(t.last_error().is_none());
        assert_eq!(t.revision(), 0);
    }

    // With an active mode the wrapper serves the transformed bytes.
    #[test]
    fn transforming_source_emits_pretty_bytes_when_on() {
        let inner = MockSource::new();
        inner.append(b"{\"a\":1,\"b\":2}");
        let t = TransformingSource::wrap(Box::new(inner), PrettifyMode::Json);
        let out = t.bytes(0..t.len()).to_vec();
        let s = String::from_utf8(out).unwrap();
        assert!(s.contains("\"a\": 1"));
        assert!(s.contains("\"b\": 2"));
        assert!(t.last_error().is_none());
    }

    // Every mode change, in either direction, bumps the revision.
    #[test]
    fn transforming_source_revision_bumps_on_mode_change() {
        let inner = MockSource::new();
        inner.append(b"{\"x\":1}");
        let t = TransformingSource::wrap(Box::new(inner), PrettifyMode::Off);
        let r0 = t.revision();
        t.set_prettify_mode(PrettifyMode::Json);
        assert!(t.revision() > r0);
        let r1 = t.revision();
        t.set_prettify_mode(PrettifyMode::Off);
        assert!(t.revision() > r1);
    }

    // A failed transform leaves the raw bytes visible and surfaces the error
    // both via `last_error()` and the status label.
    #[test]
    fn transforming_source_falls_back_to_raw_on_parse_error() {
        let inner = MockSource::new();
        inner.append(b"not actually json");
        let t = TransformingSource::wrap(Box::new(inner), PrettifyMode::Json);
        // Cache holds raw bytes since the transform failed.
        assert_eq!(&*t.bytes(0..t.len()), b"not actually json");
        let err = t.last_error().expect("expected parse error to be surfaced");
        assert!(err.contains("json"), "expected json in error, got: {err}");
        // Status label reflects the error.
        let label = t.prettify_label().expect("error → label should be set");
        assert!(label.ends_with(":err"), "expected :err label, got: {label}");
    }

    // Switching to `Off` after a failed transform clears the error.
    #[test]
    fn transforming_source_set_mode_recovers_from_error() {
        let inner = MockSource::new();
        inner.append(b"plain content");
        let t = TransformingSource::wrap(Box::new(inner), PrettifyMode::Json);
        assert!(t.last_error().is_some());
        t.set_prettify_mode(PrettifyMode::Off);
        assert!(t.last_error().is_none());
        assert_eq!(&*t.bytes(0..t.len()), b"plain content");
    }

    // Toggling alternates between `Off` and the last active mode.
    #[test]
    fn transforming_source_toggle_flips_between_active_and_off() {
        let inner = MockSource::new();
        inner.append(b"{\"x\":1}");
        let t = TransformingSource::wrap(Box::new(inner), PrettifyMode::Json);
        assert_eq!(t.mode(), PrettifyMode::Json);
        t.toggle_prettify();
        assert_eq!(t.mode(), PrettifyMode::Off);
        t.toggle_prettify();
        // Flipped back to last active.
        assert_eq!(t.mode(), PrettifyMode::Json);
    }

    // Re-detection on a raw wrapper switches to the detected format.
    #[test]
    fn transforming_source_redetect_picks_up_format() {
        let inner = MockSource::new();
        inner.append(b"<?xml version=\"1.0\"?><root/>");
        let t = TransformingSource::wrap(Box::new(inner), PrettifyMode::Off);
        // Initially raw; after redetect, should be XML.
        t.redetect_prettify();
        assert_eq!(t.mode(), PrettifyMode::Xml);
    }

    // Opening a directory as a live source is rejected with `InvalidInput`.
    #[test]
    fn live_source_directory_errors() {
        let dir = tempfile::tempdir().unwrap();
        let err = LiveFileSource::open(dir.path()).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput);
    }

    // The mock grows on `append` and reports complete only after `finish`.
    #[test]
    fn mock_source_grows_and_finishes() {
        let m = MockSource::new();
        assert_eq!(m.len(), 0);
        assert!(!m.is_complete());
        m.append(b"abc");
        assert_eq!(m.len(), 3);
        assert_eq!(&*m.bytes(0..3), b"abc");
        m.append(b"def");
        assert_eq!(&*m.bytes(0..6), b"abcdef");
        m.finish();
        assert!(m.is_complete());
    }
}