1use aura_core::effects::indexed::{IndexStats, IndexedFact, IndexedJournalEffects};
31use aura_core::effects::time::PhysicalTimeEffects;
32use aura_core::effects::BloomFilter;
33use aura_core::time::TimeStamp;
34use aura_core::AuraError;
35use serde::{Deserialize, Serialize};
36use std::sync::Arc;
37
38const MAX_CLOCK_SKEW_MS: u64 = 300_000; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
47pub enum MerkleComparison {
48 InSync,
50 NeedReconcile {
52 local_root: [u8; 32],
54 remote_root: [u8; 32],
56 },
57}
58
59#[derive(Debug, Clone)]
61pub struct VerificationResult {
62 pub verified: Vec<IndexedFact>,
64 pub rejected: Vec<(IndexedFact, String)>,
66 pub merkle_root: [u8; 32],
68}
69
70#[derive(Debug, Clone, Default, Serialize, Deserialize)]
72pub struct VerificationStats {
73 pub total_verified: u64,
75 pub total_rejected: u64,
77 pub root_comparisons: u64,
79 pub root_matches: u64,
81}
82
83pub struct MerkleVerifier {
93 indexed_journal: Arc<dyn IndexedJournalEffects + Send + Sync>,
95 time: Arc<dyn PhysicalTimeEffects>,
97}
98
99impl MerkleVerifier {
100 pub fn new(
102 indexed_journal: Arc<dyn IndexedJournalEffects + Send + Sync>,
103 time: Arc<dyn PhysicalTimeEffects>,
104 ) -> Self {
105 Self {
106 indexed_journal,
107 time,
108 }
109 }
110
111 pub async fn local_merkle_root(&self) -> Result<[u8; 32], AuraError> {
117 self.indexed_journal.merkle_root().await
118 }
119
120 pub async fn local_bloom_filter(&self) -> Result<BloomFilter, AuraError> {
126 self.indexed_journal.get_bloom_filter().await
127 }
128
129 pub async fn compare_roots(
139 &self,
140 remote_root: [u8; 32],
141 ) -> Result<MerkleComparison, AuraError> {
142 let local_root = self.local_merkle_root().await?;
143
144 if local_root == remote_root {
145 tracing::debug!("Merkle roots match - journals in sync");
146 Ok(MerkleComparison::InSync)
147 } else {
148 tracing::debug!(
149 local_root = ?hex::encode(local_root),
150 remote_root = ?hex::encode(remote_root),
151 "Merkle roots differ - reconciliation needed"
152 );
153 Ok(MerkleComparison::NeedReconcile {
154 local_root,
155 remote_root,
156 })
157 }
158 }
159
160 pub async fn verify_incoming_facts(
177 &self,
178 facts: Vec<IndexedFact>,
179 _claimed_root: [u8; 32],
180 ) -> Result<VerificationResult, AuraError> {
181 let mut verified = Vec::new();
182 let mut rejected = Vec::new();
183
184 let now_ms = self
186 .time
187 .physical_time()
188 .await
189 .map(|t| t.ts_ms)
190 .unwrap_or(0);
191
192 for fact in facts {
193 match self.indexed_journal.verify_fact_inclusion(&fact).await {
195 Ok(is_included) => {
196 if is_included {
197 tracing::trace!(
199 fact_id = ?fact.id,
200 "Fact already exists in local journal"
201 );
202 verified.push(fact);
203 } else {
204 if let Err(reason) = Self::validate_timestamp(&fact, now_ms) {
213 tracing::warn!(
214 fact_id = ?fact.id,
215 reason = %reason,
216 "Fact rejected: timestamp validation failed"
217 );
218 rejected.push((fact, reason));
219 continue;
220 }
221
222 if let Err(reason) = Self::validate_authority(&fact) {
224 tracing::warn!(
225 fact_id = ?fact.id,
226 reason = %reason,
227 "Fact rejected: authority validation failed"
228 );
229 rejected.push((fact, reason));
230 continue;
231 }
232
233 tracing::trace!(
240 fact_id = ?fact.id,
241 "New fact accepted for merge"
242 );
243 verified.push(fact);
244 }
245 }
246 Err(e) => {
247 tracing::warn!(
249 fact_id = ?fact.id,
250 error = %e,
251 "Fact verification failed"
252 );
253 rejected.push((fact, format!("Verification error: {e}")));
254 }
255 }
256 }
257
258 let merkle_root = self.local_merkle_root().await?;
259
260 tracing::debug!(
261 verified_count = verified.len(),
262 rejected_count = rejected.len(),
263 merkle_root = ?hex::encode(merkle_root),
264 "Fact verification complete"
265 );
266
267 Ok(VerificationResult {
268 verified,
269 rejected,
270 merkle_root,
271 })
272 }
273
274 fn validate_timestamp(fact: &IndexedFact, now_ms: u64) -> Result<(), String> {
288 let Some(timestamp) = &fact.timestamp else {
289 return Ok(());
291 };
292
293 let fact_time_ms = match timestamp {
295 TimeStamp::PhysicalClock(physical) => physical.ts_ms,
296 TimeStamp::Range(range) => {
297 range.latest_ms()
299 }
300 TimeStamp::LogicalClock(_) | TimeStamp::OrderClock(_) => return Ok(()),
302 };
303
304 if fact_time_ms > now_ms + MAX_CLOCK_SKEW_MS {
306 return Err(format!(
307 "Timestamp {fact_time_ms} is too far in the future (current time: {now_ms}, max skew: {MAX_CLOCK_SKEW_MS}ms)"
308 ));
309 }
310
311 Ok(())
312 }
313
314 fn validate_authority(fact: &IndexedFact) -> Result<(), String> {
323 if fact.authority.is_none() {
324 tracing::warn!(
325 fact_id = ?fact.id,
326 "Rejecting fact without authority - all facts must have an associated authority"
327 );
328 return Err(format!(
329 "Fact {:?} rejected: authority is required for all facts",
330 fact.id
331 ));
332 }
333 Ok(())
334 }
335
336 pub async fn stats(&self) -> Result<IndexStats, AuraError> {
341 self.indexed_journal.index_stats().await
342 }
343}
344
#[cfg(test)]
mod tests {
    // Unit tests for `MerkleVerifier`, driven by an in-memory journal mock and
    // a fixed-time clock mock. Both the accept and the reject paths of
    // `verify_incoming_facts` are covered.
    use super::*;
    use async_trait::async_trait;
    use aura_core::domain::journal::FactValue;
    use aura_core::effects::indexed::{FactId, FactStreamReceiver};
    use aura_core::effects::BloomConfig;
    use aura_core::effects::TimeError;
    use aura_core::time::PhysicalTime;
    use aura_core::AuthorityId;
    use std::sync::Mutex;

    /// Fixed "current time" reported by the mock clock, in milliseconds.
    const TEST_TIME_MS: u64 = 1_700_000_000_000;

    /// Clock mock that always reports the same instant.
    struct MockTimeEffects {
        now_ms: u64,
    }

    impl MockTimeEffects {
        fn new(now_ms: u64) -> Arc<Self> {
            Arc::new(Self { now_ms })
        }
    }

    #[async_trait]
    impl PhysicalTimeEffects for MockTimeEffects {
        async fn physical_time(&self) -> Result<PhysicalTime, TimeError> {
            Ok(PhysicalTime {
                ts_ms: self.now_ms,
                uncertainty: None,
            })
        }

        async fn sleep_ms(&self, _ms: u64) -> Result<(), TimeError> {
            Ok(())
        }
    }

    /// Journal mock holding a fixed Merkle root and a list of known facts.
    struct MockIndexedJournal {
        root: Mutex<[u8; 32]>,
        facts: Mutex<Vec<IndexedFact>>,
    }

    impl MockIndexedJournal {
        /// Journal with the given root and no facts.
        fn new(root: [u8; 32]) -> Self {
            Self::with_facts(root, Vec::new())
        }

        /// Journal with the given root and pre-populated facts.
        fn with_facts(root: [u8; 32], facts: Vec<IndexedFact>) -> Self {
            Self {
                root: Mutex::new(root),
                facts: Mutex::new(facts),
            }
        }
    }

    #[async_trait]
    impl IndexedJournalEffects for MockIndexedJournal {
        fn watch_facts(&self) -> Box<dyn FactStreamReceiver> {
            panic!("Not implemented for mock")
        }

        async fn facts_by_predicate(
            &self,
            _predicate: &str,
        ) -> Result<Vec<IndexedFact>, AuraError> {
            Ok(Vec::new())
        }

        async fn facts_by_authority(
            &self,
            _authority: &AuthorityId,
        ) -> Result<Vec<IndexedFact>, AuraError> {
            Ok(Vec::new())
        }

        async fn facts_in_range(
            &self,
            _start: aura_core::time::TimeStamp,
            _end: aura_core::time::TimeStamp,
        ) -> Result<Vec<IndexedFact>, AuraError> {
            Ok(Vec::new())
        }

        async fn all_facts(&self) -> Result<Vec<IndexedFact>, AuraError> {
            Ok(self.facts.lock().unwrap().clone())
        }

        fn might_contain(&self, _predicate: &str, _value: &FactValue) -> bool {
            false
        }

        async fn merkle_root(&self) -> Result<[u8; 32], AuraError> {
            Ok(*self.root.lock().unwrap())
        }

        // Inclusion is decided purely by fact id.
        async fn verify_fact_inclusion(&self, fact: &IndexedFact) -> Result<bool, AuraError> {
            let facts = self.facts.lock().unwrap();
            Ok(facts.iter().any(|f| f.id == fact.id))
        }

        async fn get_bloom_filter(&self) -> Result<BloomFilter, AuraError> {
            BloomFilter::new(BloomConfig::for_sync(100))
        }

        async fn index_stats(&self) -> Result<IndexStats, AuraError> {
            let facts = self.facts.lock().unwrap();
            Ok(IndexStats {
                fact_count: facts.len() as u64,
                predicate_count: 1,
                authority_count: 1,
                bloom_fp_rate: 0.01,
                merkle_depth: 10,
            })
        }
    }

    /// Builds a fact with an authority and no timestamp.
    fn create_test_fact(id: u64) -> IndexedFact {
        IndexedFact {
            id: FactId(id),
            predicate: "test".to_string(),
            value: FactValue::String("test_value".to_string()),
            authority: Some(AuthorityId::new_from_entropy([id as u8; 32])),
            timestamp: None,
        }
    }

    /// Builds a physical-clock timestamp at `ts_ms` with no uncertainty.
    fn physical_timestamp(ts_ms: u64) -> TimeStamp {
        TimeStamp::PhysicalClock(PhysicalTime {
            ts_ms,
            uncertainty: None,
        })
    }

    #[tokio::test]
    async fn test_compare_roots_in_sync() {
        let root = [1u8; 32];
        let journal = Arc::new(MockIndexedJournal::new(root));
        let time = MockTimeEffects::new(TEST_TIME_MS);
        let verifier = MerkleVerifier::new(journal, time);

        let result = verifier.compare_roots(root).await.unwrap();
        assert_eq!(result, MerkleComparison::InSync);
    }

    #[tokio::test]
    async fn test_compare_roots_need_reconcile() {
        let local_root = [1u8; 32];
        let remote_root = [2u8; 32];
        let journal = Arc::new(MockIndexedJournal::new(local_root));
        let time = MockTimeEffects::new(TEST_TIME_MS);
        let verifier = MerkleVerifier::new(journal, time);

        let result = verifier.compare_roots(remote_root).await.unwrap();
        assert_eq!(
            result,
            MerkleComparison::NeedReconcile {
                local_root,
                remote_root
            }
        );
    }

    #[tokio::test]
    async fn test_verify_existing_facts() {
        let root = [1u8; 32];
        let existing_fact = create_test_fact(1);
        let journal = Arc::new(MockIndexedJournal::with_facts(
            root,
            vec![existing_fact.clone()],
        ));
        let time = MockTimeEffects::new(TEST_TIME_MS);
        let verifier = MerkleVerifier::new(journal, time);

        let result = verifier
            .verify_incoming_facts(vec![existing_fact], root)
            .await
            .unwrap();

        assert_eq!(result.verified.len(), 1);
        assert!(result.rejected.is_empty());
    }

    #[tokio::test]
    async fn test_verify_new_facts() {
        let root = [1u8; 32];
        let journal = Arc::new(MockIndexedJournal::new(root));
        let time = MockTimeEffects::new(TEST_TIME_MS);
        let verifier = MerkleVerifier::new(journal, time);

        let new_fact = create_test_fact(99);
        let result = verifier
            .verify_incoming_facts(vec![new_fact], root)
            .await
            .unwrap();

        assert_eq!(result.verified.len(), 1);
        assert!(result.rejected.is_empty());
    }

    #[tokio::test]
    async fn test_reject_future_timestamp() {
        let root = [1u8; 32];
        let journal = Arc::new(MockIndexedJournal::new(root));
        let time = MockTimeEffects::new(TEST_TIME_MS);
        let verifier = MerkleVerifier::new(journal, time);

        // One millisecond past the allowed skew must be rejected.
        let mut fact = create_test_fact(7);
        fact.timestamp = Some(physical_timestamp(TEST_TIME_MS + MAX_CLOCK_SKEW_MS + 1));

        let result = verifier
            .verify_incoming_facts(vec![fact], root)
            .await
            .unwrap();

        assert!(result.verified.is_empty());
        assert_eq!(result.rejected.len(), 1);
        assert!(result.rejected[0].1.contains("too far in the future"));
    }

    #[tokio::test]
    async fn test_accept_timestamp_at_skew_boundary() {
        let root = [1u8; 32];
        let journal = Arc::new(MockIndexedJournal::new(root));
        let time = MockTimeEffects::new(TEST_TIME_MS);
        let verifier = MerkleVerifier::new(journal, time);

        // Exactly at the skew limit is still acceptable (the check is strict `>`).
        let mut fact = create_test_fact(8);
        fact.timestamp = Some(physical_timestamp(TEST_TIME_MS + MAX_CLOCK_SKEW_MS));

        let result = verifier
            .verify_incoming_facts(vec![fact], root)
            .await
            .unwrap();

        assert_eq!(result.verified.len(), 1);
        assert!(result.rejected.is_empty());
    }

    #[tokio::test]
    async fn test_reject_missing_authority() {
        let root = [1u8; 32];
        let journal = Arc::new(MockIndexedJournal::new(root));
        let time = MockTimeEffects::new(TEST_TIME_MS);
        let verifier = MerkleVerifier::new(journal, time);

        let mut fact = create_test_fact(9);
        fact.authority = None;

        let result = verifier
            .verify_incoming_facts(vec![fact], root)
            .await
            .unwrap();

        assert!(result.verified.is_empty());
        assert_eq!(result.rejected.len(), 1);
        assert!(result.rejected[0].1.contains("authority is required"));
    }

    #[tokio::test]
    async fn test_stats() {
        let root = [1u8; 32];
        let facts = vec![
            create_test_fact(1),
            create_test_fact(2),
            create_test_fact(3),
        ];
        let journal = Arc::new(MockIndexedJournal::with_facts(root, facts));
        let time = MockTimeEffects::new(TEST_TIME_MS);
        let verifier = MerkleVerifier::new(journal, time);

        let stats = verifier.stats().await.unwrap();
        assert_eq!(stats.fact_count, 3);
    }
}