Skip to main content

aura_sync/
verification.rs

1//! Merkle verification for journal synchronization
2//!
3//! Provides cryptographic verification of facts during synchronization.
4//! Integrates with the IndexedJournalEffects to verify fact integrity
5//! using Merkle trees and Bloom filters.
6//!
7//! # Architecture
8//!
9//! The verification system provides:
10//! - Merkle root comparison for quick sync status check
11//! - Bloom filter exchange for efficient set reconciliation
12//! - Fact-level verification using Merkle proofs
13//!
14//! # Usage
15//!
16//! ```rust,ignore
17//! use aura_sync::verification::MerkleVerifier;
18//! use aura_core::effects::indexed::IndexedJournalEffects;
19//! use aura_core::effects::time::PhysicalTimeEffects;
20//!
21//! let verifier = MerkleVerifier::new(indexed_journal, time_effects);
22//!
23//! // Check if local and remote journals are in sync
24//! let comparison = verifier.compare_roots(remote_root).await?;
25//!
26//! // Verify incoming facts
27//! let result = verifier.verify_incoming_facts(facts, claimed_root).await?;
28//! ```
29
30use aura_core::effects::indexed::{IndexStats, IndexedFact, IndexedJournalEffects};
31use aura_core::effects::time::PhysicalTimeEffects;
32use aura_core::effects::BloomFilter;
33use aura_core::time::TimeStamp;
34use aura_core::AuraError;
35use serde::{Deserialize, Serialize};
36use std::sync::Arc;
37
/// Maximum allowed clock skew in milliseconds for timestamp validation.
///
/// A fact whose physical timestamp lies more than this many milliseconds
/// ahead of the current time is rejected during timestamp validation.
const MAX_CLOCK_SKEW_MS: u64 = 300_000; // 5 minutes
40
41// =============================================================================
42// Types
43// =============================================================================
44
/// Result of comparing Merkle roots between peers.
///
/// Produced by `MerkleVerifier::compare_roots`, which compares the local
/// journal's root against a root supplied by the caller.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum MerkleComparison {
    /// Roots match - journals are in sync
    InSync,
    /// Roots differ - reconciliation needed.
    ///
    /// Carries both roots exactly as they were compared.
    NeedReconcile {
        /// Local Merkle root
        local_root: [u8; 32],
        /// Remote Merkle root
        remote_root: [u8; 32],
    },
}
58
/// Verification result for a batch of facts.
///
/// Returned by `MerkleVerifier::verify_incoming_facts`. Every input fact ends
/// up in exactly one of `verified` or `rejected`.
#[derive(Debug, Clone)]
pub struct VerificationResult {
    /// Facts that passed verification: facts already present in the local
    /// journal as well as new facts that passed the structural checks
    pub verified: Vec<IndexedFact>,
    /// Facts that failed verification, each paired with a human-readable reason
    pub rejected: Vec<(IndexedFact, String)>,
    /// Local Merkle root, read from the local journal after all facts in the
    /// batch have been checked
    pub merkle_root: [u8; 32],
}
69
/// Statistics about verification operations.
///
/// All counters start at zero via `Default`.
///
/// NOTE(review): nothing in this file constructs or updates this type;
/// presumably callers maintain the counters themselves - confirm.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct VerificationStats {
    /// Total facts verified
    pub total_verified: u64,
    /// Total facts rejected
    pub total_rejected: u64,
    /// Root comparisons performed
    pub root_comparisons: u64,
    /// Comparisons where roots matched (in sync)
    pub root_matches: u64,
}
82
83// =============================================================================
84// MerkleVerifier
85// =============================================================================
86
/// Merkle verification handler for sync operations
///
/// Provides cryptographic verification of facts using the local indexed journal's
/// Merkle tree. Used to verify incoming facts during synchronization and to
/// compare journal state between peers.
///
/// Both collaborators are held behind `Arc` trait objects and are only ever
/// read through `&self`.
pub struct MerkleVerifier {
    /// Local indexed journal for verification operations (Merkle root, Bloom
    /// filter, fact-inclusion checks and index statistics)
    indexed_journal: Arc<dyn IndexedJournalEffects + Send + Sync>,
    /// Time effects for timestamp validation (source of the current physical time)
    time: Arc<dyn PhysicalTimeEffects>,
}
98
99impl MerkleVerifier {
100    /// Create a new MerkleVerifier with the given indexed journal and time effects
101    pub fn new(
102        indexed_journal: Arc<dyn IndexedJournalEffects + Send + Sync>,
103        time: Arc<dyn PhysicalTimeEffects>,
104    ) -> Self {
105        Self {
106            indexed_journal,
107            time,
108        }
109    }
110
111    /// Get local Merkle root for exchange with peer
112    ///
113    /// Returns the root of the local Merkle tree, which can be compared
114    /// with a remote peer's root to quickly determine if synchronization
115    /// is needed.
116    pub async fn local_merkle_root(&self) -> Result<[u8; 32], AuraError> {
117        self.indexed_journal.merkle_root().await
118    }
119
120    /// Get local Bloom filter for set reconciliation
121    ///
122    /// Returns a Bloom filter representing the local journal's facts.
123    /// Used to efficiently determine which facts need to be exchanged
124    /// during synchronization.
125    pub async fn local_bloom_filter(&self) -> Result<BloomFilter, AuraError> {
126        self.indexed_journal.get_bloom_filter().await
127    }
128
129    /// Compare local and remote Merkle roots
130    ///
131    /// This is the first step in the sync protocol - a fast O(1) check
132    /// to determine if synchronization is needed.
133    ///
134    /// # Returns
135    ///
136    /// - `InSync`: Roots match, no synchronization needed
137    /// - `NeedReconcile`: Roots differ, facts must be exchanged
138    pub async fn compare_roots(
139        &self,
140        remote_root: [u8; 32],
141    ) -> Result<MerkleComparison, AuraError> {
142        let local_root = self.local_merkle_root().await?;
143
144        if local_root == remote_root {
145            tracing::debug!("Merkle roots match - journals in sync");
146            Ok(MerkleComparison::InSync)
147        } else {
148            tracing::debug!(
149                local_root = ?hex::encode(local_root),
150                remote_root = ?hex::encode(remote_root),
151                "Merkle roots differ - reconciliation needed"
152            );
153            Ok(MerkleComparison::NeedReconcile {
154                local_root,
155                remote_root,
156            })
157        }
158    }
159
160    /// Verify incoming facts against the local Merkle tree
161    ///
162    /// This checks if each fact is consistent with our local state.
163    /// Facts that don't verify may be:
164    /// - New facts we don't have yet (valid - should merge)
165    /// - Tampered facts (invalid - should reject)
166    ///
167    /// # Arguments
168    ///
169    /// * `facts` - Facts received from peer to verify
170    /// * `claimed_root` - Merkle root claimed by the peer
171    ///
172    /// # Returns
173    ///
174    /// `VerificationResult` containing verified facts, rejected facts, and
175    /// the current local Merkle root.
176    pub async fn verify_incoming_facts(
177        &self,
178        facts: Vec<IndexedFact>,
179        _claimed_root: [u8; 32],
180    ) -> Result<VerificationResult, AuraError> {
181        let mut verified = Vec::new();
182        let mut rejected = Vec::new();
183
184        // Get current time from effect trait for timestamp validation
185        let now_ms = self
186            .time
187            .physical_time()
188            .await
189            .map(|t| t.ts_ms)
190            .unwrap_or(0);
191
192        for fact in facts {
193            // Check if fact is already in our index
194            match self.indexed_journal.verify_fact_inclusion(&fact).await {
195                Ok(is_included) => {
196                    if is_included {
197                        // Fact already exists and is verified in our tree
198                        tracing::trace!(
199                            fact_id = ?fact.id,
200                            "Fact already exists in local journal"
201                        );
202                        verified.push(fact);
203                    } else {
204                        // New fact - validate structure before accepting
205                        // Perform available validations:
206                        // 1. Timestamp consistency (fact shouldn't be too far in the future)
207                        // 2. Authority presence (if required by policy)
208                        // 3. Merkle proof verification (when proofs are provided)
209                        // 4. Signature verification (when signatures are provided)
210
211                        // Validate timestamp consistency
212                        if let Err(reason) = Self::validate_timestamp(&fact, now_ms) {
213                            tracing::warn!(
214                                fact_id = ?fact.id,
215                                reason = %reason,
216                                "Fact rejected: timestamp validation failed"
217                            );
218                            rejected.push((fact, reason));
219                            continue;
220                        }
221
222                        // Validate authority presence
223                        if let Err(reason) = Self::validate_authority(&fact) {
224                            tracing::warn!(
225                                fact_id = ?fact.id,
226                                reason = %reason,
227                                "Fact rejected: authority validation failed"
228                            );
229                            rejected.push((fact, reason));
230                            continue;
231                        }
232
233                        // NOTE: Merkle proof and signature verification require additional
234                        // infrastructure:
235                        // - IndexedFact would need to carry SimpleMerkleProof from the sender
236                        // - IndexedFact would need to carry authority signature
237                        // When these are available, add verification here.
238
239                        tracing::trace!(
240                            fact_id = ?fact.id,
241                            "New fact accepted for merge"
242                        );
243                        verified.push(fact);
244                    }
245                }
246                Err(e) => {
247                    // Verification error - reject the fact
248                    tracing::warn!(
249                        fact_id = ?fact.id,
250                        error = %e,
251                        "Fact verification failed"
252                    );
253                    rejected.push((fact, format!("Verification error: {e}")));
254                }
255            }
256        }
257
258        let merkle_root = self.local_merkle_root().await?;
259
260        tracing::debug!(
261            verified_count = verified.len(),
262            rejected_count = rejected.len(),
263            merkle_root = ?hex::encode(merkle_root),
264            "Fact verification complete"
265        );
266
267        Ok(VerificationResult {
268            verified,
269            rejected,
270            merkle_root,
271        })
272    }
273
274    /// Validate timestamp consistency for an incoming fact
275    ///
276    /// Checks that the fact's timestamp is not too far in the future,
277    /// which would indicate clock skew or potential manipulation.
278    ///
279    /// # Arguments
280    ///
281    /// * `fact` - The fact to validate
282    /// * `now_ms` - Current time in milliseconds (from effect trait)
283    ///
284    /// # Returns
285    /// - `Ok(())` if timestamp is valid or not present
286    /// - `Err(reason)` if timestamp is too far in the future
287    fn validate_timestamp(fact: &IndexedFact, now_ms: u64) -> Result<(), String> {
288        let Some(timestamp) = &fact.timestamp else {
289            // No timestamp is acceptable for facts that don't require it
290            return Ok(());
291        };
292
293        // Extract physical time if available
294        let fact_time_ms = match timestamp {
295            TimeStamp::PhysicalClock(physical) => physical.ts_ms,
296            TimeStamp::Range(range) => {
297                // For ranges, check the latest time (most permissive)
298                range.latest_ms()
299            }
300            // Logical and Order clocks don't have physical time semantics
301            TimeStamp::LogicalClock(_) | TimeStamp::OrderClock(_) => return Ok(()),
302        };
303
304        // Reject if timestamp is too far in the future
305        if fact_time_ms > now_ms + MAX_CLOCK_SKEW_MS {
306            return Err(format!(
307                "Timestamp {fact_time_ms} is too far in the future (current time: {now_ms}, max skew: {MAX_CLOCK_SKEW_MS}ms)"
308            ));
309        }
310
311        Ok(())
312    }
313
314    /// Validate authority presence for an incoming fact
315    ///
316    /// Facts must have an associated authority that created them.
317    /// This provides accountability and enables signature verification.
318    ///
319    /// # Returns
320    /// - `Ok(())` if authority is present
321    /// - `Err(reason)` if authority is missing
322    fn validate_authority(fact: &IndexedFact) -> Result<(), String> {
323        if fact.authority.is_none() {
324            tracing::warn!(
325                fact_id = ?fact.id,
326                "Rejecting fact without authority - all facts must have an associated authority"
327            );
328            return Err(format!(
329                "Fact {:?} rejected: authority is required for all facts",
330                fact.id
331            ));
332        }
333        Ok(())
334    }
335
336    /// Get index statistics for monitoring
337    ///
338    /// Returns statistics about the indexed journal including
339    /// fact counts, index sizes, and Bloom filter configuration.
340    pub async fn stats(&self) -> Result<IndexStats, AuraError> {
341        self.indexed_journal.index_stats().await
342    }
343}
344
345// =============================================================================
346// Tests
347// =============================================================================
348
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use aura_core::domain::journal::FactValue;
    use aura_core::effects::indexed::{FactId, FactStreamReceiver};
    use aura_core::effects::BloomConfig;
    use aura_core::effects::TimeError;
    use aura_core::time::PhysicalTime;
    use aura_core::AuthorityId;
    use std::sync::Mutex;

    /// Fixed time for deterministic tests
    const TEST_TIME_MS: u64 = 1_700_000_000_000;

    /// Mock time effects for testing; always reports the same instant
    struct MockTimeEffects {
        now_ms: u64,
    }

    impl MockTimeEffects {
        fn new(now_ms: u64) -> Arc<Self> {
            Arc::new(Self { now_ms })
        }
    }

    #[async_trait]
    impl PhysicalTimeEffects for MockTimeEffects {
        async fn physical_time(&self) -> Result<PhysicalTime, TimeError> {
            Ok(PhysicalTime {
                ts_ms: self.now_ms,
                uncertainty: None,
            })
        }

        async fn sleep_ms(&self, _ms: u64) -> Result<(), TimeError> {
            Ok(())
        }
    }

    /// Mock indexed journal for testing
    ///
    /// Holds a fixed root and a list of facts; inclusion is decided purely by
    /// fact id.
    struct MockIndexedJournal {
        root: Mutex<[u8; 32]>,
        facts: Mutex<Vec<IndexedFact>>,
    }

    impl MockIndexedJournal {
        /// Journal with the given root and no facts
        fn new(root: [u8; 32]) -> Self {
            Self {
                root: Mutex::new(root),
                facts: Mutex::new(Vec::new()),
            }
        }

        /// Journal with the given root and pre-existing facts
        fn with_facts(root: [u8; 32], facts: Vec<IndexedFact>) -> Self {
            Self {
                root: Mutex::new(root),
                facts: Mutex::new(facts),
            }
        }
    }

    #[async_trait]
    impl IndexedJournalEffects for MockIndexedJournal {
        fn watch_facts(&self) -> Box<dyn FactStreamReceiver> {
            panic!("Not implemented for mock")
        }

        async fn facts_by_predicate(
            &self,
            _predicate: &str,
        ) -> Result<Vec<IndexedFact>, AuraError> {
            Ok(Vec::new())
        }

        async fn facts_by_authority(
            &self,
            _authority: &AuthorityId,
        ) -> Result<Vec<IndexedFact>, AuraError> {
            Ok(Vec::new())
        }

        async fn facts_in_range(
            &self,
            _start: aura_core::time::TimeStamp,
            _end: aura_core::time::TimeStamp,
        ) -> Result<Vec<IndexedFact>, AuraError> {
            Ok(Vec::new())
        }

        async fn all_facts(&self) -> Result<Vec<IndexedFact>, AuraError> {
            Ok(self.facts.lock().unwrap().clone())
        }

        fn might_contain(&self, _predicate: &str, _value: &FactValue) -> bool {
            false
        }

        async fn merkle_root(&self) -> Result<[u8; 32], AuraError> {
            Ok(*self.root.lock().unwrap())
        }

        async fn verify_fact_inclusion(&self, fact: &IndexedFact) -> Result<bool, AuraError> {
            let facts = self.facts.lock().unwrap();
            Ok(facts.iter().any(|f| f.id == fact.id))
        }

        async fn get_bloom_filter(&self) -> Result<BloomFilter, AuraError> {
            BloomFilter::new(BloomConfig::for_sync(100))
        }

        async fn index_stats(&self) -> Result<IndexStats, AuraError> {
            let facts = self.facts.lock().unwrap();
            Ok(IndexStats {
                fact_count: facts.len() as u64,
                predicate_count: 1,
                authority_count: 1,
                bloom_fp_rate: 0.01,
                merkle_depth: 10,
            })
        }
    }

    /// Fact with an authority and no timestamp
    fn create_test_fact(id: u64) -> IndexedFact {
        IndexedFact {
            id: FactId(id),
            predicate: "test".to_string(),
            value: FactValue::String("test_value".to_string()),
            authority: Some(AuthorityId::new_from_entropy([id as u8; 32])),
            timestamp: None,
        }
    }

    /// Fact with an authority and a physical timestamp of `ts_ms`
    fn create_timestamped_fact(id: u64, ts_ms: u64) -> IndexedFact {
        IndexedFact {
            timestamp: Some(TimeStamp::PhysicalClock(PhysicalTime {
                ts_ms,
                uncertainty: None,
            })),
            ..create_test_fact(id)
        }
    }

    /// Verifier over an empty journal with the clock fixed at `TEST_TIME_MS`
    fn empty_journal_verifier(root: [u8; 32]) -> MerkleVerifier {
        let journal = Arc::new(MockIndexedJournal::new(root));
        let time = MockTimeEffects::new(TEST_TIME_MS);
        MerkleVerifier::new(journal, time)
    }

    #[tokio::test]
    async fn test_compare_roots_in_sync() {
        let root = [1u8; 32];
        let journal = Arc::new(MockIndexedJournal::new(root));
        let time = MockTimeEffects::new(TEST_TIME_MS);
        let verifier = MerkleVerifier::new(journal, time);

        let result = verifier.compare_roots(root).await.unwrap();
        assert_eq!(result, MerkleComparison::InSync);
    }

    #[tokio::test]
    async fn test_compare_roots_need_reconcile() {
        let local_root = [1u8; 32];
        let remote_root = [2u8; 32];
        let journal = Arc::new(MockIndexedJournal::new(local_root));
        let time = MockTimeEffects::new(TEST_TIME_MS);
        let verifier = MerkleVerifier::new(journal, time);

        let result = verifier.compare_roots(remote_root).await.unwrap();
        assert_eq!(
            result,
            MerkleComparison::NeedReconcile {
                local_root,
                remote_root
            }
        );
    }

    #[tokio::test]
    async fn test_verify_existing_facts() {
        let root = [1u8; 32];
        let existing_fact = create_test_fact(1);
        let journal = Arc::new(MockIndexedJournal::with_facts(
            root,
            vec![existing_fact.clone()],
        ));
        let time = MockTimeEffects::new(TEST_TIME_MS);
        let verifier = MerkleVerifier::new(journal, time);

        let result = verifier
            .verify_incoming_facts(vec![existing_fact], root)
            .await
            .unwrap();

        assert_eq!(result.verified.len(), 1);
        assert!(result.rejected.is_empty());
    }

    #[tokio::test]
    async fn test_verify_new_facts() {
        let root = [1u8; 32];
        let journal = Arc::new(MockIndexedJournal::new(root));
        let time = MockTimeEffects::new(TEST_TIME_MS);
        let verifier = MerkleVerifier::new(journal, time);

        let new_fact = create_test_fact(99);
        let result = verifier
            .verify_incoming_facts(vec![new_fact], root)
            .await
            .unwrap();

        // New facts are accepted for merge
        assert_eq!(result.verified.len(), 1);
        assert!(result.rejected.is_empty());
    }

    #[tokio::test]
    async fn test_reject_future_timestamp() {
        let root = [1u8; 32];
        let verifier = empty_journal_verifier(root);

        // One millisecond beyond the allowed skew
        let future_fact = create_timestamped_fact(7, TEST_TIME_MS + MAX_CLOCK_SKEW_MS + 1);
        let result = verifier
            .verify_incoming_facts(vec![future_fact], root)
            .await
            .unwrap();

        assert!(result.verified.is_empty());
        assert_eq!(result.rejected.len(), 1);
        assert_eq!(result.merkle_root, root);
    }

    #[tokio::test]
    async fn test_accept_timestamp_at_skew_boundary() {
        let root = [1u8; 32];
        let verifier = empty_journal_verifier(root);

        // Exactly at the allowed skew is still acceptable
        let boundary_fact = create_timestamped_fact(8, TEST_TIME_MS + MAX_CLOCK_SKEW_MS);
        let result = verifier
            .verify_incoming_facts(vec![boundary_fact], root)
            .await
            .unwrap();

        assert_eq!(result.verified.len(), 1);
        assert!(result.rejected.is_empty());
    }

    #[tokio::test]
    async fn test_reject_missing_authority() {
        let root = [1u8; 32];
        let verifier = empty_journal_verifier(root);

        let anonymous_fact = IndexedFact {
            authority: None,
            ..create_test_fact(9)
        };
        let result = verifier
            .verify_incoming_facts(vec![anonymous_fact], root)
            .await
            .unwrap();

        assert!(result.verified.is_empty());
        assert_eq!(result.rejected.len(), 1);
        assert!(result.rejected[0].1.contains("authority"));
    }

    #[tokio::test]
    async fn test_stats() {
        let root = [1u8; 32];
        let facts = vec![
            create_test_fact(1),
            create_test_fact(2),
            create_test_fact(3),
        ];
        let journal = Arc::new(MockIndexedJournal::with_facts(root, facts));
        let time = MockTimeEffects::new(TEST_TIME_MS);
        let verifier = MerkleVerifier::new(journal, time);

        let stats = verifier.stats().await.unwrap();
        assert_eq!(stats.fact_count, 3);
    }
}