Skip to main content

vortex_io/object_store/
read_at.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io;
5use std::sync::Arc;
6
7use futures::FutureExt;
8use futures::StreamExt;
9use futures::future::BoxFuture;
10use object_store::GetOptions;
11use object_store::GetRange;
12use object_store::GetResultPayload;
13use object_store::ObjectStore;
14use object_store::ObjectStoreExt;
15use object_store::path::Path as ObjectPath;
16use vortex_array::buffer::BufferHandle;
17use vortex_array::memory::DefaultHostAllocator;
18use vortex_array::memory::HostAllocatorRef;
19use vortex_buffer::Alignment;
20use vortex_error::VortexError;
21use vortex_error::VortexResult;
22use vortex_error::vortex_ensure;
23
24use crate::CoalesceConfig;
25use crate::VortexReadAt;
26use crate::runtime::Handle;
27#[cfg(not(target_arch = "wasm32"))]
28use crate::std_file::read_exact_at;
29
/// Default number of concurrent requests to allow.
///
/// Every [`ObjectStoreReadAt`] is constructed with this value as its concurrency; it can be
/// replaced afterwards with [`ObjectStoreReadAt::with_concurrency`].
pub const DEFAULT_CONCURRENCY: usize = 192;
32
/// An object store backed I/O source.
///
/// Reads the single object located at `path` inside `store` and exposes it through the
/// [`VortexReadAt`] trait.
pub struct ObjectStoreReadAt {
    /// Store that the `head` and ranged `get` requests are issued against.
    store: Arc<dyn ObjectStore>,
    /// Location of the object inside `store`.
    path: ObjectPath,
    /// String form of `path`, computed once at construction and returned by `uri()`.
    uri: Arc<str>,
    /// Runtime handle used to spawn the I/O task (and blocking file reads) for each read.
    handle: Handle,
    /// Allocator that provides the destination buffer for each read.
    allocator: HostAllocatorRef,
    /// Value reported by `concurrency()`; initialised to [`DEFAULT_CONCURRENCY`].
    concurrency: usize,
    /// Value reported by `coalesce_config()`.
    coalesce_config: Option<CoalesceConfig>,
}
43
44impl ObjectStoreReadAt {
45    /// Create a new object store source.
46    pub fn new(store: Arc<dyn ObjectStore>, path: ObjectPath, handle: Handle) -> Self {
47        Self::new_with_allocator(store, path, handle, Arc::new(DefaultHostAllocator))
48    }
49
50    /// Create a new object store source with a custom writable buffer allocator.
51    pub fn new_with_allocator(
52        store: Arc<dyn ObjectStore>,
53        path: ObjectPath,
54        handle: Handle,
55        allocator: HostAllocatorRef,
56    ) -> Self {
57        let uri = Arc::from(path.to_string());
58        Self {
59            store,
60            path,
61            uri,
62            handle,
63            allocator,
64            concurrency: DEFAULT_CONCURRENCY,
65            coalesce_config: Some(CoalesceConfig::object_storage()),
66        }
67    }
68
69    /// Set the concurrency for this source.
70    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
71        self.concurrency = concurrency;
72        self
73    }
74
75    /// Set the coalesce config for this source.
76    pub fn with_coalesce_config(mut self, config: CoalesceConfig) -> Self {
77        self.coalesce_config = Some(config);
78        self
79    }
80}
81
82impl VortexReadAt for ObjectStoreReadAt {
83    fn uri(&self) -> Option<&Arc<str>> {
84        Some(&self.uri)
85    }
86
87    fn coalesce_config(&self) -> Option<CoalesceConfig> {
88        self.coalesce_config
89    }
90
91    fn concurrency(&self) -> usize {
92        self.concurrency
93    }
94
95    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
96        let store = Arc::clone(&self.store);
97        let path = self.path.clone();
98        async move {
99            store
100                .head(&path)
101                .await
102                .map(|h| h.size)
103                .map_err(VortexError::from)
104        }
105        .boxed()
106    }
107
108    fn read_at(
109        &self,
110        offset: u64,
111        length: usize,
112        alignment: Alignment,
113    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
114        let store = Arc::clone(&self.store);
115        let path = self.path.clone();
116        let handle = self.handle.clone();
117        let allocator = Arc::clone(&self.allocator);
118        let range = offset..(offset + length as u64);
119
120        // Requires to deal with borrowed lifetimes
121        let io_handle = handle.clone();
122
123        handle
124                .spawn_io(async move {
125                    let mut buffer = allocator.allocate(length, alignment)?;
126
127                    let response = store
128                        .get_opts(
129                            &path,
130                            GetOptions {
131                                range: Some(GetRange::Bounded(range.clone())),
132                                ..Default::default()
133                            },
134                        )
135                        .await?;
136
137                    let buffer = match response.payload {
138                        #[cfg(not(target_arch = "wasm32"))]
139                        GetResultPayload::File(file, _) => {
140                            io_handle
141                                .spawn_blocking(move || {
142                                    read_exact_at(&file, buffer.as_mut_slice(), range.start)?;
143                                    Ok::<_, io::Error>(buffer)
144                                })
145                                .await
146                                .map_err(io::Error::other)?
147                        }
148                        #[cfg(target_arch = "wasm32")]
149                        GetResultPayload::File(..) => {
150                            unreachable!("File payload not supported on wasm32")
151                        }
152                        GetResultPayload::Stream(mut byte_stream) => {
153                            let mut written = 0usize;
154                            while let Some(bytes) = byte_stream.next().await {
155                                let bytes = bytes?;
156                                let end = written + bytes.len();
157                                vortex_ensure!(
158                                    end <= length,
159                                    "Object store stream returned too many bytes: {} > expected {} (range: {:?})",
160                                    end,
161                                    length,
162                                    range
163                                );
164                                buffer.as_mut_slice()[written..end].copy_from_slice(&bytes);
165                                written = end;
166                            }
167
168                            vortex_ensure!(
169                                written == length,
170                                "Object store stream returned {} bytes but expected {} bytes (range: {:?})",
171                                written,
172                                length,
173                                range
174                            );
175
176                            buffer
177                        }
178                    };
179
180                    Ok(BufferHandle::new_host(buffer.freeze()))
181                })
182        .boxed()
183    }
184}
185
#[cfg(test)]
mod tests {

    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    use object_store::PutPayload;
    use object_store::memory::InMemory;

    use super::*;
    use crate::runtime::AbortHandle;
    use crate::runtime::AbortHandleRef;
    use crate::runtime::Executor;

    /// Payload stored in the in-memory object store before reading it back.
    const TEST_DATA: &[u8] = b"object store test data";

    /// Wraps a Tokio abort handle so it can be returned as an [`AbortHandleRef`].
    struct TokioAbortHandle(tokio::task::AbortHandle);

    impl TokioAbortHandle {
        fn new_handle(handle: tokio::task::AbortHandle) -> AbortHandleRef {
            Box::new(TokioAbortHandle(handle))
        }
    }

    impl AbortHandle for TokioAbortHandle {
        fn abort(self: Box<Self>) {
            let TokioAbortHandle(inner) = *self;
            inner.abort();
        }
    }

    /// Executor that runs everything on Tokio while counting `spawn` and `spawn_io` calls.
    #[derive(Default)]
    struct CountingExecutor {
        spawn_count: AtomicUsize,
        spawn_io_count: AtomicUsize,
    }

    impl CountingExecutor {
        /// Spawns `fut` on the ambient Tokio runtime and hands back its abort handle.
        fn run_on_tokio(fut: BoxFuture<'static, ()>) -> AbortHandleRef {
            let join = tokio::spawn(fut);
            TokioAbortHandle::new_handle(join.abort_handle())
        }
    }

    impl Executor for CountingExecutor {
        fn spawn(&self, fut: BoxFuture<'static, ()>) -> AbortHandleRef {
            self.spawn_count.fetch_add(1, Ordering::SeqCst);
            Self::run_on_tokio(fut)
        }

        fn spawn_io(&self, fut: BoxFuture<'static, ()>) -> AbortHandleRef {
            self.spawn_io_count.fetch_add(1, Ordering::SeqCst);
            Self::run_on_tokio(fut)
        }

        fn spawn_cpu(&self, task: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
            let join = tokio::spawn(async move { task() });
            TokioAbortHandle::new_handle(join.abort_handle())
        }

        fn spawn_blocking_io(&self, task: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
            let join = tokio::task::spawn_blocking(task);
            TokioAbortHandle::new_handle(join.abort_handle())
        }
    }

    #[tokio::test]
    async fn read_at_uses_spawn_io() -> anyhow::Result<()> {
        // Seed an in-memory store with the object to read back.
        let backing_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
        let location = ObjectPath::from("test.bin");
        backing_store
            .put(&location, PutPayload::from_static(TEST_DATA))
            .await?;

        // `runtime` must stay alive for the whole test: the handle only holds a weak reference.
        let executor = Arc::new(CountingExecutor::default());
        let runtime = Arc::clone(&executor) as Arc<dyn Executor>;
        let handle = Handle::new(Arc::downgrade(&runtime));

        let reader = ObjectStoreReadAt::new(backing_store, location, handle);
        let buffer = reader.read_at(7, 5, Alignment::new(1)).await?;

        assert_eq!(buffer.to_host().await.as_slice(), b"store");
        assert_eq!(executor.spawn_io_count.load(Ordering::SeqCst), 1);
        assert_eq!(executor.spawn_count.load(Ordering::SeqCst), 0);

        Ok(())
    }
}