vortex_io/
object_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io;
5use std::sync::Arc;
6
7use bytes::BytesMut;
8use futures::TryStreamExt;
9use futures::stream::FuturesUnordered;
10use object_store::MultipartUpload;
11use object_store::ObjectStore;
12use object_store::PutPayload;
13use object_store::PutResult;
14use object_store::path::Path;
15use vortex_error::VortexResult;
16
17use crate::IoBuf;
18use crate::VortexWrite;
19
/// Adapter type to write data through a [`ObjectStore`] instance.
///
/// After writing, the caller must make sure to call `shutdown`, in order to ensure the data is actually persisted.
pub struct ObjectStoreWriter {
    /// In-progress multipart upload; parts are pushed to it as the buffer drains.
    upload: Box<dyn MultipartUpload>,
    /// Staging buffer for written bytes; split into `CHUNK_SIZE` parts for upload.
    buffer: BytesMut,
    /// Result of the completed upload; populated only by `shutdown`.
    put_result: Option<PutResult>,
}
28
/// Size of each multipart part split off the buffer: 16 MiB.
const CHUNK_SIZE: usize = 16 * 1024 * 1024;
/// Buffered-bytes threshold above which `write_all` starts uploading parts: 128 MiB.
const BUFFER_SIZE: usize = 128 * 1024 * 1024;
31
32impl ObjectStoreWriter {
33    pub async fn new(object_store: Arc<dyn ObjectStore>, location: &Path) -> VortexResult<Self> {
34        let upload = object_store.put_multipart(location).await?;
35        Ok(Self {
36            upload,
37            buffer: BytesMut::with_capacity(CHUNK_SIZE),
38            put_result: None,
39        })
40    }
41
42    pub fn put_result(&self) -> Option<&PutResult> {
43        self.put_result.as_ref()
44    }
45}
46
47impl VortexWrite for ObjectStoreWriter {
48    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
49        self.buffer.extend_from_slice(buffer.as_slice());
50        let parts = FuturesUnordered::new();
51
52        // If the buffer is full
53        if self.buffer.len() > BUFFER_SIZE {
54            // Split off chunks while buffer is larger than CHUNKS_SIZE
55            while self.buffer.len() > CHUNK_SIZE {
56                let payload = self.buffer.split_to(CHUNK_SIZE).freeze();
57                let part_fut = self.upload.put_part(PutPayload::from_bytes(payload));
58
59                parts.push(part_fut);
60            }
61        }
62
63        parts.try_collect::<Vec<_>>().await?;
64
65        Ok(buffer)
66    }
67
68    async fn flush(&mut self) -> io::Result<()> {
69        let parts = FuturesUnordered::new();
70
71        while self.buffer.len() > CHUNK_SIZE {
72            let payload = self.buffer.split_to(CHUNK_SIZE).freeze();
73            let part_fut = self.upload.put_part(PutPayload::from_bytes(payload));
74
75            parts.push(part_fut);
76        }
77
78        parts.try_collect::<Vec<_>>().await?;
79
80        Ok(())
81    }
82
83    async fn shutdown(&mut self) -> io::Result<()> {
84        self.flush().await?;
85
86        if !self.buffer.is_empty() {
87            let payload = std::mem::take(&mut self.buffer).freeze();
88            self.upload
89                .put_part(PutPayload::from_bytes(payload))
90                .await?;
91        }
92
93        self.put_result = Some(self.upload.complete().await?);
94        Ok(())
95    }
96}
97
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use object_store::ObjectStore;
    use object_store::local::LocalFileSystem;
    use object_store::memory::InMemory;
    use object_store::path::Path;
    use rstest::rstest;
    use tempfile::tempdir;

    use super::*;

    // Note: Concurrent writes test removed because &mut self in write_all already ensures
    // exclusive access. Multiple writers would need to be created with separate buffers,
    // which is not the intended use case.

    // `#[rstest]` must be the outermost attribute: attribute macros expand
    // outermost-first, so with `#[tokio::test]` first the runtime macro would
    // see a test fn that still has a `#[case]` parameter and fail to compile.
    // rstest injects the trailing `#[tokio::test]` onto each generated case.
    #[rstest]
    #[case(100)]
    #[case(8 * 1024 * 1024)]
    #[case(25 * 1024 * 1024)]
    #[case(26 * 1024 * 1024)]
    #[tokio::test]
    async fn test_object_store_writer_multiple_flushes(
        #[case] chunk_size: usize,
    ) -> anyhow::Result<()> {
        let temp_dir = tempdir()?;
        let local_store =
            Arc::new(LocalFileSystem::new_with_prefix(temp_dir.path())?) as Arc<dyn ObjectStore>;
        let memory_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
        let location = Path::from("test.bin");

        for test_store in [memory_store, local_store] {
            let mut writer = ObjectStoreWriter::new(test_store.clone(), &location).await?;

            #[expect(clippy::cast_possible_truncation)]
            let data = (0..3)
                .map(|i| vec![i as u8; chunk_size])
                .collect::<Vec<_>>();

            // Write and flush multiple times
            for i in 0..3 {
                let data = data[i].clone();
                writer.write_all(data).await?;
                writer.flush().await?;
            }

            // Shutdown the writer to make sure data actually gets persisted.
            writer.shutdown().await?;

            // Verify all data was written
            let result = test_store.get(&location).await?;
            let bytes = result.bytes().await?;

            // `concat` on `[Vec<u8>]` flattens to a single Vec<u8>; no need
            // for the itertools helper here.
            let expected_data = data.concat();
            assert_eq!(bytes, expected_data);
        }

        Ok(())
    }
}