Skip to main content

object_store/
memory.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An in-memory object store implementation
19use std::collections::{BTreeMap, BTreeSet, HashMap};
20use std::ops::Range;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use bytes::Bytes;
25use chrono::{DateTime, Utc};
26use futures_util::{StreamExt, stream::BoxStream};
27use parking_lot::RwLock;
28
29use crate::multipart::{MultipartStore, PartId};
30use crate::util::InvalidGetRange;
31use crate::{
32    Attributes, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload,
33    ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutResult, Result,
34    UpdateVersion, UploadPart, path::Path,
35};
36use crate::{CopyMode, CopyOptions, GetOptions, PutPayload};
37
/// A specialized `Error` for in-memory object store-related errors
///
/// Converted into the crate-level `super::Error` by the `From` impl in this
/// module: `NoDataInMemory` becomes `NotFound`, `AlreadyExists` becomes
/// `AlreadyExists`, and every other variant becomes `Generic`.
#[derive(Debug, thiserror::Error)]
enum Error {
    /// Nothing is stored at `path`.
    #[error("No data in memory found. Location: {path}")]
    NoDataInMemory { path: String },

    /// A requested byte range could not be resolved against the stored object.
    #[error("Invalid range: {source}")]
    Range { source: InvalidGetRange },

    /// A create-only write or copy targeted a path that already holds an object.
    #[error("Object already exists at that location: {path}")]
    AlreadyExists { path: String },

    /// A conditional update was requested without supplying an e-tag.
    #[error("ETag required for conditional update")]
    MissingETag,

    /// `id` does not name an in-progress multipart upload (it failed to parse
    /// as a number, or no upload with that number is registered).
    #[error("MultipartUpload not found: {id}")]
    UploadNotFound { id: String },

    /// Completing a multipart upload found no data for the part at this index.
    #[error("Missing part at index: {part}")]
    MissingPart { part: usize },
}
59
60impl From<Error> for super::Error {
61    fn from(source: Error) -> Self {
62        match source {
63            Error::NoDataInMemory { ref path } => Self::NotFound {
64                path: path.into(),
65                source: source.into(),
66            },
67            Error::AlreadyExists { ref path } => Self::AlreadyExists {
68                path: path.into(),
69                source: source.into(),
70            },
71            _ => Self::Generic {
72                store: "InMemory",
73                source: Box::new(source),
74            },
75        }
76    }
77}
78
/// In-memory storage suitable for testing or for opting out of using a cloud
/// storage provider.
///
/// Cloning an `InMemory` is cheap and yields a handle to the *same* underlying
/// storage, because the state lives behind an `Arc`. Use [`InMemory::fork`] to
/// obtain an independent copy of the current contents.
#[derive(Debug, Default, Clone)]
pub struct InMemory {
    /// Shared, lock-protected store state.
    storage: SharedStorage,
}
85
/// A single stored object together with its metadata.
#[derive(Debug, Clone)]
struct Entry {
    /// The object's contents.
    data: Bytes,
    /// Timestamp recorded when the entry was written.
    last_modified: DateTime<Utc>,
    /// User-supplied attributes returned alongside the object on `get`.
    attributes: Attributes,
    /// Numeric e-tag allocated from `Storage::next_etag`; exposed to callers
    /// as its decimal string form.
    e_tag: usize,
}
93
94impl Entry {
95    fn new(
96        data: Bytes,
97        last_modified: DateTime<Utc>,
98        e_tag: usize,
99        attributes: Attributes,
100    ) -> Self {
101        Self {
102            data,
103            last_modified,
104            e_tag,
105            attributes,
106        }
107    }
108}
109
/// The mutable state behind an [`InMemory`] store.
#[derive(Debug, Default, Clone)]
struct Storage {
    /// Counter used to allocate both object e-tags and multipart upload ids.
    next_etag: usize,
    /// Stored objects, ordered by path so prefix listings can use range scans.
    map: BTreeMap<Path, Entry>,
    /// In-progress multipart uploads created via `create_multipart`, keyed by
    /// their numeric id.
    uploads: HashMap<usize, PartStorage>,
}
116
/// Parts received so far for one multipart upload.
#[derive(Debug, Default, Clone)]
struct PartStorage {
    /// Indexed by part number; `None` marks a slot whose part has not been
    /// uploaded yet.
    parts: Vec<Option<Bytes>>,
}
121
/// Lock-protected storage shared between an `InMemory` handle, its clones,
/// and any multipart uploads it has handed out.
type SharedStorage = Arc<RwLock<Storage>>;
123
124impl Storage {
125    fn insert(&mut self, location: &Path, bytes: Bytes, attributes: Attributes) -> usize {
126        let etag = self.next_etag;
127        self.next_etag += 1;
128        let entry = Entry::new(bytes, Utc::now(), etag, attributes);
129        self.overwrite(location, entry);
130        etag
131    }
132
133    fn overwrite(&mut self, location: &Path, entry: Entry) {
134        self.map.insert(location.clone(), entry);
135    }
136
137    fn create(&mut self, location: &Path, entry: Entry) -> Result<()> {
138        use std::collections::btree_map;
139        match self.map.entry(location.clone()) {
140            btree_map::Entry::Occupied(_) => Err(Error::AlreadyExists {
141                path: location.to_string(),
142            }
143            .into()),
144            btree_map::Entry::Vacant(v) => {
145                v.insert(entry);
146                Ok(())
147            }
148        }
149    }
150
151    fn update(&mut self, location: &Path, v: UpdateVersion, entry: Entry) -> Result<()> {
152        match self.map.get_mut(location) {
153            // Return Precondition instead of NotFound for consistency with stores
154            None => Err(crate::Error::Precondition {
155                path: location.to_string(),
156                source: format!("Object at location {location} not found").into(),
157            }),
158            Some(e) => {
159                let existing = e.e_tag.to_string();
160                let expected = v.e_tag.ok_or(Error::MissingETag)?;
161                if existing == expected {
162                    *e = entry;
163                    Ok(())
164                } else {
165                    Err(crate::Error::Precondition {
166                        path: location.to_string(),
167                        source: format!("{existing} does not match {expected}").into(),
168                    })
169                }
170            }
171        }
172    }
173
174    fn upload_mut(&mut self, id: &MultipartId) -> Result<&mut PartStorage> {
175        let parts = id
176            .parse()
177            .ok()
178            .and_then(|x| self.uploads.get_mut(&x))
179            .ok_or_else(|| Error::UploadNotFound { id: id.into() })?;
180        Ok(parts)
181    }
182
183    fn remove_upload(&mut self, id: &MultipartId) -> Result<PartStorage> {
184        let parts = id
185            .parse()
186            .ok()
187            .and_then(|x| self.uploads.remove(&x))
188            .ok_or_else(|| Error::UploadNotFound { id: id.into() })?;
189        Ok(parts)
190    }
191}
192
193impl std::fmt::Display for InMemory {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        write!(f, "InMemory")
196    }
197}
198
199#[async_trait]
200impl ObjectStore for InMemory {
201    async fn put_opts(
202        &self,
203        location: &Path,
204        payload: PutPayload,
205        opts: PutOptions,
206    ) -> Result<PutResult> {
207        let mut storage = self.storage.write();
208        let etag = storage.next_etag;
209        let entry = Entry::new(payload.into(), Utc::now(), etag, opts.attributes);
210
211        match opts.mode {
212            PutMode::Overwrite => storage.overwrite(location, entry),
213            PutMode::Create => storage.create(location, entry)?,
214            PutMode::Update(v) => storage.update(location, v, entry)?,
215        }
216        storage.next_etag += 1;
217
218        Ok(PutResult {
219            e_tag: Some(etag.to_string()),
220            version: None,
221        })
222    }
223
224    async fn put_multipart_opts(
225        &self,
226        location: &Path,
227        opts: PutMultipartOptions,
228    ) -> Result<Box<dyn MultipartUpload>> {
229        Ok(Box::new(InMemoryUpload {
230            location: location.clone(),
231            attributes: opts.attributes,
232            parts: vec![],
233            storage: Arc::clone(&self.storage),
234        }))
235    }
236
237    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
238        let entry = self.entry(location)?;
239        let e_tag = entry.e_tag.to_string();
240
241        let meta = ObjectMeta {
242            location: location.clone(),
243            last_modified: entry.last_modified,
244            size: entry.data.len() as u64,
245            e_tag: Some(e_tag),
246            version: None,
247        };
248        options.check_preconditions(&meta)?;
249
250        let (range, data) = match options.range {
251            Some(range) => {
252                let r = range
253                    .as_range(entry.data.len() as u64)
254                    .map_err(|source| Error::Range { source })?;
255                (
256                    r.clone(),
257                    entry.data.slice(r.start as usize..r.end as usize),
258                )
259            }
260            None => (0..entry.data.len() as u64, entry.data),
261        };
262        let stream = futures_util::stream::once(futures_util::future::ready(Ok(data)));
263
264        Ok(GetResult {
265            payload: GetResultPayload::Stream(stream.boxed()),
266            attributes: entry.attributes,
267            meta,
268            range,
269        })
270    }
271
272    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
273        let entry = self.entry(location)?;
274        ranges
275            .iter()
276            .map(|range| {
277                let r = GetRange::Bounded(range.clone())
278                    .as_range(entry.data.len() as u64)
279                    .map_err(|source| Error::Range { source })?;
280                let r_end = usize::try_from(r.end).map_err(|_e| Error::Range {
281                    source: InvalidGetRange::TooLarge {
282                        requested: r.end,
283                        max: usize::MAX as u64,
284                    },
285                })?;
286                let r_start = usize::try_from(r.start).map_err(|_e| Error::Range {
287                    source: InvalidGetRange::TooLarge {
288                        requested: r.start,
289                        max: usize::MAX as u64,
290                    },
291                })?;
292                Ok(entry.data.slice(r_start..r_end))
293            })
294            .collect()
295    }
296
297    fn delete_stream(
298        &self,
299        locations: BoxStream<'static, Result<Path>>,
300    ) -> BoxStream<'static, Result<Path>> {
301        let storage = Arc::clone(&self.storage);
302        locations
303            .map(move |location| {
304                let location = location?;
305                storage.write().map.remove(&location);
306                Ok(location)
307            })
308            .boxed()
309    }
310
311    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
312        let root = Path::default();
313        let prefix = prefix.unwrap_or(&root);
314
315        let storage = self.storage.read();
316        let values: Vec<_> = storage
317            .map
318            .range((prefix)..)
319            .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref()))
320            .filter(|(key, _)| {
321                // Don't return for exact prefix match
322                key.prefix_match(prefix)
323                    .map(|mut x| x.next().is_some())
324                    .unwrap_or(false)
325            })
326            .map(|(key, value)| {
327                Ok(ObjectMeta {
328                    location: key.clone(),
329                    last_modified: value.last_modified,
330                    size: value.data.len() as u64,
331                    e_tag: Some(value.e_tag.to_string()),
332                    version: None,
333                })
334            })
335            .collect();
336
337        futures_util::stream::iter(values).boxed()
338    }
339
340    /// The memory implementation returns all results, as opposed to the cloud
341    /// versions which limit their results to 1k or more because of API
342    /// limitations.
343    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
344        let root = Path::default();
345        let prefix = prefix.unwrap_or(&root);
346
347        let mut common_prefixes = BTreeSet::new();
348
349        // Only objects in this base level should be returned in the
350        // response. Otherwise, we just collect the common prefixes.
351        let mut objects = vec![];
352        for (k, v) in self.storage.read().map.range((prefix)..) {
353            if !k.as_ref().starts_with(prefix.as_ref()) {
354                break;
355            }
356
357            let mut parts = match k.prefix_match(prefix) {
358                Some(parts) => parts,
359                None => continue,
360            };
361
362            // Pop first element
363            let common_prefix = match parts.next() {
364                Some(p) => p,
365                // Should only return children of the prefix
366                None => continue,
367            };
368
369            if parts.next().is_some() {
370                common_prefixes.insert(prefix.clone().join(common_prefix));
371            } else {
372                let object = ObjectMeta {
373                    location: k.clone(),
374                    last_modified: v.last_modified,
375                    size: v.data.len() as u64,
376                    e_tag: Some(v.e_tag.to_string()),
377                    version: None,
378                };
379                objects.push(object);
380            }
381        }
382
383        Ok(ListResult {
384            objects,
385            common_prefixes: common_prefixes.into_iter().collect(),
386        })
387    }
388
389    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
390        let CopyOptions {
391            mode,
392            extensions: _,
393        } = options;
394
395        let entry = self.entry(from)?;
396        let mut storage = self.storage.write();
397
398        match mode {
399            CopyMode::Overwrite => {
400                storage.insert(to, entry.data, entry.attributes);
401            }
402            CopyMode::Create => {
403                if storage.map.contains_key(to) {
404                    return Err(Error::AlreadyExists {
405                        path: to.to_string(),
406                    }
407                    .into());
408                }
409                storage.insert(to, entry.data, entry.attributes);
410            }
411        }
412
413        Ok(())
414    }
415}
416
417#[async_trait]
418impl MultipartStore for InMemory {
419    async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> {
420        let mut storage = self.storage.write();
421        let etag = storage.next_etag;
422        storage.next_etag += 1;
423        storage.uploads.insert(etag, Default::default());
424        Ok(etag.to_string())
425    }
426
427    async fn put_part(
428        &self,
429        _path: &Path,
430        id: &MultipartId,
431        part_idx: usize,
432        payload: PutPayload,
433    ) -> Result<PartId> {
434        let mut storage = self.storage.write();
435        let upload = storage.upload_mut(id)?;
436        if part_idx <= upload.parts.len() {
437            upload.parts.resize(part_idx + 1, None);
438        }
439        upload.parts[part_idx] = Some(payload.into());
440        Ok(PartId {
441            content_id: Default::default(),
442        })
443    }
444
445    async fn complete_multipart(
446        &self,
447        path: &Path,
448        id: &MultipartId,
449        _parts: Vec<PartId>,
450    ) -> Result<PutResult> {
451        let mut storage = self.storage.write();
452        let upload = storage.remove_upload(id)?;
453
454        let mut cap = 0;
455        for (part, x) in upload.parts.iter().enumerate() {
456            cap += x.as_ref().ok_or(Error::MissingPart { part })?.len();
457        }
458        let mut buf = Vec::with_capacity(cap);
459        for x in &upload.parts {
460            buf.extend_from_slice(x.as_ref().unwrap())
461        }
462        let etag = storage.insert(path, buf.into(), Default::default());
463        Ok(PutResult {
464            e_tag: Some(etag.to_string()),
465            version: None,
466        })
467    }
468
469    async fn abort_multipart(&self, _path: &Path, id: &MultipartId) -> Result<()> {
470        self.storage.write().remove_upload(id)?;
471        Ok(())
472    }
473}
474
475impl InMemory {
476    /// Create new in-memory storage.
477    pub fn new() -> Self {
478        Self::default()
479    }
480
481    /// Creates a fork of the store, with the current content copied into the
482    /// new store.
483    pub fn fork(&self) -> Self {
484        let storage = self.storage.read();
485        let storage = Arc::new(RwLock::new(storage.clone()));
486        Self { storage }
487    }
488
489    fn entry(&self, location: &Path) -> Result<Entry> {
490        let storage = self.storage.read();
491        let value = storage
492            .map
493            .get(location)
494            .cloned()
495            .ok_or_else(|| Error::NoDataInMemory {
496                path: location.to_string(),
497            })?;
498
499        Ok(value)
500    }
501}
502
/// Multipart upload handle returned by `InMemory::put_multipart_opts`.
///
/// Parts are buffered in this struct and written to the shared storage as a
/// single object when the upload is completed. Aborting writes nothing.
#[derive(Debug)]
struct InMemoryUpload {
    /// Destination path for the completed object.
    location: Path,
    /// Attributes applied to the completed object.
    attributes: Attributes,
    /// Parts in the order they were supplied to `put_part`.
    parts: Vec<PutPayload>,
    /// Handle to the originating store's shared state.
    storage: Arc<RwLock<Storage>>,
}
510
511#[async_trait]
512impl MultipartUpload for InMemoryUpload {
513    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
514        self.parts.push(payload);
515        Box::pin(futures_util::future::ready(Ok(())))
516    }
517
518    async fn complete(&mut self) -> Result<PutResult> {
519        let cap = self.parts.iter().map(|x| x.content_length()).sum();
520        let mut buf = Vec::with_capacity(cap);
521        let parts = self.parts.iter().flatten();
522        parts.for_each(|x| buf.extend_from_slice(x));
523        let etag = self.storage.write().insert(
524            &self.location,
525            buf.into(),
526            std::mem::take(&mut self.attributes),
527        );
528
529        Ok(PutResult {
530            e_tag: Some(etag.to_string()),
531            version: None,
532        })
533    }
534
535    async fn abort(&mut self) -> Result<()> {
536        Ok(())
537    }
538}
539
#[cfg(test)]
mod tests {
    use crate::{ObjectStoreExt, integration::*};

    use super::*;

    /// Runs the full integration suite against a concrete `InMemory`.
    #[tokio::test]
    async fn in_memory_test() {
        let store = InMemory::new();

        put_get_delete_list(&store).await;
        list_with_offset_exclusivity(&store).await;
        get_opts(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
        stream_get(&store).await;
        put_opts(&store, true).await;
        multipart(&store, &store).await;
        put_get_attributes(&store).await;
    }

    /// Checks the store behaves the same behind a `Box<dyn ObjectStore>`.
    #[tokio::test]
    async fn box_test() {
        let store: Box<dyn ObjectStore> = Box::new(InMemory::new());

        put_get_delete_list(&store).await;
        list_with_offset_exclusivity(&store).await;
        get_opts(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
        stream_get(&store).await;
    }

    /// Checks the store behaves the same behind an `Arc<dyn ObjectStore>`.
    #[tokio::test]
    async fn arc_test() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

        put_get_delete_list(&store).await;
        list_with_offset_exclusivity(&store).await;
        get_opts(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
        stream_get(&store).await;
    }

    /// A simple put followed by a get round-trips the bytes.
    #[tokio::test]
    async fn unknown_length() {
        let store = InMemory::new();
        let location = Path::from("some_file");
        let data = Bytes::from("arbitrary data");

        store.put(&location, data.clone().into()).await.unwrap();

        let read_data = store
            .get(&location)
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(&*read_data, data);
    }

    const NON_EXISTENT_NAME: &str = "nonexistentname";

    /// Reading a missing path yields `NotFound` wrapping `NoDataInMemory`.
    #[tokio::test]
    async fn nonexistent_location() {
        let store = InMemory::new();
        let location = Path::from(NON_EXISTENT_NAME);

        let err = get_nonexistent_object(&store, Some(location))
            .await
            .unwrap_err();
        match err {
            crate::Error::NotFound { path, source } => {
                let source_variant = source.downcast_ref::<Error>();
                assert!(
                    matches!(source_variant, Some(Error::NoDataInMemory { .. })),
                    "got: {source_variant:?}"
                );
                assert_eq!(path, NON_EXISTENT_NAME);
            }
            other => panic!("unexpected error type: {other:?}"),
        }
    }
}