mod request;
mod source;
use std::fmt;
use std::fmt::{Debug, Display};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll, ready};
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::future::{BoxFuture, Shared};
use futures::{FutureExt, TryFutureExt};
pub use request::*;
pub use source::*;
use vortex_buffer::{Alignment, ByteBuffer};
use vortex_error::{SharedVortexResult, VortexError, VortexResult, vortex_err};
use crate::VortexReadAt;
/// A cloneable handle through which reads against a single file are requested.
///
/// Reads are not performed by this type directly: each call to `read_at`
/// sends a [`ReadEvent`] over the `events` channel, and the result is delivered
/// back to the returned future.
#[derive(Clone)]
pub struct FileRead {
// Identifier of the file; this is what `Debug` and `Display` print.
uri: Arc<str>,
// Future resolving to the file size. It is `Shared`, so every clone of this
// handle (and every call to `size()`) awaits the same underlying future.
size: Shared<BoxFuture<'static, SharedVortexResult<u64>>>,
// Sending half of the channel on which read events are submitted.
events: mpsc::UnboundedSender<ReadEvent>,
// Counter used to assign an id to each read request. It lives behind an `Arc`,
// so clones of this handle draw ids from the same counter.
next_id: Arc<AtomicUsize>,
}
/// Diagnostic formatting for [`FileRead`]: prints the type name and its URI.
impl Debug for FileRead {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Report the real type name (`FileRead`). The size future, event channel
        // and id counter are deliberately left out of the output, which
        // `finish_non_exhaustive` signals with a trailing `..`.
        f.debug_struct("FileRead")
            .field("uri", &self.uri)
            .finish_non_exhaustive()
    }
}
/// User-facing formatting for [`FileRead`]: the bare URI and nothing else.
impl Display for FileRead {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Emit the URI verbatim.
        f.write_str(&self.uri)
    }
}
impl FileRead {
pub(crate) fn new(
uri: Arc<str>,
size: BoxFuture<'static, VortexResult<u64>>,
send: mpsc::UnboundedSender<ReadEvent>,
) -> Self {
Self {
uri,
size: size.map_err(Arc::new).boxed().shared(),
events: send,
next_id: Arc::new(AtomicUsize::new(0)),
}
}
pub fn uri(&self) -> &Arc<str> {
&self.uri
}
}
/// Future handed back by `FileRead::read_at` for one submitted read request.
///
/// Besides waiting for the result, it reports its own lifecycle on the event
/// channel: a `ReadEvent::Polled` on its first poll and a `ReadEvent::Dropped`
/// when it is dropped.
struct ReadFuture {
// Id of the request this future is waiting on.
id: usize,
// Receiving half of the one-shot channel on which the read result arrives.
recv: oneshot::Receiver<VortexResult<ByteBuffer>>,
// Whether the first-poll notification has already been attempted.
polled: bool,
// Channel used to publish the `Polled` and `Dropped` events.
events: mpsc::UnboundedSender<ReadEvent>,
}
impl Future for ReadFuture {
    type Output = VortexResult<ByteBuffer>;

    /// Waits for the read result to arrive on the one-shot channel.
    ///
    /// The first poll also publishes `ReadEvent::Polled` for this request. If
    /// that event cannot be sent, or if the sending half of the one-shot channel
    /// goes away without delivering a result, the future resolves to an error.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        // Flip the flag before notifying, so the notification is attempted at
        // most once even when sending fails.
        let first_poll = !std::mem::replace(&mut this.polled, true);
        if first_poll {
            let notified = this.events.unbounded_send(ReadEvent::Polled(this.id));
            if let Err(e) = notified {
                return Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}")));
            }
        }

        let delivered = ready!(this.recv.poll_unpin(cx));
        Poll::Ready(
            delivered
                .unwrap_or_else(|e| Err(vortex_err!("ReadRequest dropped by runtime: {e}"))),
        )
    }
}
impl Drop for ReadFuture {
    /// Publishes `ReadEvent::Dropped` for this request when the future goes away.
    fn drop(&mut self) {
        // Best effort: a failed send is ignored, since `drop` has no way to
        // report it.
        let _ = self.events.unbounded_send(ReadEvent::Dropped(self.id));
    }
}
/// Messages sent over the event channel by `FileRead` and the futures it returns.
#[derive(Debug)]
pub(crate) enum ReadEvent {
// A new read request, submitted by `read_at`.
Request(ReadRequest),
// The future for the request with this id has been polled for the first time.
Polled(RequestId),
// The future for the request with this id has been dropped.
Dropped(RequestId),
}
#[async_trait]
impl VortexReadAt for FileRead {
    /// Submits a read request and returns a future for its result.
    ///
    /// `offset`, `length` and `alignment` are forwarded unchanged in a
    /// `ReadRequest`, together with a fresh request id and the one-shot sender
    /// on which the result is expected back.
    ///
    /// # Errors
    ///
    /// If the request cannot be placed on the event channel, the returned future
    /// resolves immediately to a "Failed to submit read request" error.
    fn read_at(
        &self,
        offset: u64,
        length: usize,
        alignment: Alignment,
    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
        // The counter is shared by all clones of this handle.
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let (callback, recv) = oneshot::channel();

        let request = ReadRequest {
            id,
            offset,
            length,
            alignment,
            callback,
        };

        match self.events.unbounded_send(ReadEvent::Request(request)) {
            Ok(()) => ReadFuture {
                id,
                recv,
                polled: false,
                events: self.events.clone(),
            }
            .boxed(),
            Err(e) => {
                async move { Err(vortex_err!("Failed to submit read request: {e}")) }.boxed()
            }
        }
    }

    /// Returns a future resolving to the file size.
    ///
    /// Every call awaits the same shared size future held by this handle; the
    /// shared error is converted back into a `VortexError` for the caller.
    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
        let shared_size = self.size.clone();
        shared_size.map_err(VortexError::from).boxed()
    }
}