vortex_io/object_store/
read_at.rs1use std::io;
5use std::sync::Arc;
6
7use futures::FutureExt;
8use futures::StreamExt;
9use futures::future::BoxFuture;
10use object_store::GetOptions;
11use object_store::GetRange;
12use object_store::GetResultPayload;
13use object_store::ObjectStore;
14use object_store::ObjectStoreExt;
15use object_store::path::Path as ObjectPath;
16use vortex_array::buffer::BufferHandle;
17use vortex_array::memory::DefaultHostAllocator;
18use vortex_array::memory::HostAllocatorRef;
19use vortex_buffer::Alignment;
20use vortex_error::VortexError;
21use vortex_error::VortexResult;
22use vortex_error::vortex_ensure;
23
24use crate::CoalesceConfig;
25use crate::VortexReadAt;
26use crate::runtime::Handle;
27#[cfg(not(target_arch = "wasm32"))]
28use crate::std_file::read_exact_at;
29
/// Default value of the `concurrency` setting for an [`ObjectStoreReadAt`].
///
/// Constructors install this value; it can be replaced afterwards with
/// [`ObjectStoreReadAt::with_concurrency`].
pub const DEFAULT_CONCURRENCY: usize = 192;
32
/// A [`VortexReadAt`] implementation that reads one object out of an [`ObjectStore`].
pub struct ObjectStoreReadAt {
    /// Store that `size` and `read_at` send their requests to.
    store: Arc<dyn ObjectStore>,
    /// Location of the object inside `store`.
    path: ObjectPath,
    /// String form of `path`, computed once at construction and returned by `uri()`.
    uri: Arc<str>,
    /// Runtime handle; `read_at` uses its `spawn_blocking` when the store hands back a file.
    handle: Handle,
    /// Allocator that provides the destination buffer for every read.
    allocator: HostAllocatorRef,
    /// Value reported by `concurrency()`; starts at [`DEFAULT_CONCURRENCY`].
    concurrency: usize,
    /// Value reported by `coalesce_config()`.
    coalesce_config: Option<CoalesceConfig>,
}
43
44impl ObjectStoreReadAt {
45 pub fn new(store: Arc<dyn ObjectStore>, path: ObjectPath, handle: Handle) -> Self {
47 Self::new_with_allocator(store, path, handle, Arc::new(DefaultHostAllocator))
48 }
49
50 pub fn new_with_allocator(
52 store: Arc<dyn ObjectStore>,
53 path: ObjectPath,
54 handle: Handle,
55 allocator: HostAllocatorRef,
56 ) -> Self {
57 let uri = Arc::from(path.to_string());
58 Self {
59 store,
60 path,
61 uri,
62 handle,
63 allocator,
64 concurrency: DEFAULT_CONCURRENCY,
65 coalesce_config: Some(CoalesceConfig::object_storage()),
66 }
67 }
68
69 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
71 self.concurrency = concurrency;
72 self
73 }
74
75 pub fn with_coalesce_config(mut self, config: CoalesceConfig) -> Self {
77 self.coalesce_config = Some(config);
78 self
79 }
80}
81
82impl VortexReadAt for ObjectStoreReadAt {
83 fn uri(&self) -> Option<&Arc<str>> {
84 Some(&self.uri)
85 }
86
87 fn coalesce_config(&self) -> Option<CoalesceConfig> {
88 self.coalesce_config
89 }
90
91 fn concurrency(&self) -> usize {
92 self.concurrency
93 }
94
95 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
96 let store = Arc::clone(&self.store);
97 let path = self.path.clone();
98 async move {
99 store
100 .head(&path)
101 .await
102 .map(|h| h.size)
103 .map_err(VortexError::from)
104 }
105 .boxed()
106 }
107
108 fn read_at(
109 &self,
110 offset: u64,
111 length: usize,
112 alignment: Alignment,
113 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
114 let store = Arc::clone(&self.store);
115 let path = self.path.clone();
116 let handle = self.handle.clone();
117 let allocator = Arc::clone(&self.allocator);
118 let range = offset..(offset + length as u64);
119
120 async move {
121 let mut buffer = allocator.allocate(length, alignment)?;
122
123 let response = store
124 .get_opts(
125 &path,
126 GetOptions {
127 range: Some(GetRange::Bounded(range.clone())),
128 ..Default::default()
129 },
130 )
131 .await?;
132
133 let buffer = match response.payload {
134 #[cfg(not(target_arch = "wasm32"))]
135 GetResultPayload::File(file, _) => {
136 handle
137 .spawn_blocking(move || {
138 read_exact_at(&file, buffer.as_mut_slice(), range.start)?;
139 Ok::<_, io::Error>(buffer)
140 })
141 .await
142 .map_err(io::Error::other)?
143 }
144 #[cfg(target_arch = "wasm32")]
145 GetResultPayload::File(..) => {
146 unreachable!("File payload not supported on wasm32")
147 }
148 GetResultPayload::Stream(mut byte_stream) => {
149 let mut written = 0usize;
150 while let Some(bytes) = byte_stream.next().await {
151 let bytes = bytes?;
152 let end = written + bytes.len();
153 vortex_ensure!(
154 end <= length,
155 "Object store stream returned too many bytes: {} > expected {} (range: {:?})",
156 end,
157 length,
158 range
159 );
160 buffer.as_mut_slice()[written..end].copy_from_slice(&bytes);
161 written = end;
162 }
163
164 vortex_ensure!(
165 written == length,
166 "Object store stream returned {} bytes but expected {} bytes (range: {:?})",
167 written,
168 length,
169 range
170 );
171
172 buffer
173 }
174 };
175
176 Ok(BufferHandle::new_host(buffer.freeze()))
177 }
178 .boxed()
179 }
180}