Skip to main content

mnem_core/index/
query.rs

1//! `Query` engine + predicates + `QueryHit` over an `IndexSet`.
2//!
3//! Extracted from `index.rs` in R3; bodies unchanged.
4
5use std::collections::HashSet;
6
7use ipld_core::ipld::Ipld;
8
9use crate::error::{Error, RepoError};
10use crate::objects::{Edge, IndexSet, Node};
11use crate::prolly::{self, Cursor, ProllyKey};
12use crate::repo::readonly::{ReadonlyRepo, decode_from_store};
13
14use super::adjacency::{load_incoming, load_outgoing};
15use super::build::prop_value_hash;
16
17/// Predicates supported by [`Query::where_prop`].
18#[derive(Clone, Debug)]
19#[non_exhaustive]
20pub enum PropPredicate {
21    /// Exact value match. Uses the property Prolly index if available.
22    Eq(Ipld),
23}
24
25impl PropPredicate {
26    /// Convenience constructor: `PropPredicate::eq("Alice")` vs
27    /// `PropPredicate::Eq(Ipld::String("Alice".into()))`.
28    pub fn eq(value: impl Into<Ipld>) -> Self {
29        Self::Eq(value.into())
30    }
31}
32
/// Orientation of an edge relative to a hit node.
///
/// `Outgoing` means the hit node is the edge's `src`; `Incoming` means
/// it is the edge's `dst`.
///
/// NOTE(review): nothing in this module constructs or reads a
/// `Direction`. [`QueryHit`] reports the two orientations through its
/// separate `edges` and `incoming_edges` fields instead, so this enum
/// is currently only a vocabulary type for callers - confirm whether
/// any other module still depends on it.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Direction {
    /// Edge whose `src` matches the hit node.
    Outgoing,
    /// Edge whose `dst` matches the hit node.
    Incoming,
}
44
45/// A single query result: the matched node plus any edges requested
46/// via [`Query::with_outgoing`] and/or [`Query::with_incoming`].
47///
48/// The `edges` and `incoming_edges` fields are kept separate rather
49/// than folded into one `Vec<(Direction, Edge)>` because 99% of
50/// existing callers only care about outgoing and already destructure
51/// `.edges`. The self-loop case ([`Query::with_any_direction`] on A→A)
52/// returns ONE `Edge` in `edges` (not one in each direction) to avoid
53/// spurious double-counting - a self-loop is structurally one edge,
54/// not two.
55#[derive(Clone, Debug)]
56#[non_exhaustive]
57pub struct QueryHit {
58    /// The matched node.
59    pub node: Node,
60    /// Outgoing edges whose label is in the requested set. Ordered by
61    /// label then edge CID for deterministic consumption.
62    pub edges: Vec<Edge>,
63    /// Incoming edges whose label is in the requested set. Ordered by
64    /// label, then src, then edge CID for deterministic consumption.
65    ///
66    /// Populated only when the query calls [`Query::with_incoming`] or
67    /// [`Query::with_any_direction`]. For pure [`Query::with_outgoing`]
68    /// queries this is always empty.
69    pub incoming_edges: Vec<Edge>,
70    /// `true` if at least one of `edges` / `incoming_edges` was
71    /// truncated by the per-hit adjacency cap. Callers who need the
72    /// full fan-in/out should widen [`Query::adjacency_cap`].
73    pub edges_truncated: bool,
74}
75
76impl QueryHit {
77    /// All outgoing edges in this hit whose `etype` equals `label`.
78    /// Collects into a `Vec<&Edge>` for ergonomic iteration.
79    pub fn edges_by_label(&self, label: &str) -> Vec<&Edge> {
80        self.edges.iter().filter(|e| e.etype == label).collect()
81    }
82
83    /// Streaming version of [`Self::edges_by_label`]: no intermediate
84    /// allocation. Useful in hot loops when a node has many outgoing
85    /// edges and only a fraction match the label.
86    pub fn edges_by_label_iter<'a>(
87        &'a self,
88        label: &'a str,
89    ) -> impl Iterator<Item = &'a Edge> + 'a {
90        self.edges.iter().filter(move |e| e.etype == label)
91    }
92
93    /// All incoming edges in this hit whose `etype` equals `label`.
94    pub fn incoming_by_label(&self, label: &str) -> Vec<&Edge> {
95        self.incoming_edges
96            .iter()
97            .filter(|e| e.etype == label)
98            .collect()
99    }
100}
101
102/// Declarative agent-facing query over a [`ReadonlyRepo`].
103///
104/// Usage:
105/// ```no_run
106/// # use mnem_core::repo::ReadonlyRepo;
107/// # use mnem_core::index::{Query, PropPredicate};
108/// # use ipld_core::ipld::Ipld;
109/// # fn demo(repo: &ReadonlyRepo) -> Result<(), Box<dyn std::error::Error>> {
110/// let hits = Query::new(repo)
111///     .label("Person")
112///     .where_prop("name", PropPredicate::Eq(Ipld::String("Alice".into())))
113///     .with_outgoing("knows")
114///     .limit(10)
115///     .execute()?;
116/// # Ok(()) }
117/// ```
118#[derive(Clone, Debug)]
119pub struct Query<'a> {
120    repo: &'a ReadonlyRepo,
121    label: Option<String>,
122    prop_filter: Option<(String, PropPredicate)>,
123    with_outgoing: Vec<String>,
124    with_incoming: Vec<String>,
125    limit: Option<usize>,
126    adjacency_cap: usize,
127}
128
129impl<'a> Query<'a> {
130    /// Default per-hit cap on how many edges (in each direction) are
131    /// surfaced from the adjacency index. Protects agent-facing
132    /// callers from fan-in/out denial-of-service (a "celebrity" node
133    /// with 1M incoming edges would otherwise allocate a 1M-entry
134    /// `Vec` per hit). Override via [`Self::adjacency_cap`].
135    ///
136    /// The default is intentionally generous (`10_000`) so normal
137    /// knowledge graphs are never clipped; the cap is a safety valve,
138    /// not a performance knob. Callers that legitimately need the
139    /// full fan-in should raise it explicitly and consume the result
140    /// stream.
141    pub const DEFAULT_ADJACENCY_CAP: usize = 10_000;
142
143    /// Start a new query against `repo`.
144    #[must_use]
145    pub const fn new(repo: &'a ReadonlyRepo) -> Self {
146        Self {
147            repo,
148            label: None,
149            prop_filter: None,
150            with_outgoing: Vec::new(),
151            with_incoming: Vec::new(),
152            limit: None,
153            adjacency_cap: Self::DEFAULT_ADJACENCY_CAP,
154        }
155    }
156
157    /// Restrict matches to nodes of a specific label (`ntype`).
158    #[must_use]
159    pub fn label(mut self, label: impl Into<String>) -> Self {
160        self.label = Some(label.into());
161        self
162    }
163
164    /// Add a property predicate. If a label is also set, the indexed
165    /// `(label, prop_name) -> value` Prolly lookup is used (O(log n));
166    /// otherwise the query falls back to a full label scan.
167    #[must_use]
168    pub fn where_prop(mut self, name: impl Into<String>, pred: PropPredicate) -> Self {
169        self.prop_filter = Some((name.into(), pred));
170        self
171    }
172
173    /// Convenience: `where_prop(name, PropPredicate::Eq(value.into()))`.
174    /// The most common agent query shape, one call shorter.
175    #[must_use]
176    pub fn where_eq(self, name: impl Into<String>, value: impl Into<Ipld>) -> Self {
177        self.where_prop(name, PropPredicate::eq(value))
178    }
179
180    /// Include outgoing edges of these labels in every hit.
181    #[must_use]
182    pub fn with_outgoing(mut self, edge_label: impl Into<String>) -> Self {
183        self.with_outgoing.push(edge_label.into());
184        self
185    }
186
187    /// Include incoming edges of these labels in every hit. Symmetric
188    /// mirror of [`Self::with_outgoing`]: answers "who points at me
189    /// through this edge-type?" using the `incoming` Prolly tree in
190    /// O(log n) plus one bucket read per hit.
191    ///
192    /// Populates [`QueryHit::incoming_edges`]. When combined with
193    /// `with_outgoing` in the same query, a hit is kept if it matches
194    /// the base predicates regardless of direction, and each direction's
195    /// edges are surfaced in its own field.
196    #[must_use]
197    pub fn with_incoming(mut self, edge_label: impl Into<String>) -> Self {
198        self.with_incoming.push(edge_label.into());
199        self
200    }
201
202    /// Convenience: ask for this edge-type in BOTH directions. Saves
203    /// the caller from writing `with_outgoing(x).with_incoming(x)`
204    /// every time.
205    ///
206    /// Self-loops (edges where `src == dst`) appear in `edges` only,
207    /// not duplicated into `incoming_edges`. The execute path detects
208    /// the self-loop case and deduplicates on `EdgeId`.
209    #[must_use]
210    pub fn with_any_direction(mut self, edge_label: impl Into<String>) -> Self {
211        let l = edge_label.into();
212        self.with_outgoing.push(l.clone());
213        self.with_incoming.push(l);
214        self
215    }
216
217    /// Override the per-hit adjacency cap. See
218    /// [`Self::DEFAULT_ADJACENCY_CAP`] for the rationale.
219    #[must_use]
220    pub const fn adjacency_cap(mut self, cap: usize) -> Self {
221        self.adjacency_cap = cap;
222        self
223    }
224
225    /// Cap the result set.
226    #[must_use]
227    pub const fn limit(mut self, n: usize) -> Self {
228        self.limit = Some(n);
229        self
230    }
231
232    /// Convenience: execute and return the first hit, or `Ok(None)`
233    /// if the result set is empty. Sets `limit(1)` internally.
234    ///
235    /// # Errors
236    ///
237    /// Same as [`Self::execute`].
238    pub fn first(mut self) -> Result<Option<QueryHit>, Error> {
239        self.limit = Some(1);
240        let mut hits = self.execute()?;
241        Ok(hits.pop())
242    }
243
244    /// Convenience: execute and return the exactly-one hit, erroring if
245    /// the result set is empty or has more than one match. Useful when
246    /// the agent treats a resolve as a precondition.
247    ///
248    /// Internally sets `limit(2)` so a genuine second hit is detected
249    /// cheaply.
250    ///
251    /// # Errors
252    ///
253    /// - [`RepoError::NotFound`] on zero matches.
254    /// - [`RepoError::AmbiguousMatch`] on >1 match.
255    /// - Propagates any error from [`Self::execute`].
256    pub fn one(mut self) -> Result<QueryHit, Error> {
257        self.limit = Some(2);
258        let hits = self.execute()?;
259        match hits.len() {
260            0 => Err(RepoError::NotFound.into()),
261            1 => Ok(hits.into_iter().next().expect("checked len")),
262            _ => Err(RepoError::AmbiguousMatch.into()),
263        }
264    }
265
266    /// Execute the query against the repo's current commit.
267    ///
268    /// Dispatches to the fastest matching path:
269    /// - label + `prop_eq` with an `IndexSet` present: one Prolly point lookup
270    /// - label-only with an `IndexSet`: label sub-tree cursor, bounded by `limit`
271    /// - otherwise: streaming scan of `commit.nodes` with in-memory filter,
272    ///   also bounded by `limit`
273    ///
274    /// # Errors
275    ///
276    /// - [`RepoError::Uninitialized`] if the repo has no head commit.
277    /// - Store / codec errors from index lookups.
278    pub fn execute(self) -> Result<Vec<QueryHit>, Error> {
279        let bs = self.repo.blockstore().clone();
280        let Some(commit) = self.repo.head_commit() else {
281            return Err(RepoError::Uninitialized.into());
282        };
283        let indexes = match &commit.indexes {
284            Some(idx_cid) => Some(decode_from_store::<IndexSet, _>(&*bs, idx_cid)?),
285            None => None,
286        };
287
288        // Precompute HashSets for O(1) edge-label membership in the
289        // adjacency loaders.
290        let want_out: HashSet<&str> = self.with_outgoing.iter().map(String::as_str).collect();
291        let want_in: HashSet<&str> = self.with_incoming.iter().map(String::as_str).collect();
292        let adj_cap = self.adjacency_cap;
293
294        let mut hits: Vec<QueryHit> = Vec::new();
295        let cap = self.limit.unwrap_or(usize::MAX);
296
297        // Helper closure: load both directions for a single hit, with
298        // self-loop dedup. A self-loop (src == dst) would otherwise
299        // appear once in `edges` and once in `incoming_edges` for the
300        // same node; callers using `with_any_direction` would see
301        // phantom duplicates. We resolve it by keeping the edge in
302        // `edges` (outgoing) and dropping its twin from
303        // `incoming_edges` when the request asked for both directions
304        // of the same label. Comparison is by `EdgeId` (total-
305        // ordering, unique).
306        let build_hit = |node: Node, indexes: Option<&IndexSet>| -> Result<QueryHit, Error> {
307            let (out_edges, out_trunc) = load_outgoing(&*bs, indexes, node.id, &want_out, adj_cap)?;
308            let (mut in_edges, in_trunc) =
309                load_incoming(&*bs, indexes, node.id, &want_in, adj_cap)?;
310            if !in_edges.is_empty() && !out_edges.is_empty() {
311                let out_ids: HashSet<_> = out_edges.iter().map(|e| e.id).collect();
312                in_edges.retain(|e| {
313                    // Drop the self-loop's incoming twin.
314                    !(e.src == e.dst && out_ids.contains(&e.id))
315                });
316            }
317            Ok(QueryHit {
318                node,
319                edges: out_edges,
320                incoming_edges: in_edges,
321                edges_truncated: out_trunc || in_trunc,
322            })
323        };
324
325        match (&self.label, &self.prop_filter, indexes.as_ref()) {
326            (Some(label), Some((prop, PropPredicate::Eq(value))), Some(idx)) => {
327                // Indexed point lookup. Skip the redundant label/prop
328                // filter because the index guarantees the match.
329                if let Some(tree_root) = idx.nodes_by_prop.get(label).and_then(|m| m.get(prop)) {
330                    let key = ProllyKey::new(prop_value_hash(value)?);
331                    if let Some(node_cid) = prolly::lookup(&*bs, tree_root, &key)? {
332                        let node: Node = decode_from_store(&*bs, &node_cid)?;
333                        // Defensive: the 16-byte hash could collide (cosmically
334                        // unlikely with BLAKE3) - reject wrong-label / wrong-
335                        // value nodes silently so callers see "no match."
336                        if node.ntype == *label && node.props.get(prop) == Some(value) {
337                            hits.push(build_hit(node, indexes.as_ref())?);
338                        }
339                    }
340                }
341            }
342            (Some(label), None, Some(idx)) => {
343                // Label cursor: streaming, bounded by limit.
344                if let Some(tree_root) = idx.nodes_by_label.get(label) {
345                    let cursor = Cursor::new(&*bs, tree_root)?;
346                    for entry in cursor {
347                        let (_k, node_cid) = entry?;
348                        let node: Node = decode_from_store(&*bs, &node_cid)?;
349                        // Index already guarantees the label matches; no
350                        // redundant filter needed.
351                        hits.push(build_hit(node, indexes.as_ref())?);
352                        if hits.len() >= cap {
353                            break;
354                        }
355                    }
356                }
357            }
358            _ => {
359                // Streaming fallback: walk the full node tree with
360                // in-memory filter, early-exit on limit.
361                let cursor = Cursor::new(&*bs, &commit.nodes)?;
362                for entry in cursor {
363                    let (_k, node_cid) = entry?;
364                    let node: Node = decode_from_store(&*bs, &node_cid)?;
365                    if let Some(ref lbl) = self.label
366                        && &node.ntype != lbl
367                    {
368                        continue;
369                    }
370                    if let Some((ref prop, PropPredicate::Eq(ref value))) = self.prop_filter
371                        && node.props.get(prop) != Some(value)
372                    {
373                        continue;
374                    }
375                    hits.push(build_hit(node, indexes.as_ref())?);
376                    if hits.len() >= cap {
377                        break;
378                    }
379                }
380            }
381        }
382
383        Ok(hits)
384    }
385}