yozuk_sdk/
stream.rs

1use mediatype::MediaTypeBuf;
2use std::io::{Read, Result};
3
4const HEADER_LENGTH: usize = 1024;
5
6pub struct InputStream {
7    reader: Box<dyn Read + Send + Sync>,
8    header: Option<Box<[u8]>>,
9    offset: usize,
10    media_type: MediaTypeBuf,
11}
12
13impl InputStream {
14    pub fn new<T, M>(reader: T, media_type: M) -> Self
15    where
16        T: 'static + Read + Send + Sync,
17        M: Into<MediaTypeBuf>,
18    {
19        Self {
20            reader: Box::new(reader),
21            header: None,
22            offset: 0,
23            media_type: media_type.into(),
24        }
25    }
26
27    pub fn read_header(&mut self) -> Result<&[u8]> {
28        if self.header.is_none() {
29            let mut header = vec![0; HEADER_LENGTH];
30            let len = self.reader.read(&mut header)?;
31            header.resize(len, 0);
32            self.header = Some(header.into_boxed_slice());
33        }
34        Ok(self.header())
35    }
36
37    pub fn header(&self) -> &[u8] {
38        if let Some(header) = &self.header {
39            header
40        } else {
41            &[]
42        }
43    }
44
45    pub fn media_type(&self) -> &MediaTypeBuf {
46        &self.media_type
47    }
48}
49
50impl Read for InputStream {
51    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
52        let header_remain = &self.header()[self.offset..];
53        if header_remain.is_empty() {
54            self.reader.read(buf)
55        } else {
56            let len = header_remain.len().min(buf.len());
57            buf[..len].copy_from_slice(&header_remain[..len]);
58            self.offset += len;
59            Ok(len)
60        }
61    }
62}
63
64#[cfg(test)]
65mod tests {
66    use super::*;
67    use mediatype::media_type;
68    use std::iter;
69
70    struct DataReader {
71        data: Vec<u8>,
72        offset: usize,
73    }
74
75    impl Read for DataReader {
76        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
77            let data_remain = &self.data[self.offset..];
78            let len = data_remain.len().min(buf.len());
79            buf[..len].copy_from_slice(&data_remain[..len]);
80            self.offset += len;
81            Ok(len)
82        }
83    }
84
85    #[test]
86    fn input_stream() {
87        let data = iter::repeat(0)
88            .enumerate()
89            .map(|(_, i)| (i % 0xff) as u8)
90            .take(HEADER_LENGTH * 2)
91            .collect::<Vec<_>>();
92
93        let mut stream = InputStream::new(
94            DataReader {
95                data: data.clone(),
96                offset: 0,
97            },
98            media_type!(APPLICATION / OCTET_STREAM),
99        );
100
101        assert_eq!(stream.read_header().unwrap(), &data[..HEADER_LENGTH]);
102        assert_eq!(stream.header(), &data[..HEADER_LENGTH]);
103
104        let mut buf = Vec::new();
105        stream.read_to_end(&mut buf).unwrap();
106        assert_eq!(buf, data);
107    }
108
109    #[test]
110    fn input_stream_small_data() {
111        let data = iter::repeat(0)
112            .enumerate()
113            .map(|(_, i)| (i % 0xff) as u8)
114            .take(HEADER_LENGTH / 2)
115            .collect::<Vec<_>>();
116
117        let mut stream = InputStream::new(
118            DataReader {
119                data: data.clone(),
120                offset: 0,
121            },
122            media_type!(APPLICATION / OCTET_STREAM),
123        );
124
125        assert_eq!(stream.read_header().unwrap(), &data);
126        assert_eq!(stream.header(), &data);
127
128        let mut buf = Vec::new();
129        stream.read_to_end(&mut buf).unwrap();
130        assert_eq!(buf, data);
131    }
132}