vortex_io/file/read/
mod.rs1mod request;
5mod source;
6
7use std::fmt;
8use std::fmt::{Debug, Display};
9use std::pin::Pin;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::task::{Context, Poll, ready};
13
14use async_trait::async_trait;
15use futures::channel::mpsc;
16use futures::future::{BoxFuture, Shared};
17use futures::{FutureExt, TryFutureExt};
18pub use request::*;
19pub use source::*;
20use vortex_buffer::{Alignment, ByteBuffer};
21use vortex_error::{SharedVortexResult, VortexError, VortexResult, vortex_err};
22
23use crate::VortexReadAt;
24
25#[derive(Clone)]
48pub struct FileRead {
49 uri: Arc<str>,
51 size: Shared<BoxFuture<'static, SharedVortexResult<u64>>>,
53 events: mpsc::UnboundedSender<ReadEvent>,
55 next_id: Arc<AtomicUsize>,
57}
58
59impl Debug for FileRead {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 f.debug_struct("FileHandle")
62 .field("uri", &self.uri)
63 .finish()
64 }
65}
66
67impl Display for FileRead {
68 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69 write!(f, "{}", self.uri)
70 }
71}
72
73impl FileRead {
74 pub(crate) fn new(
75 uri: Arc<str>,
76 size: BoxFuture<'static, VortexResult<u64>>,
77 send: mpsc::UnboundedSender<ReadEvent>,
78 ) -> Self {
79 Self {
80 uri,
81 size: size.map_err(Arc::new).boxed().shared(),
82 events: send,
83 next_id: Arc::new(AtomicUsize::new(0)),
84 }
85 }
86
87 pub fn uri(&self) -> &Arc<str> {
89 &self.uri
90 }
91}
92
93struct ReadFuture {
98 id: usize,
99 recv: oneshot::Receiver<VortexResult<ByteBuffer>>,
100 polled: bool,
101 events: mpsc::UnboundedSender<ReadEvent>,
102}
103
104impl Future for ReadFuture {
105 type Output = VortexResult<ByteBuffer>;
106
107 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
108 if !self.polled {
109 self.polled = true;
110 if let Err(e) = self.events.unbounded_send(ReadEvent::Polled(self.id)) {
112 return Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}")));
113 }
114 }
115
116 match ready!(self.recv.poll_unpin(cx)) {
117 Ok(result) => Poll::Ready(result),
118 Err(e) => Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}"))),
119 }
120 }
121}
122
123impl Drop for ReadFuture {
124 fn drop(&mut self) {
125 let _ = self.events.unbounded_send(ReadEvent::Dropped(self.id));
128 }
129}
130
131#[derive(Debug)]
132pub(crate) enum ReadEvent {
133 Request(ReadRequest),
134 Polled(RequestId),
135 Dropped(RequestId),
136}
137
138#[async_trait]
139impl VortexReadAt for FileRead {
140 fn read_at(
141 &self,
142 offset: u64,
143 length: usize,
144 alignment: Alignment,
145 ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
146 let (send, recv) = oneshot::channel();
147 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
148 let event = ReadEvent::Request(ReadRequest {
149 id,
150 offset,
151 length,
152 alignment,
153 callback: send,
154 });
155
156 if let Err(e) = self.events.unbounded_send(event) {
158 return async move { Err(vortex_err!("Failed to submit read request: {e}")) }.boxed();
159 }
160
161 ReadFuture {
162 id,
163 recv,
164 polled: false,
165 events: self.events.clone(),
166 }
167 .boxed()
168 }
169
170 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
171 self.size.clone().map_err(VortexError::from).boxed()
172 }
173}