tokio_rev_lines/
lib.rs

//! This library provides an async stream that reads the lines of a file, or of any seekable `BufReader`, in reverse order (last line first), using buffered reads.
2//!
3//! It's an async tokio version of [rev_lines](https://github.com/mjc-gh/rev_lines).
4//!
5//! ### Example
6//!
7//! ```
8//! use futures_util::{pin_mut, StreamExt};
9//! use tokio::{fs::File, io::BufReader};
10//! use tokio_rev_lines::RevLines;
11//!
12//! #[tokio::main]
13//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
14//!     let file = File::open("tests/multi_line_file").await?;
15//!     let rev_lines = RevLines::new(BufReader::new(file)).await?;
16//!     pin_mut!(rev_lines);
17//!
18//!     while let Some(line) = rev_lines.next().await {
19//!         println!("{}", line?);
20//!     }
21//!
22//!     Ok(())
23//! }
24//! ```
25//!
//! The backward-reading approach is borrowed from [uutils/coreutils
//! tail](https://github.com/uutils/coreutils/blob/f2166fed0ad055d363aedff6223701001af090d3/src/tail/tail.rs#L399-L402).
28
29use futures_util::{stream, Stream};
30use std::cmp::min;
31use thiserror::Error;
32use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, BufReader, SeekFrom};
33
/// Default number of bytes read per backward step.
const DEFAULT_SIZE: usize = 4096;

/// Line feed (`\n`), the line separator.
const LF_BYTE: u8 = b'\n';
/// Carriage return (`\r`), the first byte of a CRLF line ending.
const CR_BYTE: u8 = b'\r';
38
39/// Custom error types
40#[derive(Error, Debug)]
41pub enum Error {
42    #[error(transparent)]
43    Io(#[from] tokio::io::Error),
44
45    #[error(transparent)]
46    NotUtf8(#[from] std::string::FromUtf8Error),
47}
48
49/// `RevLines` struct
50pub struct RevLines<R> {
51    reader: BufReader<R>,
52    reader_pos: u64,
53    buf_size: u64,
54}
55
56impl<R: AsyncSeek + AsyncRead + Unpin> RevLines<R> {
57    /// Create an async stream of strings from a `BufReader<R>`. Internal
58    /// buffering for iteration will default to 4096 bytes at a time.
59    pub async fn new(
60        reader: BufReader<R>,
61    ) -> Result<impl Stream<Item = Result<String, Error>>, Error> {
62        RevLines::with_capacity(DEFAULT_SIZE, reader).await
63    }
64
65    /// Create an async stream of strings from a `BufReader<R>`. Internal
66    /// buffering for iteration will use `cap` bytes at a time.
67    pub async fn with_capacity(
68        cap: usize,
69        mut reader: BufReader<R>,
70    ) -> Result<impl Stream<Item = Result<String, Error>>, Error> {
71        // Seek to end of reader now
72        let reader_size = reader.seek(SeekFrom::End(0)).await?;
73
74        let mut rev_lines = RevLines {
75            reader: reader,
76            reader_pos: reader_size,
77            buf_size: cap as u64,
78        };
79
80        // Handle any trailing new line characters for the reader
81        // so the first next call does not return Some("")
82
83        // Read at most 2 bytes
84        let end_size = min(reader_size, 2);
85        let end_buf = rev_lines.read_to_buffer(end_size).await?;
86
87        if end_size == 1 {
88            if end_buf[0] != LF_BYTE {
89                rev_lines.move_reader_position(1).await?;
90            }
91        } else if end_size == 2 {
92            if end_buf[0] != CR_BYTE {
93                rev_lines.move_reader_position(1).await?;
94            }
95
96            if end_buf[1] != LF_BYTE {
97                rev_lines.move_reader_position(1).await?;
98            }
99        }
100
101        let stream = stream::unfold(rev_lines, |mut rev_lines| async {
102            match rev_lines.next_line().await {
103                Some(line) => Some((line, rev_lines)),
104                None => None,
105            }
106        });
107
108        Ok(stream)
109    }
110
111    async fn read_to_buffer(&mut self, size: u64) -> Result<Vec<u8>, tokio::io::Error> {
112        let mut buf = vec![0; size as usize];
113        let offset = -(size as i64);
114
115        self.reader.seek(SeekFrom::Current(offset)).await?;
116        self.reader.read_exact(&mut buf[0..(size as usize)]).await?;
117        self.reader.seek(SeekFrom::Current(offset)).await?;
118
119        self.reader_pos -= size;
120
121        Ok(buf)
122    }
123
124    async fn move_reader_position(&mut self, offset: u64) -> Result<(), tokio::io::Error> {
125        self.reader.seek(SeekFrom::Current(offset as i64)).await?;
126        self.reader_pos += offset;
127
128        Ok(())
129    }
130
131    async fn next_line(&mut self) -> Option<Result<String, Error>> {
132        let mut result: Vec<u8> = Vec::new();
133
134        'outer: loop {
135            if self.reader_pos < 1 {
136                if result.len() > 0 {
137                    break;
138                }
139
140                return None;
141            }
142
143            // Read the of minimum between the desired
144            // buffer size or remaining length of the reader
145            let size = min(self.buf_size, self.reader_pos);
146
147            match self.read_to_buffer(size).await {
148                Ok(buf) => {
149                    for (idx, ch) in (&buf).iter().enumerate().rev() {
150                        // Found a new line character to break on
151                        if *ch == LF_BYTE {
152                            let mut offset = idx as u64;
153
154                            // Add an extra byte cause of CR character
155                            if idx > 1 && buf[idx - 1] == CR_BYTE {
156                                offset -= 1;
157                            }
158
159                            match self.reader.seek(SeekFrom::Current(offset as i64)).await {
160                                Ok(_) => {
161                                    self.reader_pos += offset;
162
163                                    break 'outer;
164                                }
165
166                                Err(e) => return Some(Err(Error::Io(e))),
167                            }
168                        } else {
169                            result.push(ch.clone());
170                        }
171                    }
172                }
173
174                Err(e) => return Some(Err(Error::Io(e))),
175            }
176        }
177
178        // Reverse the results since they were written backwards
179        result.reverse();
180
181        // Convert to a String
182        match String::from_utf8(result) {
183            Ok(s) => Some(Ok(s)),
184            Err(e) => Some(Err(Error::NotUtf8(e))),
185        }
186    }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    use futures_util::StreamExt;
    use tokio::fs::File;

    #[tokio::test]
    async fn it_handles_empty_files() {
        let file = File::open("tests/empty_file").await.unwrap();
        let rev_lines = RevLines::new(BufReader::new(file)).await.unwrap();
        let results = vec![];

        assert_stream_eq(rev_lines, results).await;
    }

    #[tokio::test]
    async fn it_handles_file_with_one_line() {
        let file = File::open("tests/one_line_file").await.unwrap();
        let rev_lines = RevLines::new(BufReader::new(file)).await.unwrap();
        let results = vec!["ABCD"];

        assert_stream_eq(rev_lines, results).await;
    }

    #[tokio::test]
    async fn it_handles_file_with_multi_lines() {
        let file = File::open("tests/multi_line_file").await.unwrap();
        let rev_lines = RevLines::new(BufReader::new(file)).await.unwrap();
        let results = vec!["UVWXYZ", "LMNOPQRST", "GHIJK", "ABCDEF"];

        assert_stream_eq(rev_lines, results).await;
    }

    #[tokio::test]
    async fn it_handles_file_with_blank_lines() {
        let file = File::open("tests/blank_line_file").await.unwrap();
        let rev_lines = RevLines::new(BufReader::new(file)).await.unwrap();
        let results = vec!["", "", "XYZ", "", "ABCD"];

        assert_stream_eq(rev_lines, results).await;
    }

    #[tokio::test]
    async fn it_handles_file_with_multi_lines_and_with_capacity() {
        let file = File::open("tests/multi_line_file").await.unwrap();
        let rev_lines = RevLines::with_capacity(5, BufReader::new(file))
            .await
            .unwrap();
        let results = vec!["UVWXYZ", "LMNOPQRST", "GHIJK", "ABCDEF"];

        assert_stream_eq(rev_lines, results).await;
    }

    /// Asserts that `rev_lines` yields exactly `results`, in order, without errors.
    ///
    /// At most `results.len() + 1` items are pulled, so a stream that yields too
    /// many lines fails the comparison instead of being drained indefinitely.
    async fn assert_stream_eq(
        rev_lines: impl Stream<Item = Result<String, Error>>,
        results: Vec<&str>,
    ) {
        let actual: Vec<String> = rev_lines
            .take(results.len() + 1)
            .map(|line| line.expect("stream yielded an error"))
            .collect()
            .await;

        // `assert_eq!` prints both sequences on failure, unlike a bare `assert!(bool)`.
        assert_eq!(actual, results);
    }
}