vortex_io/object_store/
read_at.rs1use std::io;
5use std::sync::Arc;
6
7use futures::FutureExt;
8use futures::StreamExt;
9use futures::future::BoxFuture;
10use object_store::GetOptions;
11use object_store::GetRange;
12use object_store::GetResultPayload;
13use object_store::ObjectStore;
14use object_store::path::Path as ObjectPath;
15use vortex_array::buffer::BufferHandle;
16use vortex_buffer::Alignment;
17use vortex_buffer::ByteBufferMut;
18use vortex_error::VortexError;
19use vortex_error::VortexResult;
20use vortex_error::vortex_ensure;
21
22use crate::CoalesceConfig;
23use crate::VortexReadAt;
24use crate::runtime::Handle;
25#[cfg(not(target_arch = "wasm32"))]
26use crate::std_file::read_exact_at;
27
/// Default concurrency for [`ObjectStoreReadAt`]: the value reported by
/// [`VortexReadAt::concurrency`] unless [`ObjectStoreReadAt::with_concurrency`] overrides it.
///
/// NOTE(review): presumably this bounds the number of in-flight range requests issued by the
/// consumer of `VortexReadAt`; confirm against that consumer.
pub const DEFAULT_CONCURRENCY: usize = 192;
/// Reads byte ranges of a single object stored in an [`ObjectStore`], exposed through
/// [`VortexReadAt`].
pub struct ObjectStoreReadAt {
    /// Store holding the object.
    store: Arc<dyn ObjectStore>,
    /// Location of the object within `store`; used for `head` and ranged `get_opts` requests.
    path: ObjectPath,
    /// String form of `path`, rendered once in `new` and handed out by `uri()`.
    uri: Arc<str>,
    /// Runtime handle; `read_at` calls its `spawn_blocking` to read local-file payloads.
    handle: Handle,
    /// Value reported by `VortexReadAt::concurrency`.
    concurrency: usize,
    /// Request-coalescing settings reported by `VortexReadAt::coalesce_config`.
    /// NOTE(review): `None` presumably disables coalescing — confirm with the consumer.
    coalesce_config: Option<CoalesceConfig>,
}

41impl ObjectStoreReadAt {
42 pub fn new(store: Arc<dyn ObjectStore>, path: ObjectPath, handle: Handle) -> Self {
44 let uri = Arc::from(path.to_string());
45 Self {
46 store,
47 path,
48 uri,
49 handle,
50 concurrency: DEFAULT_CONCURRENCY,
51 coalesce_config: Some(CoalesceConfig::object_storage()),
52 }
53 }
54
55 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
57 self.concurrency = concurrency;
58 self
59 }
60
61 pub fn with_coalesce_config(mut self, config: CoalesceConfig) -> Self {
63 self.coalesce_config = Some(config);
64 self
65 }
66}
67
68impl VortexReadAt for ObjectStoreReadAt {
69 fn uri(&self) -> Option<&Arc<str>> {
70 Some(&self.uri)
71 }
72
73 fn coalesce_config(&self) -> Option<CoalesceConfig> {
74 self.coalesce_config
75 }
76
77 fn concurrency(&self) -> usize {
78 self.concurrency
79 }
80
81 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
82 let store = self.store.clone();
83 let path = self.path.clone();
84 async move {
85 store
86 .head(&path)
87 .await
88 .map(|h| h.size)
89 .map_err(VortexError::from)
90 }
91 .boxed()
92 }
93
94 fn read_at(
95 &self,
96 offset: u64,
97 length: usize,
98 alignment: Alignment,
99 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
100 let store = self.store.clone();
101 let path = self.path.clone();
102 let handle = self.handle.clone();
103 let range = offset..(offset + length as u64);
104
105 async move {
106 let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment);
107
108 let response = store
109 .get_opts(
110 &path,
111 GetOptions {
112 range: Some(GetRange::Bounded(range.clone())),
113 ..Default::default()
114 },
115 )
116 .await?;
117
118 let buffer = match response.payload {
119 #[cfg(not(target_arch = "wasm32"))]
120 GetResultPayload::File(file, _) => {
121 unsafe { buffer.set_len(length) };
122
123 handle
124 .spawn_blocking(move || {
125 read_exact_at(&file, &mut buffer, range.start)?;
126 Ok::<_, io::Error>(buffer)
127 })
128 .await
129 .map_err(io::Error::other)?
130 }
131 #[cfg(target_arch = "wasm32")]
132 GetResultPayload::File(..) => {
133 unreachable!("File payload not supported on wasm32")
134 }
135 GetResultPayload::Stream(mut byte_stream) => {
136 while let Some(bytes) = byte_stream.next().await {
137 buffer.extend_from_slice(&bytes?);
138 }
139
140 vortex_ensure!(
141 buffer.len() == length,
142 "Object store stream returned {} bytes but expected {} bytes (range: {:?})",
143 buffer.len(),
144 length,
145 range
146 );
147
148 buffer
149 }
150 };
151
152 Ok(BufferHandle::new_host(buffer.freeze()))
153 }
154 .boxed()
155 }
156}