net/adapter/net/behavior/query.rs
1//! Federated query primitives over the capability index.
2//!
3//! Phase E of `CAPABILITY_SYSTEM_PLAN.md`. The plan calls for
4//! five composable operators — `filter`, `match_axis`, `traverse`,
5//! `aggregate`, `nearest` — that decompose dual-axis cross-axis
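//! queries into compositions of primitives. All five are declared on
//! [`CapabilityQuery`]. This module ships the trait, three reference
//! [`Aggregator`]s, and the [`EdgeKind`] / [`Distance`] types used by
//! `traverse` and `nearest`. The reference `CapabilityQuery` impl that
//! lived on the legacy `CapabilityIndex` was removed in Phase 3B of the
//! multifold migration, so implementations come from downstream consumers.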
11//!
12//! ## Composability
13//!
14//! Operators chain. The user-facing query a downstream consumer
15//! (Rebel Yell, Atomic Playboys) might write —
16//!
17//! ```text
18//! hardware.gpu AND software.model:llama-3-70b AND dataforts.has_chain:Y
19//! ```
20//!
21//! — decomposes to:
22//!
//! ```ignore
//! let candidates: Vec<(u64, _)> = index
//!     .match_axis(TaxonomyAxis::Hardware, "gpu", None)
//!     .into_iter()
//!     .filter(|(_, caps)| {
//!         let owned: Vec<Tag> = caps.tags.iter().cloned().collect();
//!         let model_pred = Predicate::Equals {
//!             key: TagKey::new(TaxonomyAxis::Software, "model".to_string()),
//!             value: "llama-3-70b".to_string(),
//!         };
//!         model_pred.evaluate_unplanned(&EvalContext::new(&owned, &caps.metadata))
//!     })
//!     // `dataforts.has_chain:Y` is one more `.filter` of the same shape.
//!     .collect();
//! ```
36//!
37//! No new operators needed for the cross-axis composition.
38//!
39//! ## Local-only
40//!
41//! All operators run against the **local** capability-index view
42//! by default. Cross-node federation is opt-in via a
43//! `Federated` wrapper trait (Phase E slice 3, not in this
44//! slice). Local-first avoids surprising round-trips on every
45//! call; downstream code that needs federation reaches for the
46//! wrapper deliberately.
47
48use std::collections::BTreeMap;
49use std::time::Duration;
50
51use crate::adapter::net::behavior::{
52 predicate::Predicate, tag::Tag, tag::TagKey, tag::TaxonomyAxis,
53};
54
/// Reserved-prefix edge kind for [`CapabilityQuery::traverse`].
/// Identifies which reserved-prefix tag forms a graph edge from a
/// child entity to its parent.
///
/// Exactly one reserved-prefix shape encodes a parent link today:
///
/// - `fork-of:<parent_origin_hex>` — a forked entity carries a
///   `fork-of:` tag whose body is the parent's origin hash. The
///   parent itself may carry its own `fork-of:` tag for the
///   grand-parent. Walking these chains terminates at a root
///   (an entity with no `fork-of:` tag).
///
/// `causal:<chain_hex>` is deliberately *not* a variant: it is a chain
/// advertisement, not a parent pointer. It is mentioned here so that
/// adding an edge kind for it later (e.g. "find the chain head of
/// chain X") is a mechanical extension of this enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EdgeKind {
    /// Walk `fork-of:<parent>` parent links upward.
    ForkOfParent,
}

impl EdgeKind {
    /// Reserved-prefix string this edge kind walks, including the
    /// trailing `:` separator. Expected to match the `prefix` field of
    /// the corresponding `Tag::Reserved` tag.
    ///
    /// `const` so the prefix can be bound in constant contexts, e.g.
    /// `const FORK: &str = EdgeKind::ForkOfParent.prefix();`.
    #[must_use]
    pub const fn prefix(&self) -> &'static str {
        match self {
            EdgeKind::ForkOfParent => "fork-of:",
        }
    }
}
87
/// Proximity distance for [`CapabilityQuery::nearest`].
///
/// Wraps `Duration` so callers can't accidentally swap it with a
/// wall-clock duration. A missing RTT (no proximity data for a
/// candidate) is represented as `None` at the lookup boundary, and
/// `nearest` ranks such unmeasured candidates at the back.
///
/// Ordering and hashing follow the wrapped `Duration`: a shorter
/// round-trip time compares as "closer".
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Distance(pub Duration);

impl From<Duration> for Distance {
    /// Wrap a measured round-trip time as a `Distance`.
    fn from(rtt: Duration) -> Self {
        Distance(rtt)
    }
}
95
96/// Aggregator over filtered query results. Implementations
97/// receive each matching `(NodeId, &CapabilitySet)` and update
98/// internal state; `finalize` produces the typed output.
99///
100/// Three reference impls ship in this module: [`Count`],
101/// [`UniqueAxisValues`], and [`MaxNumericMetadata`]. Plan §6
102/// calls these "Aggregator" — same name retained.
103///
104/// Custom aggregators (per-binding folds) can layer on top of
105/// this trait without substrate changes; matches the plan's
106/// "no fold required for capability-level aggregates" decision.
107pub trait Aggregator {
108 /// Final output type once aggregation completes.
109 type Output;
110
111 /// Update internal state with a matching candidate.
112 fn observe(
113 &mut self,
114 node_id: u64,
115 caps: &crate::adapter::net::behavior::capability::CapabilitySet,
116 );
117
118 /// Produce the final aggregated value.
119 fn finalize(self) -> Self::Output;
120}
121
122/// Reference impl: count matching candidates.
123#[derive(Default)]
124pub struct Count(usize);
125
126impl Aggregator for Count {
127 type Output = usize;
128 fn observe(&mut self, _: u64, _: &crate::adapter::net::behavior::capability::CapabilitySet) {
129 self.0 += 1;
130 }
131 fn finalize(self) -> usize {
132 self.0
133 }
134}
135
136/// Reference impl: collect the distinct values present under a
137/// specific axis-key across matching candidates. Useful for
138/// "what tenant ids are running GPU workloads?" style queries.
139pub struct UniqueAxisValues {
140 axis: TaxonomyAxis,
141 key: String,
142 seen: std::collections::BTreeSet<String>,
143}
144
145impl UniqueAxisValues {
146 /// Build an aggregator for `axis.key`. The aggregator
147 /// observes each matching node's tag set, extracts any tag
148 /// of shape `axis.key=value`, and accumulates the distinct
149 /// `value`s.
150 pub fn new(axis: TaxonomyAxis, key: impl Into<String>) -> Self {
151 Self {
152 axis,
153 key: key.into(),
154 seen: std::collections::BTreeSet::new(),
155 }
156 }
157}
158
159impl Aggregator for UniqueAxisValues {
160 type Output = Vec<String>;
161
162 fn observe(&mut self, _: u64, caps: &crate::adapter::net::behavior::capability::CapabilitySet) {
163 for tag in &caps.tags {
164 if let Tag::AxisValue {
165 axis, key, value, ..
166 } = tag
167 {
168 if *axis == self.axis && key == &self.key {
169 self.seen.insert(value.clone());
170 }
171 }
172 }
173 }
174
175 fn finalize(self) -> Vec<String> {
176 self.seen.into_iter().collect()
177 }
178}
179
/// Reference impl: take the maximum of a numeric metadata field
/// across matching candidates. The result is `None` when no
/// candidate's metadata carries the key, or when no carried value
/// parses as a finite `f64`.
#[derive(Debug, Clone, PartialEq)]
pub struct MaxNumericMetadata {
    /// Metadata key whose value is parsed as a number.
    key: String,
    /// Largest finite value seen so far; `None` until one is observed.
    current_max: Option<f64>,
}

impl MaxNumericMetadata {
    /// Build an aggregator over `metadata[key]`.
    ///
    /// Values are parsed via `f64::from_str`. Values that fail to parse,
    /// or that parse to a non-finite number (`NaN`, `inf`), are ignored
    /// rather than panicking.
    pub fn new(key: impl Into<String>) -> Self {
        Self {
            key: key.into(),
            current_max: None,
        }
    }
}
200
201impl Aggregator for MaxNumericMetadata {
202 type Output = Option<f64>;
203
204 fn observe(&mut self, _: u64, caps: &crate::adapter::net::behavior::capability::CapabilitySet) {
205 let Some(raw) = caps.metadata.get(&self.key) else {
206 return;
207 };
208 let Ok(parsed) = raw.parse::<f64>() else {
209 return;
210 };
211 if !parsed.is_finite() {
212 return;
213 }
214 self.current_max = Some(match self.current_max {
215 Some(m) if m >= parsed => m,
216 _ => parsed,
217 });
218 }
219
220 fn finalize(self) -> Option<f64> {
221 self.current_max
222 }
223}
224
225/// Five composable operators over the capability index. This
226/// slice ships `filter`, `match_axis`, and `aggregate`;
227/// `traverse` and `nearest` land in slice 2 once their
228/// edge-kind / proximity-lookup contracts are scoped.
229///
230/// Reference impl previously lived on the legacy
231/// `CapabilityIndex`; that store was removed in Phase 3B of the
232/// multifold migration. Downstream consumers layer their own
233/// implementations (federated, in-memory test fixtures, the
234/// capability fold) without changing the trait surface.
235pub trait CapabilityQuery {
236 /// Scan the index, returning `(node_id, caps)` pairs whose
237 /// tags + metadata satisfy `predicate`. Ordering is
238 /// implementation-defined; callers that need stable order
239 /// sort the result.
240 fn filter(
241 &self,
242 predicate: &Predicate,
243 ) -> Vec<(
244 u64,
245 crate::adapter::net::behavior::capability::CapabilitySet,
246 )>;
247
248 /// Type-aware match against a single axis-key. `value =
249 /// None` matches any candidate carrying the axis-key (in
250 /// either `axis.key` presence form or `axis.key=value` form);
251 /// `value = Some(s)` requires exact value match against the
252 /// `axis.key=value` form.
253 ///
254 /// Cheaper than `filter(Equals/Exists)` — skips the
255 /// predicate AST + the `EvalContext` materialization, walks
256 /// each node's tag set once.
257 fn match_axis(
258 &self,
259 axis: TaxonomyAxis,
260 key: &str,
261 value: Option<&str>,
262 ) -> Vec<(
263 u64,
264 crate::adapter::net::behavior::capability::CapabilitySet,
265 )>;
266
267 /// Run `agg` across every candidate satisfying `predicate`.
268 /// Streaming: each match's caps are observed in turn; no
269 /// intermediate `Vec` allocation. `Aggregator::finalize`
270 /// produces the typed output.
271 fn aggregate<A>(&self, predicate: &Predicate, agg: A) -> A::Output
272 where
273 A: Aggregator;
274
275 /// Walk capability-tag edges recursively. Starting from
276 /// `start_tag`, follow [`EdgeKind`]-shaped parent links up
277 /// to `max_depth` hops. Returns the chain of
278 /// `(node_id, tag)` pairs visited, in walk order.
279 ///
280 /// For `EdgeKind::ForkOfParent`: `start_tag` is a
281 /// `fork-of:<parent_origin_hex>` tag. The walker:
282 ///
283 /// 1. Records the start node + tag.
284 /// 2. Looks up nodes hosting `causal:<parent_origin_hex>`
285 /// (the parent chain's holders).
286 /// 3. From each holder, reads any `fork-of:` tag — the
287 /// grandparent — and recurses.
288 /// 4. Terminates at `max_depth` hops OR when no
289 /// `fork-of:` tag is found (root reached).
290 ///
291 /// First parent's host wins on ties — the index iteration
292 /// order is implementation-defined; callers needing
293 /// deterministic ordering across runs should snapshot the
294 /// index outside this call.
295 ///
296 /// `max_depth = 0` returns just `(start_node, start_tag)`
297 /// without recursing. `start_node` defaults to `0` if the
298 /// caller doesn't have a node id (the start tag itself is
299 /// what matters for the walk).
300 fn traverse(
301 &self,
302 start_node: u64,
303 start_tag: &Tag,
304 edge: EdgeKind,
305 max_depth: u32,
306 ) -> Vec<(u64, Tag)>;
307
308 /// Top-N candidates by proximity. Filters by `predicate`,
309 /// then ranks survivors by `rtt_lookup(node_id)` (lower
310 /// RTT first). Candidates with no RTT data sort to the
311 /// back; ties broken by lex-NodeId for determinism.
312 ///
313 /// `n = 0` returns empty. The function clones the matching
314 /// `CapabilitySet`s into the result; callers that don't
315 /// need the caps for downstream work can use `filter` +
316 /// their own ranking.
317 ///
318 /// Distance closure decouples `nearest` from the substrate's
319 /// proximity-graph internals (which use a `[u8; 32]`
320 /// node-id shape that this trait would otherwise have to
321 /// know about). Same plumbing convention as Phase F's
322 /// `RttLookup` for placement scoring.
323 fn nearest<F: Fn(u64) -> Option<Duration>>(
324 &self,
325 predicate: &Predicate,
326 rtt_lookup: F,
327 n: usize,
328 ) -> Vec<(
329 u64,
330 crate::adapter::net::behavior::capability::CapabilitySet,
331 Option<Distance>,
332 )>;
333}
334
335// =========================================================================
336// Helpers
337// =========================================================================
338
339/// Walk `caps.tags` looking for `axis.key` (`value=None` →
340/// presence-OR-value-form match; `value=Some(s)` → exact
341/// `axis.key=s` value-form match). Cheaper than building an
342/// EvalContext + running a Predicate AST since we walk the tag
343/// set once with a tight `match`.
344///
345/// Kept (with `#[allow(dead_code)]`) for downstream `CapabilityQuery`
346/// implementors — the legacy `CapabilityIndex` impl was removed in
347/// Phase 3B of the multifold migration.
348#[allow(dead_code)]
349fn axis_match(
350 caps: &crate::adapter::net::behavior::capability::CapabilitySet,
351 axis: TaxonomyAxis,
352 key: &str,
353 value: Option<&str>,
354) -> bool {
355 for tag in &caps.tags {
356 match tag {
357 Tag::AxisPresent {
358 axis: tag_axis,
359 key: tag_key,
360 } if *tag_axis == axis && tag_key == key && value.is_none() => {
361 return true;
362 }
363 Tag::AxisValue {
364 axis: tag_axis,
365 key: tag_key,
366 value: tag_value,
367 ..
368 } => {
369 if *tag_axis != axis || tag_key != key {
370 continue;
371 }
372 match value {
373 None => return true, // any value satisfies presence
374 Some(target) if tag_value == target => return true,
375 _ => {}
376 }
377 }
378 _ => {}
379 }
380 }
381 false
382}
383
// Keeps the file-level `use std::collections::BTreeMap;` referenced so it
// doesn't trip `unused_imports`. Nothing else visible in this module uses
// `BTreeMap`. The module-level doc example does not mention it, and that
// example is ```ignore``` anyway, so it is never compiled or link-checked.
// Delete this const together with the import.
#[allow(dead_code)]
const _DOC_LINK: BTreeMap<String, String> = BTreeMap::new();
389// `TagKey` referenced in the doc-comment example.
390#[allow(dead_code)]
391fn _doc_link_tag_key(axis: TaxonomyAxis, k: &str) -> TagKey {
392 TagKey::new(axis, k.to_string())
393}