Skip to main content

liquid_cache_common/
mock_store.rs

1//! Mock object store for testing purposes.
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use futures::{StreamExt, stream::BoxStream};
7use object_store::PutMultipartOptions;
8use object_store::{
9    Attributes, CopyOptions, Error, GetOptions, GetResult, GetResultPayload, ListResult,
10    MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutOptions, PutPayload, PutResult, Result,
11    path::Path,
12};
13use std::collections::{BTreeMap, BTreeSet};
14use std::ops::Range;
15use std::sync::Arc;
16use std::sync::Mutex;
17use std::sync::RwLock;
18use std::sync::atomic::{AtomicUsize, Ordering};
19
20/// In-memory storage for testing purposes.
21///
22/// This can be used as a mock object store for testing purposes instead of using a real object store like S3.
23/// It is not meant to be used in production.
24///
25/// # Usage
26///
27/// The following example shows how to create a store, list items, get an object,
28/// and put a new object.
29///
30/// ```rust
31/// use liquid_cache_common::mock_store::MockStore;
32/// use object_store::{path::Path, GetOptions, PutPayload, ObjectMeta, ObjectStore};
33/// use futures::TryStreamExt;
34/// use bytes::Bytes;
35/// use object_store::ObjectStoreExt;
36/// async fn test() -> Result<(), object_store::Error> {
37/// let store = MockStore::new_with_files(10, 1024 * 10); // 10 files of 10KB each
38/// let paths: Vec<ObjectMeta> = store.list(None).try_collect().await?;
39///
40/// let options = GetOptions {
41///     range: Some((0..(1024 * 10)).into()),
42///     ..Default::default()
43/// };
44/// let path = Path::from("1.parquet");
45/// let result = store.get_opts(&path, options).await?;
46/// let bytes = result.bytes().await?;
47///
48/// let path = Path::from("11.parquet");
49/// let payload = PutPayload::from(Bytes::from_static(b"test data"));
50/// store.put(&path, payload).await?;
51/// Ok(())
52/// }
53/// ```
54///
55#[derive(Debug, Default)]
56pub struct MockStore {
57    storage: SharedStorage,
58}
59
60/// A specialized `Error` for in-memory object store-related errors
61#[derive(Debug, thiserror::Error)]
62enum MockStoreError {
63    #[error("No data in memory found. Location: {path}")]
64    NoDataInMemory { path: String },
65
66    #[error("Object already exists at that location: {path}")]
67    AlreadyExists { path: String },
68
69    #[error("Invalid range")]
70    InvalidGetRange,
71}
72
73impl From<MockStoreError> for object_store::Error {
74    fn from(source: MockStoreError) -> Self {
75        match source {
76            MockStoreError::NoDataInMemory { ref path } => Self::NotFound {
77                path: path.into(),
78                source: source.into(),
79            },
80            MockStoreError::AlreadyExists { ref path } => Self::AlreadyExists {
81                path: path.into(),
82                source: source.into(),
83            },
84            _ => Self::Generic {
85                store: "MockStore",
86                source: Box::new(source),
87            },
88        }
89    }
90}
91
92#[derive(Debug, Clone)]
93struct Entry {
94    data: Bytes,
95    last_modified: DateTime<Utc>,
96    attributes: Attributes,
97    e_tag: usize,
98    access_count: Arc<AtomicUsize>,
99    access_ranges: Arc<Mutex<Vec<Range<u64>>>>,
100}
101
102impl Entry {
103    fn new(
104        data: Bytes,
105        last_modified: DateTime<Utc>,
106        e_tag: usize,
107        attributes: Attributes,
108    ) -> Self {
109        Self {
110            data,
111            last_modified,
112            e_tag,
113            attributes,
114            access_count: Arc::new(AtomicUsize::new(0)),
115            access_ranges: Arc::new(Mutex::new(Vec::new())),
116        }
117    }
118}
119
120#[derive(Debug, Default, Clone)]
121struct Storage {
122    next_etag: usize,
123    map: BTreeMap<Path, Entry>,
124}
125
126type SharedStorage = Arc<RwLock<Storage>>;
127
128impl Storage {
129    fn insert(&mut self, location: &Path, bytes: Bytes, attributes: Attributes) -> usize {
130        let etag = self.next_etag;
131        self.next_etag += 1;
132        let entry = Entry::new(bytes, Utc::now(), etag, attributes);
133        self.overwrite(location, entry);
134        etag
135    }
136
137    fn overwrite(&mut self, location: &Path, entry: Entry) {
138        self.map.insert(location.clone(), entry);
139    }
140
141    fn create(&mut self, location: &Path, entry: Entry) -> Result<()> {
142        use std::collections::btree_map;
143        match self.map.entry(location.clone()) {
144            btree_map::Entry::Occupied(_) => Err(MockStoreError::AlreadyExists {
145                path: location.to_string(),
146            }
147            .into()),
148            btree_map::Entry::Vacant(v) => {
149                v.insert(entry);
150                Ok(())
151            }
152        }
153    }
154}
155
156impl std::fmt::Display for MockStore {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        write!(f, "MockStore")
159    }
160}
161
162impl MockStore {
163    /// Create new in-memory storage.
164    pub fn new() -> Self {
165        Self::default()
166    }
167
168    /// Initialize the object store with preset values
169    pub fn new_with_files(file_count: usize, file_size: usize) -> Self {
170        let store = Self::new();
171        {
172            let mut storage = store.storage.write().unwrap();
173            let data = vec![0u8; file_size];
174
175            // Fill the data with a pattern: index % 256
176            // This makes it easy to verify ranges
177            let data: Vec<u8> = data
178                .iter()
179                .enumerate()
180                .map(|(i, _)| (i % 256) as u8)
181                .collect();
182
183            for file_name in 0..file_count {
184                let path = Path::from(format!("{file_name}.parquet"));
185                storage.insert(&path, Bytes::from(data.clone()), Attributes::default());
186            }
187        }
188        store
189    }
190
191    /// Creates a fork of the store, with the current content copied into the
192    /// new store.
193    pub fn fork(&self) -> Self {
194        let storage = self.storage.read().unwrap();
195        let storage = Arc::new(RwLock::new(storage.clone()));
196        Self { storage }
197    }
198
199    /// Get the access count(no. of calls to `get_opts`) for a specific file
200    pub fn get_access_count(&self, location: &Path) -> Option<usize> {
201        self.storage
202            .read()
203            .unwrap()
204            .map
205            .get(location)
206            .map(|entry| entry.access_count.load(Ordering::SeqCst))
207    }
208
209    /// Get a list of ranges that have been requested via `get_opts`
210    pub fn get_access_ranges(&self, location: &Path) -> Option<Vec<Range<u64>>> {
211        self.storage
212            .read()
213            .unwrap()
214            .map
215            .get(location)
216            .map(|entry| entry.access_ranges.lock().unwrap().clone())
217    }
218
219    /// Get the number of objects stored in the store
220    pub fn get_file_count(&self) -> usize {
221        self.storage.read().unwrap().map.len()
222    }
223
224    /// Get the total size of all objects in the store
225    pub fn get_store_size(&self) -> usize {
226        self.storage
227            .read()
228            .unwrap()
229            .map
230            .values()
231            .map(|entry| entry.data.len())
232            .sum()
233    }
234
235    fn entry(&self, location: &Path) -> Result<Entry> {
236        let storage = self.storage.read().unwrap();
237        let value =
238            storage
239                .map
240                .get(location)
241                .cloned()
242                .ok_or_else(|| MockStoreError::NoDataInMemory {
243                    path: location.to_string(),
244                })?;
245
246        Ok(value)
247    }
248}
249
250#[async_trait]
251impl ObjectStore for MockStore {
252    async fn put_opts(
253        &self,
254        location: &Path,
255        payload: PutPayload,
256        opts: PutOptions,
257    ) -> Result<PutResult> {
258        let mut storage = self.storage.write().unwrap();
259        let etag = storage.next_etag;
260        let entry = Entry::new(payload.into(), Utc::now(), etag, opts.attributes);
261
262        match opts.mode {
263            PutMode::Overwrite => storage.overwrite(location, entry),
264            PutMode::Create => storage.create(location, entry)?,
265            PutMode::Update(_) => unreachable!("MockStore does not support update"),
266        }
267        storage.next_etag += 1;
268
269        Ok(PutResult {
270            e_tag: Some(etag.to_string()),
271            version: None,
272        })
273    }
274
275    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
276        let entry = self.entry(location)?;
277
278        let e_tag = entry.e_tag.to_string();
279
280        let meta = ObjectMeta {
281            location: location.clone(),
282            last_modified: entry.last_modified,
283            size: entry.data.len() as u64,
284            e_tag: Some(e_tag),
285            version: None,
286        };
287        options.check_preconditions(&meta)?;
288
289        if options.head {
290            let stream = futures::stream::empty().boxed();
291            return Ok(GetResult {
292                payload: GetResultPayload::Stream(stream),
293                attributes: entry.attributes,
294                meta,
295                range: 0..0,
296            });
297        }
298
299        // Atomically increment the count. This is a fast, lock-free operation.
300        entry.access_count.fetch_add(1, Ordering::SeqCst);
301
302        let (range, data) = match options.range {
303            Some(range) => {
304                let r = range
305                    .as_range(entry.data.len() as u64)
306                    .map_err(|_| Error::Generic {
307                        store: "MockStore",
308                        source: Box::new(MockStoreError::InvalidGetRange),
309                    })?;
310                (
311                    r.clone(),
312                    entry.data.slice(r.start as usize..r.end as usize),
313                )
314            }
315            None => (0..entry.data.len() as u64, entry.data),
316        };
317        entry.access_ranges.lock().unwrap().push(range.clone());
318        let stream = futures::stream::once(futures::future::ready(Ok(data)));
319
320        Ok(GetResult {
321            payload: GetResultPayload::Stream(stream.boxed()),
322            attributes: entry.attributes,
323            meta,
324            range,
325        })
326    }
327
328    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
329        let root = Path::default();
330        let prefix = prefix.unwrap_or(&root);
331
332        let storage = self.storage.read().unwrap();
333        let values: Vec<_> = storage
334            .map
335            .range((prefix)..)
336            .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref()))
337            .filter(|(key, _)| {
338                // Don't return for exact prefix match
339                key.prefix_match(prefix)
340                    .map(|mut x| x.next().is_some())
341                    .unwrap_or(false)
342            })
343            .map(|(key, value)| {
344                Ok(ObjectMeta {
345                    location: key.clone(),
346                    last_modified: value.last_modified,
347                    size: value.data.len() as u64,
348                    e_tag: Some(value.e_tag.to_string()),
349                    version: None,
350                })
351            })
352            .collect();
353
354        futures::stream::iter(values).boxed()
355    }
356
357    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
358        let root = Path::default();
359        let prefix = prefix.unwrap_or(&root);
360
361        let mut common_prefixes = BTreeSet::new();
362
363        // Only objects in this base level should be returned in the
364        // response. Otherwise, we just collect the common prefixes.
365        let mut objects = vec![];
366        for (k, v) in self.storage.read().unwrap().map.range((prefix)..) {
367            if !k.as_ref().starts_with(prefix.as_ref()) {
368                break;
369            }
370
371            let mut parts = match k.prefix_match(prefix) {
372                Some(parts) => parts,
373                None => continue,
374            };
375
376            // Pop first element
377            let common_prefix = match parts.next() {
378                Some(p) => p,
379                // Should only return children of the prefix
380                None => continue,
381            };
382
383            if parts.next().is_some() {
384                common_prefixes.insert(prefix.clone().join(common_prefix));
385            } else {
386                let object = ObjectMeta {
387                    location: k.clone(),
388                    last_modified: v.last_modified,
389                    size: v.data.len() as u64,
390                    e_tag: Some(v.e_tag.to_string()),
391                    version: None,
392                };
393                objects.push(object);
394            }
395        }
396
397        Ok(ListResult {
398            objects,
399            common_prefixes: common_prefixes.into_iter().collect(),
400        })
401    }
402
403    async fn put_multipart_opts(
404        &self,
405        _location: &Path,
406        _opts: PutMultipartOptions,
407    ) -> Result<Box<dyn MultipartUpload>> {
408        unreachable!("MockStore does not support multipart upload")
409    }
410
411    fn delete_stream(
412        &self,
413        _locations: BoxStream<'static, Result<Path>>,
414    ) -> BoxStream<'static, Result<Path>> {
415        unreachable!("MockStore does not support delete")
416    }
417
418    async fn copy_opts(&self, _from: &Path, _to: &Path, _options: CopyOptions) -> Result<()> {
419        unreachable!("MockStore does not support copy")
420    }
421}
422
#[cfg(test)]
mod tests {
    use super::*;
    use futures::TryStreamExt;
    use object_store::ObjectStoreExt;

    /// Collects the metadata of every object in `store`.
    async fn list_all(store: &MockStore) -> Vec<ObjectMeta> {
        store.list(None).try_collect().await.unwrap()
    }

    /// Builds `GetOptions` that request exactly `range`.
    fn ranged(range: Range<u64>) -> GetOptions {
        GetOptions {
            range: Some(range.into()),
            ..GetOptions::default()
        }
    }

    /// Performs a ranged read that must succeed and returns the byte count.
    async fn read_len(store: &MockStore, path: &Path, range: Range<u64>) -> usize {
        let result = store.get_opts(path, ranged(range)).await.unwrap();
        result.bytes().await.unwrap().len()
    }

    /// Writes `content` to `path` with default put options.
    async fn put_static(store: &MockStore, path: &Path, content: &'static [u8]) {
        let payload = PutPayload::from(Bytes::from_static(content));
        store
            .put_opts(path, payload, PutOptions::default())
            .await
            .unwrap();
    }

    /// Standard fixture: 10 files of 10KB each.
    async fn setup_test_store() -> MockStore {
        let store = MockStore::new_with_files(10, 1024 * 10);
        let listed = list_all(&store).await;
        assert_eq!(listed.len(), 10, "Initial store should have 10 files");
        store
    }

    #[tokio::test]
    async fn test_new_with_files() {
        let store = setup_test_store().await;
        let listed = list_all(&store).await;
        assert_eq!(listed.len(), 10);

        // Every listed file must exist under its expected name and size.
        for (i, meta) in listed.iter().enumerate() {
            let path = Path::from(format!("{i}.parquet"));
            let loaded_meta = store.head(&path).await.unwrap();
            assert_eq!(
                loaded_meta.size,
                1024 * 10,
                "Expected size of 10KB, got {}",
                loaded_meta.size
            );
            assert_eq!(
                meta.location, path,
                "Expected location to be {path}, got {}",
                meta.location
            );
        }
    }

    #[tokio::test]
    async fn test_get_opts() {
        let store = setup_test_store().await;
        let path = Path::from("1.parquet");

        // The whole object, requested as an explicit range.
        assert_eq!(read_len(&store, &path, 0..(1024 * 10)).await, 1024 * 10);

        // A sub-range that lies entirely inside the object.
        assert_eq!(read_len(&store, &path, 1024..4096).await, 3072);

        // A range whose end lies past the object: only the valid part of the
        // range is returned.
        assert_eq!(read_len(&store, &path, 8192..12288).await, 2048);

        // A range that starts past the end of the object is rejected.
        let err = store
            .get_opts(&path, ranged(20480..30720))
            .await
            .unwrap_err();
        assert!(
            matches!(err, Error::Generic { .. }),
            "Expected an error for out-of-bounds request, got {err:?}"
        );
    }

    #[tokio::test]
    async fn test_insert_and_list() {
        let store = setup_test_store().await;

        // Insert one additional file.
        let new_path = Path::from("11.parquet");
        put_static(&store, &new_path, b"test data").await;

        let listed = list_all(&store).await;
        assert_eq!(
            listed.len(),
            11,
            "Store should have 11 files after insertion"
        );

        let meta = store.head(&new_path).await.unwrap();
        assert_eq!(meta.size, 9);
        assert_eq!(meta.location, new_path);
    }

    #[tokio::test]
    async fn test_list_uses_directories_correctly() {
        let store = setup_test_store().await;
        let folder_path = Path::from("folder/");
        let file_path = Path::from("folder/file.parquet");
        put_static(&store, &file_path, b"test").await;

        // Listing the root shows the 10 files plus the folder as a prefix.
        let at_root = store.list_with_delimiter(None).await.unwrap();
        assert_eq!(at_root.objects.len(), 10, "Root should have 10 objects");
        assert_eq!(
            at_root.common_prefixes.len(),
            1,
            "Root should have 1 common prefix (folder)"
        );
        assert_eq!(
            at_root.common_prefixes[0], folder_path,
            "Common prefix should be 'folder/'"
        );

        // Listing the sub-directory shows only the nested file.
        let in_folder = store.list_with_delimiter(Some(&folder_path)).await.unwrap();
        assert_eq!(in_folder.objects.len(), 1, "Folder should contain 1 object");
        assert_eq!(
            in_folder.common_prefixes.len(),
            0,
            "Folder should have no common prefixes"
        );
        assert_eq!(in_folder.objects[0].location, file_path);
    }

    #[tokio::test]
    async fn test_fork() {
        let original_store = setup_test_store().await;
        let forked_store = original_store.fork();

        // Write to the original only.
        put_static(&original_store, &Path::from("11.parquet"), b"new data").await;

        // The original sees the new file ...
        assert_eq!(list_all(&original_store).await.len(), 11);

        // ... while the fork still has its snapshot.
        assert_eq!(
            list_all(&forked_store).await.len(),
            10,
            "Forked store should not be affected by changes to the original"
        );
    }

    #[tokio::test]
    async fn test_access_count() {
        let store = setup_test_store().await;
        let path = Path::from("3.parquet");

        let count = store.get_access_count(&path).unwrap();
        assert_eq!(count, 0, "Initial access count should be 0, got {count}");

        // First read.
        store.get_opts(&path, GetOptions::default()).await.unwrap();
        let count = store.get_access_count(&path).unwrap();
        assert_eq!(
            count, 1,
            "Access count should be 1 after one get, got {count}"
        );

        // Second read.
        store.get_opts(&path, GetOptions::default()).await.unwrap();
        let count = store.get_access_count(&path).unwrap();
        assert_eq!(
            count, 2,
            "Access count should be 2 after two gets, got {count}"
        );
    }

    #[tokio::test]
    async fn test_store_metrics() {
        let store = setup_test_store().await;
        let initial_size = 10 * 1024 * 10;

        // State produced by the fixture.
        assert_eq!(store.get_file_count(), 10);
        assert_eq!(store.get_store_size(), initial_size);

        // Adding a file grows both metrics accordingly.
        let content: &'static [u8] = b"some new data";
        put_static(&store, &Path::from("new_file.parquet"), content).await;

        assert_eq!(store.get_file_count(), 11);
        assert_eq!(store.get_store_size(), initial_size + content.len());
    }
}