combine_elastic_buffered_stream/
lib.rs1use combine::stream::easy::Errors;
2use combine::stream::{Positioned, Resetable, StreamErrorFor, StreamOnce};
3use core::num::NonZeroUsize;
4use std::cell::{Cell, RefCell};
5use std::collections::VecDeque;
6use std::io::Read;
7use std::rc::{Rc, Weak};
8
/// Number of low bits of a buffer position that select a byte inside its chunk.
const ITEM_INDEX_SIZE: usize = 13;
/// Mask extracting the in-chunk byte index from a buffer position.
const ITEM_INDEX_MASK: usize = CHUNK_SIZE - 1;
/// Size in bytes of every buffered chunk (`1 << ITEM_INDEX_SIZE`, i.e. 8 KiB).
pub const CHUNK_SIZE: usize = 1 << ITEM_INDEX_SIZE;

/// Shared, mutable buffer position stored behind every checkpoint.
pub type InternalCheckPoint = Cell<usize>;
14
15#[derive(Clone)]
16pub struct CheckPoint(Rc<InternalCheckPoint>);
17
18impl CheckPoint {
19 fn new(pos: usize) -> CheckPoint {
20 CheckPoint(Rc::new(Cell::new(pos)))
21 }
22
23 fn inner(&self) -> usize {
24 self.0.get()
25 }
26}
27
28pub struct CheckPointHandler(Weak<InternalCheckPoint>);
29
30impl CheckPointHandler {
31 fn from_checkpoint(cp: &CheckPoint) -> CheckPointHandler {
32 let weak_ref = Rc::downgrade(&cp.0);
33
34 CheckPointHandler(weak_ref)
35 }
36}
37
38struct CheckPointSet(RefCell<Vec<CheckPointHandler>>);
39
40impl CheckPointSet {
41 fn new() -> CheckPointSet {
42 CheckPointSet(RefCell::new(Vec::new()))
43 }
44
45 fn insert(&self, pos: usize) -> CheckPoint {
46 let cp = CheckPoint::new(pos);
47 self.0
48 .borrow_mut()
49 .push(CheckPointHandler::from_checkpoint(&cp));
50
51 cp
52 }
53
54 fn min(&self) -> Option<usize> {
55 let mut min: Option<usize> = None;
56
57 self.0.borrow_mut().retain(|cp| {
58 if let Some(intern) = cp.0.upgrade() {
59 let pos = intern.get();
60
61 let min_val = min.map_or(pos, |min_val| pos.min(min_val));
62 min = Some(min_val);
63
64 true
65 } else {
66 false
67 }
68 });
69
70 min
71 }
72
73 fn sub_offset(&self, value: usize) {
74 for cp in self.0.borrow().iter() {
75 let handled = cp.0.upgrade().unwrap(); handled.set(handled.get() - value);
77 }
78 }
79}
80
81pub struct ElasticBufferedReadStream<R: Read> {
82 raw_read: R,
83 buffer: VecDeque<[u8; CHUNK_SIZE]>,
84 eof: Option<NonZeroUsize>,
85 checkpoints: CheckPointSet,
86 cursor_pos: usize,
87 offset: u64, }
89
90impl<R: Read> ElasticBufferedReadStream<R> {
91 pub fn new(read: R) -> Self {
92 Self {
93 raw_read: read,
94 buffer: VecDeque::new(),
95 eof: None,
96 checkpoints: CheckPointSet::new(),
97 cursor_pos: 0,
98 offset: 0,
99 }
100 }
101
102 fn chunk_index(&self) -> usize {
103 self.cursor_pos >> ITEM_INDEX_SIZE
104 }
105
106 fn item_index(&self) -> usize {
107 self.cursor_pos & ITEM_INDEX_MASK
108 }
109
110 fn free_useless_chunks(&mut self) {
111 let checkpoint_pos_min = self.checkpoints.min();
112 let global_pos_min =
113 checkpoint_pos_min.map_or(self.cursor_pos, |cp_min| cp_min.min(self.cursor_pos));
114 let drain_quantity = global_pos_min / CHUNK_SIZE;
115
116 self.buffer.drain(..drain_quantity);
117
118 let offset_delta = drain_quantity * CHUNK_SIZE;
119 self.cursor_pos -= offset_delta;
120 self.offset += offset_delta as u64;
121 self.checkpoints.sub_offset(offset_delta);
122 }
123
124 pub fn buffer_len(&self) -> usize {
125 self.buffer.len()
126 }
127}
128
/// Fills `chunk` from `reader`, retrying reads interrupted by a signal.
///
/// Returns `Ok(None)` when the whole chunk was filled. It returns
/// `Ok(Some(n))` when the reader reported end of input (a zero-length read)
/// first, with `n` the number of bytes left unfilled at the end of `chunk`.
/// Any error other than `ErrorKind::Interrupted` is returned as-is.
fn read_exact_or_eof<R: Read>(
    reader: &mut R,
    chunk: &mut [u8],
) -> std::io::Result<Option<NonZeroUsize>> {
    let mut filled = 0;
    while filled < chunk.len() {
        match reader.read(&mut chunk[filled..]) {
            Ok(0) => break,
            Ok(read) => filled += read,
            Err(err) => {
                if err.kind() != std::io::ErrorKind::Interrupted {
                    return Err(err);
                }
            }
        }
    }

    Ok(NonZeroUsize::new(chunk.len() - filled))
}
147
148impl<R: Read> StreamOnce for ElasticBufferedReadStream<R> {
149 type Item = u8;
150 type Range = u8;
151 type Position = u64;
152 type Error = Errors<u8, u8, u64>;
153
154 fn uncons(&mut self) -> Result<u8, StreamErrorFor<Self>> {
155 assert!(self.chunk_index() <= self.buffer.len());
156
157 if self.chunk_index() == self.buffer.len() {
158 assert!(self.eof.is_none());
159 self.free_useless_chunks();
160 self.buffer.push_back([0; CHUNK_SIZE]);
161 self.eof = read_exact_or_eof(&mut self.raw_read, self.buffer.back_mut().unwrap())?;
162 }
163
164 if self.chunk_index() == self.buffer.len() - 1 {
165 if let Some(eof_pos_from_right) = self.eof {
166 if self.item_index() >= CHUNK_SIZE - eof_pos_from_right.get() {
167 return Err(StreamErrorFor::<Self>::end_of_input());
168 }
169 }
170 }
171
172 let chunk = self.buffer.get(self.chunk_index()).unwrap(); let item = chunk[self.item_index()]; self.cursor_pos += 1;
175
176 Ok(item)
177 }
178}
179
180impl<R: Read> Positioned for ElasticBufferedReadStream<R> {
181 fn position(&self) -> Self::Position {
182 self.offset + self.cursor_pos as u64
183 }
184}
185
186impl<R: Read> Resetable for ElasticBufferedReadStream<R> {
187 type Checkpoint = CheckPoint;
188
189 fn checkpoint(&self) -> Self::Checkpoint {
190 self.checkpoints.insert(self.cursor_pos)
191 }
192
193 fn reset(&mut self, checkpoint: Self::Checkpoint) {
194 self.cursor_pos = checkpoint.inner();
195 }
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use combine::stream::StreamErrorFor;

    /// Sentence repeated to build multi-chunk inputs. Its length does not
    /// divide `CHUNK_SIZE`, so sentences straddle chunk boundaries.
    const SENTENCE: &str = "This is a sentence, what a beautiful sentence !";

    /// Roughly three chunks' worth of input made of whole copies of `SENTENCE`.
    fn three_chunks_of_sentences() -> String {
        let count = CHUNK_SIZE * 3 / SENTENCE.len();
        SENTENCE.repeat(count)
    }

    /// A whole number of sentences that is longer than one chunk. Skipping it
    /// lands on the same offset inside a later sentence, one chunk further on.
    fn next_chunk_sentence_dist() -> usize {
        CHUNK_SIZE + SENTENCE.len() - CHUNK_SIZE % SENTENCE.len()
    }

    /// Consumes `count` bytes, asserting each one is read successfully.
    fn skip_ok(stream: &mut ElasticBufferedReadStream<&[u8]>, count: usize) {
        for _ in 0..count {
            assert!(stream.uncons().is_ok());
        }
    }

    /// Consumes `expected.len()` bytes, asserting each equals `expected`.
    fn expect_bytes(stream: &mut ElasticBufferedReadStream<&[u8]>, expected: &[u8]) {
        for &byte in expected {
            assert_eq!(stream.uncons(), Ok(byte));
        }
    }

    /// Asserts the next `uncons` reports end of input.
    fn assert_exhausted(stream: &mut ElasticBufferedReadStream<&[u8]>) {
        assert_eq!(
            stream.uncons(),
            Err(StreamErrorFor::<ElasticBufferedReadStream<&[u8]>>::end_of_input())
        );
    }

    #[test]
    fn it_uncons_on_one_chunk() {
        let mut stream = ElasticBufferedReadStream::new(&b"This is the text !"[..]);

        expect_bytes(&mut stream, b"This ");
        skip_ok(&mut stream, 12);
        expect_bytes(&mut stream, b"!");
        assert_exhausted(&mut stream);
    }

    #[test]
    fn it_uncons_on_multiple_chunks() {
        let input = three_chunks_of_sentences();
        let dist = next_chunk_sentence_dist();
        let mut stream = ElasticBufferedReadStream::new(input.as_bytes());

        expect_bytes(&mut stream, b"This ");
        skip_ok(&mut stream, dist);
        expect_bytes(&mut stream, b"is ");
        skip_ok(&mut stream, dist);
        expect_bytes(&mut stream, b"a ");

        // 10 bytes were checked individually and two skips were made; stop
        // right before the final '!'.
        let to_last_byte = input.len() - 10 - 2 * dist - 1;
        skip_ok(&mut stream, to_last_byte);

        expect_bytes(&mut stream, b"!");
        assert_exhausted(&mut stream);
    }

    #[test]
    fn it_resets_on_checkpoint() {
        let input = three_chunks_of_sentences();
        let dist = next_chunk_sentence_dist();
        let mut stream = ElasticBufferedReadStream::new(input.as_bytes());

        skip_ok(&mut stream, dist);
        expect_bytes(&mut stream, b"This ");

        let checkpoint = stream.checkpoint();
        expect_bytes(&mut stream, b"is ");
        stream.reset(checkpoint);

        let checkpoint = stream.checkpoint();
        expect_bytes(&mut stream, b"is ");
        skip_ok(&mut stream, dist);
        expect_bytes(&mut stream, b"a");

        stream.reset(checkpoint);
        expect_bytes(&mut stream, b"is ");
    }

    #[test]
    fn it_free_useless_memory_when_reading_new_chunk() {
        let input = three_chunks_of_sentences();
        let mut stream = ElasticBufferedReadStream::new(input.as_bytes());

        let checkpoint = stream.checkpoint();
        assert_eq!(stream.buffer_len(), 0);
        expect_bytes(&mut stream, b"T");
        assert_eq!(stream.buffer_len(), 1);

        skip_ok(&mut stream, CHUNK_SIZE);
        // The checkpoint still pins the first chunk.
        assert_eq!(stream.buffer_len(), 2);

        stream.reset(checkpoint);
        expect_bytes(&mut stream, b"T");
        skip_ok(&mut stream, 2 * CHUNK_SIZE);
        // With the checkpoint consumed by `reset`, earlier chunks are released.
        assert_eq!(stream.buffer_len(), 1);
    }
}