vortex_io/
object_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io;
5use std::sync::Arc;
6
7use bytes::BytesMut;
8use futures::TryStreamExt;
9use futures::stream::FuturesUnordered;
10use object_store::path::Path;
11use object_store::{MultipartUpload, ObjectStore, PutPayload, PutResult};
12use vortex_error::VortexResult;
13
14use crate::{IoBuf, VortexWrite};
15
/// Adapter type to write data through a [`ObjectStore`] instance.
///
/// After writing, the caller must make sure to call `shutdown`, in order to ensure the data is actually persisted.
pub struct ObjectStoreWriter {
    // In-progress multipart upload handle; CHUNK_SIZE-sized parts are streamed into it.
    upload: Box<dyn MultipartUpload>,
    // Staging buffer for incoming writes, drained into upload parts as it fills.
    buffer: BytesMut,
    // Result of the finished upload; populated by `shutdown`, exposed via `put_result`.
    put_result: Option<PutResult>,
}
24
/// Size (16 MiB) of each part split off the staging buffer for multipart upload.
const CHUNK_SIZE: usize = 16 * 1024 * 1024;
/// Staging-buffer threshold (128 MiB) above which `write_all` starts uploading parts.
const BUFFER_SIZE: usize = 128 * 1024 * 1024;
27
28impl ObjectStoreWriter {
29    pub async fn new(object_store: Arc<dyn ObjectStore>, location: &Path) -> VortexResult<Self> {
30        let upload = object_store.put_multipart(location).await?;
31        Ok(Self {
32            upload,
33            buffer: BytesMut::with_capacity(CHUNK_SIZE),
34            put_result: None,
35        })
36    }
37
38    pub fn put_result(&self) -> Option<&PutResult> {
39        self.put_result.as_ref()
40    }
41}
42
43impl VortexWrite for ObjectStoreWriter {
44    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
45        self.buffer.extend_from_slice(buffer.as_slice());
46        let parts = FuturesUnordered::new();
47
48        // If the buffer is full
49        if self.buffer.len() > BUFFER_SIZE {
50            // Split off chunks while buffer is larger than CHUNKS_SIZE
51            while self.buffer.len() > CHUNK_SIZE {
52                let payload = self.buffer.split_to(CHUNK_SIZE).freeze();
53                let part_fut = self.upload.put_part(PutPayload::from_bytes(payload));
54
55                parts.push(part_fut);
56            }
57        }
58
59        parts.try_collect::<Vec<_>>().await?;
60
61        Ok(buffer)
62    }
63
64    async fn flush(&mut self) -> io::Result<()> {
65        let parts = FuturesUnordered::new();
66
67        while self.buffer.len() > CHUNK_SIZE {
68            let payload = self.buffer.split_to(CHUNK_SIZE).freeze();
69            let part_fut = self.upload.put_part(PutPayload::from_bytes(payload));
70
71            parts.push(part_fut);
72        }
73
74        parts.try_collect::<Vec<_>>().await?;
75
76        Ok(())
77    }
78
79    async fn shutdown(&mut self) -> io::Result<()> {
80        self.flush().await?;
81
82        if !self.buffer.is_empty() {
83            let payload = std::mem::take(&mut self.buffer).freeze();
84            self.upload
85                .put_part(PutPayload::from_bytes(payload))
86                .await?;
87        }
88
89        self.put_result = Some(self.upload.complete().await?);
90        Ok(())
91    }
92}
93
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use object_store::ObjectStore;
    use object_store::local::LocalFileSystem;
    use object_store::memory::InMemory;
    use object_store::path::Path;
    use rstest::rstest;
    use tempfile::tempdir;

    use super::*;

    // Note: Concurrent writes test removed because &mut self in write_all already ensures
    // exclusive access. Multiple writers would need to be created with separate buffers,
    // which is not the intended use case.

    // `#[rstest]` must expand before the test attribute: attribute macros expand
    // top-to-bottom, and `#[tokio::test]` rejects functions that take arguments.
    // rstest picks up the trailing `#[tokio::test]` and applies it to each
    // generated, argument-free test case.
    #[rstest]
    #[case(100)]
    #[case(8 * 1024 * 1024)]
    #[case(25 * 1024 * 1024)]
    #[case(26 * 1024 * 1024)]
    #[tokio::test]
    async fn test_object_store_writer_multiple_flushes(
        #[case] chunk_size: usize,
    ) -> anyhow::Result<()> {
        let temp_dir = tempdir()?;
        let local_store =
            Arc::new(LocalFileSystem::new_with_prefix(temp_dir.path())?) as Arc<dyn ObjectStore>;
        let memory_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
        let location = Path::from("test.bin");

        for test_store in [memory_store, local_store] {
            let mut writer = ObjectStoreWriter::new(test_store.clone(), &location).await?;

            // Three chunks filled with distinct byte values so ordering bugs show up.
            #[expect(clippy::cast_possible_truncation)]
            let data = (0..3)
                .map(|i| vec![i as u8; chunk_size])
                .collect::<Vec<_>>();

            // Write and flush multiple times
            for chunk in &data {
                writer.write_all(chunk.clone()).await?;
                writer.flush().await?;
            }

            // Shutdown the writer to make sure data actually gets persisted.
            writer.shutdown().await?;

            // Verify all data was written
            let result = test_store.get(&location).await?;
            let bytes = result.bytes().await?;

            // `concat` (std slice method) flattens Vec<Vec<u8>> into Vec<u8>.
            let expected_data = data.concat();
            assert_eq!(bytes, expected_data);
        }

        Ok(())
    }
}
154}