vortex_io/file/
object_store.rs1use std::io;
5use std::sync::Arc;
6
7use async_compat::Compat;
8use futures::FutureExt;
9use futures::StreamExt;
10use futures::future::BoxFuture;
11use futures::stream::BoxStream;
12use tracing::Instrument;
13use vortex_buffer::ByteBufferMut;
14use vortex_error::VortexError;
15use vortex_error::VortexResult;
16use vortex_error::vortex_ensure;
17
18use crate::file::IoRequest;
19use crate::file::read::CoalesceWindow;
20use crate::file::read::IntoReadSource;
21use crate::file::read::ReadSource;
22use crate::file::read::ReadSourceRef;
23#[cfg(not(target_arch = "wasm32"))]
24use crate::file::std_file::read_exact_at;
25use crate::runtime::Handle;
26
27const COALESCING_WINDOW: CoalesceWindow = CoalesceWindow {
28 distance: 1024 * 1024, max_size: 16 * 1024 * 1024, };
31const CONCURRENCY: usize = 192; pub struct ObjectStoreReadSource {
34 store: Arc<dyn object_store::ObjectStore>,
35 path: object_store::path::Path,
36 uri: Arc<str>,
37 concurrency: usize,
38 coalesce_window: Option<CoalesceWindow>,
39}
40
41impl ObjectStoreReadSource {
42 pub fn new(store: Arc<dyn object_store::ObjectStore>, path: object_store::path::Path) -> Self {
43 let uri = Arc::from(path.to_string());
44 Self {
45 store,
46 path,
47 uri,
48 concurrency: CONCURRENCY,
49 coalesce_window: Some(COALESCING_WINDOW),
50 }
51 }
52
53 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
54 self.concurrency = concurrency;
55 self
56 }
57
58 pub fn with_coalesce_window(mut self, window: CoalesceWindow) -> Self {
59 self.coalesce_window = Some(window);
60 self
61 }
62
63 pub fn with_some_coalesce_window(mut self, window: Option<CoalesceWindow>) -> Self {
64 self.coalesce_window = window;
65 self
66 }
67}
68
69impl IntoReadSource for ObjectStoreReadSource {
70 fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
71 Ok(Arc::new(ObjectStoreIoSource { io: self, handle }))
72 }
73}
74
75struct ObjectStoreIoSource {
76 io: ObjectStoreReadSource,
77 handle: Handle,
78}
79
80impl ReadSource for ObjectStoreIoSource {
81 fn uri(&self) -> &Arc<str> {
82 &self.io.uri
83 }
84
85 fn coalesce_window(&self) -> Option<CoalesceWindow> {
86 self.io.coalesce_window
87 }
88
89 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
90 let store = self.io.store.clone();
91 let path = self.io.path.clone();
92 Compat::new(async move {
93 store
94 .head(&path)
95 .await
96 .map(|h| h.size)
97 .map_err(VortexError::from)
98 })
99 .boxed()
100 }
101
102 fn drive_send(
103 self: Arc<Self>,
104 requests: BoxStream<'static, IoRequest>,
105 ) -> BoxFuture<'static, ()> {
106 let self2 = self.clone();
107 let concurrency = self.io.concurrency;
108 requests
109 .map(move |req| {
110 let handle = self.handle.clone();
111 let store = self.io.store.clone();
112 let path = self.io.path.clone();
113
114 let len = req.len();
115 let range = req.range();
116 let alignment = req.alignment();
117
118 let read = async move {
119 let mut buffer = ByteBufferMut::with_capacity_aligned(len, alignment);
123
124 let response = store
125 .get_opts(
126 &path,
127 object_store::GetOptions {
128 range: Some(object_store::GetRange::Bounded(range.clone())),
129 ..Default::default()
130 },
131 )
132 .await?;
133
134 let buffer = match response.payload {
135 object_store::GetResultPayload::File(file, _) => {
136 unsafe { buffer.set_len(len) };
140
141 handle
142 .spawn_blocking(move || {
143 read_exact_at(&file, &mut buffer, range.start)?;
144 Ok::<_, io::Error>(buffer)
145 })
146 .await
147 .map_err(io::Error::other)?
148 }
149 object_store::GetResultPayload::Stream(mut byte_stream) => {
150 while let Some(bytes) = byte_stream.next().await {
151 buffer.extend_from_slice(&bytes?);
152 }
153
154 vortex_ensure!(
155 buffer.len() == len,
156 "Object store stream returned {} bytes but expected {} bytes (range: {:?})",
157 buffer.len(),
158 len,
159 range
160 );
161
162 buffer
163 }
164 };
165
166 Ok(buffer.freeze())
167 }
168 .in_current_span();
169
170 async move { req.resolve(Compat::new(read).await) }
171 })
172 .map(move |f| self2.handle.spawn(f))
173 .buffer_unordered(concurrency)
174 .collect::<()>()
175 .boxed()
176 }
177}