Skip to main content

neleus_db/
state.rs

1use std::cmp::Ordering;
2use std::collections::BTreeMap;
3use std::sync::{Arc, OnceLock};
4
5use anyhow::{Result, anyhow};
6use serde::{Deserialize, Serialize};
7
8use crate::blob_store::BlobStore;
9use crate::canonical::from_cbor;
10use crate::hash::{Hash, hash_typed};
11use crate::merkle::{MerkleLeaf, MerkleProof, prove_inclusion, root as merkle_root, verify_inclusion};
12use crate::object_store::ObjectStore;
13use crate::wal::{Wal, WalOp};
14
15const STATE_TAG: &[u8] = b"state_node:";
16const STATE_LEAF_TAG: &[u8] = b"state_leaf:";
17const STATE_MANIFEST_LEAF_TAG: &[u8] = b"state_manifest_leaf:";
18const STATE_SCHEMA_VERSION: u32 = 1;
19
20pub type StateRoot = Hash;
21
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
23pub enum ValueRef {
24    Value(Hash),
25    Tombstone,
26}
27
28#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
29pub struct SegmentEntry {
30    pub key: Vec<u8>,
31    pub value: ValueRef,
32}
33
34#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
35pub struct StateSegment {
36    pub schema_version: u32,
37    pub entries: Vec<SegmentEntry>,
38    pub merkle_root: Hash,
39}
40
41#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42pub struct StateManifest {
43    pub schema_version: u32,
44    pub segments: Vec<Hash>, // newest first
45    pub segments_merkle_root: Hash,
46}
47
48#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
49pub struct EntryProof {
50    pub entry: SegmentEntry,
51    pub proof: MerkleProof,
52}
53
54#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
55pub struct NonInclusionProof {
56    pub insertion_index: usize,
57    pub left: Option<EntryProof>,
58    pub right: Option<EntryProof>,
59}
60
61#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub enum SegmentKeyProof {
63    Inclusion(EntryProof),
64    NonInclusion(NonInclusionProof),
65}
66
67#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
68pub enum SegmentVerdict {
69    Value(Hash),
70    Tombstone,
71    NotPresent,
72}
73
74#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
75pub struct SegmentScanProof {
76    pub segment_hash: Hash,
77    pub manifest_proof: MerkleProof,
78    pub segment_merkle_root: Hash,
79    pub segment_leaf_count: usize,
80    pub key_proof: SegmentKeyProof,
81    pub verdict: SegmentVerdict,
82}
83
84#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
85pub enum StateOutcome {
86    Found(Hash),
87    Deleted,
88    Missing,
89}
90
91#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
92pub struct StateProof {
93    pub manifest_schema_version: u32,
94    pub manifest_segment_count: usize,
95    pub manifest_segments_root: Hash,
96    pub scans: Vec<SegmentScanProof>,
97    pub outcome: StateOutcome,
98}
99
100#[derive(Clone, Debug)]
101pub struct StateStore {
102    objects: ObjectStore,
103    blobs: BlobStore,
104    wal: Wal,
105    /// Cached hash of the empty manifest. Computed and stored on first call;
106    /// subsequent calls return the hash without touching disk. Shared across
107    /// all clones of this `StateStore` so different `Database` views share
108    /// the cache.
109    empty_root: Arc<OnceLock<StateRoot>>,
110}
111
112impl StateStore {
113    pub fn new(objects: ObjectStore, blobs: BlobStore, wal: Wal) -> Self {
114        Self {
115            objects,
116            blobs,
117            wal,
118            empty_root: Arc::new(OnceLock::new()),
119        }
120    }
121
122    pub fn empty_root(&self) -> Result<StateRoot> {
123        if let Some(cached) = self.empty_root.get() {
124            return Ok(*cached);
125        }
126        let empty = new_state_manifest(vec![]);
127        let root = self.store_manifest(&empty)?;
128        // `set` returns Err if another thread won the race; either way the
129        // cached value is now Some, so we just read it back.
130        let _ = self.empty_root.set(root);
131        Ok(*self.empty_root.get().unwrap_or(&root))
132    }
133
134    pub fn get(&self, root: StateRoot, key: &[u8]) -> Result<Option<Vec<u8>>> {
135        let manifest = self.load_manifest_or_empty(root)?;
136        for segment_hash in &manifest.segments {
137            let segment = self.load_segment(*segment_hash)?;
138            match find_key(&segment.entries, key) {
139                Ok(idx) => match segment.entries[idx].value {
140                    ValueRef::Value(value_hash) => return Ok(Some(self.blobs.get(value_hash)?)),
141                    ValueRef::Tombstone => return Ok(None),
142                },
143                Err(_) => continue,
144            }
145        }
146        Ok(None)
147    }
148
149    pub fn set(&self, root: StateRoot, key: &[u8], value: &[u8]) -> Result<StateRoot> {
150        let wal_path =
151            self.wal
152                .begin_entry(&Wal::make_state_entry(WalOp::StateSet, root, key.len()))?;
153
154        let result = (|| {
155            let manifest = self.load_manifest_or_empty(root)?;
156            let value_hash = self.blobs.put(value)?;
157            let segment = StateSegment::from_entries(vec![SegmentEntry {
158                key: key.to_vec(),
159                value: ValueRef::Value(value_hash),
160            }])?;
161            let segment_hash = self.store_segment(&segment)?;
162            let new_manifest =
163                new_state_manifest(prepend_segment(segment_hash, &manifest.segments));
164            self.store_manifest(&new_manifest)
165        })();
166
167        if result.is_ok() {
168            self.wal.end(&wal_path)?;
169        }
170        result
171    }
172
173    pub fn del(&self, root: StateRoot, key: &[u8]) -> Result<StateRoot> {
174        let wal_path =
175            self.wal
176                .begin_entry(&Wal::make_state_entry(WalOp::StateDel, root, key.len()))?;
177
178        let result = (|| {
179            let manifest = self.load_manifest_or_empty(root)?;
180            let segment = StateSegment::from_entries(vec![SegmentEntry {
181                key: key.to_vec(),
182                value: ValueRef::Tombstone,
183            }])?;
184            let segment_hash = self.store_segment(&segment)?;
185            let new_manifest =
186                new_state_manifest(prepend_segment(segment_hash, &manifest.segments));
187            self.store_manifest(&new_manifest)
188        })();
189
190        if result.is_ok() {
191            self.wal.end(&wal_path)?;
192        }
193        result
194    }
195
196    /// Set multiple key-value pairs atomically in a single segment.
197    /// Duplicate keys are resolved last-write-wins (stable sort preserves order,
198    /// then dedup keeps the last occurrence so the caller's ordering matters).
199    pub fn set_many(&self, root: StateRoot, pairs: &[(&[u8], &[u8])]) -> Result<StateRoot> {
200        if pairs.is_empty() {
201            return Ok(root);
202        }
203
204        let wal_path =
205            self.wal
206                .begin_entry(&Wal::make_state_entry(WalOp::StateSet, root, pairs.len()))?;
207
208        let result = (|| {
209            let manifest = self.load_manifest_or_empty(root)?;
210
211            // Deduplicate: last occurrence for each key wins.
212            let mut deduped: BTreeMap<Vec<u8>, &[u8]> = BTreeMap::new();
213            for (k, v) in pairs {
214                deduped.insert(k.to_vec(), v);
215            }
216
217            let mut entries = Vec::with_capacity(deduped.len());
218            for (key, value) in deduped {
219                let value_hash = self.blobs.put(value)?;
220                entries.push(SegmentEntry {
221                    key,
222                    value: ValueRef::Value(value_hash),
223                });
224            }
225
226            let segment = StateSegment::from_entries(entries)?;
227            let segment_hash = self.store_segment(&segment)?;
228            let new_manifest =
229                new_state_manifest(prepend_segment(segment_hash, &manifest.segments));
230            self.store_manifest(&new_manifest)
231        })();
232
233        if result.is_ok() {
234            self.wal.end(&wal_path)?;
235        }
236        result
237    }
238
239    /// Delete multiple keys atomically in a single tombstone segment.
240    pub fn del_many(&self, root: StateRoot, keys: &[&[u8]]) -> Result<StateRoot> {
241        if keys.is_empty() {
242            return Ok(root);
243        }
244
245        let wal_path =
246            self.wal
247                .begin_entry(&Wal::make_state_entry(WalOp::StateDel, root, keys.len()))?;
248
249        let result = (|| {
250            let manifest = self.load_manifest_or_empty(root)?;
251
252            // Deduplicate keys via BTreeSet.
253            let mut unique: std::collections::BTreeSet<Vec<u8>> = std::collections::BTreeSet::new();
254            for k in keys {
255                unique.insert(k.to_vec());
256            }
257
258            let entries = unique
259                .into_iter()
260                .map(|key| SegmentEntry {
261                    key,
262                    value: ValueRef::Tombstone,
263                })
264                .collect();
265
266            let segment = StateSegment::from_entries(entries)?;
267            let segment_hash = self.store_segment(&segment)?;
268            let new_manifest =
269                new_state_manifest(prepend_segment(segment_hash, &manifest.segments));
270            self.store_manifest(&new_manifest)
271        })();
272
273        if result.is_ok() {
274            self.wal.end(&wal_path)?;
275        }
276        result
277    }
278
279    pub fn compact(&self, root: StateRoot) -> Result<StateRoot> {
280        let wal_path =
281            self.wal
282                .begin_entry(&Wal::make_state_entry(WalOp::StateCompact, root, 0))?;
283
284        let result = (|| {
285            let manifest = self.load_manifest_or_empty(root)?;
286            let mut visible: BTreeMap<Vec<u8>, ValueRef> = BTreeMap::new();
287
288            for segment_hash in &manifest.segments {
289                let segment = self.load_segment(*segment_hash)?;
290                for entry in segment.entries {
291                    visible.entry(entry.key).or_insert(entry.value);
292                }
293            }
294
295            let merged_entries: Vec<SegmentEntry> = visible
296                .into_iter()
297                .filter_map(|(key, value)| match value {
298                    ValueRef::Value(h) => Some(SegmentEntry {
299                        key,
300                        value: ValueRef::Value(h),
301                    }),
302                    ValueRef::Tombstone => None,
303                })
304                .collect();
305
306            if merged_entries.is_empty() {
307                return self.store_manifest(&new_state_manifest(vec![]));
308            }
309
310            let merged_segment = StateSegment::from_entries(merged_entries)?;
311            let merged_hash = self.store_segment(&merged_segment)?;
312            self.store_manifest(&new_state_manifest(vec![merged_hash]))
313        })();
314
315        if result.is_ok() {
316            self.wal.end(&wal_path)?;
317        }
318
319        result
320    }
321
322    pub fn proof(&self, root: StateRoot, key: &[u8]) -> Result<StateProof> {
323        let manifest = self.load_manifest_or_empty(root)?;
324        let manifest_leaves = manifest_segment_leaves(&manifest.segments);
325
326        let mut scans = Vec::new();
327        let mut outcome = StateOutcome::Missing;
328
329        for (idx, segment_hash) in manifest.segments.iter().enumerate() {
330            let segment = self.load_segment(*segment_hash)?;
331            let manifest_proof = prove_inclusion(&manifest_leaves, idx)
332                .ok_or_else(|| anyhow!("failed to build manifest inclusion proof"))?;
333            let scan = build_segment_scan(*segment_hash, segment, manifest_proof, key)?;
334
335            match scan.verdict {
336                SegmentVerdict::Value(h) => {
337                    scans.push(scan);
338                    outcome = StateOutcome::Found(h);
339                    break;
340                }
341                SegmentVerdict::Tombstone => {
342                    scans.push(scan);
343                    outcome = StateOutcome::Deleted;
344                    break;
345                }
346                SegmentVerdict::NotPresent => scans.push(scan),
347            }
348        }
349
350        Ok(StateProof {
351            manifest_schema_version: manifest.schema_version,
352            manifest_segment_count: manifest.segments.len(),
353            manifest_segments_root: manifest.segments_merkle_root,
354            scans,
355            outcome,
356        })
357    }
358
359    pub fn verify_proof(&self, root: StateRoot, key: &[u8], proof: &StateProof) -> bool {
360        let manifest = match self.load_manifest_or_empty(root) {
361            Ok(m) => m,
362            Err(_) => return false,
363        };
364
365        if proof.manifest_schema_version != manifest.schema_version {
366            return false;
367        }
368        if proof.manifest_segment_count != manifest.segments.len() {
369            return false;
370        }
371        if proof.manifest_segments_root != manifest.segments_merkle_root {
372            return false;
373        }
374        if proof.scans.len() > manifest.segments.len() {
375            return false;
376        }
377
378        let mut computed_outcome = StateOutcome::Missing;
379        let mut terminal = None;
380
381        for (idx, scan) in proof.scans.iter().enumerate() {
382            if manifest.segments.get(idx).copied() != Some(scan.segment_hash) {
383                return false;
384            }
385
386            if scan.manifest_proof.index != idx {
387                return false;
388            }
389
390            let leaf = manifest_segment_leaf(scan.segment_hash);
391            if !verify_inclusion(manifest.segments_merkle_root, leaf, &scan.manifest_proof) {
392                return false;
393            }
394
395            let segment = match self.load_segment(scan.segment_hash) {
396                Ok(s) => s,
397                Err(_) => return false,
398            };
399
400            if segment.merkle_root != scan.segment_merkle_root {
401                return false;
402            }
403            if segment.entries.len() != scan.segment_leaf_count {
404                return false;
405            }
406            if !segment_is_sorted_unique(&segment.entries) {
407                return false;
408            }
409
410            let verdict_terminal = match &scan.key_proof {
411                SegmentKeyProof::Inclusion(ep) => {
412                    if ep.entry.key.as_slice() != key {
413                        return false;
414                    }
415                    if !verify_entry_proof(&segment, ep) {
416                        return false;
417                    }
418                    match ep.entry.value {
419                        ValueRef::Value(h) => {
420                            if scan.verdict != SegmentVerdict::Value(h) {
421                                return false;
422                            }
423                            Some(StateOutcome::Found(h))
424                        }
425                        ValueRef::Tombstone => {
426                            if scan.verdict != SegmentVerdict::Tombstone {
427                                return false;
428                            }
429                            Some(StateOutcome::Deleted)
430                        }
431                    }
432                }
433                SegmentKeyProof::NonInclusion(np) => {
434                    if scan.verdict != SegmentVerdict::NotPresent {
435                        return false;
436                    }
437                    if !verify_non_inclusion(&segment, key, np) {
438                        return false;
439                    }
440                    None
441                }
442            };
443
444            if let Some(outcome) = verdict_terminal {
445                terminal = Some(idx);
446                computed_outcome = outcome;
447                break;
448            }
449        }
450
451        match terminal {
452            Some(i) => {
453                if proof.scans.len() != i + 1 {
454                    return false;
455                }
456            }
457            None => {
458                if proof.scans.len() != manifest.segments.len() {
459                    return false;
460                }
461                computed_outcome = StateOutcome::Missing;
462            }
463        }
464
465        computed_outcome == proof.outcome
466    }
467
468    fn load_manifest_or_empty(&self, root: StateRoot) -> Result<StateManifest> {
469        if self.objects.exists(root) {
470            return self.load_manifest(root);
471        }
472
473        let empty_root = self.empty_root()?;
474        if root == empty_root {
475            return Ok(new_state_manifest(vec![]));
476        }
477
478        Err(anyhow!("state root {} not found", root))
479    }
480
481    fn store_manifest(&self, manifest: &StateManifest) -> Result<StateRoot> {
482        self.objects.put_serialized(STATE_TAG, manifest)
483    }
484
485    fn load_manifest(&self, hash: StateRoot) -> Result<StateManifest> {
486        let bytes = self.objects.get_typed_bytes(STATE_TAG, hash)?;
487        let computed = hash_typed(STATE_TAG, &bytes);
488        if computed != hash {
489            return Err(anyhow!("manifest hash mismatch for {}", hash));
490        }
491        let manifest: StateManifest = from_cbor(&bytes)?;
492        Ok(manifest)
493    }
494
495    fn store_segment(&self, segment: &StateSegment) -> Result<Hash> {
496        self.objects.put_serialized(STATE_TAG, segment)
497    }
498
499    fn load_segment(&self, hash: Hash) -> Result<StateSegment> {
500        let bytes = self.objects.get_typed_bytes(STATE_TAG, hash)?;
501        let computed = hash_typed(STATE_TAG, &bytes);
502        if computed != hash {
503            return Err(anyhow!("segment hash mismatch for {}", hash));
504        }
505        let segment: StateSegment = from_cbor(&bytes)?;
506        Ok(segment)
507    }
508}
509
510impl StateSegment {
511    pub fn from_entries(mut entries: Vec<SegmentEntry>) -> Result<Self> {
512        entries.sort_by(|a, b| a.key.cmp(&b.key));
513        if !segment_is_sorted_unique(&entries) {
514            return Err(anyhow!("segment entries must have unique sorted keys"));
515        }
516
517        let leaves: Vec<MerkleLeaf> = entries.iter().map(entry_leaf).collect();
518        let merkle_root = merkle_root(&leaves);
519        Ok(Self {
520            schema_version: STATE_SCHEMA_VERSION,
521            entries,
522            merkle_root,
523        })
524    }
525}
526
527fn new_state_manifest(segments: Vec<Hash>) -> StateManifest {
528    StateManifest {
529        schema_version: STATE_SCHEMA_VERSION,
530        segments_merkle_root: merkle_root(&manifest_segment_leaves(&segments)),
531        segments,
532    }
533}
534
535#[cfg(test)]
536fn hash_state_object<T: Serialize>(obj: &T) -> Result<Hash> {
537    let bytes = crate::canonical::to_cbor(obj)?;
538    Ok(hash_typed(STATE_TAG, &bytes))
539}
540
541fn prepend_segment(newest: Hash, existing: &[Hash]) -> Vec<Hash> {
542    let mut out = Vec::with_capacity(existing.len() + 1);
543    out.push(newest);
544    out.extend_from_slice(existing);
545    out
546}
547
548fn find_key(entries: &[SegmentEntry], key: &[u8]) -> std::result::Result<usize, usize> {
549    entries.binary_search_by(|e| compare_key(e.key.as_slice(), key))
550}
551
552fn compare_key(a: &[u8], b: &[u8]) -> Ordering {
553    a.cmp(b)
554}
555
556fn segment_is_sorted_unique(entries: &[SegmentEntry]) -> bool {
557    entries
558        .windows(2)
559        .all(|w| w[0].key.as_slice() < w[1].key.as_slice())
560}
561
562fn manifest_segment_leaf(segment_hash: Hash) -> MerkleLeaf {
563    MerkleLeaf::new(STATE_MANIFEST_LEAF_TAG, segment_hash.as_bytes())
564}
565
566fn manifest_segment_leaves(segments: &[Hash]) -> Vec<MerkleLeaf> {
567    segments.iter().copied().map(manifest_segment_leaf).collect()
568}
569
570fn entry_leaf(entry: &SegmentEntry) -> MerkleLeaf {
571    let mut bytes = Vec::new();
572    bytes.extend_from_slice(&(entry.key.len() as u32).to_be_bytes());
573    bytes.extend_from_slice(&entry.key);
574
575    match entry.value {
576        ValueRef::Value(h) => {
577            bytes.push(1);
578            bytes.extend_from_slice(h.as_bytes());
579        }
580        ValueRef::Tombstone => bytes.push(0),
581    }
582
583    MerkleLeaf::new(STATE_LEAF_TAG, &bytes)
584}
585
586fn build_segment_scan(
587    segment_hash: Hash,
588    segment: StateSegment,
589    manifest_proof: MerkleProof,
590    key: &[u8],
591) -> Result<SegmentScanProof> {
592    let leaves: Vec<MerkleLeaf> = segment.entries.iter().map(entry_leaf).collect();
593
594    match find_key(&segment.entries, key) {
595        Ok(index) => {
596            let entry = segment.entries[index].clone();
597            let proof = prove_inclusion(&leaves, index)
598                .ok_or_else(|| anyhow!("failed to build inclusion proof"))?;
599            let verdict = match entry.value {
600                ValueRef::Value(h) => SegmentVerdict::Value(h),
601                ValueRef::Tombstone => SegmentVerdict::Tombstone,
602            };
603            Ok(SegmentScanProof {
604                segment_hash,
605                manifest_proof,
606                segment_merkle_root: segment.merkle_root,
607                segment_leaf_count: segment.entries.len(),
608                key_proof: SegmentKeyProof::Inclusion(EntryProof { entry, proof }),
609                verdict,
610            })
611        }
612        Err(ins) => {
613            let left = if ins > 0 {
614                let idx = ins - 1;
615                Some(EntryProof {
616                    entry: segment.entries[idx].clone(),
617                    proof: prove_inclusion(&leaves, idx)
618                        .ok_or_else(|| anyhow!("failed to build left neighbor proof"))?,
619                })
620            } else {
621                None
622            };
623
624            let right = if ins < segment.entries.len() {
625                Some(EntryProof {
626                    entry: segment.entries[ins].clone(),
627                    proof: prove_inclusion(&leaves, ins)
628                        .ok_or_else(|| anyhow!("failed to build right neighbor proof"))?,
629                })
630            } else {
631                None
632            };
633
634            Ok(SegmentScanProof {
635                segment_hash,
636                manifest_proof,
637                segment_merkle_root: segment.merkle_root,
638                segment_leaf_count: segment.entries.len(),
639                key_proof: SegmentKeyProof::NonInclusion(NonInclusionProof {
640                    insertion_index: ins,
641                    left,
642                    right,
643                }),
644                verdict: SegmentVerdict::NotPresent,
645            })
646        }
647    }
648}
649
650fn verify_entry_proof(segment: &StateSegment, entry_proof: &EntryProof) -> bool {
651    let idx = entry_proof.proof.index;
652    if segment.entries.get(idx) != Some(&entry_proof.entry) {
653        return false;
654    }
655
656    let leaf = entry_leaf(&entry_proof.entry);
657    verify_inclusion(segment.merkle_root, leaf, &entry_proof.proof)
658}
659
660fn verify_non_inclusion(segment: &StateSegment, key: &[u8], np: &NonInclusionProof) -> bool {
661    let len = segment.entries.len();
662    if np.insertion_index > len {
663        return false;
664    }
665
666    if np.insertion_index < len && segment.entries[np.insertion_index].key.as_slice() == key {
667        return false;
668    }
669
670    match &np.left {
671        Some(left) => {
672            if !verify_entry_proof(segment, left) {
673                return false;
674            }
675            if left.proof.index + 1 != np.insertion_index {
676                return false;
677            }
678            if left.entry.key.as_slice() >= key {
679                return false;
680            }
681        }
682        None => {
683            if np.insertion_index != 0 {
684                return false;
685            }
686        }
687    }
688
689    match &np.right {
690        Some(right) => {
691            if !verify_entry_proof(segment, right) {
692                return false;
693            }
694            if right.proof.index != np.insertion_index {
695                return false;
696            }
697            if key >= right.entry.key.as_slice() {
698                return false;
699            }
700        }
701        None => {
702            if np.insertion_index != len {
703                return false;
704            }
705        }
706    }
707
708    if let (Some(left), Some(right)) = (&np.left, &np.right)
709        && left.entry.key.as_slice() >= right.entry.key.as_slice()
710    {
711        return false;
712    }
713
714    true
715}
716
717#[cfg(test)]
718mod tests {
719    use std::collections::BTreeMap;
720
721    use rand::rngs::StdRng;
722    use rand::{Rng, SeedableRng};
723    use tempfile::TempDir;
724
725    use super::*;
726
727    fn store(tmp: &TempDir) -> StateStore {
728        let objects = ObjectStore::new(tmp.path().join("objects"));
729        objects.ensure_dir().unwrap();
730        let blobs = BlobStore::new(tmp.path().join("blobs"));
731        blobs.ensure_dir().unwrap();
732        StateStore::new(objects, blobs, Wal::new(tmp.path().join("wal")))
733    }
734
735    #[test]
736    fn empty_root_get_none() {
737        let tmp = TempDir::new().unwrap();
738        let s = store(&tmp);
739        let root = s.empty_root().unwrap();
740        assert_eq!(s.get(root, b"missing").unwrap(), None);
741    }
742
743    #[test]
744    fn set_get_single_key() {
745        let tmp = TempDir::new().unwrap();
746        let s = store(&tmp);
747        let root0 = s.empty_root().unwrap();
748        let root1 = s.set(root0, b"k", b"v").unwrap();
749        assert_eq!(s.get(root1, b"k").unwrap(), Some(b"v".to_vec()));
750    }
751
752    #[test]
753    fn overwrite_key_returns_latest_value() {
754        let tmp = TempDir::new().unwrap();
755        let s = store(&tmp);
756        let root0 = s.empty_root().unwrap();
757        let root1 = s.set(root0, b"k", b"v1").unwrap();
758        let root2 = s.set(root1, b"k", b"v2").unwrap();
759        assert_eq!(s.get(root2, b"k").unwrap(), Some(b"v2".to_vec()));
760    }
761
762    #[test]
763    fn old_roots_remain_readable() {
764        let tmp = TempDir::new().unwrap();
765        let s = store(&tmp);
766        let root0 = s.empty_root().unwrap();
767        let root1 = s.set(root0, b"k", b"v1").unwrap();
768        let root2 = s.set(root1, b"k", b"v2").unwrap();
769        assert_eq!(s.get(root1, b"k").unwrap(), Some(b"v1".to_vec()));
770        assert_eq!(s.get(root2, b"k").unwrap(), Some(b"v2".to_vec()));
771    }
772
773    #[test]
774    fn delete_removes_value() {
775        let tmp = TempDir::new().unwrap();
776        let s = store(&tmp);
777        let r0 = s.empty_root().unwrap();
778        let r1 = s.set(r0, b"k", b"v").unwrap();
779        let r2 = s.del(r1, b"k").unwrap();
780        assert_eq!(s.get(r2, b"k").unwrap(), None);
781    }
782
783    #[test]
784    fn delete_missing_key_still_creates_new_root() {
785        let tmp = TempDir::new().unwrap();
786        let s = store(&tmp);
787        let r0 = s.empty_root().unwrap();
788        let r1 = s.del(r0, b"missing").unwrap();
789        assert_ne!(r0, r1);
790    }
791
792    #[test]
793    fn multiple_keys_independent() {
794        let tmp = TempDir::new().unwrap();
795        let s = store(&tmp);
796        let r0 = s.empty_root().unwrap();
797        let r1 = s.set(r0, b"a", b"1").unwrap();
798        let r2 = s.set(r1, b"b", b"2").unwrap();
799        assert_eq!(s.get(r2, b"a").unwrap(), Some(b"1".to_vec()));
800        assert_eq!(s.get(r2, b"b").unwrap(), Some(b"2".to_vec()));
801    }
802
803    #[test]
804    fn tombstone_wins_over_older_value() {
805        let tmp = TempDir::new().unwrap();
806        let s = store(&tmp);
807        let r0 = s.empty_root().unwrap();
808        let r1 = s.set(r0, b"k", b"1").unwrap();
809        let r2 = s.del(r1, b"k").unwrap();
810        let r3 = s.set(r2, b"other", b"x").unwrap();
811        assert_eq!(s.get(r3, b"k").unwrap(), None);
812    }
813
814    #[test]
815    fn segment_order_newest_first() {
816        let tmp = TempDir::new().unwrap();
817        let s = store(&tmp);
818        let r0 = s.empty_root().unwrap();
819        let r1 = s.set(r0, b"k", b"1").unwrap();
820        let r2 = s.set(r1, b"k", b"2").unwrap();
821        let m: StateManifest = s.load_manifest_or_empty(r2).unwrap();
822        assert_eq!(m.segments.len(), 2);
823        let newer: StateSegment = s.load_segment(m.segments[0]).unwrap();
824        assert_eq!(
825            newer.entries[0].value,
826            ValueRef::Value(s.blobs.put(b"2").unwrap())
827        );
828    }
829
830    #[test]
831    fn manifest_merkle_root_matches_segments() {
832        let tmp = TempDir::new().unwrap();
833        let s = store(&tmp);
834        let r0 = s.empty_root().unwrap();
835        let r1 = s.set(r0, b"a", b"1").unwrap();
836        let m = s.load_manifest_or_empty(r1).unwrap();
837        let leaves = m
838            .segments
839            .iter()
840            .copied()
841            .map(super::manifest_segment_leaf)
842            .collect::<Vec<_>>();
843        assert_eq!(m.segments_merkle_root, merkle_root(&leaves));
844    }
845
846    #[test]
847    fn compact_reduces_segments_and_preserves_values() {
848        let tmp = TempDir::new().unwrap();
849        let s = store(&tmp);
850        let r0 = s.empty_root().unwrap();
851        let r1 = s.set(r0, b"a", b"1").unwrap();
852        let r2 = s.set(r1, b"a", b"2").unwrap();
853        let r3 = s.set(r2, b"b", b"3").unwrap();
854        let r4 = s.del(r3, b"b").unwrap();
855        let compacted = s.compact(r4).unwrap();
856
857        let manifest = s.load_manifest_or_empty(compacted).unwrap();
858        assert!(manifest.segments.len() <= 1);
859        assert_eq!(s.get(compacted, b"a").unwrap(), Some(b"2".to_vec()));
860        assert_eq!(s.get(compacted, b"b").unwrap(), None);
861    }
862
863    #[test]
864    fn proof_membership_verifies() {
865        let tmp = TempDir::new().unwrap();
866        let s = store(&tmp);
867        let r0 = s.empty_root().unwrap();
868        let r1 = s.set(r0, b"k", b"v").unwrap();
869        let p = s.proof(r1, b"k").unwrap();
870        assert!(s.verify_proof(r1, b"k", &p));
871        assert!(matches!(p.outcome, StateOutcome::Found(_)));
872    }
873
874    #[test]
875    fn proof_non_membership_verifies() {
876        let tmp = TempDir::new().unwrap();
877        let s = store(&tmp);
878        let r0 = s.empty_root().unwrap();
879        let r1 = s.set(r0, b"a", b"1").unwrap();
880        let p = s.proof(r1, b"z").unwrap();
881        assert!(s.verify_proof(r1, b"z", &p));
882        assert_eq!(p.outcome, StateOutcome::Missing);
883    }
884
885    #[test]
886    fn proof_deleted_verifies() {
887        let tmp = TempDir::new().unwrap();
888        let s = store(&tmp);
889        let r0 = s.empty_root().unwrap();
890        let r1 = s.set(r0, b"k", b"v").unwrap();
891        let r2 = s.del(r1, b"k").unwrap();
892        let p = s.proof(r2, b"k").unwrap();
893        assert!(s.verify_proof(r2, b"k", &p));
894        assert_eq!(p.outcome, StateOutcome::Deleted);
895    }
896
897    #[test]
898    fn proof_wrong_key_fails() {
899        let tmp = TempDir::new().unwrap();
900        let s = store(&tmp);
901        let r0 = s.empty_root().unwrap();
902        let r1 = s.set(r0, b"k", b"v").unwrap();
903        let p = s.proof(r1, b"k").unwrap();
904        assert!(!s.verify_proof(r1, b"x", &p));
905    }
906
907    #[test]
908    fn proof_wrong_root_fails() {
909        let tmp = TempDir::new().unwrap();
910        let s = store(&tmp);
911        let r0 = s.empty_root().unwrap();
912        let r1 = s.set(r0, b"k", b"v").unwrap();
913        let p = s.proof(r1, b"k").unwrap();
914        assert!(!s.verify_proof(r0, b"k", &p));
915    }
916
917    #[test]
918    fn proof_tampered_manifest_proof_fails() {
919        let tmp = TempDir::new().unwrap();
920        let s = store(&tmp);
921        let r0 = s.empty_root().unwrap();
922        let r1 = s.set(r0, b"k", b"v").unwrap();
923        let mut p = s.proof(r1, b"k").unwrap();
924        p.scans[0].manifest_proof.index = 1;
925        assert!(!s.verify_proof(r1, b"k", &p));
926    }
927
928    #[test]
929    fn proof_tampered_entry_fails() {
930        let tmp = TempDir::new().unwrap();
931        let s = store(&tmp);
932        let r0 = s.empty_root().unwrap();
933        let r1 = s.set(r0, b"k", b"v").unwrap();
934        let mut p = s.proof(r1, b"k").unwrap();
935        match &mut p.scans[0].key_proof {
936            SegmentKeyProof::Inclusion(ep) => ep.entry.key = b"x".to_vec(),
937            _ => panic!("expected inclusion"),
938        }
939        assert!(!s.verify_proof(r1, b"k", &p));
940    }
941
942    #[test]
943    fn set_same_input_same_root() {
944        let tmp = TempDir::new().unwrap();
945        let s = store(&tmp);
946        let r0 = s.empty_root().unwrap();
947        let a = s.set(r0, b"k", b"v").unwrap();
948        let b = s.set(r0, b"k", b"v").unwrap();
949        assert_eq!(a, b);
950    }
951
952    #[test]
953    fn set_different_value_changes_root() {
954        let tmp = TempDir::new().unwrap();
955        let s = store(&tmp);
956        let r0 = s.empty_root().unwrap();
957        let a = s.set(r0, b"k", b"v1").unwrap();
958        let b = s.set(r0, b"k", b"v2").unwrap();
959        assert_ne!(a, b);
960    }
961
962    #[test]
963    fn load_missing_root_errors() {
964        let tmp = TempDir::new().unwrap();
965        let s = store(&tmp);
966        let missing = Hash::zero();
967        assert!(s.get(missing, b"k").is_err());
968    }
969
970    #[test]
971    fn proof_for_empty_root_missing() {
972        let tmp = TempDir::new().unwrap();
973        let s = store(&tmp);
974        let r = s.empty_root().unwrap();
975        let p = s.proof(r, b"k").unwrap();
976        assert_eq!(p.outcome, StateOutcome::Missing);
977        assert!(s.verify_proof(r, b"k", &p));
978    }
979
980    #[test]
981    fn hash_state_object_is_stable() {
982        let m = new_state_manifest(vec![]);
983        assert_eq!(
984            hash_state_object(&m).unwrap(),
985            hash_state_object(&m).unwrap()
986        );
987    }
988
989    /// Regression for issue #5: `empty_root` must memoize so subsequent
990    /// calls don't re-write the empty manifest to disk. Verified by
991    /// deleting the on-disk object after the first call and asserting that
992    /// the second call returns the same hash without re-creating the file.
993    #[test]
994    fn empty_root_is_memoized_and_skips_redundant_writes() {
995        use crate::cas::CasStore;
996
997        let tmp = TempDir::new().unwrap();
998        let s = store(&tmp);
999        let h1 = s.empty_root().unwrap();
1000
1001        let cas_path = CasStore::new(tmp.path().join("objects")).path_for(h1);
1002        assert!(cas_path.exists(), "first empty_root call must store the object");
1003        std::fs::remove_file(&cas_path).unwrap();
1004
1005        let h2 = s.empty_root().unwrap();
1006        assert_eq!(h1, h2);
1007        assert!(
1008            !cas_path.exists(),
1009            "second empty_root call re-stored the object — memoization is bypassed"
1010        );
1011    }
1012
1013    fn random_property(seed: u64) {
1014        let tmp = TempDir::new().unwrap();
1015        let s = store(&tmp);
1016        let mut rng = StdRng::seed_from_u64(seed);
1017
1018        let mut root = s.empty_root().unwrap();
1019        let mut model: BTreeMap<Vec<u8>, Option<Vec<u8>>> = BTreeMap::new();
1020        let keys: Vec<Vec<u8>> = (0..12).map(|i| format!("k{i}").into_bytes()).collect();
1021
1022        for _ in 0..200 {
1023            let k = keys[rng.gen_range(0..keys.len())].clone();
1024            let op = rng.gen_range(0..4);
1025            if op < 2 {
1026                let val_len = rng.gen_range(1..8);
1027                let val: Vec<u8> = (0..val_len).map(|_| rng.gen_range(0u8..=255u8)).collect();
1028                root = s.set(root, &k, &val).unwrap();
1029                model.insert(k.clone(), Some(val));
1030            } else if op == 2 {
1031                root = s.del(root, &k).unwrap();
1032                model.insert(k.clone(), None);
1033            } else {
1034                root = s.compact(root).unwrap();
1035                // Compaction does not change logical view.
1036            }
1037
1038            for probe in &keys {
1039                let got = s.get(root, probe).unwrap();
1040                let expected = model.get(probe).cloned().flatten();
1041                assert_eq!(got, expected);
1042            }
1043        }
1044    }
1045
1046    macro_rules! prop_test {
1047        ($name:ident, $seed:expr) => {
1048            #[test]
1049            fn $name() {
1050                random_property($seed);
1051            }
1052        };
1053    }
1054
1055    prop_test!(property_random_ops_seed_1, 1);
1056    prop_test!(property_random_ops_seed_2, 2);
1057    prop_test!(property_random_ops_seed_3, 3);
1058    prop_test!(property_random_ops_seed_4, 4);
1059    prop_test!(property_random_ops_seed_5, 5);
1060    prop_test!(property_random_ops_seed_6, 6);
1061    prop_test!(property_random_ops_seed_7, 7);
1062    prop_test!(property_random_ops_seed_8, 8);
1063    prop_test!(property_random_ops_seed_9, 9);
1064    prop_test!(property_random_ops_seed_10, 10);
1065
1066    mod proptest_suite {
1067        use proptest::prelude::*;
1068        use tempfile::TempDir;
1069
1070        use super::*;
1071
1072        fn fresh_store() -> (TempDir, StateStore) {
1073            let tmp = TempDir::new().unwrap();
1074            let s = store(&tmp);
1075            (tmp, s)
1076        }
1077
1078        proptest! {
1079            // The 256-case default makes these tests TempDir-heavy and fsync-bound.
1080            // 32 cases saturate coverage well before that and keep the suite snappy.
1081            #![proptest_config(ProptestConfig::with_cases(32))]
1082
1083            /// For any sequence of unique keys, set_many must make all of them readable.
1084            #[test]
1085            fn set_many_all_readable(
1086                pairs in proptest::collection::vec(
1087                    (proptest::collection::vec(any::<u8>(), 1..16),
1088                     proptest::collection::vec(any::<u8>(), 1..32)),
1089                    1..20,
1090                )
1091            ) {
1092                // Deduplicate keys so the last one wins (mirrors set_many semantics).
1093                use std::collections::BTreeMap;
1094                let mut map: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1095                for (k, v) in &pairs {
1096                    map.insert(k.clone(), v.clone());
1097                }
1098
1099                let (_tmp, s) = fresh_store();
1100                let root0 = s.empty_root().unwrap();
1101                let pairs_ref: Vec<(&[u8], &[u8])> = map
1102                    .iter()
1103                    .map(|(k, v)| (k.as_slice(), v.as_slice()))
1104                    .collect();
1105                let root1 = s.set_many(root0, &pairs_ref).unwrap();
1106
1107                for (k, expected_v) in &map {
1108                    let got = s.get(root1, k.as_slice()).unwrap();
1109                    prop_assert_eq!(got.as_deref(), Some(expected_v.as_slice()));
1110                }
1111            }
1112
1113            /// del_many must remove exactly the specified keys without touching others.
1114            #[test]
1115            fn del_many_removes_specified_keys(
1116                all_keys in proptest::collection::vec(
1117                    proptest::collection::vec(any::<u8>(), 1..12),
1118                    2..16,
1119                )
1120            ) {
1121                use std::collections::BTreeSet;
1122                // Deduplicate keys.
1123                let unique: BTreeSet<Vec<u8>> = all_keys.into_iter().collect();
1124                if unique.len() < 2 {
1125                    return Ok(());
1126                }
1127                let keys: Vec<Vec<u8>> = unique.into_iter().collect();
1128
1129                let (_tmp, s) = fresh_store();
1130                let mut root = s.empty_root().unwrap();
1131
1132                // Set all keys.
1133                for k in &keys {
1134                    root = s.set(root, k, b"value").unwrap();
1135                }
1136
1137                // Delete the first half.
1138                let (to_delete, to_keep) = keys.split_at(keys.len() / 2);
1139                let del_refs: Vec<&[u8]> = to_delete.iter().map(|k| k.as_slice()).collect();
1140                root = s.del_many(root, &del_refs).unwrap();
1141
1142                for k in to_delete {
1143                    prop_assert_eq!(s.get(root, k).unwrap(), None, "key {:?} should be deleted", k);
1144                }
1145                for k in to_keep {
1146                    let val = s.get(root, k).unwrap();
1147                    prop_assert_eq!(
1148                        val.as_deref(),
1149                        Some(b"value".as_slice()),
1150                        "key {:?} should still be present",
1151                        k
1152                    );
1153                }
1154            }
1155
1156            /// Every state proof must self-verify after arbitrary set operations.
1157            #[test]
1158            fn proofs_always_verify(
1159                ops in proptest::collection::vec(
1160                    (proptest::collection::vec(b'a'..=b'z', 1..8),
1161                     proptest::collection::vec(any::<u8>(), 1..16),
1162                     any::<bool>()),
1163                    1..15,
1164                )
1165            ) {
1166                let (_tmp, s) = fresh_store();
1167                let mut root = s.empty_root().unwrap();
1168
1169                for (k, v, is_del) in &ops {
1170                    if *is_del {
1171                        root = s.del(root, k).unwrap();
1172                    } else {
1173                        root = s.set(root, k, v).unwrap();
1174                    }
1175                }
1176
1177                // Probe every key that was ever touched.
1178                for (k, _, _) in &ops {
1179                    let proof = s.proof(root, k).unwrap();
1180                    prop_assert!(
1181                        s.verify_proof(root, k, &proof),
1182                        "proof failed for key {:?}",
1183                        k
1184                    );
1185                }
1186            }
1187        }
1188    }
1189
1190    /// Forged proofs must be rejected by `verify_proof`. Each test tampers one
1191    /// field at a time on an otherwise-valid proof and asserts rejection — this
1192    /// is the soundness side of the existing `proofs_always_verify` property
1193    /// (which only checks honest proofs).
1194    mod adversarial_proofs {
1195        use super::*;
1196        use crate::hash::hash_typed;
1197
1198        fn setup() -> (TempDir, StateStore, StateRoot) {
1199            let tmp = TempDir::new().unwrap();
1200            let s = store(&tmp);
1201            let root = s.empty_root().unwrap();
1202            let root = s.set(root, b"a", b"1").unwrap();
1203            let root = s.set(root, b"b", b"2").unwrap();
1204            let root = s.set(root, b"c", b"3").unwrap();
1205            (tmp, s, root)
1206        }
1207
1208        #[test]
1209        fn forged_outcome_value_for_missing_key_is_rejected() {
1210            let (_tmp, s, root) = setup();
1211            let mut proof = s.proof(root, b"missing-key").unwrap();
1212            assert!(matches!(proof.outcome, StateOutcome::Missing));
1213            proof.outcome = StateOutcome::Found(hash_typed(b"blob:", b"fake"));
1214            assert!(!s.verify_proof(root, b"missing-key", &proof));
1215        }
1216
1217        #[test]
1218        fn forged_outcome_missing_for_present_key_is_rejected() {
1219            let (_tmp, s, root) = setup();
1220            let mut proof = s.proof(root, b"a").unwrap();
1221            assert!(matches!(proof.outcome, StateOutcome::Found(_)));
1222            proof.outcome = StateOutcome::Missing;
1223            assert!(!s.verify_proof(root, b"a", &proof));
1224        }
1225
1226        #[test]
1227        fn wrong_manifest_segment_count_is_rejected() {
1228            let (_tmp, s, root) = setup();
1229            let mut proof = s.proof(root, b"a").unwrap();
1230            proof.manifest_segment_count += 1;
1231            assert!(!s.verify_proof(root, b"a", &proof));
1232        }
1233
1234        #[test]
1235        fn wrong_manifest_segments_root_is_rejected() {
1236            let (_tmp, s, root) = setup();
1237            let mut proof = s.proof(root, b"a").unwrap();
1238            proof.manifest_segments_root = hash_typed(b"forged:", b"root");
1239            assert!(!s.verify_proof(root, b"a", &proof));
1240        }
1241
1242        #[test]
1243        fn extra_unrelated_scan_is_rejected() {
1244            let (_tmp, s, root) = setup();
1245            let mut proof = s.proof(root, b"a").unwrap();
1246            // Duplicate the terminal scan to make `scans.len()` inconsistent
1247            // with the recorded terminal index.
1248            let last = proof.scans.last().cloned().unwrap();
1249            proof.scans.push(last);
1250            assert!(!s.verify_proof(root, b"a", &proof));
1251        }
1252
1253        #[test]
1254        fn schema_version_mismatch_is_rejected() {
1255            let (_tmp, s, root) = setup();
1256            let mut proof = s.proof(root, b"a").unwrap();
1257            proof.manifest_schema_version = proof.manifest_schema_version.wrapping_add(1);
1258            assert!(!s.verify_proof(root, b"a", &proof));
1259        }
1260    }
1261}