// Source file: aura_core/effects/query.rs

//! Query Effect Traits
//!
//! Algebraic effects for executing Datalog queries against the journal.
//! Integrates with Biscuit authorization and reactive signal subscriptions.
//!
//! # Effect Classification
//!
//! - **Category**: Application Effect
//! - **Implementation**: `aura-effects` (Layer 3)
//! - **Dependencies**: JournalEffects, AuthorizationEffects
//!
//! # Architecture
//!
//! QueryEffects bridges the gap between:
//! - **Journal**: CRDT fact storage
//! - **Datalog**: Query language for facts
//! - **Biscuit**: Authorization for query execution
//! - **Reactive**: Signal subscriptions for live queries
//!
//! ```text
//! Query::to_datalog() → DatalogProgram
//!        ↓
//! QueryEffects::query() → Check Biscuit capabilities
//!        ↓
//! Execute Datalog against journal facts
//!        ↓
//! Query::parse() → Typed result
//! ```

use async_trait::async_trait;
use serde::{Deserialize, Serialize};

use crate::domain::ConsistencyMap;
use crate::effects::reactive::SignalStream;
use crate::effects::reactive::{ReactiveEffects, ReactiveError, Signal, SignalId};
use crate::query::{
    ConsensusId, DatalogBindings, Query, QueryCapability, QueryIsolation, QueryParseError,
    QueryStats,
};

// ─────────────────────────────────────────────────────────────────────────────
// Error Types
// ─────────────────────────────────────────────────────────────────────────────

45/// Error type for query operations
46#[derive(Debug, Clone, thiserror::Error, Serialize, Deserialize)]
47pub enum QueryError {
48    /// Authorization failed
49    #[error("Query authorization failed: {reason}")]
50    AuthorizationFailed { reason: String },
51
52    /// Missing required capability
53    #[error("Missing capability: {capability}")]
54    MissingCapability { capability: String },
55
56    /// Datalog execution error
57    #[error("Datalog execution error: {reason}")]
58    ExecutionError { reason: String },
59
60    /// Result parsing error
61    #[error("Failed to parse query results: {0}")]
62    ParseError(#[from] QueryParseError),
63
64    /// Query not found (for subscriptions)
65    #[error("Query subscription not found: {query_id}")]
66    SubscriptionNotFound { query_id: String },
67
68    /// Journal access error
69    #[error("Journal access error: {reason}")]
70    JournalError { reason: String },
71
72    /// Handler not available
73    #[error("Query handler not available")]
74    HandlerUnavailable,
75
76    /// Internal error
77    #[error("Internal query error: {reason}")]
78    Internal { reason: String },
79
80    /// Consensus wait timeout
81    #[error("Timed out waiting for consensus: {consensus_id:?}")]
82    ConsensusTimeout { consensus_id: ConsensusId },
83
84    /// Snapshot not available (garbage collected)
85    #[error("Snapshot not available: prestate {prestate_hash:?} has been garbage collected")]
86    SnapshotNotAvailable { prestate_hash: crate::Hash32 },
87
88    /// Isolation level not supported
89    #[error("Isolation level not supported: {reason}")]
90    IsolationNotSupported { reason: String },
91}
92
93impl QueryError {
94    /// Create an authorization failed error
95    pub fn authorization_failed(reason: impl Into<String>) -> Self {
96        Self::AuthorizationFailed {
97            reason: reason.into(),
98        }
99    }
100
101    /// Create a missing capability error
102    pub fn missing_capability(cap: &QueryCapability) -> Self {
103        Self::MissingCapability {
104            capability: format!("{}:{}", cap.resource, cap.action),
105        }
106    }
107
108    /// Create an execution error
109    pub fn execution_error(reason: impl Into<String>) -> Self {
110        Self::ExecutionError {
111            reason: reason.into(),
112        }
113    }
114
115    /// Create a journal error
116    pub fn journal_error(reason: impl Into<String>) -> Self {
117        Self::JournalError {
118            reason: reason.into(),
119        }
120    }
121
122    /// Create an internal error
123    pub fn internal(reason: impl Into<String>) -> Self {
124        Self::Internal {
125            reason: reason.into(),
126        }
127    }
128
129    /// Create a consensus timeout error
130    pub fn consensus_timeout(consensus_id: ConsensusId) -> Self {
131        Self::ConsensusTimeout { consensus_id }
132    }
133
134    /// Create a snapshot not available error
135    pub fn snapshot_not_available(prestate_hash: crate::Hash32) -> Self {
136        Self::SnapshotNotAvailable { prestate_hash }
137    }
138
139    /// Create an isolation not supported error
140    pub fn isolation_not_supported(reason: impl Into<String>) -> Self {
141        Self::IsolationNotSupported {
142            reason: reason.into(),
143        }
144    }
145}
146
// ─────────────────────────────────────────────────────────────────────────────
// Query Effects Trait
// ─────────────────────────────────────────────────────────────────────────────

151/// Effects for executing typed Datalog queries.
152///
153/// This trait provides the read interface to the journal through Datalog queries.
154/// All queries:
155/// 1. Compile to Datalog programs via `Query::to_datalog()`
156/// 2. Are authorized via Biscuit capabilities
157/// 3. Execute against journal facts
158/// 4. Parse results to typed values
159///
160/// # Example
161///
162/// ```ignore
163/// use aura_core::effects::QueryEffects;
164/// use aura_app::queries::ChannelsQuery;
165///
166/// // One-shot query
167/// let channels = handler.query(&ChannelsQuery::default()).await?;
168///
169/// // Live subscription (re-evaluates when facts change)
170/// let mut stream = handler.subscribe(&ChannelsQuery::default());
171/// while let Some(channels) = stream.recv().await {
172///     println!("Channels updated: {} total", channels.len());
173/// }
174/// ```
175#[async_trait]
176pub trait QueryEffects: Send + Sync {
177    /// Execute a one-shot query.
178    ///
179    /// This compiles the query to Datalog, checks authorization,
180    /// executes against the journal, and parses the results.
181    ///
182    /// # Errors
183    ///
184    /// - `QueryError::AuthorizationFailed` if capability check fails
185    /// - `QueryError::ExecutionError` if Datalog execution fails
186    /// - `QueryError::ParseError` if result parsing fails
187    async fn query<Q: Query>(&self, query: &Q) -> Result<Q::Result, QueryError>;
188
189    /// Execute a raw Datalog program and return bindings.
190    ///
191    /// Lower-level API for executing arbitrary Datalog without typed parsing.
192    /// Useful for debugging or dynamic queries.
193    async fn query_raw(
194        &self,
195        program: &crate::query::DatalogProgram,
196    ) -> Result<DatalogBindings, QueryError>;
197
198    /// Subscribe to a query for live updates.
199    ///
200    /// Returns a stream that re-evaluates the query whenever facts
201    /// matching the query's `dependencies()` change.
202    ///
203    /// The stream yields new results after each relevant fact change.
204    fn subscribe<Q: Query>(&self, query: &Q) -> QuerySubscription<Q::Result>;
205
206    /// Check if a query's capabilities are satisfied.
207    ///
208    /// Can be used to pre-check authorization before execution.
209    async fn check_capabilities(&self, capabilities: &[QueryCapability]) -> Result<(), QueryError>;
210
211    /// Invalidate cached results for queries matching the given predicate.
212    ///
213    /// Called when facts change to trigger re-evaluation of subscriptions.
214    async fn invalidate(&self, predicate: &crate::query::FactPredicate);
215
216    /// Execute a query with a specific isolation level.
217    ///
218    /// Allows specifying consistency requirements for the query.
219    /// See `QueryIsolation` for available levels.
220    ///
221    /// # Example
222    ///
223    /// ```ignore
224    /// // Wait for specific consensus before querying
225    /// let result = handler.query_with_isolation(
226    ///     &ChannelsQuery::default(),
227    ///     QueryIsolation::ReadCommitted { wait_for: vec![consensus_id] },
228    /// ).await?;
229    /// ```
230    async fn query_with_isolation<Q: Query>(
231        &self,
232        query: &Q,
233        isolation: QueryIsolation,
234    ) -> Result<Q::Result, QueryError>;
235
236    /// Execute a query and return results with execution statistics.
237    ///
238    /// Useful for debugging, profiling, and optimization. Returns both
239    /// the query results and metadata about the execution.
240    ///
241    /// # Example
242    ///
243    /// ```ignore
244    /// let (channels, stats) = handler.query_with_stats(&ChannelsQuery::default()).await?;
245    /// println!("Query took {:?}, scanned {} facts", stats.execution_time, stats.facts_scanned);
246    /// ```
247    async fn query_with_stats<Q: Query>(
248        &self,
249        query: &Q,
250    ) -> Result<(Q::Result, QueryStats), QueryError>;
251
252    /// Execute a query and return results with consistency metadata.
253    ///
254    /// Returns both the query results and a `ConsistencyMap` containing
255    /// the consistency status (agreement, propagation, acknowledgment)
256    /// for each matched fact.
257    ///
258    /// # Example
259    ///
260    /// ```ignore
261    /// let (messages, consistency) = handler.query_with_consistency(&MessagesQuery::default()).await?;
262    /// for msg in &messages {
263    ///     if consistency.is_finalized(&msg.id) {
264    ///         println!("{}: finalized", msg.content);
265    ///     } else {
266    ///         println!("{}: pending", msg.content);
267    ///     }
268    /// }
269    /// ```
270    async fn query_with_consistency<Q: Query>(
271        &self,
272        query: &Q,
273    ) -> Result<(Q::Result, ConsistencyMap), QueryError>;
274
275    /// Execute a query with both isolation level and statistics.
276    ///
277    /// Combines `query_with_isolation` and `query_with_stats`.
278    async fn query_full<Q: Query>(
279        &self,
280        query: &Q,
281        isolation: QueryIsolation,
282    ) -> Result<(Q::Result, QueryStats), QueryError>;
283
284    /// Register a query binding for reactive refresh.
285    ///
286    /// Implementations should store the query/signal mapping and emit
287    /// the initial query result.
288    async fn register_query_binding<Q: Query>(
289        &self,
290        signal: &Signal<Q::Result>,
291        query: Q,
292    ) -> Result<(), QueryError>;
293}
294
295/// Convenience extension trait to register query-bound signals via QueryEffects.
296#[allow(async_fn_in_trait)]
297pub trait QuerySignalEffects: QueryEffects + ReactiveEffects {
298    /// Register a reactive signal that is bound to a query.
299    async fn register_query_signal<Q: Query>(
300        &self,
301        signal: &Signal<Q::Result>,
302        query: Q,
303    ) -> Result<(), ReactiveError> {
304        let query_clone = query.clone();
305        self.register_query(signal, query).await?;
306        self.register_query_binding(signal, query_clone)
307            .await
308            .map_err(|e| ReactiveError::Internal {
309                reason: e.to_string(),
310            })?;
311        Ok(())
312    }
313
314    /// Read query dependencies for a signal id.
315    fn query_dependencies_for(
316        &self,
317        signal_id: &SignalId,
318    ) -> Option<Vec<crate::query::FactPredicate>> {
319        self.query_dependencies(signal_id)
320    }
321}
322
323impl<T: QueryEffects + ReactiveEffects> QuerySignalEffects for T {}
324
// ─────────────────────────────────────────────────────────────────────────────
// Query Subscription
// ─────────────────────────────────────────────────────────────────────────────

329/// A subscription to query results that updates when facts change.
330///
331/// QuerySubscription wraps a SignalStream but provides query-specific semantics.
332/// Results are re-evaluated and emitted when underlying facts change.
333pub struct QuerySubscription<T: Clone + Send + 'static> {
334    /// Underlying signal stream
335    stream: SignalStream<T>,
336    /// Query ID for debugging
337    query_id: String,
338}
339
340impl<T: Clone + Send + 'static> QuerySubscription<T> {
341    /// Create a new query subscription
342    pub fn new(stream: SignalStream<T>, query_id: impl Into<String>) -> Self {
343        Self {
344            stream,
345            query_id: query_id.into(),
346        }
347    }
348
349    /// Get the query ID
350    pub fn query_id(&self) -> &str {
351        &self.query_id
352    }
353
354    /// Try to receive the next result without blocking
355    pub fn try_recv(&mut self) -> Option<T> {
356        self.stream.try_recv()
357    }
358
359    /// Receive the next result, waiting if necessary
360    pub async fn recv(&mut self) -> Result<T, QueryError> {
361        self.stream.recv().await.map_err(|e| QueryError::Internal {
362            reason: e.to_string(),
363        })
364    }
365}
366
// ─────────────────────────────────────────────────────────────────────────────
// Blanket Implementations
// ─────────────────────────────────────────────────────────────────────────────

371use std::sync::Arc;
372
373/// Blanket implementation for Arc<T> where T: QueryEffects
374#[async_trait]
375impl<T: QueryEffects + ?Sized> QueryEffects for Arc<T> {
376    async fn query<Q: Query>(&self, query: &Q) -> Result<Q::Result, QueryError> {
377        (**self).query(query).await
378    }
379
380    async fn query_raw(
381        &self,
382        program: &crate::query::DatalogProgram,
383    ) -> Result<DatalogBindings, QueryError> {
384        (**self).query_raw(program).await
385    }
386
387    fn subscribe<Q: Query>(&self, query: &Q) -> QuerySubscription<Q::Result> {
388        (**self).subscribe(query)
389    }
390
391    async fn check_capabilities(&self, capabilities: &[QueryCapability]) -> Result<(), QueryError> {
392        (**self).check_capabilities(capabilities).await
393    }
394
395    async fn invalidate(&self, predicate: &crate::query::FactPredicate) {
396        (**self).invalidate(predicate).await;
397    }
398
399    async fn query_with_isolation<Q: Query>(
400        &self,
401        query: &Q,
402        isolation: QueryIsolation,
403    ) -> Result<Q::Result, QueryError> {
404        (**self).query_with_isolation(query, isolation).await
405    }
406
407    async fn query_with_stats<Q: Query>(
408        &self,
409        query: &Q,
410    ) -> Result<(Q::Result, QueryStats), QueryError> {
411        (**self).query_with_stats(query).await
412    }
413
414    async fn query_with_consistency<Q: Query>(
415        &self,
416        query: &Q,
417    ) -> Result<(Q::Result, ConsistencyMap), QueryError> {
418        (**self).query_with_consistency(query).await
419    }
420
421    async fn query_full<Q: Query>(
422        &self,
423        query: &Q,
424        isolation: QueryIsolation,
425    ) -> Result<(Q::Result, QueryStats), QueryError> {
426        (**self).query_full(query, isolation).await
427    }
428
429    async fn register_query_binding<Q: Query>(
430        &self,
431        signal: &Signal<Q::Result>,
432        query: Q,
433    ) -> Result<(), QueryError> {
434        (**self).register_query_binding(signal, query).await
435    }
436}
437
// ─────────────────────────────────────────────────────────────────────────────
// Tests
// ─────────────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    /// Display output includes the variant-specific text and payload.
    #[test]
    fn test_query_error_display() {
        let err = QueryError::authorization_failed("insufficient permissions");
        assert!(err.to_string().contains("authorization"));

        let cap = QueryCapability::read("channels");
        let err = QueryError::missing_capability(&cap);
        assert!(err.to_string().contains("channels:read"));
    }

    /// `#[from]` on `ParseError` provides the `From<QueryParseError>` conversion.
    #[test]
    fn test_query_error_from_parse_error() {
        let parse_err = QueryParseError::MissingField {
            field: "id".to_string(),
        };
        let query_err: QueryError = parse_err.into();
        assert!(matches!(query_err, QueryError::ParseError(_)));
    }

    /// Helper constructors produce the messages declared in `#[error(...)]`.
    #[test]
    fn test_query_error_constructor_messages() {
        assert_eq!(
            QueryError::execution_error("boom").to_string(),
            "Datalog execution error: boom"
        );
        assert_eq!(
            QueryError::journal_error("io").to_string(),
            "Journal access error: io"
        );
        assert_eq!(
            QueryError::internal("oops").to_string(),
            "Internal query error: oops"
        );
        assert_eq!(
            QueryError::HandlerUnavailable.to_string(),
            "Query handler not available"
        );
    }
}