Skip to main content

commonware_storage/metadata/
mod.rs

1//! A key-value store optimized for atomically committing a small collection of metadata.
2//!
3//! [Metadata] is a key-value store optimized for tracking a small collection of metadata
4//! that allows multiple updates to be committed in a single batch. It is commonly used with
5//! a variety of other underlying storage systems to persist application state across restarts.
6//!
7//! # Format
8//!
9//! Data stored in [Metadata] is serialized as a sequence of key-value pairs in either a
10//! "left" or "right" blob:
11//!
12//! ```text
13//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
14//! | 0 | 1 |    ...    | 8 | 9 |10 |11 |12 |13 |14 |15 |16 |  ...  |50 |...|90 |91 |92 |93 |
15//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
16//! |    Version (u64)  |      Key1     |              Value1           |...|  CRC32(u32)   |
17//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
18//! ```
19//!
20//! _To ensure the integrity of the data, a CRC32 checksum is appended to the end of the blob.
21//! This ensures that partial writes are detected before any data is relied on._
22//!
23//! # Atomic Updates
24//!
25//! To provide support for atomic updates, [Metadata] maintains two blobs: a "left" and a "right"
26//! blob. When a new update is committed, it is written to the "older" of the two blobs (indicated
27//! by the version persisted). Writes to [commonware_runtime::Blob] are not atomic and may only
28//! complete partially, so we only overwrite the "newer" blob once the "older" blob has been synced
29//! (otherwise, we would not be guaranteed to recover the latest complete state from disk on
30//! restart as half of a blob could be old data and half new data).
31//!
32//! # Delta Writes
33//!
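//! If the set of keys and the lengths of their values are stable, [Metadata] will only write an
//! update's delta to disk (rather than rewriting the entire metadata). After the set of keys changes
//! (e.g. a key is added or removed), the next two syncs rewrite the full metadata (one for each
//! blob) before delta writes resume. This makes [Metadata] a great choice for maintaining even large
//! collections of data (with the majority rarely modified).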
37//!
38//! # Example
39//!
40//! ```rust
41//! use commonware_runtime::{Spawner, Runner, deterministic};
42//! use commonware_storage::metadata::{Metadata, Config};
43//! use commonware_utils::sequence::U64;
44//!
45//! let executor = deterministic::Runner::default();
46//! executor.start(|context| async move {
47//!     // Create a store
48//!     let mut metadata = Metadata::init(context, Config {
49//!         partition: "partition".into(),
50//!         codec_config: ((0..).into(), ()),
51//!     }).await.unwrap();
52//!
53//!     // Store metadata
54//!     metadata.put(U64::new(1), b"hello".to_vec());
55//!     metadata.put(U64::new(2), b"world".to_vec());
56//!
57//!     // Sync the metadata store (batch write changes)
58//!     metadata.sync().await.unwrap();
59//!
60//!     // Retrieve some metadata
61//!     let value = metadata.get(&U64::new(1)).unwrap();
62//!
63//! });
64//! ```
65
66mod storage;
67pub use storage::Metadata;
68use thiserror::Error;
69
/// Errors that can occur when interacting with [Metadata].
#[derive(Debug, Error)]
pub enum Error {
    /// An error reported by the underlying [commonware_runtime] layer.
    ///
    /// `#[from]` derives a `From<commonware_runtime::Error>` conversion (so runtime results can be
    /// propagated with `?`) and exposes the wrapped error as this error's `source()`.
    #[error("runtime error: {0}")]
    Runtime(#[from] commonware_runtime::Error),
}
76
/// Configuration for [Metadata] storage.
///
/// `Debug` is derived alongside `Clone` so configurations can be logged or included in
/// diagnostics; the `Debug` impl is only available when `C` itself implements `Debug`.
#[derive(Clone, Debug)]
pub struct Config<C> {
    /// The [commonware_runtime::Storage] partition to use for storing metadata.
    ///
    /// Both the "left" and "right" blobs live in this partition.
    pub partition: String,

    /// The codec configuration to use for the values stored in the metadata.
    pub codec_config: C,
}
86
87#[cfg(test)]
88mod tests {
89    use super::*;
90    use commonware_formatting::hex;
91    use commonware_macros::{test_group, test_traced};
92    use commonware_runtime::{deterministic, Blob, Metrics as _, Runner, Storage, Supervisor as _};
93    use commonware_utils::sequence::U64;
94    use rand::{Rng, RngCore};
95
96    #[test_traced]
97    fn test_put_get_clear() {
98        // Initialize the deterministic context
99        let executor = deterministic::Runner::default();
100        executor.start(|context| async move {
101            // Create a metadata store
102            let cfg = Config {
103                partition: "test".into(),
104                codec_config: ((0..).into(), ()),
105            };
106            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg)
107                .await
108                .unwrap();
109
110            // Get a key that doesn't exist
111            let key = U64::new(42);
112            let value = metadata.get(&key);
113            assert!(value.is_none());
114
115            // Check metrics
116            let buffer = context.encode();
117            assert!(buffer.contains("first_sync_rewrites_total 0"));
118            assert!(buffer.contains("first_sync_overwrites_total 0"));
119            assert!(buffer.contains("first_keys 0"));
120
121            // Put a key
122            let hello = b"hello".to_vec();
123            metadata.put(key.clone(), hello.clone());
124
125            // Get the key
126            let value = metadata.get(&key).unwrap();
127            assert_eq!(value, &hello);
128
129            // Check metrics
130            let buffer = context.encode();
131            assert!(buffer.contains("first_sync_rewrites_total 0"));
132            assert!(buffer.contains("first_sync_overwrites_total 0"));
133            assert!(buffer.contains("first_keys 1"));
134
135            // Sync the metadata store
136            metadata.sync().await.unwrap();
137
138            // Check metrics
139            let buffer = context.encode();
140            assert!(buffer.contains("first_sync_rewrites_total 1"));
141            assert!(buffer.contains("first_sync_overwrites_total 0"));
142            assert!(buffer.contains("first_keys 1"));
143
144            // Reopen the metadata store
145            drop(metadata);
146            let cfg = Config {
147                partition: "test".into(),
148                codec_config: ((0..).into(), ()),
149            };
150            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
151                .await
152                .unwrap();
153
154            // Check metrics
155            let buffer = context.encode();
156            assert!(buffer.contains("second_sync_rewrites_total 0"));
157            assert!(buffer.contains("second_sync_overwrites_total 0"));
158            assert!(buffer.contains("second_keys 1"));
159
160            // Get the key
161            let value = metadata.get(&key).unwrap();
162            assert_eq!(value, &hello);
163
164            // Test clearing the metadata store
165            metadata.clear();
166            let value = metadata.get(&key);
167            assert!(value.is_none());
168
169            // Check metrics
170            let buffer = context.encode();
171            assert!(buffer.contains("second_sync_rewrites_total 0"));
172            assert!(buffer.contains("second_sync_overwrites_total 0"));
173            assert!(buffer.contains("second_keys 0"));
174
175            metadata.destroy().await.unwrap();
176        });
177    }
178
179    #[test_traced]
180    fn test_put_returns_previous_value() {
181        let executor = deterministic::Runner::default();
182        executor.start(|context| async move {
183            let cfg = Config {
184                partition: "test".into(),
185                codec_config: ((0..).into(), ()),
186            };
187            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg)
188                .await
189                .unwrap();
190
191            let key = U64::new(42);
192
193            // First put returns None (no previous value)
194            let previous = metadata.put(key.clone(), b"first".to_vec());
195            assert!(previous.is_none());
196
197            // Second put returns the previous value
198            let previous = metadata.put(key.clone(), b"second".to_vec());
199            assert_eq!(previous, Some(b"first".to_vec()));
200
201            // Third put returns the previous value
202            let previous = metadata.put(key.clone(), b"third".to_vec());
203            assert_eq!(previous, Some(b"second".to_vec()));
204
205            // Current value is the latest
206            assert_eq!(metadata.get(&key), Some(&b"third".to_vec()));
207
208            // Different key returns None
209            let other_key = U64::new(99);
210            let previous = metadata.put(other_key.clone(), b"other".to_vec());
211            assert!(previous.is_none());
212
213            // Sync and verify persistence
214            metadata.sync().await.unwrap();
215            drop(metadata);
216
217            let cfg = Config {
218                partition: "test".into(),
219                codec_config: ((0..).into(), ()),
220            };
221            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
222                .await
223                .unwrap();
224
225            // After restart, put still returns previous value
226            let previous = metadata.put(key.clone(), b"fourth".to_vec());
227            assert_eq!(previous, Some(b"third".to_vec()));
228
229            metadata.destroy().await.unwrap();
230        });
231    }
232
233    #[test_traced]
234    fn test_multi_sync() {
235        // Initialize the deterministic context
236        let executor = deterministic::Runner::default();
237        executor.start(|context| async move {
238            // Create a metadata store
239            let cfg = Config {
240                partition: "test".into(),
241                codec_config: ((0..).into(), ()),
242            };
243            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg)
244                .await
245                .unwrap();
246
247            // Put a key
248            let key = U64::new(42);
249            let hello = b"hello".to_vec();
250            metadata.put(key.clone(), hello.clone());
251
252            // Sync the metadata store
253            metadata.sync().await.unwrap();
254
255            // Check metrics
256            let buffer = context.encode();
257            assert!(buffer.contains("first_sync_rewrites_total 1"));
258            assert!(buffer.contains("first_sync_overwrites_total 0"));
259            assert!(buffer.contains("first_keys 1"));
260
261            // Put an overlapping key and a new key
262            let world = b"world".to_vec();
263            metadata.put(key.clone(), world.clone());
264            let key2 = U64::new(43);
265            let foo = b"foo".to_vec();
266            metadata.put(key2.clone(), foo.clone());
267
268            // Sync the metadata store
269            metadata.sync().await.unwrap();
270
271            // Check metrics
272            let buffer = context.encode();
273            assert!(buffer.contains("first_sync_rewrites_total 2"));
274            assert!(buffer.contains("first_sync_overwrites_total 0"));
275            assert!(buffer.contains("first_keys 2"));
276
277            // Reopen the metadata store
278            drop(metadata);
279            let cfg = Config {
280                partition: "test".into(),
281                codec_config: ((0..).into(), ()),
282            };
283            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
284                .await
285                .unwrap();
286
287            // Check metrics
288            let buffer = context.encode();
289            assert!(buffer.contains("second_sync_rewrites_total 0"));
290            assert!(buffer.contains("second_sync_overwrites_total 0"));
291            assert!(buffer.contains("second_keys 2"));
292
293            // Get the key
294            let value = metadata.get(&key).unwrap();
295            assert_eq!(value, &world);
296            let value = metadata.get(&key2).unwrap();
297            assert_eq!(value, &foo);
298
299            // Remove the key
300            metadata.remove(&key);
301
302            // Sync the metadata store
303            metadata.sync().await.unwrap();
304
305            // Check metrics
306            let buffer = context.encode();
307            assert!(buffer.contains("second_sync_rewrites_total 1"));
308            assert!(buffer.contains("second_sync_overwrites_total 0"));
309            assert!(buffer.contains("second_keys 1"));
310
311            // Reopen the metadata store
312            drop(metadata);
313            let cfg = Config {
314                partition: "test".into(),
315                codec_config: ((0..).into(), ()),
316            };
317            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("third"), cfg)
318                .await
319                .unwrap();
320
321            // Check metrics
322            let buffer = context.encode();
323            assert!(buffer.contains("third_sync_rewrites_total 0"));
324            assert!(buffer.contains("third_sync_overwrites_total 0"));
325            assert!(buffer.contains("third_keys 1"));
326
327            // Get the key
328            let value = metadata.get(&key);
329            assert!(value.is_none());
330            let value = metadata.get(&key2).unwrap();
331            assert_eq!(value, &foo);
332
333            metadata.destroy().await.unwrap();
334        });
335    }
336
337    #[test_traced]
338    fn test_recover_corrupted_one() {
339        // Initialize the deterministic context
340        let executor = deterministic::Runner::default();
341        executor.start(|context| async move {
342            // Create a metadata store
343            let cfg = Config {
344                partition: "test".into(),
345                codec_config: ((0..).into(), ()),
346            };
347            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg)
348                .await
349                .unwrap();
350
351            // Put a key
352            let key = U64::new(42);
353            let hello = b"hello".to_vec();
354            metadata.put(key.clone(), hello.clone());
355
356            // Sync the metadata store
357            metadata.sync().await.unwrap();
358
359            // Put an overlapping key and a new key
360            let world = b"world".to_vec();
361            metadata.put(key.clone(), world.clone());
362            let key2 = U64::new(43);
363            let foo = b"foo".to_vec();
364            metadata.put(key2, foo.clone());
365
366            // Sync the metadata store
367            metadata.sync().await.unwrap();
368            drop(metadata);
369
370            // Corrupt the metadata store
371            let (blob, _) = context.open("test", b"left").await.unwrap();
372            blob.write_at_sync(0, b"corrupted".to_vec()).await.unwrap();
373
374            // Reopen the metadata store
375            let cfg = Config {
376                partition: "test".into(),
377                codec_config: ((0..).into(), ()),
378            };
379            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
380                .await
381                .unwrap();
382
383            // Get the key (falls back to non-corrupt)
384            let value = metadata.get(&key).unwrap();
385            assert_eq!(value, &hello);
386
387            metadata.destroy().await.unwrap();
388        });
389    }
390
391    #[test_traced]
392    fn test_recover_corrupted_both() {
393        // Initialize the deterministic context
394        let executor = deterministic::Runner::default();
395        executor.start(|context| async move {
396            // Create a metadata store
397            let cfg = Config {
398                partition: "test".into(),
399                codec_config: ((0..).into(), ()),
400            };
401            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg)
402                .await
403                .unwrap();
404
405            // Put a key
406            let key = U64::new(42);
407            let hello = b"hello".to_vec();
408            metadata.put(key.clone(), hello.clone());
409
410            // Sync the metadata store
411            metadata.sync().await.unwrap();
412
413            // Put an overlapping key and a new key
414            let world = b"world".to_vec();
415            metadata.put(key.clone(), world.clone());
416            let key2 = U64::new(43);
417            let foo = b"foo".to_vec();
418            metadata.put(key2, foo.clone());
419
420            // Sync the metadata store
421            metadata.sync().await.unwrap();
422            drop(metadata);
423
424            // Corrupt the metadata store
425            let (blob, _) = context.open("test", b"left").await.unwrap();
426            blob.write_at_sync(0, b"corrupted".to_vec()).await.unwrap();
427            let (blob, _) = context.open("test", b"right").await.unwrap();
428            blob.write_at_sync(0, b"corrupted".to_vec()).await.unwrap();
429
430            // Reopen the metadata store
431            let cfg = Config {
432                partition: "test".into(),
433                codec_config: ((0..).into(), ()),
434            };
435            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
436                .await
437                .unwrap();
438
439            // Get the key (falls back to non-corrupt)
440            let value = metadata.get(&key);
441            assert!(value.is_none());
442
443            // Check metrics
444            let buffer = context.encode();
445            assert!(buffer.contains("second_sync_rewrites_total 0"));
446            assert!(buffer.contains("second_sync_overwrites_total 0"));
447            assert!(buffer.contains("second_keys 0"));
448
449            metadata.destroy().await.unwrap();
450        });
451    }
452
453    #[test_traced]
454    fn test_recover_corrupted_truncate() {
455        // Initialize the deterministic context
456        let executor = deterministic::Runner::default();
457        executor.start(|context| async move {
458            // Create a metadata store
459            let cfg = Config {
460                partition: "test".into(),
461                codec_config: ((0..).into(), ()),
462            };
463            let mut metadata = Metadata::init(context.child("first"), cfg).await.unwrap();
464
465            // Put a key
466            let key = U64::new(42);
467            let hello = b"hello".to_vec();
468            metadata.put(key.clone(), hello.clone());
469
470            // Sync the metadata store
471            metadata.sync().await.unwrap();
472
473            // Put an overlapping key and a new key
474            let world = b"world".to_vec();
475            metadata.put(key.clone(), world.clone());
476            let key2 = U64::new(43);
477            let foo = b"foo".to_vec();
478            metadata.put(key2, foo.clone());
479
480            // Sync the metadata store
481            metadata.sync().await.unwrap();
482            drop(metadata);
483
484            // Corrupt the metadata store
485            let (blob, len) = context.open("test", b"left").await.unwrap();
486            blob.resize(len - 8).await.unwrap();
487            blob.sync().await.unwrap();
488
489            // Reopen the metadata store
490            let cfg = Config {
491                partition: "test".into(),
492                codec_config: ((0..).into(), ()),
493            };
494            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
495                .await
496                .unwrap();
497
498            // Get the key (falls back to non-corrupt)
499            let value = metadata.get(&key).unwrap();
500            assert_eq!(value, &hello);
501
502            metadata.destroy().await.unwrap();
503        });
504    }
505
506    #[test_traced]
507    fn test_recover_corrupted_short() {
508        // Initialize the deterministic context
509        let executor = deterministic::Runner::default();
510        executor.start(|context| async move {
511            // Create a metadata store
512            let cfg = Config {
513                partition: "test".into(),
514                codec_config: ((0..).into(), ()),
515            };
516            let mut metadata = Metadata::init(context.child("first"), cfg).await.unwrap();
517
518            // Put a key
519            let key = U64::new(42);
520            let hello = b"hello".to_vec();
521            metadata.put(key.clone(), hello.clone());
522
523            // Sync the metadata store
524            metadata.sync().await.unwrap();
525
526            // Put an overlapping key and a new key
527            let world = b"world".to_vec();
528            metadata.put(key.clone(), world.clone());
529            let key2 = U64::new(43);
530            let foo = b"foo".to_vec();
531            metadata.put(key2, foo.clone());
532
533            // Sync the metadata store
534            metadata.sync().await.unwrap();
535            drop(metadata);
536
537            // Corrupt the metadata store
538            let (blob, _) = context.open("test", b"left").await.unwrap();
539            blob.resize(5).await.unwrap();
540            blob.sync().await.unwrap();
541
542            // Reopen the metadata store
543            let cfg = Config {
544                partition: "test".into(),
545                codec_config: ((0..).into(), ()),
546            };
547            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
548                .await
549                .unwrap();
550
551            // Get the key (falls back to non-corrupt)
552            let value = metadata.get(&key).unwrap();
553            assert_eq!(value, &hello);
554
555            metadata.destroy().await.unwrap();
556        });
557    }
558
559    #[test_traced]
560    fn test_unclean_shutdown() {
561        // Initialize the deterministic context
562        let executor = deterministic::Runner::default();
563        executor.start(|context| async move {
564            let key = U64::new(42);
565            let hello = b"hello".to_vec();
566            {
567                // Create a metadata store
568                let cfg = Config {
569                    partition: "test".into(),
570                    codec_config: ((0..).into(), ()),
571                };
572                let mut metadata = Metadata::init(context.child("first"), cfg).await.unwrap();
573
574                // Put a key
575                metadata.put(key.clone(), hello.clone());
576
577                // Drop metadata before sync
578            }
579
580            // Reopen the metadata store
581            let cfg = Config {
582                partition: "test".into(),
583                codec_config: ((0..).into(), ()),
584            };
585            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
586                .await
587                .unwrap();
588
589            // Get the key
590            let value = metadata.get(&key);
591            assert!(value.is_none());
592
593            // Check metrics
594            let buffer = context.encode();
595            assert!(buffer.contains("second_sync_rewrites_total 0"));
596            assert!(buffer.contains("second_sync_overwrites_total 0"));
597            assert!(buffer.contains("second_keys 0"));
598
599            metadata.destroy().await.unwrap();
600        });
601    }
602
603    #[test_traced]
604    #[should_panic(expected = "usize value is larger than u32")]
605    fn test_value_too_big_error() {
606        // Initialize the deterministic context
607        let executor = deterministic::Runner::default();
608        executor.start(|context| async move {
609            // Create a metadata store
610            let cfg = Config {
611                partition: "test".into(),
612                codec_config: ((0..).into(), ()),
613            };
614            let mut metadata = Metadata::init(context.child("storage"), cfg).await.unwrap();
615
616            // Create a value that exceeds u32::MAX bytes
617            let value = vec![0u8; (u32::MAX as usize) + 1];
618            metadata.put(U64::new(1), value);
619
620            // Assert
621            metadata.sync().await.unwrap();
622        });
623    }
624
625    #[test_traced]
626    fn test_delta_writes() {
627        // Initialize the deterministic context
628        let executor = deterministic::Runner::default();
629        executor.start(|context| async move {
630            // Create a metadata store
631            let cfg = Config {
632                partition: "test".into(),
633                codec_config: ((0..).into(), ()),
634            };
635            let mut metadata = Metadata::init(context.child("storage"), cfg).await.unwrap();
636
637            // Put initial keys
638            for i in 0..100 {
639                metadata.put(U64::new(i), vec![i as u8; 100]);
640            }
641
642            // First sync - should write everything to the first blob
643            //
644            // 100 keys * (8 bytes for key + 1 byte for len + 100 bytes for value) + 8 bytes for version + 4 bytes for checksum
645            metadata.sync().await.unwrap();
646            let buffer = context.encode();
647            assert!(buffer.contains("sync_rewrites_total 1"), "{buffer}");
648            assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
649            assert!(
650                buffer.contains("runtime_storage_write_bytes_total 10912"),
651                "{buffer}",
652            );
653
654            // Modify just one key
655            metadata.put(U64::new(51), vec![0xff; 100]);
656
657            // Sync again - should write everything to the second blob
658            metadata.sync().await.unwrap();
659            let buffer = context.encode();
660            assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
661            assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
662            assert!(
663                buffer.contains("runtime_storage_write_bytes_total 21824"),
664                "{buffer}",
665            );
666
667            // Sync again - should write only diff from the first blob
668            //
669            // 1 byte for len + 100 bytes for value + 8 byte for version + 4 bytes for checksum
670            metadata.sync().await.unwrap();
671            let buffer = context.encode();
672            assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
673            assert!(buffer.contains("sync_overwrites_total 1"), "{buffer}");
674            assert!(
675                buffer.contains("runtime_storage_write_bytes_total 21937"),
676                "{buffer}",
677            );
678
679            // Sync again - should write only diff from the second blob
680            //
681            // 8 byte for version + 4 bytes for checksum
682            metadata.sync().await.unwrap();
683            let buffer = context.encode();
684            assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
685            assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
686            assert!(
687                buffer.contains("runtime_storage_write_bytes_total 21949"),
688                "{buffer}",
689            );
690
691            // Remove a key - should rewrite everything
692            //
693            // 99 keys * (8 bytes for key + 1 bytes for len + 100 bytes for value) + 8 bytes for version + 4 bytes for checksum
694            metadata.remove(&U64::new(51));
695            metadata.sync().await.unwrap();
696            let buffer = context.encode();
697            assert!(buffer.contains("sync_rewrites_total 3"), "{buffer}");
698            assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
699            assert!(
700                buffer.contains("runtime_storage_write_bytes_total 32752"),
701                "{buffer}"
702            );
703
704            // Sync again - should also rewrite
705            metadata.sync().await.unwrap();
706            let buffer = context.encode();
707            assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
708            assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
709            assert!(
710                buffer.contains("runtime_storage_write_bytes_total 43555"),
711                "{buffer}"
712            );
713
714            // Modify in-place - should overwrite
715            //
716            // 1 byte for len + 100 bytes for value + 8 byte for version + 4 bytes for checksum
717            metadata.put(U64::new(50), vec![0xff; 100]);
718            metadata.sync().await.unwrap();
719            let buffer = context.encode();
720            assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
721            assert!(buffer.contains("sync_overwrites_total 3"), "{buffer}");
722            assert!(
723                buffer.contains("runtime_storage_write_bytes_total 43668"),
724                "{buffer}"
725            );
726
727            // Clean up
728            metadata.destroy().await.unwrap();
729        });
730    }
731
732    #[test_traced]
733    fn test_sync_with_no_changes() {
734        let executor = deterministic::Runner::default();
735        executor.start(|context| async move {
736            let cfg = Config {
737                partition: "test".into(),
738                codec_config: ((0..).into(), ()),
739            };
740            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("storage"), cfg)
741                .await
742                .unwrap();
743
744            // Put initial data
745            metadata
746                .put_sync(U64::new(1), b"hello".to_vec())
747                .await
748                .unwrap();
749
750            // Sync again with no changes - will rewrite because key_order_changed is recent
751            // (on startup, key_order_changed is set to next_version)
752            metadata.sync().await.unwrap();
753            let buffer = context.encode();
754            assert!(buffer.contains("sync_rewrites_total 2"));
755            assert!(buffer.contains("sync_overwrites_total 0"));
756
757            // Sync again - now key order is stable, should do overwrite
758            metadata.sync().await.unwrap();
759            let buffer = context.encode();
760            assert!(buffer.contains("sync_rewrites_total 2"));
761            assert!(buffer.contains("sync_overwrites_total 1"));
762
763            // Sync again - should continue doing overwrites
764            metadata.sync().await.unwrap();
765            let buffer = context.encode();
766            assert!(buffer.contains("sync_rewrites_total 2"));
767            assert!(buffer.contains("sync_overwrites_total 2"));
768
769            metadata.destroy().await.unwrap();
770        });
771    }
772
773    #[test_traced]
774    fn test_get_mut_marks_modified() {
775        let executor = deterministic::Runner::default();
776        executor.start(|context| async move {
777            let cfg = Config {
778                partition: "test".into(),
779                codec_config: ((0..).into(), ()),
780            };
781            let mut metadata =
782                Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg.clone())
783                    .await
784                    .unwrap();
785
786            // Put initial data
787            metadata
788                .put_sync(U64::new(1), b"hello".to_vec())
789                .await
790                .unwrap();
791
792            // Sync again to ensure both blobs are populated
793            metadata.sync().await.unwrap();
794
795            // Use get_mut to modify value
796            let value = metadata.get_mut(&U64::new(1)).unwrap();
797            value[0] = b'H';
798
799            // Sync should detect the modification and do a rewrite (due to recent key_order_changed)
800            metadata.sync().await.unwrap();
801            let buffer = context.encode();
802            assert!(buffer.contains("first_sync_rewrites_total 2"));
803            assert!(buffer.contains("first_sync_overwrites_total 1"));
804
805            // Restart the metadata store
806            drop(metadata);
807            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
808                .await
809                .unwrap();
810
811            // Verify the change persisted
812            let value = metadata.get(&U64::new(1)).unwrap();
813            assert_eq!(value[0], b'H');
814
815            metadata.destroy().await.unwrap();
816        });
817    }
818
819    #[test_traced]
820    fn test_mixed_operation_sequences() {
821        let executor = deterministic::Runner::default();
822        executor.start(|context| async move {
823            let cfg = Config {
824                partition: "test".into(),
825                codec_config: ((0..).into(), ()),
826            };
827            let mut metadata =
828                Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg.clone())
829                    .await
830                    .unwrap();
831
832            let key = U64::new(1);
833
834            // Test: put -> remove -> put same key
835            metadata.put(key.clone(), b"first".to_vec());
836            metadata.remove(&key);
837            metadata
838                .put_sync(key.clone(), b"second".to_vec())
839                .await
840                .unwrap();
841            let value = metadata.get(&key).unwrap();
842            assert_eq!(value, b"second");
843
844            // Test: put -> get_mut -> remove -> put
845            metadata.put(key.clone(), b"third".to_vec());
846            let value = metadata.get_mut(&key).unwrap();
847            value[0] = b'T';
848            metadata.remove(&key);
849            metadata
850                .put_sync(key.clone(), b"fourth".to_vec())
851                .await
852                .unwrap();
853            let value = metadata.get(&key).unwrap();
854            assert_eq!(value, b"fourth");
855
856            // Restart the metadata store
857            drop(metadata);
858            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
859                .await
860                .unwrap();
861
862            // Verify the changes persisted
863            let value = metadata.get(&key).unwrap();
864            assert_eq!(value, b"fourth");
865
866            metadata.destroy().await.unwrap();
867        });
868    }
869
870    #[test_traced]
871    fn test_overwrite_vs_rewrite() {
872        let executor = deterministic::Runner::default();
873        executor.start(|context| async move {
874            let cfg = Config {
875                partition: "test".into(),
876                codec_config: ((0..).into(), ()),
877            };
878            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("storage"), cfg)
879                .await
880                .unwrap();
881
882            // Set up initial data
883            metadata.put(U64::new(1), vec![1; 10]);
884            metadata.put(U64::new(2), vec![2; 10]);
885            metadata.sync().await.unwrap();
886
887            // Same size modification before both blobs are populated
888            metadata.put(U64::new(1), vec![0xFF; 10]);
889            metadata.sync().await.unwrap();
890            let buffer = context.encode();
891            assert!(buffer.contains("sync_rewrites_total 2"));
892            assert!(buffer.contains("sync_overwrites_total 0"));
893
894            // Let key order stabilize with another sync
895            metadata.sync().await.unwrap();
896            let buffer = context.encode();
897            assert!(buffer.contains("sync_rewrites_total 2"));
898            assert!(buffer.contains("sync_overwrites_total 1"));
899
900            // Same size modification after both blobs are populated - should overwrite
901            metadata.put(U64::new(1), vec![0xAA; 10]);
902            metadata.sync().await.unwrap();
903            let buffer = context.encode();
904            assert!(buffer.contains("sync_rewrites_total 2"));
905            assert!(buffer.contains("sync_overwrites_total 2"));
906
907            // Different size modification - should rewrite
908            metadata.put(U64::new(1), vec![0xFF; 20]);
909            metadata.sync().await.unwrap();
910            let buffer = context.encode();
911            assert!(buffer.contains("sync_rewrites_total 3"));
912            assert!(buffer.contains("sync_overwrites_total 2"));
913
914            // Add new key - should rewrite (key order changed)
915            metadata.put(U64::new(3), vec![3; 10]);
916            metadata.sync().await.unwrap();
917            let buffer = context.encode();
918            assert!(buffer.contains("sync_rewrites_total 4"));
919            assert!(buffer.contains("sync_overwrites_total 2"));
920
921            // Stabilize key order
922            metadata.sync().await.unwrap();
923            let buffer = context.encode();
924            assert!(buffer.contains("sync_rewrites_total 5"));
925            assert!(buffer.contains("sync_overwrites_total 2"));
926
927            // Modify existing key with same size - should overwrite after stabilized
928            metadata.put(U64::new(2), vec![0xAA; 10]);
929            metadata.sync().await.unwrap();
930            let buffer = context.encode();
931            assert!(buffer.contains("sync_rewrites_total 5"));
932            assert!(buffer.contains("sync_overwrites_total 3"));
933
934            metadata.destroy().await.unwrap();
935        });
936    }
937
938    #[test_traced]
939    fn test_blob_resize() {
940        let executor = deterministic::Runner::default();
941        executor.start(|context| async move {
942            let cfg = Config {
943                partition: "test".into(),
944                codec_config: ((0..).into(), ()),
945            };
946            let mut metadata =
947                Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg.clone())
948                    .await
949                    .unwrap();
950
951            // Start with large data
952            for i in 0..10 {
953                metadata.put(U64::new(i), vec![i as u8; 100]);
954            }
955            metadata.sync().await.unwrap();
956
957            // Stabilize key order
958            metadata.sync().await.unwrap();
959            let buffer = context.encode();
960            assert!(buffer.contains("first_sync_rewrites_total 2"));
961            assert!(buffer.contains("first_sync_overwrites_total 0"));
962
963            // Remove most data to make blob smaller
964            for i in 1..10 {
965                metadata.remove(&U64::new(i));
966            }
967            metadata.sync().await.unwrap();
968
969            // Verify the remaining data is still accessible
970            let value = metadata.get(&U64::new(0)).unwrap();
971            assert_eq!(value.len(), 100);
972            assert_eq!(value[0], 0);
973
974            // Check that sync properly handles blob resizing
975            let buffer = context.encode();
976            assert!(buffer.contains("first_sync_rewrites_total 3"));
977            assert!(buffer.contains("first_sync_overwrites_total 0"));
978
979            // Restart the metadata store
980            drop(metadata);
981            let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
982                .await
983                .unwrap();
984
985            // Verify the changes persisted
986            let value = metadata.get(&U64::new(0)).unwrap();
987            assert_eq!(value.len(), 100);
988            assert_eq!(value[0], 0);
989
990            // Verify the removed keys are not present
991            for i in 1..10 {
992                assert!(metadata.get(&U64::new(i)).is_none());
993            }
994
995            metadata.destroy().await.unwrap();
996        });
997    }
998
999    #[test_traced]
1000    fn test_clear_and_repopulate() {
1001        let executor = deterministic::Runner::default();
1002        executor.start(|context| async move {
1003            let cfg = Config {
1004                partition: "test".into(),
1005                codec_config: ((0..).into(), ()),
1006            };
1007            let mut metadata =
1008                Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg.clone())
1009                    .await
1010                    .unwrap();
1011
1012            // Initial data
1013            metadata.put(U64::new(1), b"first".to_vec());
1014            metadata
1015                .put_sync(U64::new(2), b"second".to_vec())
1016                .await
1017                .unwrap();
1018
1019            // Clear everything
1020            metadata.clear();
1021            metadata.sync().await.unwrap();
1022
1023            // Verify empty
1024            assert!(metadata.get(&U64::new(1)).is_none());
1025            assert!(metadata.get(&U64::new(2)).is_none());
1026
1027            // Restart the metadata store
1028            drop(metadata);
1029            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
1030                .await
1031                .unwrap();
1032
1033            // Verify the changes persisted
1034            assert!(metadata.get(&U64::new(1)).is_none());
1035            assert!(metadata.get(&U64::new(2)).is_none());
1036
1037            // Repopulate with different data
1038            metadata.put(U64::new(3), b"third".to_vec());
1039            metadata
1040                .put_sync(U64::new(4), b"fourth".to_vec())
1041                .await
1042                .unwrap();
1043
1044            // Verify new data
1045            assert_eq!(metadata.get(&U64::new(3)).unwrap(), b"third");
1046            assert_eq!(metadata.get(&U64::new(4)).unwrap(), b"fourth");
1047            assert!(metadata.get(&U64::new(1)).is_none());
1048            assert!(metadata.get(&U64::new(2)).is_none());
1049
1050            metadata.destroy().await.unwrap();
1051        });
1052    }
1053
1054    fn test_metadata_operations_and_restart(num_operations: usize) -> String {
1055        let executor = deterministic::Runner::default();
1056        executor.start(|mut context| async move {
1057            let cfg = Config {
1058                partition: "test-determinism".into(),
1059                codec_config: ((0..).into(), ()),
1060            };
1061            let mut metadata =
1062                Metadata::<_, U64, Vec<u8>>::init(context.child("storage"), cfg.clone())
1063                    .await
1064                    .unwrap();
1065
1066            // Perform a series of deterministic operations
1067            for i in 0..num_operations {
1068                let key = U64::new(i as u64);
1069                let mut value = vec![0u8; 64];
1070                context.fill_bytes(&mut value);
1071                metadata.put(key, value);
1072
1073                // Sync occasionally
1074                if context.gen_bool(0.1) {
1075                    metadata.sync().await.unwrap();
1076                }
1077
1078                // Update some existing keys
1079                if context.gen_bool(0.1) {
1080                    let selected_index = context.gen_range(0..=i);
1081                    let update_key = U64::new(selected_index as u64);
1082                    let mut new_value = vec![0u8; 64];
1083                    context.fill_bytes(&mut new_value);
1084                    metadata.put(update_key, new_value);
1085                }
1086
1087                // Remove some keys
1088                if context.gen_bool(0.1) {
1089                    let selected_index = context.gen_range(0..=i);
1090                    let remove_key = U64::new(selected_index as u64);
1091                    metadata.remove(&remove_key);
1092                }
1093
1094                // Use get_mut occasionally
1095                if context.gen_bool(0.1) {
1096                    let selected_index = context.gen_range(0..=i);
1097                    let mut_key = U64::new(selected_index as u64);
1098                    if let Some(value) = metadata.get_mut(&mut_key) {
1099                        if !value.is_empty() {
1100                            value[0] = value[0].wrapping_add(1);
1101                        }
1102                    }
1103                }
1104            }
1105            metadata.sync().await.unwrap();
1106
1107            // Destroy the metadata store
1108            metadata.destroy().await.unwrap();
1109
1110            context.auditor().state()
1111        })
1112    }
1113
1114    #[test_group("slow")]
1115    #[test_traced]
1116    fn test_determinism() {
1117        let state1 = test_metadata_operations_and_restart(1_000);
1118        let state2 = test_metadata_operations_and_restart(1_000);
1119        assert_eq!(state1, state2);
1120    }
1121
1122    #[test_traced]
1123    fn test_keys_iterator() {
1124        // Initialize the deterministic context
1125        let executor = deterministic::Runner::default();
1126        executor.start(|context| async move {
1127            // Create a metadata store
1128            let cfg = Config {
1129                partition: "test".into(),
1130                codec_config: ((0..).into(), ()),
1131            };
1132            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("storage"), cfg)
1133                .await
1134                .unwrap();
1135
1136            // Add some keys with different prefixes
1137            metadata.put(U64::new(0x1000), b"value1".to_vec());
1138            metadata.put(U64::new(0x1001), b"value2".to_vec());
1139            metadata.put(U64::new(0x1002), b"value3".to_vec());
1140            metadata.put(U64::new(0x2000), b"value4".to_vec());
1141            metadata.put(U64::new(0x2001), b"value5".to_vec());
1142            metadata.put(U64::new(0x3000), b"value6".to_vec());
1143
1144            // Test iterating over all keys
1145            let all_keys: Vec<_> = metadata.keys().cloned().collect();
1146            assert_eq!(all_keys.len(), 6);
1147            assert!(all_keys.contains(&U64::new(0x1000)));
1148            assert!(all_keys.contains(&U64::new(0x3000)));
1149
1150            // Test iterating with prefix 0x10
1151            let prefix = hex!("0x00000000000010");
1152            let prefix_keys: Vec<_> = metadata
1153                .keys()
1154                .filter(|k| k.as_ref().starts_with(&prefix))
1155                .cloned()
1156                .collect();
1157            assert_eq!(prefix_keys.len(), 3);
1158            assert!(prefix_keys.contains(&U64::new(0x1000)));
1159            assert!(prefix_keys.contains(&U64::new(0x1001)));
1160            assert!(prefix_keys.contains(&U64::new(0x1002)));
1161            assert!(!prefix_keys.contains(&U64::new(0x2000)));
1162
1163            // Test iterating with prefix 0x20
1164            let prefix = hex!("0x00000000000020");
1165            let prefix_keys: Vec<_> = metadata
1166                .keys()
1167                .filter(|k| k.as_ref().starts_with(&prefix))
1168                .cloned()
1169                .collect();
1170            assert_eq!(prefix_keys.len(), 2);
1171            assert!(prefix_keys.contains(&U64::new(0x2000)));
1172            assert!(prefix_keys.contains(&U64::new(0x2001)));
1173
1174            // Test with non-matching prefix
1175            let prefix = hex!("0x00000000000040");
1176            let prefix_keys: Vec<_> = metadata
1177                .keys()
1178                .filter(|k| k.as_ref().starts_with(&prefix))
1179                .cloned()
1180                .collect();
1181            assert_eq!(prefix_keys.len(), 0);
1182
1183            metadata.destroy().await.unwrap();
1184        });
1185    }
1186
1187    #[test_traced]
1188    fn test_retain() {
1189        // Initialize the deterministic context
1190        let executor = deterministic::Runner::default();
1191        executor.start(|context| async move {
1192            // Create a metadata store
1193            let cfg = Config {
1194                partition: "test".into(),
1195                codec_config: ((0..).into(), ()),
1196            };
1197            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg)
1198                .await
1199                .unwrap();
1200
1201            // Add some keys with different prefixes
1202            metadata.put(U64::new(0x1000), b"value1".to_vec());
1203            metadata.put(U64::new(0x1001), b"value2".to_vec());
1204            metadata.put(U64::new(0x1002), b"value3".to_vec());
1205            metadata.put(U64::new(0x2000), b"value4".to_vec());
1206            metadata.put(U64::new(0x2001), b"value5".to_vec());
1207            metadata.put(U64::new(0x3000), b"value6".to_vec());
1208
1209            // Check initial metrics
1210            let buffer = context.encode();
1211            assert!(buffer.contains("first_keys 6"));
1212
1213            // Remove keys with prefix 0x10
1214            let prefix = hex!("0x00000000000010");
1215            metadata.retain(|k, _| !k.as_ref().starts_with(&prefix));
1216
1217            // Check metrics after removal
1218            let buffer = context.encode();
1219            assert!(buffer.contains("first_keys 3"));
1220
1221            // Verify remaining keys
1222            assert!(metadata.get(&U64::new(0x1000)).is_none());
1223            assert!(metadata.get(&U64::new(0x1001)).is_none());
1224            assert!(metadata.get(&U64::new(0x1002)).is_none());
1225            assert!(metadata.get(&U64::new(0x2000)).is_some());
1226            assert!(metadata.get(&U64::new(0x2001)).is_some());
1227            assert!(metadata.get(&U64::new(0x3000)).is_some());
1228
1229            // Sync and reopen to ensure persistence
1230            metadata.sync().await.unwrap();
1231            drop(metadata);
1232            let cfg = Config {
1233                partition: "test".into(),
1234                codec_config: ((0..).into(), ()),
1235            };
1236            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
1237                .await
1238                .unwrap();
1239
1240            // Verify keys are still removed after restart
1241            assert!(metadata.get(&U64::new(0x1000)).is_none());
1242            assert!(metadata.get(&U64::new(0x2000)).is_some());
1243            assert_eq!(metadata.keys().count(), 3);
1244
1245            // Remove non-existing prefix
1246            let prefix = hex!("0x00000000000040");
1247            metadata.retain(|k, _| !k.as_ref().starts_with(&prefix));
1248
1249            // Remove all remaining keys
1250            metadata.retain(|_, _| false);
1251            assert_eq!(metadata.keys().count(), 0);
1252
1253            metadata.destroy().await.unwrap();
1254        });
1255    }
1256}