aura_agent/runtime/effects/
journal.rs1use super::AuraEffectSystem;
2use async_trait::async_trait;
3use aura_core::effects::indexed;
4use aura_core::effects::{BloomFilter, IndexedJournalEffects, JournalEffects};
5use aura_core::{AuraError, AuthorityId, ContextId, FlowBudget, FlowCost, Journal};
6
7#[async_trait]
9impl JournalEffects for AuraEffectSystem {
10 async fn merge_facts(&self, target: Journal, delta: Journal) -> Result<Journal, AuraError> {
11 self.journal_handler().merge_facts(target, delta).await
12 }
13
14 async fn refine_caps(
15 &self,
16 target: Journal,
17 refinement: Journal,
18 ) -> Result<Journal, AuraError> {
19 self.journal_handler().refine_caps(target, refinement).await
20 }
21
22 async fn get_journal(&self) -> Result<Journal, AuraError> {
23 let handler = self.journal_handler();
24 self.journal
25 .get_or_load_journal(move || async move { handler.get_journal().await })
26 .await
27 }
28
29 async fn persist_journal(&self, journal: &Journal) -> Result<(), AuraError> {
30 self.journal_handler().persist_journal(journal).await?;
32 self.journal.update_cached_journal(journal).await;
33
34 let ts_ms = self.time_handler.current_timestamp().await?;
36 let timestamp = aura_core::time::TimeStamp::PhysicalClock(aura_core::time::PhysicalTime {
37 ts_ms,
38 uncertainty: None,
39 });
40 self.journal
41 .index_new_journal_facts(journal, Some(self.authority_id), Some(timestamp));
42
43 Ok(())
44 }
45
46 async fn get_flow_budget(
47 &self,
48 _context: &ContextId,
49 _peer: &AuthorityId,
50 ) -> Result<FlowBudget, AuraError> {
51 self.journal_handler()
52 .get_flow_budget(_context, _peer)
53 .await
54 }
55
56 async fn update_flow_budget(
57 &self,
58 _context: &ContextId,
59 _peer: &AuthorityId,
60 budget: &FlowBudget,
61 ) -> Result<FlowBudget, AuraError> {
62 self.journal_handler()
63 .update_flow_budget(_context, _peer, budget)
64 .await
65 }
66
67 async fn charge_flow_budget(
68 &self,
69 _context: &ContextId,
70 _peer: &AuthorityId,
71 _cost: FlowCost,
72 ) -> Result<FlowBudget, AuraError> {
73 self.journal_handler()
74 .charge_flow_budget(_context, _peer, _cost)
75 .await
76 }
77}
78
79#[async_trait]
81impl IndexedJournalEffects for AuraEffectSystem {
82 fn watch_facts(&self) -> Box<dyn indexed::FactStreamReceiver> {
83 self.journal.indexed_journal().watch_facts()
84 }
85
86 async fn facts_by_predicate(
87 &self,
88 predicate: &str,
89 ) -> Result<Vec<indexed::IndexedFact>, AuraError> {
90 self.journal
91 .indexed_journal()
92 .facts_by_predicate(predicate)
93 .await
94 }
95
96 async fn facts_by_authority(
97 &self,
98 authority: &AuthorityId,
99 ) -> Result<Vec<indexed::IndexedFact>, AuraError> {
100 self.journal
101 .indexed_journal()
102 .facts_by_authority(authority)
103 .await
104 }
105
106 async fn facts_in_range(
107 &self,
108 start: aura_core::time::TimeStamp,
109 end: aura_core::time::TimeStamp,
110 ) -> Result<Vec<indexed::IndexedFact>, AuraError> {
111 self.journal
112 .indexed_journal()
113 .facts_in_range(start, end)
114 .await
115 }
116
117 async fn all_facts(&self) -> Result<Vec<indexed::IndexedFact>, AuraError> {
118 self.journal.indexed_journal().all_facts().await
119 }
120
121 fn might_contain(
122 &self,
123 predicate: &str,
124 value: &aura_core::domain::journal::FactValue,
125 ) -> bool {
126 self.journal
127 .indexed_journal()
128 .might_contain(predicate, value)
129 }
130
131 async fn merkle_root(&self) -> Result<[u8; 32], AuraError> {
132 self.journal.indexed_journal().merkle_root().await
133 }
134
135 async fn verify_fact_inclusion(&self, fact: &indexed::IndexedFact) -> Result<bool, AuraError> {
136 self.journal
137 .indexed_journal()
138 .verify_fact_inclusion(fact)
139 .await
140 }
141
142 async fn get_bloom_filter(&self) -> Result<BloomFilter, AuraError> {
143 self.journal.indexed_journal().get_bloom_filter().await
144 }
145
146 async fn index_stats(&self) -> Result<indexed::IndexStats, AuraError> {
147 self.journal.indexed_journal().index_stats().await
148 }
149}