use futures_lite::AsyncSeekExt as _;
use std::{
fs,
io::{self, Seek as _},
mem,
pin::{Pin, pin},
};
use zng_unit::ByteUnits as _;
use crate::channel::{IpcBytes, IpcFileHandle};
#[derive(Debug)]
#[cfg_attr(ipc, derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum IpcReadHandle {
File(IpcFileHandle),
Bytes(IpcBytes),
}
impl From<IpcFileHandle> for IpcReadHandle {
fn from(f: IpcFileHandle) -> Self {
IpcReadHandle::File(f)
}
}
impl From<IpcBytes> for IpcReadHandle {
fn from(b: IpcBytes) -> Self {
IpcReadHandle::Bytes(b)
}
}
impl From<fs::File> for IpcReadHandle {
fn from(f: fs::File) -> Self {
IpcReadHandle::File(f.into())
}
}
impl IpcReadHandle {
pub fn best_read_blocking(mut file: std::fs::File) -> io::Result<Self> {
if file.metadata()?.len() > 5.megabytes().0 {
Ok(file.into())
} else {
file.seek(io::SeekFrom::Start(0))?;
IpcBytes::from_file_blocking(file).map(Into::into)
}
}
pub async fn best_read(mut file: crate::fs::File) -> io::Result<Self> {
if file.metadata().await?.len() > 5.megabytes().0 {
let file = file.try_unwrap().await.unwrap();
Ok(file.into())
} else {
file.seek(io::SeekFrom::Start(0)).await?;
IpcBytes::from_file(file).await.map(Into::into)
}
}
pub fn duplicate(&self) -> io::Result<Self> {
match self {
IpcReadHandle::File(h) => h.duplicate().map(Self::File),
IpcReadHandle::Bytes(b) => Ok(IpcReadHandle::Bytes(b.clone())),
}
}
pub fn read_blocking(self) -> io::Result<IpcReadBlocking> {
match self {
IpcReadHandle::File(h) => {
let mut file = std::fs::File::from(h);
file.seek(io::SeekFrom::Start(0))?;
Ok(IpcReadBlocking::File(io::BufReader::new(file)))
}
IpcReadHandle::Bytes(b) => Ok(IpcReadBlocking::Bytes(io::Cursor::new(b))),
}
}
pub async fn read(self) -> io::Result<IpcRead> {
match self {
IpcReadHandle::File(h) => {
let mut file = crate::fs::File::from(h);
file.seek(io::SeekFrom::Start(0)).await?;
Ok(IpcRead::File(crate::io::BufReader::new(file)))
}
IpcReadHandle::Bytes(b) => Ok(IpcRead::Bytes(crate::io::Cursor::new(b))),
}
}
pub fn read_to_bytes_blocking(self) -> io::Result<IpcBytes> {
match self {
IpcReadHandle::File(h) => {
let mut file = std::fs::File::from(h);
file.seek(io::SeekFrom::Start(0))?;
IpcBytes::from_file_blocking(file)
}
IpcReadHandle::Bytes(b) => Ok(b),
}
}
pub async fn read_to_bytes(self) -> io::Result<IpcBytes> {
match self {
IpcReadHandle::File(h) => {
let mut file = crate::fs::File::from(h);
file.seek(io::SeekFrom::Start(0)).await?;
IpcBytes::from_file(file).await
}
IpcReadHandle::Bytes(b) => Ok(b),
}
}
pub async fn duplicate_or_read(&mut self) -> io::Result<Self> {
match self.duplicate() {
Ok(d) => Ok(d),
Err(e) => {
tracing::debug!("duplicate_or_read duplicate error, {e}");
let f = mem::replace(self, IpcReadHandle::Bytes(IpcBytes::empty()));
let b = f.read_to_bytes().await?;
*self = IpcReadHandle::Bytes(b);
self.duplicate()
}
}
}
pub fn duplicate_or_read_blocking(&mut self) -> io::Result<Self> {
match self.duplicate() {
Ok(d) => Ok(d),
Err(e) => {
tracing::debug!("duplicate_or_read_blocking duplicate error, {e}");
let f = mem::replace(self, IpcReadHandle::Bytes(IpcBytes::empty()));
let b = f.read_to_bytes_blocking()?;
*self = IpcReadHandle::Bytes(b);
self.duplicate()
}
}
}
}
#[derive(Debug)]
#[non_exhaustive]
pub enum IpcReadBlocking {
File(io::BufReader<fs::File>),
Bytes(io::Cursor<IpcBytes>),
}
impl IpcReadBlocking {
pub fn read_to_bytes(&mut self) -> io::Result<IpcBytes> {
match self {
IpcReadBlocking::File(f) => IpcBytes::from_read_blocking(f),
IpcReadBlocking::Bytes(c) => {
let start = c.position();
let len = c.get_ref().len();
c.set_position(len as u64);
if start == 0 {
Ok(c.get_ref().clone())
} else {
IpcBytes::from_slice_blocking(&c.get_ref()[start as usize..])
}
}
}
}
pub fn remaining_len(&mut self) -> io::Result<u64> {
match self {
IpcReadBlocking::File(b) => {
let total_len = b.get_ref().metadata()?.len();
let position = b.stream_position()?;
Ok(total_len - position.min(total_len))
}
IpcReadBlocking::Bytes(b) => {
let total_len = b.get_ref().len() as u64;
Ok(total_len - b.position().min(total_len))
}
}
}
}
impl io::Read for IpcReadBlocking {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self {
IpcReadBlocking::File(f) => f.read(buf),
IpcReadBlocking::Bytes(b) => b.read(buf),
}
}
fn read_vectored(&mut self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
match self {
IpcReadBlocking::File(f) => f.read_vectored(bufs),
IpcReadBlocking::Bytes(b) => b.read_vectored(bufs),
}
}
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
match self {
IpcReadBlocking::File(f) => f.read_to_end(buf),
IpcReadBlocking::Bytes(b) => b.read_to_end(buf),
}
}
fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
match self {
IpcReadBlocking::File(f) => f.read_to_string(buf),
IpcReadBlocking::Bytes(b) => b.read_to_string(buf),
}
}
fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
match self {
IpcReadBlocking::File(f) => f.read_exact(buf),
IpcReadBlocking::Bytes(b) => b.read_exact(buf),
}
}
}
impl io::Seek for IpcReadBlocking {
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
match self {
IpcReadBlocking::File(f) => f.seek(pos),
IpcReadBlocking::Bytes(b) => b.seek(pos),
}
}
fn stream_position(&mut self) -> io::Result<u64> {
match self {
IpcReadBlocking::File(f) => f.stream_position(),
IpcReadBlocking::Bytes(b) => b.stream_position(),
}
}
}
impl io::BufRead for IpcReadBlocking {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
match self {
IpcReadBlocking::File(f) => f.fill_buf(),
IpcReadBlocking::Bytes(b) => b.fill_buf(),
}
}
fn consume(&mut self, amount: usize) {
match self {
IpcReadBlocking::File(f) => f.consume(amount),
IpcReadBlocking::Bytes(b) => b.consume(amount),
}
}
}
#[derive(Debug)]
#[non_exhaustive]
pub enum IpcRead {
File(crate::io::BufReader<crate::fs::File>),
Bytes(crate::io::Cursor<IpcBytes>),
}
impl IpcRead {
pub async fn read_to_bytes(&mut self) -> io::Result<IpcBytes> {
match self {
IpcRead::File(f) => IpcBytes::from_read(pin!(f)).await,
IpcRead::Bytes(c) => {
let start = c.position();
let len = c.get_ref().len();
c.set_position(len as u64);
let b = c.get_ref().clone();
if start == 0 {
Ok(b)
} else {
blocking::unblock(move || IpcBytes::from_slice_blocking(&b[start as usize..])).await
}
}
}
}
pub async fn remaining_len(&mut self) -> io::Result<u64> {
match self {
IpcRead::File(b) => {
let total_len = b.get_ref().metadata().await?.len();
let pos = b.seek(io::SeekFrom::Current(0)).await?;
Ok(total_len - pos.min(total_len))
}
IpcRead::Bytes(b) => {
let total_len = b.get_ref().len() as u64;
Ok(total_len - b.position().min(total_len))
}
}
}
}
impl crate::io::AsyncRead for IpcRead {
fn poll_read(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut [u8]) -> std::task::Poll<io::Result<usize>> {
match self.get_mut() {
IpcRead::File(f) => Pin::new(f).poll_read(cx, buf),
IpcRead::Bytes(b) => Pin::new(b).poll_read(cx, buf),
}
}
fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
bufs: &mut [io::IoSliceMut<'_>],
) -> std::task::Poll<io::Result<usize>> {
match self.get_mut() {
IpcRead::File(f) => Pin::new(f).poll_read_vectored(cx, bufs),
IpcRead::Bytes(b) => Pin::new(b).poll_read_vectored(cx, bufs),
}
}
}
impl crate::io::AsyncBufRead for IpcRead {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<&[u8]>> {
match self.get_mut() {
IpcRead::File(f) => Pin::new(f).poll_fill_buf(cx),
IpcRead::Bytes(b) => Pin::new(b).poll_fill_buf(cx),
}
}
fn consume(self: Pin<&mut Self>, amt: usize) {
match self.get_mut() {
IpcRead::File(f) => Pin::new(f).consume(amt),
IpcRead::Bytes(b) => Pin::new(b).consume(amt),
}
}
}
impl crate::io::AsyncSeek for IpcRead {
fn poll_seek(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, pos: io::SeekFrom) -> std::task::Poll<io::Result<u64>> {
match self.get_mut() {
IpcRead::File(f) => Pin::new(f).poll_seek(cx, pos),
IpcRead::Bytes(b) => Pin::new(b).poll_seek(cx, pos),
}
}
}