mnem_core/index/query.rs
1//! `Query` engine + predicates + `QueryHit` over an `IndexSet`.
2//!
3//! Extracted from `index.rs` in R3; bodies unchanged.
4
5use std::collections::HashSet;
6
7use ipld_core::ipld::Ipld;
8
9use crate::anchor::is_system_node;
10use crate::error::{Error, RepoError};
11use crate::objects::{Edge, IndexSet, Node};
12use crate::prolly::{self, Cursor, ProllyKey};
13use crate::repo::readonly::{ReadonlyRepo, decode_from_store};
14
15use super::adjacency::{load_incoming, load_outgoing};
16use super::build::prop_value_hash;
17
18/// Predicates supported by [`Query::where_prop`].
19#[derive(Clone, Debug)]
20#[non_exhaustive]
21pub enum PropPredicate {
22 /// Exact value match. Uses the property Prolly index if available.
23 Eq(Ipld),
24}
25
26impl PropPredicate {
27 /// Convenience constructor: `PropPredicate::eq("Alice")` vs
28 /// `PropPredicate::Eq(Ipld::String("Alice".into()))`.
29 pub fn eq(value: impl Into<Ipld>) -> Self {
30 Self::Eq(value.into())
31 }
32}
33
/// Which side of a node an edge was reached from.
///
/// [`QueryHit`] does not carry a `Direction`. It keeps the two directions
/// apart structurally instead: outgoing edges live in `QueryHit::edges`,
/// incoming ones in `QueryHit::incoming_edges`. Nothing in the query module
/// constructs this enum. Callers that merge the two vectors into one list
/// can use it to tag each edge with its origin.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Direction {
    /// Edge whose `src` matches the hit node.
    Outgoing,
    /// Edge whose `dst` matches the hit node.
    Incoming,
}
45
/// A single query result: the matched node plus any edges requested
/// via [`Query::with_outgoing`] and/or [`Query::with_incoming`].
///
/// The `edges` and `incoming_edges` fields are kept separate rather
/// than folded into one `Vec<(Direction, Edge)>`. Most existing callers
/// only care about outgoing edges and already destructure `.edges`.
///
/// The self-loop case ([`Query::with_any_direction`] on A→A) returns ONE
/// `Edge` in `edges` (not one in each direction). This avoids spurious
/// double-counting: a self-loop is structurally one edge, not two.
/// The dedup only drops the incoming twin when the outgoing copy was
/// actually loaded. If the outgoing side was clipped by the adjacency cap,
/// the self-loop may appear in `incoming_edges` instead.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct QueryHit {
    /// The matched node.
    pub node: Node,
    /// Outgoing edges whose label is in the requested set. Ordered by
    /// label then edge CID for deterministic consumption.
    pub edges: Vec<Edge>,
    /// Incoming edges whose label is in the requested set. Ordered by
    /// label, then src, then edge CID for deterministic consumption.
    ///
    /// Populated only when the query calls [`Query::with_incoming`] or
    /// [`Query::with_any_direction`]. For pure [`Query::with_outgoing`]
    /// queries this is always empty.
    pub incoming_edges: Vec<Edge>,
    /// `true` if at least one of `edges` / `incoming_edges` was
    /// truncated by the per-hit adjacency cap. Callers who need the
    /// full fan-in/out should widen [`Query::adjacency_cap`].
    pub edges_truncated: bool,
}
76
77impl QueryHit {
78 /// All outgoing edges in this hit whose `etype` equals `label`.
79 /// Collects into a `Vec<&Edge>` for ergonomic iteration.
80 pub fn edges_by_label(&self, label: &str) -> Vec<&Edge> {
81 self.edges.iter().filter(|e| e.etype == label).collect()
82 }
83
84 /// Streaming version of [`Self::edges_by_label`]: no intermediate
85 /// allocation. Useful in hot loops when a node has many outgoing
86 /// edges and only a fraction match the label.
87 pub fn edges_by_label_iter<'a>(
88 &'a self,
89 label: &'a str,
90 ) -> impl Iterator<Item = &'a Edge> + 'a {
91 self.edges.iter().filter(move |e| e.etype == label)
92 }
93
94 /// All incoming edges in this hit whose `etype` equals `label`.
95 pub fn incoming_by_label(&self, label: &str) -> Vec<&Edge> {
96 self.incoming_edges
97 .iter()
98 .filter(|e| e.etype == label)
99 .collect()
100 }
101}
102
/// Declarative agent-facing query over a [`ReadonlyRepo`].
///
/// Built with chained setters, each of which consumes and returns the
/// builder. Run it with [`Query::execute`], or use the [`Query::first`] /
/// [`Query::one`] shortcuts.
///
/// Usage:
/// ```no_run
/// # use mnem_core::repo::ReadonlyRepo;
/// # use mnem_core::index::{Query, PropPredicate};
/// # use ipld_core::ipld::Ipld;
/// # fn demo(repo: &ReadonlyRepo) -> Result<(), Box<dyn std::error::Error>> {
/// let hits = Query::new(repo)
///     .label("Person")
///     .where_prop("name", PropPredicate::Eq(Ipld::String("Alice".into())))
///     .with_outgoing("knows")
///     .limit(10)
///     .execute()?;
/// # Ok(()) }
/// ```
#[derive(Clone, Debug)]
pub struct Query<'a> {
    /// Repository whose head commit the query runs against.
    repo: &'a ReadonlyRepo,
    /// Required node label (`ntype`), if any.
    label: Option<String>,
    /// Optional `(property name, predicate)` filter; only one is kept.
    prop_filter: Option<(String, PropPredicate)>,
    /// Edge labels to load in the outgoing direction for every hit.
    with_outgoing: Vec<String>,
    /// Edge labels to load in the incoming direction for every hit.
    with_incoming: Vec<String>,
    /// Maximum number of hits; `None` means unbounded.
    limit: Option<usize>,
    /// Cap on edges loaded per hit, applied to each direction separately.
    adjacency_cap: usize,
    /// When `true`, tombstoned nodes are kept in the result set.
    include_tombstoned: bool,
    /// When `true`, system-reserved nodes (today: the `mnem init`
    /// anchor) are kept in the result set. Defaults to `false` so
    /// agent-facing queries never surface bookkeeping noise.
    /// Audit / admin callers opt back in via [`Self::include_system`].
    include_system: bool,
}
135
136impl<'a> Query<'a> {
137 /// Default per-hit cap on how many edges (in each direction) are
138 /// surfaced from the adjacency index. Protects agent-facing
139 /// callers from fan-in/out denial-of-service (a "celebrity" node
140 /// with 1M incoming edges would otherwise allocate a 1M-entry
141 /// `Vec` per hit). Override via [`Self::adjacency_cap`].
142 ///
143 /// The default is intentionally generous (`10_000`) so normal
144 /// knowledge graphs are never clipped; the cap is a safety valve,
145 /// not a performance knob. Callers that legitimately need the
146 /// full fan-in should raise it explicitly and consume the result
147 /// stream.
148 pub const DEFAULT_ADJACENCY_CAP: usize = 10_000;
149
150 /// Start a new query against `repo`.
151 #[must_use]
152 pub const fn new(repo: &'a ReadonlyRepo) -> Self {
153 Self {
154 repo,
155 label: None,
156 prop_filter: None,
157 with_outgoing: Vec::new(),
158 with_incoming: Vec::new(),
159 limit: None,
160 adjacency_cap: Self::DEFAULT_ADJACENCY_CAP,
161 include_tombstoned: false,
162 include_system: false,
163 }
164 }
165
166 /// Restrict matches to nodes of a specific label (`ntype`).
167 #[must_use]
168 pub fn label(mut self, label: impl Into<String>) -> Self {
169 self.label = Some(label.into());
170 self
171 }
172
173 /// Add a property predicate. If a label is also set, the indexed
174 /// `(label, prop_name) -> value` Prolly lookup is used (O(log n));
175 /// otherwise the query falls back to a full label scan.
176 #[must_use]
177 pub fn where_prop(mut self, name: impl Into<String>, pred: PropPredicate) -> Self {
178 self.prop_filter = Some((name.into(), pred));
179 self
180 }
181
182 /// Convenience: `where_prop(name, PropPredicate::Eq(value.into()))`.
183 /// The most common agent query shape, one call shorter.
184 #[must_use]
185 pub fn where_eq(self, name: impl Into<String>, value: impl Into<Ipld>) -> Self {
186 self.where_prop(name, PropPredicate::eq(value))
187 }
188
189 /// Include outgoing edges of these labels in every hit.
190 #[must_use]
191 pub fn with_outgoing(mut self, edge_label: impl Into<String>) -> Self {
192 self.with_outgoing.push(edge_label.into());
193 self
194 }
195
196 /// Include incoming edges of these labels in every hit. Symmetric
197 /// mirror of [`Self::with_outgoing`]: answers "who points at me
198 /// through this edge-type?" using the `incoming` Prolly tree in
199 /// O(log n) plus one bucket read per hit.
200 ///
201 /// Populates [`QueryHit::incoming_edges`]. When combined with
202 /// `with_outgoing` in the same query, a hit is kept if it matches
203 /// the base predicates regardless of direction, and each direction's
204 /// edges are surfaced in its own field.
205 #[must_use]
206 pub fn with_incoming(mut self, edge_label: impl Into<String>) -> Self {
207 self.with_incoming.push(edge_label.into());
208 self
209 }
210
211 /// Convenience: ask for this edge-type in BOTH directions. Saves
212 /// the caller from writing `with_outgoing(x).with_incoming(x)`
213 /// every time.
214 ///
215 /// Self-loops (edges where `src == dst`) appear in `edges` only,
216 /// not duplicated into `incoming_edges`. The execute path detects
217 /// the self-loop case and deduplicates on `EdgeId`.
218 #[must_use]
219 pub fn with_any_direction(mut self, edge_label: impl Into<String>) -> Self {
220 let l = edge_label.into();
221 self.with_outgoing.push(l.clone());
222 self.with_incoming.push(l);
223 self
224 }
225
226 /// Override the per-hit adjacency cap. See
227 /// [`Self::DEFAULT_ADJACENCY_CAP`] for the rationale.
228 #[must_use]
229 pub const fn adjacency_cap(mut self, cap: usize) -> Self {
230 self.adjacency_cap = cap;
231 self
232 }
233
234 /// Include tombstoned nodes in results. Defaults to false so normal
235 /// retrieval/query paths honor privacy revocations.
236 #[must_use]
237 pub const fn include_tombstoned(mut self, include: bool) -> Self {
238 self.include_tombstoned = include;
239 self
240 }
241
242 /// Include system-reserved nodes (today: the `mnem init` anchor)
243 /// in results. Defaults to false so agent-facing surfaces never
244 /// see graph bookkeeping. Flip to true for audit / admin flows
245 /// that need to inspect or repair the anchor.
246 ///
247 /// Mirrors [`Self::include_tombstoned`]: both filters live in the
248 /// same execute branches, both default to "hide", both opt-in.
249 #[must_use]
250 pub const fn include_system(mut self, include: bool) -> Self {
251 self.include_system = include;
252 self
253 }
254
255 /// Cap the result set.
256 #[must_use]
257 pub const fn limit(mut self, n: usize) -> Self {
258 self.limit = Some(n);
259 self
260 }
261
262 /// Convenience: execute and return the first hit, or `Ok(None)`
263 /// if the result set is empty. Sets `limit(1)` internally.
264 ///
265 /// # Errors
266 ///
267 /// Same as [`Self::execute`].
268 pub fn first(mut self) -> Result<Option<QueryHit>, Error> {
269 self.limit = Some(1);
270 let mut hits = self.execute()?;
271 Ok(hits.pop())
272 }
273
274 /// Convenience: execute and return the exactly-one hit, erroring if
275 /// the result set is empty or has more than one match. Useful when
276 /// the agent treats a resolve as a precondition.
277 ///
278 /// Internally sets `limit(2)` so a genuine second hit is detected
279 /// cheaply.
280 ///
281 /// # Errors
282 ///
283 /// - [`RepoError::NotFound`] on zero matches.
284 /// - [`RepoError::AmbiguousMatch`] on >1 match.
285 /// - Propagates any error from [`Self::execute`].
286 pub fn one(mut self) -> Result<QueryHit, Error> {
287 self.limit = Some(2);
288 let hits = self.execute()?;
289 match hits.len() {
290 0 => Err(RepoError::NotFound.into()),
291 1 => Ok(hits.into_iter().next().expect("checked len")),
292 _ => Err(RepoError::AmbiguousMatch.into()),
293 }
294 }
295
296 /// Execute the query against the repo's current commit.
297 ///
298 /// Dispatches to the fastest matching path:
299 /// - label + `prop_eq` with an `IndexSet` present: one Prolly point lookup
300 /// - label-only with an `IndexSet`: label sub-tree cursor, bounded by `limit`
301 /// - otherwise: streaming scan of `commit.nodes` with in-memory filter,
302 /// also bounded by `limit`
303 ///
304 /// # Errors
305 ///
306 /// - [`RepoError::Uninitialized`] if the repo has no head commit.
307 /// - Store / codec errors from index lookups.
308 pub fn execute(self) -> Result<Vec<QueryHit>, Error> {
309 let bs = self.repo.blockstore().clone();
310 let Some(commit) = self.repo.head_commit() else {
311 return Err(RepoError::Uninitialized.into());
312 };
313 let indexes = match &commit.indexes {
314 Some(idx_cid) => Some(decode_from_store::<IndexSet, _>(&*bs, idx_cid)?),
315 None => None,
316 };
317
318 // Precompute HashSets for O(1) edge-label membership in the
319 // adjacency loaders.
320 let want_out: HashSet<&str> = self.with_outgoing.iter().map(String::as_str).collect();
321 let want_in: HashSet<&str> = self.with_incoming.iter().map(String::as_str).collect();
322 let adj_cap = self.adjacency_cap;
323
324 let mut hits: Vec<QueryHit> = Vec::new();
325 let cap = self.limit.unwrap_or(usize::MAX);
326
327 // Helper closure: load both directions for a single hit, with
328 // self-loop dedup. A self-loop (src == dst) would otherwise
329 // appear once in `edges` and once in `incoming_edges` for the
330 // same node; callers using `with_any_direction` would see
331 // phantom duplicates. We resolve it by keeping the edge in
332 // `edges` (outgoing) and dropping its twin from
333 // `incoming_edges` when the request asked for both directions
334 // of the same label. Comparison is by `EdgeId` (total-
335 // ordering, unique).
336 let build_hit = |node: Node, indexes: Option<&IndexSet>| -> Result<QueryHit, Error> {
337 let (out_edges, out_trunc) = load_outgoing(&*bs, indexes, node.id, &want_out, adj_cap)?;
338 let (mut in_edges, in_trunc) =
339 load_incoming(&*bs, indexes, node.id, &want_in, adj_cap)?;
340 if !in_edges.is_empty() && !out_edges.is_empty() {
341 let out_ids: HashSet<_> = out_edges.iter().map(|e| e.id).collect();
342 in_edges.retain(|e| {
343 // Drop the self-loop's incoming twin.
344 !(e.src == e.dst && out_ids.contains(&e.id))
345 });
346 }
347 Ok(QueryHit {
348 node,
349 edges: out_edges,
350 incoming_edges: in_edges,
351 edges_truncated: out_trunc || in_trunc,
352 })
353 };
354
355 match (&self.label, &self.prop_filter, indexes.as_ref()) {
356 (Some(label), Some((prop, PropPredicate::Eq(value))), Some(idx)) => {
357 // Indexed point lookup. Skip the redundant label/prop
358 // filter because the index guarantees the match.
359 if let Some(tree_root) = idx.nodes_by_prop.get(label).and_then(|m| m.get(prop)) {
360 let key = ProllyKey::new(prop_value_hash(value)?);
361 if let Some(node_cid) = prolly::lookup(&*bs, tree_root, &key)? {
362 let node: Node = decode_from_store(&*bs, &node_cid)?;
363 // Defensive: the 16-byte hash could collide (cosmically
364 // unlikely with BLAKE3) - reject wrong-label / wrong-
365 // value nodes silently so callers see "no match."
366 if node.ntype == *label
367 && node.props.get(prop) == Some(value)
368 && (self.include_tombstoned || !self.repo.is_tombstoned(&node.id))
369 && (self.include_system || !is_system_node(&node))
370 {
371 hits.push(build_hit(node, indexes.as_ref())?);
372 }
373 }
374 }
375 }
376 (Some(label), None, Some(idx)) => {
377 // Label cursor: streaming, bounded by limit.
378 if let Some(tree_root) = idx.nodes_by_label.get(label) {
379 let cursor = Cursor::new(&*bs, tree_root)?;
380 for entry in cursor {
381 let (_k, node_cid) = entry?;
382 let node: Node = decode_from_store(&*bs, &node_cid)?;
383 if !self.include_tombstoned && self.repo.is_tombstoned(&node.id) {
384 continue;
385 }
386 if !self.include_system && is_system_node(&node) {
387 continue;
388 }
389 // Index already guarantees the label matches; no
390 // redundant filter needed.
391 hits.push(build_hit(node, indexes.as_ref())?);
392 if hits.len() >= cap {
393 break;
394 }
395 }
396 }
397 }
398 _ => {
399 // Streaming fallback: walk the full node tree with
400 // in-memory filter, early-exit on limit.
401 let cursor = Cursor::new(&*bs, &commit.nodes)?;
402 for entry in cursor {
403 let (_k, node_cid) = entry?;
404 let node: Node = decode_from_store(&*bs, &node_cid)?;
405 if !self.include_tombstoned && self.repo.is_tombstoned(&node.id) {
406 continue;
407 }
408 if !self.include_system && is_system_node(&node) {
409 continue;
410 }
411 if let Some(ref lbl) = self.label
412 && &node.ntype != lbl
413 {
414 continue;
415 }
416 if let Some((ref prop, PropPredicate::Eq(ref value))) = self.prop_filter
417 && node.props.get(prop) != Some(value)
418 {
419 continue;
420 }
421 hits.push(build_hit(node, indexes.as_ref())?);
422 if hits.len() >= cap {
423 break;
424 }
425 }
426 }
427 }
428
429 Ok(hits)
430 }
431}