polars_arrow/io/
iterator.rs

1pub use streaming_iterator::StreamingIterator;
2
3/// A [`StreamingIterator`] with an internal buffer of [`Vec<u8>`] used to efficiently
4/// present items of type `T` as `&[u8]`.
5/// It is generic over the type `T` and the transformation `F: T -> &[u8]`.
6pub struct BufStreamingIterator<I, F, T>
7where
8    I: Iterator<Item = T>,
9    F: FnMut(T, &mut Vec<u8>),
10{
11    iterator: I,
12    f: F,
13    buffer: Vec<u8>,
14    is_valid: bool,
15}
16
17impl<I, F, T> BufStreamingIterator<I, F, T>
18where
19    I: Iterator<Item = T>,
20    F: FnMut(T, &mut Vec<u8>),
21{
22    #[inline]
23    pub fn new(iterator: I, f: F, buffer: Vec<u8>) -> Self {
24        Self {
25            iterator,
26            f,
27            buffer,
28            is_valid: false,
29        }
30    }
31}
32
33impl<I, F, T> StreamingIterator for BufStreamingIterator<I, F, T>
34where
35    I: Iterator<Item = T>,
36    F: FnMut(T, &mut Vec<u8>),
37{
38    type Item = [u8];
39
40    #[inline]
41    fn advance(&mut self) {
42        let a = self.iterator.next();
43        if let Some(a) = a {
44            self.is_valid = true;
45            self.buffer.clear();
46            (self.f)(a, &mut self.buffer);
47        } else {
48            self.is_valid = false;
49        }
50    }
51
52    #[inline]
53    fn get(&self) -> Option<&Self::Item> {
54        if self.is_valid {
55            Some(&self.buffer)
56        } else {
57            None
58        }
59    }
60
61    #[inline]
62    fn size_hint(&self) -> (usize, Option<usize>) {
63        self.iterator.size_hint()
64    }
65}