reading_liner/stream.rs
1//! The central Reader of this crate.
2//!
//! The reader [Stream] wraps any type implementing [std::io::Read] together with an extra [Index].
//! [Stream] can be used and passed around like any ordinary reader. Once the [Stream] has been consumed, the [Index] remains available for use.
5//!
//! If you want to read and query locations simultaneously, the [Index] can be reborrowed from the [Stream].
7
8use crate::{
9 index::{self, Index},
10 location::{Offset, line_column},
11};
12use std::io;
13
14/// A stream which can be used to convert between offsets and line-column locations.
15///
16/// [Index] is a mutable reference to ensure that we can still refer to the index after consuming the whole [Stream].
17#[derive(Debug)]
18pub struct Stream<'index, Reader> {
19 reader: Reader,
20
21 base: usize, // For future use
22 index: &'index mut Index,
23 next_offset: Offset,
24 current_line: usize,
25}
26
27impl<'index, R> Stream<'index, R> {
28 pub fn new(reader: R, index: &'index mut Index) -> Self {
29 Self {
30 reader,
31 base: 0,
32 index,
33 next_offset: 0.into(),
34 current_line: 0,
35 }
36 }
37
38 pub fn get_ref(&self) -> &R {
39 &self.reader
40 }
41
42 #[inline]
43 pub fn base(&self) -> usize {
44 self.base
45 }
46
47 /// Immutable query to further query offsets and line-column locations
48 #[inline]
49 pub fn query(&self) -> index::Query<'_> {
50 self.index.query()
51 }
52
53 #[inline]
54 pub fn get_index(&self) -> &Index {
55 &self.index
56 }
57}
58
59impl<'index, R: io::Read> Stream<'index, R> {
60 /// Read length
61 #[inline]
62 pub fn read_len(&self) -> usize {
63 self.next_offset.raw()
64 }
65
66 /// Try to get more bytes and update states
67 fn forward(&mut self, buf: &mut [u8]) -> io::Result<usize> {
68 let n = self.reader.read(buf)?;
69
70 for (offset, b) in buf.iter().take(n).enumerate() {
71 if *b == b'\n' {
72 self.current_line += 1;
73 self.index.add_next_line(self.next_offset + offset + 1); // next line begin
74 continue;
75 }
76 }
77
78 // reached EoF, try to add fake ending
79 if !buf.is_empty() && n == 0 {
80 // TODO
81 match self.index.end() {
82 Some(end) if end != self.next_offset => {
83 self.index.add_next_line(self.next_offset);
84 }
85 None => self.index.add_next_line(self.next_offset),
86 _ => {}
87 }
88 }
89
90 self.next_offset += n;
91 Ok(n)
92 }
93
94 /// Locate the (line, column) position for a given byte `offset`.
95 ///
96 /// NOTE: this method may cause extra reading when the offset input cannot find a location.
97 ///
98 /// This method first resolves the line index via [`locate_line`], then
99 /// computes the column by subtracting the starting offset of that line.
100 ///
101 /// # Parameters
102 /// - `offset`: The target byte offset.
103 /// - `buf`: A temporary buffer used for incremental reading.
104 ///
105 /// # Returns
106 /// - `Ok(ZeroBased(line, column))` if the offset is within bounds.
107 /// - `Err` if the offset exceeds EOF (propagated from [`locate_line`]).
108 ///
109 /// # Invariants
110 /// - The internal index always contains a valid starting offset for every line.
111 /// - Therefore, `line_offset(line)` must succeed for any valid `line`.
112 ///
113 /// # Notes
114 /// - Both line and column are zero-based.
115 /// - Column is computed in **bytes**, not characters (UTF-8 aware handling is not performed here).
116 pub fn locate(&mut self, offset: Offset, buf: &mut [u8]) -> io::Result<line_column::ZeroBased> {
117 let line = self.locate_line(offset, buf)?;
118 let line_offset = self.query().line_offset(line).unwrap();
119 let col = offset - line_offset;
120 Ok((line, col.raw()).into())
121 }
122
123 /// Locate the line index for a given byte `offset`.
124 ///
125 /// This method performs an incremental lookup:
126 /// it first queries the existing line index, and if the offset
127 /// is not covered, it reads more data and extends the index.
128 /// This method may cause extra reading when the offset input cannot find a location.
129 ///
130 /// # Invariants
131 /// - The internal index is non-empty and ends with a sentinel EOF offset.
132 ///
133 /// # Errors
134 /// Returns an error if `offset` exceeds EOF.
135 pub fn locate_line(&mut self, offset: Offset, buf: &mut [u8]) -> io::Result<usize> {
136 let mut begin = 0;
137 loop {
138 // Invariant: index is non-empty and ends with EOF.
139 // Therefore, begin <= query.count() always holds, and range_from(begin..)
140 // is guaranteed to be a valid slice (possibly containing only EOF).
141 if let Some(i) = self.query().range_from(begin..).locate_line(offset) {
142 break Ok(i); // look here the returned `i` is already `begin` based, there's no need to add an extra begin
143 }
144 begin = self.index.count();
145
146 if self.forward(buf)? == 0 {
147 break Err(io_error("Invalid offset, exceed EOF"));
148 }
149 }
150 }
151
152 /// Encode a (line, column) location into a byte `Offset`.
153 ///
154 /// This method may incrementally extend the internal line index by reading
155 /// additional data if the requested line is not yet available.
156 ///
157 /// # Behavior
158 /// - If the line is already indexed, the offset is computed directly.
159 /// - Otherwise, more data is read and the index is extended until the line
160 /// becomes available or EOF is reached.
161 ///
162 /// # Returns
163 /// - `Ok(offset)` if the position can be resolved.
164 /// - `Err` if the line index exceeds EOF.
165 ///
166 /// # Notes
167 /// - Column is interpreted as a **byte offset** relative to the start of the line.
168 /// - This method does **not** validate whether the column lies within the bounds
169 /// of the line.
170 pub fn encode(
171 &mut self,
172 line_index: line_column::ZeroBased,
173 buf: &mut [u8],
174 ) -> io::Result<Offset> {
175 let (line, col) = line_index.raw();
176 loop {
177 if let Some(offset) = self.query().line_offset(line) {
178 break Ok(offset + col);
179 }
180
181 if self.forward(buf)? == 0 {
182 break Err(io_error(format!("Invalid line index: ({}, {})", line, col)));
183 }
184 }
185 }
186
187 /// Drain the reader, consume the reader
188 pub fn drain(&mut self, buf: &mut [u8]) -> io::Result<()> {
189 loop {
190 let n = self.forward(buf)?;
191 if n == 0 {
192 return Ok(());
193 }
194 }
195 }
196}
197
198/// You can use [Stream] as a normal [io::Read] and recording index at the same time.
199impl<'index, R: io::Read> io::Read for Stream<'index, R> {
200 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
201 self.forward(buf)
202 }
203}
204
/// Build an [`io::Error`] of kind [`io::ErrorKind::Other`] whose message is `msg` rendered
/// through [`ToString`].
#[inline]
fn io_error<S: ToString>(msg: S) -> io::Error {
    let text = msg.to_string();
    io::Error::new(io::ErrorKind::Other, text)
}
209
#[cfg(test)]
mod test {
    use super::*;
    use std::io::{BufReader, Read};

    /// Fixture with newlines at byte offsets 0, 14 and 28, so byte offset 20 lies on
    /// zero-based line 2 at column 5.
    static SRC: &str = "\nThis is s sim\nple test that\n I have to verify stream reader!";

    /// Reading through a `BufReader` must pass the bytes through unchanged and leave a
    /// complete index behind.
    #[test]
    fn test_stream_str_buf() {
        let mut index = Index::new();
        let stream = Stream::new(SRC.as_bytes(), &mut index);
        let mut reader = BufReader::new(stream);
        let mut buf = String::new();
        reader.read_to_string(&mut buf).unwrap();
        assert_eq!(buf, SRC);

        let ans = reader.get_ref().query().locate(Offset(20));
        assert_eq!(
            ans.expect("offset 20 must be indexed after reading everything"),
            (2, 5).into()
        );
    }

    /// Draining with a small scratch buffer must index the whole input.
    #[test]
    fn test_stream_str_drain() {
        let mut index = Index::new();
        let mut stream = Stream::new(SRC.as_bytes(), &mut index);
        let mut buf = vec![b'\0'; 10];
        // Fail right here if draining fails instead of at a later, unrelated assertion.
        stream.drain(&mut buf).unwrap();

        let ans = stream.query().locate(Offset(20));
        assert_eq!(
            ans.expect("offset 20 must be indexed after draining"),
            (2, 5).into()
        );
    }

    /// Locating an offset that is not indexed yet must read just enough to resolve it.
    #[test]
    fn test_stream_str_incremental() {
        let mut index = Index::new();
        let mut stream = Stream::new(SRC.as_bytes(), &mut index);
        let mut buf = vec![b'\0'; 10];

        let ans = stream.locate(Offset(20), &mut buf);
        assert_eq!(
            ans.expect("offset 20 is inside the input and must be locatable"),
            (2, 5).into()
        );
    }
}
253}