use std::io;
use std::io::{Error, ErrorKind, Read, Seek, SeekFrom};
#[cfg(feature = "mmap")] use std::borrow::Borrow;
#[cfg(feature = "mmap")] use std::fs::File;
use crate::fs::PosRead;
#[cfg(feature = "mmap")] use memmap::{Mmap, MmapOptions};
/// A reader with its own cursor on top of a `PosRead` source.
///
/// * `pos` — the cursor consumed and advanced by the `Read`/`Seek` impls.
/// * `length` — the size given at construction; origin for `SeekFrom::End`.
/// * `pos_read` — the wrapped positional reader.
#[derive(Debug)]
pub struct ReadPos<P: PosRead> {
    pos: u64,
    length: u64,
    pos_read: P,
}
/// A cursor-carrying window over the absolute byte range `start..end` of a
/// `PosRead` source.
///
/// * `start` — absolute offset of the first byte of the window.
/// * `pos` — absolute cursor position (not relative to `start`).
/// * `end` — absolute offset one past the last byte of the window.
/// * `pos_read` — the wrapped positional reader.
#[derive(Debug)]
pub struct ReadSlice<P: PosRead> {
    start: u64,
    pos: u64,
    end: u64,
    pos_read: P,
}
impl<P> ReadPos<P>
where P: PosRead
{
    /// Wraps `pos_read` with the cursor at offset 0.
    ///
    /// `length` is stored as given; it is not checked against the wrapped
    /// source.
    pub fn new(pos_read: P, length: u64) -> Self {
        ReadPos { pos: 0, length, pos_read }
    }

    /// Returns the length supplied to [`ReadPos::new`].
    pub fn len(&self) -> u64 {
        self.length
    }

    /// Returns `true` when the stored length is zero.
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }

    /// Returns the current cursor position.
    pub fn tell(&self) -> u64 {
        self.pos
    }

    /// Moves the cursor to `origin + offset` and returns the new position.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::InvalidInput` if the target would be negative.
    /// * `ErrorKind::Other` if the target would overflow `u64`.
    ///
    /// The cursor is left untouched when an error is returned.
    fn seek_from(&mut self, origin: u64, offset: i64) -> io::Result<u64> {
        let checked_pos = if offset < 0 {
            // `-offset` overflows for `i64::MIN`; `wrapping_neg` followed by
            // the cast yields the correct magnitude (2^63) in that case and
            // is identical to plain negation for every other negative value.
            origin.checked_sub(offset.wrapping_neg() as u64)
        } else {
            origin.checked_add(offset as u64)
        };
        match checked_pos {
            Some(p) => {
                self.pos = p;
                Ok(p)
            }
            None if offset < 0 => Err(Error::new(
                ErrorKind::InvalidInput,
                "Attempted seek to a negative absolute position"
            )),
            None => Err(Error::new(
                ErrorKind::Other,
                "Attempted seek would overflow u64 position"
            )),
        }
    }
}
impl<P> Clone for ReadPos<P>
where P: PosRead + Clone
{
    /// Produces an independent reader over a clone of the wrapped source.
    ///
    /// The clone keeps the stored length but its cursor is rewound to 0;
    /// the original's position is not carried over.
    fn clone(&self) -> Self {
        let pos_read = self.pos_read.clone();
        let length = self.length;
        ReadPos { pos: 0, length, pos_read }
    }
}
impl<P> PosRead for ReadPos<P>
where P: PosRead
{
#[inline]
fn pread(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
self.pos_read.pread(buf, offset)
}
}
impl<P> Read for ReadPos<P>
where P: PosRead
{
    /// Reads at the current cursor and advances it by the number of bytes
    /// actually read. On error the cursor is left where it was.
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let cursor = self.pos;
        self.pread(buf, cursor).map(|count| {
            self.pos = cursor + count as u64;
            count
        })
    }
}
impl<P> Seek for ReadPos<P>
where P: PosRead
{
    /// Repositions the cursor.
    ///
    /// `SeekFrom::Start` is applied verbatim; `SeekFrom::End` is relative to
    /// the stored length and `SeekFrom::Current` to the current cursor, both
    /// going through the checked arithmetic of `seek_from`.
    fn seek(&mut self, from: SeekFrom) -> io::Result<u64> {
        let (origin, offset) = match from {
            SeekFrom::Start(p) => {
                // Absolute target: nothing to compute, nothing that can fail.
                self.pos = p;
                return Ok(p);
            }
            SeekFrom::End(offset) => (self.length, offset),
            SeekFrom::Current(offset) => (self.pos, offset),
        };
        self.seek_from(origin, offset)
    }
}
impl<P> ReadPos<P>
where P: PosRead + Clone
{
    /// Builds a `ReadSlice` over the byte range `start..end` of a clone of
    /// the wrapped source.
    ///
    /// The range is handed to `ReadSlice::new` as-is; it is not compared
    /// against this reader's stored length.
    pub fn subslice(&self, start: u64, end: u64) -> ReadSlice<P> {
        let source = self.pos_read.clone();
        ReadSlice::new(source, start, end)
    }
}
impl<P> ReadSlice<P>
where P: PosRead
{
    /// Creates a window over the absolute byte range `start..end` of
    /// `pos_read`, with the cursor placed at `start`.
    ///
    /// # Panics
    ///
    /// Panics if `start > end`.
    pub fn new(pos_read: P, start: u64, end: u64) -> Self {
        assert!(start <= end);
        ReadSlice { start, pos: start, end, pos_read }
    }

    /// Returns the size of the window in bytes (`end - start`).
    pub fn len(&self) -> u64 {
        self.end - self.start
    }

    /// Returns `true` when the window covers no bytes.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the cursor position relative to the start of the window.
    pub fn tell(&self) -> u64 {
        self.pos - self.start
    }

    /// Reads into `buf` from the absolute position `abspos` of the wrapped
    /// source, never requesting bytes at or beyond `end`.
    ///
    /// Returns `Ok(0)` when `abspos` is at or past `end`.
    fn pread_abs(&self, buf: &mut [u8], abspos: u64) -> io::Result<usize> {
        if abspos >= self.end {
            return Ok(0);
        }
        let mlen = self.end - abspos;
        if (buf.len() as u64) <= mlen {
            self.pos_read.pread(buf, abspos)
        } else {
            // Here `mlen < buf.len()`, so the cast to `usize` cannot truncate.
            self.pos_read.pread(&mut buf[..(mlen as usize)], abspos)
        }
    }

    /// Moves the cursor to the absolute position `origin + offset` and
    /// returns the new position relative to the start of the window.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::InvalidInput` if the target is negative or lies before
    ///   the start of the window.
    /// * `ErrorKind::Other` if the target would overflow `u64`.
    fn seek_from(&mut self, origin: u64, offset: i64) -> io::Result<u64> {
        let checked_pos = if offset < 0 {
            // `-offset` overflows for `i64::MIN`; `wrapping_neg` followed by
            // the cast yields the correct magnitude (2^63) in that case and
            // is identical to plain negation for every other negative value.
            origin.checked_sub(offset.wrapping_neg() as u64)
        } else {
            origin.checked_add(offset as u64)
        };
        match checked_pos {
            Some(p) => self.seek_to(p),
            None if offset < 0 => Err(Error::new(
                ErrorKind::InvalidInput,
                "Attempted seek to a negative position"
            )),
            None => Err(Error::new(
                ErrorKind::Other,
                "Attempted seek would overflow u64 position"
            )),
        }
    }

    /// Places the cursor at the absolute position `abspos` and returns it
    /// relative to the start of the window.
    ///
    /// Positions before `start` are rejected with `ErrorKind::InvalidInput`;
    /// positions at or beyond `end` are accepted.
    fn seek_to(&mut self, abspos: u64) -> io::Result<u64> {
        if abspos < self.start {
            Err(Error::new(
                ErrorKind::InvalidInput,
                "Attempted seek to a negative position"
            ))
        } else {
            self.pos = abspos;
            Ok(abspos - self.start)
        }
    }
}
impl<P> Clone for ReadSlice<P>
where P: PosRead + Clone
{
    /// Produces an independent window over a clone of the wrapped source.
    ///
    /// The clone covers the same `start..end` range but its cursor is
    /// rewound to `start`; the original's position is not carried over.
    fn clone(&self) -> Self {
        let (start, end) = (self.start, self.end);
        let pos_read = self.pos_read.clone();
        ReadSlice { start, pos: start, end, pos_read }
    }
}
impl<P> PosRead for ReadSlice<P>
where P: PosRead
{
    /// Positional read with `offset` measured from the start of the window.
    ///
    /// Returns `Ok(0)` when the offset falls at or beyond the end of the
    /// window; otherwise the read is delegated to `pread_abs`.
    #[inline]
    fn pread(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
        // Saturating keeps an oversized offset from wrapping back into range.
        let abspos = self.start.saturating_add(offset);
        if abspos >= self.end {
            return Ok(0);
        }
        self.pread_abs(buf, abspos)
    }
}
impl<P> Read for ReadSlice<P>
where P: PosRead
{
    /// Reads at the current cursor via `pread_abs` and advances the cursor
    /// by the number of bytes actually read. On error the cursor is left
    /// where it was.
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let cursor = self.pos;
        self.pread_abs(buf, cursor).map(|count| {
            self.pos = cursor + count as u64;
            count
        })
    }
}
impl<P> Seek for ReadSlice<P>
where P: PosRead
{
    /// Repositions the cursor; all positions in and out are relative to the
    /// start of the window.
    ///
    /// `SeekFrom::Start` is measured from the window start, `SeekFrom::End`
    /// from the window end and `SeekFrom::Current` from the current cursor.
    fn seek(&mut self, from: SeekFrom) -> io::Result<u64> {
        let (origin, offset) = match from {
            SeekFrom::Start(p) => {
                // Translate to an absolute position, guarding the addition.
                return match self.start.checked_add(p) {
                    Some(abspos) => self.seek_to(abspos),
                    None => Err(Error::new(
                        ErrorKind::Other,
                        "Attempted seek would overflow u64 position"
                    )),
                };
            }
            SeekFrom::End(offset) => (self.end, offset),
            SeekFrom::Current(offset) => (self.pos, offset),
        };
        self.seek_from(origin, offset)
    }
}
impl<P> ReadSlice<P>
where P: PosRead + Clone
{
    /// Builds a nested window covering `start..end`, both measured from the
    /// start of this window, over a clone of the wrapped source.
    ///
    /// # Panics
    ///
    /// Panics if translating either bound to an absolute offset overflows
    /// `u64`, if `start > end`, or if the range reaches past the end of this
    /// window.
    pub fn subslice(&self, start: u64, end: u64) -> ReadSlice<P> {
        let base = self.start;
        // Relative bound -> absolute offset, panicking with `what` on overflow.
        let absolute = |rel: u64, what: &str| base.checked_add(rel).expect(what);

        let abs_start = absolute(start, "ReadSlice::subslice start overflow");
        let abs_end = absolute(end, "ReadSlice::subslice end overflow");

        assert!(abs_start <= abs_end);
        assert!(self.start <= abs_start);
        assert!(self.end >= abs_end);

        let source = self.pos_read.clone();
        ReadSlice::new(source, abs_start, abs_end)
    }
}
#[cfg(feature = "mmap")]
impl<P> ReadSlice<P>
where P: PosRead + Borrow<File>
{
    /// Memory-maps the byte range covered by this window from the borrowed
    /// `File`.
    ///
    /// The mapping starts at the window's absolute `start` offset and spans
    /// `self.len()` bytes; the cursor position plays no part.
    ///
    /// # Panics
    ///
    /// Panics if the window is empty or if its length does not fit in
    /// `usize`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `MmapOptions::map` reports.
    pub fn mem_map(&self) -> Result<Mmap, io::Error> {
        let len = self.len();
        assert!(len > 0);
        assert!(len <= usize::max_value() as u64);

        let file: &File = self.pos_read.borrow();
        // NOTE(review): `map` is an unsafe call and nothing in this type
        // prevents the file from changing while the mapping is alive —
        // confirm the callers' guarantees against the memmap documentation.
        unsafe {
            MmapOptions::new()
                .offset(self.start)
                .len(len as usize)
                .map(file)
        }
    }
}
#[cfg(test)]
mod tests {
    //! Tests for `ReadPos` and `ReadSlice`, run against temporary files.

    use std::fs::File;
    use std::io::{BufReader, Read, Write};
    use std::sync::Arc;
    use std::thread;

    use tempfile::tempfile;

    use super::*;

    /// Relative and end-relative seeks on a `ReadPos`, each followed by a read.
    #[test]
    fn test_seek() {
        let mut f = tempfile().unwrap();
        f.write_all(b"1234567890").unwrap();

        let mut r1 = ReadPos::new(f, 10);
        let mut buf = [0u8; 5];

        let p = r1.seek(SeekFrom::Current(0)).unwrap();
        assert_eq!(0, p);

        let p = r1.seek(SeekFrom::Current(1)).unwrap();
        assert_eq!(1, p);
        r1.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"23456");

        let p = r1.seek(SeekFrom::End(-5)).unwrap();
        assert_eq!(5, p);
        let p = r1.seek(SeekFrom::Current(0)).unwrap();
        assert_eq!(5, p);
        r1.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"67890");
    }

    /// A `ReadPos` wrapped in a `BufReader` ends up at the end of the file
    /// once the buffer has been filled.
    #[test]
    fn test_with_buf_reader() {
        let mut f = tempfile().unwrap();
        f.write_all(b"1234567890").unwrap();

        let r0 = ReadPos::new(Arc::new(f), 10);
        let mut r1 = BufReader::with_capacity(0x2000, r0);
        let mut buf = [0u8; 5];

        let p = r1.seek(SeekFrom::Start(1)).unwrap();
        assert_eq!(1, p);
        r1.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"23456");

        // The buffer consumed everything after the seek, so the inner
        // reader's own cursor sits at the end.
        let mut r0 = r1.into_inner();
        let p = r0.seek(SeekFrom::Current(0)).unwrap();
        assert_eq!(10, p);
        let l = r0.read(&mut buf).unwrap();
        assert_eq!(0, l);
    }

    /// A clone starts from the beginning and keeps a cursor independent of
    /// the reader it was cloned from.
    #[test]
    fn test_interleaved() {
        let mut f = tempfile().unwrap();
        f.write_all(b"1234567890").unwrap();

        let mut r1 = ReadPos::new(Arc::new(f), 10);
        let mut buf = [0u8; 5];
        r1.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"12345");

        let mut r2 = r1.clone();
        r2.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"12345");

        r1.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"67890");
        r2.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"67890");
    }

    /// Many threads, each with its own `ReadPos` over one shared file, seek
    /// and read without disturbing each other.
    #[test]
    fn test_concurrent_seek_read() {
        // One constant drives both the capacity hint and the spawn loop so
        // the two cannot drift apart.
        const THREADS: usize = 50;

        let mut f = tempfile().unwrap();
        let rule = b"1234567890";
        f.write_all(rule).unwrap();
        let f = Arc::new(f);

        let mut threads = Vec::with_capacity(THREADS);
        for i in 0..THREADS {
            let mut rpc = ReadPos::new(f.clone(), rule.len() as u64);
            threads.push(thread::spawn(move || {
                let p = i % rule.len();
                rpc.seek(SeekFrom::Start(p as u64)).expect("seek");

                thread::yield_now();

                let l = 5.min(rule.len() - p);
                let mut buf = vec![0u8; l];
                rpc.read_exact(&mut buf).expect("read_exact");
                assert_eq!(&buf[..], &rule[p..(p + l)]);
            }))
        }
        for t in threads {
            t.join().unwrap();
        }
    }

    /// Same scenario as `test_seek`, on a `ReadSlice` spanning the whole file.
    #[test]
    fn test_slice_seek() {
        let mut f = tempfile().unwrap();
        f.write_all(b"1234567890").unwrap();

        let mut r1 = ReadSlice::new(f, 0, 10);
        let mut buf = [0u8; 5];

        let p = r1.seek(SeekFrom::Current(0)).unwrap();
        assert_eq!(0, p);

        let p = r1.seek(SeekFrom::Current(1)).unwrap();
        assert_eq!(1, p);
        r1.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"23456");

        let p = r1.seek(SeekFrom::End(-5)).unwrap();
        assert_eq!(5, p);
        let p = r1.seek(SeekFrom::Current(0)).unwrap();
        assert_eq!(5, p);
        r1.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"67890");
    }

    /// Seeks on a sub-slice of a slice that itself starts at a non-zero
    /// offset report positions relative to the sub-slice.
    #[test]
    fn test_slice_seek_offset() {
        let mut f = tempfile().unwrap();
        f.write_all(b"012345678901").unwrap();

        let r1 = ReadSlice::new(&f, 1, 12);
        let mut r1 = r1.subslice(0, 10);
        let mut buf = [0u8; 5];

        let p = r1.seek(SeekFrom::Current(0)).unwrap();
        assert_eq!(0, p);

        let p = r1.seek(SeekFrom::Current(1)).unwrap();
        assert_eq!(1, p);
        r1.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"23456");

        let p = r1.seek(SeekFrom::End(-5)).unwrap();
        assert_eq!(5, p);
        let p = r1.seek(SeekFrom::Current(0)).unwrap();
        assert_eq!(5, p);
        r1.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"67890");
    }

    /// Same scenario as `test_with_buf_reader`, on a `ReadSlice` with a
    /// non-zero start offset.
    #[test]
    fn test_slice_with_buf_reader() {
        let mut f = tempfile().unwrap();
        f.write_all(b"01234567890").unwrap();

        let r0 = ReadSlice::new(Arc::new(f), 1, 11);
        let mut r1 = BufReader::with_capacity(0x2000, r0);
        let mut buf = [0u8; 5];

        let p = r1.seek(SeekFrom::Start(1)).unwrap();
        assert_eq!(1, p);
        r1.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"23456");

        let mut r0 = r1.into_inner();
        let p = r0.seek(SeekFrom::Current(0)).unwrap();
        assert_eq!(10, p);
        let l = r0.read(&mut buf).unwrap();
        assert_eq!(0, l);
    }

    // Compile-time probes: these only type-check when the bound holds.
    fn is_send<T: Send>() -> bool { true }
    fn is_sync<T: Sync>() -> bool { true }

    /// The readers are `Send` and `Sync` for the listed file handle types.
    #[test]
    fn test_send_sync() {
        assert!(is_send::<ReadPos<File>>());
        assert!(is_sync::<ReadPos<File>>());
        assert!(is_send::<ReadPos<Arc<File>>>());
        assert!(is_sync::<ReadPos<Arc<File>>>());
        assert!(is_send::<ReadSlice<Arc<File>>>());
        assert!(is_sync::<ReadSlice<Arc<File>>>());
    }

    // Compile-time probe for the `PosRead` bound.
    fn is_pos_read<T: PosRead>() -> bool { true }

    /// `ReadPos` implements `PosRead` for owned, boxed and borrowed files.
    #[test]
    fn test_generic_bounds() {
        assert!(is_pos_read::<ReadPos<File>>());
        assert!(is_pos_read::<ReadPos<Box<File>>>());
        assert!(is_pos_read::<ReadPos<&File>>());
    }
}