vortex_io/file/read/
mod.rs1mod request;
5mod source;
6
7use std::fmt;
8use std::fmt::Debug;
9use std::fmt::Display;
10use std::pin::Pin;
11use std::sync::Arc;
12use std::sync::atomic::AtomicUsize;
13use std::sync::atomic::Ordering;
14use std::task::Context;
15use std::task::Poll;
16use std::task::ready;
17
18use async_trait::async_trait;
19use futures::FutureExt;
20use futures::TryFutureExt;
21use futures::channel::mpsc;
22use futures::future::BoxFuture;
23use futures::future::Shared;
24pub use request::*;
25pub use source::*;
26use vortex_buffer::Alignment;
27use vortex_buffer::ByteBuffer;
28use vortex_error::SharedVortexResult;
29use vortex_error::VortexError;
30use vortex_error::VortexResult;
31use vortex_error::vortex_err;
32
33use crate::VortexReadAt;
34
35#[derive(Clone)]
58pub struct FileRead {
59 uri: Arc<str>,
61 size: Shared<BoxFuture<'static, SharedVortexResult<u64>>>,
63 events: mpsc::UnboundedSender<ReadEvent>,
65 next_id: Arc<AtomicUsize>,
67}
68
69impl Debug for FileRead {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 f.debug_struct("FileHandle")
72 .field("uri", &self.uri)
73 .finish()
74 }
75}
76
77impl Display for FileRead {
78 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79 write!(f, "{}", self.uri)
80 }
81}
82
83impl FileRead {
84 pub(crate) fn new(
85 uri: Arc<str>,
86 size: BoxFuture<'static, VortexResult<u64>>,
87 send: mpsc::UnboundedSender<ReadEvent>,
88 ) -> Self {
89 Self {
90 uri,
91 size: size.map_err(Arc::new).boxed().shared(),
92 events: send,
93 next_id: Arc::new(AtomicUsize::new(0)),
94 }
95 }
96
97 pub fn uri(&self) -> &Arc<str> {
99 &self.uri
100 }
101}
102
103struct ReadFuture {
108 id: usize,
109 recv: oneshot::Receiver<VortexResult<ByteBuffer>>,
110 polled: bool,
111 events: mpsc::UnboundedSender<ReadEvent>,
112}
113
114impl Future for ReadFuture {
115 type Output = VortexResult<ByteBuffer>;
116
117 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
118 if !self.polled {
119 self.polled = true;
120 if let Err(e) = self.events.unbounded_send(ReadEvent::Polled(self.id)) {
122 return Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}")));
123 }
124 }
125
126 match ready!(self.recv.poll_unpin(cx)) {
127 Ok(result) => Poll::Ready(result),
128 Err(e) => Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}"))),
129 }
130 }
131}
132
133impl Drop for ReadFuture {
134 fn drop(&mut self) {
135 drop(self.events.unbounded_send(ReadEvent::Dropped(self.id)));
138 }
139}
140
141#[derive(Debug)]
142pub(crate) enum ReadEvent {
143 Request(ReadRequest),
144 Polled(RequestId),
145 Dropped(RequestId),
146}
147
148#[async_trait]
149impl VortexReadAt for FileRead {
150 fn read_at(
151 &self,
152 offset: u64,
153 length: usize,
154 alignment: Alignment,
155 ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
156 let (send, recv) = oneshot::channel();
157 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
158 let event = ReadEvent::Request(ReadRequest {
159 id,
160 offset,
161 length,
162 alignment,
163 callback: send,
164 });
165
166 if let Err(e) = self.events.unbounded_send(event) {
168 return async move { Err(vortex_err!("Failed to submit read request: {e}")) }.boxed();
169 }
170
171 ReadFuture {
172 id,
173 recv,
174 polled: false,
175 events: self.events.clone(),
176 }
177 .boxed()
178 }
179
180 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
181 self.size.clone().map_err(VortexError::from).boxed()
182 }
183}