speedy/
readable.rs

1use std::io::{
2    Read
3};
4
5use std::fs::File;
6use std::path::Path;
7use std::marker::PhantomData;
8
9use crate::reader::Reader;
10use crate::context::{Context, DefaultContext};
11use crate::endianness::Endianness;
12use crate::Error;
13use crate::circular_buffer::CircularBuffer;
14
15use crate::error::{
16    error_end_of_input,
17    error_input_buffer_is_too_small
18};
19
20struct BufferReader< 'a, C > where C: Context {
21    context: C,
22    ptr: *const u8,
23    end: *const u8,
24    phantom: PhantomData< &'a [u8] >
25}
26
27impl< 'a, C > BufferReader< 'a, C > where C: Context {
28    #[inline]
29    fn new( context: C, buffer: &'a [u8] ) -> Self {
30        BufferReader {
31            context,
32            ptr: buffer.as_ptr(),
33            end: unsafe { buffer.as_ptr().add( buffer.len() ) },
34            phantom: PhantomData
35        }
36    }
37}
38
39impl< 'a, C: Context > Reader< 'a, C > for BufferReader< 'a, C > {
40    #[inline(always)]
41    fn read_bytes( &mut self, output: &mut [u8] ) -> Result< (), C::Error > {
42        let length = output.len();
43        if self.can_read_at_least( length ) == Some( false ) {
44            return Err( error_end_of_input() );
45        }
46
47        unsafe {
48            std::ptr::copy_nonoverlapping( self.ptr, output.as_mut_ptr(), length );
49            self.ptr = self.ptr.add( length );
50        }
51
52        Ok(())
53    }
54
55    #[inline(always)]
56    unsafe fn read_bytes_into_ptr( &mut self, output: *mut u8, length: usize ) -> Result< (), C::Error > {
57        if self.can_read_at_least( length ) == Some( false ) {
58            return Err( error_end_of_input() );
59        }
60
61        unsafe { 
62            std::ptr::copy_nonoverlapping( self.ptr, output, length );
63            self.ptr = self.ptr.add( length );
64        }
65
66        Ok(())
67    }
68
69    #[inline(always)]
70    fn peek_bytes( &mut self, output: &mut [u8] ) -> Result< (), C::Error > {
71        let length = output.len();
72        if self.can_read_at_least( length ) == Some( false ) {
73            return Err( error_end_of_input() );
74        }
75
76        unsafe {
77            std::ptr::copy_nonoverlapping( self.ptr, output.as_mut_ptr(), length );
78        }
79
80        Ok(())
81    }
82
83    #[inline(always)]
84    unsafe fn peek_bytes_into_ptr( &mut self, output: *mut u8, length: usize ) -> Result< (), C::Error > {
85        if self.can_read_at_least( length ) == Some( false ) {
86            return Err( error_end_of_input() );
87        }
88
89        unsafe {
90            std::ptr::copy_nonoverlapping( self.ptr, output, length );
91        }
92        Ok(())
93    }
94
95    #[inline(always)]
96    fn skip_bytes( &mut self, length: usize ) -> Result< (), C::Error > {
97        if self.can_read_at_least( length ) == Some( false ) {
98            return Err( error_end_of_input() );
99        }
100
101        unsafe {
102            self.ptr = self.ptr.add( length );
103        }
104        Ok(())
105    }
106
107    #[inline(always)]
108    fn read_bytes_borrowed( &mut self, length: usize ) -> Option< Result< &'a [u8], C::Error > > {
109        if self.can_read_at_least( length ) == Some( false ) {
110            return Some( Err( error_end_of_input() ) );
111        }
112
113        let slice;
114        unsafe {
115            slice = std::slice::from_raw_parts( self.ptr, length );
116            self.ptr = self.ptr.add( length );
117        }
118
119        Some( Ok( slice ) )
120    }
121
122    #[inline(always)]
123    fn read_bytes_borrowed_from_reader< 'r >( &'r mut self, length: usize ) -> Option< Result< &'r [u8], C::Error > > {
124        if self.can_read_at_least( length ) == Some( false ) {
125            return Some( Err( error_end_of_input() ) );
126        }
127
128        let slice;
129        unsafe {
130            slice = std::slice::from_raw_parts( self.ptr, length );
131            self.ptr = self.ptr.add( length );
132        }
133
134        Some( Ok( slice ) )
135    }
136
137    #[inline(always)]
138    fn read_bytes_borrowed_until_eof( &mut self ) -> Option< &'a [u8] > {
139        let length = self.end as usize - self.ptr as usize;
140        let slice;
141        unsafe {
142            slice = std::slice::from_raw_parts( self.ptr, length );
143            self.ptr = self.ptr.add( length );
144        }
145
146        Some( slice )
147    }
148
149    #[inline(always)]
150    fn can_read_at_least( &self, size: usize ) -> Option< bool > {
151        Some( (self.end as usize - self.ptr as usize) >= size )
152    }
153
154    #[inline(always)]
155    fn context( &self ) -> &C {
156        &self.context
157    }
158
159    #[inline(always)]
160    fn context_mut( &mut self ) -> &mut C {
161        &mut self.context
162    }
163}
164
165struct CopyingBufferReader< 'ctx, 'a, C > where C: Context {
166    context: &'ctx mut C,
167    ptr: *const u8,
168    end: *const u8,
169    phantom: PhantomData< &'a [u8] >
170}
171
172impl< 'ctx, 'a, C > CopyingBufferReader< 'ctx, 'a, C > where C: Context {
173    #[inline]
174    fn new( context: &'ctx mut C, buffer: &'a [u8] ) -> Self {
175        CopyingBufferReader {
176            context,
177            ptr: buffer.as_ptr(),
178            end: unsafe { buffer.as_ptr().add( buffer.len() ) },
179            phantom: PhantomData
180        }
181    }
182}
183
184impl< 'ctx, 'r, 'a, C: Context > Reader< 'r, C > for CopyingBufferReader< 'ctx, 'a, C > {
185    #[inline(always)]
186    fn read_bytes( &mut self, output: &mut [u8] ) -> Result< (), C::Error > {
187        let length = output.len();
188        if self.can_read_at_least( length ) == Some( false ) {
189            return Err( error_end_of_input() );
190        }
191
192        unsafe {
193            std::ptr::copy_nonoverlapping( self.ptr, output.as_mut_ptr(), length );
194            self.ptr = self.ptr.add( length );
195        }
196
197        Ok(())
198    }
199
200    #[inline(always)]
201    unsafe fn read_bytes_into_ptr( &mut self, output: *mut u8, length: usize ) -> Result< (), C::Error > {
202        if self.can_read_at_least( length ) == Some( false ) {
203            return Err( error_end_of_input() );
204        }
205
206        unsafe {
207            std::ptr::copy_nonoverlapping( self.ptr, output, length );
208            self.ptr = self.ptr.add( length );
209        }
210
211        Ok(())
212    }
213
214    #[inline(always)]
215    fn peek_bytes( &mut self, output: &mut [u8] ) -> Result< (), C::Error > {
216        let length = output.len();
217        if self.can_read_at_least( length ) == Some( false ) {
218            return Err( error_end_of_input() );
219        }
220
221        unsafe {
222            std::ptr::copy_nonoverlapping( self.ptr, output.as_mut_ptr(), length );
223        }
224
225        Ok(())
226    }
227
228    #[inline(always)]
229    unsafe fn peek_bytes_into_ptr( &mut self, output: *mut u8, length: usize ) -> Result< (), C::Error > {
230        if self.can_read_at_least( length ) == Some( false ) {
231            return Err( error_end_of_input() );
232        }
233
234        unsafe {
235            std::ptr::copy_nonoverlapping( self.ptr, output, length );
236        }
237        Ok(())
238    }
239
240    #[inline(always)]
241    fn skip_bytes( &mut self, length: usize ) -> Result< (), C::Error > {
242        if self.can_read_at_least( length ) == Some( false ) {
243            return Err( error_end_of_input() );
244        }
245
246        unsafe {
247            self.ptr = self.ptr.add( length );
248        }
249        Ok(())
250    }
251
252    #[inline(always)]
253    fn read_bytes_borrowed_from_reader< 'reader >( &'reader mut self, length: usize ) -> Option< Result< &'reader [u8], C::Error > > {
254        if self.can_read_at_least( length ) == Some( false ) {
255            return Some( Err( error_end_of_input() ) );
256        }
257
258        let slice;
259        unsafe {
260            slice = std::slice::from_raw_parts( self.ptr, length );
261            self.ptr = self.ptr.add( length );
262        }
263
264        Some( Ok( slice ) )
265    }
266
267    #[inline(always)]
268    fn can_read_at_least( &self, size: usize ) -> Option< bool > {
269        Some( (self.end as usize - self.ptr as usize) >= size )
270    }
271
272    #[inline(always)]
273    fn context( &self ) -> &C {
274        &self.context
275    }
276
277    #[inline(always)]
278    fn context_mut( &mut self ) -> &mut C {
279        &mut self.context
280    }
281}
282
283struct StreamReader< C: Context, S: Read > {
284    context: C,
285    reader: S,
286    buffer: CircularBuffer,
287    is_buffering: bool
288}
289
290impl< 'a, C, S > StreamReader< C, S > where C: Context, S: Read {
291    #[inline(never)]
292    fn read_bytes_slow( &mut self, mut output: &mut [u8] ) -> Result< (), C::Error > {
293        if self.is_buffering && output.len() < self.buffer.capacity() {
294            let reader = &mut self.reader;
295            while self.buffer.len() < self.buffer.capacity() {
296                let bytes_written = self.buffer.try_append_with( self.buffer.capacity() - self.buffer.len(), |chunk| {
297                    reader.read( chunk )
298                }).map_err( |error| {
299                    let error = Error::from_io_error( error );
300                    <C::Error as From< Error >>::from( error )
301                })?;
302
303                if bytes_written == 0 {
304                    if self.buffer.len() < output.len() {
305                        return Err( error_end_of_input() );
306                    } else {
307                        break;
308                    }
309                }
310
311                if self.buffer.len() >= output.len() {
312                    break;
313                }
314            }
315        }
316
317        if self.buffer.len() > 0 {
318            let length = std::cmp::min( self.buffer.len(), output.len() );
319            self.buffer.consume_into( &mut output[ ..length ] );
320            output = &mut output[ length.. ];
321        }
322
323        if output.is_empty() {
324            return Ok(());
325        }
326
327        self.reader.read_exact( output ).map_err( |error| {
328            let error = Error::from_io_error( error );
329            <C::Error as From< Error >>::from( error )
330        })
331    }
332}
333
334impl< 'a, C: Context, S: Read > Reader< 'a, C > for StreamReader< C, S > {
335    #[inline(always)]
336    fn read_bytes( &mut self, output: &mut [u8] ) -> Result< (), C::Error > {
337        if self.buffer.len() >= output.len() {
338            self.buffer.consume_into( output );
339            return Ok(());
340        }
341
342        self.read_bytes_slow( output )
343    }
344
345    fn peek_bytes( &mut self, output: &mut [u8] ) -> Result< (), C::Error > {
346        if output.len() > self.buffer.len() {
347            let reader = &mut self.reader;
348            while self.buffer.len() < output.len() {
349                let mut chunk_size = output.len() - self.buffer.len();
350                if self.is_buffering {
351                    chunk_size = std::cmp::max( chunk_size, self.buffer.capacity() - self.buffer.len() );
352                }
353
354                let bytes_written = self.buffer.try_append_with( chunk_size, |chunk| {
355                    reader.read( chunk )
356                }).map_err( |error| {
357                    let error = Error::from_io_error( error );
358                    <C::Error as From< Error >>::from( error )
359                })?;
360
361                if bytes_written == 0 {
362                    return Err( error_end_of_input() );
363                }
364            }
365        }
366
367        let (a, b) = self.buffer.as_slices_of_length( output.len() );
368        output[ ..a.len() ].copy_from_slice( a );
369
370        if let Some( b ) = b {
371            output[ a.len().. ].copy_from_slice( b );
372        }
373
374        Ok(())
375    }
376
377    #[inline(always)]
378    fn context( &self ) -> &C {
379        &self.context
380    }
381
382    #[inline(always)]
383    fn context_mut( &mut self ) -> &mut C {
384        &mut self.context
385    }
386}
387
388impl< C: Context, S: Read > StreamReader< C, S > {
389    #[inline]
390    fn deserialize< 'a, T: Readable< 'a, C > >( context: C, reader: S, is_buffering: bool ) -> Result< T, C::Error > {
391        let capacity = if is_buffering {
392            8 * 1024
393        } else {
394            0
395        };
396
397        let mut reader = StreamReader {
398            context,
399            reader,
400            buffer: CircularBuffer::with_capacity( capacity ),
401            is_buffering
402        };
403
404        T::read_from( &mut reader )
405    }
406}
407
408pub trait Readable< 'a, C: Context >: Sized {
409    fn read_from< R: Reader< 'a, C > >( reader: &mut R ) -> Result< Self, C::Error >;
410
411    #[inline]
412    fn minimum_bytes_needed() -> usize {
413        0
414    }
415
416    /// Deserializes from a given buffer.
417    ///
418    /// This performs zero-copy deserialization when possible.
419    #[inline]
420    fn read_from_buffer( buffer: &'a [u8] ) -> Result< Self, C::Error > where Self: DefaultContext< Context = C >, C: Default {
421        Self::read_from_buffer_with_ctx( Default::default(), buffer )
422    }
423
424    /// Deserializes from a given buffer while also returning the amount of bytes consumed.
425    ///
426    /// This performs zero-copy deserialization when possible.
427    #[inline]
428    fn read_with_length_from_buffer( buffer: &'a [u8] ) -> (Result< Self, C::Error >, usize) where Self: DefaultContext< Context = C >, C: Default {
429        Self::read_with_length_from_buffer_with_ctx( Default::default(), buffer )
430    }
431
432    /// Deserializes from a given buffer.
433    ///
434    /// This never performs zero-copy deserialization.
435    #[inline]
436    fn read_from_buffer_copying_data( buffer: &[u8] ) -> Result< Self, C::Error > where Self: DefaultContext< Context = C >, C: Default {
437        Self::read_from_buffer_copying_data_with_ctx( Default::default(), buffer )
438    }
439
440    /// Deserializes from a given buffer while also returning the amount of bytes consumed.
441    ///
442    /// This never performs zero-copy deserialization.
443    #[inline]
444    fn read_with_length_from_buffer_copying_data( buffer: &[u8] ) -> (Result< Self, C::Error >, usize) where Self: DefaultContext< Context = C >, C: Default {
445        Self::read_with_length_from_buffer_copying_data_with_ctx( Default::default(), buffer )
446    }
447
448    /// Reads from a given stream without any buffering.
449    ///
450    /// This will only read what is necessary from the stream to deserialize
451    /// a given type, but is going to be slow.
452    ///
453    /// Use [`read_from_stream_buffered`](Readable::read_from_stream_buffered) if you need
454    /// to read from a stream and you don't care about not overreading.
455    #[inline]
456    fn read_from_stream_unbuffered( stream: impl Read ) -> Result< Self, C::Error > where Self: DefaultContext< Context = C >, C: Default {
457        Self::read_from_stream_unbuffered_with_ctx( Default::default(), stream )
458    }
459
460    /// Reads from a given stream with internal buffering.
461    ///
462    /// This will read more data from the stream than is necessary to deserialize
463    /// a given type, however it should be orders of magnitude faster than unbuffered streaming,
464    /// especially when reading relatively complex objects.
465    ///
466    /// Use the slower [`read_from_stream_unbuffered`](Readable::read_from_stream_unbuffered) if you want
467    /// to avoid overreading.
468    #[inline]
469    fn read_from_stream_buffered( stream: impl Read ) -> Result< Self, C::Error > where Self: DefaultContext< Context = C >, C: Default {
470        Self::read_from_stream_buffered_with_ctx( Default::default(), stream )
471    }
472
473    #[inline]
474    fn read_from_file( path: impl AsRef< Path > ) -> Result< Self, C::Error > where Self: DefaultContext< Context = C >, C: Default {
475        Self::read_from_file_with_ctx( Default::default(), path )
476    }
477
478    #[inline]
479    fn read_from_buffer_with_ctx( context: C, buffer: &'a [u8] ) -> Result< Self, C::Error > {
480        Self::read_with_length_from_buffer_with_ctx( context, buffer ).0
481    }
482
483    #[inline]
484    fn read_with_length_from_buffer_with_ctx( context: C, buffer: &'a [u8] ) -> (Result< Self, C::Error >, usize) {
485        let bytes_needed = Self::minimum_bytes_needed();
486        let buffer_length = buffer.len();
487        if buffer_length < bytes_needed {
488            return (Err( error_input_buffer_is_too_small( buffer_length, bytes_needed ) ), 0);
489        }
490
491        let mut reader = BufferReader::new( context, buffer );
492        let value = Self::read_from( &mut reader );
493        let bytes_read = reader.ptr as usize - buffer.as_ptr() as usize;
494        (value, bytes_read)
495    }
496
497    #[inline]
498    fn read_from_buffer_copying_data_with_ctx( context: C, buffer: &[u8] ) -> Result< Self, C::Error > {
499        Self::read_with_length_from_buffer_copying_data_with_ctx( context, buffer ).0
500    }
501
502    #[inline]
503    fn read_with_length_from_buffer_copying_data_with_ctx( mut context: C, buffer: &[u8] ) -> (Result< Self, C::Error >, usize) {
504        Self::read_with_length_from_buffer_copying_data_with_ctx_mut( &mut context, buffer )
505    }
506
507    #[inline]
508    fn read_with_length_from_buffer_copying_data_with_ctx_mut( context: &mut C, buffer: &[u8] ) -> (Result< Self, C::Error >, usize) {
509        let bytes_needed = Self::minimum_bytes_needed();
510        let buffer_length = buffer.len();
511        if buffer_length < bytes_needed {
512            return (Err( error_input_buffer_is_too_small( buffer_length, bytes_needed ) ), 0);
513        }
514
515        let mut reader = CopyingBufferReader::new( context, buffer );
516        let value = Self::read_from( &mut reader );
517        let bytes_read = reader.ptr as usize - buffer.as_ptr() as usize;
518        (value, bytes_read)
519    }
520
521    #[inline]
522    fn read_from_stream_unbuffered_with_ctx< S: Read >( context: C, stream: S ) -> Result< Self, C::Error > {
523        StreamReader::deserialize( context, stream, false )
524    }
525
526    #[inline]
527    fn read_from_stream_buffered_with_ctx< S: Read >( context: C, stream: S ) -> Result< Self, C::Error > {
528        StreamReader::deserialize( context, stream, true )
529    }
530
531    #[inline]
532    fn read_from_file_with_ctx( context: C, path: impl AsRef< Path > ) -> Result< Self, C::Error > {
533        let stream = File::open( path ).map_err( |error| {
534            let error = Error::from_io_error( error );
535            <C::Error as From< Error >>::from( error )
536        })?;
537
538        #[cfg(not(all(target_os = "linux", target_arch = "x86_64")))]
539        {
540            Self::read_from_stream_buffered_with_ctx( context, stream )
541        }
542
543        #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
544        {
545            use std::os::unix::io::AsRawFd;
546
547            // Define our own bindings to avoid extra dependencies.
548            extern "C" {
549                fn mmap(
550                    addr: *mut std::ffi::c_void,
551                    len: usize,
552                    prot: i32,
553                    flags: i32,
554                    fd: i32,
555                    offset: i64
556                ) -> *mut std::ffi::c_void;
557
558                fn madvise(
559                    addr: *mut std::ffi::c_void,
560                    len: usize,
561                    advice: i32
562                ) -> i32;
563
564                fn munmap(
565                    addr: *mut std::ffi::c_void,
566                    len: usize
567                ) -> i32;
568            }
569
570            const MAP_PRIVATE: i32 = 0x0002;
571            const PROT_READ: i32 = 1;
572            const MAP_FAILED: *mut std::ffi::c_void = !0 as *mut std::ffi::c_void;
573            const MADV_SEQUENTIAL: i32 = 2;
574            const MADV_WILLNEED: i32 = 3;
575            static EMPTY: &[u8] = &[];
576
577            struct Mmap( *mut std::ffi::c_void, usize );
578            impl Mmap {
579                fn open( fp: &std::fs::File ) -> Result< Self, Error > {
580                    let size = fp.metadata().map_err( Error::from_io_error )?.len();
581                    if size > std::usize::MAX as u64 {
582                        return Err( crate::error::error_too_big_usize_for_this_architecture() );
583                    }
584
585                    if size == 0 {
586                        return Ok( Mmap( EMPTY.as_ptr() as _, 0 ) );
587                    }
588
589                    let size = size as usize;
590                    let pointer = unsafe { mmap( std::ptr::null_mut(), size, PROT_READ, MAP_PRIVATE, fp.as_raw_fd(), 0 ) };
591                    if pointer == MAP_FAILED {
592                        Err( Error::from_io_error( std::io::Error::last_os_error() ) )
593                    } else {
594                        Ok( Mmap( pointer, size ) )
595                    }
596                }
597
598                unsafe fn madvise( &mut self, advice: i32 ) -> Result< (), Error > {
599                    if self.1 == 0 {
600                        return Ok(());
601                    }
602
603                    if unsafe { madvise( self.0, self.1, advice ) } < 0 {
604                        Err( Error::from_io_error( std::io::Error::last_os_error() ) )
605                    } else {
606                        Ok(())
607                    }
608                }
609            }
610
611            impl std::ops::Deref for Mmap {
612                type Target = [u8];
613                #[inline]
614                fn deref( &self ) -> &Self::Target {
615                    unsafe {
616                        std::slice::from_raw_parts( self.0.cast::< u8 >(), self.1 )
617                    }
618                }
619            }
620
621            impl Drop for Mmap {
622                fn drop( &mut self ) {
623                    if self.1 != 0 {
624                        unsafe {
625                            munmap( self.0, self.1 );
626                        }
627                    }
628                }
629            }
630
631            let mut mmap = Mmap::open( &stream )?;
632            unsafe {
633                mmap.madvise( MADV_SEQUENTIAL )?;
634                mmap.madvise( MADV_WILLNEED )?;
635            }
636
637            Self::read_from_buffer_copying_data_with_ctx( context, &mmap )
638        }
639    }
640
641    // Since specialization is not stable yet we do it this way.
642    #[doc(hidden)]
643    #[inline(always)]
644    fn speedy_is_primitive() -> bool {
645        false
646    }
647
648    #[doc(hidden)]
649    #[inline]
650    unsafe fn speedy_slice_from_bytes( _: &[u8] ) -> &[Self] {
651        panic!();
652    }
653
654    #[doc(hidden)]
655    #[inline]
656    unsafe fn speedy_flip_endianness( _: *mut Self ) {
657        panic!();
658    }
659
660    #[doc(hidden)]
661    #[inline]
662    fn speedy_convert_slice_endianness( _: Endianness, _: &mut [Self] ) {
663        panic!()
664    }
665}
666
/// Checks that every `peek_*` method returns the same value on repeated calls and
/// that the matching `read_*` method then returns that value too.
#[test]
fn test_peek() {
    let floats: &[f64] = &[2.0, 123.0];
    // View the two floats as their raw in-memory bytes (2 * 8 = 16 bytes).
    let bytes = unsafe {
        std::slice::from_raw_parts( floats.as_ptr().cast::< u8 >(), std::mem::size_of_val( floats ) )
    };

    let mut ctx = crate::LittleEndian {};

    macro_rules! check_peek_then_read {
        ($peek:ident, $read:ident) => {{
            let mut reader = CopyingBufferReader::new( &mut ctx, bytes );
            let first = reader.$peek().unwrap();
            for _ in 0..8 {
                assert_eq!( first, reader.$peek().unwrap() );
            }
            assert_eq!( first, reader.$read().unwrap() );
        }}
    }

    check_peek_then_read!( peek_f64, read_f64 );
    check_peek_then_read!( peek_f32, read_f32 );
    check_peek_then_read!( peek_u128, read_u128 );
    check_peek_then_read!( peek_u64, read_u64 );
    check_peek_then_read!( peek_u32, read_u32 );
    check_peek_then_read!( peek_u16, read_u16 );
    check_peek_then_read!( peek_u8, read_u8 );
    check_peek_then_read!( peek_i128, read_i128 );
    check_peek_then_read!( peek_i64, read_i64 );
    check_peek_then_read!( peek_i32, read_i32 );
    check_peek_then_read!( peek_i16, read_i16 );
    check_peek_then_read!( peek_i8, read_i8 );
    check_peek_then_read!( peek_u64_varint, read_u64_varint );

    // Peeking a single byte must not disturb a following full-width read.
    let mut reader = CopyingBufferReader::new( &mut ctx, bytes );
    reader.peek_u8().unwrap();
    assert_eq!( reader.read_f64().unwrap(), 2.0 );
}
702    reader.peek_u8().unwrap();
703    assert_eq!( reader.read_f64().unwrap(), 2.0 );
704}