mnem_core/index/query.rs
1//! `Query` engine + predicates + `QueryHit` over an `IndexSet`.
2//!
3//! Extracted from `index.rs` in R3; bodies unchanged.
4
5use std::collections::HashSet;
6
7use ipld_core::ipld::Ipld;
8
9use crate::error::{Error, RepoError};
10use crate::objects::{Edge, IndexSet, Node};
11use crate::prolly::{self, Cursor, ProllyKey};
12use crate::repo::readonly::{ReadonlyRepo, decode_from_store};
13
14use super::adjacency::{load_incoming, load_outgoing};
15use super::build::prop_value_hash;
16
17/// Predicates supported by [`Query::where_prop`].
18#[derive(Clone, Debug)]
19#[non_exhaustive]
20pub enum PropPredicate {
21 /// Exact value match. Uses the property Prolly index if available.
22 Eq(Ipld),
23}
24
25impl PropPredicate {
26 /// Convenience constructor: `PropPredicate::eq("Alice")` vs
27 /// `PropPredicate::Eq(Ipld::String("Alice".into()))`.
28 pub fn eq(value: impl Into<Ipld>) -> Self {
29 Self::Eq(value.into())
30 }
31}
32
/// Which side of an edge a node sits on: the edge's source or its
/// destination.
///
/// [`Query::execute`] does not tag edges with a `Direction`. Instead it
/// reports the two sides through separate fields:
/// [`QueryHit::edges`] holds outgoing edges and
/// [`QueryHit::incoming_edges`] holds incoming ones. Callers that fold
/// both lists into one collection can use this enum as the tag.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Direction {
    /// Edge whose `src` is the node in question.
    Outgoing,
    /// Edge whose `dst` is the node in question.
    Incoming,
}
44
45/// A single query result: the matched node plus any edges requested
46/// via [`Query::with_outgoing`] and/or [`Query::with_incoming`].
47///
48/// The `edges` and `incoming_edges` fields are kept separate rather
49/// than folded into one `Vec<(Direction, Edge)>` because 99% of
50/// existing callers only care about outgoing and already destructure
51/// `.edges`. The self-loop case ([`Query::with_any_direction`] on A→A)
52/// returns ONE `Edge` in `edges` (not one in each direction) to avoid
53/// spurious double-counting - a self-loop is structurally one edge,
54/// not two.
55#[derive(Clone, Debug)]
56#[non_exhaustive]
57pub struct QueryHit {
58 /// The matched node.
59 pub node: Node,
60 /// Outgoing edges whose label is in the requested set. Ordered by
61 /// label then edge CID for deterministic consumption.
62 pub edges: Vec<Edge>,
63 /// Incoming edges whose label is in the requested set. Ordered by
64 /// label, then src, then edge CID for deterministic consumption.
65 ///
66 /// Populated only when the query calls [`Query::with_incoming`] or
67 /// [`Query::with_any_direction`]. For pure [`Query::with_outgoing`]
68 /// queries this is always empty.
69 pub incoming_edges: Vec<Edge>,
70 /// `true` if at least one of `edges` / `incoming_edges` was
71 /// truncated by the per-hit adjacency cap. Callers who need the
72 /// full fan-in/out should widen [`Query::adjacency_cap`].
73 pub edges_truncated: bool,
74}
75
76impl QueryHit {
77 /// All outgoing edges in this hit whose `etype` equals `label`.
78 /// Collects into a `Vec<&Edge>` for ergonomic iteration.
79 pub fn edges_by_label(&self, label: &str) -> Vec<&Edge> {
80 self.edges.iter().filter(|e| e.etype == label).collect()
81 }
82
83 /// Streaming version of [`Self::edges_by_label`]: no intermediate
84 /// allocation. Useful in hot loops when a node has many outgoing
85 /// edges and only a fraction match the label.
86 pub fn edges_by_label_iter<'a>(
87 &'a self,
88 label: &'a str,
89 ) -> impl Iterator<Item = &'a Edge> + 'a {
90 self.edges.iter().filter(move |e| e.etype == label)
91 }
92
93 /// All incoming edges in this hit whose `etype` equals `label`.
94 pub fn incoming_by_label(&self, label: &str) -> Vec<&Edge> {
95 self.incoming_edges
96 .iter()
97 .filter(|e| e.etype == label)
98 .collect()
99 }
100}
101
102/// Declarative agent-facing query over a [`ReadonlyRepo`].
103///
104/// Usage:
105/// ```no_run
106/// # use mnem_core::repo::ReadonlyRepo;
107/// # use mnem_core::index::{Query, PropPredicate};
108/// # use ipld_core::ipld::Ipld;
109/// # fn demo(repo: &ReadonlyRepo) -> Result<(), Box<dyn std::error::Error>> {
110/// let hits = Query::new(repo)
111/// .label("Person")
112/// .where_prop("name", PropPredicate::Eq(Ipld::String("Alice".into())))
113/// .with_outgoing("knows")
114/// .limit(10)
115/// .execute()?;
116/// # Ok(()) }
117/// ```
118#[derive(Clone, Debug)]
119pub struct Query<'a> {
120 repo: &'a ReadonlyRepo,
121 label: Option<String>,
122 prop_filter: Option<(String, PropPredicate)>,
123 with_outgoing: Vec<String>,
124 with_incoming: Vec<String>,
125 limit: Option<usize>,
126 adjacency_cap: usize,
127}
128
129impl<'a> Query<'a> {
130 /// Default per-hit cap on how many edges (in each direction) are
131 /// surfaced from the adjacency index. Protects agent-facing
132 /// callers from fan-in/out denial-of-service (a "celebrity" node
133 /// with 1M incoming edges would otherwise allocate a 1M-entry
134 /// `Vec` per hit). Override via [`Self::adjacency_cap`].
135 ///
136 /// The default is intentionally generous (`10_000`) so normal
137 /// knowledge graphs are never clipped; the cap is a safety valve,
138 /// not a performance knob. Callers that legitimately need the
139 /// full fan-in should raise it explicitly and consume the result
140 /// stream.
141 pub const DEFAULT_ADJACENCY_CAP: usize = 10_000;
142
143 /// Start a new query against `repo`.
144 #[must_use]
145 pub const fn new(repo: &'a ReadonlyRepo) -> Self {
146 Self {
147 repo,
148 label: None,
149 prop_filter: None,
150 with_outgoing: Vec::new(),
151 with_incoming: Vec::new(),
152 limit: None,
153 adjacency_cap: Self::DEFAULT_ADJACENCY_CAP,
154 }
155 }
156
157 /// Restrict matches to nodes of a specific label (`ntype`).
158 #[must_use]
159 pub fn label(mut self, label: impl Into<String>) -> Self {
160 self.label = Some(label.into());
161 self
162 }
163
164 /// Add a property predicate. If a label is also set, the indexed
165 /// `(label, prop_name) -> value` Prolly lookup is used (O(log n));
166 /// otherwise the query falls back to a full label scan.
167 #[must_use]
168 pub fn where_prop(mut self, name: impl Into<String>, pred: PropPredicate) -> Self {
169 self.prop_filter = Some((name.into(), pred));
170 self
171 }
172
173 /// Convenience: `where_prop(name, PropPredicate::Eq(value.into()))`.
174 /// The most common agent query shape, one call shorter.
175 #[must_use]
176 pub fn where_eq(self, name: impl Into<String>, value: impl Into<Ipld>) -> Self {
177 self.where_prop(name, PropPredicate::eq(value))
178 }
179
180 /// Include outgoing edges of these labels in every hit.
181 #[must_use]
182 pub fn with_outgoing(mut self, edge_label: impl Into<String>) -> Self {
183 self.with_outgoing.push(edge_label.into());
184 self
185 }
186
187 /// Include incoming edges of these labels in every hit. Symmetric
188 /// mirror of [`Self::with_outgoing`]: answers "who points at me
189 /// through this edge-type?" using the `incoming` Prolly tree in
190 /// O(log n) plus one bucket read per hit.
191 ///
192 /// Populates [`QueryHit::incoming_edges`]. When combined with
193 /// `with_outgoing` in the same query, a hit is kept if it matches
194 /// the base predicates regardless of direction, and each direction's
195 /// edges are surfaced in its own field.
196 #[must_use]
197 pub fn with_incoming(mut self, edge_label: impl Into<String>) -> Self {
198 self.with_incoming.push(edge_label.into());
199 self
200 }
201
202 /// Convenience: ask for this edge-type in BOTH directions. Saves
203 /// the caller from writing `with_outgoing(x).with_incoming(x)`
204 /// every time.
205 ///
206 /// Self-loops (edges where `src == dst`) appear in `edges` only,
207 /// not duplicated into `incoming_edges`. The execute path detects
208 /// the self-loop case and deduplicates on `EdgeId`.
209 #[must_use]
210 pub fn with_any_direction(mut self, edge_label: impl Into<String>) -> Self {
211 let l = edge_label.into();
212 self.with_outgoing.push(l.clone());
213 self.with_incoming.push(l);
214 self
215 }
216
217 /// Override the per-hit adjacency cap. See
218 /// [`Self::DEFAULT_ADJACENCY_CAP`] for the rationale.
219 #[must_use]
220 pub const fn adjacency_cap(mut self, cap: usize) -> Self {
221 self.adjacency_cap = cap;
222 self
223 }
224
225 /// Cap the result set.
226 #[must_use]
227 pub const fn limit(mut self, n: usize) -> Self {
228 self.limit = Some(n);
229 self
230 }
231
232 /// Convenience: execute and return the first hit, or `Ok(None)`
233 /// if the result set is empty. Sets `limit(1)` internally.
234 ///
235 /// # Errors
236 ///
237 /// Same as [`Self::execute`].
238 pub fn first(mut self) -> Result<Option<QueryHit>, Error> {
239 self.limit = Some(1);
240 let mut hits = self.execute()?;
241 Ok(hits.pop())
242 }
243
244 /// Convenience: execute and return the exactly-one hit, erroring if
245 /// the result set is empty or has more than one match. Useful when
246 /// the agent treats a resolve as a precondition.
247 ///
248 /// Internally sets `limit(2)` so a genuine second hit is detected
249 /// cheaply.
250 ///
251 /// # Errors
252 ///
253 /// - [`RepoError::NotFound`] on zero matches.
254 /// - [`RepoError::AmbiguousMatch`] on >1 match.
255 /// - Propagates any error from [`Self::execute`].
256 pub fn one(mut self) -> Result<QueryHit, Error> {
257 self.limit = Some(2);
258 let hits = self.execute()?;
259 match hits.len() {
260 0 => Err(RepoError::NotFound.into()),
261 1 => Ok(hits.into_iter().next().expect("checked len")),
262 _ => Err(RepoError::AmbiguousMatch.into()),
263 }
264 }
265
266 /// Execute the query against the repo's current commit.
267 ///
268 /// Dispatches to the fastest matching path:
269 /// - label + `prop_eq` with an `IndexSet` present: one Prolly point lookup
270 /// - label-only with an `IndexSet`: label sub-tree cursor, bounded by `limit`
271 /// - otherwise: streaming scan of `commit.nodes` with in-memory filter,
272 /// also bounded by `limit`
273 ///
274 /// # Errors
275 ///
276 /// - [`RepoError::Uninitialized`] if the repo has no head commit.
277 /// - Store / codec errors from index lookups.
278 pub fn execute(self) -> Result<Vec<QueryHit>, Error> {
279 let bs = self.repo.blockstore().clone();
280 let Some(commit) = self.repo.head_commit() else {
281 return Err(RepoError::Uninitialized.into());
282 };
283 let indexes = match &commit.indexes {
284 Some(idx_cid) => Some(decode_from_store::<IndexSet, _>(&*bs, idx_cid)?),
285 None => None,
286 };
287
288 // Precompute HashSets for O(1) edge-label membership in the
289 // adjacency loaders.
290 let want_out: HashSet<&str> = self.with_outgoing.iter().map(String::as_str).collect();
291 let want_in: HashSet<&str> = self.with_incoming.iter().map(String::as_str).collect();
292 let adj_cap = self.adjacency_cap;
293
294 let mut hits: Vec<QueryHit> = Vec::new();
295 let cap = self.limit.unwrap_or(usize::MAX);
296
297 // Helper closure: load both directions for a single hit, with
298 // self-loop dedup. A self-loop (src == dst) would otherwise
299 // appear once in `edges` and once in `incoming_edges` for the
300 // same node; callers using `with_any_direction` would see
301 // phantom duplicates. We resolve it by keeping the edge in
302 // `edges` (outgoing) and dropping its twin from
303 // `incoming_edges` when the request asked for both directions
304 // of the same label. Comparison is by `EdgeId` (total-
305 // ordering, unique).
306 let build_hit = |node: Node, indexes: Option<&IndexSet>| -> Result<QueryHit, Error> {
307 let (out_edges, out_trunc) = load_outgoing(&*bs, indexes, node.id, &want_out, adj_cap)?;
308 let (mut in_edges, in_trunc) =
309 load_incoming(&*bs, indexes, node.id, &want_in, adj_cap)?;
310 if !in_edges.is_empty() && !out_edges.is_empty() {
311 let out_ids: HashSet<_> = out_edges.iter().map(|e| e.id).collect();
312 in_edges.retain(|e| {
313 // Drop the self-loop's incoming twin.
314 !(e.src == e.dst && out_ids.contains(&e.id))
315 });
316 }
317 Ok(QueryHit {
318 node,
319 edges: out_edges,
320 incoming_edges: in_edges,
321 edges_truncated: out_trunc || in_trunc,
322 })
323 };
324
325 match (&self.label, &self.prop_filter, indexes.as_ref()) {
326 (Some(label), Some((prop, PropPredicate::Eq(value))), Some(idx)) => {
327 // Indexed point lookup. Skip the redundant label/prop
328 // filter because the index guarantees the match.
329 if let Some(tree_root) = idx.nodes_by_prop.get(label).and_then(|m| m.get(prop)) {
330 let key = ProllyKey::new(prop_value_hash(value)?);
331 if let Some(node_cid) = prolly::lookup(&*bs, tree_root, &key)? {
332 let node: Node = decode_from_store(&*bs, &node_cid)?;
333 // Defensive: the 16-byte hash could collide (cosmically
334 // unlikely with BLAKE3) - reject wrong-label / wrong-
335 // value nodes silently so callers see "no match."
336 if node.ntype == *label && node.props.get(prop) == Some(value) {
337 hits.push(build_hit(node, indexes.as_ref())?);
338 }
339 }
340 }
341 }
342 (Some(label), None, Some(idx)) => {
343 // Label cursor: streaming, bounded by limit.
344 if let Some(tree_root) = idx.nodes_by_label.get(label) {
345 let cursor = Cursor::new(&*bs, tree_root)?;
346 for entry in cursor {
347 let (_k, node_cid) = entry?;
348 let node: Node = decode_from_store(&*bs, &node_cid)?;
349 // Index already guarantees the label matches; no
350 // redundant filter needed.
351 hits.push(build_hit(node, indexes.as_ref())?);
352 if hits.len() >= cap {
353 break;
354 }
355 }
356 }
357 }
358 _ => {
359 // Streaming fallback: walk the full node tree with
360 // in-memory filter, early-exit on limit.
361 let cursor = Cursor::new(&*bs, &commit.nodes)?;
362 for entry in cursor {
363 let (_k, node_cid) = entry?;
364 let node: Node = decode_from_store(&*bs, &node_cid)?;
365 if let Some(ref lbl) = self.label
366 && &node.ntype != lbl
367 {
368 continue;
369 }
370 if let Some((ref prop, PropPredicate::Eq(ref value))) = self.prop_filter
371 && node.props.get(prop) != Some(value)
372 {
373 continue;
374 }
375 hits.push(build_hit(node, indexes.as_ref())?);
376 if hits.len() >= cap {
377 break;
378 }
379 }
380 }
381 }
382
383 Ok(hits)
384 }
385}