1use std::{collections::HashMap, sync::RwLock};
4
5use bytes::Bytes;
6
7use super::StorageBackend;
8use crate::error::{Error, Result};
9
10#[derive(Debug, Default)]
32pub struct MemoryBackend {
33 data: RwLock<HashMap<String, Bytes>>,
34}
35
36impl MemoryBackend {
37 pub fn new() -> Self {
39 Self {
40 data: RwLock::new(HashMap::new()),
41 }
42 }
43
44 pub fn with_data(data: HashMap<String, Bytes>) -> Self {
46 Self {
47 data: RwLock::new(data),
48 }
49 }
50
51 pub fn len(&self) -> usize {
53 self.data.read().map(|d| d.len()).unwrap_or(0)
54 }
55
56 pub fn is_empty(&self) -> bool {
58 self.len() == 0
59 }
60
61 pub fn clear(&self) {
63 if let Ok(mut data) = self.data.write() {
64 data.clear();
65 }
66 }
67}
68
69impl StorageBackend for MemoryBackend {
70 fn list(&self, prefix: &str) -> Result<Vec<String>> {
71 let data = self
72 .data
73 .read()
74 .map_err(|_| Error::storage("Failed to acquire read lock"))?;
75
76 let keys: Vec<String> = data
77 .keys()
78 .filter(|k| k.starts_with(prefix))
79 .cloned()
80 .collect();
81
82 Ok(keys)
83 }
84
85 fn get(&self, key: &str) -> Result<Bytes> {
86 let data = self
87 .data
88 .read()
89 .map_err(|_| Error::storage("Failed to acquire read lock"))?;
90
91 data.get(key)
92 .cloned()
93 .ok_or_else(|| Error::storage(format!("Key not found: {}", key)))
94 }
95
96 fn put(&self, key: &str, data: Bytes) -> Result<()> {
97 let mut store = self
98 .data
99 .write()
100 .map_err(|_| Error::storage("Failed to acquire write lock"))?;
101
102 store.insert(key.to_string(), data);
103 Ok(())
104 }
105
106 fn delete(&self, key: &str) -> Result<()> {
107 let mut data = self
108 .data
109 .write()
110 .map_err(|_| Error::storage("Failed to acquire write lock"))?;
111
112 data.remove(key);
113 Ok(())
114 }
115
116 fn exists(&self, key: &str) -> Result<bool> {
117 let data = self
118 .data
119 .read()
120 .map_err(|_| Error::storage("Failed to acquire read lock"))?;
121
122 Ok(data.contains_key(key))
123 }
124
125 fn size(&self, key: &str) -> Result<u64> {
126 let data = self
127 .data
128 .read()
129 .map_err(|_| Error::storage("Failed to acquire read lock"))?;
130
131 data.get(key)
132 .map(|d| d.len() as u64)
133 .ok_or_else(|| Error::storage(format!("Key not found: {}", key)))
134 }
135}
136
137impl Clone for MemoryBackend {
138 fn clone(&self) -> Self {
139 let data = self.data.read().map(|d| d.clone()).unwrap_or_default();
140 Self::with_data(data)
141 }
142}
143
144#[cfg(test)]
145mod tests {
146 use super::*;
147
148 #[test]
149 fn test_new() {
150 let backend = MemoryBackend::new();
151 assert!(backend.is_empty());
152 }
153
154 #[test]
155 fn test_put_and_get() {
156 let backend = MemoryBackend::new();
157
158 let data = Bytes::from("hello world");
159 backend
160 .put("key", data.clone())
161 .ok()
162 .unwrap_or_else(|| panic!("Should put"));
163
164 let retrieved = backend
165 .get("key")
166 .ok()
167 .unwrap_or_else(|| panic!("Should get"));
168 assert_eq!(retrieved, data);
169 }
170
171 #[test]
172 fn test_exists() {
173 let backend = MemoryBackend::new();
174
175 assert!(!backend
176 .exists("key")
177 .ok()
178 .unwrap_or_else(|| panic!("Should check")));
179
180 backend
181 .put("key", Bytes::from("data"))
182 .ok()
183 .unwrap_or_else(|| panic!("Should put"));
184
185 assert!(backend
186 .exists("key")
187 .ok()
188 .unwrap_or_else(|| panic!("Should check")));
189 }
190
191 #[test]
192 fn test_delete() {
193 let backend = MemoryBackend::new();
194
195 backend
196 .put("key", Bytes::from("data"))
197 .ok()
198 .unwrap_or_else(|| panic!("Should put"));
199
200 assert!(backend
201 .exists("key")
202 .ok()
203 .unwrap_or_else(|| panic!("Should exist")));
204
205 backend
206 .delete("key")
207 .ok()
208 .unwrap_or_else(|| panic!("Should delete"));
209
210 assert!(!backend
211 .exists("key")
212 .ok()
213 .unwrap_or_else(|| panic!("Should not exist")));
214 }
215
216 #[test]
217 fn test_list() {
218 let backend = MemoryBackend::new();
219
220 backend
221 .put("foo/bar", Bytes::from("a"))
222 .ok()
223 .unwrap_or_else(|| panic!("Should put"));
224 backend
225 .put("foo/baz", Bytes::from("b"))
226 .ok()
227 .unwrap_or_else(|| panic!("Should put"));
228 backend
229 .put("other", Bytes::from("c"))
230 .ok()
231 .unwrap_or_else(|| panic!("Should put"));
232
233 let foo_keys = backend
234 .list("foo/")
235 .ok()
236 .unwrap_or_else(|| panic!("Should list"));
237 assert_eq!(foo_keys.len(), 2);
238
239 let all_keys = backend
240 .list("")
241 .ok()
242 .unwrap_or_else(|| panic!("Should list"));
243 assert_eq!(all_keys.len(), 3);
244 }
245
246 #[test]
247 fn test_size() {
248 let backend = MemoryBackend::new();
249
250 let data = Bytes::from("1234567890"); backend
252 .put("key", data)
253 .ok()
254 .unwrap_or_else(|| panic!("Should put"));
255
256 let size = backend
257 .size("key")
258 .ok()
259 .unwrap_or_else(|| panic!("Should get size"));
260 assert_eq!(size, 10);
261 }
262
263 #[test]
264 fn test_get_nonexistent() {
265 let backend = MemoryBackend::new();
266 let result = backend.get("nonexistent");
267 assert!(result.is_err());
268 }
269
270 #[test]
271 fn test_len() {
272 let backend = MemoryBackend::new();
273 assert_eq!(backend.len(), 0);
274
275 backend
276 .put("a", Bytes::from("1"))
277 .ok()
278 .unwrap_or_else(|| panic!("Should put"));
279 backend
280 .put("b", Bytes::from("2"))
281 .ok()
282 .unwrap_or_else(|| panic!("Should put"));
283
284 assert_eq!(backend.len(), 2);
285 }
286
287 #[test]
288 fn test_clear() {
289 let backend = MemoryBackend::new();
290
291 backend
292 .put("a", Bytes::from("1"))
293 .ok()
294 .unwrap_or_else(|| panic!("Should put"));
295 backend
296 .put("b", Bytes::from("2"))
297 .ok()
298 .unwrap_or_else(|| panic!("Should put"));
299
300 assert_eq!(backend.len(), 2);
301
302 backend.clear();
303 assert!(backend.is_empty());
304 }
305
306 #[test]
307 fn test_with_data() {
308 let mut initial = HashMap::new();
309 initial.insert("key".to_string(), Bytes::from("value"));
310
311 let backend = MemoryBackend::with_data(initial);
312 assert_eq!(backend.len(), 1);
313 assert!(backend
314 .exists("key")
315 .ok()
316 .unwrap_or_else(|| panic!("Should check")));
317 }
318
319 #[test]
320 fn test_clone() {
321 let backend = MemoryBackend::new();
322 backend
323 .put("key", Bytes::from("value"))
324 .ok()
325 .unwrap_or_else(|| panic!("Should put"));
326
327 let cloned = backend;
328 assert_eq!(cloned.len(), 1);
329 assert_eq!(
330 cloned
331 .get("key")
332 .ok()
333 .unwrap_or_else(|| panic!("Should get")),
334 Bytes::from("value")
335 );
336 }
337
338 #[test]
339 fn test_delete_nonexistent_is_ok() {
340 let backend = MemoryBackend::new();
341 let result = backend.delete("nonexistent");
342 assert!(result.is_ok());
343 }
344
345 #[test]
346 fn test_size_nonexistent() {
347 let backend = MemoryBackend::new();
348 let result = backend.size("nonexistent");
349 assert!(result.is_err());
350 }
351
352 #[test]
353 fn test_default() {
354 let backend = MemoryBackend::default();
355 assert!(backend.is_empty());
356 }
357
358 #[test]
359 fn test_debug() {
360 let backend = MemoryBackend::new();
361 let debug_str = format!("{:?}", backend);
362 assert!(debug_str.contains("MemoryBackend"));
363 }
364
365 #[test]
366 fn test_list_with_prefix_filter() {
367 let backend = MemoryBackend::new();
368
369 backend
370 .put("data/train.parquet", Bytes::from("a"))
371 .ok()
372 .unwrap_or_else(|| panic!("Should put"));
373 backend
374 .put("data/test.parquet", Bytes::from("b"))
375 .ok()
376 .unwrap_or_else(|| panic!("Should put"));
377 backend
378 .put("metadata/info.json", Bytes::from("c"))
379 .ok()
380 .unwrap_or_else(|| panic!("Should put"));
381
382 let data_keys = backend
383 .list("data/")
384 .ok()
385 .unwrap_or_else(|| panic!("Should list"));
386 assert_eq!(data_keys.len(), 2);
387
388 let metadata_keys = backend
389 .list("metadata/")
390 .ok()
391 .unwrap_or_else(|| panic!("Should list"));
392 assert_eq!(metadata_keys.len(), 1);
393 }
394
395 #[test]
398 fn test_put_overwrite() {
399 let backend = MemoryBackend::new();
400
401 backend
402 .put("key", Bytes::from("original"))
403 .ok()
404 .unwrap_or_else(|| panic!("put 1"));
405 backend
406 .put("key", Bytes::from("updated"))
407 .ok()
408 .unwrap_or_else(|| panic!("put 2"));
409
410 let content = backend.get("key").ok().unwrap_or_else(|| panic!("get"));
411 assert_eq!(content, Bytes::from("updated"));
412 assert_eq!(backend.len(), 1);
413 }
414
415 #[test]
416 fn test_clone_independence() {
417 let backend = MemoryBackend::new();
418 backend
419 .put("key", Bytes::from("value"))
420 .ok()
421 .unwrap_or_else(|| panic!("put"));
422
423 let cloned = backend.clone();
424
425 backend
427 .put("new_key", Bytes::from("new_value"))
428 .ok()
429 .unwrap_or_else(|| panic!("put new"));
430
431 assert_eq!(backend.len(), 2);
433 assert_eq!(cloned.len(), 1);
434 }
435
436 #[test]
437 fn test_list_empty_prefix() {
438 let backend = MemoryBackend::new();
439 backend
440 .put("a", Bytes::from("1"))
441 .ok()
442 .unwrap_or_else(|| panic!("put"));
443 backend
444 .put("b", Bytes::from("2"))
445 .ok()
446 .unwrap_or_else(|| panic!("put"));
447 backend
448 .put("c", Bytes::from("3"))
449 .ok()
450 .unwrap_or_else(|| panic!("put"));
451
452 let all = backend.list("").ok().unwrap_or_else(|| panic!("list"));
453 assert_eq!(all.len(), 3);
454 }
455
456 #[test]
457 fn test_list_no_matches() {
458 let backend = MemoryBackend::new();
459 backend
460 .put("data/file.txt", Bytes::from("content"))
461 .ok()
462 .unwrap_or_else(|| panic!("put"));
463
464 let matches = backend
465 .list("nonexistent/")
466 .ok()
467 .unwrap_or_else(|| panic!("list"));
468 assert!(matches.is_empty());
469 }
470
471 #[test]
472 fn test_size_empty_value() {
473 let backend = MemoryBackend::new();
474 backend
475 .put("empty", Bytes::new())
476 .ok()
477 .unwrap_or_else(|| panic!("put"));
478
479 let size = backend.size("empty").ok().unwrap_or_else(|| panic!("size"));
480 assert_eq!(size, 0);
481 }
482
483 #[test]
484 fn test_is_empty_after_operations() {
485 let backend = MemoryBackend::new();
486 assert!(backend.is_empty());
487
488 backend
489 .put("key", Bytes::from("value"))
490 .ok()
491 .unwrap_or_else(|| panic!("put"));
492 assert!(!backend.is_empty());
493
494 backend
495 .delete("key")
496 .ok()
497 .unwrap_or_else(|| panic!("delete"));
498 assert!(backend.is_empty());
499 }
500
501 #[test]
502 fn test_many_keys() {
503 let backend = MemoryBackend::new();
504
505 for i in 0..100 {
506 backend
507 .put(&format!("key_{}", i), Bytes::from(format!("value_{}", i)))
508 .ok()
509 .unwrap_or_else(|| panic!("put"));
510 }
511
512 assert_eq!(backend.len(), 100);
513
514 for i in 0..100 {
516 assert!(backend
517 .exists(&format!("key_{}", i))
518 .ok()
519 .unwrap_or_else(|| panic!("exists")));
520 }
521 }
522
523 #[test]
524 fn test_delete_then_reput() {
525 let backend = MemoryBackend::new();
526
527 backend
528 .put("key", Bytes::from("v1"))
529 .ok()
530 .unwrap_or_else(|| panic!("put 1"));
531 backend
532 .delete("key")
533 .ok()
534 .unwrap_or_else(|| panic!("delete"));
535 backend
536 .put("key", Bytes::from("v2"))
537 .ok()
538 .unwrap_or_else(|| panic!("put 2"));
539
540 let content = backend.get("key").ok().unwrap_or_else(|| panic!("get"));
541 assert_eq!(content, Bytes::from("v2"));
542 }
543
544 #[test]
545 fn test_with_data_multiple() {
546 let mut initial = HashMap::new();
547 initial.insert("key1".to_string(), Bytes::from("value1"));
548 initial.insert("key2".to_string(), Bytes::from("value2"));
549 initial.insert("key3".to_string(), Bytes::from("value3"));
550
551 let backend = MemoryBackend::with_data(initial);
552 assert_eq!(backend.len(), 3);
553
554 for i in 1..=3 {
555 let content = backend
556 .get(&format!("key{}", i))
557 .ok()
558 .unwrap_or_else(|| panic!("get"));
559 assert_eq!(content, Bytes::from(format!("value{}", i)));
560 }
561 }
562
563 #[test]
564 fn test_large_value() {
565 let backend = MemoryBackend::new();
566
567 let data: Vec<u8> = (0..1_000_000).map(|i| (i % 256) as u8).collect();
569 backend
570 .put("large", Bytes::from(data.clone()))
571 .ok()
572 .unwrap_or_else(|| panic!("put"));
573
574 let retrieved = backend.get("large").ok().unwrap_or_else(|| panic!("get"));
575 assert_eq!(retrieved.len(), 1_000_000);
576 assert_eq!(&retrieved[..], &data[..]);
577 }
578
579 #[test]
580 fn test_clear_and_reuse() {
581 let backend = MemoryBackend::new();
582
583 backend
584 .put("old", Bytes::from("data"))
585 .ok()
586 .unwrap_or_else(|| panic!("put"));
587 backend.clear();
588
589 assert!(backend.is_empty());
590 assert!(!backend
591 .exists("old")
592 .ok()
593 .unwrap_or_else(|| panic!("exists")));
594
595 backend
596 .put("new", Bytes::from("data"))
597 .ok()
598 .unwrap_or_else(|| panic!("put"));
599 assert_eq!(backend.len(), 1);
600 }
601
602 #[test]
605 fn test_list_with_various_prefixes() {
606 let backend = MemoryBackend::new();
607
608 backend
609 .put("data/train/file1.parquet", Bytes::from("1"))
610 .ok()
611 .unwrap();
612 backend
613 .put("data/train/file2.parquet", Bytes::from("2"))
614 .ok()
615 .unwrap();
616 backend
617 .put("data/test/file1.parquet", Bytes::from("3"))
618 .ok()
619 .unwrap();
620 backend
621 .put("metadata/schema.json", Bytes::from("4"))
622 .ok()
623 .unwrap();
624
625 assert_eq!(backend.list("data/train/").ok().unwrap().len(), 2);
626 assert_eq!(backend.list("data/test/").ok().unwrap().len(), 1);
627 assert_eq!(backend.list("data/").ok().unwrap().len(), 3);
628 assert_eq!(backend.list("metadata/").ok().unwrap().len(), 1);
629 assert_eq!(backend.list("").ok().unwrap().len(), 4);
630 }
631
632 #[test]
633 fn test_size_of_empty_and_non_empty() {
634 let backend = MemoryBackend::new();
635
636 backend.put("empty", Bytes::new()).ok().unwrap();
637 backend.put("small", Bytes::from("abc")).ok().unwrap();
638 backend
639 .put("medium", Bytes::from("0123456789"))
640 .ok()
641 .unwrap();
642
643 assert_eq!(backend.size("empty").ok().unwrap(), 0);
644 assert_eq!(backend.size("small").ok().unwrap(), 3);
645 assert_eq!(backend.size("medium").ok().unwrap(), 10);
646 }
647
648 #[test]
649 fn test_delete_idempotent() {
650 let backend = MemoryBackend::new();
651
652 backend.put("key", Bytes::from("value")).ok().unwrap();
653
654 assert!(backend.delete("key").is_ok());
656 assert!(backend.delete("key").is_ok());
658 assert!(backend.delete("key").is_ok());
660 }
661
662 #[test]
663 fn test_exists_after_operations() {
664 let backend = MemoryBackend::new();
665
666 assert!(!backend.exists("key").ok().unwrap());
668
669 backend.put("key", Bytes::from("value")).ok().unwrap();
671 assert!(backend.exists("key").ok().unwrap());
672
673 backend.delete("key").ok().unwrap();
675 assert!(!backend.exists("key").ok().unwrap());
676
677 backend.put("key", Bytes::from("new value")).ok().unwrap();
679 assert!(backend.exists("key").ok().unwrap());
680 }
681
682 #[test]
683 fn test_clone_deep_copy() {
684 let backend = MemoryBackend::new();
685 backend.put("key1", Bytes::from("value1")).ok().unwrap();
686 backend.put("key2", Bytes::from("value2")).ok().unwrap();
687
688 let cloned = backend.clone();
689
690 backend.put("key3", Bytes::from("value3")).ok().unwrap();
692 backend.delete("key1").ok().unwrap();
693
694 assert_eq!(cloned.len(), 2);
696 assert!(cloned.exists("key1").ok().unwrap());
697 assert!(!cloned.exists("key3").ok().unwrap());
698 }
699
700 #[test]
701 fn test_list_partial_prefix_match() {
702 let backend = MemoryBackend::new();
703
704 backend.put("prefix_a_1", Bytes::from("1")).ok().unwrap();
705 backend.put("prefix_a_2", Bytes::from("2")).ok().unwrap();
706 backend.put("prefix_b_1", Bytes::from("3")).ok().unwrap();
707 backend.put("other", Bytes::from("4")).ok().unwrap();
708
709 assert_eq!(backend.list("prefix_a").ok().unwrap().len(), 2);
711 assert_eq!(backend.list("prefix_b").ok().unwrap().len(), 1);
712 assert_eq!(backend.list("prefix_").ok().unwrap().len(), 3);
713 assert_eq!(backend.list("pre").ok().unwrap().len(), 3);
714 }
715
716 #[test]
717 fn test_get_error_message() {
718 let backend = MemoryBackend::new();
719
720 let result = backend.get("nonexistent_key");
721 assert!(result.is_err());
722 if let Err(e) = result {
723 let msg = format!("{:?}", e);
724 assert!(msg.contains("nonexistent_key") || msg.contains("not found"));
725 }
726 }
727
728 #[test]
729 fn test_size_error_message() {
730 let backend = MemoryBackend::new();
731
732 let result = backend.size("missing_file");
733 assert!(result.is_err());
734 if let Err(e) = result {
735 let msg = format!("{:?}", e);
736 assert!(msg.contains("missing_file") || msg.contains("not found"));
737 }
738 }
739
740 #[test]
741 fn test_with_data_preserves_all() {
742 let mut initial = HashMap::new();
743 initial.insert("a".to_string(), Bytes::from("1"));
744 initial.insert("b".to_string(), Bytes::from("2"));
745 initial.insert("c".to_string(), Bytes::from("3"));
746
747 let backend = MemoryBackend::with_data(initial);
748
749 assert_eq!(backend.len(), 3);
750 assert_eq!(backend.get("a").ok().unwrap(), Bytes::from("1"));
751 assert_eq!(backend.get("b").ok().unwrap(), Bytes::from("2"));
752 assert_eq!(backend.get("c").ok().unwrap(), Bytes::from("3"));
753 }
754
755 #[test]
756 fn test_binary_data_roundtrip() {
757 let backend = MemoryBackend::new();
758
759 let binary: Vec<u8> = (0..=255).collect();
761 backend
762 .put("binary", Bytes::from(binary.clone()))
763 .ok()
764 .unwrap();
765
766 let retrieved = backend.get("binary").ok().unwrap();
767 assert_eq!(retrieved.as_ref(), binary.as_slice());
768 }
769
770 #[test]
771 fn test_concurrent_access_simulation() {
772 let backend = MemoryBackend::new();
773
774 for i in 0..100 {
776 let key = format!("key_{}", i);
777 let value = format!("value_{}", i);
778 backend.put(&key, Bytes::from(value.clone())).ok().unwrap();
779 }
780
781 assert_eq!(backend.len(), 100);
782
783 for i in 0..100 {
785 let key = format!("key_{}", i);
786 let expected = format!("value_{}", i);
787 assert_eq!(backend.get(&key).ok().unwrap(), Bytes::from(expected));
788 }
789 }
790
791 #[test]
792 fn test_clear_multiple_times() {
793 let backend = MemoryBackend::new();
794
795 backend.put("key", Bytes::from("value")).ok().unwrap();
796 backend.clear();
797 backend.clear(); backend.clear();
799
800 assert!(backend.is_empty());
801 }
802}