vortex_io/file/
object_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io;
5#[cfg(unix)]
6use std::os::unix::fs::FileExt;
7use std::sync::Arc;
8
9use async_compat::Compat;
10use futures::future::BoxFuture;
11use futures::stream::BoxStream;
12use futures::{FutureExt, StreamExt};
13use tracing::Instrument;
14use vortex_buffer::ByteBufferMut;
15use vortex_error::{VortexError, VortexResult, vortex_ensure};
16
17use crate::file::IoRequest;
18use crate::file::read::{CoalesceWindow, IntoReadSource, ReadSource, ReadSourceRef};
19use crate::runtime::Handle;
20
21const COALESCING_WINDOW: CoalesceWindow = CoalesceWindow {
22    distance: 1024 * 1024,      // 1 MB
23    max_size: 16 * 1024 * 1024, // 16 MB
24};
25const CONCURRENCY: usize = 192; // Number of concurrent requests to allow.
26
/// Configuration for reading a single object out of an [`object_store::ObjectStore`].
///
/// Built with [`ObjectStoreReadSource::new`] and adjusted through the `with_*` builder methods,
/// then converted into a read source via [`IntoReadSource`].
pub struct ObjectStoreReadSource {
    /// Store that serves the object.
    store: Arc<dyn object_store::ObjectStore>,
    /// Location of the object inside `store`.
    path: object_store::path::Path,
    /// String form of `path`, computed once in `new` and handed out by `ReadSource::uri`.
    uri: Arc<str>,
    /// Configured concurrency value; initialised to `CONCURRENCY` by `new`.
    concurrency: usize,
    /// Coalescing window reported through `ReadSource::coalesce_window`; `None` means no window
    /// is reported.
    coalesce_window: Option<CoalesceWindow>,
}
34
35impl ObjectStoreReadSource {
36    pub fn new(store: Arc<dyn object_store::ObjectStore>, path: object_store::path::Path) -> Self {
37        let uri = Arc::from(path.to_string());
38        Self {
39            store,
40            path,
41            uri,
42            concurrency: CONCURRENCY,
43            coalesce_window: Some(COALESCING_WINDOW),
44        }
45    }
46
47    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
48        self.concurrency = concurrency;
49        self
50    }
51
52    pub fn with_coalesce_window(mut self, window: CoalesceWindow) -> Self {
53        self.coalesce_window = Some(window);
54        self
55    }
56
57    pub fn with_some_coalesce_window(mut self, window: Option<CoalesceWindow>) -> Self {
58        self.coalesce_window = window;
59        self
60    }
61}
62
63impl IntoReadSource for ObjectStoreReadSource {
64    fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
65        Ok(Arc::new(ObjectStoreIoSource { io: self, handle }))
66    }
67}
68
/// An [`ObjectStoreReadSource`] paired with the runtime [`Handle`] used to run its reads.
///
/// This is the type that implements [`ReadSource`]; it is created by
/// `ObjectStoreReadSource::into_read_source`.
struct ObjectStoreIoSource {
    /// Store, path and tuning options for the object being read.
    io: ObjectStoreReadSource,
    /// Runtime handle on which request futures and blocking file reads are spawned.
    handle: Handle,
}
73
74impl ReadSource for ObjectStoreIoSource {
75    fn uri(&self) -> &Arc<str> {
76        &self.io.uri
77    }
78
79    fn coalesce_window(&self) -> Option<CoalesceWindow> {
80        self.io.coalesce_window
81    }
82
83    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
84        let store = self.io.store.clone();
85        let path = self.io.path.clone();
86        Compat::new(async move {
87            store
88                .head(&path)
89                .await
90                .map(|h| h.size)
91                .map_err(VortexError::from)
92        })
93        .boxed()
94    }
95
96    fn drive_send(
97        self: Arc<Self>,
98        requests: BoxStream<'static, IoRequest>,
99    ) -> BoxFuture<'static, ()> {
100        let self2 = self.clone();
101        requests
102            .map(move |req| {
103                let handle = self.handle.clone();
104                let store = self.io.store.clone();
105                let path = self.io.path.clone();
106
107                let len = req.len();
108                let range = req.range();
109                let alignment = req.alignment();
110
111                let read = async move {
112                    // Instead of calling `ObjectStore::get_range`, we expand the implementation and run it
113                    // ourselves to avoid a second copy to align the buffer. Instead, we can write directly
114                    // into the aligned buffer.
115                    let mut buffer = ByteBufferMut::with_capacity_aligned(len, alignment);
116
117                    let response = store
118                        .get_opts(
119                            &path,
120                            object_store::GetOptions {
121                                range: Some(object_store::GetRange::Bounded(range.clone())),
122                                ..Default::default()
123                            },
124                        )
125                        .await?;
126
127                    let buffer = match response.payload {
128                        object_store::GetResultPayload::File(file, _) => {
129                            // SAFETY: We're setting the length to the exact size we're about to read.
130                            // The read_exact_at call will either fill the entire buffer or return an error,
131                            // ensuring no uninitialized memory is exposed.
132                            unsafe { buffer.set_len(len) };
133
134                            handle
135                                .spawn_blocking(move || {
136                                    #[cfg(unix)] {
137                                        file.read_exact_at(&mut buffer, range.start)?;
138                                        Ok::<_, io::Error>(buffer)
139                                    }
140                                    #[cfg(not(unix))] {
141                                        file.seek(range.start)?;
142                                        file.read_exact(&mut buffer)?;
143                                        Ok::<_, io::Error>(buffer)
144                                    }
145                                })
146                                .await
147                                .map_err(io::Error::other)?
148                        }
149                        object_store::GetResultPayload::Stream(mut byte_stream) => {
150                            while let Some(bytes) = byte_stream.next().await {
151                                buffer.extend_from_slice(&bytes?);
152                            }
153
154                            vortex_ensure!(
155                                buffer.len() == len,
156                                "Object store stream returned {} bytes but expected {} bytes (range: {:?})",
157                                buffer.len(),
158                                len,
159                                range
160                            );
161
162                            buffer
163                        }
164                    };
165
166                    Ok(buffer.freeze())
167                }
168                .in_current_span();
169
170                async move { req.resolve(Compat::new(read).await) }
171            })
172            .map(move |f| self2.handle.spawn(f))
173            .buffer_unordered(CONCURRENCY)
174            .collect::<()>()
175            .boxed()
176    }
177}