aura_agent/database/
handler.rs1use super::authority_index::AuthorityIndex;
8use super::merkle::build_merkle_tree;
9use super::stream::TokioFactStreamReceiver;
10use async_trait::async_trait;
11use aura_core::{
12 domain::journal::FactValue,
13 effects::indexed::{FactId, FactStreamReceiver, IndexedFact},
14 effects::{BloomConfig, BloomFilter, IndexStats, IndexedJournalEffects},
15 time::TimeStamp,
16 types::identifiers::AuthorityId,
17 AuraError,
18};
19use parking_lot::RwLock;
20use std::collections::HashSet;
21
22pub struct IndexedJournalHandler {
43 state: RwLock<DatabaseState>,
45 pub(crate) fact_updates: tokio::sync::broadcast::Sender<Vec<IndexedFact>>,
47}
48
49#[derive(Debug)]
50struct DatabaseState {
51 index: AuthorityIndex,
52 bloom_filter: BloomFilter,
53 merkle_root_cache: Option<[u8; 32]>,
54 fact_hashes: HashSet<[u8; 32]>,
55}
56
57impl DatabaseState {
58 #[allow(dead_code)] fn validate(&self) -> Result<(), crate::runtime::services::invariant::InvariantViolation> {
60 if self.bloom_filter.element_count < self.fact_hashes.len() as u64 {
61 return Err(
62 crate::runtime::services::invariant::InvariantViolation::new(
63 "IndexedJournal",
64 format!(
65 "bloom filter count {} below fact hash count {}",
66 self.bloom_filter.element_count,
67 self.fact_hashes.len()
68 ),
69 ),
70 );
71 }
72 Ok(())
73 }
74}
75
76impl IndexedJournalHandler {
77 pub fn new() -> Self {
79 Self::with_capacity(10000)
80 }
81
82 pub fn with_capacity(expected_elements: u64) -> Self {
84 let bloom_config = BloomConfig::optimal(expected_elements, 0.01);
85 let bloom_filter = BloomFilter::new(bloom_config).expect("Failed to create bloom filter");
86
87 let (fact_updates_tx, _) = tokio::sync::broadcast::channel(100);
89
90 Self {
91 state: RwLock::new(DatabaseState {
92 index: AuthorityIndex::new(),
93 bloom_filter,
94 merkle_root_cache: None,
95 fact_hashes: HashSet::new(),
96 }),
97 fact_updates: fact_updates_tx,
98 }
99 }
100
101 fn with_state<R>(&self, op: impl FnOnce(&DatabaseState) -> R) -> R {
102 let guard = self.state.read();
103 op(&guard)
104 }
105
106 fn with_state_mut<R>(&self, op: impl FnOnce(&mut DatabaseState) -> R) -> R {
107 let mut guard = self.state.write();
108 let result = op(&mut guard);
109 #[cfg(debug_assertions)]
110 {
111 if let Err(message) = guard.validate() {
112 tracing::error!(%message, "IndexedJournalHandler state invariant violated");
113 debug_assert!(
114 false,
115 "IndexedJournalHandler invariant violated: {}",
116 message
117 );
118 }
119 }
120 result
121 }
122
123 pub fn add_fact(
125 &self,
126 predicate: String,
127 value: FactValue,
128 authority: Option<AuthorityId>,
129 timestamp: Option<TimeStamp>,
130 ) -> FactId {
131 let element = self.fact_to_bytes(&predicate, &value);
132 let fact_hash = aura_core::hash::hash(&element);
133
134 let (id, fact_to_broadcast) = self.with_state_mut(|state| {
135 let id = state
137 .index
138 .insert(predicate.clone(), value.clone(), authority, timestamp);
139
140 for i in 0..state.bloom_filter.config.num_hash_functions {
143 let i = i as u64;
144 let hash_bytes = {
145 let mut hasher = aura_core::hash::hasher();
146 hasher.update(&i.to_le_bytes());
147 hasher.update(&element);
148 hasher.finalize().to_vec()
149 };
150 let mut hash_u64_bytes = [0u8; 8];
151 hash_u64_bytes.copy_from_slice(&hash_bytes[..8]);
152 let hash_value = u64::from_le_bytes(hash_u64_bytes);
153
154 let bit_index = hash_value % state.bloom_filter.config.bit_vector_size;
155 let byte_index = (bit_index / 8) as usize;
156 let bit_offset = (bit_index % 8) as u8;
157
158 if byte_index < state.bloom_filter.bits.len() {
159 state.bloom_filter.bits[byte_index] |= 1u8 << bit_offset;
160 }
161 }
162 state.bloom_filter.element_count += 1;
163
164 state.fact_hashes.insert(fact_hash);
166
167 state.merkle_root_cache = None;
169
170 let fact = state.index.facts.get(&id).cloned();
172 (id, fact)
173 });
174
175 if let Some(fact) = fact_to_broadcast {
176 let _ = self.fact_updates.send(vec![fact]);
179 };
180
181 id
182 }
183
184 pub(crate) fn fact_to_bytes(&self, predicate: &str, value: &FactValue) -> Vec<u8> {
186 let mut bytes = Vec::new();
187 bytes.extend_from_slice(predicate.as_bytes());
188 bytes.push(0); match value {
190 FactValue::String(s) => {
191 bytes.push(0);
192 bytes.extend_from_slice(s.as_bytes());
193 }
194 FactValue::Number(n) => {
195 bytes.push(1);
196 bytes.extend_from_slice(&n.to_le_bytes());
197 }
198 FactValue::Bytes(b) => {
199 bytes.push(2);
200 bytes.extend_from_slice(b);
201 }
202 FactValue::Set(s) => {
203 bytes.push(3);
204 for item in s {
205 bytes.extend_from_slice(item.as_bytes());
206 bytes.push(0);
207 }
208 }
209 FactValue::Nested(nested_fact) => {
210 bytes.push(4);
211 if let Ok(serialized) = aura_core::util::serialization::to_vec(nested_fact.as_ref())
214 {
215 let hash = aura_core::hash::hash(&serialized);
216 bytes.extend_from_slice(&hash);
217 }
218 }
219 }
220 bytes
221 }
222
223 pub(crate) fn bloom_check(&self, predicate: &str, value: &FactValue) -> bool {
225 let element = self.fact_to_bytes(predicate, value);
226 self.with_state(|state| {
227 let filter = &state.bloom_filter;
228 for i in 0..filter.config.num_hash_functions {
229 let i = i as u64;
230 let hash_bytes = {
231 let mut hasher = aura_core::hash::hasher();
232 hasher.update(&i.to_le_bytes());
233 hasher.update(&element);
234 hasher.finalize().to_vec()
235 };
236 let mut hash_u64_bytes = [0u8; 8];
237 hash_u64_bytes.copy_from_slice(&hash_bytes[..8]);
238 let hash_value = u64::from_le_bytes(hash_u64_bytes);
239
240 let bit_index = hash_value % filter.config.bit_vector_size;
241 let byte_index = (bit_index / 8) as usize;
242 let bit_offset = (bit_index % 8) as u8;
243
244 if byte_index >= filter.bits.len()
245 || (filter.bits[byte_index] & (1u8 << bit_offset)) == 0
246 {
247 return false;
248 }
249 }
250 true
251 })
252 }
253
254 pub(crate) fn compute_merkle_root(&self) -> [u8; 32] {
256 self.with_state_mut(|state| {
257 if let Some(root) = state.merkle_root_cache {
258 return root;
259 }
260
261 let hashes: Vec<[u8; 32]> = state.fact_hashes.iter().copied().collect();
262 let root = if let Some(tree) = build_merkle_tree(hashes) {
263 tree.hash
264 } else {
265 [0u8; 32]
266 };
267
268 state.merkle_root_cache = Some(root);
269 root
270 })
271 }
272}
273
274impl Default for IndexedJournalHandler {
275 fn default() -> Self {
276 Self::new()
277 }
278}
279
280impl std::fmt::Debug for IndexedJournalHandler {
281 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282 let (fact_count, predicate_count, authority_count) = self.with_state(|state| {
283 (
284 state.index.facts.len(),
285 state.index.by_predicate.len(),
286 state.index.by_authority.len(),
287 )
288 });
289 f.debug_struct("IndexedJournalHandler")
290 .field("fact_count", &fact_count)
291 .field("predicate_count", &predicate_count)
292 .field("authority_count", &authority_count)
293 .finish()
294 }
295}
296
297#[async_trait]
298impl IndexedJournalEffects for IndexedJournalHandler {
299 fn watch_facts(&self) -> Box<dyn FactStreamReceiver> {
300 Box::new(TokioFactStreamReceiver::new(self.fact_updates.subscribe()))
301 }
302
303 async fn facts_by_predicate(&self, predicate: &str) -> Result<Vec<IndexedFact>, AuraError> {
304 Ok(self.with_state(|state| state.index.get_by_predicate(predicate)))
305 }
306
307 async fn facts_by_authority(
308 &self,
309 authority: &AuthorityId,
310 ) -> Result<Vec<IndexedFact>, AuraError> {
311 Ok(self.with_state(|state| state.index.get_by_authority(authority)))
312 }
313
314 async fn facts_in_range(
315 &self,
316 start: TimeStamp,
317 end: TimeStamp,
318 ) -> Result<Vec<IndexedFact>, AuraError> {
319 Ok(self.with_state(|state| state.index.get_in_range(&start, &end)))
320 }
321
322 async fn all_facts(&self) -> Result<Vec<IndexedFact>, AuraError> {
323 Ok(self.with_state(|state| state.index.facts.values().cloned().collect()))
324 }
325
326 fn might_contain(&self, predicate: &str, value: &FactValue) -> bool {
327 self.bloom_check(predicate, value)
328 }
329
330 async fn merkle_root(&self) -> Result<[u8; 32], AuraError> {
331 Ok(self.compute_merkle_root())
332 }
333
334 async fn verify_fact_inclusion(&self, fact: &IndexedFact) -> Result<bool, AuraError> {
335 let element = self.fact_to_bytes(&fact.predicate, &fact.value);
336 let fact_hash = aura_core::hash::hash(&element);
337 Ok(self.with_state(|state| state.fact_hashes.contains(&fact_hash)))
338 }
339
340 async fn get_bloom_filter(&self) -> Result<BloomFilter, AuraError> {
341 Ok(self.with_state(|state| state.bloom_filter.clone()))
342 }
343
344 async fn index_stats(&self) -> Result<IndexStats, AuraError> {
345 let (stats, filter) =
346 self.with_state(|state| (state.index.stats(), state.bloom_filter.clone()));
347 let mut stats = stats;
348 let _set_bits: u64 = filter.bits.iter().map(|b| b.count_ones() as u64).sum();
351 let m = filter.config.bit_vector_size as f64;
352 let k = filter.config.num_hash_functions as f64;
353 let n = filter.element_count as f64;
354
355 if n > 0.0 {
356 let fill_ratio: f64 = (-k * n / m).exp();
358 stats.bloom_fp_rate = (1.0_f64 - fill_ratio).powf(k);
359 }
360
361 Ok(stats)
362 }
363}