liquid_cache_common/
mock_store.rs

1//! Mock object store for testing purposes.
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use futures::{StreamExt, stream::BoxStream};
7use object_store::PutMultipartOptions;
8use object_store::{
9    Attributes, Error, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload,
10    ObjectMeta, ObjectStore, PutMode, PutOptions, PutPayload, PutResult, Result, path::Path,
11};
12use std::collections::{BTreeMap, BTreeSet};
13use std::ops::Range;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::sync::RwLock;
17use std::sync::atomic::{AtomicUsize, Ordering};
18
/// In-memory object store for tests.
///
/// `MockStore` implements [`ObjectStore`] entirely in memory, so code that talks to an
/// object store can be exercised without a real backend such as S3. It is not meant to be
/// used in production.
///
/// Besides storing objects, the store records every `get_opts` call per object: how many
/// calls were made and which byte range each call resolved to. These statistics can be
/// inspected with [`MockStore::get_access_count`] and [`MockStore::get_access_ranges`].
///
/// # Usage
///
/// The following example creates a pre-populated store, lists its objects, reads a byte
/// range of one object, and writes a new object.
///
/// ```rust
/// # use liquid_cache_common::mock_store::MockStore;
/// # use object_store::{path::Path, GetOptions, PutPayload, ObjectMeta, ObjectStore};
/// # use futures::TryStreamExt;
/// # use bytes::Bytes;
/// # async fn test() -> Result<(), object_store::Error> {
/// let store = MockStore::new_with_files(10, 1024 * 10); // 10 files of 10KB each
/// let paths: Vec<ObjectMeta> = store.list(None).try_collect().await?;
///
/// let options = GetOptions {
///     range: Some((0..(1024 * 10)).into()),
///     ..Default::default()
/// };
/// let path = Path::from("1.parquet");
/// let result = store.get_opts(&path, options).await?;
/// let bytes = result.bytes().await?;
///
/// let path = Path::from("11.parquet");
/// let payload = PutPayload::from(Bytes::from_static(b"test data"));
/// store.put(&path, payload).await?;
/// # Ok(())
/// # }
/// ```
///
#[derive(Debug, Default)]
pub struct MockStore {
    // Lock-protected map from object path to stored object (plus the etag counter).
    storage: SharedStorage,
}
57
/// Errors raised internally by [`MockStore`].
///
/// Callers never see this type directly: it is converted into `object_store::Error` by its
/// `From` implementation, so the public `ObjectStore` methods return the standard error type.
#[derive(Debug, thiserror::Error)]
enum MockStoreError {
    /// No object is stored at `path`; surfaces as `object_store::Error::NotFound`.
    #[error("No data in memory found. Location: {path}")]
    NoDataInMemory { path: String },

    /// A write that requires the location to be empty found an existing object at `path`;
    /// surfaces as `object_store::Error::AlreadyExists`.
    #[error("Object already exists at that location: {path}")]
    AlreadyExists { path: String },

    /// The byte range requested from `get_opts` could not be resolved against the object's
    /// length; surfaces as `object_store::Error::Generic`.
    #[error("Invalid range")]
    InvalidGetRange,
}
70
71impl From<MockStoreError> for object_store::Error {
72    fn from(source: MockStoreError) -> Self {
73        match source {
74            MockStoreError::NoDataInMemory { ref path } => Self::NotFound {
75                path: path.into(),
76                source: source.into(),
77            },
78            MockStoreError::AlreadyExists { ref path } => Self::AlreadyExists {
79                path: path.into(),
80                source: source.into(),
81            },
82            _ => Self::Generic {
83                store: "MockStore",
84                source: Box::new(source),
85            },
86        }
87    }
88}
89
/// One stored object together with the access statistics recorded for it.
///
/// `Clone` is shallow for the statistics: `access_count` and `access_ranges` are `Arc`s, so
/// a clone shares them with the entry it was cloned from. `MockStore::get_opts` relies on
/// this — it updates the statistics through a clone and the stored entry observes the change.
#[derive(Debug, Clone)]
struct Entry {
    // Object contents.
    data: Bytes,
    // Time the object was last written.
    last_modified: DateTime<Utc>,
    // Attributes supplied with the write and returned from reads.
    attributes: Attributes,
    // Numeric etag; exposed to callers as its decimal string.
    e_tag: usize,
    // Number of `get_opts` calls made for this object.
    access_count: Arc<AtomicUsize>,
    // Byte range served by each `get_opts` call, in call order.
    access_ranges: Arc<Mutex<Vec<Range<u64>>>>,
}
99
100impl Entry {
101    fn new(
102        data: Bytes,
103        last_modified: DateTime<Utc>,
104        e_tag: usize,
105        attributes: Attributes,
106    ) -> Self {
107        Self {
108            data,
109            last_modified,
110            e_tag,
111            attributes,
112            access_count: Arc::new(AtomicUsize::new(0)),
113            access_ranges: Arc::new(Mutex::new(Vec::new())),
114        }
115    }
116}
117
/// The mutable state behind a [`MockStore`].
#[derive(Debug, Default, Clone)]
struct Storage {
    // Etag handed to the next successful write; incremented after each one.
    next_etag: usize,
    // Stored objects keyed by path. A `BTreeMap` keeps keys ordered, which the listing
    // methods rely on to scan everything under a prefix with `range(prefix..)`.
    map: BTreeMap<Path, Entry>,
}
123
/// Store state behind a reader-writer lock; cloning the `Arc` shares the same state.
type SharedStorage = Arc<RwLock<Storage>>;
125
126impl Storage {
127    fn insert(&mut self, location: &Path, bytes: Bytes, attributes: Attributes) -> usize {
128        let etag = self.next_etag;
129        self.next_etag += 1;
130        let entry = Entry::new(bytes, Utc::now(), etag, attributes);
131        self.overwrite(location, entry);
132        etag
133    }
134
135    fn overwrite(&mut self, location: &Path, entry: Entry) {
136        self.map.insert(location.clone(), entry);
137    }
138
139    fn create(&mut self, location: &Path, entry: Entry) -> Result<()> {
140        use std::collections::btree_map;
141        match self.map.entry(location.clone()) {
142            btree_map::Entry::Occupied(_) => Err(MockStoreError::AlreadyExists {
143                path: location.to_string(),
144            }
145            .into()),
146            btree_map::Entry::Vacant(v) => {
147                v.insert(entry);
148                Ok(())
149            }
150        }
151    }
152}
153
154impl std::fmt::Display for MockStore {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        write!(f, "MockStore")
157    }
158}
159
160impl MockStore {
161    /// Create new in-memory storage.
162    pub fn new() -> Self {
163        Self::default()
164    }
165
166    /// Initialize the object store with preset values
167    pub fn new_with_files(file_count: usize, file_size: usize) -> Self {
168        let store = Self::new();
169        {
170            let mut storage = store.storage.write().unwrap();
171            let data = vec![0u8; file_size];
172
173            // Fill the data with a pattern: index % 256
174            // This makes it easy to verify ranges
175            let data: Vec<u8> = data
176                .iter()
177                .enumerate()
178                .map(|(i, _)| (i % 256) as u8)
179                .collect();
180
181            for file_name in 0..file_count {
182                let path = Path::from(format!("{file_name}.parquet"));
183                storage.insert(&path, Bytes::from(data.clone()), Attributes::default());
184            }
185        }
186        store
187    }
188
189    /// Creates a fork of the store, with the current content copied into the
190    /// new store.
191    pub fn fork(&self) -> Self {
192        let storage = self.storage.read().unwrap();
193        let storage = Arc::new(RwLock::new(storage.clone()));
194        Self { storage }
195    }
196
197    /// Get the access count(no. of calls to `get_opts`) for a specific file
198    pub fn get_access_count(&self, location: &Path) -> Option<usize> {
199        self.storage
200            .read()
201            .unwrap()
202            .map
203            .get(location)
204            .map(|entry| entry.access_count.load(Ordering::SeqCst))
205    }
206
207    /// Get a list of ranges that have been requested via `get_opts`
208    pub fn get_access_ranges(&self, location: &Path) -> Option<Vec<Range<u64>>> {
209        self.storage
210            .read()
211            .unwrap()
212            .map
213            .get(location)
214            .map(|entry| entry.access_ranges.lock().unwrap().clone())
215    }
216
217    /// Get the number of objects stored in the store
218    pub fn get_file_count(&self) -> usize {
219        self.storage.read().unwrap().map.len()
220    }
221
222    /// Get the total size of all objects in the store
223    pub fn get_store_size(&self) -> usize {
224        self.storage
225            .read()
226            .unwrap()
227            .map
228            .values()
229            .map(|entry| entry.data.len())
230            .sum()
231    }
232
233    fn entry(&self, location: &Path) -> Result<Entry> {
234        let storage = self.storage.read().unwrap();
235        let value =
236            storage
237                .map
238                .get(location)
239                .cloned()
240                .ok_or_else(|| MockStoreError::NoDataInMemory {
241                    path: location.to_string(),
242                })?;
243
244        Ok(value)
245    }
246}
247
248#[async_trait]
249impl ObjectStore for MockStore {
250    async fn put_opts(
251        &self,
252        location: &Path,
253        payload: PutPayload,
254        opts: PutOptions,
255    ) -> Result<PutResult> {
256        let mut storage = self.storage.write().unwrap();
257        let etag = storage.next_etag;
258        let entry = Entry::new(payload.into(), Utc::now(), etag, opts.attributes);
259
260        match opts.mode {
261            PutMode::Overwrite => storage.overwrite(location, entry),
262            PutMode::Create => storage.create(location, entry)?,
263            PutMode::Update(_) => unreachable!("MockStore does not support update"),
264        }
265        storage.next_etag += 1;
266
267        Ok(PutResult {
268            e_tag: Some(etag.to_string()),
269            version: None,
270        })
271    }
272
273    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
274        let entry = self.entry(location)?;
275
276        // Atomically increment the count. This is a fast, lock-free operation.
277        entry.access_count.fetch_add(1, Ordering::SeqCst);
278
279        let e_tag = entry.e_tag.to_string();
280
281        let meta = ObjectMeta {
282            location: location.clone(),
283            last_modified: entry.last_modified,
284            size: entry.data.len() as u64,
285            e_tag: Some(e_tag),
286            version: None,
287        };
288        options.check_preconditions(&meta)?;
289
290        let (range, data) = match options.range {
291            Some(range) => {
292                let r = range
293                    .as_range(entry.data.len() as u64)
294                    .map_err(|_| Error::Generic {
295                        store: "MockStore",
296                        source: Box::new(MockStoreError::InvalidGetRange),
297                    })?;
298                (
299                    r.clone(),
300                    entry.data.slice(r.start as usize..r.end as usize),
301                )
302            }
303            None => (0..entry.data.len() as u64, entry.data),
304        };
305        entry.access_ranges.lock().unwrap().push(range.clone());
306        let stream = futures::stream::once(futures::future::ready(Ok(data)));
307
308        Ok(GetResult {
309            payload: GetResultPayload::Stream(stream.boxed()),
310            attributes: entry.attributes,
311            meta,
312            range,
313        })
314    }
315
316    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
317        let entry = self.entry(location)?;
318
319        Ok(ObjectMeta {
320            location: location.clone(),
321            last_modified: entry.last_modified,
322            size: entry.data.len() as u64,
323            e_tag: Some(entry.e_tag.to_string()),
324            version: None,
325        })
326    }
327
328    async fn delete(&self, location: &Path) -> Result<()> {
329        self.storage.write().unwrap().map.remove(location);
330        Ok(())
331    }
332
333    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
334        let root = Path::default();
335        let prefix = prefix.unwrap_or(&root);
336
337        let storage = self.storage.read().unwrap();
338        let values: Vec<_> = storage
339            .map
340            .range((prefix)..)
341            .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref()))
342            .filter(|(key, _)| {
343                // Don't return for exact prefix match
344                key.prefix_match(prefix)
345                    .map(|mut x| x.next().is_some())
346                    .unwrap_or(false)
347            })
348            .map(|(key, value)| {
349                Ok(ObjectMeta {
350                    location: key.clone(),
351                    last_modified: value.last_modified,
352                    size: value.data.len() as u64,
353                    e_tag: Some(value.e_tag.to_string()),
354                    version: None,
355                })
356            })
357            .collect();
358
359        futures::stream::iter(values).boxed()
360    }
361
362    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
363        let root = Path::default();
364        let prefix = prefix.unwrap_or(&root);
365
366        let mut common_prefixes = BTreeSet::new();
367
368        // Only objects in this base level should be returned in the
369        // response. Otherwise, we just collect the common prefixes.
370        let mut objects = vec![];
371        for (k, v) in self.storage.read().unwrap().map.range((prefix)..) {
372            if !k.as_ref().starts_with(prefix.as_ref()) {
373                break;
374            }
375
376            let mut parts = match k.prefix_match(prefix) {
377                Some(parts) => parts,
378                None => continue,
379            };
380
381            // Pop first element
382            let common_prefix = match parts.next() {
383                Some(p) => p,
384                // Should only return children of the prefix
385                None => continue,
386            };
387
388            if parts.next().is_some() {
389                common_prefixes.insert(prefix.child(common_prefix));
390            } else {
391                let object = ObjectMeta {
392                    location: k.clone(),
393                    last_modified: v.last_modified,
394                    size: v.data.len() as u64,
395                    e_tag: Some(v.e_tag.to_string()),
396                    version: None,
397                };
398                objects.push(object);
399            }
400        }
401
402        Ok(ListResult {
403            objects,
404            common_prefixes: common_prefixes.into_iter().collect(),
405        })
406    }
407
408    async fn put_multipart_opts(
409        &self,
410        _location: &Path,
411        _opts: PutMultipartOptions,
412    ) -> Result<Box<dyn MultipartUpload>> {
413        unreachable!("MockStore does not support multipart upload")
414    }
415
416    async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
417        unreachable!("MockStore does not support copy")
418    }
419
420    async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
421        unreachable!("MockStore does not support copy_if_not_exists")
422    }
423}
424
#[cfg(test)]
mod tests {
    use super::*;
    use futures::TryStreamExt;

    /// Builds the standard fixture: 10 objects (`0.parquet` … `9.parquet`) of 10KB each.
    async fn setup_test_store() -> MockStore {
        let store = MockStore::new_with_files(10, 1024 * 10); // 10 files of 10KB
        let paths: Vec<ObjectMeta> = store.list(None).try_collect().await.unwrap();
        assert_eq!(paths.len(), 10, "Initial store should have 10 files");
        store
    }

    #[tokio::test]
    async fn test_new_with_files() {
        let store = setup_test_store().await;
        let paths: Vec<ObjectMeta> = store.list(None).try_collect().await.unwrap();
        assert_eq!(paths.len(), 10);

        // Test if the files are present and have the correct size
        for (i, meta) in paths.iter().enumerate() {
            let path = Path::from(format!("{i}.parquet"));
            let loaded_meta = store.head(&path).await.unwrap();
            assert_eq!(
                loaded_meta.size,
                1024 * 10,
                "Expected size of 10KB, got {}",
                loaded_meta.size
            );
            assert_eq!(
                meta.location, path,
                "Expected location to be {path}, got {}",
                meta.location
            );
        }
    }

    #[tokio::test]
    async fn test_file_content_pattern() {
        let store = setup_test_store().await;
        let path = Path::from("0.parquet");

        // Byte `i` of every fixture file is `i % 256`, so this range wraps from 255 to 0.
        let options = GetOptions {
            range: Some((250..260).into()),
            ..GetOptions::default()
        };
        let result = store.get_opts(&path, options).await.unwrap();
        let bytes = result.bytes().await.unwrap();
        assert_eq!(&bytes[..], &[250u8, 251, 252, 253, 254, 255, 0, 1, 2, 3][..]);
    }

    #[tokio::test]
    async fn test_get_opts() {
        let store = setup_test_store().await;
        let path = Path::from("1.parquet");

        // Range covering the whole object
        let options = GetOptions {
            range: Some((0..(1024 * 10)).into()),
            ..GetOptions::default()
        };
        let result = store.get_opts(&path, options).await.unwrap();
        let bytes = result.bytes().await.unwrap();
        assert_eq!(bytes.len(), 1024 * 10);

        // Range strictly inside the object
        let options = GetOptions {
            range: Some((1024..4096).into()),
            ..GetOptions::default()
        };
        let result = store.get_opts(&path, options).await.unwrap();
        let bytes = result.bytes().await.unwrap();
        assert_eq!(bytes.len(), 3072);

        // Range that starts inside the object but ends past its end
        let options = GetOptions {
            range: Some((8192..12288).into()),
            ..GetOptions::default()
        };
        let result = store.get_opts(&path, options).await.unwrap();
        let bytes = result.bytes().await.unwrap();
        // The store should return only the valid part of the range.
        assert_eq!(bytes.len(), 2048);

        // Range that starts past the end of the object
        let options = GetOptions {
            range: Some((20480..30720).into()),
            ..GetOptions::default()
        };
        let err = store.get_opts(&path, options).await.unwrap_err();
        assert!(
            matches!(err, Error::Generic { .. }),
            "Expected an error for out-of-bounds request, got {err:?}"
        );
    }

    #[tokio::test]
    async fn test_insert_and_list() {
        let store = setup_test_store().await;

        // Test for a new file insertion
        let new_path = Path::from("11.parquet");
        let payload = PutPayload::from(Bytes::from_static(b"test data"));
        store
            .put_opts(&new_path, payload, PutOptions::default())
            .await
            .unwrap();

        let paths: Vec<ObjectMeta> = store.list(None).try_collect().await.unwrap();
        assert_eq!(
            paths.len(),
            11,
            "Store should have 11 files after insertion"
        );

        let meta = store.head(&new_path).await.unwrap();
        assert_eq!(meta.size, 9);
        assert_eq!(meta.location, new_path);
    }

    #[tokio::test]
    async fn test_put_create_rejects_existing_object() {
        let store = setup_test_store().await;

        // Creating an object at a free location succeeds.
        let fresh = Path::from("fresh.parquet");
        let options = PutOptions {
            mode: PutMode::Create,
            ..PutOptions::default()
        };
        store
            .put_opts(&fresh, PutPayload::from(Bytes::from_static(b"abc")), options)
            .await
            .unwrap();
        assert_eq!(store.head(&fresh).await.unwrap().size, 3);

        // Creating an object where one already exists fails and leaves it untouched.
        let existing = Path::from("1.parquet");
        let options = PutOptions {
            mode: PutMode::Create,
            ..PutOptions::default()
        };
        let err = store
            .put_opts(&existing, PutPayload::from(Bytes::from_static(b"new")), options)
            .await
            .unwrap_err();
        assert!(
            matches!(err, Error::AlreadyExists { .. }),
            "Expected AlreadyExists error, got {err:?}"
        );
        assert_eq!(store.head(&existing).await.unwrap().size, 1024 * 10);
    }

    #[tokio::test]
    async fn test_delete() {
        let store = setup_test_store().await;
        let path_to_delete = Path::from("5.parquet");

        store.delete(&path_to_delete).await.unwrap();

        // Verify the file is deleted
        let err = store.head(&path_to_delete).await.unwrap_err();
        assert!(
            matches!(err, Error::NotFound { .. }),
            "Expected NotFound error after delete, but got {err:?}"
        );

        let paths: Vec<ObjectMeta> = store.list(None).try_collect().await.unwrap();
        assert_eq!(paths.len(), 9, "Store should have 9 files after deletion");
    }

    #[tokio::test]
    async fn test_list_uses_directories_correctly() {
        let store = setup_test_store().await;
        let folder_path = Path::from("folder/");
        let file_path = Path::from("folder/file.parquet");
        store
            .put_opts(
                &file_path,
                PutPayload::from(Bytes::from_static(b"test")),
                PutOptions::default(),
            )
            .await
            .unwrap();

        // Check listing the root
        let list_result = store.list_with_delimiter(None).await.unwrap();
        assert_eq!(list_result.objects.len(), 10, "Root should have 10 objects");
        assert_eq!(
            list_result.common_prefixes.len(),
            1,
            "Root should have 1 common prefix (folder)"
        );
        assert_eq!(
            list_result.common_prefixes[0], folder_path,
            "Common prefix should be 'folder/'"
        );

        // Check listing the sub-directory
        let list_result = store.list_with_delimiter(Some(&folder_path)).await.unwrap();
        assert_eq!(
            list_result.objects.len(),
            1,
            "Folder should contain 1 object"
        );
        assert_eq!(
            list_result.common_prefixes.len(),
            0,
            "Folder should have no common prefixes"
        );
        assert_eq!(list_result.objects[0].location, file_path);
    }

    #[tokio::test]
    async fn test_fork() {
        let original_store = setup_test_store().await;
        let forked_store = original_store.fork();

        // Modify the original store
        original_store
            .put_opts(
                &Path::from("11.parquet"),
                PutPayload::from(Bytes::from_static(b"new data")),
                PutOptions::default(),
            )
            .await
            .unwrap();

        // Verify the original store has changed
        let original_paths: Vec<ObjectMeta> =
            original_store.list(None).try_collect().await.unwrap();
        assert_eq!(original_paths.len(), 11);

        // Verify the forked store has NOT changed
        let forked_paths: Vec<ObjectMeta> = forked_store.list(None).try_collect().await.unwrap();
        assert_eq!(
            forked_paths.len(),
            10,
            "Forked store should not be affected by changes to the original"
        );
    }

    #[tokio::test]
    async fn test_access_count() {
        let store = setup_test_store().await;
        let path = Path::from("3.parquet");

        let count = store.get_access_count(&path).unwrap();
        assert_eq!(count, 0, "Initial access count should be 0, got {count}");

        // First get
        let _ = store.get_opts(&path, GetOptions::default()).await.unwrap();
        let count = store.get_access_count(&path).unwrap();
        assert_eq!(
            count, 1,
            "Access count should be 1 after one get, got {count}"
        );

        // Second get
        let _ = store.get_opts(&path, GetOptions::default()).await.unwrap();
        let count = store.get_access_count(&path).unwrap();
        assert_eq!(
            count, 2,
            "Access count should be 2 after two gets, got {count}"
        );
    }

    #[tokio::test]
    async fn test_access_ranges() {
        let store = setup_test_store().await;
        let path = Path::from("2.parquet");

        assert!(store.get_access_ranges(&path).unwrap().is_empty());
        assert!(
            store
                .get_access_ranges(&Path::from("missing.parquet"))
                .is_none()
        );

        let options = GetOptions {
            range: Some((0..100).into()),
            ..GetOptions::default()
        };
        let _ = store.get_opts(&path, options).await.unwrap();
        // A request without a range is recorded as covering the whole object.
        let _ = store.get_opts(&path, GetOptions::default()).await.unwrap();

        assert_eq!(
            store.get_access_ranges(&path).unwrap(),
            vec![0u64..100, 0..1024 * 10]
        );
    }

    #[tokio::test]
    async fn test_store_metrics() {
        let store = setup_test_store().await;

        // Initial state from setup_test_store
        assert_eq!(store.get_file_count(), 10);
        assert_eq!(store.get_store_size(), 10 * 1024 * 10);

        // Add a new file
        let new_path = Path::from("new_file.parquet");
        let new_data = Bytes::from_static(b"some new data");
        let new_data_len = new_data.len();
        store
            .put_opts(&new_path, PutPayload::from(new_data), PutOptions::default())
            .await
            .unwrap();

        assert_eq!(store.get_file_count(), 11);
        assert_eq!(store.get_store_size(), 10 * 1024 * 10 + new_data_len);

        // Delete one of the original files
        let path_to_delete = Path::from("5.parquet");
        store.delete(&path_to_delete).await.unwrap();

        assert_eq!(store.get_file_count(), 10);
        assert_eq!(
            store.get_store_size(),
            9 * 1024 * 10 + new_data_len,
            "Store size should be reduced by the size of the deleted file"
        );
    }
}