Skip to main content

aura_agent/database/
handler.rs

1//! Production indexed journal handler.
2//!
3//! Provides efficient O(log n) lookups using B-tree indexes,
4//! O(1) membership testing using Bloom filters, and
5//! cryptographic verification of integrity using Merkle trees.
6
7use super::authority_index::AuthorityIndex;
8use super::merkle::build_merkle_tree;
9use super::stream::TokioFactStreamReceiver;
10use async_trait::async_trait;
11use aura_core::{
12    domain::journal::FactValue,
13    effects::indexed::{FactId, FactStreamReceiver, IndexedFact},
14    effects::{BloomConfig, BloomFilter, IndexStats, IndexedJournalEffects},
15    time::TimeStamp,
16    types::identifiers::AuthorityId,
17    AuraError,
18};
19use parking_lot::RwLock;
20use std::collections::HashSet;
21
/// Production indexed journal handler
///
/// Provides efficient O(log n) lookups using B-tree indexes,
/// O(1) membership testing using Bloom filters, and
/// cryptographic verification of integrity using Merkle trees.
///
/// # Performance Guarantees
///
/// - `facts_by_predicate`: O(log n + k) where k is result size
/// - `facts_by_authority`: O(log n + k) where k is result size
/// - `facts_in_range`: O(log n + k) where k is result size
/// - `might_contain`: O(1) with <1% false positive rate
/// - `merkle_root`: O(n) on first call, O(1) cached
/// - `verify_fact_inclusion`: O(log n)
///
/// # Concurrency Model
///
/// Uses `parking_lot::RwLock` for synchronous locking of the state bundle.
/// Each operation acquires and releases the lock in sub-millisecond time.
/// See module documentation for scale expectations and alternatives.
pub struct IndexedJournalHandler {
    /// Database handler state (index, Bloom filter, Merkle root cache and
    /// fact hashes), guarded by a single lock so they are updated together.
    state: RwLock<DatabaseState>,
    /// Broadcast channel for streaming fact updates to subscribers.
    /// Each message is a batch (`Vec`) of newly indexed facts.
    pub(crate) fact_updates: tokio::sync::broadcast::Sender<Vec<IndexedFact>>,
}
48
/// Mutable state of the handler, kept behind one lock so that the index,
/// Bloom filter, Merkle cache and hash set always change together.
#[derive(Debug)]
struct DatabaseState {
    /// Fact index; `add_fact` inserts into it and the query methods read from it.
    index: AuthorityIndex,
    /// Bloom filter over the byte encodings of added facts. Its
    /// `element_count` is bumped once per `add_fact` call.
    bloom_filter: BloomFilter,
    /// Cached Merkle root; reset to `None` whenever a fact is added and
    /// refilled lazily by `compute_merkle_root`.
    merkle_root_cache: Option<[u8; 32]>,
    /// Hashes of the byte encodings of added facts (deduplicated). Source of
    /// the Merkle root and of inclusion checks.
    fact_hashes: HashSet<[u8; 32]>,
}
56
57impl DatabaseState {
58    #[allow(dead_code)] // For use with with_state_mut_validated
59    fn validate(&self) -> Result<(), crate::runtime::services::invariant::InvariantViolation> {
60        if self.bloom_filter.element_count < self.fact_hashes.len() as u64 {
61            return Err(
62                crate::runtime::services::invariant::InvariantViolation::new(
63                    "IndexedJournal",
64                    format!(
65                        "bloom filter count {} below fact hash count {}",
66                        self.bloom_filter.element_count,
67                        self.fact_hashes.len()
68                    ),
69                ),
70            );
71        }
72        Ok(())
73    }
74}
75
76impl IndexedJournalHandler {
77    /// Create a new indexed journal handler with default configuration
78    pub fn new() -> Self {
79        Self::with_capacity(10000)
80    }
81
82    /// Create a new indexed journal handler with specified expected capacity
83    pub fn with_capacity(expected_elements: u64) -> Self {
84        let bloom_config = BloomConfig::optimal(expected_elements, 0.01);
85        let bloom_filter = BloomFilter::new(bloom_config).expect("Failed to create bloom filter");
86
87        // Create broadcast channel for fact streaming (capacity: 100 batches)
88        let (fact_updates_tx, _) = tokio::sync::broadcast::channel(100);
89
90        Self {
91            state: RwLock::new(DatabaseState {
92                index: AuthorityIndex::new(),
93                bloom_filter,
94                merkle_root_cache: None,
95                fact_hashes: HashSet::new(),
96            }),
97            fact_updates: fact_updates_tx,
98        }
99    }
100
101    fn with_state<R>(&self, op: impl FnOnce(&DatabaseState) -> R) -> R {
102        let guard = self.state.read();
103        op(&guard)
104    }
105
106    fn with_state_mut<R>(&self, op: impl FnOnce(&mut DatabaseState) -> R) -> R {
107        let mut guard = self.state.write();
108        let result = op(&mut guard);
109        #[cfg(debug_assertions)]
110        {
111            if let Err(message) = guard.validate() {
112                tracing::error!(%message, "IndexedJournalHandler state invariant violated");
113                debug_assert!(
114                    false,
115                    "IndexedJournalHandler invariant violated: {}",
116                    message
117                );
118            }
119        }
120        result
121    }
122
123    /// Add a fact to the index
124    pub fn add_fact(
125        &self,
126        predicate: String,
127        value: FactValue,
128        authority: Option<AuthorityId>,
129        timestamp: Option<TimeStamp>,
130    ) -> FactId {
131        let element = self.fact_to_bytes(&predicate, &value);
132        let fact_hash = aura_core::hash::hash(&element);
133
134        let (id, fact_to_broadcast) = self.with_state_mut(|state| {
135            // Insert into B-tree indexes
136            let id = state
137                .index
138                .insert(predicate.clone(), value.clone(), authority, timestamp);
139
140            // Insert into Bloom filter
141            // Direct bloom filter insertion (synchronous for lock-based access)
142            for i in 0..state.bloom_filter.config.num_hash_functions {
143                let i = i as u64;
144                let hash_bytes = {
145                    let mut hasher = aura_core::hash::hasher();
146                    hasher.update(&i.to_le_bytes());
147                    hasher.update(&element);
148                    hasher.finalize().to_vec()
149                };
150                let mut hash_u64_bytes = [0u8; 8];
151                hash_u64_bytes.copy_from_slice(&hash_bytes[..8]);
152                let hash_value = u64::from_le_bytes(hash_u64_bytes);
153
154                let bit_index = hash_value % state.bloom_filter.config.bit_vector_size;
155                let byte_index = (bit_index / 8) as usize;
156                let bit_offset = (bit_index % 8) as u8;
157
158                if byte_index < state.bloom_filter.bits.len() {
159                    state.bloom_filter.bits[byte_index] |= 1u8 << bit_offset;
160                }
161            }
162            state.bloom_filter.element_count += 1;
163
164            // Add to Merkle tree hashes
165            state.fact_hashes.insert(fact_hash);
166
167            // Invalidate Merkle cache
168            state.merkle_root_cache = None;
169
170            // Retrieve the fact we just added for broadcasting.
171            let fact = state.index.facts.get(&id).cloned();
172            (id, fact)
173        });
174
175        if let Some(fact) = fact_to_broadcast {
176            // Send as a batch of one fact
177            // Ignore send errors (happens when there are no subscribers)
178            let _ = self.fact_updates.send(vec![fact]);
179        };
180
181        id
182    }
183
184    /// Convert a fact to bytes for hashing
185    pub(crate) fn fact_to_bytes(&self, predicate: &str, value: &FactValue) -> Vec<u8> {
186        let mut bytes = Vec::new();
187        bytes.extend_from_slice(predicate.as_bytes());
188        bytes.push(0); // separator
189        match value {
190            FactValue::String(s) => {
191                bytes.push(0);
192                bytes.extend_from_slice(s.as_bytes());
193            }
194            FactValue::Number(n) => {
195                bytes.push(1);
196                bytes.extend_from_slice(&n.to_le_bytes());
197            }
198            FactValue::Bytes(b) => {
199                bytes.push(2);
200                bytes.extend_from_slice(b);
201            }
202            FactValue::Set(s) => {
203                bytes.push(3);
204                for item in s {
205                    bytes.extend_from_slice(item.as_bytes());
206                    bytes.push(0);
207                }
208            }
209            FactValue::Nested(nested_fact) => {
210                bytes.push(4);
211                // Hash the nested fact for a stable representation
212                // Fact derives Serialize so we can use canonical serialization
213                if let Ok(serialized) = aura_core::util::serialization::to_vec(nested_fact.as_ref())
214                {
215                    let hash = aura_core::hash::hash(&serialized);
216                    bytes.extend_from_slice(&hash);
217                }
218            }
219        }
220        bytes
221    }
222
223    /// Check if a fact might be contained (using Bloom filter)
224    pub(crate) fn bloom_check(&self, predicate: &str, value: &FactValue) -> bool {
225        let element = self.fact_to_bytes(predicate, value);
226        self.with_state(|state| {
227            let filter = &state.bloom_filter;
228            for i in 0..filter.config.num_hash_functions {
229                let i = i as u64;
230                let hash_bytes = {
231                    let mut hasher = aura_core::hash::hasher();
232                    hasher.update(&i.to_le_bytes());
233                    hasher.update(&element);
234                    hasher.finalize().to_vec()
235                };
236                let mut hash_u64_bytes = [0u8; 8];
237                hash_u64_bytes.copy_from_slice(&hash_bytes[..8]);
238                let hash_value = u64::from_le_bytes(hash_u64_bytes);
239
240                let bit_index = hash_value % filter.config.bit_vector_size;
241                let byte_index = (bit_index / 8) as usize;
242                let bit_offset = (bit_index % 8) as u8;
243
244                if byte_index >= filter.bits.len()
245                    || (filter.bits[byte_index] & (1u8 << bit_offset)) == 0
246                {
247                    return false;
248                }
249            }
250            true
251        })
252    }
253
254    /// Compute or retrieve cached Merkle root
255    pub(crate) fn compute_merkle_root(&self) -> [u8; 32] {
256        self.with_state_mut(|state| {
257            if let Some(root) = state.merkle_root_cache {
258                return root;
259            }
260
261            let hashes: Vec<[u8; 32]> = state.fact_hashes.iter().copied().collect();
262            let root = if let Some(tree) = build_merkle_tree(hashes) {
263                tree.hash
264            } else {
265                [0u8; 32]
266            };
267
268            state.merkle_root_cache = Some(root);
269            root
270        })
271    }
272}
273
274impl Default for IndexedJournalHandler {
275    fn default() -> Self {
276        Self::new()
277    }
278}
279
280impl std::fmt::Debug for IndexedJournalHandler {
281    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282        let (fact_count, predicate_count, authority_count) = self.with_state(|state| {
283            (
284                state.index.facts.len(),
285                state.index.by_predicate.len(),
286                state.index.by_authority.len(),
287            )
288        });
289        f.debug_struct("IndexedJournalHandler")
290            .field("fact_count", &fact_count)
291            .field("predicate_count", &predicate_count)
292            .field("authority_count", &authority_count)
293            .finish()
294    }
295}
296
297#[async_trait]
298impl IndexedJournalEffects for IndexedJournalHandler {
299    fn watch_facts(&self) -> Box<dyn FactStreamReceiver> {
300        Box::new(TokioFactStreamReceiver::new(self.fact_updates.subscribe()))
301    }
302
303    async fn facts_by_predicate(&self, predicate: &str) -> Result<Vec<IndexedFact>, AuraError> {
304        Ok(self.with_state(|state| state.index.get_by_predicate(predicate)))
305    }
306
307    async fn facts_by_authority(
308        &self,
309        authority: &AuthorityId,
310    ) -> Result<Vec<IndexedFact>, AuraError> {
311        Ok(self.with_state(|state| state.index.get_by_authority(authority)))
312    }
313
314    async fn facts_in_range(
315        &self,
316        start: TimeStamp,
317        end: TimeStamp,
318    ) -> Result<Vec<IndexedFact>, AuraError> {
319        Ok(self.with_state(|state| state.index.get_in_range(&start, &end)))
320    }
321
322    async fn all_facts(&self) -> Result<Vec<IndexedFact>, AuraError> {
323        Ok(self.with_state(|state| state.index.facts.values().cloned().collect()))
324    }
325
326    fn might_contain(&self, predicate: &str, value: &FactValue) -> bool {
327        self.bloom_check(predicate, value)
328    }
329
330    async fn merkle_root(&self) -> Result<[u8; 32], AuraError> {
331        Ok(self.compute_merkle_root())
332    }
333
334    async fn verify_fact_inclusion(&self, fact: &IndexedFact) -> Result<bool, AuraError> {
335        let element = self.fact_to_bytes(&fact.predicate, &fact.value);
336        let fact_hash = aura_core::hash::hash(&element);
337        Ok(self.with_state(|state| state.fact_hashes.contains(&fact_hash)))
338    }
339
340    async fn get_bloom_filter(&self) -> Result<BloomFilter, AuraError> {
341        Ok(self.with_state(|state| state.bloom_filter.clone()))
342    }
343
344    async fn index_stats(&self) -> Result<IndexStats, AuraError> {
345        let (stats, filter) =
346            self.with_state(|state| (state.index.stats(), state.bloom_filter.clone()));
347        let mut stats = stats;
348        // Estimate false positive rate based on filter fill ratio
349        // Note: set_bits could be used for more accurate FP estimation in the future
350        let _set_bits: u64 = filter.bits.iter().map(|b| b.count_ones() as u64).sum();
351        let m = filter.config.bit_vector_size as f64;
352        let k = filter.config.num_hash_functions as f64;
353        let n = filter.element_count as f64;
354
355        if n > 0.0 {
356            // FP rate ≈ (1 - e^(-kn/m))^k
357            let fill_ratio: f64 = (-k * n / m).exp();
358            stats.bloom_fp_rate = (1.0_f64 - fill_ratio).powf(k);
359        }
360
361        Ok(stats)
362    }
363}