Skip to main content

vortex_io/object_store/
read_at.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io;
5use std::sync::Arc;
6
7use futures::FutureExt;
8use futures::StreamExt;
9use futures::future::BoxFuture;
10use object_store::GetOptions;
11use object_store::GetRange;
12use object_store::GetResultPayload;
13use object_store::ObjectStore;
14use object_store::path::Path as ObjectPath;
15use vortex_array::buffer::BufferHandle;
16use vortex_buffer::Alignment;
17use vortex_buffer::ByteBufferMut;
18use vortex_error::VortexError;
19use vortex_error::VortexResult;
20use vortex_error::vortex_ensure;
21
22use crate::CoalesceConfig;
23use crate::VortexReadAt;
24use crate::runtime::Handle;
25#[cfg(not(target_arch = "wasm32"))]
26use crate::std_file::read_exact_at;
27
/// Default number of concurrent requests to allow.
///
/// [`ObjectStoreReadAt::new`] uses this value; override it per source with
/// [`ObjectStoreReadAt::with_concurrency`].
pub const DEFAULT_CONCURRENCY: usize = 192;
30
31/// An object store backed I/O source.
32pub struct ObjectStoreReadAt {
33    store: Arc<dyn ObjectStore>,
34    path: ObjectPath,
35    uri: Arc<str>,
36    handle: Handle,
37    concurrency: usize,
38    coalesce_config: Option<CoalesceConfig>,
39}
40
41impl ObjectStoreReadAt {
42    /// Create a new object store source.
43    pub fn new(store: Arc<dyn ObjectStore>, path: ObjectPath, handle: Handle) -> Self {
44        let uri = Arc::from(path.to_string());
45        Self {
46            store,
47            path,
48            uri,
49            handle,
50            concurrency: DEFAULT_CONCURRENCY,
51            coalesce_config: Some(CoalesceConfig::object_storage()),
52        }
53    }
54
55    /// Set the concurrency for this source.
56    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
57        self.concurrency = concurrency;
58        self
59    }
60
61    /// Set the coalesce config for this source.
62    pub fn with_coalesce_config(mut self, config: CoalesceConfig) -> Self {
63        self.coalesce_config = Some(config);
64        self
65    }
66}
67
68impl VortexReadAt for ObjectStoreReadAt {
69    fn uri(&self) -> Option<&Arc<str>> {
70        Some(&self.uri)
71    }
72
73    fn coalesce_config(&self) -> Option<CoalesceConfig> {
74        self.coalesce_config
75    }
76
77    fn concurrency(&self) -> usize {
78        self.concurrency
79    }
80
81    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
82        let store = self.store.clone();
83        let path = self.path.clone();
84        async move {
85            store
86                .head(&path)
87                .await
88                .map(|h| h.size)
89                .map_err(VortexError::from)
90        }
91        .boxed()
92    }
93
94    fn read_at(
95        &self,
96        offset: u64,
97        length: usize,
98        alignment: Alignment,
99    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
100        let store = self.store.clone();
101        let path = self.path.clone();
102        let handle = self.handle.clone();
103        let range = offset..(offset + length as u64);
104
105        async move {
106            let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment);
107
108            let response = store
109                .get_opts(
110                    &path,
111                    GetOptions {
112                        range: Some(GetRange::Bounded(range.clone())),
113                        ..Default::default()
114                    },
115                )
116                .await?;
117
118            let buffer = match response.payload {
119                #[cfg(not(target_arch = "wasm32"))]
120                GetResultPayload::File(file, _) => {
121                    unsafe { buffer.set_len(length) };
122
123                    handle
124                        .spawn_blocking(move || {
125                            read_exact_at(&file, &mut buffer, range.start)?;
126                            Ok::<_, io::Error>(buffer)
127                        })
128                        .await
129                        .map_err(io::Error::other)?
130                }
131                #[cfg(target_arch = "wasm32")]
132                GetResultPayload::File(..) => {
133                    unreachable!("File payload not supported on wasm32")
134                }
135                GetResultPayload::Stream(mut byte_stream) => {
136                    while let Some(bytes) = byte_stream.next().await {
137                        buffer.extend_from_slice(&bytes?);
138                    }
139
140                    vortex_ensure!(
141                        buffer.len() == length,
142                        "Object store stream returned {} bytes but expected {} bytes (range: {:?})",
143                        buffer.len(),
144                        length,
145                        range
146                    );
147
148                    buffer
149                }
150            };
151
152            Ok(BufferHandle::new_host(buffer.freeze()))
153        }
154        .boxed()
155    }
156}