vortex_io/object_store.rs

use std::io;
use std::sync::Arc;

use bytes::BytesMut;
use futures::TryStreamExt;
use futures::stream::FuturesUnordered;
use object_store::MultipartUpload;
use object_store::ObjectStore;
use object_store::PutPayload;
use object_store::PutResult;
use object_store::path::Path;
use vortex_error::VortexResult;

use crate::IoBuf;
use crate::VortexWrite;
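
/// A [`VortexWrite`] sink that streams data to an object store location as a
/// multipart upload. Incoming bytes are buffered locally and uploaded in
/// `CHUNK_SIZE` parts; the upload is finalized by `shutdown`.
///
/// A minimal usage sketch (marked `ignore` because it is illustrative only; it
/// assumes `Vec<u8>: IoBuf`, as exercised by the tests below):
///
/// ```ignore
/// let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
/// let mut writer = ObjectStoreWriter::new(store, &Path::from("data.bin")).await?;
/// writer.write_all(vec![0u8; 1024]).await?;
/// writer.shutdown().await?;
/// assert!(writer.put_result().is_some());
/// ```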
pub struct ObjectStoreWriter {
    upload: Box<dyn MultipartUpload>,
    buffer: BytesMut,
    put_result: Option<PutResult>,
}

/// Size of each part uploaded to the object store.
const CHUNK_SIZE: usize = 16 * 1024 * 1024;
/// Number of buffered bytes above which `write_all` starts uploading parts.
const BUFFER_SIZE: usize = 128 * 1024 * 1024;

impl ObjectStoreWriter {
    pub async fn new(object_store: Arc<dyn ObjectStore>, location: &Path) -> VortexResult<Self> {
        let upload = object_store.put_multipart(location).await?;
        Ok(Self {
            upload,
            buffer: BytesMut::with_capacity(CHUNK_SIZE),
            put_result: None,
        })
    }
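
    /// Returns the result of the completed upload; `None` until `shutdown` has run.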
    pub fn put_result(&self) -> Option<&PutResult> {
        self.put_result.as_ref()
    }
}

impl VortexWrite for ObjectStoreWriter {
    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
        self.buffer.extend_from_slice(buffer.as_slice());
        let parts = FuturesUnordered::new();
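
        // Only start uploading once more than BUFFER_SIZE bytes are buffered; then
        // drain the buffer in CHUNK_SIZE parts, keeping any remainder buffered.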
        if self.buffer.len() > BUFFER_SIZE {
            while self.buffer.len() > CHUNK_SIZE {
                let payload = self.buffer.split_to(CHUNK_SIZE).freeze();
                let part_fut = self.upload.put_part(PutPayload::from_bytes(payload));

                parts.push(part_fut);
            }
        }

        parts.try_collect::<Vec<_>>().await?;

        Ok(buffer)
    }
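
    // Uploads buffered data in CHUNK_SIZE parts; anything smaller than one full
    // part stays buffered until `shutdown` sends it as the final part.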
    async fn flush(&mut self) -> io::Result<()> {
        let parts = FuturesUnordered::new();

        while self.buffer.len() > CHUNK_SIZE {
            let payload = self.buffer.split_to(CHUNK_SIZE).freeze();
            let part_fut = self.upload.put_part(PutPayload::from_bytes(payload));

            parts.push(part_fut);
        }

        parts.try_collect::<Vec<_>>().await?;

        Ok(())
    }

    async fn shutdown(&mut self) -> io::Result<()> {
        self.flush().await?;
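
        // Upload whatever is left in the buffer (less than one full chunk) as the
        // final part, then complete the multipart upload.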
        if !self.buffer.is_empty() {
            let payload = std::mem::take(&mut self.buffer).freeze();
            self.upload
                .put_part(PutPayload::from_bytes(payload))
                .await?;
        }

        self.put_result = Some(self.upload.complete().await?);
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use object_store::ObjectStore;
    use object_store::local::LocalFileSystem;
    use object_store::memory::InMemory;
    use object_store::path::Path;
    use rstest::rstest;
    use tempfile::tempdir;

    use super::*;
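
    // Writes three `chunk_size`-byte chunks with a flush after each write, shuts
    // the writer down, and checks the stored object matches the concatenated data.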
    #[rstest]
    #[case(100)]
    #[case(8 * 1024 * 1024)]
    #[case(25 * 1024 * 1024)]
    #[case(26 * 1024 * 1024)]
    #[tokio::test]
    async fn test_object_store_writer_multiple_flushes(
        #[case] chunk_size: usize,
    ) -> anyhow::Result<()> {
        let temp_dir = tempdir()?;
        let local_store =
            Arc::new(LocalFileSystem::new_with_prefix(temp_dir.path())?) as Arc<dyn ObjectStore>;
        let memory_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
        let location = Path::from("test.bin");

        for test_store in [memory_store, local_store] {
            let mut writer = ObjectStoreWriter::new(test_store.clone(), &location).await?;

            #[expect(clippy::cast_possible_truncation)]
            let data = (0..3)
                .map(|i| vec![i as u8; chunk_size])
                .collect::<Vec<_>>();

            for i in 0..3 {
                let data = data[i].clone();
                writer.write_all(data).await?;
                writer.flush().await?;
            }

            writer.shutdown().await?;

            let result = test_store.get(&location).await?;
            let bytes = result.bytes().await?;

            let expected_data = itertools::concat(data.into_iter());
            assert_eq!(bytes, expected_data);
        }

        Ok(())
    }
}