vortex_io/file/
object_store.rs1use std::io;
5#[cfg(unix)]
6use std::os::unix::fs::FileExt;
7use std::sync::Arc;
8
9use async_compat::Compat;
10use futures::future::BoxFuture;
11use futures::stream::BoxStream;
12use futures::{FutureExt, StreamExt};
13use tracing::Instrument;
14use vortex_buffer::ByteBufferMut;
15use vortex_error::{VortexError, VortexResult, vortex_ensure};
16
17use crate::file::IoRequest;
18use crate::file::read::{CoalesceWindow, IntoReadSource, ReadSource, ReadSourceRef};
19use crate::runtime::Handle;
20
21const COALESCING_WINDOW: CoalesceWindow = CoalesceWindow {
22 distance: 1024 * 1024, max_size: 16 * 1024 * 1024, };
25const CONCURRENCY: usize = 192; pub struct ObjectStoreReadSource {
28 store: Arc<dyn object_store::ObjectStore>,
29 path: object_store::path::Path,
30 uri: Arc<str>,
31 concurrency: usize,
32 coalesce_window: Option<CoalesceWindow>,
33}
34
35impl ObjectStoreReadSource {
36 pub fn new(store: Arc<dyn object_store::ObjectStore>, path: object_store::path::Path) -> Self {
37 let uri = Arc::from(path.to_string());
38 Self {
39 store,
40 path,
41 uri,
42 concurrency: CONCURRENCY,
43 coalesce_window: Some(COALESCING_WINDOW),
44 }
45 }
46
47 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
48 self.concurrency = concurrency;
49 self
50 }
51
52 pub fn with_coalesce_window(mut self, window: CoalesceWindow) -> Self {
53 self.coalesce_window = Some(window);
54 self
55 }
56
57 pub fn with_some_coalesce_window(mut self, window: Option<CoalesceWindow>) -> Self {
58 self.coalesce_window = window;
59 self
60 }
61}
62
63impl IntoReadSource for ObjectStoreReadSource {
64 fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
65 Ok(Arc::new(ObjectStoreIoSource { io: self, handle }))
66 }
67}
68
69struct ObjectStoreIoSource {
70 io: ObjectStoreReadSource,
71 handle: Handle,
72}
73
74impl ReadSource for ObjectStoreIoSource {
75 fn uri(&self) -> &Arc<str> {
76 &self.io.uri
77 }
78
79 fn coalesce_window(&self) -> Option<CoalesceWindow> {
80 self.io.coalesce_window
81 }
82
83 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
84 let store = self.io.store.clone();
85 let path = self.io.path.clone();
86 Compat::new(async move {
87 store
88 .head(&path)
89 .await
90 .map(|h| h.size)
91 .map_err(VortexError::from)
92 })
93 .boxed()
94 }
95
96 fn drive_send(
97 self: Arc<Self>,
98 requests: BoxStream<'static, IoRequest>,
99 ) -> BoxFuture<'static, ()> {
100 let self2 = self.clone();
101 requests
102 .map(move |req| {
103 let handle = self.handle.clone();
104 let store = self.io.store.clone();
105 let path = self.io.path.clone();
106
107 let len = req.len();
108 let range = req.range();
109 let alignment = req.alignment();
110
111 let read = async move {
112 let mut buffer = ByteBufferMut::with_capacity_aligned(len, alignment);
116
117 let response = store
118 .get_opts(
119 &path,
120 object_store::GetOptions {
121 range: Some(object_store::GetRange::Bounded(range.clone())),
122 ..Default::default()
123 },
124 )
125 .await?;
126
127 let buffer = match response.payload {
128 object_store::GetResultPayload::File(file, _) => {
129 unsafe { buffer.set_len(len) };
133
134 handle
135 .spawn_blocking(move || {
136 #[cfg(unix)] {
137 file.read_exact_at(&mut buffer, range.start)?;
138 Ok::<_, io::Error>(buffer)
139 }
140 #[cfg(not(unix))] {
141 file.seek(range.start)?;
142 file.read_exact(&mut buffer)?;
143 Ok::<_, io::Error>(buffer)
144 }
145 })
146 .await
147 .map_err(io::Error::other)?
148 }
149 object_store::GetResultPayload::Stream(mut byte_stream) => {
150 while let Some(bytes) = byte_stream.next().await {
151 buffer.extend_from_slice(&bytes?);
152 }
153
154 vortex_ensure!(
155 buffer.len() == len,
156 "Object store stream returned {} bytes but expected {} bytes (range: {:?})",
157 buffer.len(),
158 len,
159 range
160 );
161
162 buffer
163 }
164 };
165
166 Ok(buffer.freeze())
167 }
168 .in_current_span();
169
170 async move { req.resolve(Compat::new(read).await) }
171 })
172 .map(move |f| self2.handle.spawn(f))
173 .buffer_unordered(CONCURRENCY)
174 .collect::<()>()
175 .boxed()
176 }
177}