Skip to main content

vortex_io/object_store/
read_at.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io;
5use std::sync::Arc;
6
7use futures::FutureExt;
8use futures::StreamExt;
9use futures::future::BoxFuture;
10use object_store::GetOptions;
11use object_store::GetRange;
12use object_store::GetResultPayload;
13use object_store::ObjectStore;
14use object_store::path::Path as ObjectPath;
15use vortex_array::buffer::BufferHandle;
16use vortex_buffer::Alignment;
17use vortex_buffer::ByteBufferMut;
18use vortex_error::VortexError;
19use vortex_error::VortexResult;
20use vortex_error::vortex_ensure;
21
22use crate::CoalesceConfig;
23use crate::VortexReadAt;
24use crate::runtime::Handle;
25#[cfg(not(target_arch = "wasm32"))]
26use crate::std_file::read_exact_at;
27
/// Concurrency limit assigned to a freshly constructed [`ObjectStoreReadAt`].
///
/// Callers can replace it per source with [`ObjectStoreReadAt::with_concurrency`].
pub const DEFAULT_CONCURRENCY: usize = 192;
30
31/// An object store backed I/O source.
32pub struct ObjectStoreReadAt {
33    store: Arc<dyn ObjectStore>,
34    path: ObjectPath,
35    uri: Arc<str>,
36    handle: Handle,
37    concurrency: usize,
38    coalesce_config: Option<CoalesceConfig>,
39}
40
41impl ObjectStoreReadAt {
42    /// Create a new object store source.
43    pub fn new(store: Arc<dyn ObjectStore>, path: ObjectPath, handle: Handle) -> Self {
44        let uri = Arc::from(path.to_string());
45        Self {
46            store,
47            path,
48            uri,
49            handle,
50            concurrency: DEFAULT_CONCURRENCY,
51            coalesce_config: Some(CoalesceConfig::object_storage()),
52        }
53    }
54
55    /// Set the concurrency for this source.
56    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
57        self.concurrency = concurrency;
58        self
59    }
60
61    /// Set the coalesce config for this source.
62    pub fn with_coalesce_config(mut self, config: CoalesceConfig) -> Self {
63        self.coalesce_config = Some(config);
64        self
65    }
66
67    /// Set an optional coalesce config for this source.
68    pub fn with_some_coalesce_config(mut self, config: Option<CoalesceConfig>) -> Self {
69        self.coalesce_config = config;
70        self
71    }
72}
73
74impl VortexReadAt for ObjectStoreReadAt {
75    fn uri(&self) -> Option<&Arc<str>> {
76        Some(&self.uri)
77    }
78
79    fn coalesce_config(&self) -> Option<CoalesceConfig> {
80        self.coalesce_config
81    }
82
83    fn concurrency(&self) -> usize {
84        self.concurrency
85    }
86
87    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
88        let store = self.store.clone();
89        let path = self.path.clone();
90        async move {
91            store
92                .head(&path)
93                .await
94                .map(|h| h.size)
95                .map_err(VortexError::from)
96        }
97        .boxed()
98    }
99
100    fn read_at(
101        &self,
102        offset: u64,
103        length: usize,
104        alignment: Alignment,
105    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
106        let store = self.store.clone();
107        let path = self.path.clone();
108        let handle = self.handle.clone();
109        let range = offset..(offset + length as u64);
110
111        async move {
112            let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment);
113
114            let response = store
115                .get_opts(
116                    &path,
117                    GetOptions {
118                        range: Some(GetRange::Bounded(range.clone())),
119                        ..Default::default()
120                    },
121                )
122                .await?;
123
124            let buffer = match response.payload {
125                #[cfg(not(target_arch = "wasm32"))]
126                GetResultPayload::File(file, _) => {
127                    unsafe { buffer.set_len(length) };
128
129                    handle
130                        .spawn_blocking(move || {
131                            read_exact_at(&file, &mut buffer, range.start)?;
132                            Ok::<_, io::Error>(buffer)
133                        })
134                        .await
135                        .map_err(io::Error::other)?
136                }
137                #[cfg(target_arch = "wasm32")]
138                GetResultPayload::File(..) => {
139                    unreachable!("File payload not supported on wasm32")
140                }
141                GetResultPayload::Stream(mut byte_stream) => {
142                    while let Some(bytes) = byte_stream.next().await {
143                        buffer.extend_from_slice(&bytes?);
144                    }
145
146                    vortex_ensure!(
147                        buffer.len() == length,
148                        "Object store stream returned {} bytes but expected {} bytes (range: {:?})",
149                        buffer.len(),
150                        length,
151                        range
152                    );
153
154                    buffer
155                }
156            };
157
158            Ok(BufferHandle::new_host(buffer.freeze()))
159        }
160        .boxed()
161    }
162}