Skip to main content

aura_core/
query.rs

1//! # Query System
2//!
3//! Unified query abstraction that bridges the algebraic effect system with
4//! Datalog-based journal queries and Biscuit authorization.
5//!
6//! # Architecture
7//!
8//! Queries are the "read" side of the system:
9//! ```text
10//! Intent → Journal (write) → Facts (CRDT)
11//! Query  → Journal (read)  → Datalog → Result
12//! ```
13//!
14//! Each query:
15//! - Compiles to a Datalog program
16//! - Declares required Biscuit capabilities
17//! - Specifies fact predicates for invalidation tracking
18//! - Parses Datalog bindings to typed results
19//!
20//! # Integration with Effects
21//!
22//! Queries are executed through `QueryEffects` (in `effects/query.rs`).
23//! Signals can be bound to queries via `ReactiveEffects`, enabling automatic
24//! updates when underlying facts change.
25
26use serde::{Deserialize, Serialize};
27use std::fmt;
28use std::time::Duration;
29
30use crate::domain::ConsistencyMap;
31use crate::time::PhysicalTime;
32use crate::Hash32;
33use crate::ResourceScope;
34use std::collections::BTreeMap;
35
36// ─────────────────────────────────────────────────────────────────────────────
37// Query Isolation
38// ─────────────────────────────────────────────────────────────────────────────
39
40/// Query isolation levels for consistency requirements.
41///
42/// Aura's journal is eventually consistent via CRDT merge. These isolation levels
43/// let queries specify their consistency requirements:
44///
45/// - `ReadUncommitted`: See latest CRDT state (fastest, may see uncommitted facts)
46/// - `ReadCommitted`: Only see facts confirmed by consensus
47/// - `Snapshot`: Query against a specific historical prestate
48/// - `ReadLatest`: Wait for all pending consensus in scope to complete
49///
50/// # Example
51///
52/// ```ignore
53/// // Fast query - may see uncommitted facts
54/// let result = effects.query_with_isolation(
55///     &ChannelsQuery::default(),
56///     QueryIsolation::ReadUncommitted,
57/// ).await?;
58///
59/// // Strong consistency - wait for specific consensus
60/// let result = effects.query_with_isolation(
61///     &ChannelsQuery::default(),
62///     QueryIsolation::ReadCommitted { wait_for: vec![consensus_id] },
63/// ).await?;
64/// ```
65#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
66pub enum QueryIsolation {
67    /// See all facts including uncommitted (CRDT state).
68    ///
69    /// Fastest option - queries execute immediately against the current
70    /// CRDT state without waiting for consensus confirmation.
71    #[default]
72    ReadUncommitted,
73
74    /// Only see facts with consensus commit.
75    ///
76    /// Waits for specified consensus instances to complete before executing
77    /// the query. Use this when you need to see the results of specific
78    /// operations that required consensus.
79    ReadCommitted {
80        /// Consensus instances to wait for before executing query
81        wait_for: Vec<ConsensusId>,
82    },
83
84    /// Snapshot at specific prestate (time-travel query).
85    ///
86    /// Queries against a historical state identified by its prestate hash.
87    /// Useful for auditing or debugging. The prestate must still be available
88    /// (not garbage collected).
89    Snapshot {
90        /// Hash of the prestate to query against
91        prestate_hash: Hash32,
92    },
93
94    /// Wait for all pending consensus in scope to complete.
95    ///
96    /// More expensive than `ReadCommitted` - waits for all pending consensus
97    /// operations that affect the specified resource scope. Use sparingly.
98    ///
99    /// **Note:** This is NOT linearizable - just ensures all pending commits
100    /// are visible at query time.
101    ReadLatest {
102        /// Resource scope to wait for
103        scope: ResourceScope,
104    },
105}
106
107impl QueryIsolation {
108    /// Create a ReadCommitted isolation waiting for a single consensus
109    pub fn read_committed(consensus_id: ConsensusId) -> Self {
110        Self::ReadCommitted {
111            wait_for: vec![consensus_id],
112        }
113    }
114
115    /// Create a Snapshot isolation for a specific prestate
116    pub fn snapshot(prestate_hash: Hash32) -> Self {
117        Self::Snapshot { prestate_hash }
118    }
119
120    /// Create a ReadLatest isolation for a resource scope
121    pub fn read_latest(scope: ResourceScope) -> Self {
122        Self::ReadLatest { scope }
123    }
124
125    /// Check if this isolation level requires waiting
126    pub fn requires_wait(&self) -> bool {
127        !matches!(self, Self::ReadUncommitted | Self::Snapshot { .. })
128    }
129}
130
131/// Identifier for a consensus instance.
132///
133/// Used to track and wait for consensus completion.
134#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
135pub struct ConsensusId(pub [u8; 32]);
136
137impl ConsensusId {
138    /// Create a new consensus ID from bytes
139    pub fn new(bytes: [u8; 32]) -> Self {
140        Self(bytes)
141    }
142
143    /// Get the underlying bytes
144    pub fn as_bytes(&self) -> &[u8; 32] {
145        &self.0
146    }
147}
148
149/// Identifier for a fact in the journal.
150#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
151pub struct FactId(pub [u8; 32]);
152
153impl FactId {
154    /// Create a new fact ID from bytes
155    pub fn new(bytes: [u8; 32]) -> Self {
156        Self(bytes)
157    }
158
159    /// Get the underlying bytes
160    pub fn as_bytes(&self) -> &[u8; 32] {
161        &self.0
162    }
163}
164
165// ─────────────────────────────────────────────────────────────────────────────
166// Mutation Receipt
167// ─────────────────────────────────────────────────────────────────────────────
168
169/// Receipt returned from mutation operations.
170///
171/// Indicates whether the operation was completed immediately (monotone) or
172/// required consensus (non-monotone). This allows callers to:
173/// - Know when their operation has taken effect
174/// - Wait for consensus completion if needed
175/// - Track latency and operation routing
176///
177/// # Example
178///
179/// ```ignore
180/// let receipt = effects.mutate(mutation).await?;
181/// match receipt {
182///     MutationReceipt::Immediate { fact_ids, .. } => {
183///         println!("Operation completed immediately, {} facts created", fact_ids.len());
184///     }
185///     MutationReceipt::Consensus { consensus_id, .. } => {
186///         // Can use consensus_id with QueryIsolation::ReadCommitted to wait
187///         println!("Operation requires consensus: {:?}", consensus_id);
188///     }
189/// }
190/// ```
191#[derive(Debug, Clone, Serialize, Deserialize)]
192pub enum MutationReceipt {
193    /// Monotone operation completed immediately via CRDT merge.
194    ///
195    /// No coordination was required - the facts were appended directly
196    /// to the local journal and will propagate via anti-entropy.
197    Immediate {
198        /// IDs of facts created by this operation
199        fact_ids: Vec<FactId>,
200        /// When the operation completed
201        timestamp: PhysicalTime,
202    },
203
204    /// Non-monotone operation required consensus.
205    ///
206    /// The operation has been submitted to the consensus protocol.
207    /// Use `consensus_id` with `QueryIsolation::ReadCommitted` to wait
208    /// for the operation to complete before querying.
209    Consensus {
210        /// ID of the consensus instance handling this operation
211        consensus_id: ConsensusId,
212        /// Hash of the prestate the consensus is based on
213        prestate_hash: Hash32,
214        /// Time taken to submit to consensus (not total completion time)
215        submit_latency: Duration,
216    },
217}
218
219impl MutationReceipt {
220    /// Create an immediate receipt
221    pub fn immediate(fact_ids: Vec<FactId>, timestamp: PhysicalTime) -> Self {
222        Self::Immediate {
223            fact_ids,
224            timestamp,
225        }
226    }
227
228    /// Create a consensus receipt
229    pub fn consensus(
230        consensus_id: ConsensusId,
231        prestate_hash: Hash32,
232        submit_latency: Duration,
233    ) -> Self {
234        Self::Consensus {
235            consensus_id,
236            prestate_hash,
237            submit_latency,
238        }
239    }
240
241    /// Check if this was an immediate (monotone) operation
242    pub fn is_immediate(&self) -> bool {
243        matches!(self, Self::Immediate { .. })
244    }
245
246    /// Check if this required consensus
247    pub fn requires_consensus(&self) -> bool {
248        matches!(self, Self::Consensus { .. })
249    }
250
251    /// Get the consensus ID if this was a consensus operation
252    pub fn consensus_id(&self) -> Option<ConsensusId> {
253        match self {
254            Self::Consensus { consensus_id, .. } => Some(*consensus_id),
255            Self::Immediate { .. } => None,
256        }
257    }
258
259    /// Get the fact IDs if this was an immediate operation
260    pub fn fact_ids(&self) -> Option<&[FactId]> {
261        match self {
262            Self::Immediate { fact_ids, .. } => Some(fact_ids),
263            Self::Consensus { .. } => None,
264        }
265    }
266}
267
268// ─────────────────────────────────────────────────────────────────────────────
269// Query Statistics
270// ─────────────────────────────────────────────────────────────────────────────
271
272/// Statistics about query execution.
273///
274/// Returned alongside query results when using `query_with_stats()`.
275/// Useful for debugging, profiling, and optimization.
276#[derive(Debug, Clone, Serialize, Deserialize)]
277pub struct QueryStats {
278    /// Time taken to execute the query
279    pub execution_time: Duration,
280
281    /// Number of facts scanned during execution
282    pub facts_scanned: u32,
283
284    /// Number of facts that matched the query
285    pub facts_matched: u32,
286
287    /// Whether the result was served from cache
288    pub cache_hit: bool,
289
290    /// Isolation level used for the query
291    pub isolation_used: QueryIsolation,
292
293    /// Time spent waiting for consensus (if any)
294    pub consensus_wait_time: Option<Duration>,
295
296    /// Consistency metadata for matched facts.
297    ///
298    /// Maps fact identifiers (or result item IDs) to their consistency status.
299    /// Empty if consistency tracking was not requested.
300    pub consistency: ConsistencyMap,
301}
302
303impl Default for QueryStats {
304    fn default() -> Self {
305        Self {
306            execution_time: Duration::ZERO,
307            facts_scanned: 0,
308            facts_matched: 0,
309            cache_hit: false,
310            isolation_used: QueryIsolation::default(),
311            consensus_wait_time: None,
312            consistency: ConsistencyMap::new(),
313        }
314    }
315}
316
317impl QueryStats {
318    /// Create new stats with execution time
319    pub fn new(execution_time: Duration) -> Self {
320        Self {
321            execution_time,
322            ..Default::default()
323        }
324    }
325
326    /// Mark as a cache hit
327    #[must_use]
328    pub fn with_cache_hit(mut self) -> Self {
329        self.cache_hit = true;
330        self
331    }
332
333    /// Set facts scanned
334    #[must_use]
335    pub fn with_facts_scanned(mut self, count: u32) -> Self {
336        self.facts_scanned = count;
337        self
338    }
339
340    /// Set facts matched
341    #[must_use]
342    pub fn with_facts_matched(mut self, count: u32) -> Self {
343        self.facts_matched = count;
344        self
345    }
346
347    /// Set isolation level used
348    #[must_use]
349    pub fn with_isolation(mut self, isolation: QueryIsolation) -> Self {
350        self.isolation_used = isolation;
351        self
352    }
353
354    /// Set consensus wait time
355    #[must_use]
356    pub fn with_consensus_wait(mut self, duration: Duration) -> Self {
357        self.consensus_wait_time = Some(duration);
358        self
359    }
360
361    /// Set consistency metadata for matched facts
362    #[must_use]
363    pub fn with_consistency(mut self, consistency: ConsistencyMap) -> Self {
364        self.consistency = consistency;
365        self
366    }
367
368    /// Calculate selectivity (matched / scanned)
369    pub fn selectivity(&self) -> f64 {
370        if self.facts_scanned == 0 {
371            0.0
372        } else {
373            self.facts_matched as f64 / self.facts_scanned as f64
374        }
375    }
376
377    /// Check if any matched fact is finalized
378    pub fn any_finalized(&self) -> bool {
379        self.consistency
380            .iter()
381            .any(|(_, c)| c.agreement.is_finalized())
382    }
383
384    /// Check if all matched facts are finalized
385    pub fn all_finalized(&self) -> bool {
386        !self.consistency.is_empty()
387            && self
388                .consistency
389                .iter()
390                .all(|(_, c)| c.agreement.is_finalized())
391    }
392}
393
394// ─────────────────────────────────────────────────────────────────────────────
395// Datalog Types
396// ─────────────────────────────────────────────────────────────────────────────
397
398/// A Datalog program consisting of rules and facts.
399///
400/// This is the intermediate representation that queries compile to.
401/// The actual execution happens in the QueryHandler.
402#[derive(Debug, Clone, Default, Serialize, Deserialize)]
403pub struct DatalogProgram {
404    /// Rules that define derived relations
405    pub rules: Vec<DatalogRule>,
406    /// Base facts to include (optional, usually come from journal)
407    pub facts: Vec<DatalogFact>,
408    /// The goal query to evaluate
409    pub goal: Option<String>,
410}
411
412impl DatalogProgram {
413    /// Create an empty program
414    pub fn empty() -> Self {
415        Self::default()
416    }
417
418    /// Create a program with the given rules
419    pub fn new(rules: Vec<DatalogRule>) -> Self {
420        Self {
421            rules,
422            facts: Vec::new(),
423            goal: None,
424        }
425    }
426
427    /// Add a rule to the program
428    #[must_use]
429    pub fn with_rule(mut self, rule: DatalogRule) -> Self {
430        self.rules.push(rule);
431        self
432    }
433
434    /// Add a fact to the program
435    #[must_use]
436    pub fn with_fact(mut self, fact: DatalogFact) -> Self {
437        self.facts.push(fact);
438        self
439    }
440
441    /// Set the goal query
442    #[must_use]
443    pub fn with_goal(mut self, goal: impl Into<String>) -> Self {
444        self.goal = Some(goal.into());
445        self
446    }
447
448    /// Convert to Datalog source string
449    pub fn to_datalog_source(&self) -> String {
450        let mut source = String::new();
451
452        // Emit facts
453        for fact in &self.facts {
454            source.push_str(&fact.to_string());
455            source.push_str(".\n");
456        }
457
458        // Emit rules
459        for rule in &self.rules {
460            source.push_str(&rule.to_string());
461            source.push_str(".\n");
462        }
463
464        // Emit goal
465        if let Some(ref goal) = self.goal {
466            source.push_str("?- ");
467            source.push_str(goal);
468            source.push_str(".\n");
469        }
470
471        source
472    }
473
474    /// Check if the program is empty
475    pub fn is_empty(&self) -> bool {
476        self.rules.is_empty() && self.facts.is_empty()
477    }
478}
479
480/// A Datalog rule (head :- body)
481#[derive(Debug, Clone, Serialize, Deserialize)]
482pub struct DatalogRule {
483    /// The rule head (conclusion fact)
484    pub head: DatalogFact,
485    /// The rule body (conditions)
486    pub body: Vec<DatalogFact>,
487}
488
489impl DatalogRule {
490    /// Create a new rule with a head and empty body
491    pub fn new(head: DatalogFact) -> Self {
492        Self {
493            head,
494            body: Vec::new(),
495        }
496    }
497
498    /// Create a rule with head and body
499    #[must_use]
500    pub fn with_body(head: DatalogFact, body: Vec<DatalogFact>) -> Self {
501        Self { head, body }
502    }
503
504    /// Add a condition to the body
505    pub fn when(mut self, condition: DatalogFact) -> Self {
506        self.body.push(condition);
507        self
508    }
509
510    /// Add multiple conditions
511    pub fn when_all(mut self, conditions: impl IntoIterator<Item = DatalogFact>) -> Self {
512        for c in conditions {
513            self.body.push(c);
514        }
515        self
516    }
517}
518
519impl fmt::Display for DatalogRule {
520    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
521        write!(f, "{}", self.head)?;
522        if !self.body.is_empty() {
523            write!(f, " :- ")?;
524            for (i, fact) in self.body.iter().enumerate() {
525                if i > 0 {
526                    write!(f, ", ")?;
527                }
528                write!(f, "{fact}")?;
529            }
530        }
531        Ok(())
532    }
533}
534
535/// A Datalog fact (ground term)
536#[derive(Debug, Clone, Serialize, Deserialize)]
537pub struct DatalogFact {
538    /// Predicate name
539    pub predicate: String,
540    /// Arguments (as strings for serialization)
541    pub args: Vec<DatalogValue>,
542}
543
544impl DatalogFact {
545    /// Create a new fact
546    pub fn new(predicate: impl Into<String>, args: Vec<DatalogValue>) -> Self {
547        Self {
548            predicate: predicate.into(),
549            args,
550        }
551    }
552}
553
554impl fmt::Display for DatalogFact {
555    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
556        write!(f, "{}(", self.predicate)?;
557        for (i, arg) in self.args.iter().enumerate() {
558            if i > 0 {
559                write!(f, ", ")?;
560            }
561            write!(f, "{arg}")?;
562        }
563        write!(f, ")")
564    }
565}
566
567/// A value in Datalog (string, number, or boolean)
568#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
569pub enum DatalogValue {
570    /// String value
571    String(String),
572    /// Integer value
573    Integer(i64),
574    /// Boolean value
575    Boolean(bool),
576    /// Variable (for patterns)
577    Variable(String),
578    /// Symbol (unquoted identifier)
579    Symbol(String),
580    /// Null/none value
581    Null,
582}
583
584impl DatalogValue {
585    /// Create a variable value (shorthand for `DatalogValue::Variable`)
586    pub fn var(name: impl Into<String>) -> Self {
587        Self::Variable(name.into())
588    }
589
590    /// Create a symbol value
591    pub fn symbol(name: impl Into<String>) -> Self {
592        Self::Symbol(name.into())
593    }
594}
595
596impl fmt::Display for DatalogValue {
597    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
598        match self {
599            Self::String(s) => write!(f, "\"{}\"", s.replace('"', "\\\"")),
600            Self::Integer(n) => write!(f, "{n}"),
601            Self::Boolean(b) => write!(f, "{b}"),
602            Self::Variable(v) => write!(f, "${v}"),
603            Self::Symbol(s) => write!(f, "{s}"),
604            Self::Null => write!(f, "null"),
605        }
606    }
607}
608
609/// Result bindings from Datalog evaluation
610#[derive(Debug, Clone, Default, Serialize, Deserialize)]
611pub struct DatalogBindings {
612    /// Each row is a set of variable bindings
613    pub rows: Vec<DatalogRow>,
614}
615
616impl DatalogBindings {
617    /// Create empty bindings
618    pub fn new() -> Self {
619        Self::default()
620    }
621
622    /// Add a row
623    #[must_use]
624    pub fn with_row(mut self, row: DatalogRow) -> Self {
625        self.rows.push(row);
626        self
627    }
628
629    /// Check if empty
630    pub fn is_empty(&self) -> bool {
631        self.rows.is_empty()
632    }
633
634    /// Number of result rows
635    pub fn len(&self) -> usize {
636        self.rows.len()
637    }
638}
639
640/// A row of variable bindings
641#[derive(Debug, Clone, Default, Serialize, Deserialize)]
642pub struct DatalogRow {
643    /// Variable name to value mappings
644    pub bindings: Vec<(String, DatalogValue)>,
645}
646
647impl DatalogRow {
648    /// Create a new row
649    pub fn new() -> Self {
650        Self::default()
651    }
652
653    /// Add a binding
654    #[must_use]
655    pub fn with_binding(mut self, name: impl Into<String>, value: DatalogValue) -> Self {
656        self.bindings.push((name.into(), value));
657        self
658    }
659
660    /// Get a binding by name
661    pub fn get(&self, name: &str) -> Option<&DatalogValue> {
662        self.bindings
663            .iter()
664            .find(|(n, _)| n == name)
665            .map(|(_, v)| v)
666    }
667
668    /// Get a string value by name
669    pub fn get_string(&self, name: &str) -> Option<&str> {
670        match self.get(name) {
671            Some(DatalogValue::String(s)) => Some(s),
672            _ => None,
673        }
674    }
675
676    /// Get an integer value by name
677    pub fn get_integer(&self, name: &str) -> Option<i64> {
678        match self.get(name) {
679            Some(DatalogValue::Integer(n)) => Some(*n),
680            _ => None,
681        }
682    }
683
684    /// Get a boolean value by name
685    pub fn get_bool(&self, name: &str) -> Option<bool> {
686        match self.get(name) {
687            Some(DatalogValue::Boolean(b)) => Some(*b),
688            _ => None,
689        }
690    }
691}
692
693// ─────────────────────────────────────────────────────────────────────────────
694// Fact Predicates (for invalidation tracking)
695// ─────────────────────────────────────────────────────────────────────────────
696
697/// A predicate pattern for matching facts.
698///
699/// Used to determine which queries need re-evaluation when facts change.
700/// Supports both positional argument matching (for Datalog facts) and
701/// named constraint matching (for structured facts with fields).
702///
703/// # Example
704///
705/// ```ignore
706/// // Match any channel_fact with channel_id "ch1"
707/// let pred = FactPredicate::named("channel_fact")
708///     .with_named_constraint("channel_id", "ch1");
709///
710/// // Match channel_fact where first positional arg is "ch1"
711/// let pred = FactPredicate::named("channel_fact")
712///     .with_arg(Some("ch1".to_string()));
713/// ```
714#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
715pub struct FactPredicate {
716    /// The predicate name to match
717    pub name: String,
718    /// Optional positional argument patterns (None = wildcard)
719    pub arg_patterns: Vec<Option<String>>,
720    /// Named field constraints for structured facts
721    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
722    pub named_constraints: BTreeMap<String, String>,
723}
724
725impl FactPredicate {
726    /// Create a predicate that matches any fact with the given name
727    pub fn named(name: impl Into<String>) -> Self {
728        Self {
729            name: name.into(),
730            arg_patterns: Vec::new(),
731            named_constraints: BTreeMap::new(),
732        }
733    }
734
735    /// Create a predicate (alias for named)
736    pub fn new(name: impl Into<String>) -> Self {
737        Self::named(name)
738    }
739
740    /// Create a predicate with named field constraints.
741    ///
742    /// This stores named constraints for matching against structured facts
743    /// that have named fields (e.g., `channel_id`, `authority`).
744    ///
745    /// # Example
746    ///
747    /// ```ignore
748    /// // Match channel_fact where channel_id is "ch1" and type is "home"
749    /// let pred = FactPredicate::with_args("channel_fact", vec![
750    ///     ("channel_id", "ch1"),
751    ///     ("type", "home"),
752    /// ]);
753    /// ```
754    #[must_use]
755    pub fn with_args(name: impl Into<String>, args: Vec<(&str, &str)>) -> Self {
756        let mut predicate = Self::named(name);
757        for (arg_name, arg_value) in args {
758            predicate
759                .named_constraints
760                .insert(arg_name.to_string(), arg_value.to_string());
761        }
762        predicate
763    }
764
765    /// Add a positional argument pattern (Some = must match, None = wildcard)
766    ///
767    /// For Datalog-style facts with positional arguments.
768    #[must_use]
769    pub fn with_arg(mut self, pattern: Option<String>) -> Self {
770        self.arg_patterns.push(pattern);
771        self
772    }
773
774    /// Add a named field constraint for structured facts.
775    ///
776    /// # Example
777    ///
778    /// ```ignore
779    /// let pred = FactPredicate::named("channel_fact")
780    ///     .with_named_constraint("channel_id", "ch1")
781    ///     .with_named_constraint("type", "home");
782    /// ```
783    #[must_use]
784    pub fn with_named_constraint(
785        mut self,
786        name: impl Into<String>,
787        value: impl Into<String>,
788    ) -> Self {
789        self.named_constraints.insert(name.into(), value.into());
790        self
791    }
792
793    /// Check if this predicate matches a fact with positional arguments.
794    ///
795    /// For Datalog-style facts where arguments are positional.
796    pub fn matches_fact(&self, fact_name: &str, fact_args: &[String]) -> bool {
797        if self.name != fact_name {
798            return false;
799        }
800
801        // If no positional arg patterns, match any positional args
802        if self.arg_patterns.is_empty() {
803            return true;
804        }
805
806        // Check each positional arg pattern
807        for (i, pattern) in self.arg_patterns.iter().enumerate() {
808            if let Some(expected) = pattern {
809                if fact_args.get(i) != Some(expected) {
810                    return false;
811                }
812            }
813        }
814
815        true
816    }
817
818    /// Check if this predicate matches a fact with named fields.
819    ///
820    /// For structured facts where fields are named (e.g., from serde serialization).
821    ///
822    /// # Example
823    ///
824    /// ```ignore
825    /// let pred = FactPredicate::with_args("channel_fact", vec![
826    ///     ("channel_id", "ch1"),
827    ///     ("type", "home"),
828    /// ]);
829    ///
830    /// let fact_fields = [
831    ///     ("channel_id".to_string(), "ch1".to_string()),
832    ///     ("type".to_string(), "home".to_string()),
833    ///     ("created_at".to_string(), "2024-01-01".to_string()),
834    /// ].into_iter().collect();
835    ///
836    /// assert!(pred.matches_named_fact("channel_fact", &fact_fields));
837    /// ```
838    pub fn matches_named_fact(
839        &self,
840        fact_name: &str,
841        fact_fields: &BTreeMap<String, String>,
842    ) -> bool {
843        if self.name != fact_name {
844            return false;
845        }
846
847        // If no named constraints, match any fact with this name
848        if self.named_constraints.is_empty() {
849            return true;
850        }
851
852        // All named constraints must match
853        for (key, expected_value) in &self.named_constraints {
854            match fact_fields.get(key) {
855                Some(actual_value) if actual_value == expected_value => continue,
856                _ => return false,
857            }
858        }
859
860        true
861    }
862
863    /// Check if this predicate could match another predicate.
864    ///
865    /// Two predicates match if:
866    /// - They have the same name
867    /// - Their positional arg patterns are compatible (wildcards or same values)
868    /// - Their named constraints are compatible (wildcards or same values)
869    pub fn matches(&self, other: &FactPredicate) -> bool {
870        // Names must match
871        if self.name != other.name {
872            return false;
873        }
874
875        // Check positional arg compatibility
876        if !self.arg_patterns.is_empty() && !other.arg_patterns.is_empty() {
877            let max_len = self.arg_patterns.len().max(other.arg_patterns.len());
878            for i in 0..max_len {
879                let self_arg = self.arg_patterns.get(i).and_then(|a| a.as_ref());
880                let other_arg = other.arg_patterns.get(i).and_then(|a| a.as_ref());
881
882                match (self_arg, other_arg) {
883                    // Both specify a value - must match
884                    (Some(a), Some(b)) if a != b => return false,
885                    // At least one is wildcard - compatible
886                    _ => continue,
887                }
888            }
889        }
890
891        // Check named constraint compatibility
892        if !self.named_constraints.is_empty() && !other.named_constraints.is_empty() {
893            // For each key that exists in both, values must match
894            for (key, self_value) in &self.named_constraints {
895                if let Some(other_value) = other.named_constraints.get(key) {
896                    if self_value != other_value {
897                        return false;
898                    }
899                }
900            }
901        }
902
903        true
904    }
905
906    /// Check if this predicate has any constraints (positional or named).
907    pub fn has_constraints(&self) -> bool {
908        !self.arg_patterns.is_empty() || !self.named_constraints.is_empty()
909    }
910
911    /// Get the named constraints as an iterator.
912    pub fn named_constraints_iter(&self) -> impl Iterator<Item = (&String, &String)> {
913        self.named_constraints.iter()
914    }
915}
916
917// ─────────────────────────────────────────────────────────────────────────────
918// Capability Requirements
919// ─────────────────────────────────────────────────────────────────────────────
920
921/// A capability required to execute a query.
922///
923/// This integrates with Biscuit authorization - queries declare what
924/// capabilities they need, and the QueryHandler checks them before execution.
925#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
926pub struct QueryCapability {
927    /// Resource being accessed
928    pub resource: String,
929    /// Action being performed (e.g., "read", "list")
930    pub action: String,
931    /// Optional constraints
932    pub constraints: Vec<(String, String)>,
933}
934
935impl QueryCapability {
936    /// Create a read capability for a resource
937    pub fn read(resource: impl Into<String>) -> Self {
938        Self {
939            resource: resource.into(),
940            action: "read".to_string(),
941            constraints: Vec::new(),
942        }
943    }
944
945    /// Create a list capability for a resource
946    pub fn list(resource: impl Into<String>) -> Self {
947        Self {
948            resource: resource.into(),
949            action: "list".to_string(),
950            constraints: Vec::new(),
951        }
952    }
953
954    /// Add a constraint
955    #[must_use]
956    pub fn with_constraint(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
957        self.constraints.push((key.into(), value.into()));
958        self
959    }
960
961    /// Convert to Biscuit Datalog check
962    pub fn to_biscuit_check(&self) -> String {
963        let mut check = format!("check if right(\"{}\", \"{}\")", self.resource, self.action);
964        for (key, value) in &self.constraints {
965            check.push_str(&format!(", {key} == \"{value}\""));
966        }
967        check
968    }
969}
970
971// ─────────────────────────────────────────────────────────────────────────────
972// Query Trait
973// ─────────────────────────────────────────────────────────────────────────────
974
975/// Error type for query parsing
976#[derive(Debug, Clone, thiserror::Error, Serialize, Deserialize)]
977pub enum QueryParseError {
978    /// Missing required field
979    #[error("Missing required field: {field}")]
980    MissingField { field: String },
981
982    /// Invalid field value
983    #[error("Invalid value for field {field}: {reason}")]
984    InvalidValue { field: String, reason: String },
985
986    /// Type conversion error
987    #[error("Type conversion error: {reason}")]
988    TypeConversion { reason: String },
989}
990
991/// Trait for typed queries that compile to Datalog.
992///
993/// Queries are the portable read interface for the journal. They:
994/// - Compile to Datalog programs for execution
995/// - Declare Biscuit capabilities for authorization
996/// - Specify fact predicates for change tracking
997/// - Parse results to typed values
998///
999/// # Example
1000///
1001/// ```ignore
1002/// use aura_core::query::{Query, DatalogProgram, QueryCapability, FactPredicate};
1003///
1004/// struct ChannelsQuery {
1005///     channel_type: Option<String>,
1006/// }
1007///
1008/// impl Query for ChannelsQuery {
1009///     type Result = Vec<Channel>;
1010///
1011///     fn to_datalog(&self) -> DatalogProgram {
1012///         DatalogProgram::new()
1013///             .with_rule(DatalogRule::new("channel($id, $name, $type)")
1014///                 .when("channel_fact($id, $name, $type)"))
1015///             .with_goal("channel($id, $name, $type)")
1016///     }
1017///
1018///     fn required_capabilities(&self) -> Vec<QueryCapability> {
1019///         vec![QueryCapability::list("channels")]
1020///     }
1021///
1022///     fn dependencies(&self) -> Vec<FactPredicate> {
1023///         vec![FactPredicate::named("channel_fact")]
1024///     }
1025///
1026///     fn parse(bindings: DatalogBindings) -> Result<Self::Result, QueryParseError> {
1027///         // Parse bindings to Vec<Channel>
1028///     }
1029/// }
1030/// ```
1031pub trait Query: Send + Sync + Clone + 'static {
1032    /// The result type of this query
1033    type Result: Clone + Send + Sync + Default + 'static;
1034
1035    /// Compile this query to a Datalog program.
1036    ///
1037    /// The program will be executed against the journal facts,
1038    /// filtered by Biscuit authorization.
1039    fn to_datalog(&self) -> DatalogProgram;
1040
1041    /// Get the Biscuit capabilities required to execute this query.
1042    ///
1043    /// The QueryHandler will verify these capabilities before execution.
1044    fn required_capabilities(&self) -> Vec<QueryCapability>;
1045
1046    /// Get the fact predicates this query depends on.
1047    ///
1048    /// Used for invalidation tracking - when facts matching these predicates
1049    /// change, subscriptions to this query will re-evaluate.
1050    fn dependencies(&self) -> Vec<FactPredicate>;
1051
1052    /// Parse Datalog bindings to the typed result.
1053    ///
1054    /// Called after query execution to convert raw bindings to the result type.
1055    fn parse(bindings: DatalogBindings) -> Result<Self::Result, QueryParseError>;
1056
1057    /// Get a unique identifier for this query type.
1058    ///
1059    /// Used for caching and subscription management.
1060    fn query_id(&self) -> String {
1061        std::any::type_name::<Self>().to_string()
1062    }
1063}
1064
1065// ─────────────────────────────────────────────────────────────────────────────
1066// Tests
1067// ─────────────────────────────────────────────────────────────────────────────
1068
1069#[cfg(test)]
1070mod tests {
1071    use super::*;
1072
1073    #[test]
1074    fn test_datalog_rule_display() {
1075        let head = DatalogFact::new(
1076            "channel",
1077            vec![DatalogValue::var("id"), DatalogValue::var("name")],
1078        );
1079        let cond1 = DatalogFact::new(
1080            "channel_fact",
1081            vec![
1082                DatalogValue::var("id"),
1083                DatalogValue::var("name"),
1084                DatalogValue::var("type"),
1085            ],
1086        );
1087        let cond2 = DatalogFact::new(
1088            "eq",
1089            vec![
1090                DatalogValue::var("type"),
1091                DatalogValue::String("home".to_string()),
1092            ],
1093        );
1094
1095        let rule = DatalogRule::new(head).when(cond1).when(cond2);
1096
1097        let s = rule.to_string();
1098        assert!(s.contains("channel($id, $name)"));
1099        assert!(s.contains(":-"));
1100        assert!(s.contains("channel_fact"));
1101    }
1102
1103    #[test]
1104    fn test_datalog_fact_display() {
1105        let fact = DatalogFact::new(
1106            "channel",
1107            vec![
1108                DatalogValue::String("ch1".to_string()),
1109                DatalogValue::String("General".to_string()),
1110                DatalogValue::Boolean(true),
1111            ],
1112        );
1113
1114        let s = fact.to_string();
1115        assert_eq!(s, "channel(\"ch1\", \"General\", true)");
1116    }
1117
1118    #[test]
1119    fn test_datalog_program_source() {
1120        let program = DatalogProgram::new(vec![DatalogRule::new(DatalogFact::new(
1121            "active_user",
1122            vec![DatalogValue::var("name")],
1123        ))
1124        .when(DatalogFact::new("user", vec![DatalogValue::var("name")]))
1125        .when(DatalogFact::new("online", vec![DatalogValue::var("name")]))])
1126        .with_fact(DatalogFact::new(
1127            "user",
1128            vec![DatalogValue::String("alice".to_string())],
1129        ))
1130        .with_goal("active_user($name)");
1131
1132        let source = program.to_datalog_source();
1133        assert!(source.contains("user(\"alice\")"));
1134        assert!(source.contains("active_user($name) :- user($name), online($name)"));
1135        assert!(source.contains("?- active_user($name)"));
1136    }
1137
1138    #[test]
1139    fn test_fact_predicate_matches() {
1140        let pred = FactPredicate::named("channel_fact");
1141        assert!(pred.matches(&FactPredicate::named("channel_fact")));
1142        assert!(pred.matches(&FactPredicate::named("channel_fact")));
1143        assert!(!pred.matches(&FactPredicate::named("other_fact")));
1144
1145        let pred_with_arg =
1146            FactPredicate::named("channel_fact").with_arg(Some("specific_id".to_string()));
1147        assert!(pred_with_arg.matches(
1148            &FactPredicate::named("channel_fact").with_arg(Some("specific_id".to_string()))
1149        ));
1150        assert!(!pred_with_arg
1151            .matches(&FactPredicate::named("channel_fact").with_arg(Some("other_id".to_string()))));
1152    }
1153
1154    #[test]
1155    fn test_fact_predicate_named_constraints() {
1156        // Test with_args constructor
1157        let pred = FactPredicate::with_args(
1158            "channel_fact",
1159            vec![("channel_id", "ch1"), ("type", "home")],
1160        );
1161        assert_eq!(pred.name, "channel_fact");
1162        assert_eq!(
1163            pred.named_constraints.get("channel_id"),
1164            Some(&"ch1".to_string())
1165        );
1166        assert_eq!(
1167            pred.named_constraints.get("type"),
1168            Some(&"home".to_string())
1169        );
1170
1171        // Test with_named_constraint builder
1172        let pred2 = FactPredicate::named("channel_fact")
1173            .with_named_constraint("channel_id", "ch1")
1174            .with_named_constraint("type", "home");
1175        assert_eq!(pred, pred2);
1176    }
1177
1178    #[test]
1179    fn test_fact_predicate_matches_named_fact() {
1180        let pred = FactPredicate::with_args(
1181            "channel_fact",
1182            vec![("channel_id", "ch1"), ("type", "home")],
1183        );
1184
1185        // Matching fact
1186        let mut fact_fields = BTreeMap::new();
1187        fact_fields.insert("channel_id".to_string(), "ch1".to_string());
1188        fact_fields.insert("type".to_string(), "home".to_string());
1189        fact_fields.insert("created_at".to_string(), "2024-01-01".to_string());
1190        assert!(pred.matches_named_fact("channel_fact", &fact_fields));
1191
1192        // Wrong name
1193        assert!(!pred.matches_named_fact("other_fact", &fact_fields));
1194
1195        // Wrong channel_id
1196        let mut wrong_id = fact_fields.clone();
1197        wrong_id.insert("channel_id".to_string(), "ch2".to_string());
1198        assert!(!pred.matches_named_fact("channel_fact", &wrong_id));
1199
1200        // Missing required field
1201        let mut missing_field = BTreeMap::new();
1202        missing_field.insert("channel_id".to_string(), "ch1".to_string());
1203        // Missing "type" field
1204        assert!(!pred.matches_named_fact("channel_fact", &missing_field));
1205
1206        // Predicate with no constraints matches any fact with same name
1207        let wildcard = FactPredicate::named("channel_fact");
1208        assert!(wildcard.matches_named_fact("channel_fact", &fact_fields));
1209        assert!(wildcard.matches_named_fact("channel_fact", &BTreeMap::new()));
1210    }
1211
1212    #[test]
1213    fn test_fact_predicate_matches_with_named_constraints() {
1214        // Two predicates with compatible named constraints
1215        let pred1 = FactPredicate::with_args("channel_fact", vec![("channel_id", "ch1")]);
1216        let pred2 = FactPredicate::with_args(
1217            "channel_fact",
1218            vec![("channel_id", "ch1"), ("type", "home")],
1219        );
1220        assert!(pred1.matches(&pred2));
1221        assert!(pred2.matches(&pred1));
1222
1223        // Incompatible named constraints
1224        let pred3 = FactPredicate::with_args("channel_fact", vec![("channel_id", "ch2")]);
1225        assert!(!pred1.matches(&pred3));
1226
1227        // Wildcard matches anything
1228        let wildcard = FactPredicate::named("channel_fact");
1229        assert!(wildcard.matches(&pred1));
1230        assert!(pred1.matches(&wildcard));
1231    }
1232
1233    #[test]
1234    fn test_fact_predicate_has_constraints() {
1235        let no_constraints = FactPredicate::named("channel_fact");
1236        assert!(!no_constraints.has_constraints());
1237
1238        let with_positional =
1239            FactPredicate::named("channel_fact").with_arg(Some("ch1".to_string()));
1240        assert!(with_positional.has_constraints());
1241
1242        let with_named =
1243            FactPredicate::named("channel_fact").with_named_constraint("channel_id", "ch1");
1244        assert!(with_named.has_constraints());
1245    }
1246
1247    #[test]
1248    fn test_query_capability() {
1249        let cap = QueryCapability::read("channels").with_constraint("owner", "alice");
1250
1251        let check = cap.to_biscuit_check();
1252        assert!(check.contains("right(\"channels\", \"read\")"));
1253        assert!(check.contains("owner == \"alice\""));
1254    }
1255
1256    #[test]
1257    fn test_datalog_row_get() {
1258        let row = DatalogRow::new()
1259            .with_binding("id", DatalogValue::String("ch1".to_string()))
1260            .with_binding("count", DatalogValue::Integer(42))
1261            .with_binding("active", DatalogValue::Boolean(true));
1262
1263        assert_eq!(row.get_string("id"), Some("ch1"));
1264        assert_eq!(row.get_integer("count"), Some(42));
1265        assert_eq!(row.get_bool("active"), Some(true));
1266        assert_eq!(row.get_string("missing"), None);
1267    }
1268
1269    #[test]
1270    fn test_query_isolation_default() {
1271        let isolation = QueryIsolation::default();
1272        assert_eq!(isolation, QueryIsolation::ReadUncommitted);
1273        assert!(!isolation.requires_wait());
1274    }
1275
1276    #[test]
1277    fn test_query_isolation_read_committed() {
1278        let consensus_id = ConsensusId::new([1u8; 32]);
1279        let isolation = QueryIsolation::read_committed(consensus_id);
1280        assert!(isolation.requires_wait());
1281        if let QueryIsolation::ReadCommitted { wait_for } = isolation {
1282            assert_eq!(wait_for.len(), 1);
1283            assert_eq!(wait_for[0], consensus_id);
1284        } else {
1285            panic!("Expected ReadCommitted variant");
1286        }
1287    }
1288
1289    #[test]
1290    fn test_query_isolation_snapshot() {
1291        let hash = crate::Hash32([42u8; 32]);
1292        let isolation = QueryIsolation::snapshot(hash);
1293        assert!(!isolation.requires_wait()); // Snapshot doesn't require waiting
1294        if let QueryIsolation::Snapshot { prestate_hash } = isolation {
1295            assert_eq!(prestate_hash, hash);
1296        } else {
1297            panic!("Expected Snapshot variant");
1298        }
1299    }
1300
1301    #[test]
1302    fn test_query_stats_default() {
1303        let stats = QueryStats::default();
1304        assert_eq!(stats.execution_time, Duration::ZERO);
1305        assert_eq!(stats.facts_scanned, 0);
1306        assert_eq!(stats.facts_matched, 0);
1307        assert!(!stats.cache_hit);
1308        assert_eq!(stats.selectivity(), 0.0);
1309    }
1310
1311    #[test]
1312    fn test_query_stats_builder() {
1313        let stats = QueryStats::new(Duration::from_millis(50))
1314            .with_facts_scanned(100)
1315            .with_facts_matched(25)
1316            .with_cache_hit()
1317            .with_isolation(QueryIsolation::ReadUncommitted);
1318
1319        assert_eq!(stats.execution_time, Duration::from_millis(50));
1320        assert_eq!(stats.facts_scanned, 100);
1321        assert_eq!(stats.facts_matched, 25);
1322        assert!(stats.cache_hit);
1323        assert!((stats.selectivity() - 0.25).abs() < 0.001);
1324    }
1325
1326    #[test]
1327    fn test_consensus_id() {
1328        let bytes = [0xAB; 32];
1329        let id = ConsensusId::new(bytes);
1330        assert_eq!(id.as_bytes(), &bytes);
1331    }
1332}