1use futures_util::{stream, Stream};
30use std::cmp::min;
31use thiserror::Error;
32use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, BufReader, SeekFrom};
33
/// Chunk size, in bytes, used by `RevLines::new` when reading backwards.
const DEFAULT_SIZE: usize = 4096;

/// Line feed, the byte lines are split on.
const LF_BYTE: u8 = b'\n';
/// Carriage return; dropped when it directly precedes a line feed.
const CR_BYTE: u8 = b'\r';
38
39#[derive(Error, Debug)]
41pub enum Error {
42 #[error(transparent)]
43 Io(#[from] tokio::io::Error),
44
45 #[error(transparent)]
46 NotUtf8(#[from] std::string::FromUtf8Error),
47}
48
49pub struct RevLines<R> {
51 reader: BufReader<R>,
52 reader_pos: u64,
53 buf_size: u64,
54}
55
56impl<R: AsyncSeek + AsyncRead + Unpin> RevLines<R> {
57 pub async fn new(
60 reader: BufReader<R>,
61 ) -> Result<impl Stream<Item = Result<String, Error>>, Error> {
62 RevLines::with_capacity(DEFAULT_SIZE, reader).await
63 }
64
65 pub async fn with_capacity(
68 cap: usize,
69 mut reader: BufReader<R>,
70 ) -> Result<impl Stream<Item = Result<String, Error>>, Error> {
71 let reader_size = reader.seek(SeekFrom::End(0)).await?;
73
74 let mut rev_lines = RevLines {
75 reader: reader,
76 reader_pos: reader_size,
77 buf_size: cap as u64,
78 };
79
80 let end_size = min(reader_size, 2);
85 let end_buf = rev_lines.read_to_buffer(end_size).await?;
86
87 if end_size == 1 {
88 if end_buf[0] != LF_BYTE {
89 rev_lines.move_reader_position(1).await?;
90 }
91 } else if end_size == 2 {
92 if end_buf[0] != CR_BYTE {
93 rev_lines.move_reader_position(1).await?;
94 }
95
96 if end_buf[1] != LF_BYTE {
97 rev_lines.move_reader_position(1).await?;
98 }
99 }
100
101 let stream = stream::unfold(rev_lines, |mut rev_lines| async {
102 match rev_lines.next_line().await {
103 Some(line) => Some((line, rev_lines)),
104 None => None,
105 }
106 });
107
108 Ok(stream)
109 }
110
111 async fn read_to_buffer(&mut self, size: u64) -> Result<Vec<u8>, tokio::io::Error> {
112 let mut buf = vec![0; size as usize];
113 let offset = -(size as i64);
114
115 self.reader.seek(SeekFrom::Current(offset)).await?;
116 self.reader.read_exact(&mut buf[0..(size as usize)]).await?;
117 self.reader.seek(SeekFrom::Current(offset)).await?;
118
119 self.reader_pos -= size;
120
121 Ok(buf)
122 }
123
124 async fn move_reader_position(&mut self, offset: u64) -> Result<(), tokio::io::Error> {
125 self.reader.seek(SeekFrom::Current(offset as i64)).await?;
126 self.reader_pos += offset;
127
128 Ok(())
129 }
130
131 async fn next_line(&mut self) -> Option<Result<String, Error>> {
132 let mut result: Vec<u8> = Vec::new();
133
134 'outer: loop {
135 if self.reader_pos < 1 {
136 if result.len() > 0 {
137 break;
138 }
139
140 return None;
141 }
142
143 let size = min(self.buf_size, self.reader_pos);
146
147 match self.read_to_buffer(size).await {
148 Ok(buf) => {
149 for (idx, ch) in (&buf).iter().enumerate().rev() {
150 if *ch == LF_BYTE {
152 let mut offset = idx as u64;
153
154 if idx > 1 && buf[idx - 1] == CR_BYTE {
156 offset -= 1;
157 }
158
159 match self.reader.seek(SeekFrom::Current(offset as i64)).await {
160 Ok(_) => {
161 self.reader_pos += offset;
162
163 break 'outer;
164 }
165
166 Err(e) => return Some(Err(Error::Io(e))),
167 }
168 } else {
169 result.push(ch.clone());
170 }
171 }
172 }
173
174 Err(e) => return Some(Err(Error::Io(e))),
175 }
176 }
177
178 result.reverse();
180
181 match String::from_utf8(result) {
183 Ok(s) => Some(Ok(s)),
184 Err(e) => Some(Err(Error::NotUtf8(e))),
185 }
186 }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    use futures_util::{pin_mut, StreamExt};
    use std::io::Cursor;
    use tokio::fs::File;

    #[tokio::test]
    async fn it_handles_empty_files() {
        let file = File::open("tests/empty_file").await.unwrap();
        let rev_lines = RevLines::new(BufReader::new(file)).await.unwrap();
        let results = vec![];

        assert_stream_eq(rev_lines, results).await;
    }

    #[tokio::test]
    async fn it_handles_file_with_one_line() {
        let file = File::open("tests/one_line_file").await.unwrap();
        let rev_lines = RevLines::new(BufReader::new(file)).await.unwrap();
        let results = vec!["ABCD"];

        assert_stream_eq(rev_lines, results).await;
    }

    #[tokio::test]
    async fn it_handles_file_with_multi_lines() {
        let file = File::open("tests/multi_line_file").await.unwrap();
        let rev_lines = RevLines::new(BufReader::new(file)).await.unwrap();
        let results = vec!["UVWXYZ", "LMNOPQRST", "GHIJK", "ABCDEF"];

        assert_stream_eq(rev_lines, results).await;
    }

    #[tokio::test]
    async fn it_handles_file_with_blank_lines() {
        let file = File::open("tests/blank_line_file").await.unwrap();
        let rev_lines = RevLines::new(BufReader::new(file)).await.unwrap();
        let results = vec!["", "", "XYZ", "", "ABCD"];

        assert_stream_eq(rev_lines, results).await;
    }

    #[tokio::test]
    async fn it_handles_file_with_multi_lines_and_with_capacity() {
        let file = File::open("tests/multi_line_file").await.unwrap();
        let rev_lines = RevLines::with_capacity(5, BufReader::new(file))
            .await
            .unwrap();
        let results = vec!["UVWXYZ", "LMNOPQRST", "GHIJK", "ABCDEF"];

        assert_stream_eq(rev_lines, results).await;
    }

    #[tokio::test]
    async fn it_handles_crlf_line_endings() {
        // In-memory input: tokio implements AsyncRead + AsyncSeek for Cursor.
        let reader = BufReader::new(Cursor::new(b"A\r\nB\r\n".to_vec()));
        let rev_lines = RevLines::new(reader).await.unwrap();
        let results = vec!["B", "A"];

        assert_stream_eq(rev_lines, results).await;
    }

    /// Asserts that `rev_lines` yields exactly `results`, in order, as `Ok`
    /// items, and then ends. Failures report the offending line index and the
    /// item actually produced.
    async fn assert_stream_eq(
        rev_lines: impl Stream<Item = Result<String, Error>>,
        results: Vec<&str>,
    ) {
        pin_mut!(rev_lines);

        for (idx, expected) in results.into_iter().enumerate() {
            match rev_lines.next().await {
                Some(Ok(line)) => assert_eq!(line, expected, "line {} differs", idx),
                Some(Err(e)) => panic!(
                    "line {}: expected {:?}, got error {:?}",
                    idx, expected, e
                ),
                None => panic!("line {}: expected {:?}, stream ended", idx, expected),
            }
        }

        if let Some(extra) = rev_lines.next().await {
            panic!("expected end of stream, got {:?}", extra);
        }
    }
}