combine_elastic_buffered_stream/
lib.rs

1use combine::stream::easy::Errors;
2use combine::stream::{Positioned, Resetable, StreamErrorFor, StreamOnce};
3use core::num::NonZeroUsize;
4use std::cell::{Cell, RefCell};
5use std::collections::VecDeque;
6use std::io::Read;
7use std::rc::{Rc, Weak};
8
/// Number of low bits of a cursor position that address a byte inside a chunk.
const ITEM_INDEX_SIZE: usize = 13;
/// Size in bytes of one buffered chunk (`2^ITEM_INDEX_SIZE`).
pub const CHUNK_SIZE: usize = 1 << ITEM_INDEX_SIZE;
/// Bit mask that extracts the in-chunk byte index from a cursor position.
const ITEM_INDEX_MASK: usize = CHUNK_SIZE - 1;
12
/// Mutable cell holding the position recorded by a checkpoint.
pub type InternalCheckPoint = Cell<usize>;

/// A saved cursor position of a stream.
///
/// The position lives in a reference-counted cell, so every clone of a
/// checkpoint observes the same (possibly updated) value.
#[derive(Clone)]
pub struct CheckPoint(Rc<InternalCheckPoint>);

impl CheckPoint {
    /// Creates a checkpoint recording `pos`.
    fn new(pos: usize) -> CheckPoint {
        let cell: InternalCheckPoint = Cell::new(pos);
        CheckPoint(Rc::new(cell))
    }

    /// Returns the position currently recorded by this checkpoint.
    fn inner(&self) -> usize {
        let CheckPoint(cell) = self;
        cell.get()
    }
}
27
28pub struct CheckPointHandler(Weak<InternalCheckPoint>);
29
30impl CheckPointHandler {
31    fn from_checkpoint(cp: &CheckPoint) -> CheckPointHandler {
32        let weak_ref = Rc::downgrade(&cp.0);
33
34        CheckPointHandler(weak_ref)
35    }
36}
37
38struct CheckPointSet(RefCell<Vec<CheckPointHandler>>);
39
40impl CheckPointSet {
41    fn new() -> CheckPointSet {
42        CheckPointSet(RefCell::new(Vec::new()))
43    }
44
45    fn insert(&self, pos: usize) -> CheckPoint {
46        let cp = CheckPoint::new(pos);
47        self.0
48            .borrow_mut()
49            .push(CheckPointHandler::from_checkpoint(&cp));
50
51        cp
52    }
53
54    fn min(&self) -> Option<usize> {
55        let mut min: Option<usize> = None;
56
57        self.0.borrow_mut().retain(|cp| {
58            if let Some(intern) = cp.0.upgrade() {
59                let pos = intern.get();
60
61                let min_val = min.map_or(pos, |min_val| pos.min(min_val));
62                min = Some(min_val);
63
64                true
65            } else {
66                false
67            }
68        });
69
70        min
71    }
72
73    fn sub_offset(&self, value: usize) {
74        for cp in self.0.borrow().iter() {
75            let handled = cp.0.upgrade().unwrap(); // sub_offset has to be called right after min, so there is no unhandled value
76            handled.set(handled.get() - value);
77        }
78    }
79}
80
81pub struct ElasticBufferedReadStream<R: Read> {
82    raw_read: R,
83    buffer: VecDeque<[u8; CHUNK_SIZE]>,
84    eof: Option<NonZeroUsize>,
85    checkpoints: CheckPointSet,
86    cursor_pos: usize,
87    offset: u64, // The capacity of this parameter limits the size of the stream
88}
89
90impl<R: Read> ElasticBufferedReadStream<R> {
91    pub fn new(read: R) -> Self {
92        Self {
93            raw_read: read,
94            buffer: VecDeque::new(),
95            eof: None,
96            checkpoints: CheckPointSet::new(),
97            cursor_pos: 0,
98            offset: 0,
99        }
100    }
101
102    fn chunk_index(&self) -> usize {
103        self.cursor_pos >> ITEM_INDEX_SIZE
104    }
105
106    fn item_index(&self) -> usize {
107        self.cursor_pos & ITEM_INDEX_MASK
108    }
109
110    fn free_useless_chunks(&mut self) {
111        let checkpoint_pos_min = self.checkpoints.min();
112        let global_pos_min =
113            checkpoint_pos_min.map_or(self.cursor_pos, |cp_min| cp_min.min(self.cursor_pos));
114        let drain_quantity = global_pos_min / CHUNK_SIZE;
115
116        self.buffer.drain(..drain_quantity);
117
118        let offset_delta = drain_quantity * CHUNK_SIZE;
119        self.cursor_pos -= offset_delta;
120        self.offset += offset_delta as u64;
121        self.checkpoints.sub_offset(offset_delta);
122    }
123
124    pub fn buffer_len(&self) -> usize {
125        self.buffer.len()
126    }
127}
128
/// Fills `chunk` from `reader` until the chunk is full or the reader reports
/// the end of its input.
///
/// Reads failing with [`std::io::ErrorKind::Interrupted`] are retried; any
/// other error is returned unchanged.
///
/// Returns `None` when the chunk was filled completely, otherwise the number
/// of bytes left unfilled at the end of `chunk`.
fn read_exact_or_eof<R: Read>(
    reader: &mut R,
    chunk: &mut [u8],
) -> std::io::Result<Option<NonZeroUsize>> {
    // Part of the chunk that has not received data yet.
    let mut unfilled = chunk;

    loop {
        if unfilled.is_empty() {
            break;
        }

        match reader.read(unfilled) {
            // A zero-length read means the source is exhausted.
            Ok(0) => break,
            Ok(read_len) => {
                let filled_and_rest = unfilled;
                unfilled = &mut filled_and_rest[read_len..];
            }
            Err(err) => {
                if err.kind() != std::io::ErrorKind::Interrupted {
                    return Err(err);
                }
            }
        }
    }

    Ok(NonZeroUsize::new(unfilled.len()))
}
147
148impl<R: Read> StreamOnce for ElasticBufferedReadStream<R> {
149    type Item = u8;
150    type Range = u8;
151    type Position = u64;
152    type Error = Errors<u8, u8, u64>;
153
154    fn uncons(&mut self) -> Result<u8, StreamErrorFor<Self>> {
155        assert!(self.chunk_index() <= self.buffer.len());
156
157        if self.chunk_index() == self.buffer.len() {
158            assert!(self.eof.is_none());
159            self.free_useless_chunks();
160            self.buffer.push_back([0; CHUNK_SIZE]);
161            self.eof = read_exact_or_eof(&mut self.raw_read, self.buffer.back_mut().unwrap())?;
162        }
163
164        if self.chunk_index() == self.buffer.len() - 1 {
165            if let Some(eof_pos_from_right) = self.eof {
166                if self.item_index() >= CHUNK_SIZE - eof_pos_from_right.get() {
167                    return Err(StreamErrorFor::<Self>::end_of_input());
168                }
169            }
170        }
171
172        let chunk = self.buffer.get(self.chunk_index()).unwrap(); // We can unwrap because self.chunk_index() < self.buffer.len()
173        let item = chunk[self.item_index()]; //  item_index < CHUNK_SIZE
174        self.cursor_pos += 1;
175
176        Ok(item)
177    }
178}
179
180impl<R: Read> Positioned for ElasticBufferedReadStream<R> {
181    fn position(&self) -> Self::Position {
182        self.offset + self.cursor_pos as u64
183    }
184}
185
186impl<R: Read> Resetable for ElasticBufferedReadStream<R> {
187    type Checkpoint = CheckPoint;
188
189    fn checkpoint(&self) -> Self::Checkpoint {
190        self.checkpoints.insert(self.cursor_pos)
191    }
192
193    fn reset(&mut self, checkpoint: Self::Checkpoint) {
194        self.cursor_pos = checkpoint.inner();
195    }
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use combine::stream::StreamErrorFor;

    /// Sentence repeated to build inputs spanning several chunks.
    const SENTENCE: &str = "This is a sentence, what a beautiful sentence !";

    type SliceStream<'a> = ElasticBufferedReadStream<&'a [u8]>;

    /// Number of whole sentences that fit in three chunks.
    fn sentence_count() -> usize {
        CHUNK_SIZE * 3 / SENTENCE.len()
    }

    /// Input made of as many whole sentences as fit in three chunks.
    fn three_chunks_of_sentences() -> String {
        SENTENCE.repeat(sentence_count())
    }

    /// Distance, a multiple of the sentence length, that moves the cursor
    /// into the next chunk while keeping its offset inside the sentence.
    fn next_chunk_sentence_dist() -> usize {
        CHUNK_SIZE + SENTENCE.len() - CHUNK_SIZE % SENTENCE.len()
    }

    /// Asserts that the next items of `stream` are exactly `expected`.
    fn expect_bytes(stream: &mut SliceStream<'_>, expected: &[u8]) {
        for &byte in expected {
            assert_eq!(stream.uncons(), Ok(byte));
        }
    }

    /// Consumes `count` items, asserting that each of them is available.
    fn skip(stream: &mut SliceStream<'_>, count: usize) {
        for _ in 0..count {
            assert!(stream.uncons().is_ok());
        }
    }

    /// Asserts that `stream` reports the end of its input.
    fn expect_end_of_input(stream: &mut SliceStream<'_>) {
        assert_eq!(
            stream.uncons(),
            Err(StreamErrorFor::<ElasticBufferedReadStream<&[u8]>>::end_of_input())
        );
    }

    #[test]
    fn it_uncons_on_one_chunk() {
        let mut stream = ElasticBufferedReadStream::new(&b"This is the text !"[..]);

        expect_bytes(&mut stream, b"This ");
        skip(&mut stream, 12);
        expect_bytes(&mut stream, b"!");
        expect_end_of_input(&mut stream);
    }

    #[test]
    fn it_uncons_on_multiple_chunks() {
        let text = three_chunks_of_sentences();
        let mut stream = ElasticBufferedReadStream::new(text.as_bytes());
        let chunk_dist = next_chunk_sentence_dist();

        expect_bytes(&mut stream, b"This ");

        skip(&mut stream, chunk_dist);
        expect_bytes(&mut stream, b"is ");

        skip(&mut stream, chunk_dist);
        expect_bytes(&mut stream, b"a ");

        let dist_to_last_char = sentence_count() * SENTENCE.len()
            - 10 // Letters already read : "This is a "
            - 2 * chunk_dist
            - 1;
        skip(&mut stream, dist_to_last_char);

        expect_bytes(&mut stream, b"!");
        expect_end_of_input(&mut stream);
    }

    #[test]
    fn it_resets_on_checkpoint() {
        let text = three_chunks_of_sentences();
        let mut stream = ElasticBufferedReadStream::new(text.as_bytes());
        let chunk_dist = next_chunk_sentence_dist();

        skip(&mut stream, chunk_dist);
        expect_bytes(&mut stream, b"This ");

        // Reset inside a single chunk.
        let cp = stream.checkpoint();
        expect_bytes(&mut stream, b"is ");
        stream.reset(cp);

        // Reset after the cursor moved on to a later chunk.
        let cp = stream.checkpoint();
        expect_bytes(&mut stream, b"is ");
        skip(&mut stream, chunk_dist);
        expect_bytes(&mut stream, b"a");
        stream.reset(cp);

        expect_bytes(&mut stream, b"is ");
    }

    #[test]
    fn it_free_useless_memory_when_reading_new_chunk() {
        let text = three_chunks_of_sentences();
        let mut stream = ElasticBufferedReadStream::new(text.as_bytes());

        let cp = stream.checkpoint();
        assert_eq!(stream.buffer_len(), 0);
        expect_bytes(&mut stream, b"T");
        assert_eq!(stream.buffer_len(), 1);

        skip(&mut stream, CHUNK_SIZE);

        // The checkpoint still pins the first chunk.
        assert_eq!(stream.buffer_len(), 2);

        stream.reset(cp);

        expect_bytes(&mut stream, b"T");
        skip(&mut stream, 2 * CHUNK_SIZE);

        // The checkpoint was consumed by `reset`: older chunks were freed.
        assert_eq!(stream.buffer_len(), 1);
    }
}
357}