Skip to main content

commonware_storage/metadata/
mod.rs

1//! A key-value store optimized for atomically committing a small collection of metadata.
2//!
3//! [Metadata] is a key-value store optimized for tracking a small collection of metadata
4//! that allows multiple updates to be committed in a single batch. It is commonly used with
5//! a variety of other underlying storage systems to persist application state across restarts.
6//!
7//! # Format
8//!
9//! Data stored in [Metadata] is serialized as a sequence of key-value pairs in either a
10//! "left" or "right" blob:
11//!
12//! ```text
13//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
14//! | 0 | 1 |    ...    | 8 | 9 |10 |11 |12 |13 |14 |15 |16 |  ...  |50 |...|90 |91 |92 |93 |
15//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
16//! |    Version (u64)  |      Key1     |              Value1           |...|  CRC32(u32)   |
17//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
18//! ```
19//!
20//! _To ensure the integrity of the data, a CRC32 checksum is appended to the end of the blob.
21//! This ensures that partial writes are detected before any data is relied on._
22//!
23//! # Atomic Updates
24//!
25//! To provide support for atomic updates, [Metadata] maintains two blobs: a "left" and a "right"
26//! blob. When a new update is committed, it is written to the "older" of the two blobs (indicated
27//! by the version persisted). Writes to [commonware_runtime::Blob] are not atomic and may only
28//! complete partially, so we only overwrite the "newer" blob once the "older" blob has been synced
29//! (otherwise, we would not be guaranteed to recover the latest complete state from disk on
30//! restart as half of a blob could be old data and half new data).
31//!
32//! # Delta Writes
33//!
34//! If the set of keys and the length of values are stable, [Metadata] will only write an update's
35//! delta to disk (rather than rewriting the entire metadata). This makes [Metadata] a great choice
36//! for maintaining even large collections of data (with the majority rarely modified).
37//!
38//! # Example
39//!
40//! ```rust
41//! use commonware_runtime::{Spawner, Runner, deterministic};
42//! use commonware_storage::metadata::{Metadata, Config};
43//! use commonware_utils::sequence::U64;
44//!
45//! let executor = deterministic::Runner::default();
46//! executor.start(|context| async move {
47//!     // Create a store
48//!     let mut metadata = Metadata::init(context, Config{
49//!         partition: "partition".to_string(),
50//!         codec_config: ((0..).into(), ()),
51//!     }).await.unwrap();
52//!
53//!     // Store metadata
54//!     metadata.put(U64::new(1), b"hello".to_vec());
55//!     metadata.put(U64::new(2), b"world".to_vec());
56//!
57//!     // Sync the metadata store (batch write changes)
58//!     metadata.sync().await.unwrap();
59//!
60//!     // Retrieve some metadata
61//!     let value = metadata.get(&U64::new(1)).unwrap();
62//!
63//! });
64//! ```
65
66mod storage;
67pub use storage::Metadata;
68use thiserror::Error;
69
70/// Errors that can occur when interacting with [Metadata].
71#[derive(Debug, Error)]
72pub enum Error {
73    #[error("runtime error: {0}")]
74    Runtime(#[from] commonware_runtime::Error),
75}
76
/// Configuration for [Metadata] storage.
///
/// `Debug` is derived (available whenever `C: Debug`) so a configuration can be
/// logged or embedded in other `Debug`-deriving types.
#[derive(Clone, Debug)]
pub struct Config<C> {
    /// The [commonware_runtime::Storage] partition to use for storing metadata.
    ///
    /// Both the "left" and "right" blobs backing the store are opened in this
    /// partition.
    pub partition: String,

    /// The codec configuration to use for the value stored in the metadata.
    pub codec_config: C,
}
86
87#[cfg(test)]
88mod tests {
89    use super::*;
90    use commonware_macros::{test_group, test_traced};
91    use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
92    use commonware_utils::{hex, sequence::U64};
93    use rand::{Rng, RngCore};
94
95    #[test_traced]
96    fn test_put_get_clear() {
97        // Initialize the deterministic context
98        let executor = deterministic::Runner::default();
99        executor.start(|context| async move {
100            // Create a metadata store
101            let cfg = Config {
102                partition: "test".to_string(),
103                codec_config: ((0..).into(), ()),
104            };
105            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg)
106                .await
107                .unwrap();
108
109            // Get a key that doesn't exist
110            let key = U64::new(42);
111            let value = metadata.get(&key);
112            assert!(value.is_none());
113
114            // Check metrics
115            let buffer = context.encode();
116            assert!(buffer.contains("first_sync_rewrites_total 0"));
117            assert!(buffer.contains("first_sync_overwrites_total 0"));
118            assert!(buffer.contains("first_keys 0"));
119
120            // Put a key
121            let hello = b"hello".to_vec();
122            metadata.put(key.clone(), hello.clone());
123
124            // Get the key
125            let value = metadata.get(&key).unwrap();
126            assert_eq!(value, &hello);
127
128            // Check metrics
129            let buffer = context.encode();
130            assert!(buffer.contains("first_sync_rewrites_total 0"));
131            assert!(buffer.contains("first_sync_overwrites_total 0"));
132            assert!(buffer.contains("first_keys 1"));
133
134            // Sync the metadata store
135            metadata.sync().await.unwrap();
136
137            // Check metrics
138            let buffer = context.encode();
139            assert!(buffer.contains("first_sync_rewrites_total 1"));
140            assert!(buffer.contains("first_sync_overwrites_total 0"));
141            assert!(buffer.contains("first_keys 1"));
142
143            // Reopen the metadata store
144            drop(metadata);
145            let cfg = Config {
146                partition: "test".to_string(),
147                codec_config: ((0..).into(), ()),
148            };
149            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
150                .await
151                .unwrap();
152
153            // Check metrics
154            let buffer = context.encode();
155            assert!(buffer.contains("second_sync_rewrites_total 0"));
156            assert!(buffer.contains("second_sync_overwrites_total 0"));
157            assert!(buffer.contains("second_keys 1"));
158
159            // Get the key
160            let value = metadata.get(&key).unwrap();
161            assert_eq!(value, &hello);
162
163            // Test clearing the metadata store
164            metadata.clear();
165            let value = metadata.get(&key);
166            assert!(value.is_none());
167
168            // Check metrics
169            let buffer = context.encode();
170            assert!(buffer.contains("second_sync_rewrites_total 0"));
171            assert!(buffer.contains("second_sync_overwrites_total 0"));
172            assert!(buffer.contains("second_keys 0"));
173
174            metadata.destroy().await.unwrap();
175        });
176    }
177
178    #[test_traced]
179    fn test_put_returns_previous_value() {
180        let executor = deterministic::Runner::default();
181        executor.start(|context| async move {
182            let cfg = Config {
183                partition: "test".to_string(),
184                codec_config: ((0..).into(), ()),
185            };
186            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg)
187                .await
188                .unwrap();
189
190            let key = U64::new(42);
191
192            // First put returns None (no previous value)
193            let previous = metadata.put(key.clone(), b"first".to_vec());
194            assert!(previous.is_none());
195
196            // Second put returns the previous value
197            let previous = metadata.put(key.clone(), b"second".to_vec());
198            assert_eq!(previous, Some(b"first".to_vec()));
199
200            // Third put returns the previous value
201            let previous = metadata.put(key.clone(), b"third".to_vec());
202            assert_eq!(previous, Some(b"second".to_vec()));
203
204            // Current value is the latest
205            assert_eq!(metadata.get(&key), Some(&b"third".to_vec()));
206
207            // Different key returns None
208            let other_key = U64::new(99);
209            let previous = metadata.put(other_key.clone(), b"other".to_vec());
210            assert!(previous.is_none());
211
212            // Sync and verify persistence
213            metadata.sync().await.unwrap();
214            drop(metadata);
215
216            let cfg = Config {
217                partition: "test".to_string(),
218                codec_config: ((0..).into(), ()),
219            };
220            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
221                .await
222                .unwrap();
223
224            // After restart, put still returns previous value
225            let previous = metadata.put(key.clone(), b"fourth".to_vec());
226            assert_eq!(previous, Some(b"third".to_vec()));
227
228            metadata.destroy().await.unwrap();
229        });
230    }
231
232    #[test_traced]
233    fn test_multi_sync() {
234        // Initialize the deterministic context
235        let executor = deterministic::Runner::default();
236        executor.start(|context| async move {
237            // Create a metadata store
238            let cfg = Config {
239                partition: "test".to_string(),
240                codec_config: ((0..).into(), ()),
241            };
242            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg)
243                .await
244                .unwrap();
245
246            // Put a key
247            let key = U64::new(42);
248            let hello = b"hello".to_vec();
249            metadata.put(key.clone(), hello.clone());
250
251            // Sync the metadata store
252            metadata.sync().await.unwrap();
253
254            // Check metrics
255            let buffer = context.encode();
256            assert!(buffer.contains("first_sync_rewrites_total 1"));
257            assert!(buffer.contains("first_sync_overwrites_total 0"));
258            assert!(buffer.contains("first_keys 1"));
259
260            // Put an overlapping key and a new key
261            let world = b"world".to_vec();
262            metadata.put(key.clone(), world.clone());
263            let key2 = U64::new(43);
264            let foo = b"foo".to_vec();
265            metadata.put(key2.clone(), foo.clone());
266
267            // Sync the metadata store
268            metadata.sync().await.unwrap();
269
270            // Check metrics
271            let buffer = context.encode();
272            assert!(buffer.contains("first_sync_rewrites_total 2"));
273            assert!(buffer.contains("first_sync_overwrites_total 0"));
274            assert!(buffer.contains("first_keys 2"));
275
276            // Reopen the metadata store
277            drop(metadata);
278            let cfg = Config {
279                partition: "test".to_string(),
280                codec_config: ((0..).into(), ()),
281            };
282            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
283                .await
284                .unwrap();
285
286            // Check metrics
287            let buffer = context.encode();
288            assert!(buffer.contains("second_sync_rewrites_total 0"));
289            assert!(buffer.contains("second_sync_overwrites_total 0"));
290            assert!(buffer.contains("second_keys 2"));
291
292            // Get the key
293            let value = metadata.get(&key).unwrap();
294            assert_eq!(value, &world);
295            let value = metadata.get(&key2).unwrap();
296            assert_eq!(value, &foo);
297
298            // Remove the key
299            metadata.remove(&key);
300
301            // Sync the metadata store
302            metadata.sync().await.unwrap();
303
304            // Check metrics
305            let buffer = context.encode();
306            assert!(buffer.contains("second_sync_rewrites_total 1"));
307            assert!(buffer.contains("second_sync_overwrites_total 0"));
308            assert!(buffer.contains("second_keys 1"));
309
310            // Reopen the metadata store
311            drop(metadata);
312            let cfg = Config {
313                partition: "test".to_string(),
314                codec_config: ((0..).into(), ()),
315            };
316            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("third"), cfg)
317                .await
318                .unwrap();
319
320            // Check metrics
321            let buffer = context.encode();
322            assert!(buffer.contains("third_sync_rewrites_total 0"));
323            assert!(buffer.contains("third_sync_overwrites_total 0"));
324            assert!(buffer.contains("third_keys 1"));
325
326            // Get the key
327            let value = metadata.get(&key);
328            assert!(value.is_none());
329            let value = metadata.get(&key2).unwrap();
330            assert_eq!(value, &foo);
331
332            metadata.destroy().await.unwrap();
333        });
334    }
335
336    #[test_traced]
337    fn test_recover_corrupted_one() {
338        // Initialize the deterministic context
339        let executor = deterministic::Runner::default();
340        executor.start(|context| async move {
341            // Create a metadata store
342            let cfg = Config {
343                partition: "test".to_string(),
344                codec_config: ((0..).into(), ()),
345            };
346            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg)
347                .await
348                .unwrap();
349
350            // Put a key
351            let key = U64::new(42);
352            let hello = b"hello".to_vec();
353            metadata.put(key.clone(), hello.clone());
354
355            // Sync the metadata store
356            metadata.sync().await.unwrap();
357
358            // Put an overlapping key and a new key
359            let world = b"world".to_vec();
360            metadata.put(key.clone(), world.clone());
361            let key2 = U64::new(43);
362            let foo = b"foo".to_vec();
363            metadata.put(key2, foo.clone());
364
365            // Sync the metadata store
366            metadata.sync().await.unwrap();
367            drop(metadata);
368
369            // Corrupt the metadata store
370            let (blob, _) = context.open("test", b"left").await.unwrap();
371            blob.write_at(0, b"corrupted".to_vec()).await.unwrap();
372            blob.sync().await.unwrap();
373
374            // Reopen the metadata store
375            let cfg = Config {
376                partition: "test".to_string(),
377                codec_config: ((0..).into(), ()),
378            };
379            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
380                .await
381                .unwrap();
382
383            // Get the key (falls back to non-corrupt)
384            let value = metadata.get(&key).unwrap();
385            assert_eq!(value, &hello);
386
387            metadata.destroy().await.unwrap();
388        });
389    }
390
391    #[test_traced]
392    fn test_recover_corrupted_both() {
393        // Initialize the deterministic context
394        let executor = deterministic::Runner::default();
395        executor.start(|context| async move {
396            // Create a metadata store
397            let cfg = Config {
398                partition: "test".to_string(),
399                codec_config: ((0..).into(), ()),
400            };
401            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg)
402                .await
403                .unwrap();
404
405            // Put a key
406            let key = U64::new(42);
407            let hello = b"hello".to_vec();
408            metadata.put(key.clone(), hello.clone());
409
410            // Sync the metadata store
411            metadata.sync().await.unwrap();
412
413            // Put an overlapping key and a new key
414            let world = b"world".to_vec();
415            metadata.put(key.clone(), world.clone());
416            let key2 = U64::new(43);
417            let foo = b"foo".to_vec();
418            metadata.put(key2, foo.clone());
419
420            // Sync the metadata store
421            metadata.sync().await.unwrap();
422            drop(metadata);
423
424            // Corrupt the metadata store
425            let (blob, _) = context.open("test", b"left").await.unwrap();
426            blob.write_at(0, b"corrupted".to_vec()).await.unwrap();
427            blob.sync().await.unwrap();
428            let (blob, _) = context.open("test", b"right").await.unwrap();
429            blob.write_at(0, b"corrupted".to_vec()).await.unwrap();
430            blob.sync().await.unwrap();
431
432            // Reopen the metadata store
433            let cfg = Config {
434                partition: "test".to_string(),
435                codec_config: ((0..).into(), ()),
436            };
437            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
438                .await
439                .unwrap();
440
441            // Get the key (falls back to non-corrupt)
442            let value = metadata.get(&key);
443            assert!(value.is_none());
444
445            // Check metrics
446            let buffer = context.encode();
447            assert!(buffer.contains("second_sync_rewrites_total 0"));
448            assert!(buffer.contains("second_sync_overwrites_total 0"));
449            assert!(buffer.contains("second_keys 0"));
450
451            metadata.destroy().await.unwrap();
452        });
453    }
454
455    #[test_traced]
456    fn test_recover_corrupted_truncate() {
457        // Initialize the deterministic context
458        let executor = deterministic::Runner::default();
459        executor.start(|context| async move {
460            // Create a metadata store
461            let cfg = Config {
462                partition: "test".to_string(),
463                codec_config: ((0..).into(), ()),
464            };
465            let mut metadata = Metadata::init(context.with_label("first"), cfg)
466                .await
467                .unwrap();
468
469            // Put a key
470            let key = U64::new(42);
471            let hello = b"hello".to_vec();
472            metadata.put(key.clone(), hello.clone());
473
474            // Sync the metadata store
475            metadata.sync().await.unwrap();
476
477            // Put an overlapping key and a new key
478            let world = b"world".to_vec();
479            metadata.put(key.clone(), world.clone());
480            let key2 = U64::new(43);
481            let foo = b"foo".to_vec();
482            metadata.put(key2, foo.clone());
483
484            // Sync the metadata store
485            metadata.sync().await.unwrap();
486            drop(metadata);
487
488            // Corrupt the metadata store
489            let (blob, len) = context.open("test", b"left").await.unwrap();
490            blob.resize(len - 8).await.unwrap();
491            blob.sync().await.unwrap();
492
493            // Reopen the metadata store
494            let cfg = Config {
495                partition: "test".to_string(),
496                codec_config: ((0..).into(), ()),
497            };
498            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
499                .await
500                .unwrap();
501
502            // Get the key (falls back to non-corrupt)
503            let value = metadata.get(&key).unwrap();
504            assert_eq!(value, &hello);
505
506            metadata.destroy().await.unwrap();
507        });
508    }
509
510    #[test_traced]
511    fn test_recover_corrupted_short() {
512        // Initialize the deterministic context
513        let executor = deterministic::Runner::default();
514        executor.start(|context| async move {
515            // Create a metadata store
516            let cfg = Config {
517                partition: "test".to_string(),
518                codec_config: ((0..).into(), ()),
519            };
520            let mut metadata = Metadata::init(context.with_label("first"), cfg)
521                .await
522                .unwrap();
523
524            // Put a key
525            let key = U64::new(42);
526            let hello = b"hello".to_vec();
527            metadata.put(key.clone(), hello.clone());
528
529            // Sync the metadata store
530            metadata.sync().await.unwrap();
531
532            // Put an overlapping key and a new key
533            let world = b"world".to_vec();
534            metadata.put(key.clone(), world.clone());
535            let key2 = U64::new(43);
536            let foo = b"foo".to_vec();
537            metadata.put(key2, foo.clone());
538
539            // Sync the metadata store
540            metadata.sync().await.unwrap();
541            drop(metadata);
542
543            // Corrupt the metadata store
544            let (blob, _) = context.open("test", b"left").await.unwrap();
545            blob.resize(5).await.unwrap();
546            blob.sync().await.unwrap();
547
548            // Reopen the metadata store
549            let cfg = Config {
550                partition: "test".to_string(),
551                codec_config: ((0..).into(), ()),
552            };
553            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
554                .await
555                .unwrap();
556
557            // Get the key (falls back to non-corrupt)
558            let value = metadata.get(&key).unwrap();
559            assert_eq!(value, &hello);
560
561            metadata.destroy().await.unwrap();
562        });
563    }
564
565    #[test_traced]
566    fn test_unclean_shutdown() {
567        // Initialize the deterministic context
568        let executor = deterministic::Runner::default();
569        executor.start(|context| async move {
570            let key = U64::new(42);
571            let hello = b"hello".to_vec();
572            {
573                // Create a metadata store
574                let cfg = Config {
575                    partition: "test".to_string(),
576                    codec_config: ((0..).into(), ()),
577                };
578                let mut metadata = Metadata::init(context.with_label("first"), cfg)
579                    .await
580                    .unwrap();
581
582                // Put a key
583                metadata.put(key.clone(), hello.clone());
584
585                // Drop metadata before sync
586            }
587
588            // Reopen the metadata store
589            let cfg = Config {
590                partition: "test".to_string(),
591                codec_config: ((0..).into(), ()),
592            };
593            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
594                .await
595                .unwrap();
596
597            // Get the key
598            let value = metadata.get(&key);
599            assert!(value.is_none());
600
601            // Check metrics
602            let buffer = context.encode();
603            assert!(buffer.contains("second_sync_rewrites_total 0"));
604            assert!(buffer.contains("second_sync_overwrites_total 0"));
605            assert!(buffer.contains("second_keys 0"));
606
607            metadata.destroy().await.unwrap();
608        });
609    }
610
611    #[test_traced]
612    #[should_panic(expected = "usize value is larger than u32")]
613    fn test_value_too_big_error() {
614        // Initialize the deterministic context
615        let executor = deterministic::Runner::default();
616        executor.start(|context| async move {
617            // Create a metadata store
618            let cfg = Config {
619                partition: "test".to_string(),
620                codec_config: ((0..).into(), ()),
621            };
622            let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
623
624            // Create a value that exceeds u32::MAX bytes
625            let value = vec![0u8; (u32::MAX as usize) + 1];
626            metadata.put(U64::new(1), value);
627
628            // Assert
629            metadata.sync().await.unwrap();
630        });
631    }
632
633    #[test_traced]
634    fn test_delta_writes() {
635        // Initialize the deterministic context
636        let executor = deterministic::Runner::default();
637        executor.start(|context| async move {
638            // Create a metadata store
639            let cfg = Config {
640                partition: "test".to_string(),
641                codec_config: ((0..).into(), ()),
642            };
643            let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
644
645            // Put initial keys
646            for i in 0..100 {
647                metadata.put(U64::new(i), vec![i as u8; 100]);
648            }
649
650            // First sync - should write everything to the first blob
651            //
652            // 100 keys * (8 bytes for key + 1 byte for len + 100 bytes for value) + 8 bytes for version + 4 bytes for checksum
653            metadata.sync().await.unwrap();
654            let buffer = context.encode();
655            assert!(buffer.contains("sync_rewrites_total 1"), "{buffer}");
656            assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
657            assert!(
658                buffer.contains("runtime_storage_write_bytes_total 10912"),
659                "{buffer}",
660            );
661
662            // Modify just one key
663            metadata.put(U64::new(51), vec![0xff; 100]);
664
665            // Sync again - should write everything to the second blob
666            metadata.sync().await.unwrap();
667            let buffer = context.encode();
668            assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
669            assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
670            assert!(
671                buffer.contains("runtime_storage_write_bytes_total 21824"),
672                "{buffer}",
673            );
674
675            // Sync again - should write only diff from the first blob
676            //
677            // 1 byte for len + 100 bytes for value + 8 byte for version + 4 bytes for checksum
678            metadata.sync().await.unwrap();
679            let buffer = context.encode();
680            assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
681            assert!(buffer.contains("sync_overwrites_total 1"), "{buffer}");
682            assert!(
683                buffer.contains("runtime_storage_write_bytes_total 21937"),
684                "{buffer}",
685            );
686
687            // Sync again - should write only diff from the second blob
688            //
689            // 8 byte for version + 4 bytes for checksum
690            metadata.sync().await.unwrap();
691            let buffer = context.encode();
692            assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
693            assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
694            assert!(
695                buffer.contains("runtime_storage_write_bytes_total 21949"),
696                "{buffer}",
697            );
698
699            // Remove a key - should rewrite everything
700            //
701            // 99 keys * (8 bytes for key + 1 bytes for len + 100 bytes for value) + 8 bytes for version + 4 bytes for checksum
702            metadata.remove(&U64::new(51));
703            metadata.sync().await.unwrap();
704            let buffer = context.encode();
705            assert!(buffer.contains("sync_rewrites_total 3"), "{buffer}");
706            assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
707            assert!(
708                buffer.contains("runtime_storage_write_bytes_total 32752"),
709                "{buffer}"
710            );
711
712            // Sync again - should also rewrite
713            metadata.sync().await.unwrap();
714            let buffer = context.encode();
715            assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
716            assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
717            assert!(
718                buffer.contains("runtime_storage_write_bytes_total 43555"),
719                "{buffer}"
720            );
721
722            // Modify in-place - should overwrite
723            //
724            // 1 byte for len + 100 bytes for value + 8 byte for version + 4 bytes for checksum
725            metadata.put(U64::new(50), vec![0xff; 100]);
726            metadata.sync().await.unwrap();
727            let buffer = context.encode();
728            assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
729            assert!(buffer.contains("sync_overwrites_total 3"), "{buffer}");
730            assert!(
731                buffer.contains("runtime_storage_write_bytes_total 43668"),
732                "{buffer}"
733            );
734
735            // Clean up
736            metadata.destroy().await.unwrap();
737        });
738    }
739
740    #[test_traced]
741    fn test_sync_with_no_changes() {
742        let executor = deterministic::Runner::default();
743        executor.start(|context| async move {
744            let cfg = Config {
745                partition: "test".to_string(),
746                codec_config: ((0..).into(), ()),
747            };
748            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
749                .await
750                .unwrap();
751
752            // Put initial data
753            metadata.put(U64::new(1), b"hello".to_vec());
754            metadata.sync().await.unwrap();
755
756            // Sync again with no changes - will rewrite because key_order_changed is recent
757            // (on startup, key_order_changed is set to next_version)
758            metadata.sync().await.unwrap();
759            let buffer = context.encode();
760            assert!(buffer.contains("sync_rewrites_total 2"));
761            assert!(buffer.contains("sync_overwrites_total 0"));
762
763            // Sync again - now key order is stable, should do overwrite
764            metadata.sync().await.unwrap();
765            let buffer = context.encode();
766            assert!(buffer.contains("sync_rewrites_total 2"));
767            assert!(buffer.contains("sync_overwrites_total 1"));
768
769            // Sync again - should continue doing overwrites
770            metadata.sync().await.unwrap();
771            let buffer = context.encode();
772            assert!(buffer.contains("sync_rewrites_total 2"));
773            assert!(buffer.contains("sync_overwrites_total 2"));
774
775            metadata.destroy().await.unwrap();
776        });
777    }
778
779    #[test_traced]
780    fn test_get_mut_marks_modified() {
781        let executor = deterministic::Runner::default();
782        executor.start(|context| async move {
783            let cfg = Config {
784                partition: "test".to_string(),
785                codec_config: ((0..).into(), ()),
786            };
787            let mut metadata =
788                Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg.clone())
789                    .await
790                    .unwrap();
791
792            // Put initial data
793            metadata.put(U64::new(1), b"hello".to_vec());
794            metadata.sync().await.unwrap();
795
796            // Sync again to ensure both blobs are populated
797            metadata.sync().await.unwrap();
798
799            // Use get_mut to modify value
800            let value = metadata.get_mut(&U64::new(1)).unwrap();
801            value[0] = b'H';
802
803            // Sync should detect the modification and do a rewrite (due to recent key_order_changed)
804            metadata.sync().await.unwrap();
805            let buffer = context.encode();
806            assert!(buffer.contains("first_sync_rewrites_total 2"));
807            assert!(buffer.contains("first_sync_overwrites_total 1"));
808
809            // Restart the metadata store
810            drop(metadata);
811            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
812                .await
813                .unwrap();
814
815            // Verify the change persisted
816            let value = metadata.get(&U64::new(1)).unwrap();
817            assert_eq!(value[0], b'H');
818
819            metadata.destroy().await.unwrap();
820        });
821    }
822
823    #[test_traced]
824    fn test_mixed_operation_sequences() {
825        let executor = deterministic::Runner::default();
826        executor.start(|context| async move {
827            let cfg = Config {
828                partition: "test".to_string(),
829                codec_config: ((0..).into(), ()),
830            };
831            let mut metadata =
832                Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg.clone())
833                    .await
834                    .unwrap();
835
836            let key = U64::new(1);
837
838            // Test: put -> remove -> put same key
839            metadata.put(key.clone(), b"first".to_vec());
840            metadata.remove(&key);
841            metadata.put(key.clone(), b"second".to_vec());
842            metadata.sync().await.unwrap();
843            let value = metadata.get(&key).unwrap();
844            assert_eq!(value, b"second");
845
846            // Test: put -> get_mut -> remove -> put
847            metadata.put(key.clone(), b"third".to_vec());
848            let value = metadata.get_mut(&key).unwrap();
849            value[0] = b'T';
850            metadata.remove(&key);
851            metadata.put(key.clone(), b"fourth".to_vec());
852            metadata.sync().await.unwrap();
853            let value = metadata.get(&key).unwrap();
854            assert_eq!(value, b"fourth");
855
856            // Restart the metadata store
857            drop(metadata);
858            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
859                .await
860                .unwrap();
861
862            // Verify the changes persisted
863            let value = metadata.get(&key).unwrap();
864            assert_eq!(value, b"fourth");
865
866            metadata.destroy().await.unwrap();
867        });
868    }
869
870    #[test_traced]
871    fn test_overwrite_vs_rewrite() {
872        let executor = deterministic::Runner::default();
873        executor.start(|context| async move {
874            let cfg = Config {
875                partition: "test".to_string(),
876                codec_config: ((0..).into(), ()),
877            };
878            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
879                .await
880                .unwrap();
881
882            // Set up initial data
883            metadata.put(U64::new(1), vec![1; 10]);
884            metadata.put(U64::new(2), vec![2; 10]);
885            metadata.sync().await.unwrap();
886
887            // Same size modification before both blobs are populated
888            metadata.put(U64::new(1), vec![0xFF; 10]);
889            metadata.sync().await.unwrap();
890            let buffer = context.encode();
891            assert!(buffer.contains("sync_rewrites_total 2"));
892            assert!(buffer.contains("sync_overwrites_total 0"));
893
894            // Let key order stabilize with another sync
895            metadata.sync().await.unwrap();
896            let buffer = context.encode();
897            assert!(buffer.contains("sync_rewrites_total 2"));
898            assert!(buffer.contains("sync_overwrites_total 1"));
899
900            // Same size modification after both blobs are populated - should overwrite
901            metadata.put(U64::new(1), vec![0xAA; 10]);
902            metadata.sync().await.unwrap();
903            let buffer = context.encode();
904            assert!(buffer.contains("sync_rewrites_total 2"));
905            assert!(buffer.contains("sync_overwrites_total 2"));
906
907            // Different size modification - should rewrite
908            metadata.put(U64::new(1), vec![0xFF; 20]);
909            metadata.sync().await.unwrap();
910            let buffer = context.encode();
911            assert!(buffer.contains("sync_rewrites_total 3"));
912            assert!(buffer.contains("sync_overwrites_total 2"));
913
914            // Add new key - should rewrite (key order changed)
915            metadata.put(U64::new(3), vec![3; 10]);
916            metadata.sync().await.unwrap();
917            let buffer = context.encode();
918            assert!(buffer.contains("sync_rewrites_total 4"));
919            assert!(buffer.contains("sync_overwrites_total 2"));
920
921            // Stabilize key order
922            metadata.sync().await.unwrap();
923            let buffer = context.encode();
924            assert!(buffer.contains("sync_rewrites_total 5"));
925            assert!(buffer.contains("sync_overwrites_total 2"));
926
927            // Modify existing key with same size - should overwrite after stabilized
928            metadata.put(U64::new(2), vec![0xAA; 10]);
929            metadata.sync().await.unwrap();
930            let buffer = context.encode();
931            assert!(buffer.contains("sync_rewrites_total 5"));
932            assert!(buffer.contains("sync_overwrites_total 3"));
933
934            metadata.destroy().await.unwrap();
935        });
936    }
937
938    #[test_traced]
939    fn test_blob_resize() {
940        let executor = deterministic::Runner::default();
941        executor.start(|context| async move {
942            let cfg = Config {
943                partition: "test".to_string(),
944                codec_config: ((0..).into(), ()),
945            };
946            let mut metadata =
947                Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg.clone())
948                    .await
949                    .unwrap();
950
951            // Start with large data
952            for i in 0..10 {
953                metadata.put(U64::new(i), vec![i as u8; 100]);
954            }
955            metadata.sync().await.unwrap();
956
957            // Stabilize key order
958            metadata.sync().await.unwrap();
959            let buffer = context.encode();
960            assert!(buffer.contains("first_sync_rewrites_total 2"));
961            assert!(buffer.contains("first_sync_overwrites_total 0"));
962
963            // Remove most data to make blob smaller
964            for i in 1..10 {
965                metadata.remove(&U64::new(i));
966            }
967            metadata.sync().await.unwrap();
968
969            // Verify the remaining data is still accessible
970            let value = metadata.get(&U64::new(0)).unwrap();
971            assert_eq!(value.len(), 100);
972            assert_eq!(value[0], 0);
973
974            // Check that sync properly handles blob resizing
975            let buffer = context.encode();
976            assert!(buffer.contains("first_sync_rewrites_total 3"));
977            assert!(buffer.contains("first_sync_overwrites_total 0"));
978
979            // Restart the metadata store
980            drop(metadata);
981            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
982                .await
983                .unwrap();
984
985            // Verify the changes persisted
986            let value = metadata.get(&U64::new(0)).unwrap();
987            assert_eq!(value.len(), 100);
988            assert_eq!(value[0], 0);
989
990            // Verify the removed keys are not present
991            for i in 1..10 {
992                assert!(metadata.get(&U64::new(i)).is_none());
993            }
994
995            metadata.destroy().await.unwrap();
996        });
997    }
998
999    #[test_traced]
1000    fn test_clear_and_repopulate() {
1001        let executor = deterministic::Runner::default();
1002        executor.start(|context| async move {
1003            let cfg = Config {
1004                partition: "test".to_string(),
1005                codec_config: ((0..).into(), ()),
1006            };
1007            let mut metadata =
1008                Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg.clone())
1009                    .await
1010                    .unwrap();
1011
1012            // Initial data
1013            metadata.put(U64::new(1), b"first".to_vec());
1014            metadata.put(U64::new(2), b"second".to_vec());
1015            metadata.sync().await.unwrap();
1016
1017            // Clear everything
1018            metadata.clear();
1019            metadata.sync().await.unwrap();
1020
1021            // Verify empty
1022            assert!(metadata.get(&U64::new(1)).is_none());
1023            assert!(metadata.get(&U64::new(2)).is_none());
1024
1025            // Restart the metadata store
1026            drop(metadata);
1027            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
1028                .await
1029                .unwrap();
1030
1031            // Verify the changes persisted
1032            assert!(metadata.get(&U64::new(1)).is_none());
1033            assert!(metadata.get(&U64::new(2)).is_none());
1034
1035            // Repopulate with different data
1036            metadata.put(U64::new(3), b"third".to_vec());
1037            metadata.put(U64::new(4), b"fourth".to_vec());
1038            metadata.sync().await.unwrap();
1039
1040            // Verify new data
1041            assert_eq!(metadata.get(&U64::new(3)).unwrap(), b"third");
1042            assert_eq!(metadata.get(&U64::new(4)).unwrap(), b"fourth");
1043            assert!(metadata.get(&U64::new(1)).is_none());
1044            assert!(metadata.get(&U64::new(2)).is_none());
1045
1046            metadata.destroy().await.unwrap();
1047        });
1048    }
1049
1050    fn test_metadata_operations_and_restart(num_operations: usize) -> String {
1051        let executor = deterministic::Runner::default();
1052        executor.start(|mut context| async move {
1053            let cfg = Config {
1054                partition: "test_determinism".to_string(),
1055                codec_config: ((0..).into(), ()),
1056            };
1057            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
1058                .await
1059                .unwrap();
1060
1061            // Perform a series of deterministic operations
1062            for i in 0..num_operations {
1063                let key = U64::new(i as u64);
1064                let mut value = vec![0u8; 64];
1065                context.fill_bytes(&mut value);
1066                metadata.put(key, value);
1067
1068                // Sync occasionally
1069                if context.gen_bool(0.1) {
1070                    metadata.sync().await.unwrap();
1071                }
1072
1073                // Update some existing keys
1074                if context.gen_bool(0.1) {
1075                    let selected_index = context.gen_range(0..=i);
1076                    let update_key = U64::new(selected_index as u64);
1077                    let mut new_value = vec![0u8; 64];
1078                    context.fill_bytes(&mut new_value);
1079                    metadata.put(update_key, new_value);
1080                }
1081
1082                // Remove some keys
1083                if context.gen_bool(0.1) {
1084                    let selected_index = context.gen_range(0..=i);
1085                    let remove_key = U64::new(selected_index as u64);
1086                    metadata.remove(&remove_key);
1087                }
1088
1089                // Use get_mut occasionally
1090                if context.gen_bool(0.1) {
1091                    let selected_index = context.gen_range(0..=i);
1092                    let mut_key = U64::new(selected_index as u64);
1093                    if let Some(value) = metadata.get_mut(&mut_key) {
1094                        if !value.is_empty() {
1095                            value[0] = value[0].wrapping_add(1);
1096                        }
1097                    }
1098                }
1099            }
1100            metadata.sync().await.unwrap();
1101
1102            // Destroy the metadata store
1103            metadata.destroy().await.unwrap();
1104
1105            context.auditor().state()
1106        })
1107    }
1108
1109    #[test_group("slow")]
1110    #[test_traced]
1111    fn test_determinism() {
1112        let state1 = test_metadata_operations_and_restart(1_000);
1113        let state2 = test_metadata_operations_and_restart(1_000);
1114        assert_eq!(state1, state2);
1115    }
1116
1117    #[test_traced]
1118    fn test_keys_iterator() {
1119        // Initialize the deterministic context
1120        let executor = deterministic::Runner::default();
1121        executor.start(|context| async move {
1122            // Create a metadata store
1123            let cfg = Config {
1124                partition: "test".to_string(),
1125                codec_config: ((0..).into(), ()),
1126            };
1127            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
1128                .await
1129                .unwrap();
1130
1131            // Add some keys with different prefixes
1132            metadata.put(U64::new(0x1000), b"value1".to_vec());
1133            metadata.put(U64::new(0x1001), b"value2".to_vec());
1134            metadata.put(U64::new(0x1002), b"value3".to_vec());
1135            metadata.put(U64::new(0x2000), b"value4".to_vec());
1136            metadata.put(U64::new(0x2001), b"value5".to_vec());
1137            metadata.put(U64::new(0x3000), b"value6".to_vec());
1138
1139            // Test iterating over all keys
1140            let all_keys: Vec<_> = metadata.keys().cloned().collect();
1141            assert_eq!(all_keys.len(), 6);
1142            assert!(all_keys.contains(&U64::new(0x1000)));
1143            assert!(all_keys.contains(&U64::new(0x3000)));
1144
1145            // Test iterating with prefix 0x10
1146            let prefix = hex!("0x00000000000010");
1147            let prefix_keys: Vec<_> = metadata
1148                .keys()
1149                .filter(|k| k.as_ref().starts_with(&prefix))
1150                .cloned()
1151                .collect();
1152            assert_eq!(prefix_keys.len(), 3);
1153            assert!(prefix_keys.contains(&U64::new(0x1000)));
1154            assert!(prefix_keys.contains(&U64::new(0x1001)));
1155            assert!(prefix_keys.contains(&U64::new(0x1002)));
1156            assert!(!prefix_keys.contains(&U64::new(0x2000)));
1157
1158            // Test iterating with prefix 0x20
1159            let prefix = hex!("0x00000000000020");
1160            let prefix_keys: Vec<_> = metadata
1161                .keys()
1162                .filter(|k| k.as_ref().starts_with(&prefix))
1163                .cloned()
1164                .collect();
1165            assert_eq!(prefix_keys.len(), 2);
1166            assert!(prefix_keys.contains(&U64::new(0x2000)));
1167            assert!(prefix_keys.contains(&U64::new(0x2001)));
1168
1169            // Test with non-matching prefix
1170            let prefix = hex!("0x00000000000040");
1171            let prefix_keys: Vec<_> = metadata
1172                .keys()
1173                .filter(|k| k.as_ref().starts_with(&prefix))
1174                .cloned()
1175                .collect();
1176            assert_eq!(prefix_keys.len(), 0);
1177
1178            metadata.destroy().await.unwrap();
1179        });
1180    }
1181
1182    #[test_traced]
1183    fn test_retain() {
1184        // Initialize the deterministic context
1185        let executor = deterministic::Runner::default();
1186        executor.start(|context| async move {
1187            // Create a metadata store
1188            let cfg = Config {
1189                partition: "test".to_string(),
1190                codec_config: ((0..).into(), ()),
1191            };
1192            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg)
1193                .await
1194                .unwrap();
1195
1196            // Add some keys with different prefixes
1197            metadata.put(U64::new(0x1000), b"value1".to_vec());
1198            metadata.put(U64::new(0x1001), b"value2".to_vec());
1199            metadata.put(U64::new(0x1002), b"value3".to_vec());
1200            metadata.put(U64::new(0x2000), b"value4".to_vec());
1201            metadata.put(U64::new(0x2001), b"value5".to_vec());
1202            metadata.put(U64::new(0x3000), b"value6".to_vec());
1203
1204            // Check initial metrics
1205            let buffer = context.encode();
1206            assert!(buffer.contains("first_keys 6"));
1207
1208            // Remove keys with prefix 0x10
1209            let prefix = hex!("0x00000000000010");
1210            metadata.retain(|k, _| !k.as_ref().starts_with(&prefix));
1211
1212            // Check metrics after removal
1213            let buffer = context.encode();
1214            assert!(buffer.contains("first_keys 3"));
1215
1216            // Verify remaining keys
1217            assert!(metadata.get(&U64::new(0x1000)).is_none());
1218            assert!(metadata.get(&U64::new(0x1001)).is_none());
1219            assert!(metadata.get(&U64::new(0x1002)).is_none());
1220            assert!(metadata.get(&U64::new(0x2000)).is_some());
1221            assert!(metadata.get(&U64::new(0x2001)).is_some());
1222            assert!(metadata.get(&U64::new(0x3000)).is_some());
1223
1224            // Sync and reopen to ensure persistence
1225            metadata.sync().await.unwrap();
1226            drop(metadata);
1227            let cfg = Config {
1228                partition: "test".to_string(),
1229                codec_config: ((0..).into(), ()),
1230            };
1231            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
1232                .await
1233                .unwrap();
1234
1235            // Verify keys are still removed after restart
1236            assert!(metadata.get(&U64::new(0x1000)).is_none());
1237            assert!(metadata.get(&U64::new(0x2000)).is_some());
1238            assert_eq!(metadata.keys().count(), 3);
1239
1240            // Remove non-existing prefix
1241            let prefix = hex!("0x00000000000040");
1242            metadata.retain(|k, _| !k.as_ref().starts_with(&prefix));
1243
1244            // Remove all remaining keys
1245            metadata.retain(|_, _| false);
1246            assert_eq!(metadata.keys().count(), 0);
1247
1248            metadata.destroy().await.unwrap();
1249        });
1250    }
1251}