aura_core/effects/query.rs
1//! Query Effect Traits
2//!
3//! Algebraic effects for executing Datalog queries against the journal.
4//! Integrates with Biscuit authorization and reactive signal subscriptions.
5//!
6//! # Effect Classification
7//!
8//! - **Category**: Application Effect
9//! - **Implementation**: `aura-effects` (Layer 3)
10//! - **Dependencies**: JournalEffects, AuthorizationEffects
11//!
12//! # Architecture
13//!
14//! QueryEffects bridges the gap between:
15//! - **Journal**: CRDT fact storage
16//! - **Datalog**: Query language for facts
17//! - **Biscuit**: Authorization for query execution
18//! - **Reactive**: Signal subscriptions for live queries
19//!
20//! ```text
21//! Query::to_datalog() → DatalogProgram
22//! ↓
23//! QueryEffects::query() → Check Biscuit capabilities
24//! ↓
25//! Execute Datalog against journal facts
26//! ↓
27//! Query::parse() → Typed result
28//! ```
29
30use async_trait::async_trait;
31use serde::{Deserialize, Serialize};
32
33use crate::domain::ConsistencyMap;
34use crate::effects::reactive::SignalStream;
35use crate::effects::reactive::{ReactiveEffects, ReactiveError, Signal, SignalId};
36use crate::query::{
37 ConsensusId, DatalogBindings, Query, QueryCapability, QueryIsolation, QueryParseError,
38 QueryStats,
39};
40
41// ─────────────────────────────────────────────────────────────────────────────
42// Error Types
43// ─────────────────────────────────────────────────────────────────────────────
44
45/// Error type for query operations
46#[derive(Debug, Clone, thiserror::Error, Serialize, Deserialize)]
47pub enum QueryError {
48 /// Authorization failed
49 #[error("Query authorization failed: {reason}")]
50 AuthorizationFailed { reason: String },
51
52 /// Missing required capability
53 #[error("Missing capability: {capability}")]
54 MissingCapability { capability: String },
55
56 /// Datalog execution error
57 #[error("Datalog execution error: {reason}")]
58 ExecutionError { reason: String },
59
60 /// Result parsing error
61 #[error("Failed to parse query results: {0}")]
62 ParseError(#[from] QueryParseError),
63
64 /// Query not found (for subscriptions)
65 #[error("Query subscription not found: {query_id}")]
66 SubscriptionNotFound { query_id: String },
67
68 /// Journal access error
69 #[error("Journal access error: {reason}")]
70 JournalError { reason: String },
71
72 /// Handler not available
73 #[error("Query handler not available")]
74 HandlerUnavailable,
75
76 /// Internal error
77 #[error("Internal query error: {reason}")]
78 Internal { reason: String },
79
80 /// Consensus wait timeout
81 #[error("Timed out waiting for consensus: {consensus_id:?}")]
82 ConsensusTimeout { consensus_id: ConsensusId },
83
84 /// Snapshot not available (garbage collected)
85 #[error("Snapshot not available: prestate {prestate_hash:?} has been garbage collected")]
86 SnapshotNotAvailable { prestate_hash: crate::Hash32 },
87
88 /// Isolation level not supported
89 #[error("Isolation level not supported: {reason}")]
90 IsolationNotSupported { reason: String },
91}
92
93impl QueryError {
94 /// Create an authorization failed error
95 pub fn authorization_failed(reason: impl Into<String>) -> Self {
96 Self::AuthorizationFailed {
97 reason: reason.into(),
98 }
99 }
100
101 /// Create a missing capability error
102 pub fn missing_capability(cap: &QueryCapability) -> Self {
103 Self::MissingCapability {
104 capability: format!("{}:{}", cap.resource, cap.action),
105 }
106 }
107
108 /// Create an execution error
109 pub fn execution_error(reason: impl Into<String>) -> Self {
110 Self::ExecutionError {
111 reason: reason.into(),
112 }
113 }
114
115 /// Create a journal error
116 pub fn journal_error(reason: impl Into<String>) -> Self {
117 Self::JournalError {
118 reason: reason.into(),
119 }
120 }
121
122 /// Create an internal error
123 pub fn internal(reason: impl Into<String>) -> Self {
124 Self::Internal {
125 reason: reason.into(),
126 }
127 }
128
129 /// Create a consensus timeout error
130 pub fn consensus_timeout(consensus_id: ConsensusId) -> Self {
131 Self::ConsensusTimeout { consensus_id }
132 }
133
134 /// Create a snapshot not available error
135 pub fn snapshot_not_available(prestate_hash: crate::Hash32) -> Self {
136 Self::SnapshotNotAvailable { prestate_hash }
137 }
138
139 /// Create an isolation not supported error
140 pub fn isolation_not_supported(reason: impl Into<String>) -> Self {
141 Self::IsolationNotSupported {
142 reason: reason.into(),
143 }
144 }
145}
146
147// ─────────────────────────────────────────────────────────────────────────────
148// Query Effects Trait
149// ─────────────────────────────────────────────────────────────────────────────
150
151/// Effects for executing typed Datalog queries.
152///
153/// This trait provides the read interface to the journal through Datalog queries.
154/// All queries:
155/// 1. Compile to Datalog programs via `Query::to_datalog()`
156/// 2. Are authorized via Biscuit capabilities
157/// 3. Execute against journal facts
158/// 4. Parse results to typed values
159///
160/// # Example
161///
162/// ```ignore
163/// use aura_core::effects::QueryEffects;
164/// use aura_app::queries::ChannelsQuery;
165///
166/// // One-shot query
167/// let channels = handler.query(&ChannelsQuery::default()).await?;
168///
169/// // Live subscription (re-evaluates when facts change)
170/// let mut stream = handler.subscribe(&ChannelsQuery::default());
171/// while let Some(channels) = stream.recv().await {
172/// println!("Channels updated: {} total", channels.len());
173/// }
174/// ```
175#[async_trait]
176pub trait QueryEffects: Send + Sync {
177 /// Execute a one-shot query.
178 ///
179 /// This compiles the query to Datalog, checks authorization,
180 /// executes against the journal, and parses the results.
181 ///
182 /// # Errors
183 ///
184 /// - `QueryError::AuthorizationFailed` if capability check fails
185 /// - `QueryError::ExecutionError` if Datalog execution fails
186 /// - `QueryError::ParseError` if result parsing fails
187 async fn query<Q: Query>(&self, query: &Q) -> Result<Q::Result, QueryError>;
188
189 /// Execute a raw Datalog program and return bindings.
190 ///
191 /// Lower-level API for executing arbitrary Datalog without typed parsing.
192 /// Useful for debugging or dynamic queries.
193 async fn query_raw(
194 &self,
195 program: &crate::query::DatalogProgram,
196 ) -> Result<DatalogBindings, QueryError>;
197
198 /// Subscribe to a query for live updates.
199 ///
200 /// Returns a stream that re-evaluates the query whenever facts
201 /// matching the query's `dependencies()` change.
202 ///
203 /// The stream yields new results after each relevant fact change.
204 fn subscribe<Q: Query>(&self, query: &Q) -> QuerySubscription<Q::Result>;
205
206 /// Check if a query's capabilities are satisfied.
207 ///
208 /// Can be used to pre-check authorization before execution.
209 async fn check_capabilities(&self, capabilities: &[QueryCapability]) -> Result<(), QueryError>;
210
211 /// Invalidate cached results for queries matching the given predicate.
212 ///
213 /// Called when facts change to trigger re-evaluation of subscriptions.
214 async fn invalidate(&self, predicate: &crate::query::FactPredicate);
215
216 /// Execute a query with a specific isolation level.
217 ///
218 /// Allows specifying consistency requirements for the query.
219 /// See `QueryIsolation` for available levels.
220 ///
221 /// # Example
222 ///
223 /// ```ignore
224 /// // Wait for specific consensus before querying
225 /// let result = handler.query_with_isolation(
226 /// &ChannelsQuery::default(),
227 /// QueryIsolation::ReadCommitted { wait_for: vec![consensus_id] },
228 /// ).await?;
229 /// ```
230 async fn query_with_isolation<Q: Query>(
231 &self,
232 query: &Q,
233 isolation: QueryIsolation,
234 ) -> Result<Q::Result, QueryError>;
235
236 /// Execute a query and return results with execution statistics.
237 ///
238 /// Useful for debugging, profiling, and optimization. Returns both
239 /// the query results and metadata about the execution.
240 ///
241 /// # Example
242 ///
243 /// ```ignore
244 /// let (channels, stats) = handler.query_with_stats(&ChannelsQuery::default()).await?;
245 /// println!("Query took {:?}, scanned {} facts", stats.execution_time, stats.facts_scanned);
246 /// ```
247 async fn query_with_stats<Q: Query>(
248 &self,
249 query: &Q,
250 ) -> Result<(Q::Result, QueryStats), QueryError>;
251
252 /// Execute a query and return results with consistency metadata.
253 ///
254 /// Returns both the query results and a `ConsistencyMap` containing
255 /// the consistency status (agreement, propagation, acknowledgment)
256 /// for each matched fact.
257 ///
258 /// # Example
259 ///
260 /// ```ignore
261 /// let (messages, consistency) = handler.query_with_consistency(&MessagesQuery::default()).await?;
262 /// for msg in &messages {
263 /// if consistency.is_finalized(&msg.id) {
264 /// println!("{}: finalized", msg.content);
265 /// } else {
266 /// println!("{}: pending", msg.content);
267 /// }
268 /// }
269 /// ```
270 async fn query_with_consistency<Q: Query>(
271 &self,
272 query: &Q,
273 ) -> Result<(Q::Result, ConsistencyMap), QueryError>;
274
275 /// Execute a query with both isolation level and statistics.
276 ///
277 /// Combines `query_with_isolation` and `query_with_stats`.
278 async fn query_full<Q: Query>(
279 &self,
280 query: &Q,
281 isolation: QueryIsolation,
282 ) -> Result<(Q::Result, QueryStats), QueryError>;
283
284 /// Register a query binding for reactive refresh.
285 ///
286 /// Implementations should store the query/signal mapping and emit
287 /// the initial query result.
288 async fn register_query_binding<Q: Query>(
289 &self,
290 signal: &Signal<Q::Result>,
291 query: Q,
292 ) -> Result<(), QueryError>;
293}
294
295/// Convenience extension trait to register query-bound signals via QueryEffects.
296#[allow(async_fn_in_trait)]
297pub trait QuerySignalEffects: QueryEffects + ReactiveEffects {
298 /// Register a reactive signal that is bound to a query.
299 async fn register_query_signal<Q: Query>(
300 &self,
301 signal: &Signal<Q::Result>,
302 query: Q,
303 ) -> Result<(), ReactiveError> {
304 let query_clone = query.clone();
305 self.register_query(signal, query).await?;
306 self.register_query_binding(signal, query_clone)
307 .await
308 .map_err(|e| ReactiveError::Internal {
309 reason: e.to_string(),
310 })?;
311 Ok(())
312 }
313
314 /// Read query dependencies for a signal id.
315 fn query_dependencies_for(
316 &self,
317 signal_id: &SignalId,
318 ) -> Option<Vec<crate::query::FactPredicate>> {
319 self.query_dependencies(signal_id)
320 }
321}
322
323impl<T: QueryEffects + ReactiveEffects> QuerySignalEffects for T {}
324
325// ─────────────────────────────────────────────────────────────────────────────
326// Query Subscription
327// ─────────────────────────────────────────────────────────────────────────────
328
329/// A subscription to query results that updates when facts change.
330///
331/// QuerySubscription wraps a SignalStream but provides query-specific semantics.
332/// Results are re-evaluated and emitted when underlying facts change.
333pub struct QuerySubscription<T: Clone + Send + 'static> {
334 /// Underlying signal stream
335 stream: SignalStream<T>,
336 /// Query ID for debugging
337 query_id: String,
338}
339
340impl<T: Clone + Send + 'static> QuerySubscription<T> {
341 /// Create a new query subscription
342 pub fn new(stream: SignalStream<T>, query_id: impl Into<String>) -> Self {
343 Self {
344 stream,
345 query_id: query_id.into(),
346 }
347 }
348
349 /// Get the query ID
350 pub fn query_id(&self) -> &str {
351 &self.query_id
352 }
353
354 /// Try to receive the next result without blocking
355 pub fn try_recv(&mut self) -> Option<T> {
356 self.stream.try_recv()
357 }
358
359 /// Receive the next result, waiting if necessary
360 pub async fn recv(&mut self) -> Result<T, QueryError> {
361 self.stream.recv().await.map_err(|e| QueryError::Internal {
362 reason: e.to_string(),
363 })
364 }
365}
366
367// ─────────────────────────────────────────────────────────────────────────────
368// Blanket Implementations
369// ─────────────────────────────────────────────────────────────────────────────
370
371use std::sync::Arc;
372
373/// Blanket implementation for Arc<T> where T: QueryEffects
374#[async_trait]
375impl<T: QueryEffects + ?Sized> QueryEffects for Arc<T> {
376 async fn query<Q: Query>(&self, query: &Q) -> Result<Q::Result, QueryError> {
377 (**self).query(query).await
378 }
379
380 async fn query_raw(
381 &self,
382 program: &crate::query::DatalogProgram,
383 ) -> Result<DatalogBindings, QueryError> {
384 (**self).query_raw(program).await
385 }
386
387 fn subscribe<Q: Query>(&self, query: &Q) -> QuerySubscription<Q::Result> {
388 (**self).subscribe(query)
389 }
390
391 async fn check_capabilities(&self, capabilities: &[QueryCapability]) -> Result<(), QueryError> {
392 (**self).check_capabilities(capabilities).await
393 }
394
395 async fn invalidate(&self, predicate: &crate::query::FactPredicate) {
396 (**self).invalidate(predicate).await;
397 }
398
399 async fn query_with_isolation<Q: Query>(
400 &self,
401 query: &Q,
402 isolation: QueryIsolation,
403 ) -> Result<Q::Result, QueryError> {
404 (**self).query_with_isolation(query, isolation).await
405 }
406
407 async fn query_with_stats<Q: Query>(
408 &self,
409 query: &Q,
410 ) -> Result<(Q::Result, QueryStats), QueryError> {
411 (**self).query_with_stats(query).await
412 }
413
414 async fn query_with_consistency<Q: Query>(
415 &self,
416 query: &Q,
417 ) -> Result<(Q::Result, ConsistencyMap), QueryError> {
418 (**self).query_with_consistency(query).await
419 }
420
421 async fn query_full<Q: Query>(
422 &self,
423 query: &Q,
424 isolation: QueryIsolation,
425 ) -> Result<(Q::Result, QueryStats), QueryError> {
426 (**self).query_full(query, isolation).await
427 }
428
429 async fn register_query_binding<Q: Query>(
430 &self,
431 signal: &Signal<Q::Result>,
432 query: Q,
433 ) -> Result<(), QueryError> {
434 (**self).register_query_binding(signal, query).await
435 }
436}
437
438// ─────────────────────────────────────────────────────────────────────────────
439// Tests
440// ─────────────────────────────────────────────────────────────────────────────
441
#[cfg(test)]
mod tests {
    use super::*;

    /// Display output names the failure kind and renders capabilities as
    /// `resource:action`.
    #[test]
    fn test_query_error_display() {
        let auth_message =
            QueryError::authorization_failed("insufficient permissions").to_string();
        assert!(auth_message.contains("authorization"));

        let capability = QueryCapability::read("channels");
        let capability_message = QueryError::missing_capability(&capability).to_string();
        assert!(capability_message.contains("channels:read"));
    }

    /// A `QueryParseError` converts into the `ParseError` variant.
    #[test]
    fn test_query_error_from_parse_error() {
        let converted = QueryError::from(QueryParseError::MissingField {
            field: String::from("id"),
        });
        assert!(matches!(converted, QueryError::ParseError(_)));
    }
}