Skip to main content

mnem_core/index/
query.rs

1//! `Query` engine + predicates + `QueryHit` over an `IndexSet`.
2//!
3//! Extracted from `index.rs` in R3; bodies unchanged.
4
5use std::collections::HashSet;
6
7use ipld_core::ipld::Ipld;
8
9use crate::anchor::is_system_node;
10use crate::error::{Error, RepoError};
11use crate::objects::{Edge, IndexSet, Node};
12use crate::prolly::{self, Cursor, ProllyKey};
13use crate::repo::readonly::{ReadonlyRepo, decode_from_store};
14
15use super::adjacency::{load_incoming, load_outgoing};
16use super::build::prop_value_hash;
17
18/// Predicates supported by [`Query::where_prop`].
19#[derive(Clone, Debug)]
20#[non_exhaustive]
21pub enum PropPredicate {
22    /// Exact value match. Uses the property Prolly index if available.
23    Eq(Ipld),
24}
25
26impl PropPredicate {
27    /// Convenience constructor: `PropPredicate::eq("Alice")` vs
28    /// `PropPredicate::Eq(Ipld::String("Alice".into()))`.
29    pub fn eq(value: impl Into<Ipld>) -> Self {
30        Self::Eq(value.into())
31    }
32}
33
/// Which side of an edge a node sits on, relative to that node.
///
/// Informational only. Within this module the direction of a surfaced
/// edge is conveyed by which [`QueryHit`] field holds it
/// ([`QueryHit::edges`] for outgoing, [`QueryHit::incoming_edges`] for
/// incoming); the query engine here does not attach a `Direction`
/// value to individual edges.
///
/// `Hash` is derived alongside the equality traits so the enum can be
/// used directly as a `HashMap` / `HashSet` key.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Direction {
    /// Edge whose `src` matches the hit node.
    Outgoing,
    /// Edge whose `dst` matches the hit node.
    Incoming,
}
45
46/// A single query result: the matched node plus any edges requested
47/// via [`Query::with_outgoing`] and/or [`Query::with_incoming`].
48///
49/// The `edges` and `incoming_edges` fields are kept separate rather
50/// than folded into one `Vec<(Direction, Edge)>` because 99% of
51/// existing callers only care about outgoing and already destructure
52/// `.edges`. The self-loop case ([`Query::with_any_direction`] on A→A)
53/// returns ONE `Edge` in `edges` (not one in each direction) to avoid
54/// spurious double-counting - a self-loop is structurally one edge,
55/// not two.
56#[derive(Clone, Debug)]
57#[non_exhaustive]
58pub struct QueryHit {
59    /// The matched node.
60    pub node: Node,
61    /// Outgoing edges whose label is in the requested set. Ordered by
62    /// label then edge CID for deterministic consumption.
63    pub edges: Vec<Edge>,
64    /// Incoming edges whose label is in the requested set. Ordered by
65    /// label, then src, then edge CID for deterministic consumption.
66    ///
67    /// Populated only when the query calls [`Query::with_incoming`] or
68    /// [`Query::with_any_direction`]. For pure [`Query::with_outgoing`]
69    /// queries this is always empty.
70    pub incoming_edges: Vec<Edge>,
71    /// `true` if at least one of `edges` / `incoming_edges` was
72    /// truncated by the per-hit adjacency cap. Callers who need the
73    /// full fan-in/out should widen [`Query::adjacency_cap`].
74    pub edges_truncated: bool,
75}
76
77impl QueryHit {
78    /// All outgoing edges in this hit whose `etype` equals `label`.
79    /// Collects into a `Vec<&Edge>` for ergonomic iteration.
80    pub fn edges_by_label(&self, label: &str) -> Vec<&Edge> {
81        self.edges.iter().filter(|e| e.etype == label).collect()
82    }
83
84    /// Streaming version of [`Self::edges_by_label`]: no intermediate
85    /// allocation. Useful in hot loops when a node has many outgoing
86    /// edges and only a fraction match the label.
87    pub fn edges_by_label_iter<'a>(
88        &'a self,
89        label: &'a str,
90    ) -> impl Iterator<Item = &'a Edge> + 'a {
91        self.edges.iter().filter(move |e| e.etype == label)
92    }
93
94    /// All incoming edges in this hit whose `etype` equals `label`.
95    pub fn incoming_by_label(&self, label: &str) -> Vec<&Edge> {
96        self.incoming_edges
97            .iter()
98            .filter(|e| e.etype == label)
99            .collect()
100    }
101}
102
103/// Declarative agent-facing query over a [`ReadonlyRepo`].
104///
105/// Usage:
106/// ```no_run
107/// # use mnem_core::repo::ReadonlyRepo;
108/// # use mnem_core::index::{Query, PropPredicate};
109/// # use ipld_core::ipld::Ipld;
110/// # fn demo(repo: &ReadonlyRepo) -> Result<(), Box<dyn std::error::Error>> {
111/// let hits = Query::new(repo)
112///     .label("Person")
113///     .where_prop("name", PropPredicate::Eq(Ipld::String("Alice".into())))
114///     .with_outgoing("knows")
115///     .limit(10)
116///     .execute()?;
117/// # Ok(()) }
118/// ```
119#[derive(Clone, Debug)]
120pub struct Query<'a> {
121    repo: &'a ReadonlyRepo,
122    label: Option<String>,
123    prop_filter: Option<(String, PropPredicate)>,
124    with_outgoing: Vec<String>,
125    with_incoming: Vec<String>,
126    limit: Option<usize>,
127    adjacency_cap: usize,
128    include_tombstoned: bool,
129    /// When `true`, system-reserved nodes (today: the `mnem init`
130    /// anchor) are kept in the result set. Defaults to `false` so
131    /// agent-facing queries never surface bookkeeping noise.
132    /// Audit / admin callers opt back in via [`Self::include_system`].
133    include_system: bool,
134}
135
136impl<'a> Query<'a> {
137    /// Default per-hit cap on how many edges (in each direction) are
138    /// surfaced from the adjacency index. Protects agent-facing
139    /// callers from fan-in/out denial-of-service (a "celebrity" node
140    /// with 1M incoming edges would otherwise allocate a 1M-entry
141    /// `Vec` per hit). Override via [`Self::adjacency_cap`].
142    ///
143    /// The default is intentionally generous (`10_000`) so normal
144    /// knowledge graphs are never clipped; the cap is a safety valve,
145    /// not a performance knob. Callers that legitimately need the
146    /// full fan-in should raise it explicitly and consume the result
147    /// stream.
148    pub const DEFAULT_ADJACENCY_CAP: usize = 10_000;
149
150    /// Start a new query against `repo`.
151    #[must_use]
152    pub const fn new(repo: &'a ReadonlyRepo) -> Self {
153        Self {
154            repo,
155            label: None,
156            prop_filter: None,
157            with_outgoing: Vec::new(),
158            with_incoming: Vec::new(),
159            limit: None,
160            adjacency_cap: Self::DEFAULT_ADJACENCY_CAP,
161            include_tombstoned: false,
162            include_system: false,
163        }
164    }
165
166    /// Restrict matches to nodes of a specific label (`ntype`).
167    #[must_use]
168    pub fn label(mut self, label: impl Into<String>) -> Self {
169        self.label = Some(label.into());
170        self
171    }
172
173    /// Add a property predicate. If a label is also set, the indexed
174    /// `(label, prop_name) -> value` Prolly lookup is used (O(log n));
175    /// otherwise the query falls back to a full label scan.
176    #[must_use]
177    pub fn where_prop(mut self, name: impl Into<String>, pred: PropPredicate) -> Self {
178        self.prop_filter = Some((name.into(), pred));
179        self
180    }
181
182    /// Convenience: `where_prop(name, PropPredicate::Eq(value.into()))`.
183    /// The most common agent query shape, one call shorter.
184    #[must_use]
185    pub fn where_eq(self, name: impl Into<String>, value: impl Into<Ipld>) -> Self {
186        self.where_prop(name, PropPredicate::eq(value))
187    }
188
189    /// Include outgoing edges of these labels in every hit.
190    #[must_use]
191    pub fn with_outgoing(mut self, edge_label: impl Into<String>) -> Self {
192        self.with_outgoing.push(edge_label.into());
193        self
194    }
195
196    /// Include incoming edges of these labels in every hit. Symmetric
197    /// mirror of [`Self::with_outgoing`]: answers "who points at me
198    /// through this edge-type?" using the `incoming` Prolly tree in
199    /// O(log n) plus one bucket read per hit.
200    ///
201    /// Populates [`QueryHit::incoming_edges`]. When combined with
202    /// `with_outgoing` in the same query, a hit is kept if it matches
203    /// the base predicates regardless of direction, and each direction's
204    /// edges are surfaced in its own field.
205    #[must_use]
206    pub fn with_incoming(mut self, edge_label: impl Into<String>) -> Self {
207        self.with_incoming.push(edge_label.into());
208        self
209    }
210
211    /// Convenience: ask for this edge-type in BOTH directions. Saves
212    /// the caller from writing `with_outgoing(x).with_incoming(x)`
213    /// every time.
214    ///
215    /// Self-loops (edges where `src == dst`) appear in `edges` only,
216    /// not duplicated into `incoming_edges`. The execute path detects
217    /// the self-loop case and deduplicates on `EdgeId`.
218    #[must_use]
219    pub fn with_any_direction(mut self, edge_label: impl Into<String>) -> Self {
220        let l = edge_label.into();
221        self.with_outgoing.push(l.clone());
222        self.with_incoming.push(l);
223        self
224    }
225
226    /// Override the per-hit adjacency cap. See
227    /// [`Self::DEFAULT_ADJACENCY_CAP`] for the rationale.
228    #[must_use]
229    pub const fn adjacency_cap(mut self, cap: usize) -> Self {
230        self.adjacency_cap = cap;
231        self
232    }
233
234    /// Include tombstoned nodes in results. Defaults to false so normal
235    /// retrieval/query paths honor privacy revocations.
236    #[must_use]
237    pub const fn include_tombstoned(mut self, include: bool) -> Self {
238        self.include_tombstoned = include;
239        self
240    }
241
242    /// Include system-reserved nodes (today: the `mnem init` anchor)
243    /// in results. Defaults to false so agent-facing surfaces never
244    /// see graph bookkeeping. Flip to true for audit / admin flows
245    /// that need to inspect or repair the anchor.
246    ///
247    /// Mirrors [`Self::include_tombstoned`]: both filters live in the
248    /// same execute branches, both default to "hide", both opt-in.
249    #[must_use]
250    pub const fn include_system(mut self, include: bool) -> Self {
251        self.include_system = include;
252        self
253    }
254
255    /// Cap the result set.
256    #[must_use]
257    pub const fn limit(mut self, n: usize) -> Self {
258        self.limit = Some(n);
259        self
260    }
261
262    /// Convenience: execute and return the first hit, or `Ok(None)`
263    /// if the result set is empty. Sets `limit(1)` internally.
264    ///
265    /// # Errors
266    ///
267    /// Same as [`Self::execute`].
268    pub fn first(mut self) -> Result<Option<QueryHit>, Error> {
269        self.limit = Some(1);
270        let mut hits = self.execute()?;
271        Ok(hits.pop())
272    }
273
274    /// Convenience: execute and return the exactly-one hit, erroring if
275    /// the result set is empty or has more than one match. Useful when
276    /// the agent treats a resolve as a precondition.
277    ///
278    /// Internally sets `limit(2)` so a genuine second hit is detected
279    /// cheaply.
280    ///
281    /// # Errors
282    ///
283    /// - [`RepoError::NotFound`] on zero matches.
284    /// - [`RepoError::AmbiguousMatch`] on >1 match.
285    /// - Propagates any error from [`Self::execute`].
286    pub fn one(mut self) -> Result<QueryHit, Error> {
287        self.limit = Some(2);
288        let hits = self.execute()?;
289        match hits.len() {
290            0 => Err(RepoError::NotFound.into()),
291            1 => Ok(hits.into_iter().next().expect("checked len")),
292            _ => Err(RepoError::AmbiguousMatch.into()),
293        }
294    }
295
296    /// Execute the query against the repo's current commit.
297    ///
298    /// Dispatches to the fastest matching path:
299    /// - label + `prop_eq` with an `IndexSet` present: one Prolly point lookup
300    /// - label-only with an `IndexSet`: label sub-tree cursor, bounded by `limit`
301    /// - otherwise: streaming scan of `commit.nodes` with in-memory filter,
302    ///   also bounded by `limit`
303    ///
304    /// # Errors
305    ///
306    /// - [`RepoError::Uninitialized`] if the repo has no head commit.
307    /// - Store / codec errors from index lookups.
308    pub fn execute(self) -> Result<Vec<QueryHit>, Error> {
309        let bs = self.repo.blockstore().clone();
310        let Some(commit) = self.repo.head_commit() else {
311            return Err(RepoError::Uninitialized.into());
312        };
313        let indexes = match &commit.indexes {
314            Some(idx_cid) => Some(decode_from_store::<IndexSet, _>(&*bs, idx_cid)?),
315            None => None,
316        };
317
318        // Precompute HashSets for O(1) edge-label membership in the
319        // adjacency loaders.
320        let want_out: HashSet<&str> = self.with_outgoing.iter().map(String::as_str).collect();
321        let want_in: HashSet<&str> = self.with_incoming.iter().map(String::as_str).collect();
322        let adj_cap = self.adjacency_cap;
323
324        let mut hits: Vec<QueryHit> = Vec::new();
325        let cap = self.limit.unwrap_or(usize::MAX);
326
327        // Helper closure: load both directions for a single hit, with
328        // self-loop dedup. A self-loop (src == dst) would otherwise
329        // appear once in `edges` and once in `incoming_edges` for the
330        // same node; callers using `with_any_direction` would see
331        // phantom duplicates. We resolve it by keeping the edge in
332        // `edges` (outgoing) and dropping its twin from
333        // `incoming_edges` when the request asked for both directions
334        // of the same label. Comparison is by `EdgeId` (total-
335        // ordering, unique).
336        let build_hit = |node: Node, indexes: Option<&IndexSet>| -> Result<QueryHit, Error> {
337            let (out_edges, out_trunc) = load_outgoing(&*bs, indexes, node.id, &want_out, adj_cap)?;
338            let (mut in_edges, in_trunc) =
339                load_incoming(&*bs, indexes, node.id, &want_in, adj_cap)?;
340            if !in_edges.is_empty() && !out_edges.is_empty() {
341                let out_ids: HashSet<_> = out_edges.iter().map(|e| e.id).collect();
342                in_edges.retain(|e| {
343                    // Drop the self-loop's incoming twin.
344                    !(e.src == e.dst && out_ids.contains(&e.id))
345                });
346            }
347            Ok(QueryHit {
348                node,
349                edges: out_edges,
350                incoming_edges: in_edges,
351                edges_truncated: out_trunc || in_trunc,
352            })
353        };
354
355        match (&self.label, &self.prop_filter, indexes.as_ref()) {
356            (Some(label), Some((prop, PropPredicate::Eq(value))), Some(idx)) => {
357                // Indexed point lookup. Skip the redundant label/prop
358                // filter because the index guarantees the match.
359                if let Some(tree_root) = idx.nodes_by_prop.get(label).and_then(|m| m.get(prop)) {
360                    let key = ProllyKey::new(prop_value_hash(value)?);
361                    if let Some(node_cid) = prolly::lookup(&*bs, tree_root, &key)? {
362                        let node: Node = decode_from_store(&*bs, &node_cid)?;
363                        // Defensive: the 16-byte hash could collide (cosmically
364                        // unlikely with BLAKE3) - reject wrong-label / wrong-
365                        // value nodes silently so callers see "no match."
366                        if node.ntype == *label
367                            && node.props.get(prop) == Some(value)
368                            && (self.include_tombstoned || !self.repo.is_tombstoned(&node.id))
369                            && (self.include_system || !is_system_node(&node))
370                        {
371                            hits.push(build_hit(node, indexes.as_ref())?);
372                        }
373                    }
374                }
375            }
376            (Some(label), None, Some(idx)) => {
377                // Label cursor: streaming, bounded by limit.
378                if let Some(tree_root) = idx.nodes_by_label.get(label) {
379                    let cursor = Cursor::new(&*bs, tree_root)?;
380                    for entry in cursor {
381                        let (_k, node_cid) = entry?;
382                        let node: Node = decode_from_store(&*bs, &node_cid)?;
383                        if !self.include_tombstoned && self.repo.is_tombstoned(&node.id) {
384                            continue;
385                        }
386                        if !self.include_system && is_system_node(&node) {
387                            continue;
388                        }
389                        // Index already guarantees the label matches; no
390                        // redundant filter needed.
391                        hits.push(build_hit(node, indexes.as_ref())?);
392                        if hits.len() >= cap {
393                            break;
394                        }
395                    }
396                }
397            }
398            _ => {
399                // Streaming fallback: walk the full node tree with
400                // in-memory filter, early-exit on limit.
401                let cursor = Cursor::new(&*bs, &commit.nodes)?;
402                for entry in cursor {
403                    let (_k, node_cid) = entry?;
404                    let node: Node = decode_from_store(&*bs, &node_cid)?;
405                    if !self.include_tombstoned && self.repo.is_tombstoned(&node.id) {
406                        continue;
407                    }
408                    if !self.include_system && is_system_node(&node) {
409                        continue;
410                    }
411                    if let Some(ref lbl) = self.label
412                        && &node.ntype != lbl
413                    {
414                        continue;
415                    }
416                    if let Some((ref prop, PropPredicate::Eq(ref value))) = self.prop_filter
417                        && node.props.get(prop) != Some(value)
418                    {
419                        continue;
420                    }
421                    hits.push(build_hit(node, indexes.as_ref())?);
422                    if hits.len() >= cap {
423                        break;
424                    }
425                }
426            }
427        }
428
429        Ok(hits)
430    }
431}