vortex_io/file/
object_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io;
5use std::os::unix::fs::FileExt;
6use std::sync::Arc;
7
8use async_compat::Compat;
9use futures::future::BoxFuture;
10use futures::stream::BoxStream;
11use futures::{FutureExt, StreamExt};
12use tracing::Instrument;
13use vortex_buffer::ByteBufferMut;
14use vortex_error::{VortexError, VortexResult};
15
16use crate::file::IoRequest;
17use crate::file::read::{CoalesceWindow, IntoReadSource, ReadSource, ReadSourceRef};
18use crate::runtime::Handle;
19
/// Default coalescing window installed by `ObjectStoreReadSource::new`.
// NOTE(review): the exact meaning of `distance` / `max_size` is defined by `CoalesceWindow`
// in `crate::file::read`; the byte sizes below are the only thing established here.
const COALESCING_WINDOW: CoalesceWindow = CoalesceWindow {
    distance: 1024 * 1024,      // 1 MiB
    max_size: 16 * 1024 * 1024, // 16 MiB
};
/// Default value for the `concurrency` setting installed by `ObjectStoreReadSource::new`.
const CONCURRENCY: usize = 192; // Number of concurrent requests to allow.
25
/// Describes a single object inside an [`object_store::ObjectStore`] that should be read,
/// together with the read tuning knobs (concurrency and coalescing window).
///
/// The value is turned into a `ReadSourceRef` through its `IntoReadSource` implementation.
pub struct ObjectStoreReadSource {
    /// Store the object is fetched from.
    store: Arc<dyn object_store::ObjectStore>,
    /// Location of the object within `store`.
    path: object_store::path::Path,
    /// String rendering of `path`, computed once in `new`.
    uri: Arc<str>,
    /// Configured number of concurrent requests; defaults to `CONCURRENCY`.
    concurrency: usize,
    /// Configured coalescing window, or `None` when no window is set; defaults to
    /// `Some(COALESCING_WINDOW)`.
    coalesce_window: Option<CoalesceWindow>,
}
33
34impl ObjectStoreReadSource {
35    pub fn new(store: Arc<dyn object_store::ObjectStore>, path: object_store::path::Path) -> Self {
36        let uri = Arc::from(path.to_string());
37        Self {
38            store,
39            path,
40            uri,
41            concurrency: CONCURRENCY,
42            coalesce_window: Some(COALESCING_WINDOW),
43        }
44    }
45
46    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
47        self.concurrency = concurrency;
48        self
49    }
50
51    pub fn with_coalesce_window(mut self, window: CoalesceWindow) -> Self {
52        self.coalesce_window = Some(window);
53        self
54    }
55
56    pub fn with_some_coalesce_window(mut self, window: Option<CoalesceWindow>) -> Self {
57        self.coalesce_window = window;
58        self
59    }
60}
61
62impl IntoReadSource for ObjectStoreReadSource {
63    fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
64        Ok(Arc::new(ObjectStoreIoSource { io: self, handle }))
65    }
66}
67
/// An `ObjectStoreReadSource` bound to the runtime `Handle` that its `ReadSource`
/// implementation uses to spawn work.
struct ObjectStoreIoSource {
    /// Object location and read configuration.
    io: ObjectStoreReadSource,
    /// Runtime handle used for `spawn` and `spawn_blocking`.
    handle: Handle,
}
72
73impl ReadSource for ObjectStoreIoSource {
74    fn uri(&self) -> &Arc<str> {
75        &self.io.uri
76    }
77
78    fn coalesce_window(&self) -> Option<CoalesceWindow> {
79        self.io.coalesce_window
80    }
81
82    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
83        let store = self.io.store.clone();
84        let path = self.io.path.clone();
85        Compat::new(async move {
86            store
87                .head(&path)
88                .await
89                .map(|h| h.size)
90                .map_err(VortexError::from)
91        })
92        .boxed()
93    }
94
95    fn drive_send(
96        self: Arc<Self>,
97        requests: BoxStream<'static, IoRequest>,
98    ) -> BoxFuture<'static, ()> {
99        let self2 = self.clone();
100        requests
101            .map(move |req| {
102                let handle = self.handle.clone();
103                let store = self.io.store.clone();
104                let path = self.io.path.clone();
105
106                let len = req.len();
107                let range = req.range();
108                let alignment = req.alignment();
109
110                let read = async move {
111                    // Instead of calling `ObjectStore::get_range`, we expand the implementation and run it
112                    // ourselves to avoid a second copy to align the buffer. Instead, we can write directly
113                    // into the aligned buffer.
114                    let mut buffer = ByteBufferMut::with_capacity_aligned(len, alignment);
115
116                    let response = store
117                        .get_opts(
118                            &path,
119                            object_store::GetOptions {
120                                range: Some(object_store::GetRange::Bounded(range.clone())),
121                                ..Default::default()
122                            },
123                        )
124                        .await?;
125
126                    let buffer = match response.payload {
127                        object_store::GetResultPayload::File(file, _) => {
128                            // SAFETY: We're setting the length to the exact size we're about to read.
129                            // The read_exact_at call will either fill the entire buffer or return an error,
130                            // ensuring no uninitialized memory is exposed.
131                            unsafe { buffer.set_len(len) };
132                            handle
133                                .spawn_blocking(move || {
134                                    file.read_exact_at(&mut buffer, range.start)?;
135                                    Ok::<_, io::Error>(buffer)
136                                })
137                                .await
138                                .map_err(io::Error::other)?
139                        }
140                        object_store::GetResultPayload::Stream(mut byte_stream) => {
141                            while let Some(bytes) = byte_stream.next().await {
142                                buffer.extend_from_slice(&bytes?);
143                            }
144                            buffer
145                        }
146                    };
147
148                    Ok(buffer.freeze())
149                }
150                .in_current_span();
151
152                async move { req.resolve(Compat::new(read).await) }
153            })
154            .map(move |f| self2.handle.spawn(f))
155            .buffer_unordered(CONCURRENCY)
156            .collect::<()>()
157            .boxed()
158    }
159}