1use async_trait::async_trait;
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use futures::{StreamExt, stream::BoxStream};
7use object_store::PutMultipartOptions;
8use object_store::{
9 Attributes, CopyOptions, Error, GetOptions, GetResult, GetResultPayload, ListResult,
10 MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutOptions, PutPayload, PutResult, Result,
11 path::Path,
12};
13use std::collections::{BTreeMap, BTreeSet};
14use std::ops::Range;
15use std::sync::Arc;
16use std::sync::Mutex;
17use std::sync::RwLock;
18use std::sync::atomic::{AtomicUsize, Ordering};
19
20#[derive(Debug, Default)]
56pub struct MockStore {
57 storage: SharedStorage,
58}
59
60#[derive(Debug, thiserror::Error)]
62enum MockStoreError {
63 #[error("No data in memory found. Location: {path}")]
64 NoDataInMemory { path: String },
65
66 #[error("Object already exists at that location: {path}")]
67 AlreadyExists { path: String },
68
69 #[error("Invalid range")]
70 InvalidGetRange,
71}
72
73impl From<MockStoreError> for object_store::Error {
74 fn from(source: MockStoreError) -> Self {
75 match source {
76 MockStoreError::NoDataInMemory { ref path } => Self::NotFound {
77 path: path.into(),
78 source: source.into(),
79 },
80 MockStoreError::AlreadyExists { ref path } => Self::AlreadyExists {
81 path: path.into(),
82 source: source.into(),
83 },
84 _ => Self::Generic {
85 store: "MockStore",
86 source: Box::new(source),
87 },
88 }
89 }
90}
91
92#[derive(Debug, Clone)]
93struct Entry {
94 data: Bytes,
95 last_modified: DateTime<Utc>,
96 attributes: Attributes,
97 e_tag: usize,
98 access_count: Arc<AtomicUsize>,
99 access_ranges: Arc<Mutex<Vec<Range<u64>>>>,
100}
101
102impl Entry {
103 fn new(
104 data: Bytes,
105 last_modified: DateTime<Utc>,
106 e_tag: usize,
107 attributes: Attributes,
108 ) -> Self {
109 Self {
110 data,
111 last_modified,
112 e_tag,
113 attributes,
114 access_count: Arc::new(AtomicUsize::new(0)),
115 access_ranges: Arc::new(Mutex::new(Vec::new())),
116 }
117 }
118}
119
120#[derive(Debug, Default, Clone)]
121struct Storage {
122 next_etag: usize,
123 map: BTreeMap<Path, Entry>,
124}
125
126type SharedStorage = Arc<RwLock<Storage>>;
127
128impl Storage {
129 fn insert(&mut self, location: &Path, bytes: Bytes, attributes: Attributes) -> usize {
130 let etag = self.next_etag;
131 self.next_etag += 1;
132 let entry = Entry::new(bytes, Utc::now(), etag, attributes);
133 self.overwrite(location, entry);
134 etag
135 }
136
137 fn overwrite(&mut self, location: &Path, entry: Entry) {
138 self.map.insert(location.clone(), entry);
139 }
140
141 fn create(&mut self, location: &Path, entry: Entry) -> Result<()> {
142 use std::collections::btree_map;
143 match self.map.entry(location.clone()) {
144 btree_map::Entry::Occupied(_) => Err(MockStoreError::AlreadyExists {
145 path: location.to_string(),
146 }
147 .into()),
148 btree_map::Entry::Vacant(v) => {
149 v.insert(entry);
150 Ok(())
151 }
152 }
153 }
154}
155
156impl std::fmt::Display for MockStore {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 write!(f, "MockStore")
159 }
160}
161
162impl MockStore {
163 pub fn new() -> Self {
165 Self::default()
166 }
167
168 pub fn new_with_files(file_count: usize, file_size: usize) -> Self {
170 let store = Self::new();
171 {
172 let mut storage = store.storage.write().unwrap();
173 let data = vec![0u8; file_size];
174
175 let data: Vec<u8> = data
178 .iter()
179 .enumerate()
180 .map(|(i, _)| (i % 256) as u8)
181 .collect();
182
183 for file_name in 0..file_count {
184 let path = Path::from(format!("{file_name}.parquet"));
185 storage.insert(&path, Bytes::from(data.clone()), Attributes::default());
186 }
187 }
188 store
189 }
190
191 pub fn fork(&self) -> Self {
194 let storage = self.storage.read().unwrap();
195 let storage = Arc::new(RwLock::new(storage.clone()));
196 Self { storage }
197 }
198
199 pub fn get_access_count(&self, location: &Path) -> Option<usize> {
201 self.storage
202 .read()
203 .unwrap()
204 .map
205 .get(location)
206 .map(|entry| entry.access_count.load(Ordering::SeqCst))
207 }
208
209 pub fn get_access_ranges(&self, location: &Path) -> Option<Vec<Range<u64>>> {
211 self.storage
212 .read()
213 .unwrap()
214 .map
215 .get(location)
216 .map(|entry| entry.access_ranges.lock().unwrap().clone())
217 }
218
219 pub fn get_file_count(&self) -> usize {
221 self.storage.read().unwrap().map.len()
222 }
223
224 pub fn get_store_size(&self) -> usize {
226 self.storage
227 .read()
228 .unwrap()
229 .map
230 .values()
231 .map(|entry| entry.data.len())
232 .sum()
233 }
234
235 fn entry(&self, location: &Path) -> Result<Entry> {
236 let storage = self.storage.read().unwrap();
237 let value =
238 storage
239 .map
240 .get(location)
241 .cloned()
242 .ok_or_else(|| MockStoreError::NoDataInMemory {
243 path: location.to_string(),
244 })?;
245
246 Ok(value)
247 }
248}
249
250#[async_trait]
251impl ObjectStore for MockStore {
252 async fn put_opts(
253 &self,
254 location: &Path,
255 payload: PutPayload,
256 opts: PutOptions,
257 ) -> Result<PutResult> {
258 let mut storage = self.storage.write().unwrap();
259 let etag = storage.next_etag;
260 let entry = Entry::new(payload.into(), Utc::now(), etag, opts.attributes);
261
262 match opts.mode {
263 PutMode::Overwrite => storage.overwrite(location, entry),
264 PutMode::Create => storage.create(location, entry)?,
265 PutMode::Update(_) => unreachable!("MockStore does not support update"),
266 }
267 storage.next_etag += 1;
268
269 Ok(PutResult {
270 e_tag: Some(etag.to_string()),
271 version: None,
272 })
273 }
274
275 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
276 let entry = self.entry(location)?;
277
278 let e_tag = entry.e_tag.to_string();
279
280 let meta = ObjectMeta {
281 location: location.clone(),
282 last_modified: entry.last_modified,
283 size: entry.data.len() as u64,
284 e_tag: Some(e_tag),
285 version: None,
286 };
287 options.check_preconditions(&meta)?;
288
289 if options.head {
290 let stream = futures::stream::empty().boxed();
291 return Ok(GetResult {
292 payload: GetResultPayload::Stream(stream),
293 attributes: entry.attributes,
294 meta,
295 range: 0..0,
296 });
297 }
298
299 entry.access_count.fetch_add(1, Ordering::SeqCst);
301
302 let (range, data) = match options.range {
303 Some(range) => {
304 let r = range
305 .as_range(entry.data.len() as u64)
306 .map_err(|_| Error::Generic {
307 store: "MockStore",
308 source: Box::new(MockStoreError::InvalidGetRange),
309 })?;
310 (
311 r.clone(),
312 entry.data.slice(r.start as usize..r.end as usize),
313 )
314 }
315 None => (0..entry.data.len() as u64, entry.data),
316 };
317 entry.access_ranges.lock().unwrap().push(range.clone());
318 let stream = futures::stream::once(futures::future::ready(Ok(data)));
319
320 Ok(GetResult {
321 payload: GetResultPayload::Stream(stream.boxed()),
322 attributes: entry.attributes,
323 meta,
324 range,
325 })
326 }
327
328 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
329 let root = Path::default();
330 let prefix = prefix.unwrap_or(&root);
331
332 let storage = self.storage.read().unwrap();
333 let values: Vec<_> = storage
334 .map
335 .range((prefix)..)
336 .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref()))
337 .filter(|(key, _)| {
338 key.prefix_match(prefix)
340 .map(|mut x| x.next().is_some())
341 .unwrap_or(false)
342 })
343 .map(|(key, value)| {
344 Ok(ObjectMeta {
345 location: key.clone(),
346 last_modified: value.last_modified,
347 size: value.data.len() as u64,
348 e_tag: Some(value.e_tag.to_string()),
349 version: None,
350 })
351 })
352 .collect();
353
354 futures::stream::iter(values).boxed()
355 }
356
357 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
358 let root = Path::default();
359 let prefix = prefix.unwrap_or(&root);
360
361 let mut common_prefixes = BTreeSet::new();
362
363 let mut objects = vec![];
366 for (k, v) in self.storage.read().unwrap().map.range((prefix)..) {
367 if !k.as_ref().starts_with(prefix.as_ref()) {
368 break;
369 }
370
371 let mut parts = match k.prefix_match(prefix) {
372 Some(parts) => parts,
373 None => continue,
374 };
375
376 let common_prefix = match parts.next() {
378 Some(p) => p,
379 None => continue,
381 };
382
383 if parts.next().is_some() {
384 common_prefixes.insert(prefix.clone().join(common_prefix));
385 } else {
386 let object = ObjectMeta {
387 location: k.clone(),
388 last_modified: v.last_modified,
389 size: v.data.len() as u64,
390 e_tag: Some(v.e_tag.to_string()),
391 version: None,
392 };
393 objects.push(object);
394 }
395 }
396
397 Ok(ListResult {
398 objects,
399 common_prefixes: common_prefixes.into_iter().collect(),
400 })
401 }
402
403 async fn put_multipart_opts(
404 &self,
405 _location: &Path,
406 _opts: PutMultipartOptions,
407 ) -> Result<Box<dyn MultipartUpload>> {
408 unreachable!("MockStore does not support multipart upload")
409 }
410
411 fn delete_stream(
412 &self,
413 _locations: BoxStream<'static, Result<Path>>,
414 ) -> BoxStream<'static, Result<Path>> {
415 unreachable!("MockStore does not support delete")
416 }
417
418 async fn copy_opts(&self, _from: &Path, _to: &Path, _options: CopyOptions) -> Result<()> {
419 unreachable!("MockStore does not support copy")
420 }
421}
422
#[cfg(test)]
mod tests {
    use super::*;
    use futures::TryStreamExt;
    use object_store::ObjectStoreExt;

    /// Collects the full, unprefixed listing of `store`.
    async fn list_all(store: &MockStore) -> Vec<ObjectMeta> {
        store.list(None).try_collect().await.unwrap()
    }

    /// Issues a ranged read against `path` and returns the fetched bytes.
    async fn read_range(store: &MockStore, path: &Path, range: Range<u64>) -> Result<Bytes> {
        let options = GetOptions {
            range: Some(range.into()),
            ..GetOptions::default()
        };
        store.get_opts(path, options).await?.bytes().await
    }

    /// Writes `contents` to `path` with default put options.
    async fn put_static(store: &MockStore, path: &Path, contents: &'static [u8]) {
        let payload = PutPayload::from(Bytes::from_static(contents));
        store
            .put_opts(path, payload, PutOptions::default())
            .await
            .unwrap();
    }

    /// Builds a store holding ten 10 KiB files and sanity-checks the listing.
    async fn setup_test_store() -> MockStore {
        let store = MockStore::new_with_files(10, 1024 * 10);
        let paths = list_all(&store).await;
        assert_eq!(paths.len(), 10, "Initial store should have 10 files");
        store
    }

    #[tokio::test]
    async fn test_new_with_files() {
        let store = setup_test_store().await;
        let listed = list_all(&store).await;
        assert_eq!(listed.len(), 10);

        // The listing is ordered by path, so index `i` lines up with `{i}.parquet`.
        for (i, meta) in listed.iter().enumerate() {
            let path = Path::from(format!("{i}.parquet"));
            let loaded_meta = store.head(&path).await.unwrap();
            assert_eq!(
                loaded_meta.size,
                1024 * 10,
                "Expected size of 10KB, got {}",
                loaded_meta.size
            );
            assert_eq!(
                meta.location, path,
                "Expected location to be {path}, got {}",
                meta.location
            );
        }
    }

    #[tokio::test]
    async fn test_get_opts() {
        let store = setup_test_store().await;
        let path = Path::from("1.parquet");

        // Whole object requested as an explicit range.
        let whole = read_range(&store, &path, 0..(1024 * 10)).await.unwrap();
        assert_eq!(whole.len(), 1024 * 10);

        // Range strictly inside the object.
        let inner = read_range(&store, &path, 1024..4096).await.unwrap();
        assert_eq!(inner.len(), 3072);

        // Range running past the end is truncated to the object's length.
        let tail = read_range(&store, &path, 8192..12288).await.unwrap();
        assert_eq!(tail.len(), 2048);

        // Range starting past the end is rejected.
        let err = read_range(&store, &path, 20480..30720).await.unwrap_err();
        assert!(
            matches!(err, Error::Generic { .. }),
            "Expected an error for out-of-bounds request, got {err:?}"
        );
    }

    #[tokio::test]
    async fn test_insert_and_list() {
        let store = setup_test_store().await;

        let new_path = Path::from("11.parquet");
        put_static(&store, &new_path, b"test data").await;

        let listed = list_all(&store).await;
        assert_eq!(
            listed.len(),
            11,
            "Store should have 11 files after insertion"
        );

        let meta = store.head(&new_path).await.unwrap();
        assert_eq!(meta.size, 9);
        assert_eq!(meta.location, new_path);
    }

    #[tokio::test]
    async fn test_list_uses_directories_correctly() {
        let store = setup_test_store().await;
        let folder_path = Path::from("folder/");
        let file_path = Path::from("folder/file.parquet");
        put_static(&store, &file_path, b"test").await;

        // Root level: the ten seeded files plus one "directory".
        let at_root = store.list_with_delimiter(None).await.unwrap();
        assert_eq!(at_root.objects.len(), 10, "Root should have 10 objects");
        assert_eq!(
            at_root.common_prefixes.len(),
            1,
            "Root should have 1 common prefix (folder)"
        );
        assert_eq!(
            at_root.common_prefixes[0], folder_path,
            "Common prefix should be 'folder/'"
        );

        // Inside the folder: just the one file, no nested prefixes.
        let in_folder = store.list_with_delimiter(Some(&folder_path)).await.unwrap();
        assert_eq!(in_folder.objects.len(), 1, "Folder should contain 1 object");
        assert_eq!(
            in_folder.common_prefixes.len(),
            0,
            "Folder should have no common prefixes"
        );
        assert_eq!(in_folder.objects[0].location, file_path);
    }

    #[tokio::test]
    async fn test_fork() {
        let original_store = setup_test_store().await;
        let forked_store = original_store.fork();

        // Mutate only the original after the fork.
        put_static(&original_store, &Path::from("11.parquet"), b"new data").await;

        let original_paths = list_all(&original_store).await;
        assert_eq!(original_paths.len(), 11);

        let forked_paths = list_all(&forked_store).await;
        assert_eq!(
            forked_paths.len(),
            10,
            "Forked store should not be affected by changes to the original"
        );
    }

    #[tokio::test]
    async fn test_access_count() {
        let store = setup_test_store().await;
        let path = Path::from("3.parquet");

        let count = store.get_access_count(&path).unwrap();
        assert_eq!(count, 0, "Initial access count should be 0, got {count}");

        let _ = store.get_opts(&path, GetOptions::default()).await.unwrap();
        let count = store.get_access_count(&path).unwrap();
        assert_eq!(
            count, 1,
            "Access count should be 1 after one get, got {count}"
        );

        let _ = store.get_opts(&path, GetOptions::default()).await.unwrap();
        let count = store.get_access_count(&path).unwrap();
        assert_eq!(
            count, 2,
            "Access count should be 2 after two gets, got {count}"
        );
    }

    #[tokio::test]
    async fn test_store_metrics() {
        let store = setup_test_store().await;

        assert_eq!(store.get_file_count(), 10);
        assert_eq!(store.get_store_size(), 10 * 1024 * 10);

        let contents: &'static [u8] = b"some new data";
        let new_path = Path::from("new_file.parquet");
        put_static(&store, &new_path, contents).await;

        assert_eq!(store.get_file_count(), 11);
        assert_eq!(store.get_store_size(), 10 * 1024 * 10 + contents.len());
    }
}