1use std::fs::File;
2use std::io;
3use std::io::{BufRead, BufReader, Read, StdinLock};
4
5use crate::result::{decoding_error, IonError, IonResult};
6
7pub trait IonDataSource: BufRead {
25 fn skip_bytes(&mut self, number_of_bytes: usize) -> IonResult<()> {
27 let mut bytes_skipped = 0;
28 while bytes_skipped < number_of_bytes {
29 let buffer = self.fill_buf()?;
30 if buffer.is_empty() {
31 return decoding_error("Unexpected end of stream.");
34 }
35 let bytes_in_buffer = buffer.len();
36 let bytes_to_skip = (number_of_bytes - bytes_skipped).min(bytes_in_buffer);
37 self.consume(bytes_to_skip);
38 bytes_skipped += bytes_to_skip;
39 }
40 Ok(())
41 }
42
43 fn next_byte(&mut self) -> IonResult<Option<u8>> {
45 match self.fill_buf()?.first() {
46 Some(&byte) => {
47 self.consume(1);
48 Ok(Some(byte))
49 }
50 None => Ok(None),
51 }
52 }
53
54 fn read_next_byte_while<F>(&mut self, byte_processor: &mut F) -> IonResult<usize>
57 where
58 F: FnMut(u8) -> bool,
59 {
60 let mut number_of_bytes_processed: usize = 0;
62 let mut number_of_buffered_bytes: usize;
64 let mut number_of_bytes_consumed: usize = 0;
66
67 loop {
68 let buffer = self.fill_buf()?;
70 number_of_buffered_bytes = buffer.len();
71
72 if number_of_buffered_bytes == 0 {
73 return decoding_error("Unexpected end of stream.");
76 }
77
78 for byte in buffer {
81 number_of_bytes_processed += 1;
82 if !byte_processor(*byte) {
83 self.consume(number_of_bytes_processed - number_of_bytes_consumed);
86 return Ok(number_of_bytes_processed);
87 }
88 }
89
90 self.consume(number_of_buffered_bytes);
93 number_of_bytes_consumed += number_of_buffered_bytes;
94 }
95 }
96
97 fn read_slice<T, F>(
104 &mut self,
105 length: usize,
106 fallback_buffer: &mut Vec<u8>,
107 slice_processor: F,
108 ) -> IonResult<T>
109 where
110 F: FnOnce(&[u8]) -> IonResult<T>,
111 {
112 let buffer = self.fill_buf()?;
114
115 if buffer.is_empty() && length > 0 {
117 return decoding_error("Unexpected end of stream.");
120 }
121
122 if buffer.len() >= length {
126 let result = slice_processor(&buffer[..length]);
127 self.consume(length);
128 return result;
129 }
130
131 let buffer: &mut [u8] = if fallback_buffer.len() < length {
133 fallback_buffer.resize(length, 0);
134 fallback_buffer.as_mut()
136 } else {
137 let (required_buffer, _) = fallback_buffer.split_at_mut(length);
140 required_buffer
141 };
142
143 match self.read_exact(buffer) {
145 Ok(()) => slice_processor(buffer),
146 Err(ref e) if e.kind() == std::io::ErrorKind::UnexpectedEof =>
147 {
150 decoding_error("Unexpected end of stream.")
151 }
152 Err(io_error) => Err(IonError::IoError { source: io_error }),
153 }
154 }
155}
156
157impl<T> IonDataSource for T where T: BufRead {}
158
#[cfg(test)]
mod tests {
    use super::IonDataSource;
    use crate::result::IonError;
    use std::io::BufReader;

    /// Wraps `data` in a reader whose internal buffer holds only `buffer_size`
    /// bytes, so that the tests exercise buffer refills.
    fn test_data(buffer_size: usize, data: &'static [u8]) -> impl IonDataSource {
        BufReader::with_capacity(buffer_size, data)
    }

    #[test]
    fn test_next_byte() {
        let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);

        assert_eq!(Some(1), data_source.next_byte().unwrap());
        assert_eq!(Some(2), data_source.next_byte().unwrap());
        assert_eq!(Some(3), data_source.next_byte().unwrap());
        assert_eq!(Some(4), data_source.next_byte().unwrap());
        assert_eq!(Some(5), data_source.next_byte().unwrap());
        // Once the input is exhausted, `next_byte` reports `None` rather than an error.
        assert_eq!(None, data_source.next_byte().unwrap());
    }

    #[test]
    fn test_skip_bytes() {
        let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);
        data_source.skip_bytes(3).unwrap();
        assert_eq!(Some(4), data_source.next_byte().unwrap());
        data_source.skip_bytes(1).unwrap();
        assert_eq!(None, data_source.next_byte().unwrap());
    }

    #[test]
    fn test_read_next_byte_while() {
        let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);
        let mut sum: u64 = 0;
        let processor = &mut |byte: u8| {
            sum += byte as u64;
            byte < 4
        };
        let number_of_bytes_processed = data_source.read_next_byte_while(processor).unwrap();
        assert_eq!(number_of_bytes_processed, 4);
        assert_eq!(sum, 10);
        // The byte that stopped the scan was consumed; the one after it was not.
        assert_eq!(Some(5), data_source.next_byte().unwrap());
    }

    #[test]
    fn test_read_slice() {
        let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);
        let processor = &mut |data: &[u8]| Ok(data.iter().map(|byte| *byte as i32).sum::<i32>());
        let sum = data_source
            .read_slice(4, &mut Vec::new(), processor)
            .unwrap();
        assert_eq!(10, sum);
        // Exactly four bytes were taken from the source.
        assert_eq!(Some(5), data_source.next_byte().unwrap());
    }

    #[test]
    fn test_eof_during_skip_bytes() {
        let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);

        // Ask to skip more bytes than the source contains.
        let result = data_source.skip_bytes(42);

        assert!(matches!(result, Err(IonError::DecodingError { .. })));
    }

    #[test]
    fn test_eof_during_read_next_byte_while() {
        let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);
        let mut sum: u64 = 0;
        let processor = &mut |byte: u8| {
            sum += byte as u64;
            // No byte in the input equals 6, so the scan runs off the end.
            byte != 6
        };
        let result = data_source.read_next_byte_while(processor);

        assert!(matches!(result, Err(IonError::DecodingError { .. })));
    }

    #[test]
    fn test_eof_during_read_slice() {
        let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);
        let mut fallback_buffer = vec![];
        let processor = &mut |bytes: &[u8]| Ok(bytes.len());
        // Ask for a slice longer than the remaining input.
        let result = data_source.read_slice(42, &mut fallback_buffer, processor);

        assert!(matches!(result, Err(IonError::DecodingError { .. })));
    }
}
263
264pub trait ToIonDataSource {
268 type DataSource: IonDataSource;
269 fn to_ion_data_source(self) -> Self::DataSource;
270}
271
272impl ToIonDataSource for String {
273 type DataSource = io::Cursor<Self>;
274
275 fn to_ion_data_source(self) -> Self::DataSource {
276 io::Cursor::new(self)
277 }
278}
279
280impl<'a> ToIonDataSource for &'a str {
281 type DataSource = io::Cursor<Self>;
282
283 fn to_ion_data_source(self) -> Self::DataSource {
284 io::Cursor::new(self)
285 }
286}
287
288impl<'a> ToIonDataSource for &'a [u8] {
289 type DataSource = io::Cursor<Self>;
290
291 fn to_ion_data_source(self) -> Self::DataSource {
292 io::Cursor::new(self)
293 }
294}
295
296impl<'a, const N: usize> ToIonDataSource for &'a [u8; N] {
297 type DataSource = io::Cursor<Self>;
298
299 fn to_ion_data_source(self) -> Self::DataSource {
300 io::Cursor::new(self)
301 }
302}
303
304impl ToIonDataSource for Vec<u8> {
305 type DataSource = io::Cursor<Self>;
306
307 fn to_ion_data_source(self) -> Self::DataSource {
308 io::Cursor::new(self)
309 }
310}
311
312impl<T: BufRead, U: BufRead> ToIonDataSource for io::Chain<T, U> {
313 type DataSource = Self;
314
315 fn to_ion_data_source(self) -> Self::DataSource {
316 self
317 }
318}
319
320impl<T> ToIonDataSource for io::Cursor<T>
321where
322 T: AsRef<[u8]>,
323{
324 type DataSource = Self;
325
326 fn to_ion_data_source(self) -> Self::DataSource {
327 self
328 }
329}
330
331impl<T: Read> ToIonDataSource for BufReader<T> {
332 type DataSource = Self;
333
334 fn to_ion_data_source(self) -> Self::DataSource {
335 self
336 }
337}
338
339impl ToIonDataSource for File {
340 type DataSource = BufReader<Self>;
341
342 fn to_ion_data_source(self) -> Self::DataSource {
343 BufReader::new(self)
344 }
345}
346
347impl<'a> ToIonDataSource for StdinLock<'a> {
349 type DataSource = Self;
350
351 fn to_ion_data_source(self) -> Self::DataSource {
352 self
353 }
354}