streaming_decompression/
lib.rs

1#![forbid(unsafe_code)]
2#![doc = include_str!("lib.md")]
3
4pub use fallible_streaming_iterator;
5pub use fallible_streaming_iterator::FallibleStreamingIterator;
6
/// Marker for an item that may hold compressed data.
///
/// The compression state is reported at runtime through [`Compressed::is_compressed`].
/// Implementors whose items are always compressed can rely on the default, which
/// returns `true`; items that are sometimes stored uncompressed should override it.
pub trait Compressed {
    /// Returns whether this particular item is compressed.
    ///
    /// Defaults to `true`.
    #[inline]
    fn is_compressed(&self) -> bool {
        true
    }
}
15
/// Marker for an item that holds uncompressed data in a byte buffer.
///
/// [`Decompressor`] calls [`Decompressed::buffer_mut`] on an item it has finished with
/// in order to take the item's buffer back and re-use it for the next item.
pub trait Decompressed {
    /// Returns a mutable reference to the item's internal byte buffer.
    fn buffer_mut(&mut self) -> &mut Vec<u8>;
}
21
22/// A [`FallibleStreamingIterator`] that decompresses items of type `I` into type `O` via an
23/// internal [`Vec<u8>`] that is re-used across items.
24/// The purpose of this streaming iterator is to be able to decompress parts of an item `I` into `O`.
25pub struct Decompressor<I, O, F, E, II>
26where
27    I: Compressed,
28    O: Decompressed,
29    E: std::error::Error,
30    II: Iterator<Item = Result<I, E>>,
31    F: Fn(I, &mut Vec<u8>) -> Result<O, E>,
32{
33    iter: II,
34    f: F,
35    buffer: Vec<u8>,
36    current: Option<O>,
37    was_decompressed: bool,
38}
39
40impl<I, O, F, E, II> Decompressor<I, O, F, E, II>
41where
42    I: Compressed,
43    O: Decompressed,
44    E: std::error::Error,
45    II: Iterator<Item = Result<I, E>>,
46    F: Fn(I, &mut Vec<u8>) -> Result<O, E>,
47{
48    /// Returns a new [`Decompressor`].
49    #[inline]
50    pub fn new(iter: II, buffer: Vec<u8>, f: F) -> Self {
51        Self {
52            iter,
53            f,
54            buffer,
55            current: None,
56            was_decompressed: false,
57        }
58    }
59
60    /// Returns its internal buffer, consuming itself.
61    #[inline]
62    pub fn into_inner(mut self) -> Vec<u8> {
63        self.buffer.clear(); // not leak information
64        self.buffer
65    }
66}
67
68impl<I, O, F, E, II> FallibleStreamingIterator for Decompressor<I, O, F, E, II>
69where
70    I: Compressed,
71    O: Decompressed,
72    E: std::error::Error,
73    II: Iterator<Item = Result<I, E>>,
74    F: Fn(I, &mut Vec<u8>) -> Result<O, E>,
75{
76    type Item = O;
77    type Error = E;
78
79    #[inline]
80    fn advance(&mut self) -> Result<(), E> {
81        if let Some(page) = self.current.as_mut() {
82            if self.was_decompressed {
83                self.buffer = std::mem::take(page.buffer_mut());
84            }
85        }
86
87        let next = self
88            .iter
89            .next()
90            .map(|maybe_page| {
91                maybe_page.and_then(|page| {
92                    self.was_decompressed = page.is_compressed();
93                    (self.f)(page, &mut self.buffer)
94                })
95            })
96            .transpose()?;
97        self.current = next;
98        Ok(())
99    }
100
101    #[inline]
102    fn get(&self) -> Option<&Self::Item> {
103        self.current.as_ref()
104    }
105
106    #[inline]
107    fn size_hint(&self) -> (usize, Option<usize>) {
108        self.iter.size_hint()
109    }
110}
111
#[cfg(test)]
mod tests {
    use super::*;

    /// Input fixture; `metadata` doubles as the runtime "is compressed" flag.
    #[derive(Debug, PartialEq)]
    struct CompressedItem {
        pub metadata: String,
        pub data: Vec<u8>,
    }

    impl Compressed for CompressedItem {
        fn is_compressed(&self) -> bool {
            self.metadata == "is_compressed"
        }
    }

    /// Output fixture; `data` is the buffer exposed for re-use.
    #[derive(Debug, PartialEq)]
    struct DecompressedItem {
        pub metadata: String,
        pub data: Vec<u8>,
    }

    impl Decompressed for DecompressedItem {
        fn buffer_mut(&mut self) -> &mut Vec<u8> {
            &mut self.data
        }
    }

    /// Stand-in for a real codec: "decompressing" copies the bytes into `buffer`,
    /// while uncompressed input is moved through `buffer` without copying.
    /// Either way the resulting bytes are taken out of `buffer`, leaving it empty.
    fn decompress(
        mut input: CompressedItem,
        buffer: &mut Vec<u8>,
    ) -> Result<DecompressedItem, std::convert::Infallible> {
        if !input.is_compressed() {
            std::mem::swap(buffer, &mut input.data);
        } else {
            // A real implementation would run the actual decompression here.
            buffer.clear();
            buffer.extend_from_slice(&input.data);
        }
        let data = std::mem::take(buffer);
        Ok(DecompressedItem {
            metadata: input.metadata,
            data,
        })
    }

    /// Builds the single input item used by the tests below.
    fn fixture(metadata: &str) -> CompressedItem {
        CompressedItem {
            metadata: metadata.to_string(),
            data: vec![1, 2, 3],
        }
    }

    #[test]
    fn test_basics_uncompressed() {
        let source = std::iter::once(Ok(fixture("not_compressed")));
        let mut decompressor = Decompressor::new(source, vec![1], decompress);

        let produced = decompressor.next().unwrap().unwrap();
        assert_eq!(produced.data, vec![1, 2, 3]);
        assert_eq!(produced.metadata, "not_compressed".to_string());
        assert!(decompressor.next().unwrap().is_none());

        // The item was not compressed, so nothing was handed back to the internal buffer.
        assert_eq!(decompressor.into_inner().capacity(), 0);
    }

    #[test]
    fn test_basics_compressed() {
        let source = std::iter::once(Ok(fixture("is_compressed")));
        let mut decompressor = Decompressor::new(source, vec![1, 2], decompress);

        let produced = decompressor.next().unwrap().unwrap();
        assert_eq!(produced.data, vec![1, 2, 3]);
        assert_eq!(produced.metadata, "is_compressed".to_string());
        assert!(decompressor.next().unwrap().is_none());

        // Advancing past the last item reclaimed its allocation into the internal buffer.
        assert!(decompressor.into_inner().capacity() > 0);
    }
}
196}