commonware_storage/metadata/
mod.rs

//! A key-value store optimized for atomically committing a small collection of metadata.
//!
//! [Metadata] is a key-value store for tracking a small collection of metadata, in which multiple
//! updates can be committed together in a single batch. It is commonly used alongside a variety
//! of other storage systems to persist application state across restarts.
//!
//! # Format
//!
//! Data stored in [Metadata] is serialized into either a "left" or a "right" blob as a version,
//! followed by a sequence of key-value pairs, followed by a checksum:
//!
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! | 0 | 1 |    ...    | 8 | 9 |10 |11 |12 |13 |14 |15 |16 |  ...  |50 |...|90 |91 |92 |93 |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! |    Version (u64)  |      Key1     |              Value1           |...|  CRC32(u32)   |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! ```
//!
//! _To ensure the integrity of the data, a CRC32 checksum is appended to the end of the blob.
//! This allows partial writes to be detected (on restart) before any data is relied on._
//!
//! # Atomic Updates
//!
//! To support atomic updates, [Metadata] maintains two blobs: a "left" and a "right" blob. When a
//! new update is committed, it is written to the "older" of the two blobs (as indicated by the
//! version persisted in each). Writes to [commonware_runtime::Blob] are not atomic and may only
//! complete partially, so the "newer" blob is only overwritten once the "older" blob has been
//! synced. Otherwise, recovering the latest complete state from disk on restart would not be
//! guaranteed, as half of a blob could contain old data and half new data.
//!
//! # Delta Writes
//!
//! If the set of keys and the lengths of values are stable, [Metadata] writes only an update's
//! delta to disk (rather than rewriting all of the metadata). This makes [Metadata] a good choice
//! even for large collections of data in which most entries are rarely modified.
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic};
//! use commonware_storage::metadata::{Metadata, Config};
//! use commonware_utils::sequence::U64;
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create a store
//!     let mut metadata = Metadata::init(context, Config{
//!         partition: "partition".to_string(),
//!         codec_config: ((0..).into(), ()),
//!     }).await.unwrap();
//!
//!     // Store metadata
//!     metadata.put(U64::new(1), b"hello".to_vec());
//!     metadata.put(U64::new(2), b"world".to_vec());
//!
//!     // Sync the metadata store (batch write changes)
//!     metadata.sync().await.unwrap();
//!
//!     // Retrieve some metadata
//!     let value = metadata.get(&U64::new(1)).unwrap();
//!
//!     // Close the store
//!     metadata.close().await.unwrap();
//! });
//! ```
67
68mod storage;
69pub use storage::Metadata;
70use thiserror::Error;
71
72/// Errors that can occur when interacting with [Metadata].
73#[derive(Debug, Error)]
74pub enum Error {
75    #[error("runtime error: {0}")]
76    Runtime(#[from] commonware_runtime::Error),
77    #[error("blob too large: {0}")]
78    BlobTooLarge(u64),
79}
80
/// Configuration for [Metadata] storage.
///
/// `Debug` is derived (in addition to `Clone`) so configurations can be logged and inspected;
/// it is only available when the codec configuration `C` is itself `Debug`.
#[derive(Clone, Debug)]
pub struct Config<C> {
    /// The [commonware_runtime::Storage] partition to use for storing metadata.
    ///
    /// Both the "left" and the "right" blob are stored in this partition.
    pub partition: String,

    /// The codec configuration to use for the value stored in the metadata.
    pub codec_config: C,
}
90
91#[cfg(test)]
92mod tests {
93    use super::*;
94    use commonware_macros::test_traced;
95    use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
96    use commonware_utils::sequence::U64;
97    use rand::{Rng, RngCore};
98
99    #[test_traced]
100    fn test_put_get_clear() {
101        // Initialize the deterministic context
102        let executor = deterministic::Runner::default();
103        executor.start(|context| async move {
104            // Create a metadata store
105            let cfg = Config {
106                partition: "test".to_string(),
107                codec_config: ((0..).into(), ()),
108            };
109            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
110                .await
111                .unwrap();
112
113            // Get a key that doesn't exist
114            let key = U64::new(42);
115            let value = metadata.get(&key);
116            assert!(value.is_none());
117
118            // Check metrics
119            let buffer = context.encode();
120            assert!(buffer.contains("sync_rewrites_total 0"));
121            assert!(buffer.contains("sync_overwrites_total 0"));
122            assert!(buffer.contains("keys 0"));
123
124            // Put a key
125            let hello = b"hello".to_vec();
126            metadata.put(key.clone(), hello.clone());
127
128            // Get the key
129            let value = metadata.get(&key).unwrap();
130            assert_eq!(value, &hello);
131
132            // Check metrics
133            let buffer = context.encode();
134            assert!(buffer.contains("sync_rewrites_total 0"));
135            assert!(buffer.contains("sync_overwrites_total 0"));
136            assert!(buffer.contains("keys 1"));
137
138            // Close the metadata store
139            metadata.close().await.unwrap();
140
141            // Check metrics
142            let buffer = context.encode();
143            assert!(buffer.contains("sync_rewrites_total 1"));
144            assert!(buffer.contains("sync_overwrites_total 0"));
145            assert!(buffer.contains("keys 1"));
146
147            // Reopen the metadata store
148            let cfg = Config {
149                partition: "test".to_string(),
150                codec_config: ((0..).into(), ()),
151            };
152            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
153                .await
154                .unwrap();
155
156            // Check metrics
157            let buffer = context.encode();
158            assert!(buffer.contains("sync_rewrites_total 0"));
159            assert!(buffer.contains("sync_overwrites_total 0"));
160            assert!(buffer.contains("keys 1"));
161
162            // Get the key
163            let value = metadata.get(&key).unwrap();
164            assert_eq!(value, &hello);
165
166            // Test clearing the metadata store
167            metadata.clear();
168            let value = metadata.get(&key);
169            assert!(value.is_none());
170
171            // Check metrics
172            let buffer = context.encode();
173            assert!(buffer.contains("sync_rewrites_total 1"));
174            assert!(buffer.contains("sync_overwrites_total 0"));
175            assert!(buffer.contains("keys 0"));
176
177            metadata.destroy().await.unwrap();
178        });
179    }
180
181    #[test_traced]
182    fn test_multi_sync() {
183        // Initialize the deterministic context
184        let executor = deterministic::Runner::default();
185        executor.start(|context| async move {
186            // Create a metadata store
187            let cfg = Config {
188                partition: "test".to_string(),
189                codec_config: ((0..).into(), ()),
190            };
191            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
192                .await
193                .unwrap();
194
195            // Put a key
196            let key = U64::new(42);
197            let hello = b"hello".to_vec();
198            metadata.put(key.clone(), hello.clone());
199
200            // Sync the metadata store
201            metadata.sync().await.unwrap();
202
203            // Check metrics
204            let buffer = context.encode();
205            assert!(buffer.contains("sync_rewrites_total 1"));
206            assert!(buffer.contains("sync_overwrites_total 0"));
207            assert!(buffer.contains("keys 1"));
208
209            // Put an overlapping key and a new key
210            let world = b"world".to_vec();
211            metadata.put(key.clone(), world.clone());
212            let key2 = U64::new(43);
213            let foo = b"foo".to_vec();
214            metadata.put(key2.clone(), foo.clone());
215
216            // Close the metadata store
217            metadata.close().await.unwrap();
218
219            // Check metrics
220            let buffer = context.encode();
221            assert!(buffer.contains("sync_rewrites_total 2"));
222            assert!(buffer.contains("sync_overwrites_total 0"));
223            assert!(buffer.contains("keys 2"));
224
225            // Reopen the metadata store
226            let cfg = Config {
227                partition: "test".to_string(),
228                codec_config: ((0..).into(), ()),
229            };
230            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
231                .await
232                .unwrap();
233
234            // Check metrics
235            let buffer = context.encode();
236            assert!(buffer.contains("sync_rewrites_total 0"));
237            assert!(buffer.contains("sync_overwrites_total 0"));
238            assert!(buffer.contains("keys 2"));
239
240            // Get the key
241            let value = metadata.get(&key).unwrap();
242            assert_eq!(value, &world);
243            let value = metadata.get(&key2).unwrap();
244            assert_eq!(value, &foo);
245
246            // Remove the key
247            metadata.remove(&key);
248
249            // Sync the metadata store
250            metadata.sync().await.unwrap();
251
252            // Check metrics
253            let buffer = context.encode();
254            assert!(buffer.contains("sync_rewrites_total 1"));
255            assert!(buffer.contains("sync_overwrites_total 0"));
256            assert!(buffer.contains("keys 1"));
257
258            // Close the metadata store
259            metadata.close().await.unwrap();
260
261            // Reopen the metadata store
262            let cfg = Config {
263                partition: "test".to_string(),
264                codec_config: ((0..).into(), ()),
265            };
266            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
267                .await
268                .unwrap();
269
270            // Check metrics
271            let buffer = context.encode();
272            assert!(buffer.contains("sync_rewrites_total 0"));
273            assert!(buffer.contains("sync_overwrites_total 0"));
274            assert!(buffer.contains("keys 1"));
275
276            // Get the key
277            let value = metadata.get(&key);
278            assert!(value.is_none());
279            let value = metadata.get(&key2).unwrap();
280            assert_eq!(value, &foo);
281
282            metadata.destroy().await.unwrap();
283        });
284    }
285
286    #[test_traced]
287    fn test_recover_corrupted_one() {
288        // Initialize the deterministic context
289        let executor = deterministic::Runner::default();
290        executor.start(|context| async move {
291            // Create a metadata store
292            let cfg = Config {
293                partition: "test".to_string(),
294                codec_config: ((0..).into(), ()),
295            };
296            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
297                .await
298                .unwrap();
299
300            // Put a key
301            let key = U64::new(42);
302            let hello = b"hello".to_vec();
303            metadata.put(key.clone(), hello.clone());
304
305            // Sync the metadata store
306            metadata.sync().await.unwrap();
307
308            // Put an overlapping key and a new key
309            let world = b"world".to_vec();
310            metadata.put(key.clone(), world.clone());
311            let key2 = U64::new(43);
312            let foo = b"foo".to_vec();
313            metadata.put(key2, foo.clone());
314
315            // Close the metadata store
316            metadata.close().await.unwrap();
317
318            // Corrupt the metadata store
319            let (blob, _) = context.open("test", b"left").await.unwrap();
320            blob.write_at(b"corrupted".to_vec(), 0).await.unwrap();
321            blob.sync().await.unwrap();
322
323            // Reopen the metadata store
324            let cfg = Config {
325                partition: "test".to_string(),
326                codec_config: ((0..).into(), ()),
327            };
328            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
329                .await
330                .unwrap();
331
332            // Get the key (falls back to non-corrupt)
333            let value = metadata.get(&key).unwrap();
334            assert_eq!(value, &hello);
335
336            metadata.destroy().await.unwrap();
337        });
338    }
339
340    #[test_traced]
341    fn test_recover_corrupted_both() {
342        // Initialize the deterministic context
343        let executor = deterministic::Runner::default();
344        executor.start(|context| async move {
345            // Create a metadata store
346            let cfg = Config {
347                partition: "test".to_string(),
348                codec_config: ((0..).into(), ()),
349            };
350            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
351                .await
352                .unwrap();
353
354            // Put a key
355            let key = U64::new(42);
356            let hello = b"hello".to_vec();
357            metadata.put(key.clone(), hello.clone());
358
359            // Sync the metadata store
360            metadata.sync().await.unwrap();
361
362            // Put an overlapping key and a new key
363            let world = b"world".to_vec();
364            metadata.put(key.clone(), world.clone());
365            let key2 = U64::new(43);
366            let foo = b"foo".to_vec();
367            metadata.put(key2, foo.clone());
368
369            // Close the metadata store
370            metadata.close().await.unwrap();
371
372            // Corrupt the metadata store
373            let (blob, _) = context.open("test", b"left").await.unwrap();
374            blob.write_at(b"corrupted".to_vec(), 0).await.unwrap();
375            blob.sync().await.unwrap();
376            let (blob, _) = context.open("test", b"right").await.unwrap();
377            blob.write_at(b"corrupted".to_vec(), 0).await.unwrap();
378            blob.sync().await.unwrap();
379
380            // Reopen the metadata store
381            let cfg = Config {
382                partition: "test".to_string(),
383                codec_config: ((0..).into(), ()),
384            };
385            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
386                .await
387                .unwrap();
388
389            // Get the key (falls back to non-corrupt)
390            let value = metadata.get(&key);
391            assert!(value.is_none());
392
393            // Check metrics
394            let buffer = context.encode();
395            assert!(buffer.contains("sync_rewrites_total 0"));
396            assert!(buffer.contains("sync_overwrites_total 0"));
397            assert!(buffer.contains("keys 0"));
398
399            metadata.destroy().await.unwrap();
400        });
401    }
402
403    #[test_traced]
404    fn test_recover_corrupted_truncate() {
405        // Initialize the deterministic context
406        let executor = deterministic::Runner::default();
407        executor.start(|context| async move {
408            // Create a metadata store
409            let cfg = Config {
410                partition: "test".to_string(),
411                codec_config: ((0..).into(), ()),
412            };
413            let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
414
415            // Put a key
416            let key = U64::new(42);
417            let hello = b"hello".to_vec();
418            metadata.put(key.clone(), hello.clone());
419
420            // Sync the metadata store
421            metadata.sync().await.unwrap();
422
423            // Put an overlapping key and a new key
424            let world = b"world".to_vec();
425            metadata.put(key.clone(), world.clone());
426            let key2 = U64::new(43);
427            let foo = b"foo".to_vec();
428            metadata.put(key2, foo.clone());
429
430            // Close the metadata store
431            metadata.close().await.unwrap();
432
433            // Corrupt the metadata store
434            let (blob, len) = context.open("test", b"left").await.unwrap();
435            blob.resize(len - 8).await.unwrap();
436            blob.sync().await.unwrap();
437
438            // Reopen the metadata store
439            let cfg = Config {
440                partition: "test".to_string(),
441                codec_config: ((0..).into(), ()),
442            };
443            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
444                .await
445                .unwrap();
446
447            // Get the key (falls back to non-corrupt)
448            let value = metadata.get(&key).unwrap();
449            assert_eq!(value, &hello);
450
451            metadata.destroy().await.unwrap();
452        });
453    }
454
455    #[test_traced]
456    fn test_recover_corrupted_short() {
457        // Initialize the deterministic context
458        let executor = deterministic::Runner::default();
459        executor.start(|context| async move {
460            // Create a metadata store
461            let cfg = Config {
462                partition: "test".to_string(),
463                codec_config: ((0..).into(), ()),
464            };
465            let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
466
467            // Put a key
468            let key = U64::new(42);
469            let hello = b"hello".to_vec();
470            metadata.put(key.clone(), hello.clone());
471
472            // Sync the metadata store
473            metadata.sync().await.unwrap();
474
475            // Put an overlapping key and a new key
476            let world = b"world".to_vec();
477            metadata.put(key.clone(), world.clone());
478            let key2 = U64::new(43);
479            let foo = b"foo".to_vec();
480            metadata.put(key2, foo.clone());
481
482            // Close the metadata store
483            metadata.close().await.unwrap();
484
485            // Corrupt the metadata store
486            let (blob, _) = context.open("test", b"left").await.unwrap();
487            blob.resize(5).await.unwrap();
488            blob.sync().await.unwrap();
489
490            // Reopen the metadata store
491            let cfg = Config {
492                partition: "test".to_string(),
493                codec_config: ((0..).into(), ()),
494            };
495            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
496                .await
497                .unwrap();
498
499            // Get the key (falls back to non-corrupt)
500            let value = metadata.get(&key).unwrap();
501            assert_eq!(value, &hello);
502
503            metadata.destroy().await.unwrap();
504        });
505    }
506
507    #[test_traced]
508    fn test_unclean_shutdown() {
509        // Initialize the deterministic context
510        let executor = deterministic::Runner::default();
511        executor.start(|context| async move {
512            let key = U64::new(42);
513            let hello = b"hello".to_vec();
514            {
515                // Create a metadata store
516                let cfg = Config {
517                    partition: "test".to_string(),
518                    codec_config: ((0..).into(), ()),
519                };
520                let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
521
522                // Put a key
523                metadata.put(key.clone(), hello.clone());
524
525                // Drop metadata before sync
526            }
527
528            // Reopen the metadata store
529            let cfg = Config {
530                partition: "test".to_string(),
531                codec_config: ((0..).into(), ()),
532            };
533            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
534                .await
535                .unwrap();
536
537            // Get the key
538            let value = metadata.get(&key);
539            assert!(value.is_none());
540
541            // Check metrics
542            let buffer = context.encode();
543            assert!(buffer.contains("sync_rewrites_total 0"));
544            assert!(buffer.contains("sync_overwrites_total 0"));
545            assert!(buffer.contains("keys 0"));
546
547            metadata.destroy().await.unwrap();
548        });
549    }
550
551    #[test_traced]
552    #[should_panic(expected = "usize value is larger than u32")]
553    fn test_value_too_big_error() {
554        // Initialize the deterministic context
555        let executor = deterministic::Runner::default();
556        executor.start(|context| async move {
557            // Create a metadata store
558            let cfg = Config {
559                partition: "test".to_string(),
560                codec_config: ((0..).into(), ()),
561            };
562            let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
563
564            // Create a value that exceeds u32::MAX bytes
565            let value = vec![0u8; (u32::MAX as usize) + 1];
566            metadata.put(U64::new(1), value);
567
568            // Assert
569            metadata.sync().await.unwrap();
570        });
571    }
572
573    #[test_traced]
574    fn test_delta_writes() {
575        // Initialize the deterministic context
576        let executor = deterministic::Runner::default();
577        executor.start(|context| async move {
578            // Create a metadata store
579            let cfg = Config {
580                partition: "test".to_string(),
581                codec_config: ((0..).into(), ()),
582            };
583            let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
584
585            // Put initial keys
586            for i in 0..100 {
587                metadata.put(U64::new(i), vec![i as u8; 100]);
588            }
589
590            // First sync - should write everything to the first blob
591            //
592            // 100 keys * (8 bytes for key + 1 byte for len + 100 bytes for value) + 8 bytes for version + 4 bytes for checksum
593            metadata.sync().await.unwrap();
594            let buffer = context.encode();
595            assert!(buffer.contains("sync_rewrites_total 1"), "{buffer}");
596            assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
597            assert!(
598                buffer.contains("runtime_storage_write_bytes_total 10912"),
599                "{buffer}",
600            );
601
602            // Modify just one key
603            metadata.put(U64::new(51), vec![0xff; 100]);
604
605            // Sync again - should write everything to the second blob
606            metadata.sync().await.unwrap();
607            let buffer = context.encode();
608            assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
609            assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
610            assert!(
611                buffer.contains("runtime_storage_write_bytes_total 21824"),
612                "{buffer}",
613            );
614
615            // Sync again - should write only diff from the first blob
616            //
617            // 1 byte for len + 100 bytes for value + 8 byte for version + 4 bytes for checksum
618            metadata.sync().await.unwrap();
619            let buffer = context.encode();
620            assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
621            assert!(buffer.contains("sync_overwrites_total 1"), "{buffer}");
622            assert!(
623                buffer.contains("runtime_storage_write_bytes_total 21937"),
624                "{buffer}",
625            );
626
627            // Sync again - should write only diff from the second blob
628            //
629            // 8 byte for version + 4 bytes for checksum
630            metadata.sync().await.unwrap();
631            let buffer = context.encode();
632            assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
633            assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
634            assert!(
635                buffer.contains("runtime_storage_write_bytes_total 21949"),
636                "{buffer}",
637            );
638
639            // Remove a key - should rewrite everything
640            //
641            // 99 keys * (8 bytes for key + 1 bytes for len + 100 bytes for value) + 8 bytes for version + 4 bytes for checksum
642            metadata.remove(&U64::new(51));
643            metadata.sync().await.unwrap();
644            let buffer = context.encode();
645            assert!(buffer.contains("sync_rewrites_total 3"), "{buffer}");
646            assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
647            assert!(
648                buffer.contains("runtime_storage_write_bytes_total 32752"),
649                "{buffer}"
650            );
651
652            // Sync again - should also rewrite
653            metadata.sync().await.unwrap();
654            let buffer = context.encode();
655            assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
656            assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
657            assert!(
658                buffer.contains("runtime_storage_write_bytes_total 43555"),
659                "{buffer}"
660            );
661
662            // Modify in-place - should overwrite
663            //
664            // 1 byte for len + 100 bytes for value + 8 byte for version + 4 bytes for checksum
665            metadata.put(U64::new(50), vec![0xff; 100]);
666            metadata.sync().await.unwrap();
667            let buffer = context.encode();
668            assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
669            assert!(buffer.contains("sync_overwrites_total 3"), "{buffer}");
670            assert!(
671                buffer.contains("runtime_storage_write_bytes_total 43668"),
672                "{buffer}"
673            );
674
675            // Clean up
676            metadata.destroy().await.unwrap();
677        });
678    }
679
680    #[test_traced]
681    fn test_sync_with_no_changes() {
682        let executor = deterministic::Runner::default();
683        executor.start(|context| async move {
684            let cfg = Config {
685                partition: "test".to_string(),
686                codec_config: ((0..).into(), ()),
687            };
688            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
689                .await
690                .unwrap();
691
692            // Put initial data
693            metadata.put(U64::new(1), b"hello".to_vec());
694            metadata.sync().await.unwrap();
695
696            // Sync again with no changes - will rewrite because key_order_changed is recent
697            // (on startup, key_order_changed is set to next_version)
698            metadata.sync().await.unwrap();
699            let buffer = context.encode();
700            assert!(buffer.contains("sync_rewrites_total 2"));
701            assert!(buffer.contains("sync_overwrites_total 0"));
702
703            // Sync again - now key order is stable, should do overwrite
704            metadata.sync().await.unwrap();
705            let buffer = context.encode();
706            assert!(buffer.contains("sync_rewrites_total 2"));
707            assert!(buffer.contains("sync_overwrites_total 1"));
708
709            // Sync again - should continue doing overwrites
710            metadata.sync().await.unwrap();
711            let buffer = context.encode();
712            assert!(buffer.contains("sync_rewrites_total 2"));
713            assert!(buffer.contains("sync_overwrites_total 2"));
714
715            metadata.destroy().await.unwrap();
716        });
717    }
718
719    #[test_traced]
720    fn test_get_mut_marks_modified() {
721        let executor = deterministic::Runner::default();
722        executor.start(|context| async move {
723            let cfg = Config {
724                partition: "test".to_string(),
725                codec_config: ((0..).into(), ()),
726            };
727            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
728                .await
729                .unwrap();
730
731            // Put initial data
732            metadata.put(U64::new(1), b"hello".to_vec());
733            metadata.sync().await.unwrap();
734
735            // Sync again to ensure both blobs are populated
736            metadata.sync().await.unwrap();
737
738            // Use get_mut to modify value
739            let value = metadata.get_mut(&U64::new(1)).unwrap();
740            value[0] = b'H';
741
742            // Sync should detect the modification and do a rewrite (due to recent key_order_changed)
743            metadata.sync().await.unwrap();
744            let buffer = context.encode();
745            assert!(buffer.contains("sync_rewrites_total 2"));
746            assert!(buffer.contains("sync_overwrites_total 1"));
747
748            // Restart the metadata store
749            metadata.close().await.unwrap();
750            let metadata = Metadata::<_, U64, Vec<u8>>::init(context, cfg)
751                .await
752                .unwrap();
753
754            // Verify the change persisted
755            let value = metadata.get(&U64::new(1)).unwrap();
756            assert_eq!(value[0], b'H');
757
758            metadata.destroy().await.unwrap();
759        });
760    }
761
762    #[test_traced]
763    fn test_mixed_operation_sequences() {
764        let executor = deterministic::Runner::default();
765        executor.start(|context| async move {
766            let cfg = Config {
767                partition: "test".to_string(),
768                codec_config: ((0..).into(), ()),
769            };
770            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
771                .await
772                .unwrap();
773
774            let key = U64::new(1);
775
776            // Test: put -> remove -> put same key
777            metadata.put(key.clone(), b"first".to_vec());
778            metadata.remove(&key);
779            metadata.put(key.clone(), b"second".to_vec());
780            metadata.sync().await.unwrap();
781            let value = metadata.get(&key).unwrap();
782            assert_eq!(value, b"second");
783
784            // Test: put -> get_mut -> remove -> put
785            metadata.put(key.clone(), b"third".to_vec());
786            let value = metadata.get_mut(&key).unwrap();
787            value[0] = b'T';
788            metadata.remove(&key);
789            metadata.put(key.clone(), b"fourth".to_vec());
790            metadata.sync().await.unwrap();
791            let value = metadata.get(&key).unwrap();
792            assert_eq!(value, b"fourth");
793
794            // Restart the metadata store
795            metadata.close().await.unwrap();
796            let metadata = Metadata::<_, U64, Vec<u8>>::init(context, cfg)
797                .await
798                .unwrap();
799
800            // Verify the changes persisted
801            let value = metadata.get(&key).unwrap();
802            assert_eq!(value, b"fourth");
803
804            metadata.destroy().await.unwrap();
805        });
806    }
807
808    #[test_traced]
809    fn test_overwrite_vs_rewrite() {
810        let executor = deterministic::Runner::default();
811        executor.start(|context| async move {
812            let cfg = Config {
813                partition: "test".to_string(),
814                codec_config: ((0..).into(), ()),
815            };
816            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
817                .await
818                .unwrap();
819
820            // Set up initial data
821            metadata.put(U64::new(1), vec![1; 10]);
822            metadata.put(U64::new(2), vec![2; 10]);
823            metadata.sync().await.unwrap();
824
825            // Same size modification before both blobs are populated
826            metadata.put(U64::new(1), vec![0xFF; 10]);
827            metadata.sync().await.unwrap();
828            let buffer = context.encode();
829            assert!(buffer.contains("sync_rewrites_total 2"));
830            assert!(buffer.contains("sync_overwrites_total 0"));
831
832            // Let key order stabilize with another sync
833            metadata.sync().await.unwrap();
834            let buffer = context.encode();
835            assert!(buffer.contains("sync_rewrites_total 2"));
836            assert!(buffer.contains("sync_overwrites_total 1"));
837
838            // Same size modification after both blobs are populated - should overwrite
839            metadata.put(U64::new(1), vec![0xAA; 10]);
840            metadata.sync().await.unwrap();
841            let buffer = context.encode();
842            assert!(buffer.contains("sync_rewrites_total 2"));
843            assert!(buffer.contains("sync_overwrites_total 2"));
844
845            // Different size modification - should rewrite
846            metadata.put(U64::new(1), vec![0xFF; 20]);
847            metadata.sync().await.unwrap();
848            let buffer = context.encode();
849            assert!(buffer.contains("sync_rewrites_total 3"));
850            assert!(buffer.contains("sync_overwrites_total 2"));
851
852            // Add new key - should rewrite (key order changed)
853            metadata.put(U64::new(3), vec![3; 10]);
854            metadata.sync().await.unwrap();
855            let buffer = context.encode();
856            assert!(buffer.contains("sync_rewrites_total 4"));
857            assert!(buffer.contains("sync_overwrites_total 2"));
858
859            // Stabilize key order
860            metadata.sync().await.unwrap();
861            let buffer = context.encode();
862            assert!(buffer.contains("sync_rewrites_total 5"));
863            assert!(buffer.contains("sync_overwrites_total 2"));
864
865            // Modify existing key with same size - should overwrite after stabilized
866            metadata.put(U64::new(2), vec![0xAA; 10]);
867            metadata.sync().await.unwrap();
868            let buffer = context.encode();
869            assert!(buffer.contains("sync_rewrites_total 5"));
870            assert!(buffer.contains("sync_overwrites_total 3"));
871
872            metadata.destroy().await.unwrap();
873        });
874    }
875
876    #[test_traced]
877    fn test_blob_resize() {
878        let executor = deterministic::Runner::default();
879        executor.start(|context| async move {
880            let cfg = Config {
881                partition: "test".to_string(),
882                codec_config: ((0..).into(), ()),
883            };
884            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
885                .await
886                .unwrap();
887
888            // Start with large data
889            for i in 0..10 {
890                metadata.put(U64::new(i), vec![i as u8; 100]);
891            }
892            metadata.sync().await.unwrap();
893
894            // Stabilize key order
895            metadata.sync().await.unwrap();
896            let buffer = context.encode();
897            assert!(buffer.contains("sync_rewrites_total 2"));
898            assert!(buffer.contains("sync_overwrites_total 0"));
899
900            // Remove most data to make blob smaller
901            for i in 1..10 {
902                metadata.remove(&U64::new(i));
903            }
904            metadata.sync().await.unwrap();
905
906            // Verify the remaining data is still accessible
907            let value = metadata.get(&U64::new(0)).unwrap();
908            assert_eq!(value.len(), 100);
909            assert_eq!(value[0], 0);
910
911            // Check that sync properly handles blob resizing
912            let buffer = context.encode();
913            assert!(buffer.contains("sync_rewrites_total 3"));
914            assert!(buffer.contains("sync_overwrites_total 0"));
915
916            // Restart the metadata store
917            metadata.close().await.unwrap();
918            let metadata = Metadata::<_, U64, Vec<u8>>::init(context, cfg)
919                .await
920                .unwrap();
921
922            // Verify the changes persisted
923            let value = metadata.get(&U64::new(0)).unwrap();
924            assert_eq!(value.len(), 100);
925            assert_eq!(value[0], 0);
926
927            // Verify the removed keys are not present
928            for i in 1..10 {
929                assert!(metadata.get(&U64::new(i)).is_none());
930            }
931
932            metadata.destroy().await.unwrap();
933        });
934    }
935
936    #[test_traced]
937    fn test_clear_and_repopulate() {
938        let executor = deterministic::Runner::default();
939        executor.start(|context| async move {
940            let cfg = Config {
941                partition: "test".to_string(),
942                codec_config: ((0..).into(), ()),
943            };
944            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
945                .await
946                .unwrap();
947
948            // Initial data
949            metadata.put(U64::new(1), b"first".to_vec());
950            metadata.put(U64::new(2), b"second".to_vec());
951            metadata.sync().await.unwrap();
952
953            // Clear everything
954            metadata.clear();
955            metadata.sync().await.unwrap();
956
957            // Verify empty
958            assert!(metadata.get(&U64::new(1)).is_none());
959            assert!(metadata.get(&U64::new(2)).is_none());
960
961            // Restart the metadata store
962            metadata.close().await.unwrap();
963            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context, cfg)
964                .await
965                .unwrap();
966
967            // Verify the changes persisted
968            assert!(metadata.get(&U64::new(1)).is_none());
969            assert!(metadata.get(&U64::new(2)).is_none());
970
971            // Repopulate with different data
972            metadata.put(U64::new(3), b"third".to_vec());
973            metadata.put(U64::new(4), b"fourth".to_vec());
974            metadata.sync().await.unwrap();
975
976            // Verify new data
977            assert_eq!(metadata.get(&U64::new(3)).unwrap(), b"third");
978            assert_eq!(metadata.get(&U64::new(4)).unwrap(), b"fourth");
979            assert!(metadata.get(&U64::new(1)).is_none());
980            assert!(metadata.get(&U64::new(2)).is_none());
981
982            metadata.destroy().await.unwrap();
983        });
984    }
985
986    fn test_metadata_operations_and_restart(num_operations: usize) -> String {
987        let executor = deterministic::Runner::default();
988        executor.start(|mut context| async move {
989            let cfg = Config {
990                partition: "test_determinism".to_string(),
991                codec_config: ((0..).into(), ()),
992            };
993            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
994                .await
995                .unwrap();
996
997            // Perform a series of deterministic operations
998            for i in 0..num_operations {
999                let key = U64::new(i as u64);
1000                let mut value = vec![0u8; 64];
1001                context.fill_bytes(&mut value);
1002                metadata.put(key, value);
1003
1004                // Sync occasionally
1005                if context.gen_bool(0.1) {
1006                    metadata.sync().await.unwrap();
1007                }
1008
1009                // Update some existing keys
1010                if context.gen_bool(0.1) {
1011                    let selected_index = context.gen_range(0..=i);
1012                    let update_key = U64::new(selected_index as u64);
1013                    let mut new_value = vec![0u8; 64];
1014                    context.fill_bytes(&mut new_value);
1015                    metadata.put(update_key, new_value);
1016                }
1017
1018                // Remove some keys
1019                if context.gen_bool(0.1) {
1020                    let selected_index = context.gen_range(0..=i);
1021                    let remove_key = U64::new(selected_index as u64);
1022                    metadata.remove(&remove_key);
1023                }
1024
1025                // Use get_mut occasionally
1026                if context.gen_bool(0.1) {
1027                    let selected_index = context.gen_range(0..=i);
1028                    let mut_key = U64::new(selected_index as u64);
1029                    if let Some(value) = metadata.get_mut(&mut_key) {
1030                        if !value.is_empty() {
1031                            value[0] = value[0].wrapping_add(1);
1032                        }
1033                    }
1034                }
1035            }
1036            metadata.sync().await.unwrap();
1037
1038            // Destroy the metadata store
1039            metadata.destroy().await.unwrap();
1040
1041            context.auditor().state()
1042        })
1043    }
1044
1045    #[test_traced]
1046    #[ignore]
1047    fn test_determinism() {
1048        let state1 = test_metadata_operations_and_restart(1_000);
1049        let state2 = test_metadata_operations_and_restart(1_000);
1050        assert_eq!(state1, state2);
1051    }
1052
1053    #[test_traced]
1054    fn test_keys_iterator() {
1055        // Initialize the deterministic context
1056        let executor = deterministic::Runner::default();
1057        executor.start(|context| async move {
1058            // Create a metadata store
1059            let cfg = Config {
1060                partition: "test".to_string(),
1061                codec_config: ((0..).into(), ()),
1062            };
1063            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
1064                .await
1065                .unwrap();
1066
1067            // Add some keys with different prefixes
1068            metadata.put(U64::new(0x1000), b"value1".to_vec());
1069            metadata.put(U64::new(0x1001), b"value2".to_vec());
1070            metadata.put(U64::new(0x1002), b"value3".to_vec());
1071            metadata.put(U64::new(0x2000), b"value4".to_vec());
1072            metadata.put(U64::new(0x2001), b"value5".to_vec());
1073            metadata.put(U64::new(0x3000), b"value6".to_vec());
1074
1075            // Test iterating over all keys
1076            let all_keys: Vec<_> = metadata.keys(None).cloned().collect();
1077            assert_eq!(all_keys.len(), 6);
1078            assert!(all_keys.contains(&U64::new(0x1000)));
1079            assert!(all_keys.contains(&U64::new(0x3000)));
1080
1081            // Test iterating with prefix 0x10
1082            let prefix = vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10];
1083            let prefix_keys: Vec<_> = metadata.keys(Some(&prefix)).cloned().collect();
1084            assert_eq!(prefix_keys.len(), 3);
1085            assert!(prefix_keys.contains(&U64::new(0x1000)));
1086            assert!(prefix_keys.contains(&U64::new(0x1001)));
1087            assert!(prefix_keys.contains(&U64::new(0x1002)));
1088            assert!(!prefix_keys.contains(&U64::new(0x2000)));
1089
1090            // Test iterating with prefix 0x20
1091            let prefix = vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x20];
1092            let prefix_keys: Vec<_> = metadata.keys(Some(&prefix)).cloned().collect();
1093            assert_eq!(prefix_keys.len(), 2);
1094            assert!(prefix_keys.contains(&U64::new(0x2000)));
1095            assert!(prefix_keys.contains(&U64::new(0x2001)));
1096
1097            // Test with non-matching prefix
1098            let prefix = vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40];
1099            let prefix_keys: Vec<_> = metadata.keys(Some(&prefix)).cloned().collect();
1100            assert_eq!(prefix_keys.len(), 0);
1101
1102            metadata.destroy().await.unwrap();
1103        });
1104    }
1105
1106    #[test_traced]
1107    fn test_remove_prefix() {
1108        // Initialize the deterministic context
1109        let executor = deterministic::Runner::default();
1110        executor.start(|context| async move {
1111            // Create a metadata store
1112            let cfg = Config {
1113                partition: "test".to_string(),
1114                codec_config: ((0..).into(), ()),
1115            };
1116            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
1117                .await
1118                .unwrap();
1119
1120            // Add some keys with different prefixes
1121            metadata.put(U64::new(0x1000), b"value1".to_vec());
1122            metadata.put(U64::new(0x1001), b"value2".to_vec());
1123            metadata.put(U64::new(0x1002), b"value3".to_vec());
1124            metadata.put(U64::new(0x2000), b"value4".to_vec());
1125            metadata.put(U64::new(0x2001), b"value5".to_vec());
1126            metadata.put(U64::new(0x3000), b"value6".to_vec());
1127
1128            // Check initial metrics
1129            let buffer = context.encode();
1130            assert!(buffer.contains("keys 6"));
1131
1132            // Remove keys with prefix 0x10
1133            let prefix = vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10];
1134            metadata.remove_prefix(&prefix);
1135
1136            // Check metrics after removal
1137            let buffer = context.encode();
1138            assert!(buffer.contains("keys 3"));
1139
1140            // Verify remaining keys
1141            assert!(metadata.get(&U64::new(0x1000)).is_none());
1142            assert!(metadata.get(&U64::new(0x1001)).is_none());
1143            assert!(metadata.get(&U64::new(0x1002)).is_none());
1144            assert!(metadata.get(&U64::new(0x2000)).is_some());
1145            assert!(metadata.get(&U64::new(0x2001)).is_some());
1146            assert!(metadata.get(&U64::new(0x3000)).is_some());
1147
1148            // Sync and reopen to ensure persistence
1149            metadata.sync().await.unwrap();
1150            metadata.close().await.unwrap();
1151            let cfg = Config {
1152                partition: "test".to_string(),
1153                codec_config: ((0..).into(), ()),
1154            };
1155            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
1156                .await
1157                .unwrap();
1158
1159            // Verify keys are still removed after restart
1160            assert!(metadata.get(&U64::new(0x1000)).is_none());
1161            assert!(metadata.get(&U64::new(0x2000)).is_some());
1162            assert_eq!(metadata.keys(None).count(), 3);
1163
1164            // Remove non-existing prefix
1165            let prefix = vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40];
1166            metadata.remove_prefix(&prefix);
1167
1168            // Remove all remaining keys
1169            let prefix = vec![]; // Empty prefix matches all
1170            metadata.remove_prefix(&prefix);
1171            assert_eq!(metadata.keys(None).count(), 0);
1172
1173            metadata.destroy().await.unwrap();
1174        });
1175    }
1176}