Skip to main content

tess/
source.rs

1use std::borrow::Cow;
2use std::fs::File;
3use std::io::{Read, Seek, SeekFrom};
4use std::ops::Range;
5use std::path::{Path, PathBuf};
6use std::sync::{Arc, Mutex, atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}};
7use std::time::SystemTime;
8
9use crate::prettify::{self, PrettifyMode};
10
11pub trait Source: Send + Sync {
12    fn len(&self) -> usize;
13    fn bytes(&self, range: Range<usize>) -> Cow<'_, [u8]>;
14    fn is_complete(&self) -> bool;
15    /// Read any new bytes that have become available since the last call.
16    /// Default no-op for static sources. Streaming sources override.
17    fn pump(&self) {}
18    /// Monotonic counter that bumps whenever the source's *content* has been
19    /// replaced wholesale (as opposed to merely appended to). Append-style
20    /// sources keep this at 0; `LiveFileSource` increments it on each detected
21    /// rewrite so the event loop knows to rebuild the line index instead of
22    /// just folding in new bytes.
23    fn revision(&self) -> u64 { 0 }
24
25    /// Current prettify mode if this source has a `TransformingSource` wrapper.
26    /// `None` means the source is not capable of prettification (e.g. a plain
27    /// `FileSource`). `Some(Off)` means wrapped but currently raw.
28    fn prettify_mode(&self) -> Option<PrettifyMode> { None }
29
30    /// Status-line label for the active prettify state, e.g. `"json"` or
31    /// `"json:err"`. `None` when there's no wrapper or the mode is `Off`
32    /// without an error.
33    fn prettify_label(&self) -> Option<String> { None }
34
35    /// Switch to a specific prettify mode. No-op for sources without a wrapper.
36    /// Bumps `revision()` so callers know to rebuild the line index.
37    fn set_prettify_mode(&self, _mode: PrettifyMode) {}
38
39    /// Flip between the current mode and the last active (non-Off) mode.
40    /// No-op if the source is not wrapped, or if it has never had an active
41    /// mode (i.e. only ever been raw).
42    fn toggle_prettify(&self) {}
43
44    /// Re-run byte-based content detection and apply the result. Used by the
45    /// interactive `-Pa` ("auto") sub-command. No-op without a wrapper.
46    fn redetect_prettify(&self) {}
47}
48
49/// Find the byte offset such that `bytes[offset..]` is exactly the last `n`
50/// logical lines of `src` (lines delimited by `\n`; a trailing `\n` at EOF is
51/// not its own line). Returns 0 when the source has fewer than `n` lines, or
52/// `src.len()` when `n == 0`.
53///
54/// Reverse-scans in 64 KiB chunks so a 10 GB file stays cheap — only the
55/// trailing pages need to be touched.
56pub fn find_tail_offset(src: &dyn Source, n: usize) -> usize {
57    let total = src.len();
58    if n == 0 || total == 0 {
59        return total;
60    }
61    // Don't count a trailing newline as a "line break to step over": it
62    // terminates the last line, it's not a separator before another line.
63    let mut end = total;
64    if end > 0 && src.bytes((end - 1)..end)[0] == b'\n' {
65        end -= 1;
66    }
67
68    let chunk_size: usize = 64 * 1024;
69    let mut count = 0usize;
70    let mut pos = end;
71    while pos > 0 {
72        let chunk_start = pos.saturating_sub(chunk_size);
73        let bytes = src.bytes(chunk_start..pos);
74        for i in (0..bytes.len()).rev() {
75            if bytes[i] == b'\n' {
76                count += 1;
77                if count == n {
78                    return chunk_start + i + 1;
79                }
80            }
81        }
82        pos = chunk_start;
83    }
84    0
85}
86
/// File-backed source: the content present at open time is held in a memory
/// map (or a heap buffer), and bytes appended afterwards are collected by
/// `pump()` into a separate buffer.
pub struct FileSource {
    /// Memory map of the file, when the file was non-empty and mapping succeeded.
    mmap: Option<memmap2::Mmap>,
    /// Heap copy used instead of the map: empty for an empty file, or the
    /// file's contents when mapping failed.
    fallback_buf: Option<Vec<u8>>,
    /// File length recorded at open; offsets below this are served from the
    /// map / fallback buffer.
    initial_size: usize,
    /// Published length of the appended buffer, so `len()` can be answered
    /// without taking the `streaming` lock.
    appended_len: AtomicUsize,
    /// Streaming file handle plus the bytes it has read so far.
    streaming: Mutex<StreamingState>,
}
94
/// Mutable half of a `FileSource`, guarded by its `streaming` mutex.
struct StreamingState {
    /// Handle positioned past the initial content; reads from it yield only
    /// bytes appended after open.
    file: File,
    /// Every appended byte read from `file` so far.
    appended: Vec<u8>,
}
99
100impl std::fmt::Debug for FileSource {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        f.debug_struct("FileSource").finish()
103    }
104}
105
106impl FileSource {
107    pub fn open(path: &Path) -> std::io::Result<Self> {
108        let file = File::open(path)?;
109        let metadata = file.metadata()?;
110        if !metadata.is_file() {
111            return Err(std::io::Error::new(
112                std::io::ErrorKind::InvalidInput,
113                "not a regular file",
114            ));
115        }
116        let initial_size = metadata.len() as usize;
117        let (mmap, fallback_buf) = if initial_size == 0 {
118            (None, Some(Vec::new()))
119        } else {
120            // SAFETY: file remains open as long as Mmap exists; we never modify the file.
121            match unsafe { memmap2::Mmap::map(&file) } {
122                Ok(m) => (Some(m), None),
123                Err(_) => {
124                    let mut buf = Vec::new();
125                    let mut f = File::open(path)?;
126                    f.read_to_end(&mut buf)?;
127                    (None, Some(buf))
128                }
129            }
130        };
131        // Separate handle for streaming reads. Seeked past the initial content
132        // so subsequent reads only return bytes appended after open.
133        let mut stream_file = File::open(path)?;
134        stream_file.seek(SeekFrom::Start(initial_size as u64))?;
135        Ok(Self {
136            mmap,
137            fallback_buf,
138            initial_size,
139            appended_len: AtomicUsize::new(0),
140            streaming: Mutex::new(StreamingState {
141                file: stream_file,
142                appended: Vec::new(),
143            }),
144        })
145    }
146
147    fn static_bytes(&self) -> &[u8] {
148        if let Some(m) = &self.mmap {
149            &m[..]
150        } else if let Some(b) = &self.fallback_buf {
151            &b[..]
152        } else {
153            &[]
154        }
155    }
156}
157
158impl Source for FileSource {
159    fn len(&self) -> usize {
160        self.initial_size + self.appended_len.load(Ordering::Acquire)
161    }
162
163    fn bytes(&self, range: Range<usize>) -> Cow<'_, [u8]> {
164        let static_bytes = self.static_bytes();
165        if range.end <= self.initial_size {
166            return Cow::Borrowed(&static_bytes[range]);
167        }
168        let stream = self.streaming.lock().unwrap();
169        let total = self.initial_size + stream.appended.len();
170        let start = range.start.min(total);
171        let end = range.end.min(total);
172        if start >= self.initial_size {
173            let off = start - self.initial_size;
174            let off_end = end - self.initial_size;
175            Cow::Owned(stream.appended[off..off_end].to_vec())
176        } else {
177            let mut v = Vec::with_capacity(end - start);
178            v.extend_from_slice(&static_bytes[start..self.initial_size]);
179            v.extend_from_slice(&stream.appended[..end - self.initial_size]);
180            Cow::Owned(v)
181        }
182    }
183
184    fn is_complete(&self) -> bool { true }
185
186    fn pump(&self) {
187        let mut stream = self.streaming.lock().unwrap();
188        let mut tmp = [0u8; 8192];
189        loop {
190            match stream.file.read(&mut tmp) {
191                Ok(0) => break,
192                Ok(n) => stream.appended.extend_from_slice(&tmp[..n]),
193                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
194                Err(_) => break,
195            }
196        }
197        let new_len = stream.appended.len();
198        self.appended_len.store(new_len, Ordering::Release);
199    }
200}
201
/// A test/utility source whose contents can be appended at runtime.
pub struct MockSource {
    /// Current contents; grown by `append`.
    buf: Arc<Mutex<Vec<u8>>>,
    /// Set by `finish`; reported through `is_complete`.
    complete: Arc<AtomicBool>,
}
207
208impl MockSource {
209    pub fn new() -> Self {
210        Self {
211            buf: Arc::new(Mutex::new(Vec::new())),
212            complete: Arc::new(AtomicBool::new(false)),
213        }
214    }
215
216    pub fn append(&self, more: &[u8]) {
217        self.buf.lock().unwrap().extend_from_slice(more);
218    }
219
220    pub fn finish(&self) {
221        self.complete.store(true, Ordering::SeqCst);
222    }
223}
224
225impl Source for MockSource {
226    fn len(&self) -> usize { self.buf.lock().unwrap().len() }
227    fn bytes(&self, range: Range<usize>) -> Cow<'_, [u8]> {
228        Cow::Owned(self.buf.lock().unwrap()[range].to_vec())
229    }
230    fn is_complete(&self) -> bool { self.complete.load(Ordering::SeqCst) }
231}
232
233/// A file source that watches for *whole-file* rewrites. Unlike `FileSource`
234/// (which only picks up appended bytes via a streaming handle), `LiveFileSource`
235/// re-reads the entire file when its `(mtime, size, ino)` signature changes,
236/// swaps the buffer in place, and bumps a revision counter so callers know to
237/// rebuild any per-line state. Intended for source-file-sized inputs being
238/// rewritten by an editor or AI agent.
pub struct LiveFileSource {
    /// Path that is re-stat'ed and re-read on every `pump()`.
    path: PathBuf,
    /// Current file contents together with the signature they were loaded under.
    state: Mutex<LiveState>,
    /// Incremented each time `pump()` swaps in freshly read contents.
    revision: AtomicU64,
}
244
/// Lock-protected contents of a `LiveFileSource`.
struct LiveState {
    /// Most recently loaded file contents.
    bytes: Vec<u8>,
    /// Signature stored alongside `bytes`; `pump()` compares a fresh stat
    /// against it to decide whether to reload.
    signature: FileSignature,
}
249
/// Cheap change-detection fingerprint of a file, built from its metadata.
/// Two signatures comparing unequal is what triggers a reload.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct FileSignature {
    /// Modification time, or `None` when it could not be obtained.
    mtime: Option<SystemTime>,
    /// File length in bytes.
    size: u64,
    /// Inode number on Unix; always 0 on other platforms.
    ino: u64,
}
256
257impl FileSignature {
258    fn read(path: &Path) -> std::io::Result<Self> {
259        let md = std::fs::metadata(path)?;
260        Ok(Self {
261            mtime: md.modified().ok(),
262            size: md.len(),
263            #[cfg(unix)]
264            ino: {
265                use std::os::unix::fs::MetadataExt;
266                md.ino()
267            },
268            #[cfg(not(unix))]
269            ino: 0,
270        })
271    }
272}
273
274impl std::fmt::Debug for LiveFileSource {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        f.debug_struct("LiveFileSource").field("path", &self.path).finish()
277    }
278}
279
280impl LiveFileSource {
281    pub fn open(path: &Path) -> std::io::Result<Self> {
282        let md = std::fs::metadata(path)?;
283        if !md.is_file() {
284            return Err(std::io::Error::new(
285                std::io::ErrorKind::InvalidInput,
286                "not a regular file",
287            ));
288        }
289        let bytes = std::fs::read(path)?;
290        // Re-stat after read so the signature reflects the bytes we actually
291        // captured (concurrent writers won't tear our buffer, but the mtime/size
292        // we record should match what we read).
293        let signature = FileSignature::read(path)?;
294        Ok(Self {
295            path: path.to_path_buf(),
296            state: Mutex::new(LiveState { bytes, signature }),
297            revision: AtomicU64::new(0),
298        })
299    }
300}
301
302impl Source for LiveFileSource {
303    fn len(&self) -> usize { self.state.lock().unwrap().bytes.len() }
304
305    fn bytes(&self, range: Range<usize>) -> Cow<'_, [u8]> {
306        let s = self.state.lock().unwrap();
307        let end = range.end.min(s.bytes.len());
308        let start = range.start.min(end);
309        Cow::Owned(s.bytes[start..end].to_vec())
310    }
311
312    /// Live sources are never "complete" — the file may be rewritten at any
313    /// time. The status line picks this up as the `+` suffix on totals.
314    fn is_complete(&self) -> bool { false }
315
316    fn pump(&self) {
317        // Cheap stat to detect a change. If the file vanished, leave the
318        // current buffer in place and try again next tick.
319        let new_sig = match FileSignature::read(&self.path) {
320            Ok(sig) => sig,
321            Err(_) => return,
322        };
323        let mut s = self.state.lock().unwrap();
324        if new_sig == s.signature {
325            return;
326        }
327        let new_bytes = match std::fs::read(&self.path) {
328            Ok(b) => b,
329            Err(_) => return,
330        };
331        // Re-read signature after the read so we reflect what was actually loaded.
332        let post_sig = FileSignature::read(&self.path).unwrap_or(new_sig);
333        s.bytes = new_bytes;
334        s.signature = post_sig;
335        drop(s);
336        self.revision.fetch_add(1, Ordering::AcqRel);
337    }
338
339    fn revision(&self) -> u64 { self.revision.load(Ordering::Acquire) }
340}
341
342/// A source that wraps another source and applies a `PrettifyMode` transform
343/// to its bytes. Toggling the mode bumps `revision()` so the event loop
344/// rebuilds the line index. Falls back to passing through the inner bytes if
345/// the transform fails to parse, surfacing the error via `last_error()`.
pub struct TransformingSource {
    /// The wrapped source whose bytes are fed to the transform.
    inner: Box<dyn Source>,
    /// Current mode, cached output and last parse error.
    state: Mutex<TransformState>,
    /// Incremented every time the cache is recomputed by `apply_mode`.
    revision: AtomicU64,
}
351
/// Lock-protected state of a `TransformingSource`.
struct TransformState {
    /// Mode most recently applied (may be `Off`).
    mode: PrettifyMode,
    /// Most recent non-Off mode. Used by `toggle_prettify` to flip back to
    /// what the user was looking at most recently. Stays at the original
    /// detected/explicit mode at startup, even if the user toggles to Off.
    last_active: PrettifyMode,
    /// Whatever the consumer should see — either the raw inner bytes or the
    /// successfully transformed bytes. Always populated.
    cached: Vec<u8>,
    /// Set when the last attempted transform failed to parse. Cleared on a
    /// successful transform or on switching to `Off`.
    last_error: Option<String>,
}
365
366impl std::fmt::Debug for TransformingSource {
367    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
368        f.debug_struct("TransformingSource").finish()
369    }
370}
371
372impl TransformingSource {
373    /// Build a wrapping source. The inner is read fully via its current bytes
374    /// view; the transform runs once. If the transform fails, the cache holds
375    /// the raw inner bytes and `last_error()` returns the parse error.
376    pub fn wrap(inner: Box<dyn Source>, mode: PrettifyMode) -> Self {
377        let raw = inner.bytes(0..inner.len()).to_vec();
378        let (cached, last_error) = run_transform(mode, &raw);
379        let last_active = if mode.is_active() { mode } else { PrettifyMode::Off };
380        Self {
381            inner,
382            state: Mutex::new(TransformState { mode, last_active, cached, last_error }),
383            revision: AtomicU64::new(0),
384        }
385    }
386
387    pub fn mode(&self) -> PrettifyMode {
388        self.state.lock().unwrap().mode
389    }
390
391    /// Currently surfaced parse error, if any. Cleared by a successful
392    /// `set_mode` or by switching to `Off`.
393    pub fn last_error(&self) -> Option<String> {
394        self.state.lock().unwrap().last_error.clone()
395    }
396
397    /// Internal apply-mode helper. Re-reads inner bytes, runs the transform,
398    /// updates the cache, bumps revision. Tracks `last_active` so toggle
399    /// can flip back to a non-Off mode.
400    fn apply_mode(&self, mode: PrettifyMode) {
401        let raw = self.inner.bytes(0..self.inner.len()).to_vec();
402        let (cached, last_error) = run_transform(mode, &raw);
403        let mut s = self.state.lock().unwrap();
404        s.mode = mode;
405        if mode.is_active() && last_error.is_none() {
406            s.last_active = mode;
407        }
408        s.cached = cached;
409        s.last_error = last_error;
410        drop(s);
411        self.revision.fetch_add(1, Ordering::AcqRel);
412    }
413}
414
415fn run_transform(mode: PrettifyMode, raw: &[u8]) -> (Vec<u8>, Option<String>) {
416    match prettify::prettify(mode, raw) {
417        Ok(out) => (out, None),
418        Err(e) => (raw.to_vec(), Some(e)),
419    }
420}
421
422impl Source for TransformingSource {
423    fn len(&self) -> usize { self.state.lock().unwrap().cached.len() }
424
425    fn bytes(&self, range: Range<usize>) -> Cow<'_, [u8]> {
426        let s = self.state.lock().unwrap();
427        let end = range.end.min(s.cached.len());
428        let start = range.start.min(end);
429        Cow::Owned(s.cached[start..end].to_vec())
430    }
431
432    fn is_complete(&self) -> bool { self.inner.is_complete() }
433
434    fn pump(&self) { self.inner.pump(); }
435
436    fn revision(&self) -> u64 { self.revision.load(Ordering::Acquire) }
437
438    fn prettify_mode(&self) -> Option<PrettifyMode> {
439        Some(self.state.lock().unwrap().mode)
440    }
441
442    fn prettify_label(&self) -> Option<String> {
443        let s = self.state.lock().unwrap();
444        if s.last_error.is_some() {
445            // Whatever mode we tried last; show "<mode>:err".
446            let lbl = s.mode.label();
447            let lbl = if lbl.is_empty() { s.last_active.label() } else { lbl };
448            Some(format!("{lbl}:err"))
449        } else if s.mode.is_active() {
450            Some(s.mode.label().to_string())
451        } else {
452            None
453        }
454    }
455
456    fn set_prettify_mode(&self, mode: PrettifyMode) {
457        self.apply_mode(mode);
458    }
459
460    fn toggle_prettify(&self) {
461        let target = {
462            let s = self.state.lock().unwrap();
463            if s.mode.is_active() {
464                PrettifyMode::Off
465            } else if s.last_active.is_active() {
466                s.last_active
467            } else {
468                // Never had an active mode (started raw and never flipped on).
469                // Try a one-shot detection from the inner bytes; if nothing
470                // matches, leave state alone.
471                drop(s);
472                self.redetect_prettify();
473                return;
474            }
475        };
476        self.apply_mode(target);
477    }
478
479    fn redetect_prettify(&self) {
480        let raw = self.inner.bytes(0..self.inner.len()).to_vec();
481        let detected = crate::prettify::detect_from_bytes(&raw);
482        if let Some(mode) = detected {
483            self.apply_mode(mode);
484        }
485        // Undetected: leave state alone (caller will see same label).
486    }
487}
488
/// Source fed from standard input, either read up front or streamed in by a
/// background thread.
pub struct StdinSource {
    /// Which of the two capture strategies this instance uses.
    inner: StdinInner,
}
492
/// Storage behind a `StdinSource`.
enum StdinInner {
    /// All of stdin, captured synchronously before construction finished.
    Static(Vec<u8>),
    /// Stdin being drained by a reader thread.
    Streaming {
        /// Bytes received so far; appended to by the reader thread.
        buf: Arc<Mutex<Vec<u8>>>,
        /// Length of `buf` as last published by the reader thread, so `len()`
        /// does not need the lock.
        len_cache: Arc<AtomicUsize>,
        /// Set once the reader thread has stopped (EOF or read error).
        complete: Arc<AtomicBool>,
    },
}
501
502impl StdinSource {
503    /// Read all of stdin into a buffer synchronously. After this returns,
504    /// stdin (fd 0) is at EOF; the caller is responsible for redirecting fd 0
505    /// to /dev/tty before entering raw mode if interactive input is needed.
506    pub fn read_all() -> std::io::Result<Self> {
507        let mut bytes = Vec::new();
508        std::io::stdin().lock().read_to_end(&mut bytes)?;
509        Ok(Self { inner: StdinInner::Static(bytes) })
510    }
511
512    /// Duplicate fd 0 onto a private fd, then spawn a thread that reads from
513    /// it into a shared buffer. Caller can safely `dup2(/dev/tty, 0)` afterwards
514    /// without disturbing the thread — it reads from the duplicated fd, not
515    /// from `STDIN_FILENO`.
516    #[cfg(unix)]
517    pub fn spawn_streaming() -> std::io::Result<Self> {
518        use std::os::unix::io::FromRawFd;
519        let cloned_fd = unsafe { libc::dup(libc::STDIN_FILENO) };
520        if cloned_fd < 0 {
521            return Err(std::io::Error::last_os_error());
522        }
523        // SAFETY: cloned_fd is now owned by the File; closed on Drop.
524        let mut file = unsafe { File::from_raw_fd(cloned_fd) };
525
526        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
527        let len_cache = Arc::new(AtomicUsize::new(0));
528        let complete = Arc::new(AtomicBool::new(false));
529        let buf_w = Arc::clone(&buf);
530        let len_w = Arc::clone(&len_cache);
531        let complete_w = Arc::clone(&complete);
532        std::thread::spawn(move || {
533            let mut tmp = [0u8; 8192];
534            loop {
535                match file.read(&mut tmp) {
536                    Ok(0) => break,
537                    Ok(n) => {
538                        let mut b = buf_w.lock().unwrap();
539                        b.extend_from_slice(&tmp[..n]);
540                        len_w.store(b.len(), Ordering::Release);
541                    }
542                    Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
543                    Err(_) => break,
544                }
545            }
546            complete_w.store(true, Ordering::SeqCst);
547        });
548        Ok(Self { inner: StdinInner::Streaming { buf, len_cache, complete } })
549    }
550}
551
552impl Source for StdinSource {
553    fn len(&self) -> usize {
554        match &self.inner {
555            StdinInner::Static(v) => v.len(),
556            StdinInner::Streaming { len_cache, .. } => len_cache.load(Ordering::Acquire),
557        }
558    }
559    fn bytes(&self, range: Range<usize>) -> Cow<'_, [u8]> {
560        match &self.inner {
561            StdinInner::Static(v) => Cow::Borrowed(&v[range]),
562            StdinInner::Streaming { buf, .. } => Cow::Owned(buf.lock().unwrap()[range].to_vec()),
563        }
564    }
565    fn is_complete(&self) -> bool {
566        match &self.inner {
567            StdinInner::Static(_) => true,
568            StdinInner::Streaming { complete, .. } => complete.load(Ordering::Acquire),
569        }
570    }
571}
572
// Unit tests for every source type in this file. File-backed tests create
// real temporary files via the `tempfile` crate.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    // ---- FileSource ----

    #[test]
    fn file_source_reads_temp_file() {
        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        tmp.write_all(b"hello world").unwrap();
        let src = FileSource::open(tmp.path()).unwrap();
        assert_eq!(src.len(), 11);
        assert_eq!(&*src.bytes(0..5), b"hello");
        assert_eq!(&*src.bytes(6..11), b"world");
        assert!(src.is_complete());
    }

    #[test]
    fn file_source_empty_file() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let src = FileSource::open(tmp.path()).unwrap();
        assert_eq!(src.len(), 0);
    }

    #[test]
    fn file_source_directory_errors() {
        let dir = tempfile::tempdir().unwrap();
        let err = FileSource::open(dir.path()).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput);
    }

    #[test]
    fn file_source_pump_picks_up_appended_bytes() {
        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        tmp.write_all(b"first").unwrap();
        tmp.flush().unwrap();
        let src = FileSource::open(tmp.path()).unwrap();
        assert_eq!(src.len(), 5);
        // Append more bytes to the underlying file.
        tmp.write_all(b" second").unwrap();
        tmp.flush().unwrap();
        // Before pump, len() reflects only what we knew at open.
        assert_eq!(src.len(), 5);
        src.pump();
        assert_eq!(src.len(), 12);
        // Borrowed range entirely in the original mmap.
        assert_eq!(&*src.bytes(0..5), b"first");
        // Range entirely in the appended region.
        assert_eq!(&*src.bytes(5..12), b" second");
        // Range straddling the boundary (3..10 = 7 bytes of "first second").
        assert_eq!(&*src.bytes(3..10), b"st seco");
    }

    // ---- find_tail_offset ----

    #[test]
    fn find_tail_offset_zero_lines_returns_total() {
        let m = MockSource::new();
        m.append(b"a\nb\nc\n");
        assert_eq!(find_tail_offset(&m, 0), 6);
    }

    #[test]
    fn find_tail_offset_empty_source() {
        let m = MockSource::new();
        assert_eq!(find_tail_offset(&m, 5), 0);
    }

    #[test]
    fn find_tail_offset_fewer_lines_than_n_returns_zero() {
        let m = MockSource::new();
        m.append(b"a\nb\nc\n");  // 3 lines
        assert_eq!(find_tail_offset(&m, 10), 0);
    }

    #[test]
    fn find_tail_offset_last_one_with_trailing_newline() {
        let m = MockSource::new();
        m.append(b"alpha\nbeta\ngamma\n");  // 3 lines
        // gamma starts at byte 11.
        assert_eq!(find_tail_offset(&m, 1), 11);
    }

    #[test]
    fn find_tail_offset_last_two_with_trailing_newline() {
        let m = MockSource::new();
        m.append(b"alpha\nbeta\ngamma\n");
        // beta starts at byte 6.
        assert_eq!(find_tail_offset(&m, 2), 6);
    }

    #[test]
    fn find_tail_offset_last_one_no_trailing_newline() {
        let m = MockSource::new();
        m.append(b"alpha\nbeta\ngamma");  // last line not terminated
        // gamma starts at byte 11.
        assert_eq!(find_tail_offset(&m, 1), 11);
    }

    #[test]
    fn find_tail_offset_exactly_n_lines_returns_zero() {
        let m = MockSource::new();
        m.append(b"a\nb\nc\n");  // 3 lines exactly
        assert_eq!(find_tail_offset(&m, 3), 0);
    }

    // ---- LiveFileSource ----
    // The rewrite tests sleep briefly before rewriting so the new mtime can
    // differ from the one recorded at open.

    #[test]
    fn live_source_reads_initial_content() {
        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        tmp.write_all(b"alpha\nbeta\n").unwrap();
        tmp.flush().unwrap();
        let src = LiveFileSource::open(tmp.path()).unwrap();
        assert_eq!(src.len(), 11);
        assert_eq!(&*src.bytes(0..11), b"alpha\nbeta\n");
        assert_eq!(src.revision(), 0);
        assert!(!src.is_complete()); // live sources are always "growing"
    }

    #[test]
    fn live_source_pump_picks_up_rewritten_content() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        std::fs::write(tmp.path(), b"first\n").unwrap();
        let src = LiveFileSource::open(tmp.path()).unwrap();
        assert_eq!(src.len(), 6);
        assert_eq!(src.revision(), 0);

        // Rewrite the file with different (longer) content. mtime should
        // bump; signatures differ; pump() picks it up.
        std::thread::sleep(std::time::Duration::from_millis(20));
        std::fs::write(tmp.path(), b"second longer line\n").unwrap();
        src.pump();
        assert_eq!(src.len(), 19);
        assert_eq!(&*src.bytes(0..19), b"second longer line\n");
        assert_eq!(src.revision(), 1);
    }

    #[test]
    fn live_source_pump_no_change_does_not_bump_revision() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        std::fs::write(tmp.path(), b"stable\n").unwrap();
        let src = LiveFileSource::open(tmp.path()).unwrap();
        let r0 = src.revision();
        src.pump();
        src.pump();
        src.pump();
        assert_eq!(src.revision(), r0);
    }

    #[test]
    fn live_source_handles_file_shrink() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        std::fs::write(tmp.path(), b"longer initial content\n").unwrap();
        let src = LiveFileSource::open(tmp.path()).unwrap();
        assert!(src.len() > 5);
        std::thread::sleep(std::time::Duration::from_millis(20));
        std::fs::write(tmp.path(), b"x\n").unwrap();
        src.pump();
        assert_eq!(src.len(), 2);
        assert_eq!(&*src.bytes(0..2), b"x\n");
        assert_eq!(src.revision(), 1);
    }

    #[test]
    fn live_source_handles_atomic_rename() {
        // Most editors write atomically: write tmp file, rename over original.
        // The inode changes; mtime/size likely change. pump() must follow.
        let dir = tempfile::tempdir().unwrap();
        let target = dir.path().join("file.txt");
        std::fs::write(&target, b"original\n").unwrap();
        let src = LiveFileSource::open(&target).unwrap();
        assert_eq!(&*src.bytes(0..9), b"original\n");

        std::thread::sleep(std::time::Duration::from_millis(20));
        let staging = dir.path().join("file.txt.tmp");
        std::fs::write(&staging, b"renamed in\n").unwrap();
        std::fs::rename(&staging, &target).unwrap();
        src.pump();
        assert_eq!(src.len(), 11);
        assert_eq!(&*src.bytes(0..11), b"renamed in\n");
        assert_eq!(src.revision(), 1);
    }

    #[test]
    fn live_source_rebuild_flow_against_line_index() {
        use crate::line_index::LineIndex;

        let tmp = tempfile::NamedTempFile::new().unwrap();
        std::fs::write(tmp.path(), b"a\nb\nc\n").unwrap();
        let src = LiveFileSource::open(tmp.path()).unwrap();

        let mut idx = LineIndex::new();
        idx.notice_new_bytes(&src);
        assert_eq!(idx.line_count(), 3);
        let r0 = src.revision();

        // Rewrite to a 5-line file.
        std::thread::sleep(std::time::Duration::from_millis(20));
        std::fs::write(tmp.path(), b"one\ntwo\nthree\nfour\nfive\n").unwrap();
        src.pump();
        assert_ne!(src.revision(), r0, "revision must bump on rewrite");

        // Caller's job to rebuild the index — mirror what app::rebuild_after_replace does.
        idx = LineIndex::new();
        idx.notice_new_bytes(&src);
        assert_eq!(idx.line_count(), 5);
        assert_eq!(&*src.bytes(idx.line_range(2, &src)), b"three");
    }

    // ---- TransformingSource ----

    #[test]
    fn transforming_source_passes_through_when_off() {
        let inner = MockSource::new();
        inner.append(b"hello\nworld\n");
        let t = TransformingSource::wrap(Box::new(inner), PrettifyMode::Off);
        assert_eq!(&*t.bytes(0..t.len()), b"hello\nworld\n");
        assert!(t.last_error().is_none());
        assert_eq!(t.revision(), 0);
    }

    #[test]
    fn transforming_source_emits_pretty_bytes_when_on() {
        let inner = MockSource::new();
        inner.append(b"{\"a\":1,\"b\":2}");
        let t = TransformingSource::wrap(Box::new(inner), PrettifyMode::Json);
        let out = t.bytes(0..t.len()).to_vec();
        let s = String::from_utf8(out).unwrap();
        assert!(s.contains("\"a\": 1"));
        assert!(s.contains("\"b\": 2"));
        assert!(t.last_error().is_none());
    }

    #[test]
    fn transforming_source_revision_bumps_on_mode_change() {
        let inner = MockSource::new();
        inner.append(b"{\"x\":1}");
        let t = TransformingSource::wrap(Box::new(inner), PrettifyMode::Off);
        let r0 = t.revision();
        t.set_prettify_mode(PrettifyMode::Json);
        assert!(t.revision() > r0);
        let r1 = t.revision();
        t.set_prettify_mode(PrettifyMode::Off);
        assert!(t.revision() > r1);
    }

    #[test]
    fn transforming_source_falls_back_to_raw_on_parse_error() {
        let inner = MockSource::new();
        inner.append(b"not actually json");
        let t = TransformingSource::wrap(Box::new(inner), PrettifyMode::Json);
        // Cache holds raw bytes since the transform failed.
        assert_eq!(&*t.bytes(0..t.len()), b"not actually json");
        let err = t.last_error().expect("expected parse error to be surfaced");
        assert!(err.contains("json"), "expected json in error, got: {err}");
        // Status label reflects the error.
        let label = t.prettify_label().expect("error → label should be set");
        assert!(label.ends_with(":err"), "expected :err label, got: {label}");
    }

    #[test]
    fn transforming_source_set_mode_recovers_from_error() {
        let inner = MockSource::new();
        inner.append(b"plain content");
        let t = TransformingSource::wrap(Box::new(inner), PrettifyMode::Json);
        assert!(t.last_error().is_some());
        t.set_prettify_mode(PrettifyMode::Off);
        assert!(t.last_error().is_none());
        assert_eq!(&*t.bytes(0..t.len()), b"plain content");
    }

    #[test]
    fn transforming_source_toggle_flips_between_active_and_off() {
        let inner = MockSource::new();
        inner.append(b"{\"x\":1}");
        let t = TransformingSource::wrap(Box::new(inner), PrettifyMode::Json);
        assert_eq!(t.mode(), PrettifyMode::Json);
        t.toggle_prettify();
        assert_eq!(t.mode(), PrettifyMode::Off);
        t.toggle_prettify();
        // Flipped back to last active.
        assert_eq!(t.mode(), PrettifyMode::Json);
    }

    #[test]
    fn transforming_source_redetect_picks_up_format() {
        let inner = MockSource::new();
        inner.append(b"<?xml version=\"1.0\"?><root/>");
        let t = TransformingSource::wrap(Box::new(inner), PrettifyMode::Off);
        // Initially raw; after redetect, should be XML.
        t.redetect_prettify();
        assert_eq!(t.mode(), PrettifyMode::Xml);
    }

    #[test]
    fn live_source_directory_errors() {
        let dir = tempfile::tempdir().unwrap();
        let err = LiveFileSource::open(dir.path()).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput);
    }

    // ---- MockSource ----

    #[test]
    fn mock_source_grows_and_finishes() {
        let m = MockSource::new();
        assert_eq!(m.len(), 0);
        assert!(!m.is_complete());
        m.append(b"abc");
        assert_eq!(m.len(), 3);
        assert_eq!(&*m.bytes(0..3), b"abc");
        m.append(b"def");
        assert_eq!(&*m.bytes(0..6), b"abcdef");
        m.finish();
        assert!(m.is_complete());
    }
}
881}