#![allow(clippy::needless_range_loop)]
use crate::{
    error::{Error, Result},
    Endidness,
};
use core::{
    borrow::Borrow,
    convert::TryFrom,
    ops::{self, Bound, Index, RangeBounds as _},
    sync::atomic::{AtomicUsize, Ordering},
};
#[cfg(feature = "std")]
use std::io;

mod data;
pub use data::*;

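/// A read-only view over a borrowed slice that tracks an absolute
/// `initial_offset`, an atomic cursor, and an [`Endidness`].
///
/// A minimal usage sketch (the import path is assumed, so the example is not
/// compiled as a doctest):
///
/// ```ignore
/// let segment = Segment::new(&[1u8, 2, 3, 4]);
/// assert_eq!(segment.remaining(), 4);
/// assert_eq!(segment.next_item()?, 1);
/// assert_eq!(segment.current_offset(), 1);
/// ```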
pub struct Segment<'s, I> {
    initial_offset: usize,
    position: AtomicUsize,
    data: &'s [I],
    size: usize,
    endidness: Endidness,
}

impl<'s, I> Segment<'s, I> {
    fn new_full(
        data: &'s [I],
        initial_offset: usize,
        position: usize,
        endidness: Endidness,
    ) -> Self {
        Self {
            initial_offset,
            position: AtomicUsize::new(position),
            data,
            endidness,
            size: data.len(),
        }
    }

    #[inline]
    fn get_pos(&self) -> usize {
        self.position.load(Ordering::Relaxed)
    }

    fn set_pos(&self, pos: usize) -> Result<()> {
        self.validate_pos(pos, 0)?;
        self.position.store(pos, Ordering::Relaxed);
        Ok(())
    }

    #[inline]
    fn to_pos(&self, offset: usize) -> usize {
        offset - self.initial_offset
    }

    #[inline]
    fn pos_to_offset(&self, pos: usize) -> usize {
        pos + self.initial_offset
    }

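    // `adj_pos` moves the cursor by `amt` (which may be negative) and, like
    // `AtomicUsize::fetch_update`, yields the *previous* position on success.
    // Callers therefore read `data[prev..prev + n]` after advancing by `n`.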
    fn adj_pos(&self, amt: i128) -> Result<usize> {
        let mut result = Ok(());
        let prev_pos = {
            let rval = self
                .position
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |p| {
                    let new_pos = (p as i128 + amt) as usize;
                    result = self.validate_pos(new_pos, 0);
                    if result.is_ok() {
                        Some(new_pos)
                    } else {
                        None
                    }
                });
            match rval {
                Ok(v) => v,
                Err(v) => v,
            }
        };
        match result {
            Err(e) => Err(e),
            Ok(_) => Ok(prev_pos),
        }
    }

    #[inline]
    fn inner_with_offset(data: &'s [I], initial_offset: usize, endidness: Endidness) -> Self {
        Self::new_full(data, initial_offset, 0, endidness)
    }

    #[inline]
    pub fn new(data: &'s [I]) -> Self {
        Self::new_full(data, 0, 0, Endidness::default())
    }

    #[inline]
    pub fn change_initial_offset(&mut self, offset: usize) {
        self.initial_offset = offset;
    }

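    /// Consumes the next `num_items` items and returns them as a slice,
    /// advancing the cursor.
    ///
    /// A sketch of the cursor behaviour (import path assumed, not compiled as
    /// a doctest):
    ///
    /// ```ignore
    /// let segment = Segment::new(&[10u8, 20, 30, 40]);
    /// assert_eq!(segment.next_n_as_slice(2)?, &[10, 20]);
    /// assert_eq!(segment.current_offset(), 2);
    /// ```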
    pub fn next_n_as_slice(&self, num_items: usize) -> Result<&[I]> {
        let pos = self.adj_pos(num_items as i128)?;
        Ok(&self.data[pos..pos + num_items])
    }

    pub fn next_item_ref(&self) -> Result<&I> {
        // `adj_pos` returns the position *before* advancing, which is exactly
        // the item being consumed.
        let pos = self.adj_pos(1)?;
        Ok(&self.data[pos])
    }

    pub fn next_n(&self, num_items: usize) -> Result<Segment<I>> {
        let pos = self.adj_pos(num_items as i128)?;
        Ok(Self::new_full(
            &self.data[pos..pos + num_items],
            self.initial_offset + pos,
            0,
            self.endidness,
        ))
    }

    pub fn next_item_refs(&self, buf: &mut [&'s I]) -> Result<()> {
        // Advance the cursor past the borrowed items, like the other `next_*`
        // readers; `adj_pos` yields the position before advancing.
        let pos = self.adj_pos(buf.len() as i128)?;
        let slice = &self.data[pos..pos + buf.len()];
        for i in 0..buf.len() {
            buf[i] = &slice[i];
        }
        Ok(())
    }

    #[inline]
    pub fn with_offset(data: &'s [I], initial_offset: usize) -> Self {
        Self::inner_with_offset(data, initial_offset, Endidness::default())
    }

    #[inline]
    pub fn initial_offset(&self) -> usize {
        self.initial_offset
    }

    #[inline]
    pub fn size(&self) -> usize {
        self.size
    }

    #[inline]
    pub fn current_offset(&self) -> usize {
        self.pos_to_offset(self.get_pos())
    }

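    /// Moves the cursor to an absolute `offset` (i.e. one that already
    /// includes `initial_offset`).
    ///
    /// A sketch with a non-zero initial offset (import path assumed, not
    /// compiled as a doctest):
    ///
    /// ```ignore
    /// let segment = Segment::with_offset(&[10u8, 20, 30], 100);
    /// segment.move_to(101)?;
    /// assert_eq!(segment.current_item()?, 20);
    /// ```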
    pub fn move_to(&self, offset: usize) -> Result<()> {
        self.set_pos(self.to_pos(offset))?;
        Ok(())
    }

    pub fn move_by(&self, num_items: i128) -> Result<()> {
        self.adj_pos(num_items)?;
        Ok(())
    }

    pub fn item_ref_at(&self, offset: usize) -> Result<&I> {
        self.validate_offset(offset, 0)?;
        Ok(&self[offset])
    }

    pub fn current_item_ref(&self) -> Result<&I> {
        self.item_ref_at(self.current_offset())
    }

    #[inline]
    pub fn get_remaining_as_slice(&self) -> Result<&[I]> {
        let pos = self.adj_pos(self.remaining() as i128)?;
        Ok(&self.data[pos..])
    }

    #[inline]
    pub fn get_remaining(&self) -> Result<Self> {
        let remaining = self.remaining();
        let pos = self.adj_pos(remaining as i128)?;
        Ok(Self::new_full(
            &self.data[pos..pos + remaining],
            self.initial_offset + pos,
            0,
            self.endidness,
        ))
    }

    #[inline]
    pub fn lower_offset_limit(&self) -> usize {
        self.initial_offset
    }

    #[inline]
    pub fn upper_offset_limit(&self) -> usize {
        self.initial_offset + self.size
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.remaining() == 0
    }

    #[inline]
    fn calc_remaining(&self, pos: usize) -> usize {
        if pos > self.size {
            0
        } else {
            self.size - pos
        }
    }

    #[inline]
    pub fn remaining(&self) -> usize {
        self.calc_remaining(self.get_pos())
    }

    #[inline]
    pub fn has_more(&self) -> bool {
        self.remaining() > 0
    }

    pub fn item_refs_at<'a>(&'s self, offset: usize, buf: &mut [&'a I]) -> Result<()>
    where
        's: 'a,
    {
        self.validate_offset(offset, buf.len())?;
        for i in 0..buf.len() {
            buf[i] = self.item_ref_at(offset + i)?;
        }
        Ok(())
    }

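    // Validation model: a *position* is an index into `data`, while an
    // *offset* is `initial_offset + position`. `validate_pos` checks that a
    // read of `size` items starting at `pos` stays within `data`, returning
    // `Error::NoMoreData`, `Error::OffsetTooLarge`, or `Error::NotEnoughData`
    // as appropriate; `validate_offset` additionally rejects offsets below
    // `initial_offset` with `Error::OffsetTooSmall`.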
    fn validate_pos(&self, pos: usize, size: usize) -> Result<()> {
        if size > 0 && self.calc_remaining(pos) == 0 {
            Err(Error::NoMoreData)
        } else if pos > self.size {
            Err(Error::OffsetTooLarge {
                offset: self.pos_to_offset(pos),
            })
        } else if pos + size > self.size {
            // Written as an addition so a `size` larger than the segment
            // cannot underflow.
            Err(Error::NotEnoughData {
                requested: size,
                left: self.size - pos,
            })
        } else {
            Ok(())
        }
    }

    pub fn validate_offset(&self, offset: usize, size: usize) -> Result<()> {
        if offset < self.lower_offset_limit() {
            Err(Error::OffsetTooSmall { offset })
        } else {
            self.validate_pos(self.to_pos(offset), size)
        }
    }

    pub fn relative_offset(&self, abs_offset: usize) -> Result<usize> {
        self.validate_offset(abs_offset, 0)?;
        Ok(abs_offset - self.current_offset())
    }

    pub fn get_n(&self, offset: usize, num_items: usize) -> Result<Segment<I>> {
        self.validate_offset(offset, num_items)?;
        Ok(Segment::inner_with_offset(
            self.get_as_slice(offset, offset + num_items)?,
            offset,
            self.endidness,
        ))
    }

    pub fn get_n_as_slice(&self, offset: usize, num_items: usize) -> Result<&[I]> {
        self.validate_offset(offset, num_items)?;
        self.get_as_slice(offset, offset + num_items)
    }

    pub fn get_as_slice(&self, start: usize, end: usize) -> Result<&[I]> {
        self.validate_offset(start, end - start)?;
        // `start` and `end` are absolute offsets; the range `Index` impl
        // translates them by `initial_offset` before slicing `data`.
        Ok(&self[start..end])
    }

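    /// Borrows a sub-[`Segment`] covering the absolute offsets `start..end`
    /// without moving the cursor.
    ///
    /// A sketch (import path assumed, not compiled as a doctest):
    ///
    /// ```ignore
    /// let segment = Segment::with_offset(&[1u8, 2, 3, 4], 10);
    /// let middle = segment.segment(11, 13)?;
    /// assert_eq!(middle.as_ref(), &[2, 3]);
    /// assert_eq!(middle.initial_offset(), 11);
    /// ```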
    pub fn segment(&self, start: usize, end: usize) -> Result<Segment<I>> {
        self.validate_offset(start, end - start)?;
        Ok(Segment::inner_with_offset(
            &self[start..end],
            start,
            self.endidness,
        ))
    }

    pub fn all_after(&self, offset: usize) -> Result<Segment<I>> {
        self.validate_offset(offset, 0)?;
        Ok(Segment::inner_with_offset(
            &self[offset..],
            offset,
            self.endidness,
        ))
    }

    pub fn all_before(&self, offset: usize) -> Result<Segment<I>> {
        self.validate_offset(offset, 0)?;
        Ok(Segment::inner_with_offset(
            &self[..offset],
            self.initial_offset,
            self.endidness,
        ))
    }
}

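// A sketch of reading a fixed-size prefix as an array (import path assumed,
// not compiled as a doctest):
//
// ```ignore
// let segment = Segment::new(&[0xDEu8, 0xAD, 0xBE, 0xEF]);
// let magic: [u8; 4] = segment.next_n_as_array()?;
// assert_eq!(magic, [0xDE, 0xAD, 0xBE, 0xEF]);
// ```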
impl<'s, I> Segment<'s, I>
where
    I: Default + Copy,
{
    pub fn next_n_as_array<const N: usize>(&self) -> Result<[I; N]> {
        let pos = self.adj_pos(N as i128)?;
        let mut array = [I::default(); N];
        array.copy_from_slice(&self.data[pos..pos + N]);
        Ok(array)
    }
}

impl<'s, I, const N: usize> TryFrom<&Segment<'s, I>> for [I; N]
where
    I: Default + Copy,
{
    type Error = Error;

    fn try_from(segment: &Segment<'s, I>) -> Result<Self> {
        segment.next_n_as_array()
    }
}

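// A sketch of checking for a magic-number prefix without consuming it
// (import path assumed, not compiled as a doctest):
//
// ```ignore
// let segment = Segment::new(b"\x7fELF\x02\x01");
// assert!(segment.next_items_are(b"\x7fELF")?);
// assert_eq!(segment.current_offset(), 0); // the cursor does not move
// ```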
impl<'s, I> Segment<'s, I>
where
    I: PartialEq,
{
    pub fn next_items_are(&self, prefix: &[I]) -> Result<bool> {
        self.validate_offset(self.current_offset(), prefix.len())?;
        for i in 0..prefix.len() {
            if prefix[i] != self[self.current_offset() + i] {
                return Ok(false);
            }
        }
        Ok(true)
    }
}

impl<'s, I: Clone> Segment<'s, I> {
    pub fn items_at(&self, offset: usize, buf: &mut [I]) -> Result<()> {
        self.validate_offset(offset, buf.len())?;
        for i in 0..buf.len() {
            buf[i] = self.item_at(offset + i)?;
        }
        Ok(())
    }

    pub fn next_item(&self) -> Result<I> {
        let pos = self.adj_pos(1)?;
        Ok(self.data[pos].clone())
    }

    pub fn next_items(&self, buf: &mut [I]) -> Result<()> {
        let pos = self.adj_pos(buf.len() as i128)?;
        buf.clone_from_slice(&self.data[pos..pos + buf.len()]);
        Ok(())
    }

    pub fn item_at(&self, offset: usize) -> Result<I> {
        self.validate_offset(offset, 0)?;
        Ok(self[offset].clone())
    }

    pub fn current_item(&self) -> Result<I> {
        self.item_at(self.current_offset())
    }
}

impl<'s, I> AsRef<[I]> for Segment<'s, I> {
    #[inline]
    fn as_ref(&self) -> &[I] {
        self.data
    }
}

impl<'s, I> Index<usize> for Segment<'s, I> {
    type Output = I;
    fn index(&self, idx: usize) -> &Self::Output {
        &self.data[self.to_pos(idx)]
    }
}

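// Range indexing, like `Index<usize>` above, takes *absolute* offsets and
// translates them by `initial_offset` before slicing `data`. For example,
// with `initial_offset == 10`, `segment[11..13]` yields `&data[1..3]`.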
macro_rules! add_idx_range {
    ($type:ty) => {
        impl<'s, I> Index<$type> for Segment<'s, I> {
            type Output = [I];

            fn index(&self, idx: $type) -> &Self::Output {
                let start = match idx.start_bound() {
                    Bound::Unbounded => 0,
                    Bound::Included(i) => i - self.initial_offset,
                    Bound::Excluded(i) => (i + 1) - self.initial_offset,
                };
                let end = match idx.end_bound() {
                    Bound::Unbounded => self.size,
                    Bound::Included(i) => (i + 1) - self.initial_offset,
                    Bound::Excluded(i) => i - self.initial_offset,
                };
                &self.data[start..end]
            }
        }
    };
}

add_idx_range! { ops::Range<usize> }
add_idx_range! { ops::RangeFrom<usize> }
add_idx_range! { ops::RangeInclusive<usize> }
add_idx_range! { ops::RangeTo<usize> }
add_idx_range! { ops::RangeToInclusive<usize> }
add_idx_range! { ops::RangeFull }

impl<'s, I> Borrow<[I]> for Segment<'s, I> {
    #[inline]
    fn borrow(&self) -> &[I] {
        self.as_ref()
    }
}

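// A sketch of using the `std::io` adapters below, assuming `next_bytes` from
// the `data` module fills the buffer from the cursor as the `Read` impl
// relies on (import path assumed, not compiled as a doctest):
//
// ```ignore
// use std::io::{Read, Seek, SeekFrom};
//
// let mut segment = Segment::new(&[1u8, 2, 3, 4, 5]);
// segment.seek(SeekFrom::Start(2))?;
// let mut buf = [0u8; 2];
// segment.read_exact(&mut buf)?;
// assert_eq!(buf, [3, 4]);
// ```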
#[cfg(feature = "std")]
impl<'s> io::Read for Segment<'s, u8> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.remaining() > buf.len() {
            self.next_bytes(buf)?;
            Ok(buf.len())
        } else {
            let read = self.remaining();
            for i in 0..read {
                buf[i] = self.next_u8()?;
            }
            Ok(read)
        }
    }
}

#[cfg(feature = "std")]
impl<'s> io::Seek for Segment<'s, u8> {
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        match pos {
            io::SeekFrom::Start(to) => self.move_to(to as usize)?,
            io::SeekFrom::Current(by) => {
                self.move_to((self.current_offset() as i128 + by as i128) as usize)?
            }
            io::SeekFrom::End(point) => {
                // `SeekFrom::End(n)` is an offset added to the end position,
                // matching the `AsyncSeek` impl below.
                self.move_to((self.upper_offset_limit() as i128 + point as i128) as usize)?
            }
        };
        Ok(self.current_offset() as u64)
    }
}

#[cfg(feature = "std")]
impl<'s> io::BufRead for Segment<'s, u8> {
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        let pos = self.get_pos();
        if self.size - pos >= 4096 {
            Ok(&self.data[pos..pos + 4096])
        } else {
            Ok(&self.data[pos..])
        }
    }

    fn consume(&mut self, amt: usize) {
        if !self.is_empty() {
            if self.remaining() < amt {
                self.move_to(self.upper_offset_limit()).unwrap();
            } else {
                self.adj_pos(amt as i128).unwrap();
            }
        }
    }
}

impl<'s, I> Clone for Segment<'s, I> {
    fn clone(&self) -> Self {
        Self {
            initial_offset: self.initial_offset,
            position: AtomicUsize::new(self.get_pos()),
            data: self.data,
            endidness: self.endidness,
            size: self.size,
        }
    }
}

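// The `tokio` impls below mirror the `std::io` adapters above: `AsyncRead`
// copies from the cursor, `AsyncSeek` resolves the target to an absolute
// offset immediately in `start_seek`, and `AsyncBufRead` hands out up to
// 8 KiB of the remaining data per fill.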
#[cfg(feature = "async")]
mod sync {
    use super::Segment;
    use crate::error::Error;
    use core::{
        cmp::min,
        pin::Pin,
        sync::atomic::Ordering,
        task::{Context, Poll},
    };
    use std::io;
    use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, ReadBuf};

    impl<'r> AsyncRead for Segment<'r, u8> {
        fn poll_read(
            self: Pin<&mut Self>,
            _: &mut Context,
            buf: &mut ReadBuf,
        ) -> Poll<io::Result<()>> {
            let to_fill = buf.capacity() - buf.filled().len();
            let mut end: usize = 0;
            let maybe_pos = self
                .position
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                    let remaining = self.calc_remaining(n);
                    if remaining == 0 {
                        None
                    } else {
                        let new = min(n + to_fill, n + remaining);
                        end = new;
                        Some(new)
                    }
                });
            if let Ok(pos) = maybe_pos {
                buf.put_slice(&self.data[pos..end]);
            }
            Poll::Ready(Ok(()))
        }
    }

    impl<'r> AsyncSeek for Segment<'r, u8> {
        fn start_seek(self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
            let result = match pos {
                io::SeekFrom::Start(to) => self.move_to(to as usize),
                io::SeekFrom::Current(by) => self.move_by(by as i128),
                io::SeekFrom::End(adj) => {
                    self.move_to((self.upper_offset_limit() as i64 + adj) as usize)
                }
            };
            match result {
                Ok(()) => Ok(()),
                Err(Error::IoError { error }) => Err(error),
                // Out-of-range seeks are reported as errors rather than
                // panics, mirroring the blocking `Seek` impl.
                Err(_) => Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "seek target outside of the segment",
                )),
            }
        }

        fn poll_complete(self: Pin<&mut Self>, _: &mut Context) -> Poll<io::Result<u64>> {
            Poll::Ready(Ok(self.current_offset() as u64))
        }
    }

    impl<'r> AsyncBufRead for Segment<'r, u8> {
        fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context) -> Poll<io::Result<&[u8]>> {
            if self.remaining() == 0 {
                Poll::Ready(Ok(&[]))
            } else {
                let pos = self.get_pos();
                let to_get = min(8192, self.calc_remaining(pos));
                Poll::Ready(Ok(&self.data[pos..pos + to_get]))
            }
        }

        fn consume(self: Pin<&mut Self>, amount: usize) {
            self.adj_pos(amount as i128).unwrap();
        }
    }
}

#[cfg(feature = "async")]
pub use sync::*;