vortex_io/object_store/
read_at.rs1use std::io;
5use std::sync::Arc;
6
7use futures::FutureExt;
8use futures::StreamExt;
9use futures::future::BoxFuture;
10use object_store::GetOptions;
11use object_store::GetRange;
12use object_store::GetResultPayload;
13use object_store::ObjectStore;
14use object_store::path::Path as ObjectPath;
15use vortex_array::buffer::BufferHandle;
16use vortex_buffer::Alignment;
17use vortex_buffer::ByteBufferMut;
18use vortex_error::VortexError;
19use vortex_error::VortexResult;
20use vortex_error::vortex_ensure;
21
22use crate::CoalesceConfig;
23use crate::VortexReadAt;
24use crate::runtime::Handle;
25#[cfg(not(target_arch = "wasm32"))]
26use crate::std_file::read_exact_at;
27
/// Concurrency value that [`ObjectStoreReadAt::new`] assigns to a freshly built reader.
///
/// Callers can replace it with [`ObjectStoreReadAt::with_concurrency`].
pub const DEFAULT_CONCURRENCY: usize = 192;
30
31pub struct ObjectStoreReadAt {
33 store: Arc<dyn ObjectStore>,
34 path: ObjectPath,
35 uri: Arc<str>,
36 handle: Handle,
37 concurrency: usize,
38 coalesce_config: Option<CoalesceConfig>,
39}
40
41impl ObjectStoreReadAt {
42 pub fn new(store: Arc<dyn ObjectStore>, path: ObjectPath, handle: Handle) -> Self {
44 let uri = Arc::from(path.to_string());
45 Self {
46 store,
47 path,
48 uri,
49 handle,
50 concurrency: DEFAULT_CONCURRENCY,
51 coalesce_config: Some(CoalesceConfig::object_storage()),
52 }
53 }
54
55 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
57 self.concurrency = concurrency;
58 self
59 }
60
61 pub fn with_coalesce_config(mut self, config: CoalesceConfig) -> Self {
63 self.coalesce_config = Some(config);
64 self
65 }
66
67 pub fn with_some_coalesce_config(mut self, config: Option<CoalesceConfig>) -> Self {
69 self.coalesce_config = config;
70 self
71 }
72}
73
74impl VortexReadAt for ObjectStoreReadAt {
75 fn uri(&self) -> Option<&Arc<str>> {
76 Some(&self.uri)
77 }
78
79 fn coalesce_config(&self) -> Option<CoalesceConfig> {
80 self.coalesce_config
81 }
82
83 fn concurrency(&self) -> usize {
84 self.concurrency
85 }
86
87 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
88 let store = self.store.clone();
89 let path = self.path.clone();
90 async move {
91 store
92 .head(&path)
93 .await
94 .map(|h| h.size)
95 .map_err(VortexError::from)
96 }
97 .boxed()
98 }
99
100 fn read_at(
101 &self,
102 offset: u64,
103 length: usize,
104 alignment: Alignment,
105 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
106 let store = self.store.clone();
107 let path = self.path.clone();
108 let handle = self.handle.clone();
109 let range = offset..(offset + length as u64);
110
111 async move {
112 let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment);
113
114 let response = store
115 .get_opts(
116 &path,
117 GetOptions {
118 range: Some(GetRange::Bounded(range.clone())),
119 ..Default::default()
120 },
121 )
122 .await?;
123
124 let buffer = match response.payload {
125 #[cfg(not(target_arch = "wasm32"))]
126 GetResultPayload::File(file, _) => {
127 unsafe { buffer.set_len(length) };
128
129 handle
130 .spawn_blocking(move || {
131 read_exact_at(&file, &mut buffer, range.start)?;
132 Ok::<_, io::Error>(buffer)
133 })
134 .await
135 .map_err(io::Error::other)?
136 }
137 #[cfg(target_arch = "wasm32")]
138 GetResultPayload::File(..) => {
139 unreachable!("File payload not supported on wasm32")
140 }
141 GetResultPayload::Stream(mut byte_stream) => {
142 while let Some(bytes) = byte_stream.next().await {
143 buffer.extend_from_slice(&bytes?);
144 }
145
146 vortex_ensure!(
147 buffer.len() == length,
148 "Object store stream returned {} bytes but expected {} bytes (range: {:?})",
149 buffer.len(),
150 length,
151 range
152 );
153
154 buffer
155 }
156 };
157
158 Ok(BufferHandle::new_host(buffer.freeze()))
159 }
160 .boxed()
161 }
162}