1use std::ops::Range;
9use std::sync::Arc;
10
11use bytes::Bytes;
12use futures::future::BoxFuture;
13use tokio::sync::RwLock;
14
15use crate::error::{Error, Result};
16
/// Incremental writer handed out by [`Storage::write_multipart`].
///
/// Data is supplied piece by piece through [`write_chunk`](Self::write_chunk)
/// and the writer is finished with [`complete`](Self::complete), which consumes it.
pub trait MultipartWriter: Send {
    /// Hands the next piece of data to the writer.
    ///
    /// The returned future borrows the writer mutably, so it has to finish
    /// (or be dropped) before another method can be called on the writer.
    fn write_chunk(&mut self, data: Bytes) -> BoxFuture<'_, Result<()>>;

    /// Finishes the write, consuming the boxed writer.
    ///
    /// The returned future is `'static`: it owns the writer and borrows nothing
    /// from the caller.
    fn complete(self: Box<Self>) -> BoxFuture<'static, Result<()>>;
}
30
/// Asynchronous access to a single stored blob of bytes.
///
/// Every method returns a boxed future borrowing `self`; failures are reported
/// through the crate's [`Result`] type.
pub trait Storage: Send + Sync {
    /// Reads the bytes covered by `range` (offsets are in bytes).
    fn read_range(&self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;

    /// Writes `data` in a single call.
    ///
    /// The in-memory implementation in this file replaces the previous
    /// contents entirely.
    fn write(&self, data: Bytes) -> BoxFuture<'_, Result<()>>;

    /// Reports the current size of the stored data, in bytes.
    fn size(&self) -> BoxFuture<'_, Result<u64>>;

    /// Opens a [`MultipartWriter`] for supplying the data chunk by chunk.
    fn write_multipart(&self) -> BoxFuture<'_, Result<Box<dyn MultipartWriter>>>;
}
58
/// [`Storage`] implementation that keeps the whole blob in a `Vec<u8>`.
///
/// The buffer lives behind `Arc<RwLock<..>>`, so the derived `Clone` yields
/// another handle to the same bytes rather than an independent copy.
#[derive(Debug, Clone)]
pub struct InMemoryStorage {
    /// Current contents; also handed (via `Arc::clone`) to writers created by
    /// `write_multipart`.
    data: Arc<RwLock<Vec<u8>>>,
}
67
68impl InMemoryStorage {
69 pub fn new() -> Self {
71 Self {
72 data: Arc::new(RwLock::new(Vec::new())),
73 }
74 }
75
76 pub fn from_bytes(data: Vec<u8>) -> Self {
78 Self {
79 data: Arc::new(RwLock::new(data)),
80 }
81 }
82}
83
84impl Default for InMemoryStorage {
85 fn default() -> Self {
86 Self::new()
87 }
88}
89
90impl Storage for InMemoryStorage {
91 fn read_range(&self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
92 Box::pin(async move {
93 let data = self.data.read().await;
94 let start = range.start as usize;
95 let end = range.end as usize;
96 if end > data.len() {
97 return Err(Error::Storage(format!(
98 "read range {}..{} exceeds file size {}",
99 start,
100 end,
101 data.len()
102 )));
103 }
104 Ok(Bytes::copy_from_slice(&data[start..end]))
105 })
106 }
107
108 fn write(&self, bytes: Bytes) -> BoxFuture<'_, Result<()>> {
109 Box::pin(async move {
110 let mut data = self.data.write().await;
111 *data = bytes.to_vec();
112 Ok(())
113 })
114 }
115
116 fn size(&self) -> BoxFuture<'_, Result<u64>> {
117 Box::pin(async move {
118 let data = self.data.read().await;
119 Ok(data.len() as u64)
120 })
121 }
122
123 fn write_multipart(&self) -> BoxFuture<'_, Result<Box<dyn MultipartWriter>>> {
124 Box::pin(async move {
125 Ok(Box::new(InMemoryMultipart {
126 data: Arc::clone(&self.data),
127 buf: Vec::new(),
128 }) as Box<dyn MultipartWriter>)
129 })
130 }
131}
132
/// [`MultipartWriter`] for [`InMemoryStorage`]: chunks pile up in `buf` and
/// replace the shared contents when `complete` runs.
struct InMemoryMultipart {
    /// Handle to the parent storage's shared byte vector.
    data: Arc<RwLock<Vec<u8>>>,
    /// Bytes received so far through `write_chunk`; only moved into `data` by
    /// `complete`.
    buf: Vec<u8>,
}
137
138impl MultipartWriter for InMemoryMultipart {
139 fn write_chunk(&mut self, data: Bytes) -> BoxFuture<'_, Result<()>> {
140 Box::pin(async move {
141 self.buf.extend_from_slice(&data);
142 Ok(())
143 })
144 }
145
146 fn complete(self: Box<Self>) -> BoxFuture<'static, Result<()>> {
147 Box::pin(async move {
148 let mut guard = self.data.write().await;
149 *guard = self.buf;
150 Ok(())
151 })
152 }
153}
154
/// [`Storage`] implementation backed by one object in an `object_store` store.
#[derive(Clone)]
pub struct ObjectStoreBackend {
    /// Store that every operation is issued against.
    store: Arc<dyn object_store::ObjectStore>,
    /// Location of the object inside `store`.
    path: object_store::path::Path,
}
164
165impl ObjectStoreBackend {
166 pub fn new(store: Arc<dyn object_store::ObjectStore>, path: object_store::path::Path) -> Self {
168 Self { store, path }
169 }
170}
171
172impl Storage for ObjectStoreBackend {
173 fn read_range(&self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
174 Box::pin(async move {
175 use object_store::ObjectStoreExt;
176 let bytes = self
177 .store
178 .get_range(&self.path, range)
179 .await
180 .map_err(|e| Error::Storage(e.to_string()))?;
181 Ok(bytes)
182 })
183 }
184
185 fn write(&self, data: Bytes) -> BoxFuture<'_, Result<()>> {
186 Box::pin(async move {
187 use object_store::ObjectStoreExt;
188 self.store
189 .put(&self.path, data.into())
190 .await
191 .map_err(|e| Error::Storage(e.to_string()))?;
192 Ok(())
193 })
194 }
195
196 fn size(&self) -> BoxFuture<'_, Result<u64>> {
197 Box::pin(async move {
198 use object_store::ObjectStoreExt;
199 let meta = self
200 .store
201 .head(&self.path)
202 .await
203 .map_err(|e| Error::Storage(e.to_string()))?;
204 Ok(meta.size as u64)
205 })
206 }
207
208 fn write_multipart(&self) -> BoxFuture<'_, Result<Box<dyn MultipartWriter>>> {
209 Box::pin(async move {
210 use object_store::buffered::BufWriter;
211 let writer = BufWriter::with_capacity(
212 Arc::clone(&self.store),
213 self.path.clone(),
214 8 * 1024 * 1024,
215 );
216 Ok(Box::new(ObjectStoreMultipart { writer }) as Box<dyn MultipartWriter>)
217 })
218 }
219}
220
/// [`MultipartWriter`] for [`ObjectStoreBackend`] that forwards every chunk to
/// an `object_store` `BufWriter`.
struct ObjectStoreMultipart {
    /// Writer that receives each chunk and is shut down by `complete`.
    writer: object_store::buffered::BufWriter,
}
224
225impl MultipartWriter for ObjectStoreMultipart {
226 fn write_chunk(&mut self, data: Bytes) -> BoxFuture<'_, Result<()>> {
227 Box::pin(async move {
228 use tokio::io::AsyncWriteExt;
229 self.writer
230 .write_all(&data)
231 .await
232 .map_err(|e| Error::Storage(e.to_string()))
233 })
234 }
235
236 fn complete(mut self: Box<Self>) -> BoxFuture<'static, Result<()>> {
237 Box::pin(async move {
238 use tokio::io::AsyncWriteExt;
239 self.writer
240 .shutdown()
241 .await
242 .map_err(|e| Error::Storage(e.to_string()))
243 })
244 }
245}
246
#[cfg(test)]
mod tests {
    use super::*;

    /// A single `write` is reflected by both `size` and `read_range`.
    #[tokio::test]
    async fn in_memory_write_read() {
        let store = InMemoryStorage::new();
        store
            .write(Bytes::from_static(b"hello world"))
            .await
            .unwrap();

        assert_eq!(store.size().await.unwrap(), 11);

        let prefix = store.read_range(0..5).await.unwrap();
        assert_eq!(prefix, Bytes::from_static(b"hello"));
    }

    /// Reading past the end of the stored data is an error.
    #[tokio::test]
    async fn in_memory_out_of_range() {
        let store = InMemoryStorage::from_bytes(vec![1, 2, 3]);
        assert!(store.read_range(0..10).await.is_err());
    }

    /// A second `write` replaces the first one entirely.
    #[tokio::test]
    async fn in_memory_overwrite() {
        let store = InMemoryStorage::new();
        let versions: [&'static [u8]; 2] = [b"first", b"second"];
        for version in versions {
            store.write(Bytes::from_static(version)).await.unwrap();
        }

        assert_eq!(store.size().await.unwrap(), 6);
        let contents = store.read_range(0..6).await.unwrap();
        assert_eq!(contents, Bytes::from_static(b"second"));
    }

    /// Chunks written through the multipart writer replace the old contents
    /// once `complete` has run.
    #[tokio::test]
    async fn in_memory_multipart_streams_chunks() {
        let store = InMemoryStorage::from_bytes(b"stale".to_vec());
        let mut upload = store.write_multipart().await.unwrap();

        let chunks: [&'static [u8]; 3] = [b"hello ", b"streaming ", b"world"];
        for chunk in chunks {
            upload.write_chunk(Bytes::from_static(chunk)).await.unwrap();
        }
        upload.complete().await.unwrap();

        assert_eq!(store.size().await.unwrap(), 21);
        let contents = store.read_range(0..21).await.unwrap();
        assert_eq!(contents, Bytes::from_static(b"hello streaming world"));
    }
}