#[cfg(feature = "futures03")]
mod futures_imp;
#[cfg(test)]
mod tests;
#[cfg(feature = "tokio1")]
mod tokio_imp;
use crate::{BufList, errors::ReadExactError};
use bytes::{Buf, Bytes};
use std::{
cmp::Ordering,
io::{self, IoSlice, IoSliceMut, SeekFrom},
};
/// A cursor which wraps a [`BufList`] (or a reference to one) and tracks
/// a read position within it.
pub struct Cursor<T> {
    // The wrapped value; `AsRef<BufList>` is required on the impls below.
    inner: T,
    // The current chunk index and absolute byte position (see `CursorData`).
    data: CursorData,
}
impl<T: AsRef<BufList>> Cursor<T> {
    /// Creates a new cursor over `inner`, positioned at byte 0.
    pub fn new(inner: T) -> Cursor<T> {
        Cursor {
            inner,
            data: CursorData::new(),
        }
    }

    /// Consumes the cursor, returning the wrapped value.
    pub fn into_inner(self) -> T {
        self.inner
    }

    /// Returns a shared reference to the wrapped value.
    pub const fn get_ref(&self) -> &T {
        &self.inner
    }

    /// Returns the cursor's current absolute byte position.
    pub const fn position(&self) -> u64 {
        self.data.pos
    }

    /// Moves the cursor to the absolute byte position `pos`.
    pub fn set_position(&mut self, pos: u64) {
        let list = self.inner.as_ref();
        self.data.set_pos(list, pos);
    }

    /// Verifies internal consistency between the cursor and the list.
    #[cfg(test)]
    fn assert_invariants(&self) -> anyhow::Result<()> {
        self.data.assert_invariants(self.inner.as_ref())
    }
}
impl<T> Clone for Cursor<T>
where
T: Clone,
{
#[inline]
fn clone(&self) -> Self {
Cursor {
inner: self.inner.clone(),
data: self.data.clone(),
}
}
#[inline]
fn clone_from(&mut self, other: &Self) {
self.inner.clone_from(&other.inner);
self.data = other.data.clone();
}
}
impl<T: AsRef<BufList>> io::Seek for Cursor<T> {
    /// Seeks to the requested position, delegating to the shared
    /// cursor-data logic.
    fn seek(&mut self, style: SeekFrom) -> io::Result<u64> {
        let list = self.inner.as_ref();
        self.data.seek_impl(list, style)
    }

    /// Returns the current position without consulting the list.
    fn stream_position(&mut self) -> io::Result<u64> {
        Ok(self.data.pos)
    }
}
impl<T: AsRef<BufList>> io::Read for Cursor<T> {
    /// Copies bytes into `buf`; the shared implementation never fails.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.data.read_impl(self.inner.as_ref(), buf);
        Ok(n)
    }

    /// Fills the provided buffers in order.
    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
        let n = self.data.read_vectored_impl(self.inner.as_ref(), bufs);
        Ok(n)
    }

    /// Fills `buf` exactly, or errors with `UnexpectedEof`.
    fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
        self.data.read_exact_impl(self.inner.as_ref(), buf)
    }
}
impl<T: AsRef<BufList>> io::BufRead for Cursor<T> {
    /// Returns the unread remainder of the current chunk (empty at EOF);
    /// never fails.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        let chunk = self.data.fill_buf_impl(self.inner.as_ref());
        Ok(chunk)
    }

    /// Advances the position by `amt` bytes.
    fn consume(&mut self, amt: usize) {
        let list = self.inner.as_ref();
        self.data.consume_impl(list, amt);
    }
}
impl<T: AsRef<BufList>> Buf for Cursor<T> {
    /// Number of bytes between the current position and the end of the
    /// list (saturating at zero if the position is past the end).
    fn remaining(&self) -> usize {
        self.data
            .num_bytes(self.inner.as_ref())
            .saturating_sub(self.data.pos) as usize
    }

    /// True while the position has not reached the end of the list.
    fn has_remaining(&self) -> bool {
        self.data.pos < self.data.num_bytes(self.inner.as_ref())
    }

    /// The unread tail of the current chunk (empty at EOF).
    fn chunk(&self) -> &[u8] {
        self.data.fill_buf_impl(self.inner.as_ref())
    }

    /// Advances the position by `amt` bytes.
    fn advance(&mut self, amt: usize) {
        self.data.consume_impl(self.inner.as_ref(), amt);
    }

    /// Fills `iovs` with as many chunk slices as fit, starting at the
    /// current position, and returns how many entries were written.
    fn chunks_vectored<'iovs>(&'iovs self, iovs: &mut [IoSlice<'iovs>]) -> usize {
        let list = self.inner.as_ref();
        if iovs.is_empty() || !self.has_remaining() {
            return 0;
        }

        // The first slice begins partway into the current chunk; every
        // later slice covers a whole chunk.
        let first = self.data.chunk;
        let offset = (self.data.pos - list.get_start_pos()[first]) as usize;
        let count = iovs.len().min(list.num_chunks() - first);

        for (slot, index) in iovs.iter_mut().zip(first..first + count) {
            let chunk = list.get_chunk(index).expect("chunk is in range");
            *slot = if index == first {
                IoSlice::new(&chunk[offset..])
            } else {
                IoSlice::new(&chunk[..])
            };
        }
        count
    }
}
/// Position state for a cursor over a [`BufList`].
#[derive(Clone, Debug)]
struct CursorData {
    // Index into the list's chunks corresponding to `pos`; kept in sync
    // with `pos` by `set_pos` and the read methods.
    chunk: usize,
    // Absolute byte position from the start of the list.
    pos: u64,
}
impl CursorData {
    /// Creates cursor data positioned at the start of the list.
    fn new() -> Self {
        Self { chunk: 0, pos: 0 }
    }

    /// Checks that `pos` falls within the chunk indexed by `self.chunk`.
    #[cfg(test)]
    fn assert_invariants(&self, list: &BufList) -> anyhow::Result<()> {
        use anyhow::ensure;
        // The position must be at or after the current chunk's start.
        ensure!(
            self.pos >= list.get_start_pos()[self.chunk],
            "invariant failed: current position {} >= start position {} (chunk = {})",
            self.pos,
            list.get_start_pos()[self.chunk],
            self.chunk
        );
        // The position must be strictly before the next chunk's start.
        // `Offset::Eof` (no next entry) compares greater than any value.
        let next_pos = list.get_start_pos().get(self.chunk + 1).copied().into();
        ensure!(
            Offset::Value(self.pos) < next_pos,
            "invariant failed: next start position {:?} > current position {} (chunk = {})",
            next_pos,
            self.pos,
            self.chunk
        );
        Ok(())
    }

    /// Shared implementation of `io::Seek::seek`.
    fn seek_impl(&mut self, list: &BufList, style: SeekFrom) -> io::Result<u64> {
        let (base_pos, offset) = match style {
            SeekFrom::Start(n) => {
                // An absolute seek cannot underflow or overflow: apply it
                // directly and return.
                self.set_pos(list, n);
                return Ok(n);
            }
            SeekFrom::End(n) => (self.num_bytes(list), n),
            SeekFrom::Current(n) => (self.pos, n),
        };
        // Apply the signed offset with overflow checking. `wrapping_neg`
        // handles i64::MIN, whose magnitude is not representable as i64
        // but is as u64.
        let new_pos = if offset >= 0 {
            base_pos.checked_add(offset as u64)
        } else {
            base_pos.checked_sub(offset.wrapping_neg() as u64)
        };
        match new_pos {
            Some(n) => {
                self.set_pos(list, n);
                Ok(self.pos)
            }
            None => Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "invalid seek to a negative or overflowing position",
            )),
        }
    }

    /// Shared implementation of `io::Read::read`: copies bytes into `buf`
    /// until it is full or the list is exhausted, returning the count.
    fn read_impl(&mut self, list: &BufList, buf: &mut [u8]) -> usize {
        let mut buf_pos = 0;
        while buf_pos < buf.len() {
            // `None` means the position is at (or past) the end of the list.
            let (chunk, chunk_pos) = match self.get_chunk_and_pos(list) {
                Some(value) => value,
                None => break,
            };
            // Copy as much of this chunk as the output buffer can hold.
            let n_to_copy = (chunk.len() - chunk_pos).min(buf.len() - buf_pos);
            let chunk_bytes = chunk.as_ref();
            let bytes_to_copy = &chunk_bytes[chunk_pos..(chunk_pos + n_to_copy)];
            let dest = &mut buf[buf_pos..(buf_pos + n_to_copy)];
            dest.copy_from_slice(bytes_to_copy);
            buf_pos += n_to_copy;
            self.pos += n_to_copy as u64;
            // Advance to the next chunk once this one is fully consumed,
            // keeping `chunk` in sync with `pos`.
            if n_to_copy == chunk.len() - chunk_pos {
                self.chunk += 1;
            }
        }
        buf_pos
    }

    /// Shared implementation of `io::Read::read_vectored`: fills each
    /// buffer in turn, stopping at the first short read.
    fn read_vectored_impl(&mut self, list: &BufList, bufs: &mut [IoSliceMut<'_>]) -> usize {
        let mut nread = 0;
        for buf in bufs {
            let n = self.read_impl(list, buf);
            nread += n;
            // A short read means the list is exhausted.
            if n < buf.len() {
                break;
            }
        }
        nread
    }

    /// Shared implementation of `io::Read::read_exact`: errors with
    /// `UnexpectedEof` (moving the position to the end) if fewer than
    /// `buf.len()` bytes remain.
    fn read_exact_impl(&mut self, list: &BufList, buf: &mut [u8]) -> io::Result<()> {
        let total = self.num_bytes(list);
        let remaining = total.saturating_sub(self.pos);
        let buf_len = buf.len();
        if remaining < buf_len as u64 {
            self.set_pos(list, total);
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                ReadExactError { remaining, buf_len },
            ));
        }
        self.read_impl(list, buf);
        Ok(())
    }

    /// Returns the unread remainder of the current chunk, or an empty
    /// slice at end-of-list.
    fn fill_buf_impl<'a>(&'a self, list: &'a BufList) -> &'a [u8] {
        const EMPTY_SLICE: &[u8] = &[];
        match self.get_chunk_and_pos(list) {
            Some((chunk, chunk_pos)) => &chunk.as_ref()[chunk_pos..],
            None => EMPTY_SLICE,
        }
    }

    /// Advances the position by `amt` bytes.
    fn consume_impl(&mut self, list: &BufList, amt: usize) {
        self.set_pos(list, self.pos + amt as u64);
    }

    /// Moves to `new_pos`, updating `self.chunk` so the chunk/position
    /// invariant (see `assert_invariants`) continues to hold. Positions
    /// past the end of the list are permitted, as `io::Seek` allows.
    fn set_pos(&mut self, list: &BufList, new_pos: u64) {
        match new_pos.cmp(&self.pos) {
            Ordering::Greater => {
                let start_pos = list.get_start_pos();
                let next_start = start_pos.get(self.chunk + 1).copied().into();
                if Offset::Value(new_pos) < next_start {
                    // Still inside the current chunk: nothing to update.
                } else {
                    // Search the start positions after the current chunk.
                    // An exact hit at slice index k means the target chunk
                    // starts exactly at `new_pos` and is `self.chunk + 1 + k`;
                    // a miss with insertion point k means `new_pos` falls
                    // inside the chunk just before that point.
                    match start_pos[self.chunk + 1..].binary_search(&new_pos) {
                        Ok(delta_minus_one) => {
                            self.chunk += 1 + delta_minus_one;
                        }
                        Err(delta) => {
                            debug_assert!(
                                delta > 0,
                                "delta must be at least 1 since we already \
                                 checked the same chunk (self.chunk = {})",
                                self.chunk,
                            );
                            self.chunk += delta;
                        }
                    }
                }
            }
            Ordering::Equal => {}
            Ordering::Less => {
                let start_pos = list.get_start_pos();
                if start_pos.get(self.chunk).copied() <= Some(new_pos) {
                    // Still inside the current chunk: nothing to update.
                } else {
                    // Search the start positions before the current chunk.
                    // An exact hit is the target chunk itself; a miss with
                    // insertion point k means `new_pos` falls inside
                    // chunk k - 1.
                    match start_pos[..self.chunk].binary_search(&new_pos) {
                        Ok(chunk) => {
                            self.chunk = chunk;
                        }
                        Err(chunk_plus_1) => {
                            debug_assert!(
                                chunk_plus_1 > 0,
                                "chunk_plus_1 must be at least 1 since self.start_pos[0] is 0 \
                                 (self.chunk = {})",
                                self.chunk,
                            );
                            self.chunk = chunk_plus_1 - 1;
                        }
                    }
                }
            }
        }
        self.pos = new_pos;
    }

    /// Returns the current chunk and the offset of `pos` within it, or
    /// `None` when the position is at (or past) the end of the list.
    #[inline]
    fn get_chunk_and_pos<'b>(&self, list: &'b BufList) -> Option<(&'b Bytes, usize)> {
        match list.get_chunk(self.chunk) {
            Some(chunk) => {
                // A chunk exists, so the position must be in bounds.
                debug_assert!(
                    self.pos < self.num_bytes(list),
                    "self.pos ({}) is less than num_bytes ({})",
                    self.pos,
                    self.num_bytes(list)
                );
                Some((
                    chunk,
                    (self.pos - list.get_start_pos()[self.chunk]) as usize,
                ))
            }
            None => {
                // The chunk index is past the last chunk: end of list.
                None
            }
        }
    }

    /// Total number of bytes in the list (the last start-position entry).
    fn num_bytes(&self, list: &BufList) -> u64 {
        *list
            .get_start_pos()
            .last()
            .expect("start_pos always has at least one element")
    }
}
/// A position that is either a concrete offset or end-of-file.
///
/// The variant order matters: the derived `Ord` makes every `Value(_)`
/// compare less than `Eof`, which the position comparisons rely on.
#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
enum Offset<T> {
    Value(T),
    Eof,
}

impl<T> From<Option<T>> for Offset<T> {
    /// `Some(v)` maps to `Value(v)`; `None` maps to `Eof`.
    fn from(value: Option<T>) -> Self {
        value.map_or(Self::Eof, Self::Value)
    }
}