vortex_io/file/read/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod request;
5mod source;
6
7use std::fmt;
8use std::fmt::{Debug, Display};
9use std::pin::Pin;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::task::{Context, Poll, ready};
13
14use async_trait::async_trait;
15use futures::channel::mpsc;
16use futures::future::{BoxFuture, Shared};
17use futures::{FutureExt, TryFutureExt};
18pub use request::*;
19pub use source::*;
20use vortex_buffer::{Alignment, ByteBuffer};
21use vortex_error::{SharedVortexResult, VortexError, VortexResult, vortex_err};
22
23use crate::VortexReadAt;
24
/// A handle to an open file that can be read using a Vortex runtime.
///
/// Clones of a [`FileRead`] submit their requests to the same event queue and draw request IDs
/// from the same counter.
///
/// ## Coalescing and Pre-fetching
///
/// It is important to understand the semantics of the read futures returned by a [`FileRead`].
/// Under the hood, each [`FileRead`] is backed by a stream that services read requests by
/// applying coalescing and concurrency constraints.
///
/// Each read future has four states:
/// * `registered` - the read future has been created, but not yet polled.
/// * `requested` - the read future has been polled.
/// * `in-flight` - the read request has been sent to the underlying storage system.
/// * `resolved` - the read future has completed and resolved to a result.
///
/// When a read request is `registered`, it will not itself trigger any I/O, but is eligible to
/// be coalesced with other requests.
///
/// If a read future is dropped, it will be canceled if possible. This depends on the current
/// state of the request, as well as whether the underlying storage system supports cancellation.
///
/// I/O requests will be processed in the order they are `registered`; however, coalescing may
/// mean that other registered requests are lumped together into a single I/O operation.
#[derive(Clone)]
pub struct FileRead {
    /// Human-readable descriptor for the file, typically its URI.
    uri: Arc<str>,
    /// A shared future that resolves to the size of the file.
    size: Shared<BoxFuture<'static, SharedVortexResult<u64>>>,
    /// A queue for sending read request events to the I/O stream.
    events: mpsc::UnboundedSender<ReadEvent>,
    /// The next read request ID, shared by all clones of this handle.
    next_id: Arc<AtomicUsize>,
}
58
59impl Debug for FileRead {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        f.debug_struct("FileHandle")
62            .field("uri", &self.uri)
63            .finish()
64    }
65}
66
67impl Display for FileRead {
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        write!(f, "{}", self.uri)
70    }
71}
72
73impl FileRead {
74    pub(crate) fn new(
75        uri: Arc<str>,
76        size: BoxFuture<'static, VortexResult<u64>>,
77        send: mpsc::UnboundedSender<ReadEvent>,
78    ) -> Self {
79        Self {
80            uri,
81            size: size.map_err(Arc::new).boxed().shared(),
82            events: send,
83            next_id: Arc::new(AtomicUsize::new(0)),
84        }
85    }
86
87    /// The URI of the file.
88    pub fn uri(&self) -> &Arc<str> {
89        &self.uri
90    }
91}
92
/// A future that resolves a read request from a [`FileRead`].
///
/// See the documentation for [`FileRead`] for details on coalescing and pre-fetching.
/// If dropped, the read request will be canceled where possible.
struct ReadFuture {
    /// The ID of the read request this future is waiting on.
    id: usize,
    /// Receives the result of the read request.
    recv: oneshot::Receiver<VortexResult<ByteBuffer>>,
    /// Whether this future has already been polled, so that the I/O stream is only notified
    /// on the first poll.
    polled: bool,
    /// A queue for notifying the I/O stream when this future is polled or dropped.
    events: mpsc::UnboundedSender<ReadEvent>,
}
103
104impl Future for ReadFuture {
105    type Output = VortexResult<ByteBuffer>;
106
107    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
108        if !self.polled {
109            self.polled = true;
110            // Notify the I/O stream that this request has been polled.
111            if let Err(e) = self.events.unbounded_send(ReadEvent::Polled(self.id)) {
112                return Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}")));
113            }
114        }
115
116        match ready!(self.recv.poll_unpin(cx)) {
117            Ok(result) => Poll::Ready(result),
118            Err(e) => Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}"))),
119        }
120    }
121}
122
123impl Drop for ReadFuture {
124    fn drop(&mut self) {
125        // When the FileHandle is dropped, we can send a shutdown event to the I/O stream.
126        // If the I/O stream has already been dropped, this will fail silently.
127        let _ = self.events.unbounded_send(ReadEvent::Dropped(self.id));
128    }
129}
130
/// An event sent to the I/O stream by a [`FileRead`] or by one of its read futures.
#[derive(Debug)]
pub(crate) enum ReadEvent {
    /// A new read request has been submitted.
    Request(ReadRequest),
    /// The future for the given request has been polled for the first time.
    Polled(RequestId),
    /// The future for the given request has been dropped.
    Dropped(RequestId),
}
137
138#[async_trait]
139impl VortexReadAt for FileRead {
140    fn read_at(
141        &self,
142        offset: u64,
143        length: usize,
144        alignment: Alignment,
145    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
146        let (send, recv) = oneshot::channel();
147        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
148        let event = ReadEvent::Request(ReadRequest {
149            id,
150            offset,
151            length,
152            alignment,
153            callback: send,
154        });
155
156        // If we fail to submit the event, we create a future that has failed.
157        if let Err(e) = self.events.unbounded_send(event) {
158            return async move { Err(vortex_err!("Failed to submit read request: {e}")) }.boxed();
159        }
160
161        ReadFuture {
162            id,
163            recv,
164            polled: false,
165            events: self.events.clone(),
166        }
167        .boxed()
168    }
169
170    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
171        self.size.clone().map_err(VortexError::from).boxed()
172    }
173}