commonware_storage/metadata/
mod.rs

//! A key-value store optimized for atomically committing a small collection of metadata.
//!
//! [Metadata] is a key-value store optimized for tracking a small collection of metadata
//! that allows multiple updates to be committed in a single batch. It is commonly used with
//! a variety of other underlying storage systems to persist application state across restarts.
//!
//! # Format
//!
//! Data stored in [Metadata] is serialized as a sequence of key-value pairs in either a
//! "left" or "right" blob:
//!
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! | 0 | 1 |    ...    | 8 | 9 |10 |11 |12 |13 |14 |15 |16 |  ...  |50 |...|90 |91 |92 |93 |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! |    Version (u64)  |      Key1     |              Value1           |...|  CRC32(u32)   |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! ```
//!
//! _To ensure the integrity of the data, a CRC32 checksum is appended to the end of the blob.
//! This ensures that partial writes are detected before any data is relied on._
//!
//! # Atomic Updates
//!
//! To provide support for atomic updates, [Metadata] maintains two blobs: a "left" and a "right"
//! blob. When a new update is committed, it is written to the "older" of the two blobs (indicated
//! by the version persisted). Writes to [commonware_runtime::Blob] are not atomic and may only
//! complete partially, so we only overwrite the "newer" blob once the "older" blob has been synced
//! (otherwise, we would not be guaranteed to recover the latest complete state from disk on
//! restart as half of a blob could be old data and half new data).
//!
//! # Delta Writes
//!
//! If the set of keys and the length of values are stable, [Metadata] will only write an update's
//! delta to disk (rather than rewriting the entire metadata). This makes [Metadata] a great choice
//! for maintaining even large collections of data (with the majority rarely modified).
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic};
//! use commonware_storage::metadata::{Metadata, Config};
//! use commonware_utils::sequence::U64;
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create a store
//!     let mut metadata = Metadata::init(context, Config{
//!         partition: "partition".to_string(),
//!         codec_config: ((0..).into(), ()),
//!     }).await.unwrap();
//!
//!     // Store metadata
//!     metadata.put(U64::new(1), b"hello".to_vec());
//!     metadata.put(U64::new(2), b"world".to_vec());
//!
//!     // Sync the metadata store (batch write changes)
//!     metadata.sync().await.unwrap();
//!
//!     // Retrieve some metadata
//!     let value = metadata.get(&U64::new(1)).unwrap();
//!
//! });
//! ```
65
66mod storage;
67pub use storage::Metadata;
68use thiserror::Error;
69
70/// Errors that can occur when interacting with [Metadata].
71#[derive(Debug, Error)]
72pub enum Error {
73    #[error("runtime error: {0}")]
74    Runtime(#[from] commonware_runtime::Error),
75    #[error("blob too large: {0}")]
76    BlobTooLarge(u64),
77}
78
/// Configuration for [Metadata] storage.
///
/// `C` is the type of the codec configuration; this struct only stores it and
/// places no bounds on it (beyond `C: Clone` when the config itself is cloned).
#[derive(Clone)]
pub struct Config<C> {
    /// The [commonware_runtime::Storage] partition to use for storing metadata.
    pub partition: String,

    /// The codec configuration to use for the value stored in the metadata.
    pub codec_config: C,
}
88
89#[cfg(test)]
90mod tests {
91    use super::*;
92    use commonware_macros::{test_group, test_traced};
93    use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
94    use commonware_utils::{hex, sequence::U64};
95    use rand::{Rng, RngCore};
96
97    #[test_traced]
98    fn test_put_get_clear() {
99        // Initialize the deterministic context
100        let executor = deterministic::Runner::default();
101        executor.start(|context| async move {
102            // Create a metadata store
103            let cfg = Config {
104                partition: "test".to_string(),
105                codec_config: ((0..).into(), ()),
106            };
107            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
108                .await
109                .unwrap();
110
111            // Get a key that doesn't exist
112            let key = U64::new(42);
113            let value = metadata.get(&key);
114            assert!(value.is_none());
115
116            // Check metrics
117            let buffer = context.encode();
118            assert!(buffer.contains("sync_rewrites_total 0"));
119            assert!(buffer.contains("sync_overwrites_total 0"));
120            assert!(buffer.contains("keys 0"));
121
122            // Put a key
123            let hello = b"hello".to_vec();
124            metadata.put(key.clone(), hello.clone());
125
126            // Get the key
127            let value = metadata.get(&key).unwrap();
128            assert_eq!(value, &hello);
129
130            // Check metrics
131            let buffer = context.encode();
132            assert!(buffer.contains("sync_rewrites_total 0"));
133            assert!(buffer.contains("sync_overwrites_total 0"));
134            assert!(buffer.contains("keys 1"));
135
136            // Sync the metadata store
137            metadata.sync().await.unwrap();
138
139            // Check metrics
140            let buffer = context.encode();
141            assert!(buffer.contains("sync_rewrites_total 1"));
142            assert!(buffer.contains("sync_overwrites_total 0"));
143            assert!(buffer.contains("keys 1"));
144
145            // Reopen the metadata store
146            drop(metadata);
147            let cfg = Config {
148                partition: "test".to_string(),
149                codec_config: ((0..).into(), ()),
150            };
151            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
152                .await
153                .unwrap();
154
155            // Check metrics
156            let buffer = context.encode();
157            assert!(buffer.contains("sync_rewrites_total 0"));
158            assert!(buffer.contains("sync_overwrites_total 0"));
159            assert!(buffer.contains("keys 1"));
160
161            // Get the key
162            let value = metadata.get(&key).unwrap();
163            assert_eq!(value, &hello);
164
165            // Test clearing the metadata store
166            metadata.clear();
167            let value = metadata.get(&key);
168            assert!(value.is_none());
169
170            // Check metrics
171            let buffer = context.encode();
172            assert!(buffer.contains("sync_rewrites_total 1"));
173            assert!(buffer.contains("sync_overwrites_total 0"));
174            assert!(buffer.contains("keys 0"));
175
176            metadata.destroy().await.unwrap();
177        });
178    }
179
180    #[test_traced]
181    fn test_put_returns_previous_value() {
182        let executor = deterministic::Runner::default();
183        executor.start(|context| async move {
184            let cfg = Config {
185                partition: "test".to_string(),
186                codec_config: ((0..).into(), ()),
187            };
188            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
189                .await
190                .unwrap();
191
192            let key = U64::new(42);
193
194            // First put returns None (no previous value)
195            let previous = metadata.put(key.clone(), b"first".to_vec());
196            assert!(previous.is_none());
197
198            // Second put returns the previous value
199            let previous = metadata.put(key.clone(), b"second".to_vec());
200            assert_eq!(previous, Some(b"first".to_vec()));
201
202            // Third put returns the previous value
203            let previous = metadata.put(key.clone(), b"third".to_vec());
204            assert_eq!(previous, Some(b"second".to_vec()));
205
206            // Current value is the latest
207            assert_eq!(metadata.get(&key), Some(&b"third".to_vec()));
208
209            // Different key returns None
210            let other_key = U64::new(99);
211            let previous = metadata.put(other_key.clone(), b"other".to_vec());
212            assert!(previous.is_none());
213
214            // Sync and verify persistence
215            metadata.sync().await.unwrap();
216            drop(metadata);
217
218            let cfg = Config {
219                partition: "test".to_string(),
220                codec_config: ((0..).into(), ()),
221            };
222            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
223                .await
224                .unwrap();
225
226            // After restart, put still returns previous value
227            let previous = metadata.put(key.clone(), b"fourth".to_vec());
228            assert_eq!(previous, Some(b"third".to_vec()));
229
230            metadata.destroy().await.unwrap();
231        });
232    }
233
234    #[test_traced]
235    fn test_multi_sync() {
236        // Initialize the deterministic context
237        let executor = deterministic::Runner::default();
238        executor.start(|context| async move {
239            // Create a metadata store
240            let cfg = Config {
241                partition: "test".to_string(),
242                codec_config: ((0..).into(), ()),
243            };
244            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
245                .await
246                .unwrap();
247
248            // Put a key
249            let key = U64::new(42);
250            let hello = b"hello".to_vec();
251            metadata.put(key.clone(), hello.clone());
252
253            // Sync the metadata store
254            metadata.sync().await.unwrap();
255
256            // Check metrics
257            let buffer = context.encode();
258            assert!(buffer.contains("sync_rewrites_total 1"));
259            assert!(buffer.contains("sync_overwrites_total 0"));
260            assert!(buffer.contains("keys 1"));
261
262            // Put an overlapping key and a new key
263            let world = b"world".to_vec();
264            metadata.put(key.clone(), world.clone());
265            let key2 = U64::new(43);
266            let foo = b"foo".to_vec();
267            metadata.put(key2.clone(), foo.clone());
268
269            // Sync the metadata store
270            metadata.sync().await.unwrap();
271
272            // Check metrics
273            let buffer = context.encode();
274            assert!(buffer.contains("sync_rewrites_total 2"));
275            assert!(buffer.contains("sync_overwrites_total 0"));
276            assert!(buffer.contains("keys 2"));
277
278            // Reopen the metadata store
279            drop(metadata);
280            let cfg = Config {
281                partition: "test".to_string(),
282                codec_config: ((0..).into(), ()),
283            };
284            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
285                .await
286                .unwrap();
287
288            // Check metrics
289            let buffer = context.encode();
290            assert!(buffer.contains("sync_rewrites_total 0"));
291            assert!(buffer.contains("sync_overwrites_total 0"));
292            assert!(buffer.contains("keys 2"));
293
294            // Get the key
295            let value = metadata.get(&key).unwrap();
296            assert_eq!(value, &world);
297            let value = metadata.get(&key2).unwrap();
298            assert_eq!(value, &foo);
299
300            // Remove the key
301            metadata.remove(&key);
302
303            // Sync the metadata store
304            metadata.sync().await.unwrap();
305
306            // Check metrics
307            let buffer = context.encode();
308            assert!(buffer.contains("sync_rewrites_total 1"));
309            assert!(buffer.contains("sync_overwrites_total 0"));
310            assert!(buffer.contains("keys 1"));
311
312            // Reopen the metadata store
313            drop(metadata);
314            let cfg = Config {
315                partition: "test".to_string(),
316                codec_config: ((0..).into(), ()),
317            };
318            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
319                .await
320                .unwrap();
321
322            // Check metrics
323            let buffer = context.encode();
324            assert!(buffer.contains("sync_rewrites_total 0"));
325            assert!(buffer.contains("sync_overwrites_total 0"));
326            assert!(buffer.contains("keys 1"));
327
328            // Get the key
329            let value = metadata.get(&key);
330            assert!(value.is_none());
331            let value = metadata.get(&key2).unwrap();
332            assert_eq!(value, &foo);
333
334            metadata.destroy().await.unwrap();
335        });
336    }
337
338    #[test_traced]
339    fn test_recover_corrupted_one() {
340        // Initialize the deterministic context
341        let executor = deterministic::Runner::default();
342        executor.start(|context| async move {
343            // Create a metadata store
344            let cfg = Config {
345                partition: "test".to_string(),
346                codec_config: ((0..).into(), ()),
347            };
348            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
349                .await
350                .unwrap();
351
352            // Put a key
353            let key = U64::new(42);
354            let hello = b"hello".to_vec();
355            metadata.put(key.clone(), hello.clone());
356
357            // Sync the metadata store
358            metadata.sync().await.unwrap();
359
360            // Put an overlapping key and a new key
361            let world = b"world".to_vec();
362            metadata.put(key.clone(), world.clone());
363            let key2 = U64::new(43);
364            let foo = b"foo".to_vec();
365            metadata.put(key2, foo.clone());
366
367            // Sync the metadata store
368            metadata.sync().await.unwrap();
369            drop(metadata);
370
371            // Corrupt the metadata store
372            let (blob, _) = context.open("test", b"left").await.unwrap();
373            blob.write_at(b"corrupted".to_vec(), 0).await.unwrap();
374            blob.sync().await.unwrap();
375
376            // Reopen the metadata store
377            let cfg = Config {
378                partition: "test".to_string(),
379                codec_config: ((0..).into(), ()),
380            };
381            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
382                .await
383                .unwrap();
384
385            // Get the key (falls back to non-corrupt)
386            let value = metadata.get(&key).unwrap();
387            assert_eq!(value, &hello);
388
389            metadata.destroy().await.unwrap();
390        });
391    }
392
393    #[test_traced]
394    fn test_recover_corrupted_both() {
395        // Initialize the deterministic context
396        let executor = deterministic::Runner::default();
397        executor.start(|context| async move {
398            // Create a metadata store
399            let cfg = Config {
400                partition: "test".to_string(),
401                codec_config: ((0..).into(), ()),
402            };
403            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
404                .await
405                .unwrap();
406
407            // Put a key
408            let key = U64::new(42);
409            let hello = b"hello".to_vec();
410            metadata.put(key.clone(), hello.clone());
411
412            // Sync the metadata store
413            metadata.sync().await.unwrap();
414
415            // Put an overlapping key and a new key
416            let world = b"world".to_vec();
417            metadata.put(key.clone(), world.clone());
418            let key2 = U64::new(43);
419            let foo = b"foo".to_vec();
420            metadata.put(key2, foo.clone());
421
422            // Sync the metadata store
423            metadata.sync().await.unwrap();
424            drop(metadata);
425
426            // Corrupt the metadata store
427            let (blob, _) = context.open("test", b"left").await.unwrap();
428            blob.write_at(b"corrupted".to_vec(), 0).await.unwrap();
429            blob.sync().await.unwrap();
430            let (blob, _) = context.open("test", b"right").await.unwrap();
431            blob.write_at(b"corrupted".to_vec(), 0).await.unwrap();
432            blob.sync().await.unwrap();
433
434            // Reopen the metadata store
435            let cfg = Config {
436                partition: "test".to_string(),
437                codec_config: ((0..).into(), ()),
438            };
439            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
440                .await
441                .unwrap();
442
443            // Get the key (falls back to non-corrupt)
444            let value = metadata.get(&key);
445            assert!(value.is_none());
446
447            // Check metrics
448            let buffer = context.encode();
449            assert!(buffer.contains("sync_rewrites_total 0"));
450            assert!(buffer.contains("sync_overwrites_total 0"));
451            assert!(buffer.contains("keys 0"));
452
453            metadata.destroy().await.unwrap();
454        });
455    }
456
457    #[test_traced]
458    fn test_recover_corrupted_truncate() {
459        // Initialize the deterministic context
460        let executor = deterministic::Runner::default();
461        executor.start(|context| async move {
462            // Create a metadata store
463            let cfg = Config {
464                partition: "test".to_string(),
465                codec_config: ((0..).into(), ()),
466            };
467            let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
468
469            // Put a key
470            let key = U64::new(42);
471            let hello = b"hello".to_vec();
472            metadata.put(key.clone(), hello.clone());
473
474            // Sync the metadata store
475            metadata.sync().await.unwrap();
476
477            // Put an overlapping key and a new key
478            let world = b"world".to_vec();
479            metadata.put(key.clone(), world.clone());
480            let key2 = U64::new(43);
481            let foo = b"foo".to_vec();
482            metadata.put(key2, foo.clone());
483
484            // Sync the metadata store
485            metadata.sync().await.unwrap();
486            drop(metadata);
487
488            // Corrupt the metadata store
489            let (blob, len) = context.open("test", b"left").await.unwrap();
490            blob.resize(len - 8).await.unwrap();
491            blob.sync().await.unwrap();
492
493            // Reopen the metadata store
494            let cfg = Config {
495                partition: "test".to_string(),
496                codec_config: ((0..).into(), ()),
497            };
498            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
499                .await
500                .unwrap();
501
502            // Get the key (falls back to non-corrupt)
503            let value = metadata.get(&key).unwrap();
504            assert_eq!(value, &hello);
505
506            metadata.destroy().await.unwrap();
507        });
508    }
509
510    #[test_traced]
511    fn test_recover_corrupted_short() {
512        // Initialize the deterministic context
513        let executor = deterministic::Runner::default();
514        executor.start(|context| async move {
515            // Create a metadata store
516            let cfg = Config {
517                partition: "test".to_string(),
518                codec_config: ((0..).into(), ()),
519            };
520            let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
521
522            // Put a key
523            let key = U64::new(42);
524            let hello = b"hello".to_vec();
525            metadata.put(key.clone(), hello.clone());
526
527            // Sync the metadata store
528            metadata.sync().await.unwrap();
529
530            // Put an overlapping key and a new key
531            let world = b"world".to_vec();
532            metadata.put(key.clone(), world.clone());
533            let key2 = U64::new(43);
534            let foo = b"foo".to_vec();
535            metadata.put(key2, foo.clone());
536
537            // Sync the metadata store
538            metadata.sync().await.unwrap();
539            drop(metadata);
540
541            // Corrupt the metadata store
542            let (blob, _) = context.open("test", b"left").await.unwrap();
543            blob.resize(5).await.unwrap();
544            blob.sync().await.unwrap();
545
546            // Reopen the metadata store
547            let cfg = Config {
548                partition: "test".to_string(),
549                codec_config: ((0..).into(), ()),
550            };
551            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
552                .await
553                .unwrap();
554
555            // Get the key (falls back to non-corrupt)
556            let value = metadata.get(&key).unwrap();
557            assert_eq!(value, &hello);
558
559            metadata.destroy().await.unwrap();
560        });
561    }
562
563    #[test_traced]
564    fn test_unclean_shutdown() {
565        // Initialize the deterministic context
566        let executor = deterministic::Runner::default();
567        executor.start(|context| async move {
568            let key = U64::new(42);
569            let hello = b"hello".to_vec();
570            {
571                // Create a metadata store
572                let cfg = Config {
573                    partition: "test".to_string(),
574                    codec_config: ((0..).into(), ()),
575                };
576                let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
577
578                // Put a key
579                metadata.put(key.clone(), hello.clone());
580
581                // Drop metadata before sync
582            }
583
584            // Reopen the metadata store
585            let cfg = Config {
586                partition: "test".to_string(),
587                codec_config: ((0..).into(), ()),
588            };
589            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
590                .await
591                .unwrap();
592
593            // Get the key
594            let value = metadata.get(&key);
595            assert!(value.is_none());
596
597            // Check metrics
598            let buffer = context.encode();
599            assert!(buffer.contains("sync_rewrites_total 0"));
600            assert!(buffer.contains("sync_overwrites_total 0"));
601            assert!(buffer.contains("keys 0"));
602
603            metadata.destroy().await.unwrap();
604        });
605    }
606
607    #[test_traced]
608    #[should_panic(expected = "usize value is larger than u32")]
609    fn test_value_too_big_error() {
610        // Initialize the deterministic context
611        let executor = deterministic::Runner::default();
612        executor.start(|context| async move {
613            // Create a metadata store
614            let cfg = Config {
615                partition: "test".to_string(),
616                codec_config: ((0..).into(), ()),
617            };
618            let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
619
620            // Create a value that exceeds u32::MAX bytes
621            let value = vec![0u8; (u32::MAX as usize) + 1];
622            metadata.put(U64::new(1), value);
623
624            // Assert
625            metadata.sync().await.unwrap();
626        });
627    }
628
629    #[test_traced]
630    fn test_delta_writes() {
631        // Initialize the deterministic context
632        let executor = deterministic::Runner::default();
633        executor.start(|context| async move {
634            // Create a metadata store
635            let cfg = Config {
636                partition: "test".to_string(),
637                codec_config: ((0..).into(), ()),
638            };
639            let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
640
641            // Put initial keys
642            for i in 0..100 {
643                metadata.put(U64::new(i), vec![i as u8; 100]);
644            }
645
646            // First sync - should write everything to the first blob
647            //
648            // 100 keys * (8 bytes for key + 1 byte for len + 100 bytes for value) + 8 bytes for version + 4 bytes for checksum
649            metadata.sync().await.unwrap();
650            let buffer = context.encode();
651            assert!(buffer.contains("sync_rewrites_total 1"), "{buffer}");
652            assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
653            assert!(
654                buffer.contains("runtime_storage_write_bytes_total 10912"),
655                "{buffer}",
656            );
657
658            // Modify just one key
659            metadata.put(U64::new(51), vec![0xff; 100]);
660
661            // Sync again - should write everything to the second blob
662            metadata.sync().await.unwrap();
663            let buffer = context.encode();
664            assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
665            assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
666            assert!(
667                buffer.contains("runtime_storage_write_bytes_total 21824"),
668                "{buffer}",
669            );
670
671            // Sync again - should write only diff from the first blob
672            //
673            // 1 byte for len + 100 bytes for value + 8 byte for version + 4 bytes for checksum
674            metadata.sync().await.unwrap();
675            let buffer = context.encode();
676            assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
677            assert!(buffer.contains("sync_overwrites_total 1"), "{buffer}");
678            assert!(
679                buffer.contains("runtime_storage_write_bytes_total 21937"),
680                "{buffer}",
681            );
682
683            // Sync again - should write only diff from the second blob
684            //
685            // 8 byte for version + 4 bytes for checksum
686            metadata.sync().await.unwrap();
687            let buffer = context.encode();
688            assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
689            assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
690            assert!(
691                buffer.contains("runtime_storage_write_bytes_total 21949"),
692                "{buffer}",
693            );
694
695            // Remove a key - should rewrite everything
696            //
697            // 99 keys * (8 bytes for key + 1 bytes for len + 100 bytes for value) + 8 bytes for version + 4 bytes for checksum
698            metadata.remove(&U64::new(51));
699            metadata.sync().await.unwrap();
700            let buffer = context.encode();
701            assert!(buffer.contains("sync_rewrites_total 3"), "{buffer}");
702            assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
703            assert!(
704                buffer.contains("runtime_storage_write_bytes_total 32752"),
705                "{buffer}"
706            );
707
708            // Sync again - should also rewrite
709            metadata.sync().await.unwrap();
710            let buffer = context.encode();
711            assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
712            assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
713            assert!(
714                buffer.contains("runtime_storage_write_bytes_total 43555"),
715                "{buffer}"
716            );
717
718            // Modify in-place - should overwrite
719            //
720            // 1 byte for len + 100 bytes for value + 8 byte for version + 4 bytes for checksum
721            metadata.put(U64::new(50), vec![0xff; 100]);
722            metadata.sync().await.unwrap();
723            let buffer = context.encode();
724            assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
725            assert!(buffer.contains("sync_overwrites_total 3"), "{buffer}");
726            assert!(
727                buffer.contains("runtime_storage_write_bytes_total 43668"),
728                "{buffer}"
729            );
730
731            // Clean up
732            metadata.destroy().await.unwrap();
733        });
734    }
735
736    #[test_traced]
737    fn test_sync_with_no_changes() {
738        let executor = deterministic::Runner::default();
739        executor.start(|context| async move {
740            let cfg = Config {
741                partition: "test".to_string(),
742                codec_config: ((0..).into(), ()),
743            };
744            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
745                .await
746                .unwrap();
747
748            // Put initial data
749            metadata.put(U64::new(1), b"hello".to_vec());
750            metadata.sync().await.unwrap();
751
752            // Sync again with no changes - will rewrite because key_order_changed is recent
753            // (on startup, key_order_changed is set to next_version)
754            metadata.sync().await.unwrap();
755            let buffer = context.encode();
756            assert!(buffer.contains("sync_rewrites_total 2"));
757            assert!(buffer.contains("sync_overwrites_total 0"));
758
759            // Sync again - now key order is stable, should do overwrite
760            metadata.sync().await.unwrap();
761            let buffer = context.encode();
762            assert!(buffer.contains("sync_rewrites_total 2"));
763            assert!(buffer.contains("sync_overwrites_total 1"));
764
765            // Sync again - should continue doing overwrites
766            metadata.sync().await.unwrap();
767            let buffer = context.encode();
768            assert!(buffer.contains("sync_rewrites_total 2"));
769            assert!(buffer.contains("sync_overwrites_total 2"));
770
771            metadata.destroy().await.unwrap();
772        });
773    }
774
775    #[test_traced]
776    fn test_get_mut_marks_modified() {
777        let executor = deterministic::Runner::default();
778        executor.start(|context| async move {
779            let cfg = Config {
780                partition: "test".to_string(),
781                codec_config: ((0..).into(), ()),
782            };
783            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
784                .await
785                .unwrap();
786
787            // Put initial data
788            metadata.put(U64::new(1), b"hello".to_vec());
789            metadata.sync().await.unwrap();
790
791            // Sync again to ensure both blobs are populated
792            metadata.sync().await.unwrap();
793
794            // Use get_mut to modify value
795            let value = metadata.get_mut(&U64::new(1)).unwrap();
796            value[0] = b'H';
797
798            // Sync should detect the modification and do a rewrite (due to recent key_order_changed)
799            metadata.sync().await.unwrap();
800            let buffer = context.encode();
801            assert!(buffer.contains("sync_rewrites_total 2"));
802            assert!(buffer.contains("sync_overwrites_total 1"));
803
804            // Restart the metadata store
805            drop(metadata);
806            let metadata = Metadata::<_, U64, Vec<u8>>::init(context, cfg)
807                .await
808                .unwrap();
809
810            // Verify the change persisted
811            let value = metadata.get(&U64::new(1)).unwrap();
812            assert_eq!(value[0], b'H');
813
814            metadata.destroy().await.unwrap();
815        });
816    }
817
818    #[test_traced]
819    fn test_mixed_operation_sequences() {
820        let executor = deterministic::Runner::default();
821        executor.start(|context| async move {
822            let cfg = Config {
823                partition: "test".to_string(),
824                codec_config: ((0..).into(), ()),
825            };
826            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
827                .await
828                .unwrap();
829
830            let key = U64::new(1);
831
832            // Test: put -> remove -> put same key
833            metadata.put(key.clone(), b"first".to_vec());
834            metadata.remove(&key);
835            metadata.put(key.clone(), b"second".to_vec());
836            metadata.sync().await.unwrap();
837            let value = metadata.get(&key).unwrap();
838            assert_eq!(value, b"second");
839
840            // Test: put -> get_mut -> remove -> put
841            metadata.put(key.clone(), b"third".to_vec());
842            let value = metadata.get_mut(&key).unwrap();
843            value[0] = b'T';
844            metadata.remove(&key);
845            metadata.put(key.clone(), b"fourth".to_vec());
846            metadata.sync().await.unwrap();
847            let value = metadata.get(&key).unwrap();
848            assert_eq!(value, b"fourth");
849
850            // Restart the metadata store
851            drop(metadata);
852            let metadata = Metadata::<_, U64, Vec<u8>>::init(context, cfg)
853                .await
854                .unwrap();
855
856            // Verify the changes persisted
857            let value = metadata.get(&key).unwrap();
858            assert_eq!(value, b"fourth");
859
860            metadata.destroy().await.unwrap();
861        });
862    }
863
864    #[test_traced]
865    fn test_overwrite_vs_rewrite() {
866        let executor = deterministic::Runner::default();
867        executor.start(|context| async move {
868            let cfg = Config {
869                partition: "test".to_string(),
870                codec_config: ((0..).into(), ()),
871            };
872            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
873                .await
874                .unwrap();
875
876            // Set up initial data
877            metadata.put(U64::new(1), vec![1; 10]);
878            metadata.put(U64::new(2), vec![2; 10]);
879            metadata.sync().await.unwrap();
880
881            // Same size modification before both blobs are populated
882            metadata.put(U64::new(1), vec![0xFF; 10]);
883            metadata.sync().await.unwrap();
884            let buffer = context.encode();
885            assert!(buffer.contains("sync_rewrites_total 2"));
886            assert!(buffer.contains("sync_overwrites_total 0"));
887
888            // Let key order stabilize with another sync
889            metadata.sync().await.unwrap();
890            let buffer = context.encode();
891            assert!(buffer.contains("sync_rewrites_total 2"));
892            assert!(buffer.contains("sync_overwrites_total 1"));
893
894            // Same size modification after both blobs are populated - should overwrite
895            metadata.put(U64::new(1), vec![0xAA; 10]);
896            metadata.sync().await.unwrap();
897            let buffer = context.encode();
898            assert!(buffer.contains("sync_rewrites_total 2"));
899            assert!(buffer.contains("sync_overwrites_total 2"));
900
901            // Different size modification - should rewrite
902            metadata.put(U64::new(1), vec![0xFF; 20]);
903            metadata.sync().await.unwrap();
904            let buffer = context.encode();
905            assert!(buffer.contains("sync_rewrites_total 3"));
906            assert!(buffer.contains("sync_overwrites_total 2"));
907
908            // Add new key - should rewrite (key order changed)
909            metadata.put(U64::new(3), vec![3; 10]);
910            metadata.sync().await.unwrap();
911            let buffer = context.encode();
912            assert!(buffer.contains("sync_rewrites_total 4"));
913            assert!(buffer.contains("sync_overwrites_total 2"));
914
915            // Stabilize key order
916            metadata.sync().await.unwrap();
917            let buffer = context.encode();
918            assert!(buffer.contains("sync_rewrites_total 5"));
919            assert!(buffer.contains("sync_overwrites_total 2"));
920
921            // Modify existing key with same size - should overwrite after stabilized
922            metadata.put(U64::new(2), vec![0xAA; 10]);
923            metadata.sync().await.unwrap();
924            let buffer = context.encode();
925            assert!(buffer.contains("sync_rewrites_total 5"));
926            assert!(buffer.contains("sync_overwrites_total 3"));
927
928            metadata.destroy().await.unwrap();
929        });
930    }
931
932    #[test_traced]
933    fn test_blob_resize() {
934        let executor = deterministic::Runner::default();
935        executor.start(|context| async move {
936            let cfg = Config {
937                partition: "test".to_string(),
938                codec_config: ((0..).into(), ()),
939            };
940            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
941                .await
942                .unwrap();
943
944            // Start with large data
945            for i in 0..10 {
946                metadata.put(U64::new(i), vec![i as u8; 100]);
947            }
948            metadata.sync().await.unwrap();
949
950            // Stabilize key order
951            metadata.sync().await.unwrap();
952            let buffer = context.encode();
953            assert!(buffer.contains("sync_rewrites_total 2"));
954            assert!(buffer.contains("sync_overwrites_total 0"));
955
956            // Remove most data to make blob smaller
957            for i in 1..10 {
958                metadata.remove(&U64::new(i));
959            }
960            metadata.sync().await.unwrap();
961
962            // Verify the remaining data is still accessible
963            let value = metadata.get(&U64::new(0)).unwrap();
964            assert_eq!(value.len(), 100);
965            assert_eq!(value[0], 0);
966
967            // Check that sync properly handles blob resizing
968            let buffer = context.encode();
969            assert!(buffer.contains("sync_rewrites_total 3"));
970            assert!(buffer.contains("sync_overwrites_total 0"));
971
972            // Restart the metadata store
973            drop(metadata);
974            let metadata = Metadata::<_, U64, Vec<u8>>::init(context, cfg)
975                .await
976                .unwrap();
977
978            // Verify the changes persisted
979            let value = metadata.get(&U64::new(0)).unwrap();
980            assert_eq!(value.len(), 100);
981            assert_eq!(value[0], 0);
982
983            // Verify the removed keys are not present
984            for i in 1..10 {
985                assert!(metadata.get(&U64::new(i)).is_none());
986            }
987
988            metadata.destroy().await.unwrap();
989        });
990    }
991
992    #[test_traced]
993    fn test_clear_and_repopulate() {
994        let executor = deterministic::Runner::default();
995        executor.start(|context| async move {
996            let cfg = Config {
997                partition: "test".to_string(),
998                codec_config: ((0..).into(), ()),
999            };
1000            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
1001                .await
1002                .unwrap();
1003
1004            // Initial data
1005            metadata.put(U64::new(1), b"first".to_vec());
1006            metadata.put(U64::new(2), b"second".to_vec());
1007            metadata.sync().await.unwrap();
1008
1009            // Clear everything
1010            metadata.clear();
1011            metadata.sync().await.unwrap();
1012
1013            // Verify empty
1014            assert!(metadata.get(&U64::new(1)).is_none());
1015            assert!(metadata.get(&U64::new(2)).is_none());
1016
1017            // Restart the metadata store
1018            drop(metadata);
1019            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context, cfg)
1020                .await
1021                .unwrap();
1022
1023            // Verify the changes persisted
1024            assert!(metadata.get(&U64::new(1)).is_none());
1025            assert!(metadata.get(&U64::new(2)).is_none());
1026
1027            // Repopulate with different data
1028            metadata.put(U64::new(3), b"third".to_vec());
1029            metadata.put(U64::new(4), b"fourth".to_vec());
1030            metadata.sync().await.unwrap();
1031
1032            // Verify new data
1033            assert_eq!(metadata.get(&U64::new(3)).unwrap(), b"third");
1034            assert_eq!(metadata.get(&U64::new(4)).unwrap(), b"fourth");
1035            assert!(metadata.get(&U64::new(1)).is_none());
1036            assert!(metadata.get(&U64::new(2)).is_none());
1037
1038            metadata.destroy().await.unwrap();
1039        });
1040    }
1041
1042    fn test_metadata_operations_and_restart(num_operations: usize) -> String {
1043        let executor = deterministic::Runner::default();
1044        executor.start(|mut context| async move {
1045            let cfg = Config {
1046                partition: "test_determinism".to_string(),
1047                codec_config: ((0..).into(), ()),
1048            };
1049            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
1050                .await
1051                .unwrap();
1052
1053            // Perform a series of deterministic operations
1054            for i in 0..num_operations {
1055                let key = U64::new(i as u64);
1056                let mut value = vec![0u8; 64];
1057                context.fill_bytes(&mut value);
1058                metadata.put(key, value);
1059
1060                // Sync occasionally
1061                if context.gen_bool(0.1) {
1062                    metadata.sync().await.unwrap();
1063                }
1064
1065                // Update some existing keys
1066                if context.gen_bool(0.1) {
1067                    let selected_index = context.gen_range(0..=i);
1068                    let update_key = U64::new(selected_index as u64);
1069                    let mut new_value = vec![0u8; 64];
1070                    context.fill_bytes(&mut new_value);
1071                    metadata.put(update_key, new_value);
1072                }
1073
1074                // Remove some keys
1075                if context.gen_bool(0.1) {
1076                    let selected_index = context.gen_range(0..=i);
1077                    let remove_key = U64::new(selected_index as u64);
1078                    metadata.remove(&remove_key);
1079                }
1080
1081                // Use get_mut occasionally
1082                if context.gen_bool(0.1) {
1083                    let selected_index = context.gen_range(0..=i);
1084                    let mut_key = U64::new(selected_index as u64);
1085                    if let Some(value) = metadata.get_mut(&mut_key) {
1086                        if !value.is_empty() {
1087                            value[0] = value[0].wrapping_add(1);
1088                        }
1089                    }
1090                }
1091            }
1092            metadata.sync().await.unwrap();
1093
1094            // Destroy the metadata store
1095            metadata.destroy().await.unwrap();
1096
1097            context.auditor().state()
1098        })
1099    }
1100
1101    #[test_group("slow")]
1102    #[test_traced]
1103    fn test_determinism() {
1104        let state1 = test_metadata_operations_and_restart(1_000);
1105        let state2 = test_metadata_operations_and_restart(1_000);
1106        assert_eq!(state1, state2);
1107    }
1108
1109    #[test_traced]
1110    fn test_keys_iterator() {
1111        // Initialize the deterministic context
1112        let executor = deterministic::Runner::default();
1113        executor.start(|context| async move {
1114            // Create a metadata store
1115            let cfg = Config {
1116                partition: "test".to_string(),
1117                codec_config: ((0..).into(), ()),
1118            };
1119            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
1120                .await
1121                .unwrap();
1122
1123            // Add some keys with different prefixes
1124            metadata.put(U64::new(0x1000), b"value1".to_vec());
1125            metadata.put(U64::new(0x1001), b"value2".to_vec());
1126            metadata.put(U64::new(0x1002), b"value3".to_vec());
1127            metadata.put(U64::new(0x2000), b"value4".to_vec());
1128            metadata.put(U64::new(0x2001), b"value5".to_vec());
1129            metadata.put(U64::new(0x3000), b"value6".to_vec());
1130
1131            // Test iterating over all keys
1132            let all_keys: Vec<_> = metadata.keys().cloned().collect();
1133            assert_eq!(all_keys.len(), 6);
1134            assert!(all_keys.contains(&U64::new(0x1000)));
1135            assert!(all_keys.contains(&U64::new(0x3000)));
1136
1137            // Test iterating with prefix 0x10
1138            let prefix = hex!("0x00000000000010");
1139            let prefix_keys: Vec<_> = metadata
1140                .keys()
1141                .filter(|k| k.as_ref().starts_with(&prefix))
1142                .cloned()
1143                .collect();
1144            assert_eq!(prefix_keys.len(), 3);
1145            assert!(prefix_keys.contains(&U64::new(0x1000)));
1146            assert!(prefix_keys.contains(&U64::new(0x1001)));
1147            assert!(prefix_keys.contains(&U64::new(0x1002)));
1148            assert!(!prefix_keys.contains(&U64::new(0x2000)));
1149
1150            // Test iterating with prefix 0x20
1151            let prefix = hex!("0x00000000000020");
1152            let prefix_keys: Vec<_> = metadata
1153                .keys()
1154                .filter(|k| k.as_ref().starts_with(&prefix))
1155                .cloned()
1156                .collect();
1157            assert_eq!(prefix_keys.len(), 2);
1158            assert!(prefix_keys.contains(&U64::new(0x2000)));
1159            assert!(prefix_keys.contains(&U64::new(0x2001)));
1160
1161            // Test with non-matching prefix
1162            let prefix = hex!("0x00000000000040");
1163            let prefix_keys: Vec<_> = metadata
1164                .keys()
1165                .filter(|k| k.as_ref().starts_with(&prefix))
1166                .cloned()
1167                .collect();
1168            assert_eq!(prefix_keys.len(), 0);
1169
1170            metadata.destroy().await.unwrap();
1171        });
1172    }
1173
1174    #[test_traced]
1175    fn test_retain() {
1176        // Initialize the deterministic context
1177        let executor = deterministic::Runner::default();
1178        executor.start(|context| async move {
1179            // Create a metadata store
1180            let cfg = Config {
1181                partition: "test".to_string(),
1182                codec_config: ((0..).into(), ()),
1183            };
1184            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
1185                .await
1186                .unwrap();
1187
1188            // Add some keys with different prefixes
1189            metadata.put(U64::new(0x1000), b"value1".to_vec());
1190            metadata.put(U64::new(0x1001), b"value2".to_vec());
1191            metadata.put(U64::new(0x1002), b"value3".to_vec());
1192            metadata.put(U64::new(0x2000), b"value4".to_vec());
1193            metadata.put(U64::new(0x2001), b"value5".to_vec());
1194            metadata.put(U64::new(0x3000), b"value6".to_vec());
1195
1196            // Check initial metrics
1197            let buffer = context.encode();
1198            assert!(buffer.contains("keys 6"));
1199
1200            // Remove keys with prefix 0x10
1201            let prefix = hex!("0x00000000000010");
1202            metadata.retain(|k, _| !k.as_ref().starts_with(&prefix));
1203
1204            // Check metrics after removal
1205            let buffer = context.encode();
1206            assert!(buffer.contains("keys 3"));
1207
1208            // Verify remaining keys
1209            assert!(metadata.get(&U64::new(0x1000)).is_none());
1210            assert!(metadata.get(&U64::new(0x1001)).is_none());
1211            assert!(metadata.get(&U64::new(0x1002)).is_none());
1212            assert!(metadata.get(&U64::new(0x2000)).is_some());
1213            assert!(metadata.get(&U64::new(0x2001)).is_some());
1214            assert!(metadata.get(&U64::new(0x3000)).is_some());
1215
1216            // Sync and reopen to ensure persistence
1217            metadata.sync().await.unwrap();
1218            drop(metadata);
1219            let cfg = Config {
1220                partition: "test".to_string(),
1221                codec_config: ((0..).into(), ()),
1222            };
1223            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
1224                .await
1225                .unwrap();
1226
1227            // Verify keys are still removed after restart
1228            assert!(metadata.get(&U64::new(0x1000)).is_none());
1229            assert!(metadata.get(&U64::new(0x2000)).is_some());
1230            assert_eq!(metadata.keys().count(), 3);
1231
1232            // Remove non-existing prefix
1233            let prefix = hex!("0x00000000000040");
1234            metadata.retain(|k, _| !k.as_ref().starts_with(&prefix));
1235
1236            // Remove all remaining keys
1237            metadata.retain(|_, _| false);
1238            assert_eq!(metadata.keys().count(), 0);
1239
1240            metadata.destroy().await.unwrap();
1241        });
1242    }
1243}