Skip to main content

aura_agent/runtime/effects/
journal.rs

1use super::AuraEffectSystem;
2use async_trait::async_trait;
3use aura_core::effects::indexed;
4use aura_core::effects::{BloomFilter, IndexedJournalEffects, JournalEffects};
5use aura_core::{AuraError, AuthorityId, ContextId, FlowBudget, FlowCost, Journal};
6
7// Implementation of JournalEffects
8#[async_trait]
9impl JournalEffects for AuraEffectSystem {
10    async fn merge_facts(&self, target: Journal, delta: Journal) -> Result<Journal, AuraError> {
11        self.journal_handler().merge_facts(target, delta).await
12    }
13
14    async fn refine_caps(
15        &self,
16        target: Journal,
17        refinement: Journal,
18    ) -> Result<Journal, AuraError> {
19        self.journal_handler().refine_caps(target, refinement).await
20    }
21
22    async fn get_journal(&self) -> Result<Journal, AuraError> {
23        let handler = self.journal_handler();
24        self.journal
25            .get_or_load_journal(move || async move { handler.get_journal().await })
26            .await
27    }
28
29    async fn persist_journal(&self, journal: &Journal) -> Result<(), AuraError> {
30        // Persist the journal to storage
31        self.journal_handler().persist_journal(journal).await?;
32        self.journal.update_cached_journal(journal).await;
33
34        // Mirror only newly appended facts into the runtime index.
35        let ts_ms = self.time_handler.current_timestamp().await?;
36        let timestamp = aura_core::time::TimeStamp::PhysicalClock(aura_core::time::PhysicalTime {
37            ts_ms,
38            uncertainty: None,
39        });
40        self.journal
41            .index_new_journal_facts(journal, Some(self.authority_id), Some(timestamp));
42
43        Ok(())
44    }
45
46    async fn get_flow_budget(
47        &self,
48        _context: &ContextId,
49        _peer: &AuthorityId,
50    ) -> Result<FlowBudget, AuraError> {
51        self.journal_handler()
52            .get_flow_budget(_context, _peer)
53            .await
54    }
55
56    async fn update_flow_budget(
57        &self,
58        _context: &ContextId,
59        _peer: &AuthorityId,
60        budget: &FlowBudget,
61    ) -> Result<FlowBudget, AuraError> {
62        self.journal_handler()
63            .update_flow_budget(_context, _peer, budget)
64            .await
65    }
66
67    async fn charge_flow_budget(
68        &self,
69        _context: &ContextId,
70        _peer: &AuthorityId,
71        _cost: FlowCost,
72    ) -> Result<FlowBudget, AuraError> {
73        self.journal_handler()
74            .charge_flow_budget(_context, _peer, _cost)
75            .await
76    }
77}
78
79// Implementation of IndexedJournalEffects - provides B-tree indexes, Bloom filters, Merkle trees
80#[async_trait]
81impl IndexedJournalEffects for AuraEffectSystem {
82    fn watch_facts(&self) -> Box<dyn indexed::FactStreamReceiver> {
83        self.journal.indexed_journal().watch_facts()
84    }
85
86    async fn facts_by_predicate(
87        &self,
88        predicate: &str,
89    ) -> Result<Vec<indexed::IndexedFact>, AuraError> {
90        self.journal
91            .indexed_journal()
92            .facts_by_predicate(predicate)
93            .await
94    }
95
96    async fn facts_by_authority(
97        &self,
98        authority: &AuthorityId,
99    ) -> Result<Vec<indexed::IndexedFact>, AuraError> {
100        self.journal
101            .indexed_journal()
102            .facts_by_authority(authority)
103            .await
104    }
105
106    async fn facts_in_range(
107        &self,
108        start: aura_core::time::TimeStamp,
109        end: aura_core::time::TimeStamp,
110    ) -> Result<Vec<indexed::IndexedFact>, AuraError> {
111        self.journal
112            .indexed_journal()
113            .facts_in_range(start, end)
114            .await
115    }
116
117    async fn all_facts(&self) -> Result<Vec<indexed::IndexedFact>, AuraError> {
118        self.journal.indexed_journal().all_facts().await
119    }
120
121    fn might_contain(
122        &self,
123        predicate: &str,
124        value: &aura_core::domain::journal::FactValue,
125    ) -> bool {
126        self.journal
127            .indexed_journal()
128            .might_contain(predicate, value)
129    }
130
131    async fn merkle_root(&self) -> Result<[u8; 32], AuraError> {
132        self.journal.indexed_journal().merkle_root().await
133    }
134
135    async fn verify_fact_inclusion(&self, fact: &indexed::IndexedFact) -> Result<bool, AuraError> {
136        self.journal
137            .indexed_journal()
138            .verify_fact_inclusion(fact)
139            .await
140    }
141
142    async fn get_bloom_filter(&self) -> Result<BloomFilter, AuraError> {
143        self.journal.indexed_journal().get_bloom_filter().await
144    }
145
146    async fn index_stats(&self) -> Result<indexed::IndexStats, AuraError> {
147        self.journal.indexed_journal().index_stats().await
148    }
149}