Skip to main content

array_format/
storage.rs

1//! Storage backend trait and implementations.
2//!
3//! The [`Storage`] trait abstracts file I/O so the format can work with
4//! local files, object stores, or in-memory buffers. An
5//! [`ObjectStoreBackend`] adapter is provided for the `object_store` crate,
6//! and [`InMemoryStorage`] is provided for testing.
7
8use std::ops::Range;
9use std::sync::Arc;
10
11use bytes::Bytes;
12use futures::future::BoxFuture;
13use tokio::sync::RwLock;
14
15use crate::error::{Error, Result};
16
17/// Streaming writer used to build up a file in chunks.
18///
19/// Obtained from [`Storage::write_multipart`]. Callers push successive
20/// chunks via [`write_chunk`](Self::write_chunk) and finalize with
21/// [`complete`](Self::complete); dropping the writer without calling `complete`
22/// aborts the upload.
23pub trait MultipartWriter: Send {
24    /// Appends `data` to the in-flight upload.
25    fn write_chunk(&mut self, data: Bytes) -> BoxFuture<'_, Result<()>>;
26
27    /// Finalizes the upload, committing all previously written chunks.
28    fn complete(self: Box<Self>) -> BoxFuture<'static, Result<()>>;
29}
30
31/// Async storage backend for reading and writing file data.
32///
33/// All methods return [`BoxFuture`] so the trait is object-safe
34/// (`dyn Storage` is valid).
35///
36/// # Extensibility
37///
38/// Implement this trait to plug in custom storage backends (e.g. a
39/// distributed file system, an HTTP range-request backend, etc.).
40pub trait Storage: Send + Sync {
41    /// Reads the byte range `range` from the file.
42    fn read_range(&self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
43
44    /// Replaces the entire file content with `data`.
45    fn write(&self, data: Bytes) -> BoxFuture<'_, Result<()>>;
46
47    /// Returns the total size of the file in bytes.
48    fn size(&self) -> BoxFuture<'_, Result<u64>>;
49
50    /// Begins a streaming write that replaces the file's contents.
51    ///
52    /// The returned writer accepts successive byte chunks; the upload is
53    /// committed once [`MultipartWriter::complete`] is awaited. This lets
54    /// callers ship large payloads without materializing the whole file in
55    /// memory.
56    fn write_multipart(&self) -> BoxFuture<'_, Result<Box<dyn MultipartWriter>>>;
57}
58
59/// An in-memory storage backend for testing.
60///
61/// Wraps a `Vec<u8>` behind an `Arc<RwLock<..>>` so it can be shared
62/// across async tasks.
63#[derive(Debug, Clone)]
64pub struct InMemoryStorage {
65    data: Arc<RwLock<Vec<u8>>>,
66}
67
68impl InMemoryStorage {
69    /// Creates a new empty in-memory store.
70    pub fn new() -> Self {
71        Self {
72            data: Arc::new(RwLock::new(Vec::new())),
73        }
74    }
75
76    /// Creates an in-memory store pre-loaded with `data`.
77    pub fn from_bytes(data: Vec<u8>) -> Self {
78        Self {
79            data: Arc::new(RwLock::new(data)),
80        }
81    }
82}
83
84impl Default for InMemoryStorage {
85    fn default() -> Self {
86        Self::new()
87    }
88}
89
90impl Storage for InMemoryStorage {
91    fn read_range(&self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
92        Box::pin(async move {
93            let data = self.data.read().await;
94            let start = range.start as usize;
95            let end = range.end as usize;
96            if end > data.len() {
97                return Err(Error::Storage(format!(
98                    "read range {}..{} exceeds file size {}",
99                    start,
100                    end,
101                    data.len()
102                )));
103            }
104            Ok(Bytes::copy_from_slice(&data[start..end]))
105        })
106    }
107
108    fn write(&self, bytes: Bytes) -> BoxFuture<'_, Result<()>> {
109        Box::pin(async move {
110            let mut data = self.data.write().await;
111            *data = bytes.to_vec();
112            Ok(())
113        })
114    }
115
116    fn size(&self) -> BoxFuture<'_, Result<u64>> {
117        Box::pin(async move {
118            let data = self.data.read().await;
119            Ok(data.len() as u64)
120        })
121    }
122
123    fn write_multipart(&self) -> BoxFuture<'_, Result<Box<dyn MultipartWriter>>> {
124        Box::pin(async move {
125            Ok(Box::new(InMemoryMultipart {
126                data: Arc::clone(&self.data),
127                buf: Vec::new(),
128            }) as Box<dyn MultipartWriter>)
129        })
130    }
131}
132
133struct InMemoryMultipart {
134    data: Arc<RwLock<Vec<u8>>>,
135    buf: Vec<u8>,
136}
137
138impl MultipartWriter for InMemoryMultipart {
139    fn write_chunk(&mut self, data: Bytes) -> BoxFuture<'_, Result<()>> {
140        Box::pin(async move {
141            self.buf.extend_from_slice(&data);
142            Ok(())
143        })
144    }
145
146    fn complete(self: Box<Self>) -> BoxFuture<'static, Result<()>> {
147        Box::pin(async move {
148            let mut guard = self.data.write().await;
149            *guard = self.buf;
150            Ok(())
151        })
152    }
153}
154
155/// A storage backend backed by an [`object_store::ObjectStore`] implementation.
156///
157/// Wraps any `ObjectStore` (local filesystem, S3, GCS, Azure, in-memory)
158/// and a [`Path`](object_store::path::Path) pointing to the target file.
159#[derive(Clone)]
160pub struct ObjectStoreBackend {
161    store: Arc<dyn object_store::ObjectStore>,
162    path: object_store::path::Path,
163}
164
165impl ObjectStoreBackend {
166    /// Creates a new backend targeting `path` within the given `store`.
167    pub fn new(store: Arc<dyn object_store::ObjectStore>, path: object_store::path::Path) -> Self {
168        Self { store, path }
169    }
170}
171
172impl Storage for ObjectStoreBackend {
173    fn read_range(&self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
174        Box::pin(async move {
175            use object_store::ObjectStoreExt;
176            let bytes = self
177                .store
178                .get_range(&self.path, range)
179                .await
180                .map_err(|e| Error::Storage(e.to_string()))?;
181            Ok(bytes)
182        })
183    }
184
185    fn write(&self, data: Bytes) -> BoxFuture<'_, Result<()>> {
186        Box::pin(async move {
187            use object_store::ObjectStoreExt;
188            self.store
189                .put(&self.path, data.into())
190                .await
191                .map_err(|e| Error::Storage(e.to_string()))?;
192            Ok(())
193        })
194    }
195
196    fn size(&self) -> BoxFuture<'_, Result<u64>> {
197        Box::pin(async move {
198            use object_store::ObjectStoreExt;
199            let meta = self
200                .store
201                .head(&self.path)
202                .await
203                .map_err(|e| Error::Storage(e.to_string()))?;
204            Ok(meta.size as u64)
205        })
206    }
207
208    fn write_multipart(&self) -> BoxFuture<'_, Result<Box<dyn MultipartWriter>>> {
209        Box::pin(async move {
210            use object_store::buffered::BufWriter;
211            let writer = BufWriter::with_capacity(
212                Arc::clone(&self.store),
213                self.path.clone(),
214                8 * 1024 * 1024,
215            );
216            Ok(Box::new(ObjectStoreMultipart { writer }) as Box<dyn MultipartWriter>)
217        })
218    }
219}
220
221struct ObjectStoreMultipart {
222    writer: object_store::buffered::BufWriter,
223}
224
225impl MultipartWriter for ObjectStoreMultipart {
226    fn write_chunk(&mut self, data: Bytes) -> BoxFuture<'_, Result<()>> {
227        Box::pin(async move {
228            use tokio::io::AsyncWriteExt;
229            self.writer
230                .write_all(&data)
231                .await
232                .map_err(|e| Error::Storage(e.to_string()))
233        })
234    }
235
236    fn complete(mut self: Box<Self>) -> BoxFuture<'static, Result<()>> {
237        Box::pin(async move {
238            use tokio::io::AsyncWriteExt;
239            self.writer
240                .shutdown()
241                .await
242                .map_err(|e| Error::Storage(e.to_string()))
243        })
244    }
245}
246
#[cfg(test)]
mod tests {
    use super::*;

    /// A full write is visible through both `size` and `read_range`.
    #[tokio::test]
    async fn in_memory_write_read() {
        let store = InMemoryStorage::new();
        store
            .write(Bytes::from_static(b"hello world"))
            .await
            .unwrap();

        assert_eq!(store.size().await.unwrap(), 11);

        let prefix = store.read_range(0..5).await.unwrap();
        assert_eq!(prefix, Bytes::from_static(b"hello"));
    }

    /// Reading past the end of the buffer is an error, not a panic.
    #[tokio::test]
    async fn in_memory_out_of_range() {
        let store = InMemoryStorage::from_bytes(vec![1, 2, 3]);
        assert!(store.read_range(0..10).await.is_err());
    }

    /// A second write replaces the first one entirely.
    #[tokio::test]
    async fn in_memory_overwrite() {
        let store = InMemoryStorage::new();
        for content in [&b"first"[..], &b"second"[..]].iter().copied() {
            store.write(Bytes::from_static(content)).await.unwrap();
        }

        assert_eq!(store.size().await.unwrap(), 6);
        let content = store.read_range(0..6).await.unwrap();
        assert_eq!(content, Bytes::from_static(b"second"));
    }

    /// Chunks pushed through a multipart writer replace the stale content
    /// once the writer is completed.
    #[tokio::test]
    async fn in_memory_multipart_streams_chunks() {
        let store = InMemoryStorage::from_bytes(b"stale".to_vec());
        let mut upload = store.write_multipart().await.unwrap();

        let chunks: [&'static [u8]; 3] = [b"hello ", b"streaming ", b"world"];
        for &chunk in &chunks {
            upload
                .write_chunk(Bytes::from_static(chunk))
                .await
                .unwrap();
        }
        upload.complete().await.unwrap();

        assert_eq!(store.size().await.unwrap(), 21);
        let content = store.read_range(0..21).await.unwrap();
        assert_eq!(content, Bytes::from_static(b"hello streaming world"));
    }
}