// neleus_db/state.rs

1use std::cmp::Ordering;
2use std::collections::BTreeMap;
3
4use anyhow::{Result, anyhow};
5use serde::{Deserialize, Serialize};
6
7use crate::blob_store::BlobStore;
8use crate::canonical::from_cbor;
9use crate::hash::{Hash, hash_typed};
10use crate::merkle::{MerkleProof, prove_inclusion, root as merkle_root, verify_inclusion};
11use crate::object_store::ObjectStore;
12use crate::wal::{Wal, WalOp};
13
14const STATE_TAG: &[u8] = b"state_node:";
15const STATE_LEAF_TAG: &[u8] = b"state_leaf:";
16const STATE_MANIFEST_LEAF_TAG: &[u8] = b"state_manifest_leaf:";
17const STATE_SCHEMA_VERSION: u32 = 1;
18
19pub type StateRoot = Hash;
20
21#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
22pub enum ValueRef {
23    Value(Hash),
24    Tombstone,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28pub struct SegmentEntry {
29    pub key: Vec<u8>,
30    pub value: ValueRef,
31}
32
33#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
34pub struct StateSegment {
35    #[serde(default = "default_schema_version")]
36    pub schema_version: u32,
37    pub entries: Vec<SegmentEntry>,
38    pub merkle_root: Hash,
39}
40
41#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42pub struct StateManifest {
43    #[serde(default = "default_schema_version")]
44    pub schema_version: u32,
45    pub segments: Vec<Hash>, // newest first
46    #[serde(default = "Hash::zero")]
47    pub segments_merkle_root: Hash,
48}
49
50#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
51pub struct EntryProof {
52    pub entry: SegmentEntry,
53    pub proof: MerkleProof,
54}
55
56#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
57pub struct NonInclusionProof {
58    pub insertion_index: usize,
59    pub left: Option<EntryProof>,
60    pub right: Option<EntryProof>,
61}
62
63#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
64pub enum SegmentKeyProof {
65    Inclusion(EntryProof),
66    NonInclusion(NonInclusionProof),
67}
68
69#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
70pub enum SegmentVerdict {
71    Value(Hash),
72    Tombstone,
73    NotPresent,
74}
75
76#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
77pub struct SegmentScanProof {
78    pub segment_hash: Hash,
79    pub manifest_proof: MerkleProof,
80    pub segment_merkle_root: Hash,
81    pub segment_leaf_count: usize,
82    pub key_proof: SegmentKeyProof,
83    pub verdict: SegmentVerdict,
84}
85
86#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
87pub enum StateOutcome {
88    Found(Hash),
89    Deleted,
90    Missing,
91}
92
93#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
94pub struct StateProof {
95    pub manifest_schema_version: u32,
96    pub manifest_segment_count: usize,
97    pub manifest_segments_root: Hash,
98    pub scans: Vec<SegmentScanProof>,
99    pub outcome: StateOutcome,
100}
101
102#[derive(Clone, Debug)]
103pub struct StateStore {
104    objects: ObjectStore,
105    blobs: BlobStore,
106    wal: Wal,
107}
108
109impl StateStore {
110    pub fn new(objects: ObjectStore, blobs: BlobStore, wal: Wal) -> Self {
111        Self {
112            objects,
113            blobs,
114            wal,
115        }
116    }
117
118    pub fn empty_root(&self) -> Result<StateRoot> {
119        let empty = new_state_manifest(vec![]);
120        self.store_manifest(&empty)
121    }
122
123    pub fn get(&self, root: StateRoot, key: &[u8]) -> Result<Option<Vec<u8>>> {
124        let manifest = self.load_manifest_or_empty(root)?;
125        for segment_hash in &manifest.segments {
126            let segment = self.load_segment(*segment_hash)?;
127            match find_key(&segment.entries, key) {
128                Ok(idx) => match segment.entries[idx].value {
129                    ValueRef::Value(value_hash) => return Ok(Some(self.blobs.get(value_hash)?)),
130                    ValueRef::Tombstone => return Ok(None),
131                },
132                Err(_) => continue,
133            }
134        }
135        Ok(None)
136    }
137
138    pub fn set(&self, root: StateRoot, key: &[u8], value: &[u8]) -> Result<StateRoot> {
139        let wal_path =
140            self.wal
141                .begin_entry(&Wal::make_state_entry(WalOp::StateSet, root, key.len()))?;
142
143        let result = (|| {
144            let manifest = self.load_manifest_or_empty(root)?;
145            let value_hash = self.blobs.put(value)?;
146            let segment = StateSegment::from_entries(vec![SegmentEntry {
147                key: key.to_vec(),
148                value: ValueRef::Value(value_hash),
149            }])?;
150            let segment_hash = self.store_segment(&segment)?;
151            let new_manifest =
152                new_state_manifest(prepend_segment(segment_hash, &manifest.segments));
153            self.store_manifest(&new_manifest)
154        })();
155
156        if result.is_ok() {
157            self.wal.end(&wal_path)?;
158        }
159        result
160    }
161
162    pub fn del(&self, root: StateRoot, key: &[u8]) -> Result<StateRoot> {
163        let wal_path =
164            self.wal
165                .begin_entry(&Wal::make_state_entry(WalOp::StateDel, root, key.len()))?;
166
167        let result = (|| {
168            let manifest = self.load_manifest_or_empty(root)?;
169            let segment = StateSegment::from_entries(vec![SegmentEntry {
170                key: key.to_vec(),
171                value: ValueRef::Tombstone,
172            }])?;
173            let segment_hash = self.store_segment(&segment)?;
174            let new_manifest =
175                new_state_manifest(prepend_segment(segment_hash, &manifest.segments));
176            self.store_manifest(&new_manifest)
177        })();
178
179        if result.is_ok() {
180            self.wal.end(&wal_path)?;
181        }
182        result
183    }
184
185    pub fn compact(&self, root: StateRoot) -> Result<StateRoot> {
186        let wal_path =
187            self.wal
188                .begin_entry(&Wal::make_state_entry(WalOp::StateCompact, root, 0))?;
189
190        let result = (|| {
191            let manifest = self.load_manifest_or_empty(root)?;
192            let mut visible: BTreeMap<Vec<u8>, ValueRef> = BTreeMap::new();
193
194            for segment_hash in &manifest.segments {
195                let segment = self.load_segment(*segment_hash)?;
196                for entry in segment.entries {
197                    visible.entry(entry.key).or_insert(entry.value);
198                }
199            }
200
201            let merged_entries: Vec<SegmentEntry> = visible
202                .into_iter()
203                .filter_map(|(key, value)| match value {
204                    ValueRef::Value(h) => Some(SegmentEntry {
205                        key,
206                        value: ValueRef::Value(h),
207                    }),
208                    ValueRef::Tombstone => None,
209                })
210                .collect();
211
212            if merged_entries.is_empty() {
213                return self.store_manifest(&new_state_manifest(vec![]));
214            }
215
216            let merged_segment = StateSegment::from_entries(merged_entries)?;
217            let merged_hash = self.store_segment(&merged_segment)?;
218            self.store_manifest(&new_state_manifest(vec![merged_hash]))
219        })();
220
221        if result.is_ok() {
222            self.wal.end(&wal_path)?;
223        }
224
225        result
226    }
227
228    pub fn proof(&self, root: StateRoot, key: &[u8]) -> Result<StateProof> {
229        let manifest = self.load_manifest_or_empty(root)?;
230        let manifest_leaves = manifest_segment_leaves(&manifest.segments);
231
232        let mut scans = Vec::new();
233        let mut outcome = StateOutcome::Missing;
234
235        for (idx, segment_hash) in manifest.segments.iter().enumerate() {
236            let segment = self.load_segment(*segment_hash)?;
237            let manifest_proof = prove_inclusion(&manifest_leaves, idx)
238                .ok_or_else(|| anyhow!("failed to build manifest inclusion proof"))?;
239            let scan = build_segment_scan(*segment_hash, segment, manifest_proof, key)?;
240
241            match scan.verdict {
242                SegmentVerdict::Value(h) => {
243                    scans.push(scan);
244                    outcome = StateOutcome::Found(h);
245                    break;
246                }
247                SegmentVerdict::Tombstone => {
248                    scans.push(scan);
249                    outcome = StateOutcome::Deleted;
250                    break;
251                }
252                SegmentVerdict::NotPresent => scans.push(scan),
253            }
254        }
255
256        Ok(StateProof {
257            manifest_schema_version: manifest.schema_version,
258            manifest_segment_count: manifest.segments.len(),
259            manifest_segments_root: manifest.segments_merkle_root,
260            scans,
261            outcome,
262        })
263    }
264
265    pub fn verify_proof(&self, root: StateRoot, key: &[u8], proof: &StateProof) -> bool {
266        let manifest = match self.load_manifest_or_empty(root) {
267            Ok(m) => m,
268            Err(_) => return false,
269        };
270
271        if proof.manifest_schema_version != manifest.schema_version {
272            return false;
273        }
274        if proof.manifest_segment_count != manifest.segments.len() {
275            return false;
276        }
277        if proof.manifest_segments_root != manifest.segments_merkle_root {
278            return false;
279        }
280        if proof.scans.len() > manifest.segments.len() {
281            return false;
282        }
283
284        let mut computed_outcome = StateOutcome::Missing;
285        let mut terminal = None;
286
287        for (idx, scan) in proof.scans.iter().enumerate() {
288            if manifest.segments.get(idx).copied() != Some(scan.segment_hash) {
289                return false;
290            }
291
292            if scan.manifest_proof.index != idx {
293                return false;
294            }
295
296            let leaf = manifest_segment_leaf_hash(scan.segment_hash);
297            if !verify_inclusion(manifest.segments_merkle_root, leaf, &scan.manifest_proof) {
298                return false;
299            }
300
301            let segment = match self.load_segment(scan.segment_hash) {
302                Ok(s) => s,
303                Err(_) => return false,
304            };
305
306            if segment.merkle_root != scan.segment_merkle_root {
307                return false;
308            }
309            if segment.entries.len() != scan.segment_leaf_count {
310                return false;
311            }
312            if !segment_is_sorted_unique(&segment.entries) {
313                return false;
314            }
315
316            let verdict_terminal = match &scan.key_proof {
317                SegmentKeyProof::Inclusion(ep) => {
318                    if ep.entry.key.as_slice() != key {
319                        return false;
320                    }
321                    if !verify_entry_proof(&segment, ep) {
322                        return false;
323                    }
324                    match ep.entry.value {
325                        ValueRef::Value(h) => {
326                            if scan.verdict != SegmentVerdict::Value(h) {
327                                return false;
328                            }
329                            Some(StateOutcome::Found(h))
330                        }
331                        ValueRef::Tombstone => {
332                            if scan.verdict != SegmentVerdict::Tombstone {
333                                return false;
334                            }
335                            Some(StateOutcome::Deleted)
336                        }
337                    }
338                }
339                SegmentKeyProof::NonInclusion(np) => {
340                    if scan.verdict != SegmentVerdict::NotPresent {
341                        return false;
342                    }
343                    if !verify_non_inclusion(&segment, key, np) {
344                        return false;
345                    }
346                    None
347                }
348            };
349
350            if let Some(outcome) = verdict_terminal {
351                terminal = Some(idx);
352                computed_outcome = outcome;
353                break;
354            }
355        }
356
357        match terminal {
358            Some(i) => {
359                if proof.scans.len() != i + 1 {
360                    return false;
361                }
362            }
363            None => {
364                if proof.scans.len() != manifest.segments.len() {
365                    return false;
366                }
367                computed_outcome = StateOutcome::Missing;
368            }
369        }
370
371        computed_outcome == proof.outcome
372    }
373
374    fn load_manifest_or_empty(&self, root: StateRoot) -> Result<StateManifest> {
375        if self.objects.exists(root) {
376            return self.load_manifest(root);
377        }
378
379        let empty_root = self.empty_root()?;
380        if root == empty_root {
381            return Ok(new_state_manifest(vec![]));
382        }
383
384        Err(anyhow!("state root {} not found", root))
385    }
386
387    fn store_manifest(&self, manifest: &StateManifest) -> Result<StateRoot> {
388        self.objects.put_serialized(STATE_TAG, manifest)
389    }
390
391    fn load_manifest(&self, hash: StateRoot) -> Result<StateManifest> {
392        let bytes = self.objects.get_typed_bytes(STATE_TAG, hash)?;
393        let computed = hash_typed(STATE_TAG, &bytes);
394        if computed != hash {
395            return Err(anyhow!("manifest hash mismatch for {}", hash));
396        }
397        let mut manifest: StateManifest = from_cbor(&bytes)?;
398        migrate_manifest(&mut manifest);
399        Ok(manifest)
400    }
401
402    fn store_segment(&self, segment: &StateSegment) -> Result<Hash> {
403        self.objects.put_serialized(STATE_TAG, segment)
404    }
405
406    fn load_segment(&self, hash: Hash) -> Result<StateSegment> {
407        let bytes = self.objects.get_typed_bytes(STATE_TAG, hash)?;
408        let computed = hash_typed(STATE_TAG, &bytes);
409        if computed != hash {
410            return Err(anyhow!("segment hash mismatch for {}", hash));
411        }
412        let mut segment: StateSegment = from_cbor(&bytes)?;
413        if segment.schema_version == 0 {
414            segment.schema_version = STATE_SCHEMA_VERSION;
415        }
416        Ok(segment)
417    }
418}
419
420impl StateSegment {
421    pub fn from_entries(mut entries: Vec<SegmentEntry>) -> Result<Self> {
422        entries.sort_by(|a, b| a.key.cmp(&b.key));
423        if !segment_is_sorted_unique(&entries) {
424            return Err(anyhow!("segment entries must have unique sorted keys"));
425        }
426
427        let leaves: Vec<Hash> = entries.iter().map(leaf_hash).collect();
428        let merkle_root = merkle_root(&leaves);
429        Ok(Self {
430            schema_version: STATE_SCHEMA_VERSION,
431            entries,
432            merkle_root,
433        })
434    }
435}
436
437fn default_schema_version() -> u32 {
438    STATE_SCHEMA_VERSION
439}
440
441fn new_state_manifest(segments: Vec<Hash>) -> StateManifest {
442    StateManifest {
443        schema_version: STATE_SCHEMA_VERSION,
444        segments_merkle_root: merkle_root(&manifest_segment_leaves(&segments)),
445        segments,
446    }
447}
448
449fn migrate_manifest(manifest: &mut StateManifest) {
450    if manifest.schema_version == 0 {
451        manifest.schema_version = STATE_SCHEMA_VERSION;
452    }
453    let expected = merkle_root(&manifest_segment_leaves(&manifest.segments));
454    if manifest.segments_merkle_root == Hash::zero() || manifest.segments_merkle_root != expected {
455        manifest.segments_merkle_root = expected;
456    }
457}
458
/// Test helper: CBOR-encodes `obj` and hashes the bytes under `STATE_TAG`.
#[cfg(test)]
fn hash_state_object<T: Serialize>(obj: &T) -> Result<Hash> {
    let bytes = crate::canonical::to_cbor(obj)?;
    Ok(hash_typed(STATE_TAG, &bytes))
}
464
465fn prepend_segment(newest: Hash, existing: &[Hash]) -> Vec<Hash> {
466    let mut out = Vec::with_capacity(existing.len() + 1);
467    out.push(newest);
468    out.extend_from_slice(existing);
469    out
470}
471
472fn find_key(entries: &[SegmentEntry], key: &[u8]) -> std::result::Result<usize, usize> {
473    entries.binary_search_by(|e| compare_key(e.key.as_slice(), key))
474}
475
/// Orders two keys by plain lexicographic comparison of their bytes; a key
/// that is a strict prefix of another sorts before it.
fn compare_key(a: &[u8], b: &[u8]) -> Ordering {
    a.cmp(b)
}
479
480fn segment_is_sorted_unique(entries: &[SegmentEntry]) -> bool {
481    entries
482        .windows(2)
483        .all(|w| w[0].key.as_slice() < w[1].key.as_slice())
484}
485
486fn manifest_segment_leaf_hash(segment_hash: Hash) -> Hash {
487    hash_typed(STATE_MANIFEST_LEAF_TAG, segment_hash.as_bytes())
488}
489
490fn manifest_segment_leaves(segments: &[Hash]) -> Vec<Hash> {
491    segments
492        .iter()
493        .copied()
494        .map(manifest_segment_leaf_hash)
495        .collect()
496}
497
498fn leaf_hash(entry: &SegmentEntry) -> Hash {
499    let mut bytes = Vec::new();
500    bytes.extend_from_slice(&(entry.key.len() as u32).to_be_bytes());
501    bytes.extend_from_slice(&entry.key);
502
503    match entry.value {
504        ValueRef::Value(h) => {
505            bytes.push(1);
506            bytes.extend_from_slice(h.as_bytes());
507        }
508        ValueRef::Tombstone => bytes.push(0),
509    }
510
511    hash_typed(STATE_LEAF_TAG, &bytes)
512}
513
514fn build_segment_scan(
515    segment_hash: Hash,
516    segment: StateSegment,
517    manifest_proof: MerkleProof,
518    key: &[u8],
519) -> Result<SegmentScanProof> {
520    let leaves: Vec<Hash> = segment.entries.iter().map(leaf_hash).collect();
521
522    match find_key(&segment.entries, key) {
523        Ok(index) => {
524            let entry = segment.entries[index].clone();
525            let proof = prove_inclusion(&leaves, index)
526                .ok_or_else(|| anyhow!("failed to build inclusion proof"))?;
527            let verdict = match entry.value {
528                ValueRef::Value(h) => SegmentVerdict::Value(h),
529                ValueRef::Tombstone => SegmentVerdict::Tombstone,
530            };
531            Ok(SegmentScanProof {
532                segment_hash,
533                manifest_proof,
534                segment_merkle_root: segment.merkle_root,
535                segment_leaf_count: segment.entries.len(),
536                key_proof: SegmentKeyProof::Inclusion(EntryProof { entry, proof }),
537                verdict,
538            })
539        }
540        Err(ins) => {
541            let left = if ins > 0 {
542                let idx = ins - 1;
543                Some(EntryProof {
544                    entry: segment.entries[idx].clone(),
545                    proof: prove_inclusion(&leaves, idx)
546                        .ok_or_else(|| anyhow!("failed to build left neighbor proof"))?,
547                })
548            } else {
549                None
550            };
551
552            let right = if ins < segment.entries.len() {
553                Some(EntryProof {
554                    entry: segment.entries[ins].clone(),
555                    proof: prove_inclusion(&leaves, ins)
556                        .ok_or_else(|| anyhow!("failed to build right neighbor proof"))?,
557                })
558            } else {
559                None
560            };
561
562            Ok(SegmentScanProof {
563                segment_hash,
564                manifest_proof,
565                segment_merkle_root: segment.merkle_root,
566                segment_leaf_count: segment.entries.len(),
567                key_proof: SegmentKeyProof::NonInclusion(NonInclusionProof {
568                    insertion_index: ins,
569                    left,
570                    right,
571                }),
572                verdict: SegmentVerdict::NotPresent,
573            })
574        }
575    }
576}
577
578fn verify_entry_proof(segment: &StateSegment, entry_proof: &EntryProof) -> bool {
579    let idx = entry_proof.proof.index;
580    if segment.entries.get(idx) != Some(&entry_proof.entry) {
581        return false;
582    }
583
584    let leaf = leaf_hash(&entry_proof.entry);
585    verify_inclusion(segment.merkle_root, leaf, &entry_proof.proof)
586}
587
588fn verify_non_inclusion(segment: &StateSegment, key: &[u8], np: &NonInclusionProof) -> bool {
589    let len = segment.entries.len();
590    if np.insertion_index > len {
591        return false;
592    }
593
594    if np.insertion_index < len && segment.entries[np.insertion_index].key.as_slice() == key {
595        return false;
596    }
597
598    match &np.left {
599        Some(left) => {
600            if !verify_entry_proof(segment, left) {
601                return false;
602            }
603            if left.proof.index + 1 != np.insertion_index {
604                return false;
605            }
606            if !(left.entry.key.as_slice() < key) {
607                return false;
608            }
609        }
610        None => {
611            if np.insertion_index != 0 {
612                return false;
613            }
614        }
615    }
616
617    match &np.right {
618        Some(right) => {
619            if !verify_entry_proof(segment, right) {
620                return false;
621            }
622            if right.proof.index != np.insertion_index {
623                return false;
624            }
625            if !(key < right.entry.key.as_slice()) {
626                return false;
627            }
628        }
629        None => {
630            if np.insertion_index != len {
631                return false;
632            }
633        }
634    }
635
636    if let (Some(left), Some(right)) = (&np.left, &np.right)
637        && !(left.entry.key.as_slice() < right.entry.key.as_slice())
638    {
639        return false;
640    }
641
642    true
643}
644
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use rand::rngs::StdRng;
    use rand::{Rng, SeedableRng};
    use tempfile::TempDir;

    use super::*;

    /// Builds a `StateStore` whose object store, blob store and WAL live
    /// under the given temporary directory.
    fn store(tmp: &TempDir) -> StateStore {
        let objects = ObjectStore::new(tmp.path().join("objects"));
        objects.ensure_dir().unwrap();
        let blobs = BlobStore::new(tmp.path().join("blobs"));
        blobs.ensure_dir().unwrap();
        StateStore::new(objects, blobs, Wal::new(tmp.path().join("wal")))
    }

    #[test]
    fn empty_root_get_none() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let root = s.empty_root().unwrap();
        assert_eq!(s.get(root, b"missing").unwrap(), None);
    }

    #[test]
    fn set_get_single_key() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let root0 = s.empty_root().unwrap();
        let root1 = s.set(root0, b"k", b"v").unwrap();
        assert_eq!(s.get(root1, b"k").unwrap(), Some(b"v".to_vec()));
    }

    #[test]
    fn overwrite_key_returns_latest_value() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let root0 = s.empty_root().unwrap();
        let root1 = s.set(root0, b"k", b"v1").unwrap();
        let root2 = s.set(root1, b"k", b"v2").unwrap();
        assert_eq!(s.get(root2, b"k").unwrap(), Some(b"v2".to_vec()));
    }

    #[test]
    fn old_roots_remain_readable() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let root0 = s.empty_root().unwrap();
        let root1 = s.set(root0, b"k", b"v1").unwrap();
        let root2 = s.set(root1, b"k", b"v2").unwrap();
        assert_eq!(s.get(root1, b"k").unwrap(), Some(b"v1".to_vec()));
        assert_eq!(s.get(root2, b"k").unwrap(), Some(b"v2".to_vec()));
    }

    #[test]
    fn delete_removes_value() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let r0 = s.empty_root().unwrap();
        let r1 = s.set(r0, b"k", b"v").unwrap();
        let r2 = s.del(r1, b"k").unwrap();
        assert_eq!(s.get(r2, b"k").unwrap(), None);
    }

    #[test]
    fn delete_missing_key_still_creates_new_root() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let r0 = s.empty_root().unwrap();
        let r1 = s.del(r0, b"missing").unwrap();
        assert_ne!(r0, r1);
    }

    #[test]
    fn multiple_keys_independent() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let r0 = s.empty_root().unwrap();
        let r1 = s.set(r0, b"a", b"1").unwrap();
        let r2 = s.set(r1, b"b", b"2").unwrap();
        assert_eq!(s.get(r2, b"a").unwrap(), Some(b"1".to_vec()));
        assert_eq!(s.get(r2, b"b").unwrap(), Some(b"2".to_vec()));
    }

    #[test]
    fn tombstone_wins_over_older_value() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let r0 = s.empty_root().unwrap();
        let r1 = s.set(r0, b"k", b"1").unwrap();
        let r2 = s.del(r1, b"k").unwrap();
        let r3 = s.set(r2, b"other", b"x").unwrap();
        assert_eq!(s.get(r3, b"k").unwrap(), None);
    }

    #[test]
    fn segment_order_newest_first() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let r0 = s.empty_root().unwrap();
        let r1 = s.set(r0, b"k", b"1").unwrap();
        let r2 = s.set(r1, b"k", b"2").unwrap();
        let m: StateManifest = s.load_manifest_or_empty(r2).unwrap();
        assert_eq!(m.segments.len(), 2);
        let newer: StateSegment = s.load_segment(m.segments[0]).unwrap();
        assert_eq!(
            newer.entries[0].value,
            ValueRef::Value(s.blobs.put(b"2").unwrap())
        );
    }

    #[test]
    fn manifest_merkle_root_matches_segments() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let r0 = s.empty_root().unwrap();
        let r1 = s.set(r0, b"a", b"1").unwrap();
        let m = s.load_manifest_or_empty(r1).unwrap();
        let leaves = m
            .segments
            .iter()
            .copied()
            .map(super::manifest_segment_leaf_hash)
            .collect::<Vec<_>>();
        assert_eq!(m.segments_merkle_root, merkle_root(&leaves));
    }

    #[test]
    fn compact_reduces_segments_and_preserves_values() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let r0 = s.empty_root().unwrap();
        let r1 = s.set(r0, b"a", b"1").unwrap();
        let r2 = s.set(r1, b"a", b"2").unwrap();
        let r3 = s.set(r2, b"b", b"3").unwrap();
        let r4 = s.del(r3, b"b").unwrap();
        let compacted = s.compact(r4).unwrap();

        let manifest = s.load_manifest_or_empty(compacted).unwrap();
        assert!(manifest.segments.len() <= 1);
        assert_eq!(s.get(compacted, b"a").unwrap(), Some(b"2".to_vec()));
        assert_eq!(s.get(compacted, b"b").unwrap(), None);
    }

    #[test]
    fn proof_membership_verifies() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let r0 = s.empty_root().unwrap();
        let r1 = s.set(r0, b"k", b"v").unwrap();
        let p = s.proof(r1, b"k").unwrap();
        assert!(s.verify_proof(r1, b"k", &p));
        assert!(matches!(p.outcome, StateOutcome::Found(_)));
    }

    #[test]
    fn proof_non_membership_verifies() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let r0 = s.empty_root().unwrap();
        let r1 = s.set(r0, b"a", b"1").unwrap();
        let p = s.proof(r1, b"z").unwrap();
        assert!(s.verify_proof(r1, b"z", &p));
        assert_eq!(p.outcome, StateOutcome::Missing);
    }

    #[test]
    fn proof_deleted_verifies() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let r0 = s.empty_root().unwrap();
        let r1 = s.set(r0, b"k", b"v").unwrap();
        let r2 = s.del(r1, b"k").unwrap();
        let p = s.proof(r2, b"k").unwrap();
        assert!(s.verify_proof(r2, b"k", &p));
        assert_eq!(p.outcome, StateOutcome::Deleted);
    }

    #[test]
    fn proof_wrong_key_fails() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let r0 = s.empty_root().unwrap();
        let r1 = s.set(r0, b"k", b"v").unwrap();
        let p = s.proof(r1, b"k").unwrap();
        assert!(!s.verify_proof(r1, b"x", &p));
    }

    #[test]
    fn proof_wrong_root_fails() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let r0 = s.empty_root().unwrap();
        let r1 = s.set(r0, b"k", b"v").unwrap();
        let p = s.proof(r1, b"k").unwrap();
        assert!(!s.verify_proof(r0, b"k", &p));
    }

    #[test]
    fn proof_tampered_manifest_proof_fails() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let r0 = s.empty_root().unwrap();
        let r1 = s.set(r0, b"k", b"v").unwrap();
        let mut p = s.proof(r1, b"k").unwrap();
        p.scans[0].manifest_proof.index = 1;
        assert!(!s.verify_proof(r1, b"k", &p));
    }

    #[test]
    fn proof_tampered_entry_fails() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let r0 = s.empty_root().unwrap();
        let r1 = s.set(r0, b"k", b"v").unwrap();
        let mut p = s.proof(r1, b"k").unwrap();
        match &mut p.scans[0].key_proof {
            SegmentKeyProof::Inclusion(ep) => ep.entry.key = b"x".to_vec(),
            _ => panic!("expected inclusion"),
        }
        assert!(!s.verify_proof(r1, b"k", &p));
    }

    #[test]
    fn set_same_input_same_root() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let r0 = s.empty_root().unwrap();
        let a = s.set(r0, b"k", b"v").unwrap();
        let b = s.set(r0, b"k", b"v").unwrap();
        assert_eq!(a, b);
    }

    #[test]
    fn set_different_value_changes_root() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let r0 = s.empty_root().unwrap();
        let a = s.set(r0, b"k", b"v1").unwrap();
        let b = s.set(r0, b"k", b"v2").unwrap();
        assert_ne!(a, b);
    }

    #[test]
    fn load_missing_root_errors() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let missing = Hash::zero();
        assert!(s.get(missing, b"k").is_err());
    }

    #[test]
    fn proof_for_empty_root_missing() {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let r = s.empty_root().unwrap();
        let p = s.proof(r, b"k").unwrap();
        assert_eq!(p.outcome, StateOutcome::Missing);
        assert!(s.verify_proof(r, b"k", &p));
    }

    #[test]
    fn hash_state_object_is_stable() {
        let m = new_state_manifest(vec![]);
        assert_eq!(
            hash_state_object(&m).unwrap(),
            hash_state_object(&m).unwrap()
        );
    }

    /// Applies 200 seeded random set/del/compact operations over 12 keys and,
    /// after every operation, checks each key against an in-memory model.
    fn random_property(seed: u64) {
        let tmp = TempDir::new().unwrap();
        let s = store(&tmp);
        let mut rng = StdRng::seed_from_u64(seed);

        let mut root = s.empty_root().unwrap();
        // `None` in the model means the key was deleted.
        let mut model: BTreeMap<Vec<u8>, Option<Vec<u8>>> = BTreeMap::new();
        let keys: Vec<Vec<u8>> = (0..12).map(|i| format!("k{i}").into_bytes()).collect();

        for _ in 0..200 {
            let k = keys[rng.gen_range(0..keys.len())].clone();
            let op = rng.gen_range(0..4);
            if op < 2 {
                let val_len = rng.gen_range(1..8);
                let val: Vec<u8> = (0..val_len).map(|_| rng.gen_range(0u8..=255u8)).collect();
                root = s.set(root, &k, &val).unwrap();
                model.insert(k.clone(), Some(val));
            } else if op == 2 {
                root = s.del(root, &k).unwrap();
                model.insert(k.clone(), None);
            } else {
                root = s.compact(root).unwrap();
                // Compaction does not change logical view.
            }

            for probe in &keys {
                let got = s.get(root, probe).unwrap();
                let expected = model.get(probe).cloned().flatten();
                assert_eq!(got, expected);
            }
        }
    }

    /// Declares a `#[test]` that runs `random_property` with a fixed seed.
    macro_rules! prop_test {
        ($name:ident, $seed:expr) => {
            #[test]
            fn $name() {
                random_property($seed);
            }
        };
    }

    prop_test!(property_random_ops_seed_1, 1);
    prop_test!(property_random_ops_seed_2, 2);
    prop_test!(property_random_ops_seed_3, 3);
    prop_test!(property_random_ops_seed_4, 4);
    prop_test!(property_random_ops_seed_5, 5);
    prop_test!(property_random_ops_seed_6, 6);
    prop_test!(property_random_ops_seed_7, 7);
    prop_test!(property_random_ops_seed_8, 8);
    prop_test!(property_random_ops_seed_9, 9);
    prop_test!(property_random_ops_seed_10, 10);
}