vortex_io/object_store/
read_at.rs1use std::io;
5use std::sync::Arc;
6
7use futures::FutureExt;
8use futures::StreamExt;
9use futures::future::BoxFuture;
10use object_store::GetOptions;
11use object_store::GetRange;
12use object_store::GetResultPayload;
13use object_store::ObjectStore;
14use object_store::ObjectStoreExt;
15use object_store::path::Path as ObjectPath;
16use vortex_array::buffer::BufferHandle;
17use vortex_array::memory::DefaultHostAllocator;
18use vortex_array::memory::HostAllocatorRef;
19use vortex_buffer::Alignment;
20use vortex_error::VortexError;
21use vortex_error::VortexResult;
22use vortex_error::vortex_ensure;
23
24use crate::CoalesceConfig;
25use crate::VortexReadAt;
26use crate::runtime::Handle;
27#[cfg(not(target_arch = "wasm32"))]
28use crate::std_file::read_exact_at;
29
/// Concurrency value that a freshly constructed [`ObjectStoreReadAt`] reports
/// until it is overridden with [`ObjectStoreReadAt::with_concurrency`].
pub const DEFAULT_CONCURRENCY: usize = 192;
32
33pub struct ObjectStoreReadAt {
35 store: Arc<dyn ObjectStore>,
36 path: ObjectPath,
37 uri: Arc<str>,
38 handle: Handle,
39 allocator: HostAllocatorRef,
40 concurrency: usize,
41 coalesce_config: Option<CoalesceConfig>,
42}
43
44impl ObjectStoreReadAt {
45 pub fn new(store: Arc<dyn ObjectStore>, path: ObjectPath, handle: Handle) -> Self {
47 Self::new_with_allocator(store, path, handle, Arc::new(DefaultHostAllocator))
48 }
49
50 pub fn new_with_allocator(
52 store: Arc<dyn ObjectStore>,
53 path: ObjectPath,
54 handle: Handle,
55 allocator: HostAllocatorRef,
56 ) -> Self {
57 let uri = Arc::from(path.to_string());
58 Self {
59 store,
60 path,
61 uri,
62 handle,
63 allocator,
64 concurrency: DEFAULT_CONCURRENCY,
65 coalesce_config: Some(CoalesceConfig::object_storage()),
66 }
67 }
68
69 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
71 self.concurrency = concurrency;
72 self
73 }
74
75 pub fn with_coalesce_config(mut self, config: CoalesceConfig) -> Self {
77 self.coalesce_config = Some(config);
78 self
79 }
80}
81
82impl VortexReadAt for ObjectStoreReadAt {
83 fn uri(&self) -> Option<&Arc<str>> {
84 Some(&self.uri)
85 }
86
87 fn coalesce_config(&self) -> Option<CoalesceConfig> {
88 self.coalesce_config
89 }
90
91 fn concurrency(&self) -> usize {
92 self.concurrency
93 }
94
95 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
96 let store = Arc::clone(&self.store);
97 let path = self.path.clone();
98 async move {
99 store
100 .head(&path)
101 .await
102 .map(|h| h.size)
103 .map_err(VortexError::from)
104 }
105 .boxed()
106 }
107
108 fn read_at(
109 &self,
110 offset: u64,
111 length: usize,
112 alignment: Alignment,
113 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
114 let store = Arc::clone(&self.store);
115 let path = self.path.clone();
116 let handle = self.handle.clone();
117 let allocator = Arc::clone(&self.allocator);
118 let range = offset..(offset + length as u64);
119
120 let io_handle = handle.clone();
122
123 handle
124 .spawn_io(async move {
125 let mut buffer = allocator.allocate(length, alignment)?;
126
127 let response = store
128 .get_opts(
129 &path,
130 GetOptions {
131 range: Some(GetRange::Bounded(range.clone())),
132 ..Default::default()
133 },
134 )
135 .await?;
136
137 let buffer = match response.payload {
138 #[cfg(not(target_arch = "wasm32"))]
139 GetResultPayload::File(file, _) => {
140 io_handle
141 .spawn_blocking(move || {
142 read_exact_at(&file, buffer.as_mut_slice(), range.start)?;
143 Ok::<_, io::Error>(buffer)
144 })
145 .await
146 .map_err(io::Error::other)?
147 }
148 #[cfg(target_arch = "wasm32")]
149 GetResultPayload::File(..) => {
150 unreachable!("File payload not supported on wasm32")
151 }
152 GetResultPayload::Stream(mut byte_stream) => {
153 let mut written = 0usize;
154 while let Some(bytes) = byte_stream.next().await {
155 let bytes = bytes?;
156 let end = written + bytes.len();
157 vortex_ensure!(
158 end <= length,
159 "Object store stream returned too many bytes: {} > expected {} (range: {:?})",
160 end,
161 length,
162 range
163 );
164 buffer.as_mut_slice()[written..end].copy_from_slice(&bytes);
165 written = end;
166 }
167
168 vortex_ensure!(
169 written == length,
170 "Object store stream returned {} bytes but expected {} bytes (range: {:?})",
171 written,
172 length,
173 range
174 );
175
176 buffer
177 }
178 };
179
180 Ok(BufferHandle::new_host(buffer.freeze()))
181 })
182 .boxed()
183 }
184}
185
#[cfg(test)]
mod tests {

    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    use object_store::PutPayload;
    use object_store::memory::InMemory;

    use super::*;
    use crate::runtime::AbortHandle;
    use crate::runtime::AbortHandleRef;
    use crate::runtime::Executor;

    /// Object contents used by the tests in this module.
    const TEST_DATA: &[u8] = b"object store test data";

    /// Adapts a tokio abort handle to the crate's [`AbortHandle`] trait.
    struct TokioAbortHandle(tokio::task::AbortHandle);

    impl TokioAbortHandle {
        /// Boxes `handle` as an [`AbortHandleRef`].
        fn new_handle(handle: tokio::task::AbortHandle) -> AbortHandleRef {
            Box::new(Self(handle))
        }
    }

    impl AbortHandle for TokioAbortHandle {
        fn abort(self: Box<Self>) {
            let Self(inner) = *self;
            inner.abort();
        }
    }

    /// Spawns `fut` on the ambient tokio runtime and returns its abort handle.
    fn spawn_on_tokio(fut: BoxFuture<'static, ()>) -> AbortHandleRef {
        let join = tokio::spawn(fut);
        TokioAbortHandle::new_handle(join.abort_handle())
    }

    /// Executor that runs everything on tokio and counts how many times
    /// `spawn` and `spawn_io` were called.
    #[derive(Default)]
    struct CountingExecutor {
        spawn_count: AtomicUsize,
        spawn_io_count: AtomicUsize,
    }

    impl Executor for CountingExecutor {
        fn spawn(&self, fut: BoxFuture<'static, ()>) -> AbortHandleRef {
            self.spawn_count.fetch_add(1, Ordering::SeqCst);
            spawn_on_tokio(fut)
        }

        fn spawn_io(&self, fut: BoxFuture<'static, ()>) -> AbortHandleRef {
            self.spawn_io_count.fetch_add(1, Ordering::SeqCst);
            spawn_on_tokio(fut)
        }

        fn spawn_cpu(&self, task: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
            let join = tokio::spawn(async move { task() });
            TokioAbortHandle::new_handle(join.abort_handle())
        }

        fn spawn_blocking_io(&self, task: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
            let join = tokio::task::spawn_blocking(task);
            TokioAbortHandle::new_handle(join.abort_handle())
        }
    }

    /// A single `read_at` call must go through `spawn_io` exactly once and
    /// never through the general-purpose `spawn`.
    #[tokio::test]
    async fn read_at_uses_spawn_io() -> anyhow::Result<()> {
        let executor = Arc::new(CountingExecutor::default());
        // Kept alive for the whole test: the handle only holds a weak reference.
        let shared = Arc::clone(&executor);
        let runtime: Arc<dyn Executor> = shared;
        let handle = Handle::new(Arc::downgrade(&runtime));

        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = ObjectPath::from("test.bin");
        store.put(&path, PutPayload::from_static(TEST_DATA)).await?;

        let reader = ObjectStoreReadAt::new(store, path, handle);
        let buffer = reader.read_at(7, 5, Alignment::new(1)).await?;

        assert_eq!(buffer.to_host().await.as_slice(), b"store");

        let io_spawns = executor.spawn_io_count.load(Ordering::SeqCst);
        let plain_spawns = executor.spawn_count.load(Ordering::SeqCst);
        assert_eq!(io_spawns, 1);
        assert_eq!(plain_spawns, 0);

        Ok(())
    }
}