Skip to main content

vortex_io/object_store/
read_at.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io;
5use std::sync::Arc;
6
7use futures::FutureExt;
8use futures::StreamExt;
9use futures::future::BoxFuture;
10use object_store::GetOptions;
11use object_store::GetRange;
12use object_store::GetResultPayload;
13use object_store::ObjectStore;
14use object_store::ObjectStoreExt;
15use object_store::path::Path as ObjectPath;
16use vortex_array::buffer::BufferHandle;
17use vortex_array::memory::DefaultHostAllocator;
18use vortex_array::memory::HostAllocatorRef;
19use vortex_buffer::Alignment;
20use vortex_error::VortexError;
21use vortex_error::VortexResult;
22use vortex_error::vortex_ensure;
23
24use crate::CoalesceConfig;
25use crate::VortexReadAt;
26use crate::runtime::Handle;
27#[cfg(not(target_arch = "wasm32"))]
28use crate::std_file::read_exact_at;
29
/// Concurrency assigned to a newly constructed [`ObjectStoreReadAt`] unless it is
/// overridden with [`ObjectStoreReadAt::with_concurrency`].
pub const DEFAULT_CONCURRENCY: usize = 192;
32
33/// An object store backed I/O source.
34pub struct ObjectStoreReadAt {
35    store: Arc<dyn ObjectStore>,
36    path: ObjectPath,
37    uri: Arc<str>,
38    handle: Handle,
39    allocator: HostAllocatorRef,
40    concurrency: usize,
41    coalesce_config: Option<CoalesceConfig>,
42}
43
44impl ObjectStoreReadAt {
45    /// Create a new object store source.
46    pub fn new(store: Arc<dyn ObjectStore>, path: ObjectPath, handle: Handle) -> Self {
47        Self::new_with_allocator(store, path, handle, Arc::new(DefaultHostAllocator))
48    }
49
50    /// Create a new object store source with a custom writable buffer allocator.
51    pub fn new_with_allocator(
52        store: Arc<dyn ObjectStore>,
53        path: ObjectPath,
54        handle: Handle,
55        allocator: HostAllocatorRef,
56    ) -> Self {
57        let uri = Arc::from(path.to_string());
58        Self {
59            store,
60            path,
61            uri,
62            handle,
63            allocator,
64            concurrency: DEFAULT_CONCURRENCY,
65            coalesce_config: Some(CoalesceConfig::object_storage()),
66        }
67    }
68
69    /// Set the concurrency for this source.
70    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
71        self.concurrency = concurrency;
72        self
73    }
74
75    /// Set the coalesce config for this source.
76    pub fn with_coalesce_config(mut self, config: CoalesceConfig) -> Self {
77        self.coalesce_config = Some(config);
78        self
79    }
80}
81
82impl VortexReadAt for ObjectStoreReadAt {
83    fn uri(&self) -> Option<&Arc<str>> {
84        Some(&self.uri)
85    }
86
87    fn coalesce_config(&self) -> Option<CoalesceConfig> {
88        self.coalesce_config
89    }
90
91    fn concurrency(&self) -> usize {
92        self.concurrency
93    }
94
95    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
96        let store = Arc::clone(&self.store);
97        let path = self.path.clone();
98        async move {
99            store
100                .head(&path)
101                .await
102                .map(|h| h.size)
103                .map_err(VortexError::from)
104        }
105        .boxed()
106    }
107
108    fn read_at(
109        &self,
110        offset: u64,
111        length: usize,
112        alignment: Alignment,
113    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
114        let store = Arc::clone(&self.store);
115        let path = self.path.clone();
116        let handle = self.handle.clone();
117        let allocator = Arc::clone(&self.allocator);
118        let range = offset..(offset + length as u64);
119
120        async move {
121            let mut buffer = allocator.allocate(length, alignment)?;
122
123            let response = store
124                .get_opts(
125                    &path,
126                    GetOptions {
127                        range: Some(GetRange::Bounded(range.clone())),
128                        ..Default::default()
129                    },
130                )
131                .await?;
132
133            let buffer = match response.payload {
134                #[cfg(not(target_arch = "wasm32"))]
135                GetResultPayload::File(file, _) => {
136                    handle
137                        .spawn_blocking(move || {
138                            read_exact_at(&file, buffer.as_mut_slice(), range.start)?;
139                            Ok::<_, io::Error>(buffer)
140                        })
141                        .await
142                        .map_err(io::Error::other)?
143                }
144                #[cfg(target_arch = "wasm32")]
145                GetResultPayload::File(..) => {
146                    unreachable!("File payload not supported on wasm32")
147                }
148                GetResultPayload::Stream(mut byte_stream) => {
149                    let mut written = 0usize;
150                    while let Some(bytes) = byte_stream.next().await {
151                        let bytes = bytes?;
152                        let end = written + bytes.len();
153                        vortex_ensure!(
154                            end <= length,
155                            "Object store stream returned too many bytes: {} > expected {} (range: {:?})",
156                            end,
157                            length,
158                            range
159                        );
160                        buffer.as_mut_slice()[written..end].copy_from_slice(&bytes);
161                        written = end;
162                    }
163
164                    vortex_ensure!(
165                        written == length,
166                        "Object store stream returned {} bytes but expected {} bytes (range: {:?})",
167                        written,
168                        length,
169                        range
170                    );
171
172                    buffer
173                }
174            };
175
176            Ok(BufferHandle::new_host(buffer.freeze()))
177        }
178        .boxed()
179    }
180}