sea_streamer_file/
buffer.rs

1use crate::{ByteSink, ByteSource, FileErr};
2use std::{
3    cmp::Ordering,
4    collections::VecDeque,
5    future::{ready, Ready},
6};
7
/// A container that blobs of [`Bytes`] can be added to.
///
/// The `Default` bound lets generic code create an empty instance before
/// appending to it; `ByteBuffer::consume` builds its return value this way.
pub trait Appendable: Default {
    /// Add `bytes` to `self`.
    fn append(&mut self, bytes: Bytes);
}
11
/// A FIFO queue of Bytes.
///
/// Blobs are pushed at the back by `append` and taken from the front by
/// `consume`; every pushed blob is kept as its own queue entry.
#[derive(Debug, Default, Clone)]
pub struct ByteBuffer {
    /// The queued blobs, oldest at the front.
    buf: VecDeque<Bytes>,
}
17
18impl Appendable for ByteBuffer {
19    fn append(&mut self, bytes: Bytes) {
20        self.append(bytes)
21    }
22}
23
/// A blob of bytes; optimized over byte and word.
#[derive(Clone)]
pub enum Bytes {
    /// No bytes at all.
    Empty,
    /// Exactly one byte, stored inline.
    Byte(u8),
    /// Exactly four bytes, stored inline.
    Word([u8; 4]),
    /// Any number of bytes held in a `Vec`; may also be empty or hold exactly
    /// one or four bytes.
    Bytes(Vec<u8>),
}
32
33impl Appendable for Bytes {
34    fn append(&mut self, bytes: Bytes) {
35        self.append(bytes)
36    }
37}
38
39impl std::fmt::Debug for Bytes {
40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        match self {
42            Self::Empty => write!(f, "Empty"),
43            Self::Byte(b) => write!(f, "Byte({b})"),
44            Self::Word(w) => write!(f, "Word({w:?})"),
45            Self::Bytes(b) => write!(f, "Bytes(len = {})", b.len()),
46        }
47    }
48}
49
50impl Default for Bytes {
51    fn default() -> Self {
52        Self::Empty
53    }
54}
55
56impl ByteBuffer {
57    /// Create a new buffer.
58    pub fn new() -> Self {
59        Default::default()
60    }
61
62    /// Create a new buffer with one blob of bytes.
63    pub fn one(bytes: Bytes) -> Self {
64        let mut buf = Self::new();
65        buf.append(bytes);
66        buf
67    }
68
69    /// Push bytes into the buffer.
70    pub fn append(&mut self, bytes: Bytes) {
71        self.buf.push_back(bytes);
72    }
73
74    /// Calculate the total number of bytes in the buffer.
75    pub fn size(&self) -> usize {
76        let mut size = 0;
77        for bytes in self.buf.iter() {
78            size += bytes.len();
79        }
80        size
81    }
82
83    /// Return whether this buffer is empty.
84    pub fn is_empty(&self) -> bool {
85        self.size() == 0
86    }
87
88    /// Clear all bytes from this buffer.
89    pub fn clear(&mut self) {
90        self.buf.clear();
91    }
92
93    /// Take ownership of all bytes; leaving self Empty.
94    pub fn take(&mut self) -> Self {
95        let buf = std::mem::take(&mut self.buf);
96        Self { buf }
97    }
98
99    /// Consume a specific number of bytes from the buffer,
100    /// panic if there are not enough bytes.
101    pub fn consume<T: Appendable>(&mut self, size: usize) -> T {
102        let mut buffer = T::default();
103        let mut remaining = size;
104        loop {
105            if let Some(bytes) = self.buf.front() {
106                match bytes.len().cmp(&remaining) {
107                    Ordering::Less | Ordering::Equal => {
108                        let bytes = self.buf.pop_front().unwrap();
109                        remaining -= bytes.len();
110                        buffer.append(bytes);
111                    }
112                    Ordering::Greater => {
113                        buffer.append(self.buf.front_mut().unwrap().pop(remaining));
114                        break;
115                    }
116                }
117            } else {
118                panic!(
119                    "Not enough bytes: consuming {}, only got {}",
120                    size,
121                    size - remaining
122                );
123            }
124            if remaining == 0 {
125                break;
126            }
127        }
128        buffer
129    }
130}
131
132/// IO methods
133impl ByteBuffer {
134    pub fn write_to(self, sink: &mut impl ByteSink) -> Result<usize, FileErr> {
135        let mut sum = 0;
136        for bytes in self.buf {
137            sum += bytes.len();
138            sink.write(bytes)?;
139        }
140        Ok(sum)
141    }
142}
143
144impl ByteSource for ByteBuffer {
145    type Future<'a> = Ready<Result<Bytes, FileErr>>;
146
147    fn request_bytes(&mut self, size: usize) -> Self::Future<'_> {
148        if size <= self.size() {
149            ready(Ok(self.consume(size)))
150        } else {
151            ready(Err(FileErr::NotEnoughBytes))
152        }
153    }
154}
155
156impl ByteSink for ByteBuffer {
157    fn write(&mut self, bytes: Bytes) -> Result<(), FileErr> {
158        self.append(bytes);
159        Ok(())
160    }
161}
162
163impl Bytes {
164    /// Construct a blob from raw bytes.
165    pub fn from_bytes(bytes: Vec<u8>) -> Self {
166        Bytes::Bytes(bytes)
167    }
168
169    /// Construct a blob from a slice of bytes. Clones them.
170    pub fn from_slice(b: &[u8]) -> Self {
171        match b.len() {
172            0 => Bytes::Empty,
173            1 => Bytes::Byte(b[0]),
174            4 => Bytes::Word([b[0], b[1], b[2], b[3]]),
175            _ => Bytes::Bytes(b.to_vec()),
176        }
177    }
178
179    /// Get the length of this blob of bytes.
180    pub fn len(&self) -> usize {
181        match self {
182            Bytes::Empty => 0,
183            Bytes::Byte(_) => 1,
184            Bytes::Word(_) => 4,
185            Bytes::Bytes(bytes) => bytes.len(),
186        }
187    }
188
189    /// Return true if self is empty.
190    pub fn is_empty(&self) -> bool {
191        match self {
192            Bytes::Empty => true,
193            Bytes::Byte(_) => false,
194            Bytes::Word(_) => false,
195            Bytes::Bytes(bytes) => bytes.is_empty(),
196        }
197    }
198
199    /// Return true if there is exactly 1 byte.
200    pub fn is_byte(&self) -> bool {
201        match self {
202            Bytes::Empty => false,
203            Bytes::Byte(_) => true,
204            Bytes::Word(_) => false,
205            Bytes::Bytes(bytes) => bytes.len() == 1,
206        }
207    }
208
209    /// Pop some bytes from head. This requires allocation.
210    pub fn pop(&mut self, size: usize) -> Self {
211        let len = self.len();
212        match len.cmp(&size) {
213            Ordering::Less => panic!("Not enough bytes: popping {}, only got {}", size, len),
214            Ordering::Equal => self.take(),
215            Ordering::Greater => {
216                let bytes = self.take();
217                match bytes {
218                    Bytes::Empty => unreachable!(),
219                    Bytes::Byte(_) => Self::Empty,
220                    Bytes::Word(bytes) => match size {
221                        0 => Self::Empty,
222                        1 => {
223                            *self = Self::Bytes(bytes[1..].to_vec());
224                            Self::Byte(bytes[0])
225                        }
226                        2 | 3 => {
227                            *self = Self::Bytes(bytes[size..].to_vec());
228                            Self::Bytes(bytes[0..size].to_vec())
229                        }
230                        _ => unreachable!(),
231                    },
232                    Bytes::Bytes(mut ret) => {
233                        let bytes = ret.split_off(size);
234                        *self = Self::Bytes(bytes);
235                        Self::Bytes(ret)
236                    }
237                }
238            }
239        }
240    }
241
242    /// Take ownership of all bytes.
243    pub fn bytes(self) -> Vec<u8> {
244        match self {
245            Bytes::Empty => vec![],
246            Bytes::Byte(b) => vec![b],
247            Bytes::Word([a, b, c, d]) => vec![a, b, c, d],
248            Bytes::Bytes(bytes) => bytes,
249        }
250    }
251
252    /// Get exactly a byte; otherwise None
253    pub fn byte(&self) -> Option<u8> {
254        match self {
255            Bytes::Empty => None,
256            Bytes::Byte(b) => Some(*b),
257            Bytes::Word(_) => None,
258            Bytes::Bytes(bytes) => {
259                if bytes.len() == 1 {
260                    Some(bytes[0])
261                } else {
262                    None
263                }
264            }
265        }
266    }
267
268    /// Get exactly a word; otherwise None
269    pub fn word(&self) -> Option<[u8; 4]> {
270        match self {
271            Bytes::Empty => None,
272            Bytes::Byte(_) => None,
273            Bytes::Word(w) => Some(*w),
274            Bytes::Bytes(b) => {
275                if b.len() == 4 {
276                    Some([b[0], b[1], b[2], b[3]])
277                } else {
278                    None
279                }
280            }
281        }
282    }
283
284    fn bytes_copy(&self) -> Vec<u8> {
285        match self {
286            Bytes::Empty => vec![],
287            Bytes::Byte(b) => vec![*b],
288            Bytes::Word([a, b, c, d]) => vec![*a, *b, *c, *d],
289            Bytes::Bytes(bytes) => bytes.clone(),
290        }
291    }
292
293    /// Append some bytes. This may require re-allocation.
294    pub fn append(&mut self, other: Self) {
295        if other.is_empty() {
296            return;
297        }
298        if self.is_empty() {
299            *self = other;
300            return;
301        }
302        *self = Self::Bytes(self.take().bytes());
303        match self {
304            Bytes::Bytes(bytes) => bytes.extend_from_slice(&other.bytes()),
305            _ => unreachable!(),
306        }
307    }
308
309    /// Take ownership of all bytes; leaving self Empty.
310    pub fn take(&mut self) -> Self {
311        let mut ret = Bytes::Empty;
312        std::mem::swap(self, &mut ret);
313        ret
314    }
315}
316
317/// IO methods
318impl Bytes {
319    #[inline]
320    pub async fn read_from(file: &mut impl ByteSource, size: usize) -> Result<Self, FileErr> {
321        file.request_bytes(size).await
322    }
323
324    #[inline]
325    pub fn write_to(self, sink: &mut impl ByteSink) -> Result<usize, FileErr> {
326        let size = self.len();
327        sink.write(self)?;
328        Ok(size)
329    }
330}
331
332impl PartialEq for Bytes {
333    fn eq(&self, other: &Self) -> bool {
334        match (self, other) {
335            (Self::Empty, Self::Empty) => true,
336            (Self::Byte(l), Self::Byte(r)) => l == r,
337            (Self::Word(l), Self::Word(r)) => l == r,
338            (Self::Bytes(l), Self::Bytes(r)) => l == r,
339            (left, right) => {
340                if left.len() != right.len() {
341                    false
342                } else {
343                    left.bytes_copy() == right.bytes_copy()
344                }
345            }
346        }
347    }
348}
349
// Unit tests for `Bytes` and `ByteBuffer`.
#[cfg(test)]
mod test {
    use super::*;

    /// Exercises `Bytes::append`, `Bytes::pop`, `Bytes::bytes` and
    /// `Bytes::is_empty` across the different variants.
    #[test]
    fn test_bytes() {
        // Both the dedicated variant and an empty Vec count as empty.
        assert!(Bytes::Empty.is_empty());
        assert!(Bytes::Bytes(vec![]).is_empty());

        // Appending to an empty blob adopts the appended blob.
        let mut bytes = Bytes::Empty;
        bytes.append(Bytes::Byte(1));
        assert_eq!(bytes.bytes(), vec![1]);

        // Pop a Vec-backed blob one byte at a time until it is drained.
        let mut bytes = Bytes::Empty;
        bytes.append(Bytes::Bytes(vec![1, 2]));
        assert_eq!(bytes.bytes_copy(), vec![1, 2]);
        assert_eq!(bytes.pop(1).bytes(), vec![1]);
        assert_eq!(bytes.bytes_copy(), vec![2]);
        assert_eq!(bytes.pop(1).bytes(), vec![2]);
        assert!(bytes.is_empty());

        // Grow a single byte into four bytes, then pop it in two halves.
        let mut bytes = Bytes::Byte(1);
        bytes.append(Bytes::Byte(2));
        assert_eq!(bytes.bytes_copy(), vec![1, 2]);
        bytes.append(Bytes::Byte(3));
        assert_eq!(bytes.bytes_copy(), vec![1, 2, 3]);
        bytes.append(Bytes::Byte(4));
        assert_eq!(bytes.bytes_copy(), vec![1, 2, 3, 4]);
        assert_eq!(bytes.pop(2).bytes(), vec![1, 2]);
        assert_eq!(bytes.bytes_copy(), vec![3, 4]);
        assert_eq!(bytes.pop(2).bytes(), vec![3, 4]);
        assert!(bytes.is_empty());

        // Concatenate two Vec-backed blobs and split the result.
        let mut bytes = Bytes::Bytes(vec![1, 2, 3, 4]);
        bytes.append(Bytes::Bytes(vec![5, 6]));
        assert_eq!(bytes.bytes_copy(), vec![1, 2, 3, 4, 5, 6]);
        assert_eq!(bytes.pop(3).bytes(), vec![1, 2, 3]);
        assert_eq!(bytes.bytes(), vec![4, 5, 6]);
    }

    /// Exercises `ByteBuffer::append`, `size`, `consume` and `is_empty`,
    /// including reads that span several queued blobs.
    #[test]
    fn test_byte_buffer() {
        let mut buffer = ByteBuffer::new();
        let mut bytes = Bytes::Empty;
        bytes.append(Bytes::Byte(1));
        buffer.append(bytes);
        assert_eq!(buffer.size(), 1);
        assert_eq(buffer.consume(1), &[1]);
        assert_eq!(buffer.size(), 0);
        // Consuming two bytes takes the whole first blob and part of the second.
        buffer.append(Bytes::Byte(1));
        buffer.append(Bytes::Bytes(vec![2, 3]));
        assert_eq!(buffer.size(), 3);
        assert_eq(buffer.consume(2), &[1, 2]);
        assert_eq!(buffer.size(), 1);
        assert_eq(buffer.consume(1), &[3]);
        assert!(buffer.is_empty());

        // Blobs of length 1, 2, 3 and 4 are read back in chunks of 4, 3, 2 and 1,
        // so every read crosses or ends on a blob boundary.
        let mut buffer = ByteBuffer::new();
        buffer.append(Bytes::Bytes(vec![1]));
        buffer.append(Bytes::Bytes(vec![2, 3]));
        buffer.append(Bytes::Bytes(vec![4, 5, 6]));
        buffer.append(Bytes::Bytes(vec![7, 8, 9, 10]));
        assert_eq!(buffer.size(), 10);
        assert_eq(buffer.consume(4), &[1, 2, 3, 4]);
        assert_eq(buffer.consume(3), &[5, 6, 7]);
        assert_eq(buffer.consume(2), &[8, 9]);
        assert_eq(buffer.consume(1), &[10]);

        // Local helper (distinct from the `assert_eq!` macro): its `Bytes`
        // parameter fixes the output type of `consume`, and it compares the
        // content with the expected slice.
        fn assert_eq(bytes: Bytes, same: &[u8]) {
            assert_eq!(&bytes.bytes(), same);
        }
    }
}