vortex_io/
object_store.rs1use std::io;
5use std::sync::Arc;
6
7use bytes::BytesMut;
8use futures::TryStreamExt;
9use futures::stream::FuturesUnordered;
10use object_store::path::Path;
11use object_store::{MultipartUpload, ObjectStore, PutPayload, PutResult};
12use vortex_error::VortexResult;
13
14use crate::{IoBuf, VortexWrite};
15
/// A [`VortexWrite`] implementation that sends written bytes to an object store
/// through a multipart upload.
///
/// Written bytes are staged in `buffer` and handed to `upload` in parts. The upload
/// is completed by `shutdown`, after which `put_result` is populated.
pub struct ObjectStoreWriter {
    /// Handle for the in-progress multipart upload.
    upload: Box<dyn MultipartUpload>,
    /// Bytes accepted by `write_all` that have not yet been uploaded as a part.
    buffer: BytesMut,
    /// Result returned by completing the upload; `None` until `shutdown` succeeds.
    put_result: Option<PutResult>,
}
24
/// Size in bytes of each part split off the buffer and uploaded (16 MiB).
const CHUNK_SIZE: usize = 16 << 20;
/// Number of buffered bytes above which `write_all` starts uploading parts (128 MiB).
const BUFFER_SIZE: usize = 128 << 20;
27
28impl ObjectStoreWriter {
29 pub async fn new(object_store: Arc<dyn ObjectStore>, location: &Path) -> VortexResult<Self> {
30 let upload = object_store.put_multipart(location).await?;
31 Ok(Self {
32 upload,
33 buffer: BytesMut::with_capacity(CHUNK_SIZE),
34 put_result: None,
35 })
36 }
37
38 pub fn put_result(&self) -> Option<&PutResult> {
39 self.put_result.as_ref()
40 }
41}
42
43impl VortexWrite for ObjectStoreWriter {
44 async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
45 self.buffer.extend_from_slice(buffer.as_slice());
46 let parts = FuturesUnordered::new();
47
48 if self.buffer.len() > BUFFER_SIZE {
50 while self.buffer.len() > CHUNK_SIZE {
52 let payload = self.buffer.split_to(CHUNK_SIZE).freeze();
53 let part_fut = self.upload.put_part(PutPayload::from_bytes(payload));
54
55 parts.push(part_fut);
56 }
57 }
58
59 parts.try_collect::<Vec<_>>().await?;
60
61 Ok(buffer)
62 }
63
64 async fn flush(&mut self) -> io::Result<()> {
65 let parts = FuturesUnordered::new();
66
67 while self.buffer.len() > CHUNK_SIZE {
68 let payload = self.buffer.split_to(CHUNK_SIZE).freeze();
69 let part_fut = self.upload.put_part(PutPayload::from_bytes(payload));
70
71 parts.push(part_fut);
72 }
73
74 parts.try_collect::<Vec<_>>().await?;
75
76 Ok(())
77 }
78
79 async fn shutdown(&mut self) -> io::Result<()> {
80 self.flush().await?;
81
82 if !self.buffer.is_empty() {
83 let payload = std::mem::take(&mut self.buffer).freeze();
84 self.upload
85 .put_part(PutPayload::from_bytes(payload))
86 .await?;
87 }
88
89 self.put_result = Some(self.upload.complete().await?);
90 Ok(())
91 }
92}
93
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use object_store::ObjectStore;
    use object_store::local::LocalFileSystem;
    use object_store::memory::InMemory;
    use object_store::path::Path;
    use rstest::rstest;
    use tempfile::tempdir;

    use super::*;

    /// Writes three equally sized payloads, flushing after each one, then shuts the
    /// writer down and checks that the stored object is the concatenation of the
    /// payloads. Runs against both an in-memory and a local-filesystem store.
    #[tokio::test]
    #[rstest]
    #[case(100)]
    #[case(8 * 1024 * 1024)]
    #[case(25 * 1024 * 1024)]
    #[case(26 * 1024 * 1024)]
    async fn test_object_store_writer_multiple_flushes(
        #[case] chunk_size: usize,
    ) -> anyhow::Result<()> {
        let location = Path::from("test.bin");
        let temp_dir = tempdir()?;
        let stores: [Arc<dyn ObjectStore>; 2] = [
            Arc::new(InMemory::new()),
            Arc::new(LocalFileSystem::new_with_prefix(temp_dir.path())?),
        ];

        for store in stores {
            let mut writer = ObjectStoreWriter::new(Arc::clone(&store), &location).await?;

            // Each payload is filled with its own index so ordering mistakes are visible.
            let payloads: Vec<Vec<u8>> = (0u8..3).map(|fill| vec![fill; chunk_size]).collect();

            for payload in &payloads {
                writer.write_all(payload.clone()).await?;
                writer.flush().await?;
            }

            writer.shutdown().await?;

            let stored = store.get(&location).await?.bytes().await?;
            assert_eq!(stored, payloads.concat());
        }

        Ok(())
    }
}