vortex_io/file/read/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod request;
5mod source;
6
7use std::fmt;
8use std::fmt::Debug;
9use std::fmt::Display;
10use std::pin::Pin;
11use std::sync::Arc;
12use std::sync::atomic::AtomicUsize;
13use std::sync::atomic::Ordering;
14use std::task::Context;
15use std::task::Poll;
16use std::task::ready;
17
18use async_trait::async_trait;
19use futures::FutureExt;
20use futures::TryFutureExt;
21use futures::channel::mpsc;
22use futures::future::BoxFuture;
23use futures::future::Shared;
24pub use request::*;
25pub use source::*;
26use vortex_buffer::Alignment;
27use vortex_buffer::ByteBuffer;
28use vortex_error::SharedVortexResult;
29use vortex_error::VortexError;
30use vortex_error::VortexResult;
31use vortex_error::vortex_err;
32
33use crate::VortexReadAt;
34
/// A handle to an open file that can be read using a Vortex runtime.
///
/// Clones of a [`FileRead`] share the same event queue, the same request ID counter and the
/// same (shared) file-size future.
///
/// ## Coalescing and Pre-fetching
///
/// It is important to understand the semantics of the read futures returned by a [`FileRead`].
/// Under the hood, each [`FileRead`] is backed by a stream that services read requests by
/// applying coalescing and concurrency constraints.
///
/// Each read future is in one of four states:
/// * `registered` - the read future has been created, but not yet polled.
/// * `requested` - the read future has been polled at least once.
/// * `in-flight` - the read request has been sent to the underlying storage system.
/// * `resolved` - the read future has completed and resolved to a result.
///
/// While a read request is only `registered`, it will not itself trigger any I/O, but it is
/// eligible to be coalesced with other requests.
///
/// If a read future is dropped, it will be canceled if possible. This depends on the current
/// state of the request, as well as whether the underlying storage system supports cancellation.
///
/// I/O requests will be processed in the order they are `registered`; however, coalescing may
/// mean that other registered requests are lumped together into a single I/O operation.
#[derive(Clone)]
pub struct FileRead {
    /// Human-readable descriptor for the file, typically its URI.
    uri: Arc<str>,
    /// A shared future that resolves to the size of the file.
    ///
    /// The error is wrapped in an `Arc` (see [`FileRead::new`]) so that the result can be
    /// handed out to every clone of the shared future.
    size: Shared<BoxFuture<'static, SharedVortexResult<u64>>>,
    /// A queue for sending read request events to the I/O stream.
    events: mpsc::UnboundedSender<ReadEvent>,
    /// The next read request ID, incremented atomically for every call to `read_at`.
    next_id: Arc<AtomicUsize>,
}
68
69impl Debug for FileRead {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        f.debug_struct("FileHandle")
72            .field("uri", &self.uri)
73            .finish()
74    }
75}
76
77impl Display for FileRead {
78    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79        write!(f, "{}", self.uri)
80    }
81}
82
83impl FileRead {
84    pub(crate) fn new(
85        uri: Arc<str>,
86        size: BoxFuture<'static, VortexResult<u64>>,
87        send: mpsc::UnboundedSender<ReadEvent>,
88    ) -> Self {
89        Self {
90            uri,
91            size: size.map_err(Arc::new).boxed().shared(),
92            events: send,
93            next_id: Arc::new(AtomicUsize::new(0)),
94        }
95    }
96
97    /// The URI of the file.
98    pub fn uri(&self) -> &Arc<str> {
99        &self.uri
100    }
101}
102
/// A future that resolves a read request from a [`FileRead`].
///
/// See the documentation for [`FileRead`] for details on coalescing and pre-fetching.
/// If dropped, the read request will be canceled where possible.
struct ReadFuture {
    /// ID of the read request this future belongs to; sent along with the
    /// [`ReadEvent::Polled`] and [`ReadEvent::Dropped`] events.
    id: usize,
    /// Receiving half of the channel whose sender was handed to the I/O stream as the
    /// request's `callback`; yields the result of the read.
    recv: oneshot::Receiver<VortexResult<ByteBuffer>>,
    /// Set on the first poll so that [`ReadEvent::Polled`] is sent at most once.
    polled: bool,
    /// Queue used to notify the I/O stream when this future is first polled and when it is
    /// dropped.
    events: mpsc::UnboundedSender<ReadEvent>,
}
113
114impl Future for ReadFuture {
115    type Output = VortexResult<ByteBuffer>;
116
117    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
118        if !self.polled {
119            self.polled = true;
120            // Notify the I/O stream that this request has been polled.
121            if let Err(e) = self.events.unbounded_send(ReadEvent::Polled(self.id)) {
122                return Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}")));
123            }
124        }
125
126        match ready!(self.recv.poll_unpin(cx)) {
127            Ok(result) => Poll::Ready(result),
128            Err(e) => Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}"))),
129        }
130    }
131}
132
133impl Drop for ReadFuture {
134    fn drop(&mut self) {
135        // When the FileHandle is dropped, we can send a shutdown event to the I/O stream.
136        // If the I/O stream has already been dropped, this will fail silently.
137        drop(self.events.unbounded_send(ReadEvent::Dropped(self.id)));
138    }
139}
140
/// Events sent by a [`FileRead`] and its read futures to the I/O stream.
#[derive(Debug)]
pub(crate) enum ReadEvent {
    /// A new read request has been created by `read_at`; its future has not been polled yet.
    Request(ReadRequest),
    /// The future of the request with the given ID has been polled for the first time.
    Polled(RequestId),
    /// The future of the request with the given ID has been dropped. This is sent on every
    /// drop, whether or not the future had already resolved.
    Dropped(RequestId),
}
147
148#[async_trait]
149impl VortexReadAt for FileRead {
150    fn read_at(
151        &self,
152        offset: u64,
153        length: usize,
154        alignment: Alignment,
155    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
156        let (send, recv) = oneshot::channel();
157        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
158        let event = ReadEvent::Request(ReadRequest {
159            id,
160            offset,
161            length,
162            alignment,
163            callback: send,
164        });
165
166        // If we fail to submit the event, we create a future that has failed.
167        if let Err(e) = self.events.unbounded_send(event) {
168            return async move { Err(vortex_err!("Failed to submit read request: {e}")) }.boxed();
169        }
170
171        ReadFuture {
172            id,
173            recv,
174            polled: false,
175            events: self.events.clone(),
176        }
177        .boxed()
178    }
179
180    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
181        self.size.clone().map_err(VortexError::from).boxed()
182    }
183}