vortex_io/file/
object_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io;
5use std::sync::Arc;
6
7use async_compat::Compat;
8use futures::FutureExt;
9use futures::StreamExt;
10use futures::future::BoxFuture;
11use futures::stream::BoxStream;
12use tracing::Instrument;
13use vortex_buffer::ByteBufferMut;
14use vortex_error::VortexError;
15use vortex_error::VortexResult;
16use vortex_error::vortex_ensure;
17
18use crate::file::IoRequest;
19use crate::file::read::CoalesceWindow;
20use crate::file::read::IntoReadSource;
21use crate::file::read::ReadSource;
22use crate::file::read::ReadSourceRef;
23#[cfg(not(target_arch = "wasm32"))]
24use crate::file::std_file::read_exact_at;
25use crate::runtime::Handle;
26
27const COALESCING_WINDOW: CoalesceWindow = CoalesceWindow {
28    distance: 1024 * 1024,      // 1 MB
29    max_size: 16 * 1024 * 1024, // 16 MB
30};
31const CONCURRENCY: usize = 192; // Number of concurrent requests to allow.
32
33pub struct ObjectStoreReadSource {
34    store: Arc<dyn object_store::ObjectStore>,
35    path: object_store::path::Path,
36    uri: Arc<str>,
37    concurrency: usize,
38    coalesce_window: Option<CoalesceWindow>,
39}
40
41impl ObjectStoreReadSource {
42    pub fn new(store: Arc<dyn object_store::ObjectStore>, path: object_store::path::Path) -> Self {
43        let uri = Arc::from(path.to_string());
44        Self {
45            store,
46            path,
47            uri,
48            concurrency: CONCURRENCY,
49            coalesce_window: Some(COALESCING_WINDOW),
50        }
51    }
52
53    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
54        self.concurrency = concurrency;
55        self
56    }
57
58    pub fn with_coalesce_window(mut self, window: CoalesceWindow) -> Self {
59        self.coalesce_window = Some(window);
60        self
61    }
62
63    pub fn with_some_coalesce_window(mut self, window: Option<CoalesceWindow>) -> Self {
64        self.coalesce_window = window;
65        self
66    }
67}
68
69impl IntoReadSource for ObjectStoreReadSource {
70    fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
71        Ok(Arc::new(ObjectStoreIoSource { io: self, handle }))
72    }
73}
74
75struct ObjectStoreIoSource {
76    io: ObjectStoreReadSource,
77    handle: Handle,
78}
79
80impl ReadSource for ObjectStoreIoSource {
81    fn uri(&self) -> &Arc<str> {
82        &self.io.uri
83    }
84
85    fn coalesce_window(&self) -> Option<CoalesceWindow> {
86        self.io.coalesce_window
87    }
88
89    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
90        let store = self.io.store.clone();
91        let path = self.io.path.clone();
92        Compat::new(async move {
93            store
94                .head(&path)
95                .await
96                .map(|h| h.size)
97                .map_err(VortexError::from)
98        })
99        .boxed()
100    }
101
102    fn drive_send(
103        self: Arc<Self>,
104        requests: BoxStream<'static, IoRequest>,
105    ) -> BoxFuture<'static, ()> {
106        let self2 = self.clone();
107        let concurrency = self.io.concurrency;
108        requests
109            .map(move |req| {
110                let handle = self.handle.clone();
111                let store = self.io.store.clone();
112                let path = self.io.path.clone();
113
114                let len = req.len();
115                let range = req.range();
116                let alignment = req.alignment();
117
118                let read = async move {
119                    // Instead of calling `ObjectStore::get_range`, we expand the implementation and run it
120                    // ourselves to avoid a second copy to align the buffer. Instead, we can write directly
121                    // into the aligned buffer.
122                    let mut buffer = ByteBufferMut::with_capacity_aligned(len, alignment);
123
124                    let response = store
125                        .get_opts(
126                            &path,
127                            object_store::GetOptions {
128                                range: Some(object_store::GetRange::Bounded(range.clone())),
129                                ..Default::default()
130                            },
131                        )
132                        .await?;
133
134                    let buffer = match response.payload {
135                        object_store::GetResultPayload::File(file, _) => {
136                            // SAFETY: We're setting the length to the exact size we're about to read.
137                            // The read_exact_at call will either fill the entire buffer or return an error,
138                            // ensuring no uninitialized memory is exposed.
139                            unsafe { buffer.set_len(len) };
140
141                            handle
142                                .spawn_blocking(move || {
143                                    read_exact_at(&file, &mut buffer, range.start)?;
144                                    Ok::<_, io::Error>(buffer)
145                                })
146                                .await
147                                .map_err(io::Error::other)?
148                        }
149                        object_store::GetResultPayload::Stream(mut byte_stream) => {
150                            while let Some(bytes) = byte_stream.next().await {
151                                buffer.extend_from_slice(&bytes?);
152                            }
153
154                            vortex_ensure!(
155                                buffer.len() == len,
156                                "Object store stream returned {} bytes but expected {} bytes (range: {:?})",
157                                buffer.len(),
158                                len,
159                                range
160                            );
161
162                            buffer
163                        }
164                    };
165
166                    Ok(buffer.freeze())
167                }
168                .in_current_span();
169
170                async move { req.resolve(Compat::new(read).await) }
171            })
172            .map(move |f| self2.handle.spawn(f))
173            .buffer_unordered(concurrency)
174            .collect::<()>()
175            .boxed()
176    }
177}