1use async_trait::async_trait;
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use futures::{StreamExt, stream::BoxStream};
7use object_store::PutMultipartOptions;
8use object_store::{
9 Attributes, Error, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload,
10 ObjectMeta, ObjectStore, PutMode, PutOptions, PutPayload, PutResult, Result, path::Path,
11};
12use std::collections::{BTreeMap, BTreeSet};
13use std::ops::Range;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::sync::RwLock;
17use std::sync::atomic::{AtomicUsize, Ordering};
18
19#[derive(Debug, Default)]
54pub struct MockStore {
55 storage: SharedStorage,
56}
57
58#[derive(Debug, thiserror::Error)]
60enum MockStoreError {
61 #[error("No data in memory found. Location: {path}")]
62 NoDataInMemory { path: String },
63
64 #[error("Object already exists at that location: {path}")]
65 AlreadyExists { path: String },
66
67 #[error("Invalid range")]
68 InvalidGetRange,
69}
70
71impl From<MockStoreError> for object_store::Error {
72 fn from(source: MockStoreError) -> Self {
73 match source {
74 MockStoreError::NoDataInMemory { ref path } => Self::NotFound {
75 path: path.into(),
76 source: source.into(),
77 },
78 MockStoreError::AlreadyExists { ref path } => Self::AlreadyExists {
79 path: path.into(),
80 source: source.into(),
81 },
82 _ => Self::Generic {
83 store: "MockStore",
84 source: Box::new(source),
85 },
86 }
87 }
88}
89
90#[derive(Debug, Clone)]
91struct Entry {
92 data: Bytes,
93 last_modified: DateTime<Utc>,
94 attributes: Attributes,
95 e_tag: usize,
96 access_count: Arc<AtomicUsize>,
97 access_ranges: Arc<Mutex<Vec<Range<u64>>>>,
98}
99
100impl Entry {
101 fn new(
102 data: Bytes,
103 last_modified: DateTime<Utc>,
104 e_tag: usize,
105 attributes: Attributes,
106 ) -> Self {
107 Self {
108 data,
109 last_modified,
110 e_tag,
111 attributes,
112 access_count: Arc::new(AtomicUsize::new(0)),
113 access_ranges: Arc::new(Mutex::new(Vec::new())),
114 }
115 }
116}
117
118#[derive(Debug, Default, Clone)]
119struct Storage {
120 next_etag: usize,
121 map: BTreeMap<Path, Entry>,
122}
123
124type SharedStorage = Arc<RwLock<Storage>>;
125
126impl Storage {
127 fn insert(&mut self, location: &Path, bytes: Bytes, attributes: Attributes) -> usize {
128 let etag = self.next_etag;
129 self.next_etag += 1;
130 let entry = Entry::new(bytes, Utc::now(), etag, attributes);
131 self.overwrite(location, entry);
132 etag
133 }
134
135 fn overwrite(&mut self, location: &Path, entry: Entry) {
136 self.map.insert(location.clone(), entry);
137 }
138
139 fn create(&mut self, location: &Path, entry: Entry) -> Result<()> {
140 use std::collections::btree_map;
141 match self.map.entry(location.clone()) {
142 btree_map::Entry::Occupied(_) => Err(MockStoreError::AlreadyExists {
143 path: location.to_string(),
144 }
145 .into()),
146 btree_map::Entry::Vacant(v) => {
147 v.insert(entry);
148 Ok(())
149 }
150 }
151 }
152}
153
154impl std::fmt::Display for MockStore {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 write!(f, "MockStore")
157 }
158}
159
160impl MockStore {
161 pub fn new() -> Self {
163 Self::default()
164 }
165
166 pub fn new_with_files(file_count: usize, file_size: usize) -> Self {
168 let store = Self::new();
169 {
170 let mut storage = store.storage.write().unwrap();
171 let data = vec![0u8; file_size];
172
173 let data: Vec<u8> = data
176 .iter()
177 .enumerate()
178 .map(|(i, _)| (i % 256) as u8)
179 .collect();
180
181 for file_name in 0..file_count {
182 let path = Path::from(format!("{file_name}.parquet"));
183 storage.insert(&path, Bytes::from(data.clone()), Attributes::default());
184 }
185 }
186 store
187 }
188
189 pub fn fork(&self) -> Self {
192 let storage = self.storage.read().unwrap();
193 let storage = Arc::new(RwLock::new(storage.clone()));
194 Self { storage }
195 }
196
197 pub fn get_access_count(&self, location: &Path) -> Option<usize> {
199 self.storage
200 .read()
201 .unwrap()
202 .map
203 .get(location)
204 .map(|entry| entry.access_count.load(Ordering::SeqCst))
205 }
206
207 pub fn get_access_ranges(&self, location: &Path) -> Option<Vec<Range<u64>>> {
209 self.storage
210 .read()
211 .unwrap()
212 .map
213 .get(location)
214 .map(|entry| entry.access_ranges.lock().unwrap().clone())
215 }
216
217 pub fn get_file_count(&self) -> usize {
219 self.storage.read().unwrap().map.len()
220 }
221
222 pub fn get_store_size(&self) -> usize {
224 self.storage
225 .read()
226 .unwrap()
227 .map
228 .values()
229 .map(|entry| entry.data.len())
230 .sum()
231 }
232
233 fn entry(&self, location: &Path) -> Result<Entry> {
234 let storage = self.storage.read().unwrap();
235 let value =
236 storage
237 .map
238 .get(location)
239 .cloned()
240 .ok_or_else(|| MockStoreError::NoDataInMemory {
241 path: location.to_string(),
242 })?;
243
244 Ok(value)
245 }
246}
247
248#[async_trait]
249impl ObjectStore for MockStore {
250 async fn put_opts(
251 &self,
252 location: &Path,
253 payload: PutPayload,
254 opts: PutOptions,
255 ) -> Result<PutResult> {
256 let mut storage = self.storage.write().unwrap();
257 let etag = storage.next_etag;
258 let entry = Entry::new(payload.into(), Utc::now(), etag, opts.attributes);
259
260 match opts.mode {
261 PutMode::Overwrite => storage.overwrite(location, entry),
262 PutMode::Create => storage.create(location, entry)?,
263 PutMode::Update(_) => unreachable!("MockStore does not support update"),
264 }
265 storage.next_etag += 1;
266
267 Ok(PutResult {
268 e_tag: Some(etag.to_string()),
269 version: None,
270 })
271 }
272
273 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
274 let entry = self.entry(location)?;
275
276 entry.access_count.fetch_add(1, Ordering::SeqCst);
278
279 let e_tag = entry.e_tag.to_string();
280
281 let meta = ObjectMeta {
282 location: location.clone(),
283 last_modified: entry.last_modified,
284 size: entry.data.len() as u64,
285 e_tag: Some(e_tag),
286 version: None,
287 };
288 options.check_preconditions(&meta)?;
289
290 let (range, data) = match options.range {
291 Some(range) => {
292 let r = range
293 .as_range(entry.data.len() as u64)
294 .map_err(|_| Error::Generic {
295 store: "MockStore",
296 source: Box::new(MockStoreError::InvalidGetRange),
297 })?;
298 (
299 r.clone(),
300 entry.data.slice(r.start as usize..r.end as usize),
301 )
302 }
303 None => (0..entry.data.len() as u64, entry.data),
304 };
305 entry.access_ranges.lock().unwrap().push(range.clone());
306 let stream = futures::stream::once(futures::future::ready(Ok(data)));
307
308 Ok(GetResult {
309 payload: GetResultPayload::Stream(stream.boxed()),
310 attributes: entry.attributes,
311 meta,
312 range,
313 })
314 }
315
316 async fn head(&self, location: &Path) -> Result<ObjectMeta> {
317 let entry = self.entry(location)?;
318
319 Ok(ObjectMeta {
320 location: location.clone(),
321 last_modified: entry.last_modified,
322 size: entry.data.len() as u64,
323 e_tag: Some(entry.e_tag.to_string()),
324 version: None,
325 })
326 }
327
328 async fn delete(&self, location: &Path) -> Result<()> {
329 self.storage.write().unwrap().map.remove(location);
330 Ok(())
331 }
332
333 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
334 let root = Path::default();
335 let prefix = prefix.unwrap_or(&root);
336
337 let storage = self.storage.read().unwrap();
338 let values: Vec<_> = storage
339 .map
340 .range((prefix)..)
341 .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref()))
342 .filter(|(key, _)| {
343 key.prefix_match(prefix)
345 .map(|mut x| x.next().is_some())
346 .unwrap_or(false)
347 })
348 .map(|(key, value)| {
349 Ok(ObjectMeta {
350 location: key.clone(),
351 last_modified: value.last_modified,
352 size: value.data.len() as u64,
353 e_tag: Some(value.e_tag.to_string()),
354 version: None,
355 })
356 })
357 .collect();
358
359 futures::stream::iter(values).boxed()
360 }
361
362 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
363 let root = Path::default();
364 let prefix = prefix.unwrap_or(&root);
365
366 let mut common_prefixes = BTreeSet::new();
367
368 let mut objects = vec![];
371 for (k, v) in self.storage.read().unwrap().map.range((prefix)..) {
372 if !k.as_ref().starts_with(prefix.as_ref()) {
373 break;
374 }
375
376 let mut parts = match k.prefix_match(prefix) {
377 Some(parts) => parts,
378 None => continue,
379 };
380
381 let common_prefix = match parts.next() {
383 Some(p) => p,
384 None => continue,
386 };
387
388 if parts.next().is_some() {
389 common_prefixes.insert(prefix.child(common_prefix));
390 } else {
391 let object = ObjectMeta {
392 location: k.clone(),
393 last_modified: v.last_modified,
394 size: v.data.len() as u64,
395 e_tag: Some(v.e_tag.to_string()),
396 version: None,
397 };
398 objects.push(object);
399 }
400 }
401
402 Ok(ListResult {
403 objects,
404 common_prefixes: common_prefixes.into_iter().collect(),
405 })
406 }
407
408 async fn put_multipart_opts(
409 &self,
410 _location: &Path,
411 _opts: PutMultipartOptions,
412 ) -> Result<Box<dyn MultipartUpload>> {
413 unreachable!("MockStore does not support multipart upload")
414 }
415
416 async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
417 unreachable!("MockStore does not support copy")
418 }
419
420 async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
421 unreachable!("MockStore does not support copy_if_not_exists")
422 }
423}
424
#[cfg(test)]
mod tests {
    use super::*;
    use futures::TryStreamExt;

    /// Number of objects that `setup_test_store` pre-populates.
    const FILE_COUNT: usize = 10;
    /// Size in bytes of every pre-populated object.
    const FILE_SIZE: usize = 1024 * 10;

    /// Collects every object of `store` via `list(None)`.
    async fn list_all(store: &MockStore) -> Vec<ObjectMeta> {
        store.list(None).try_collect().await.unwrap()
    }

    /// Writes `data` to `path` using default put options.
    async fn put_static(store: &MockStore, path: &Path, data: &'static [u8]) {
        let payload = PutPayload::from(Bytes::from_static(data));
        store
            .put_opts(path, payload, PutOptions::default())
            .await
            .unwrap();
    }

    /// Issues a `get_opts` request limited to `range`.
    async fn get_range(store: &MockStore, path: &Path, range: Range<u64>) -> Result<GetResult> {
        let options = GetOptions {
            range: Some(range.into()),
            ..GetOptions::default()
        };
        store.get_opts(path, options).await
    }

    /// Builds the standard fixture: ten 10 KiB objects named `0.parquet`
    /// through `9.parquet`.
    async fn setup_test_store() -> MockStore {
        let store = MockStore::new_with_files(FILE_COUNT, FILE_SIZE);
        let paths = list_all(&store).await;
        assert_eq!(paths.len(), 10, "Initial store should have 10 files");
        store
    }

    #[tokio::test]
    async fn test_new_with_files() {
        let store = setup_test_store().await;
        let paths = list_all(&store).await;
        assert_eq!(paths.len(), 10);

        // The listing is ordered by path, so position `i` must be `{i}.parquet`.
        for (i, meta) in paths.iter().enumerate() {
            let path = Path::from(format!("{i}.parquet"));
            let loaded_meta = store.head(&path).await.unwrap();
            assert_eq!(
                loaded_meta.size,
                FILE_SIZE as u64,
                "Expected size of 10KB, got {}",
                loaded_meta.size
            );
            assert_eq!(
                meta.location, path,
                "Expected location to be {path}, got {}",
                meta.location
            );
        }
    }

    #[tokio::test]
    async fn test_get_opts() {
        let store = setup_test_store().await;
        let path = Path::from("1.parquet");

        // (requested range, expected byte count): the whole object, an
        // interior slice, and a range that overruns the object and is cut off
        // at its end.
        let cases: [(Range<u64>, usize); 3] = [
            (0..(1024 * 10), 1024 * 10),
            (1024..4096, 3072),
            (8192..12288, 2048),
        ];
        for (range, expected_len) in cases {
            let result = get_range(&store, &path, range).await.unwrap();
            let bytes = result.bytes().await.unwrap();
            assert_eq!(bytes.len(), expected_len);
        }

        // A range that starts beyond the end of the object is rejected.
        let err = get_range(&store, &path, 20480..30720).await.unwrap_err();
        assert!(
            matches!(err, Error::Generic { .. }),
            "Expected an error for out-of-bounds request, got {err:?}"
        );
    }

    #[tokio::test]
    async fn test_insert_and_list() {
        let store = setup_test_store().await;

        let new_path = Path::from("11.parquet");
        put_static(&store, &new_path, b"test data").await;

        let paths = list_all(&store).await;
        assert_eq!(
            paths.len(),
            11,
            "Store should have 11 files after insertion"
        );

        let meta = store.head(&new_path).await.unwrap();
        assert_eq!(meta.size, 9);
        assert_eq!(meta.location, new_path);
    }

    #[tokio::test]
    async fn test_delete() {
        let store = setup_test_store().await;
        let path_to_delete = Path::from("5.parquet");

        store.delete(&path_to_delete).await.unwrap();

        let err = store.head(&path_to_delete).await.unwrap_err();
        assert!(
            matches!(err, Error::NotFound { .. }),
            "Expected NotFound error after delete, but got {err:?}"
        );

        let paths = list_all(&store).await;
        assert_eq!(paths.len(), 9, "Store should have 9 files after deletion");
    }

    #[tokio::test]
    async fn test_list_uses_directories_correctly() {
        let store = setup_test_store().await;
        let folder_path = Path::from("folder/");
        let file_path = Path::from("folder/file.parquet");
        put_static(&store, &file_path, b"test").await;

        // At the root, the nested object shows up only as a common prefix.
        let root_listing = store.list_with_delimiter(None).await.unwrap();
        assert_eq!(
            root_listing.objects.len(),
            10,
            "Root should have 10 objects"
        );
        assert_eq!(
            root_listing.common_prefixes.len(),
            1,
            "Root should have 1 common prefix (folder)"
        );
        assert_eq!(
            root_listing.common_prefixes[0], folder_path,
            "Common prefix should be 'folder/'"
        );

        // Inside the folder, the object itself is listed.
        let folder_listing = store.list_with_delimiter(Some(&folder_path)).await.unwrap();
        assert_eq!(
            folder_listing.objects.len(),
            1,
            "Folder should contain 1 object"
        );
        assert_eq!(
            folder_listing.common_prefixes.len(),
            0,
            "Folder should have no common prefixes"
        );
        assert_eq!(folder_listing.objects[0].location, file_path);
    }

    #[tokio::test]
    async fn test_fork() {
        let original_store = setup_test_store().await;
        let forked_store = original_store.fork();

        // Write to the original only, after the fork was taken.
        put_static(&original_store, &Path::from("11.parquet"), b"new data").await;

        let original_paths = list_all(&original_store).await;
        assert_eq!(original_paths.len(), 11);

        let forked_paths = list_all(&forked_store).await;
        assert_eq!(
            forked_paths.len(),
            10,
            "Forked store should not be affected by changes to the original"
        );
    }

    #[tokio::test]
    async fn test_access_count() {
        let store = setup_test_store().await;
        let path = Path::from("3.parquet");

        let count = store.get_access_count(&path).unwrap();
        assert_eq!(count, 0, "Initial access count should be 0, got {count}");

        let _ = store.get_opts(&path, GetOptions::default()).await.unwrap();
        let count = store.get_access_count(&path).unwrap();
        assert_eq!(
            count, 1,
            "Access count should be 1 after one get, got {count}"
        );

        let _ = store.get_opts(&path, GetOptions::default()).await.unwrap();
        let count = store.get_access_count(&path).unwrap();
        assert_eq!(
            count, 2,
            "Access count should be 2 after two gets, got {count}"
        );
    }

    #[tokio::test]
    async fn test_store_metrics() {
        let store = setup_test_store().await;

        assert_eq!(store.get_file_count(), 10);
        assert_eq!(store.get_store_size(), 10 * 1024 * 10);

        // Adding an object grows both metrics.
        let new_path = Path::from("new_file.parquet");
        let new_data: &'static [u8] = b"some new data";
        let new_data_len = new_data.len();
        put_static(&store, &new_path, new_data).await;

        assert_eq!(store.get_file_count(), 11);
        assert_eq!(store.get_store_size(), 10 * 1024 * 10 + new_data_len);

        // Removing one of the original objects shrinks them again.
        let path_to_delete = Path::from("5.parquet");
        store.delete(&path_to_delete).await.unwrap();

        assert_eq!(store.get_file_count(), 10);
        assert_eq!(
            store.get_store_size(),
            9 * 1024 * 10 + new_data_len,
            "Store size should be reduced by the size of the deleted file"
        );
    }
}