ubl_ledger/
reader.rs

1//! NDJSON reader and tail helpers for UBL event streams.
2use crate::event::UblEvent;
3use anyhow::Result;
4use std::{
5    fs::File,
6    io::{BufRead, BufReader},
7    path::Path,
8    thread,
9    time::Duration,
10};
11
12/// Leitor simplificado de NDJSON.
13pub struct UblReader;
14impl UblReader {
15    /// Cria iterador para um arquivo NDJSON UBL.
16    ///
17    /// # Errors
18    ///
19    /// - Retorna erros de I/O ao abrir o arquivo
20    pub fn iter_file<P: AsRef<Path>>(path: P) -> Result<UblIter> {
21        let f = File::open(path)?;
22        Ok(UblIter {
23            reader: BufReader::new(f),
24        })
25    }
26}
27/// Iterador linha-a-linha.
28pub struct UblIter {
29    reader: BufReader<File>,
30}
31impl Iterator for UblIter {
32    type Item = Result<UblEvent, anyhow::Error>;
33    fn next(&mut self) -> Option<Self::Item> {
34        let mut line = String::new();
35        match self.reader.read_line(&mut line) {
36            Ok(0) => None,
37            Ok(_) => {
38                if line.trim().is_empty() {
39                    return self.next();
40                }
41                Some(serde_json::from_str::<UblEvent>(&line).map_err(Into::into))
42            }
43            Err(e) => Some(Err(e.into())),
44        }
45    }
46}
47/// "Tail -f" simplificado (bloqueante).
48///
49/// # Errors
50///
51/// - Erros de I/O ao ler/seekar o arquivo
52pub fn tail_file<P: AsRef<Path>, F: Fn(UblEvent)>(path: P, on_event: F) -> Result<()> {
53    use std::io::{Seek, SeekFrom};
54    let mut f = File::open(path)?;
55    let mut pos = f.seek(SeekFrom::End(0))?;
56    loop {
57        let mut r = BufReader::new(&mut f);
58        r.seek(SeekFrom::Start(pos))?;
59        let mut line = String::new();
60        while r.read_line(&mut line)? > 0 {
61            if !line.trim().is_empty() {
62                if let Ok(ev) = serde_json::from_str::<UblEvent>(&line) {
63                    on_event(ev);
64                }
65            }
66            line.clear();
67            pos = r.stream_position()?;
68        }
69        thread::sleep(Duration::from_millis(300));
70    }
71}