segsource/segment/
mod.rs

1#![allow(clippy::needless_range_loop)]
2use crate::{
3    error::{Error, Result},
4    Endidness,
5};
6use core::{
7    borrow::Borrow,
8    convert::TryFrom,
9    ops::{self, Bound, Index, RangeBounds as _},
10    sync::atomic::{AtomicUsize, Ordering},
11};
12#[cfg(feature = "std")]
13use std::io;
14
15mod data;
16pub use data::*;
17
18/// A segment of a [`crate::Source`].
19///
20/// This is where data is actually read from. Each segment keeps track of a few things:
21///
22/// 1. An initial offset (retrievable via [`Segment::initial_offset`]).
23/// 2. A cursor (retrievable via [`Segment::current_offset`]).
24/// 3. A reference to the source's data.
25///
26/// ## Index op
27///
28/// Like slices, [`Segment`]s support indexes via `usize`s or ranges. A few important things to note
29/// about this:
30///
31/// 1. The value(s) provided should be offsets (see the crate's top-level documentation for more
32///    info and what this means).
33/// 2. Unlike with a [`Segment`]'s various methods, no validation of the provided offset occurs,
34///    potentially leading to a panic.
35pub struct Segment<'s, I> {
36    initial_offset: usize,
37    position: AtomicUsize,
38    data: &'s [I],
39    // We use the slice's len a lot, and it never changes, so we might as well cache it.
40    size: usize,
41    // Used for u8 segments
42    endidness: Endidness,
43}
44
45impl<'s, I> Segment<'s, I> {
46    fn new_full(
47        data: &'s [I],
48        initial_offset: usize,
49        position: usize,
50        endidness: Endidness,
51    ) -> Self {
52        Self {
53            initial_offset,
54            position: AtomicUsize::new(position),
55            data,
56            endidness,
57            size: data.len(),
58        }
59    }
60
61    #[inline]
62    fn get_pos(&self) -> usize {
63        self.position.load(Ordering::Relaxed)
64    }
65
66    fn set_pos(&self, pos: usize) -> Result<()> {
67        self.validate_pos(pos, 0)?;
68        self.position.store(pos, Ordering::Relaxed);
69        Ok(())
70    }
71
72    #[inline]
73    fn to_pos(&self, offset: usize) -> usize {
74        offset - self.initial_offset
75    }
76
77    #[inline]
78    fn pos_to_offset(&self, pos: usize) -> usize {
79        pos + self.initial_offset
80    }
81
82    fn adj_pos(&self, amt: i128) -> Result<usize> {
83        let mut result = Ok(());
84        let prev_pos = {
85            let rval = self
86                .position
87                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |p| {
88                    let new_pos = (p as i128 + amt) as usize;
89                    result = self.validate_pos(new_pos, 0);
90                    if result.is_ok() {
91                        Some(new_pos)
92                    } else {
93                        None
94                    }
95                });
96            match rval {
97                Ok(v) => v,
98                Err(v) => v,
99            }
100        };
101        match result {
102            Err(e) => Err(e),
103            Ok(_) => Ok(prev_pos),
104        }
105    }
106
107    #[inline]
108    fn inner_with_offset(data: &'s [I], initial_offset: usize, endidness: Endidness) -> Self {
109        Self::new_full(data, initial_offset, 0, endidness)
110    }
111
112    #[inline]
113    pub fn new(data: &'s [I]) -> Self {
114        Self::new_full(data, 0, 0, Endidness::default())
115    }
116
117    /// Changes the initial offset.
118    #[inline]
119    pub fn change_initial_offset(&mut self, offset: usize) {
120        self.initial_offset = offset;
121    }
122
123    /// Returns a slice of the requested size containing the next n items (where n is
124    /// the `num_items` parameter) and then advances the [`Segment::current_offset`] by that much.
125    pub fn next_n_as_slice(&self, num_items: usize) -> Result<&[I]> {
126        let pos = self.adj_pos(num_items as i128)?;
127        Ok(&self.data[pos..pos + num_items])
128    }
129
130    /// Gets a reference to the next item and then advances the [`Segment::current_offset`] by 1
131    pub fn next_item_ref(&self) -> Result<&I> {
132        let pos = self.adj_pos(1)?;
133        Ok(&self.data[pos - 1])
134    }
135
136    pub fn next_n(&self, num_items: usize) -> Result<Segment<I>> {
137        let pos = self.adj_pos(num_items as i128)?;
138        Ok(Self::new_full(
139            &self.data[pos..pos + num_items],
140            self.initial_offset + pos,
141            0,
142            self.endidness,
143        ))
144    }
145
146    /// Fills the provided buffer with the next n items, where n is the length of the buffer and
147    /// then advances the [`Segment::current_offset`] by n.
148    pub fn next_item_refs(&self, buf: &mut [&'s I]) -> Result<()> {
149        let offset = self.current_offset();
150        self.validate_offset(offset, buf.len())?;
151        let idx = self.to_pos(offset);
152        let slice = &self.data[idx..idx + buf.len()];
153        for i in 0..buf.len() {
154            buf[i] = &slice[i];
155        }
156        Ok(())
157    }
158
159    #[inline]
160    /// Generates a new [`Segment`] using the provided slice and initial offset.
161    pub fn with_offset(data: &'s [I], initial_offset: usize) -> Self {
162        Self::inner_with_offset(data, initial_offset, Endidness::default())
163    }
164
165    #[inline]
166    /// The initial offset of the [`Segment`]. For more information, see the **Offsets** section
167    /// of the [`Segment`] documentation (which still needs to be written...).
168    pub fn initial_offset(&self) -> usize {
169        self.initial_offset
170    }
171
172    #[inline]
173    /// The number of items initially provided to the [`Segment`]. Because a [`Segment`]'s data
174    /// can't be changed, this value will never change either.
175    pub fn size(&self) -> usize {
176        self.size
177    }
178
179    #[inline]
180    /// The current offset of the [`Segment`]'s cursor.
181    pub fn current_offset(&self) -> usize {
182        self.pos_to_offset(self.get_pos())
183    }
184
185    /// Sets the reader's [`Segment::current_offset`].
186    pub fn move_to(&self, offset: usize) -> Result<()> {
187        self.set_pos((offset - self.initial_offset) as usize)?;
188        Ok(())
189    }
190
191    /// Alters the [`Segment::current_offset`] by the given amount.
192    pub fn move_by(&self, num_items: i128) -> Result<()> {
193        self.adj_pos(num_items)?;
194        Ok(())
195    }
196
197    /// Gets the item at the provided offset without altering the [`Segment::current_offset`].
198    pub fn item_ref_at(&self, offset: usize) -> Result<&I> {
199        self.validate_offset(offset, 0)?;
200        Ok(&self[offset])
201    }
202
203    pub fn current_item_ref(&self) -> Result<&I> {
204        self.item_ref_at(self.current_offset())
205    }
206
207    #[inline]
208    /// Gets a slice of all remaining data in the [`Segment`] and then advances the
209    /// [`Segment::current_offset`] to the end of the segment.
210    pub fn get_remaining_as_slice(&self) -> Result<&[I]> {
211        let pos = self.adj_pos(self.remaining() as i128)?;
212        Ok(&self.data[pos..])
213    }
214
215    #[inline]
216    pub fn get_remaining(&self) -> Result<Self> {
217        let remaining = self.remaining();
218        //TODO remaining may have change between here
219        let pos = self.adj_pos(remaining as i128)?;
220        Ok(Self::new_full(
221            &self.data[pos..pos + remaining],
222            self.initial_offset + pos,
223            0,
224            self.endidness,
225        ))
226    }
227
228    #[inline]
229    /// The lowest valid offset that can be requested.
230    pub fn lower_offset_limit(&self) -> usize {
231        self.initial_offset
232    }
233
234    #[inline]
235    /// The highest valid offset that can be requested.
236    pub fn upper_offset_limit(&self) -> usize {
237        self.initial_offset + self.size
238    }
239
240    #[inline]
241    /// Checks whether or not there is any data left, relative to the [`Segment::current_offset`].
242    pub fn is_empty(&self) -> bool {
243        self.remaining() == 0
244    }
245
246    #[inline]
247    fn calc_remaining(&self, pos: usize) -> usize {
248        if pos > self.size {
249            0
250        } else {
251            self.size - pos
252        }
253    }
254
255    #[inline]
256    /// The amount of data left, relative to the [`Segment::current_offset`].
257    pub fn remaining(&self) -> usize {
258        self.calc_remaining(self.get_pos())
259    }
260
261    #[inline]
262    /// Returns `true` if there is more data after the  [`Segment::current_offset`].
263    pub fn has_more(&self) -> bool {
264        self.remaining() > 0
265    }
266
267    /// Fills the provided buffer with references to items, starting at the provided offset. This
268    /// does not alter the [`Segment::current_offset`].
269    pub fn item_refs_at<'a>(&'s self, offset: usize, buf: &mut [&'a I]) -> Result<()>
270    where
271        's: 'a,
272    {
273        self.validate_offset(offset, buf.len())?;
274        for i in 0..buf.len() {
275            buf[i] = self.item_ref_at(offset + i as usize)?;
276        }
277        Ok(())
278    }
279
280    fn validate_pos(&self, pos: usize, size: usize) -> Result<()> {
281        if size > 0 && self.calc_remaining(pos) == 0 {
282            Err(Error::NoMoreData)
283        } else if pos > self.size {
284            Err(Error::OffsetTooLarge {
285                offset: self.pos_to_offset(pos),
286            })
287        } else if pos > self.size - size as usize {
288            Err(Error::NotEnoughData {
289                requested: size,
290                left: self.size - pos,
291            })
292        } else {
293            Ok(())
294        }
295    }
296
297    /// A helper method that validates an offset.
298    ///
299    /// If the offset is valid, then `Ok(())` will be returned. Otherwise, the appropriate
300    /// [`Error`] is returned.
301    pub fn validate_offset(&self, offset: usize, size: usize) -> Result<()> {
302        // We can't just pass the offset along, because it might be too small and cause an overflow.
303        if offset < self.lower_offset_limit() {
304            Err(Error::OffsetTooSmall { offset })
305        } else {
306            self.validate_pos(self.to_pos(offset), size)
307        }
308    }
309
310    /// Takes an absolute offset and converts it to a relative offset, based off of the
311    /// [`Segment::current_offset`].
312    pub fn relative_offset(&self, abs_offset: usize) -> Result<usize> {
313        self.validate_offset(abs_offset, 0)?;
314        Ok(abs_offset - self.current_offset())
315    }
316
317    /// Returns a new [`Segment`] of the requested size, starting at the provied offset. This does
318    /// not alter the [`Segment::current_offset`].
319    pub fn get_n(&self, offset: usize, num_items: usize) -> Result<Segment<I>> {
320        self.validate_offset(offset, num_items)?;
321        Ok(Segment::inner_with_offset(
322            self.get_as_slice(offset, offset + num_items as usize)?,
323            offset,
324            self.endidness,
325        ))
326    }
327
328    pub fn get_n_as_slice(&self, offset: usize, num_items: usize) -> Result<&[I]> {
329        self.validate_offset(offset, num_items)?;
330        self.get_as_slice(offset, offset + num_items as usize)
331    }
332
333    /// Returns a slice of the data between the provided starting and ending offsets.
334    pub fn get_as_slice(&self, start: usize, end: usize) -> Result<&[I]> {
335        self.validate_offset(start, (end - start) as usize)?;
336        Ok(&self.data[start as usize..end as usize])
337    }
338
339    pub fn segment(&self, start: usize, end: usize) -> Result<Segment<I>> {
340        self.validate_offset(start, (end - start) as usize)?;
341        Ok(Segment::inner_with_offset(
342            &self[start..end],
343            start,
344            self.endidness,
345        ))
346    }
347
348    /// Creates a new segment off all items after the provided offset (inclusive).
349    pub fn all_after(&self, offset: usize) -> Result<Segment<I>> {
350        self.validate_offset(offset, 0)?;
351        Ok(Segment::inner_with_offset(
352            &self[offset..],
353            offset,
354            self.endidness,
355        ))
356    }
357
358    /// Creates a new segment off all items before the provided offset (exclusive).
359    pub fn all_before(&self, offset: usize) -> Result<Segment<I>> {
360        self.validate_offset(offset, 0)?;
361        Ok(Segment::inner_with_offset(
362            &self[..offset],
363            self.initial_offset,
364            self.endidness,
365        ))
366    }
367}
368
369impl<'s, I> Segment<'s, I>
370where
371    I: Default + Copy,
372{
373    /// Gets the next n items as an array and then advances the [`Segment::current_offset`] by the
374    /// size of the array
375    pub fn next_n_as_array<const N: usize>(&self) -> Result<[I; N]> {
376        let pos = self.adj_pos(N as i128)?;
377        let mut array = [I::default(); N];
378        array[..N].clone_from_slice(&self.data[pos..(N + pos)]);
379        Ok(array)
380    }
381}
382impl<'s, I, const N: usize> TryFrom<&Segment<'s, I>> for [I; N]
383where
384    I: Default + Copy,
385{
386    type Error = Error;
387
388    fn try_from(segment: &Segment<'s, I>) -> Result<Self> {
389        segment.next_n_as_array()
390    }
391}
392
393impl<'s, I> Segment<'s, I>
394where
395    I: PartialEq,
396{
397    /// Returns `true` if the next items are the same as the ones in the provided slice.
398    pub fn next_items_are(&self, prefix: &[I]) -> Result<bool> {
399        self.validate_offset(self.current_offset(), prefix.len())?;
400        for i in 0..prefix.len() {
401            if prefix[i] != self[self.current_offset() + i] {
402                return Ok(false);
403            }
404        }
405        Ok(true)
406    }
407}
408
409impl<'s, I: Clone> Segment<'s, I> {
410    /// Fills the provided buffer with bytes, starting at the provided offset. This does not alter
411    /// the [`Segment::current_offset`].
412    pub fn items_at(&self, offset: usize, buf: &mut [I]) -> Result<()> {
413        self.validate_offset(offset, buf.len())?;
414        for i in 0..buf.len() {
415            buf[i] = self.item_at(offset + i as usize)?.clone();
416        }
417        Ok(())
418    }
419
420    /// Gets the current byte and then advances the cursor.
421    pub fn next_item(&self) -> Result<I> {
422        let pos = self.adj_pos(1)?;
423        Ok(self.data[pos].clone())
424    }
425
426    pub fn next_items(&self, buf: &mut [I]) -> Result<()> {
427        let pos = self.adj_pos(buf.len() as i128)?;
428        buf.clone_from_slice(&self.data[pos..pos + buf.len()]);
429        Ok(())
430    }
431
432    /// Gets the item at the provided offset without altering the [`Segment::current_offset`].
433    pub fn item_at(&self, offset: usize) -> Result<I> {
434        self.validate_offset(offset, 0)?;
435        Ok(self[offset].clone())
436    }
437
438    pub fn current_item(&self) -> Result<I> {
439        self.item_at(self.current_offset())
440    }
441}
442
443impl<'s, I> AsRef<[I]> for Segment<'s, I> {
444    #[inline]
445    fn as_ref(&self) -> &[I] {
446        self.data
447    }
448}
449
450impl<'s, I> Index<usize> for Segment<'s, I> {
451    type Output = I;
452    fn index(&self, idx: usize) -> &Self::Output {
453        &self.data[self.to_pos(idx)]
454    }
455}
456
457macro_rules! add_idx_range {
458    ($type:ty) => {
459        impl<'s, I> Index<$type> for Segment<'s, I> {
460            type Output = [I];
461
462            fn index(&self, idx: $type) -> &Self::Output {
463                let start = match idx.start_bound() {
464                    Bound::Unbounded => 0,
465                    Bound::Included(i) => i - self.initial_offset,
466                    Bound::Excluded(i) => (i + 1) - self.initial_offset,
467                };
468                let end = match idx.end_bound() {
469                    Bound::Unbounded => self.size,
470                    Bound::Included(i) => (i + 1) - self.initial_offset,
471                    Bound::Excluded(i) => i - self.initial_offset,
472                };
473                &self.data[start..end]
474            }
475        }
476    };
477}
478
479add_idx_range! { ops::Range<usize> }
480add_idx_range! { ops::RangeFrom<usize> }
481add_idx_range! { ops::RangeInclusive<usize> }
482add_idx_range! { ops::RangeTo<usize> }
483add_idx_range! { ops::RangeToInclusive<usize> }
484add_idx_range! { ops::RangeFull }
485
486impl<'s, I> Borrow<[I]> for Segment<'s, I> {
487    #[inline]
488    fn borrow(&self) -> &[I] {
489        self.as_ref()
490    }
491}
492
#[cfg(feature = "std")]
impl<'s> io::Read for Segment<'s, u8> {
    /// Copies as many of the remaining bytes as fit into `buf`, advances the cursor past them and
    /// returns how many bytes were copied. Returns `Ok(0)` once the segment is exhausted or when
    /// `buf` is empty.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let count = self.remaining().min(buf.len());
        if count == 0 {
            return Ok(0);
        }
        // One bulk copy covers both the "buffer smaller than the rest" and the "rest smaller than
        // the buffer" cases.
        self.next_items(&mut buf[..count])?;
        Ok(count)
    }
}
508
#[cfg(feature = "std")]
impl<'s> io::Seek for Segment<'s, u8> {
    /// Moves the cursor and returns the resulting [`Segment::current_offset`].
    ///
    /// * `SeekFrom::Start(n)` moves to offset `n`.
    /// * `SeekFrom::Current(n)` moves to the current offset plus `n`.
    /// * `SeekFrom::End(n)` moves to the upper offset limit plus `n` (so `n` is normally zero or
    ///   negative).
    ///
    /// A target that lies before the segment's first offset, or that doesn't fit into a `usize`,
    /// is rejected with an `InvalidInput` error.
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        // Do the arithmetic in `i128` so that no combination of operands can wrap around.
        let target = match pos {
            io::SeekFrom::Start(to) => i128::from(to),
            io::SeekFrom::Current(by) => self.current_offset() as i128 + i128::from(by),
            io::SeekFrom::End(by) => self.upper_offset_limit() as i128 + i128::from(by),
        };
        if target < self.lower_offset_limit() as i128 || target > usize::MAX as i128 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "invalid seek to a position outside of the segment",
            ));
        }
        self.move_to(target as usize)?;
        Ok(self.current_offset() as u64)
    }
}
524
#[cfg(feature = "std")]
impl<'s> io::BufRead for Segment<'s, u8> {
    /// Returns up to 4096 of the bytes that follow the cursor, without moving the cursor.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        let start = self.get_pos();
        let available = self.size - start;
        let window = if available >= 4096 {
            &self.data[start..start + 4096]
        } else {
            &self.data[start..]
        };
        Ok(window)
    }

    /// Advances the cursor by `amt` bytes, stopping at the end of the segment.
    fn consume(&mut self, amt: usize) {
        if self.is_empty() {
            return;
        }
        if amt > self.remaining() {
            // Asked to consume more than is left: park the cursor at the end.
            self.move_to(self.upper_offset_limit()).unwrap();
        } else {
            self.adj_pos(amt as i128).unwrap();
        }
    }
}
545
546impl<'s, I> Clone for Segment<'s, I> {
547    fn clone(&self) -> Self {
548        Self {
549            initial_offset: self.initial_offset,
550            position: AtomicUsize::new(self.get_pos()),
551            data: self.data,
552            endidness: self.endidness,
553            size: self.size,
554        }
555    }
556}
557
#[cfg(feature = "async")]
mod sync {
    use super::Segment;
    use crate::error::Error;
    use core::{
        cmp::min,
        pin::Pin,
        sync::atomic::Ordering,
        task::{Context, Poll},
    };
    use std::io;
    use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, ReadBuf};

    /// Converts a segment error into an `io::Error` so that it can be returned to the caller.
    fn to_io_error(error: Error) -> io::Error {
        match error {
            Error::IoError { error } => error,
            other => io::Error::new(io::ErrorKind::InvalidInput, other.to_string()),
        }
    }

    /// Moves the segment's cursor to the offset `target`, rejecting targets that lie before the
    /// segment's first offset or that don't fit into a `usize`.
    fn seek_to_offset(segment: &Segment<'_, u8>, target: i128) -> io::Result<()> {
        if target < segment.lower_offset_limit() as i128 || target > usize::MAX as i128 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "invalid seek to a position outside of the segment",
            ));
        }
        segment.move_to(target as usize).map_err(to_io_error)
    }

    impl<'r> AsyncRead for Segment<'r, u8> {
        /// Copies as many of the remaining bytes as fit into `buf` and advances the cursor past
        /// them. This never returns `Poll::Pending`, since all of the data is already in memory.
        fn poll_read(
            self: Pin<&mut Self>,
            _: &mut Context,
            buf: &mut ReadBuf,
        ) -> Poll<io::Result<()>> {
            let to_fill = buf.capacity() - buf.filled().len();
            let mut end: usize = 0;
            // Claim the byte range atomically: the closure computes the new cursor and records
            // where the claimed range ends.
            let claimed = self
                .position
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |start| {
                    let remaining = self.calc_remaining(start);
                    if remaining == 0 {
                        None
                    } else {
                        end = min(start + to_fill, start + remaining);
                        Some(end)
                    }
                });
            // `Err` means nothing was left; leaving `buf` untouched signals end-of-file.
            if let Ok(start) = claimed {
                buf.put_slice(&self.data[start..end]);
            }
            Poll::Ready(Ok(()))
        }
    }

    impl<'r> AsyncSeek for Segment<'r, u8> {
        /// Moves the cursor. Seek targets are offsets; `SeekFrom::End(n)` is relative to the
        /// upper offset limit. An invalid target is reported as an `io::Error` rather than a
        /// panic.
        fn start_seek(self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
            match pos {
                io::SeekFrom::Start(to) => seek_to_offset(&*self, i128::from(to)),
                io::SeekFrom::Current(by) => self.move_by(i128::from(by)).map_err(to_io_error),
                io::SeekFrom::End(adj) => {
                    // Widen before adding so that the sum can't overflow.
                    let target = self.upper_offset_limit() as i128 + i128::from(adj);
                    seek_to_offset(&*self, target)
                }
            }
        }

        /// Seeking completes immediately; this reports the current offset.
        fn poll_complete(self: Pin<&mut Self>, _: &mut Context) -> Poll<io::Result<u64>> {
            Poll::Ready(Ok(self.current_offset() as u64))
        }
    }

    impl<'r> AsyncBufRead for Segment<'r, u8> {
        /// Returns up to 8192 of the bytes that follow the cursor, without moving the cursor.
        fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context) -> Poll<io::Result<&[u8]>> {
            // Read the cursor once so that the emptiness check and the slice agree.
            let pos = self.get_pos();
            let available = self.calc_remaining(pos);
            if available == 0 {
                Poll::Ready(Ok(&[]))
            } else {
                let to_get = min(8192, available);
                Poll::Ready(Ok(&self.data[pos..pos + to_get]))
            }
        }

        /// Advances the cursor by `amount` bytes, stopping at the end of the segment.
        fn consume(self: Pin<&mut Self>, amount: usize) {
            let step = min(amount, self.remaining());
            self.adj_pos(step as i128)
                .expect("a clamped advance stays within the segment");
        }
    }
}
634#[cfg(feature = "async")]
635pub use sync::*;