Skip to main content

reading_liner/
stream.rs

//! The central reader of this crate.
//!
//! The reader [Stream] wraps any type implementing [std::io::Read] together with an extra [Index].
//! [Stream] can be used and passed around like any other reader. Once it has been consumed, the
//! [Stream] is gone, but the [Index] it filled in remains available for queries.
//!
//! If you want to read and query locations simultaneously, [Index] can be reborrowed from [Stream].
7
8use crate::{
9    index::{self, Index},
10    location::{Offset, line_column},
11};
12use std::io;
13
14/// A stream which can be used to convert between offsets and line-column locations.
15///
16/// [Index] is a mutable reference to ensure that we can still refer to the index after consuming the whole [Stream].
17#[derive(Debug)]
18pub struct Stream<'index, Reader> {
19    reader: Reader,
20
21    base: usize, // For future use
22    index: &'index mut Index,
23    next_offset: Offset,
24    current_line: usize,
25}
26
27impl<'index, R> Stream<'index, R> {
28    pub fn new(reader: R, index: &'index mut Index) -> Self {
29        Self {
30            reader,
31            base: 0,
32            index,
33            next_offset: 0.into(),
34            current_line: 0,
35        }
36    }
37
38    pub fn get_ref(&self) -> &R {
39        &self.reader
40    }
41
42    #[inline]
43    pub fn base(&self) -> usize {
44        self.base
45    }
46
47    /// Immutable query to further query offsets and line-column locations
48    #[inline]
49    pub fn query(&self) -> index::Query<'_> {
50        self.index.query()
51    }
52
53    #[inline]
54    pub fn get_index(&self) -> &Index {
55        &self.index
56    }
57}
58
59impl<'index, R: io::Read> Stream<'index, R> {
60    /// Read length
61    #[inline]
62    pub fn read_len(&self) -> usize {
63        self.next_offset.raw()
64    }
65
66    /// Try to get more bytes and update states
67    fn forward(&mut self, buf: &mut [u8]) -> io::Result<usize> {
68        let n = self.reader.read(buf)?;
69
70        for (offset, b) in buf.iter().take(n).enumerate() {
71            if *b == b'\n' {
72                self.current_line += 1;
73                self.index.add_next_line(self.next_offset + offset + 1); // next line begin
74                continue;
75            }
76        }
77
78        // reached EoF, try to add fake ending
79        if !buf.is_empty() && n == 0 {
80            // TODO
81            match self.index.end() {
82                Some(end) if end != self.next_offset => {
83                    self.index.add_next_line(self.next_offset);
84                }
85                None => self.index.add_next_line(self.next_offset),
86                _ => {}
87            }
88        }
89
90        self.next_offset += n;
91        Ok(n)
92    }
93
94    /// Locate the (line, column) position for a given byte `offset`.  
95    ///
96    /// NOTE: this method may cause extra reading when the offset input cannot find a location.
97    ///
98    /// This method first resolves the line index via [`locate_line`], then
99    /// computes the column by subtracting the starting offset of that line.
100    ///
101    /// # Parameters
102    /// - `offset`: The target byte offset.
103    /// - `buf`: A temporary buffer used for incremental reading.
104    ///
105    /// # Returns
106    /// - `Ok(ZeroBased(line, column))` if the offset is within bounds.
107    /// - `Err` if the offset exceeds EOF (propagated from [`locate_line`]).
108    ///
109    /// # Invariants
110    /// - The internal index always contains a valid starting offset for every line.
111    /// - Therefore, `line_offset(line)` must succeed for any valid `line`.
112    ///
113    /// # Notes
114    /// - Both line and column are zero-based.
115    /// - Column is computed in **bytes**, not characters (UTF-8 aware handling is not performed here).
116    pub fn locate(&mut self, offset: Offset, buf: &mut [u8]) -> io::Result<line_column::ZeroBased> {
117        let line = self.locate_line(offset, buf)?;
118        let line_offset = self.query().line_offset(line).unwrap();
119        let col = offset - line_offset;
120        Ok((line, col.raw()).into())
121    }
122
123    /// Locate the line index for a given byte `offset`.
124    ///
125    /// This method performs an incremental lookup:
126    /// it first queries the existing line index, and if the offset
127    /// is not covered, it reads more data and extends the index.
128    /// This method may cause extra reading when the offset input cannot find a location.
129    ///
130    /// # Invariants
131    /// - The internal index is non-empty and ends with a sentinel EOF offset.
132    ///
133    /// # Errors
134    /// Returns an error if `offset` exceeds EOF.
135    pub fn locate_line(&mut self, offset: Offset, buf: &mut [u8]) -> io::Result<usize> {
136        let mut begin = 0;
137        loop {
138            // Invariant: index is non-empty and ends with EOF.
139            // Therefore, begin <= query.count() always holds, and range_from(begin..)
140            // is guaranteed to be a valid slice (possibly containing only EOF).
141            if let Some(i) = self.query().range_from(begin..).locate_line(offset) {
142                break Ok(i); // look here the returned `i` is already `begin` based, there's no need to add an extra begin
143            }
144            begin = self.index.count();
145
146            if self.forward(buf)? == 0 {
147                break Err(io_error("Invalid offset, exceed EOF"));
148            }
149        }
150    }
151
152    /// Encode a (line, column) location into a byte `Offset`.
153    ///
154    /// This method may incrementally extend the internal line index by reading
155    /// additional data if the requested line is not yet available.
156    ///
157    /// # Behavior
158    /// - If the line is already indexed, the offset is computed directly.
159    /// - Otherwise, more data is read and the index is extended until the line
160    ///   becomes available or EOF is reached.
161    ///
162    /// # Returns
163    /// - `Ok(offset)` if the position can be resolved.
164    /// - `Err` if the line index exceeds EOF.
165    ///
166    /// # Notes
167    /// - Column is interpreted as a **byte offset** relative to the start of the line.
168    /// - This method does **not** validate whether the column lies within the bounds
169    ///   of the line.
170    pub fn encode(
171        &mut self,
172        line_index: line_column::ZeroBased,
173        buf: &mut [u8],
174    ) -> io::Result<Offset> {
175        let (line, col) = line_index.raw();
176        loop {
177            if let Some(offset) = self.query().line_offset(line) {
178                break Ok(offset + col);
179            }
180
181            if self.forward(buf)? == 0 {
182                break Err(io_error(format!("Invalid line index: ({}, {})", line, col)));
183            }
184        }
185    }
186
187    /// Drain the reader, consume the reader
188    pub fn drain(&mut self, buf: &mut [u8]) -> io::Result<()> {
189        loop {
190            let n = self.forward(buf)?;
191            if n == 0 {
192                return Ok(());
193            }
194        }
195    }
196}
197
198/// You can use [Stream] as a normal [io::Read] and recording index at the same time.
199impl<'index, R: io::Read> io::Read for Stream<'index, R> {
200    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
201        self.forward(buf)
202    }
203}
204
/// Build an [`io::Error`] of kind [`io::ErrorKind::Other`] whose message is `msg`.
#[inline]
fn io_error<S>(msg: S) -> io::Error
where
    S: ToString,
{
    let message: String = msg.to_string();
    io::Error::new(io::ErrorKind::Other, message)
}
209
#[cfg(test)]
mod test {
    use super::*;
    use std::io::{BufReader, Read};

    // Line starts are 0, 1, 15 and 29, so offset 20 is column 5 of line 2 ("ple test that").
    static SRC: &str = "\nThis is s sim\nple test that\n I have to verify stream reader!";

    #[test]
    fn test_stream_str_buf() {
        let mut index = Index::new();
        let stream = Stream::new(SRC.as_bytes(), &mut index);
        let mut reader = BufReader::new(stream);
        let mut buf = String::new();
        reader.read_to_string(&mut buf).unwrap();
        // The stream must pass bytes through unchanged.
        assert_eq!(buf, SRC);

        let ans = reader.get_ref().query().locate(Offset(20));
        assert_eq!(ans.expect("offset 20 lies inside SRC"), (2, 5).into());
    }

    #[test]
    fn test_stream_str_drain() {
        let mut index = Index::new();
        let mut stream = Stream::new(SRC.as_bytes(), &mut index);
        let mut buf = vec![b'\0'; 10];
        // Surface drain failures instead of silently ignoring the Result.
        stream.drain(&mut buf).unwrap();

        let ans = stream.query().locate(Offset(20));
        assert_eq!(ans.expect("offset 20 lies inside SRC"), (2, 5).into());
    }

    #[test]
    fn test_stream_str_incremental() {
        let mut index = Index::new();
        let mut stream = Stream::new(SRC.as_bytes(), &mut index);
        let mut buf = vec![b'\0'; 10];

        let ans = stream.locate(Offset(20), &mut buf);
        assert_eq!(ans.expect("offset 20 lies inside SRC"), (2, 5).into());
    }

    #[test]
    fn test_stream_str_encode() {
        let mut index = Index::new();
        let mut stream = Stream::new(SRC.as_bytes(), &mut index);
        let mut buf = vec![b'\0'; 10];

        // Inverse of the locate tests above.
        let offset = stream.encode((2, 5).into(), &mut buf).unwrap();
        assert_eq!(offset, Offset(20));
    }

    #[test]
    fn test_stream_str_locate_past_eof() {
        let mut index = Index::new();
        let mut stream = Stream::new(SRC.as_bytes(), &mut index);
        let mut buf = vec![b'\0'; 10];

        assert!(stream.locate(Offset(1000), &mut buf).is_err());
    }
}