1use std::collections::{BTreeMap, BTreeSet, HashMap};
20use std::ops::Range;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use bytes::Bytes;
25use chrono::{DateTime, Utc};
26use futures_util::{StreamExt, stream::BoxStream};
27use parking_lot::RwLock;
28
29use crate::multipart::{MultipartStore, PartId};
30use crate::util::InvalidGetRange;
31use crate::{
32 Attributes, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload,
33 ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutResult, Result,
34 UpdateVersion, UploadPart, path::Path,
35};
36use crate::{CopyMode, CopyOptions, GetOptions, PutPayload};
37
/// Errors raised by the in-memory store before they are converted into the
/// crate-level error type.
#[derive(Debug, thiserror::Error)]
enum Error {
    /// No object is stored at the requested location.
    #[error("No data in memory found. Location: {path}")]
    NoDataInMemory { path: String },

    /// A requested byte range could not be resolved against the object.
    #[error("Invalid range: {source}")]
    Range { source: InvalidGetRange },

    /// An object is already stored at the target location.
    #[error("Object already exists at that location: {path}")]
    AlreadyExists { path: String },

    /// A conditional update was requested without supplying an e-tag.
    #[error("ETag required for conditional update")]
    MissingETag,

    /// The multipart upload id is malformed or does not refer to a pending upload.
    #[error("MultipartUpload not found: {id}")]
    UploadNotFound { id: String },

    /// A multipart upload was completed while the part at this index had no data.
    #[error("Missing part at index: {part}")]
    MissingPart { part: usize },
}
59
60impl From<Error> for super::Error {
61 fn from(source: Error) -> Self {
62 match source {
63 Error::NoDataInMemory { ref path } => Self::NotFound {
64 path: path.into(),
65 source: source.into(),
66 },
67 Error::AlreadyExists { ref path } => Self::AlreadyExists {
68 path: path.into(),
69 source: source.into(),
70 },
71 _ => Self::Generic {
72 store: "InMemory",
73 source: Box::new(source),
74 },
75 }
76 }
77}
78
/// Object store that keeps all of its data in process memory.
///
/// The state lives behind a shared, lock-protected handle, so a `clone()` of
/// this value refers to the same underlying storage. Use `fork` to obtain an
/// independent copy of the current contents.
#[derive(Debug, Default, Clone)]
pub struct InMemory {
    // Shared handle to the objects and pending multipart uploads.
    storage: SharedStorage,
}
85
/// A single stored object: its payload plus the metadata reported to callers.
#[derive(Debug, Clone)]
struct Entry {
    // Object payload.
    data: Bytes,
    // Timestamp recorded when the entry was created.
    last_modified: DateTime<Utc>,
    // Attributes supplied by the writer and returned on reads.
    attributes: Attributes,
    // Numeric e-tag; rendered as a decimal string when exposed to callers.
    e_tag: usize,
}
93
94impl Entry {
95 fn new(
96 data: Bytes,
97 last_modified: DateTime<Utc>,
98 e_tag: usize,
99 attributes: Attributes,
100 ) -> Self {
101 Self {
102 data,
103 last_modified,
104 e_tag,
105 attributes,
106 }
107 }
108}
109
/// The complete state of an in-memory store.
#[derive(Debug, Default, Clone)]
struct Storage {
    // Monotonic counter used for object e-tags and for multipart upload ids.
    next_etag: usize,
    // Stored objects, ordered by path so prefix listings can use range scans.
    map: BTreeMap<Path, Entry>,
    // Multipart uploads that have been created but not yet completed or aborted.
    uploads: HashMap<usize, PartStorage>,
}
116
/// Parts received so far for one multipart upload.
#[derive(Debug, Default, Clone)]
struct PartStorage {
    // Indexed by part number; `None` marks a part that has not been uploaded.
    parts: Vec<Option<Bytes>>,
}
121
/// Reference-counted, lock-protected handle to a store's [`Storage`].
type SharedStorage = Arc<RwLock<Storage>>;
123
124impl Storage {
125 fn insert(&mut self, location: &Path, bytes: Bytes, attributes: Attributes) -> usize {
126 let etag = self.next_etag;
127 self.next_etag += 1;
128 let entry = Entry::new(bytes, Utc::now(), etag, attributes);
129 self.overwrite(location, entry);
130 etag
131 }
132
133 fn overwrite(&mut self, location: &Path, entry: Entry) {
134 self.map.insert(location.clone(), entry);
135 }
136
137 fn create(&mut self, location: &Path, entry: Entry) -> Result<()> {
138 use std::collections::btree_map;
139 match self.map.entry(location.clone()) {
140 btree_map::Entry::Occupied(_) => Err(Error::AlreadyExists {
141 path: location.to_string(),
142 }
143 .into()),
144 btree_map::Entry::Vacant(v) => {
145 v.insert(entry);
146 Ok(())
147 }
148 }
149 }
150
151 fn update(&mut self, location: &Path, v: UpdateVersion, entry: Entry) -> Result<()> {
152 match self.map.get_mut(location) {
153 None => Err(crate::Error::Precondition {
155 path: location.to_string(),
156 source: format!("Object at location {location} not found").into(),
157 }),
158 Some(e) => {
159 let existing = e.e_tag.to_string();
160 let expected = v.e_tag.ok_or(Error::MissingETag)?;
161 if existing == expected {
162 *e = entry;
163 Ok(())
164 } else {
165 Err(crate::Error::Precondition {
166 path: location.to_string(),
167 source: format!("{existing} does not match {expected}").into(),
168 })
169 }
170 }
171 }
172 }
173
174 fn upload_mut(&mut self, id: &MultipartId) -> Result<&mut PartStorage> {
175 let parts = id
176 .parse()
177 .ok()
178 .and_then(|x| self.uploads.get_mut(&x))
179 .ok_or_else(|| Error::UploadNotFound { id: id.into() })?;
180 Ok(parts)
181 }
182
183 fn remove_upload(&mut self, id: &MultipartId) -> Result<PartStorage> {
184 let parts = id
185 .parse()
186 .ok()
187 .and_then(|x| self.uploads.remove(&x))
188 .ok_or_else(|| Error::UploadNotFound { id: id.into() })?;
189 Ok(parts)
190 }
191}
192
193impl std::fmt::Display for InMemory {
194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 write!(f, "InMemory")
196 }
197}
198
199#[async_trait]
200impl ObjectStore for InMemory {
201 async fn put_opts(
202 &self,
203 location: &Path,
204 payload: PutPayload,
205 opts: PutOptions,
206 ) -> Result<PutResult> {
207 let mut storage = self.storage.write();
208 let etag = storage.next_etag;
209 let entry = Entry::new(payload.into(), Utc::now(), etag, opts.attributes);
210
211 match opts.mode {
212 PutMode::Overwrite => storage.overwrite(location, entry),
213 PutMode::Create => storage.create(location, entry)?,
214 PutMode::Update(v) => storage.update(location, v, entry)?,
215 }
216 storage.next_etag += 1;
217
218 Ok(PutResult {
219 e_tag: Some(etag.to_string()),
220 version: None,
221 })
222 }
223
224 async fn put_multipart_opts(
225 &self,
226 location: &Path,
227 opts: PutMultipartOptions,
228 ) -> Result<Box<dyn MultipartUpload>> {
229 Ok(Box::new(InMemoryUpload {
230 location: location.clone(),
231 attributes: opts.attributes,
232 parts: vec![],
233 storage: Arc::clone(&self.storage),
234 }))
235 }
236
237 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
238 let entry = self.entry(location)?;
239 let e_tag = entry.e_tag.to_string();
240
241 let meta = ObjectMeta {
242 location: location.clone(),
243 last_modified: entry.last_modified,
244 size: entry.data.len() as u64,
245 e_tag: Some(e_tag),
246 version: None,
247 };
248 options.check_preconditions(&meta)?;
249
250 let (range, data) = match options.range {
251 Some(range) => {
252 let r = range
253 .as_range(entry.data.len() as u64)
254 .map_err(|source| Error::Range { source })?;
255 (
256 r.clone(),
257 entry.data.slice(r.start as usize..r.end as usize),
258 )
259 }
260 None => (0..entry.data.len() as u64, entry.data),
261 };
262 let stream = futures_util::stream::once(futures_util::future::ready(Ok(data)));
263
264 Ok(GetResult {
265 payload: GetResultPayload::Stream(stream.boxed()),
266 attributes: entry.attributes,
267 meta,
268 range,
269 })
270 }
271
272 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
273 let entry = self.entry(location)?;
274 ranges
275 .iter()
276 .map(|range| {
277 let r = GetRange::Bounded(range.clone())
278 .as_range(entry.data.len() as u64)
279 .map_err(|source| Error::Range { source })?;
280 let r_end = usize::try_from(r.end).map_err(|_e| Error::Range {
281 source: InvalidGetRange::TooLarge {
282 requested: r.end,
283 max: usize::MAX as u64,
284 },
285 })?;
286 let r_start = usize::try_from(r.start).map_err(|_e| Error::Range {
287 source: InvalidGetRange::TooLarge {
288 requested: r.start,
289 max: usize::MAX as u64,
290 },
291 })?;
292 Ok(entry.data.slice(r_start..r_end))
293 })
294 .collect()
295 }
296
297 fn delete_stream(
298 &self,
299 locations: BoxStream<'static, Result<Path>>,
300 ) -> BoxStream<'static, Result<Path>> {
301 let storage = Arc::clone(&self.storage);
302 locations
303 .map(move |location| {
304 let location = location?;
305 storage.write().map.remove(&location);
306 Ok(location)
307 })
308 .boxed()
309 }
310
311 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
312 let root = Path::default();
313 let prefix = prefix.unwrap_or(&root);
314
315 let storage = self.storage.read();
316 let values: Vec<_> = storage
317 .map
318 .range((prefix)..)
319 .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref()))
320 .filter(|(key, _)| {
321 key.prefix_match(prefix)
323 .map(|mut x| x.next().is_some())
324 .unwrap_or(false)
325 })
326 .map(|(key, value)| {
327 Ok(ObjectMeta {
328 location: key.clone(),
329 last_modified: value.last_modified,
330 size: value.data.len() as u64,
331 e_tag: Some(value.e_tag.to_string()),
332 version: None,
333 })
334 })
335 .collect();
336
337 futures_util::stream::iter(values).boxed()
338 }
339
340 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
344 let root = Path::default();
345 let prefix = prefix.unwrap_or(&root);
346
347 let mut common_prefixes = BTreeSet::new();
348
349 let mut objects = vec![];
352 for (k, v) in self.storage.read().map.range((prefix)..) {
353 if !k.as_ref().starts_with(prefix.as_ref()) {
354 break;
355 }
356
357 let mut parts = match k.prefix_match(prefix) {
358 Some(parts) => parts,
359 None => continue,
360 };
361
362 let common_prefix = match parts.next() {
364 Some(p) => p,
365 None => continue,
367 };
368
369 if parts.next().is_some() {
370 common_prefixes.insert(prefix.clone().join(common_prefix));
371 } else {
372 let object = ObjectMeta {
373 location: k.clone(),
374 last_modified: v.last_modified,
375 size: v.data.len() as u64,
376 e_tag: Some(v.e_tag.to_string()),
377 version: None,
378 };
379 objects.push(object);
380 }
381 }
382
383 Ok(ListResult {
384 objects,
385 common_prefixes: common_prefixes.into_iter().collect(),
386 })
387 }
388
389 async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
390 let CopyOptions {
391 mode,
392 extensions: _,
393 } = options;
394
395 let entry = self.entry(from)?;
396 let mut storage = self.storage.write();
397
398 match mode {
399 CopyMode::Overwrite => {
400 storage.insert(to, entry.data, entry.attributes);
401 }
402 CopyMode::Create => {
403 if storage.map.contains_key(to) {
404 return Err(Error::AlreadyExists {
405 path: to.to_string(),
406 }
407 .into());
408 }
409 storage.insert(to, entry.data, entry.attributes);
410 }
411 }
412
413 Ok(())
414 }
415}
416
417#[async_trait]
418impl MultipartStore for InMemory {
419 async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> {
420 let mut storage = self.storage.write();
421 let etag = storage.next_etag;
422 storage.next_etag += 1;
423 storage.uploads.insert(etag, Default::default());
424 Ok(etag.to_string())
425 }
426
427 async fn put_part(
428 &self,
429 _path: &Path,
430 id: &MultipartId,
431 part_idx: usize,
432 payload: PutPayload,
433 ) -> Result<PartId> {
434 let mut storage = self.storage.write();
435 let upload = storage.upload_mut(id)?;
436 if part_idx <= upload.parts.len() {
437 upload.parts.resize(part_idx + 1, None);
438 }
439 upload.parts[part_idx] = Some(payload.into());
440 Ok(PartId {
441 content_id: Default::default(),
442 })
443 }
444
445 async fn complete_multipart(
446 &self,
447 path: &Path,
448 id: &MultipartId,
449 _parts: Vec<PartId>,
450 ) -> Result<PutResult> {
451 let mut storage = self.storage.write();
452 let upload = storage.remove_upload(id)?;
453
454 let mut cap = 0;
455 for (part, x) in upload.parts.iter().enumerate() {
456 cap += x.as_ref().ok_or(Error::MissingPart { part })?.len();
457 }
458 let mut buf = Vec::with_capacity(cap);
459 for x in &upload.parts {
460 buf.extend_from_slice(x.as_ref().unwrap())
461 }
462 let etag = storage.insert(path, buf.into(), Default::default());
463 Ok(PutResult {
464 e_tag: Some(etag.to_string()),
465 version: None,
466 })
467 }
468
469 async fn abort_multipart(&self, _path: &Path, id: &MultipartId) -> Result<()> {
470 self.storage.write().remove_upload(id)?;
471 Ok(())
472 }
473}
474
475impl InMemory {
476 pub fn new() -> Self {
478 Self::default()
479 }
480
481 pub fn fork(&self) -> Self {
484 let storage = self.storage.read();
485 let storage = Arc::new(RwLock::new(storage.clone()));
486 Self { storage }
487 }
488
489 fn entry(&self, location: &Path) -> Result<Entry> {
490 let storage = self.storage.read();
491 let value = storage
492 .map
493 .get(location)
494 .cloned()
495 .ok_or_else(|| Error::NoDataInMemory {
496 path: location.to_string(),
497 })?;
498
499 Ok(value)
500 }
501}
502
/// An in-progress multipart upload that buffers its parts locally and writes
/// the assembled object to the shared storage on completion.
#[derive(Debug)]
struct InMemoryUpload {
    // Destination of the assembled object.
    location: Path,
    // Attributes to attach to the object; taken (left as default) on completion.
    attributes: Attributes,
    // Parts in the order they were submitted.
    parts: Vec<PutPayload>,
    // Handle to the storage of the store that created this upload.
    storage: Arc<RwLock<Storage>>,
}
510
511#[async_trait]
512impl MultipartUpload for InMemoryUpload {
513 fn put_part(&mut self, payload: PutPayload) -> UploadPart {
514 self.parts.push(payload);
515 Box::pin(futures_util::future::ready(Ok(())))
516 }
517
518 async fn complete(&mut self) -> Result<PutResult> {
519 let cap = self.parts.iter().map(|x| x.content_length()).sum();
520 let mut buf = Vec::with_capacity(cap);
521 let parts = self.parts.iter().flatten();
522 parts.for_each(|x| buf.extend_from_slice(x));
523 let etag = self.storage.write().insert(
524 &self.location,
525 buf.into(),
526 std::mem::take(&mut self.attributes),
527 );
528
529 Ok(PutResult {
530 e_tag: Some(etag.to_string()),
531 version: None,
532 })
533 }
534
535 async fn abort(&mut self) -> Result<()> {
536 Ok(())
537 }
538}
539
#[cfg(test)]
mod tests {
    use crate::{ObjectStoreExt, integration::*};

    use super::*;

    /// Runs the shared integration suite against a plain `InMemory` store.
    #[tokio::test]
    async fn in_memory_test() {
        let store = InMemory::new();

        put_get_delete_list(&store).await;
        list_with_offset_exclusivity(&store).await;
        get_opts(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
        stream_get(&store).await;
        put_opts(&store, true).await;
        multipart(&store, &store).await;
        put_get_attributes(&store).await;
    }

    /// Runs the integration suite through a boxed trait object.
    #[tokio::test]
    async fn box_test() {
        let store: Box<dyn ObjectStore> = Box::new(InMemory::new());

        put_get_delete_list(&store).await;
        list_with_offset_exclusivity(&store).await;
        get_opts(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
        stream_get(&store).await;
    }

    /// Runs the integration suite through a reference-counted trait object.
    #[tokio::test]
    async fn arc_test() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

        put_get_delete_list(&store).await;
        list_with_offset_exclusivity(&store).await;
        get_opts(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
        stream_get(&store).await;
    }

    /// A payload written with `put` is read back unchanged with `get`.
    #[tokio::test]
    async fn unknown_length() {
        let store = InMemory::new();
        let location = Path::from("some_file");
        let expected = Bytes::from("arbitrary data");

        store
            .put(&location, expected.clone().into())
            .await
            .unwrap();

        let fetched = store.get(&location).await.unwrap();
        let actual = fetched.bytes().await.unwrap();
        assert_eq!(&*actual, expected);
    }

    const NON_EXISTENT_NAME: &str = "nonexistentname";

    /// Reading a missing object yields `NotFound` wrapping the store's own
    /// `NoDataInMemory` error and carrying the requested path.
    #[tokio::test]
    async fn nonexistent_location() {
        let store = InMemory::new();
        let location = Path::from(NON_EXISTENT_NAME);

        let err = get_nonexistent_object(&store, Some(location))
            .await
            .unwrap_err();

        match err {
            crate::Error::NotFound { path, source } => {
                let source_variant = source.downcast_ref::<Error>();
                assert!(
                    matches!(source_variant, Some(Error::NoDataInMemory { .. })),
                    "got: {source_variant:?}"
                );
                assert_eq!(path, NON_EXISTENT_NAME);
            }
            err => panic!("unexpected error type: {err:?}"),
        }
    }
}