vortex_io/file/
object_store.rs1use std::io;
5use std::os::unix::fs::FileExt;
6use std::sync::Arc;
7
8use async_compat::Compat;
9use futures::future::BoxFuture;
10use futures::stream::BoxStream;
11use futures::{FutureExt, StreamExt};
12use tracing::Instrument;
13use vortex_buffer::ByteBufferMut;
14use vortex_error::{VortexError, VortexResult};
15
16use crate::file::IoRequest;
17use crate::file::read::{CoalesceWindow, IntoReadSource, ReadSource, ReadSourceRef};
18use crate::runtime::Handle;
19
20const COALESCING_WINDOW: CoalesceWindow = CoalesceWindow {
21 distance: 1024 * 1024, max_size: 16 * 1024 * 1024, };
24const CONCURRENCY: usize = 192; pub struct ObjectStoreReadSource {
27 store: Arc<dyn object_store::ObjectStore>,
28 path: object_store::path::Path,
29 uri: Arc<str>,
30 concurrency: usize,
31 coalesce_window: Option<CoalesceWindow>,
32}
33
34impl ObjectStoreReadSource {
35 pub fn new(store: Arc<dyn object_store::ObjectStore>, path: object_store::path::Path) -> Self {
36 let uri = Arc::from(path.to_string());
37 Self {
38 store,
39 path,
40 uri,
41 concurrency: CONCURRENCY,
42 coalesce_window: Some(COALESCING_WINDOW),
43 }
44 }
45
46 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
47 self.concurrency = concurrency;
48 self
49 }
50
51 pub fn with_coalesce_window(mut self, window: CoalesceWindow) -> Self {
52 self.coalesce_window = Some(window);
53 self
54 }
55
56 pub fn with_some_coalesce_window(mut self, window: Option<CoalesceWindow>) -> Self {
57 self.coalesce_window = window;
58 self
59 }
60}
61
62impl IntoReadSource for ObjectStoreReadSource {
63 fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
64 Ok(Arc::new(ObjectStoreIoSource { io: self, handle }))
65 }
66}
67
68struct ObjectStoreIoSource {
69 io: ObjectStoreReadSource,
70 handle: Handle,
71}
72
73impl ReadSource for ObjectStoreIoSource {
74 fn uri(&self) -> &Arc<str> {
75 &self.io.uri
76 }
77
78 fn coalesce_window(&self) -> Option<CoalesceWindow> {
79 self.io.coalesce_window
80 }
81
82 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
83 let store = self.io.store.clone();
84 let path = self.io.path.clone();
85 Compat::new(async move {
86 store
87 .head(&path)
88 .await
89 .map(|h| h.size)
90 .map_err(VortexError::from)
91 })
92 .boxed()
93 }
94
95 fn drive_send(
96 self: Arc<Self>,
97 requests: BoxStream<'static, IoRequest>,
98 ) -> BoxFuture<'static, ()> {
99 let self2 = self.clone();
100 requests
101 .map(move |req| {
102 let handle = self.handle.clone();
103 let store = self.io.store.clone();
104 let path = self.io.path.clone();
105
106 let len = req.len();
107 let range = req.range();
108 let alignment = req.alignment();
109
110 let read = async move {
111 let mut buffer = ByteBufferMut::with_capacity_aligned(len, alignment);
115
116 let response = store
117 .get_opts(
118 &path,
119 object_store::GetOptions {
120 range: Some(object_store::GetRange::Bounded(range.clone())),
121 ..Default::default()
122 },
123 )
124 .await?;
125
126 let buffer = match response.payload {
127 object_store::GetResultPayload::File(file, _) => {
128 unsafe { buffer.set_len(len) };
132 handle
133 .spawn_blocking(move || {
134 file.read_exact_at(&mut buffer, range.start)?;
135 Ok::<_, io::Error>(buffer)
136 })
137 .await
138 .map_err(io::Error::other)?
139 }
140 object_store::GetResultPayload::Stream(mut byte_stream) => {
141 while let Some(bytes) = byte_stream.next().await {
142 buffer.extend_from_slice(&bytes?);
143 }
144 buffer
145 }
146 };
147
148 Ok(buffer.freeze())
149 }
150 .in_current_span();
151
152 async move { req.resolve(Compat::new(read).await) }
153 })
154 .map(move |f| self2.handle.spawn(f))
155 .buffer_unordered(CONCURRENCY)
156 .collect::<()>()
157 .boxed()
158 }
159}