Skip to main content

net/adapter/net/behavior/
predicate.rs

1//! Capability predicate AST — Phase A foundation for the federated
2//! query primitives in `CAPABILITY_SYSTEM_PLAN.md` §6a.
3//!
4//! Ships the `Predicate` enum with all 17 variants the substrate plan
5//! pins, an evaluator that takes a `(tags, metadata)` context, and
6//! constructor helpers + the `pred!` macro that the cross-binding
7//! SDK plan exposes language-idiomatic builders for.
8//!
9//! ## Variants
10//!
11//! Existence + equality (axis tags):
12//! - [`Predicate::Exists`] — tag with this `(axis, key)` is present.
13//! - [`Predicate::Equals`] — tag's value matches exactly.
14//!
15//! Numeric (axis tags whose value parses to `f64`):
16//! - [`Predicate::NumericAtLeast`] / [`Predicate::NumericAtMost`] / [`Predicate::NumericInRange`]
17//!
18//! Semver (axis tags whose value parses to `MAJOR.MINOR.PATCH`):
19//! - [`Predicate::SemverAtLeast`] / [`Predicate::SemverAtMost`]
20//! - [`Predicate::SemverCompatible`] — same major-version family
21//!   (or, for `0.x.y`, same minor) per the standard semver
22//!   compatibility rules.
23//!
24//! String (axis tag values):
25//! - [`Predicate::StringPrefix`] — value starts with the prefix.
26//! - [`Predicate::StringMatches`] — value contains the substring.
27//!   Phase E will swap this to regex behind the existing `regex`
28//!   feature gate; semantics today are substring-only.
29//!
30//! Metadata (the `BTreeMap<String, String>` field added in Phase C):
31//! - [`Predicate::MetadataExists`] / [`Predicate::MetadataEquals`]
32//! - [`Predicate::MetadataMatches`] (substring; same Phase-E swap)
33//! - [`Predicate::MetadataNumericAtLeast`]
34//!
35//! Boolean composition:
36//! - [`Predicate::And`] / [`Predicate::Or`] / [`Predicate::Not`]
37//!
38//! ## Evaluation
39//!
40//! `Predicate::evaluate` is a pure function over [`EvalContext`]
41//! (`(tags, metadata)`) — no I/O, no allocation outside what the
42//! pattern variants explicitly need (regex compilation lands with
43//! the Phase E swap). Numeric / semver parse failures evaluate to
44//! `false` rather than panicking; cross-binding queries should not
45//! fault on a malformed tag value.
46
47use std::collections::BTreeMap;
48
49use crate::adapter::net::behavior::tag::{Tag, TagKey};
50
51// =============================================================================
52// EvalContext
53// =============================================================================
54
55/// `(tags, metadata)` context passed to [`Predicate::evaluate`].
56/// Decoupled from `CapabilitySet` so the predicate evaluator works
57/// against the substrate's pre-Phase-A.5 capability shape AND the
58/// post-migration shape (`tags: HashSet<Tag>`) without churn.
59#[derive(Debug, Clone, Copy)]
60pub struct EvalContext<'a> {
61    /// Tag set against which axis predicates evaluate.
62    pub tags: &'a [Tag],
63    /// Key-value metadata against which metadata predicates evaluate.
64    pub metadata: &'a BTreeMap<String, String>,
65}
66
67impl<'a> EvalContext<'a> {
68    /// Build a context from explicit slices. The most common
69    /// constructor for callers that hold a `Vec<Tag>` or `&[Tag]`.
70    pub fn new(tags: &'a [Tag], metadata: &'a BTreeMap<String, String>) -> Self {
71        Self { tags, metadata }
72    }
73}
74
75// =============================================================================
76// Predicate
77// =============================================================================
78
79/// AST for capability queries. Pure data — clones, equality, and
80/// serde round-trip are the basis of cross-binding wire format.
81///
82/// See module docs for the variant taxonomy.
83// `PartialEq` only because `f64` doesn't implement `Eq` (NaN
84// asymmetry). Predicate equality is structural, not hashable —
85// we never use it as a HashMap key.
86//
87// Serde derive intentionally OMITTED for Phase A. The recursive
88// `Box<Predicate>` + `Vec<Predicate>` shape compounds with the
89// existing `event::*` serializer monomorphization graph and
90// pushes the test-build's recursion-limit / compile-time past
91// the project's budget. Phase E (federated query primitives)
92// adds cross-binding wire format with a flat-tree IR (or
93// postcard, which handles recursion better than serde_json's
94// derive expansion). For Phase A, the AST + evaluator are
95// process-local — no need to serialize.
96#[derive(Debug, Clone, PartialEq)]
97pub enum Predicate {
98    // ---- Axis tags: existence + equality --------------------------------
99    /// Tag with this `(axis, key)` is present (regardless of value).
100    Exists {
101        /// Tag key to probe.
102        key: TagKey,
103    },
104    /// Tag's value matches exactly. Presence-only tags don't match
105    /// (use [`Predicate::Exists`] for that).
106    Equals {
107        /// Tag key.
108        key: TagKey,
109        /// Required value (string-equality).
110        value: String,
111    },
112
113    // ---- Axis tags: numeric ---------------------------------------------
114    /// Tag's value parses to `f64` and is `>= threshold`.
115    NumericAtLeast {
116        /// Tag key.
117        key: TagKey,
118        /// Inclusive lower bound.
119        threshold: f64,
120    },
121    /// Tag's value parses to `f64` and is `<= threshold`.
122    NumericAtMost {
123        /// Tag key.
124        key: TagKey,
125        /// Inclusive upper bound.
126        threshold: f64,
127    },
128    /// Tag's value parses to `f64` and lies in `[min, max]` inclusive.
129    NumericInRange {
130        /// Tag key.
131        key: TagKey,
132        /// Inclusive lower bound.
133        min: f64,
134        /// Inclusive upper bound.
135        max: f64,
136    },
137
138    // ---- Axis tags: semver ----------------------------------------------
139    /// Tag's value parses to `MAJOR.MINOR.PATCH` and is `>= version`.
140    SemverAtLeast {
141        /// Tag key.
142        key: TagKey,
143        /// Reference version.
144        version: String,
145    },
146    /// Tag's value parses to `MAJOR.MINOR.PATCH` and is `<= version`.
147    SemverAtMost {
148        /// Tag key.
149        key: TagKey,
150        /// Reference version.
151        version: String,
152    },
153    /// Tag's value parses to `MAJOR.MINOR.PATCH` and is in the same
154    /// compatibility band: same major for `>= 1.0.0`, same minor for
155    /// `0.x.y`. Mirrors the standard semver caret-compatibility rule.
156    SemverCompatible {
157        /// Tag key.
158        key: TagKey,
159        /// Reference version.
160        version: String,
161    },
162
163    // ---- Axis tags: string ----------------------------------------------
164    /// Tag's value starts with `prefix`.
165    StringPrefix {
166        /// Tag key.
167        key: TagKey,
168        /// Prefix to match.
169        prefix: String,
170    },
171    /// Tag's value contains `pattern` as a substring. Phase E will
172    /// upgrade to regex behind the `regex` feature gate; semantics
173    /// today are substring-only.
174    StringMatches {
175        /// Tag key.
176        key: TagKey,
177        /// Substring pattern.
178        pattern: String,
179    },
180
181    // ---- Metadata -------------------------------------------------------
182    /// Metadata key is present.
183    MetadataExists {
184        /// Metadata key.
185        key: String,
186    },
187    /// Metadata value matches exactly.
188    MetadataEquals {
189        /// Metadata key.
190        key: String,
191        /// Required value (string-equality).
192        value: String,
193    },
194    /// Metadata value contains `pattern` as a substring (same
195    /// substring-only semantics as [`Predicate::StringMatches`]).
196    MetadataMatches {
197        /// Metadata key.
198        key: String,
199        /// Substring pattern.
200        pattern: String,
201    },
202    /// Metadata value parses to `f64` and is `>= threshold`.
203    MetadataNumericAtLeast {
204        /// Metadata key.
205        key: String,
206        /// Inclusive lower bound.
207        threshold: f64,
208    },
209
210    // ---- Boolean composition --------------------------------------------
211    /// Conjunction. Empty `Vec` evaluates to `true` (vacuous match —
212    /// matches the standard math/logic convention; pin in tests).
213    And(Vec<Predicate>),
214    /// Disjunction. Empty `Vec` evaluates to `false` (vacuous miss).
215    Or(Vec<Predicate>),
216    /// Negation.
217    Not(Box<Predicate>),
218}
219
220// =============================================================================
221// Wire format — Phase 5 of CAPABILITY_ENHANCEMENTS_PLAN.md.
222//
223// The recursive `Box<Predicate>` + `Vec<Predicate>` shape compounds
224// with the existing `event::*` serializer monomorphization graph
225// and pushes test-build recursion-limit / compile-time past the
226// project's budget (per the comment at the head of this module).
227//
228// The flat-tree IR below sidesteps that: nodes live in a single
229// `Vec<PredicateNodeWire>`; And/Or/Not reference children via
230// `u32` indices into that table. No variant of `PredicateNodeWire`
231// transitively references `PredicateWire` itself, so serde derive
232// expansion stays bounded.
233//
234// Round-trip:
235//
236//   Predicate::to_wire()        →  PredicateWire
237//   PredicateWire::into_predicate() →  Result<Predicate, _>
238//
239// Pinned in `wire_round_trip_*` tests below.
240// =============================================================================
241
242/// One node in the flat predicate wire format. `And`/`Or`/`Not`
243/// reference their children via `u32` indices into the parent
244/// [`PredicateWire`]'s `nodes` table.
245///
246/// Node ordering invariant: children always appear at lower
247/// indices than their parent (post-order serialization). The
248/// rebuild path enforces this to catch malformed wire payloads
249/// that attempt index cycles.
250#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
251#[serde(rename_all = "snake_case", tag = "kind")]
252pub enum PredicateNodeWire {
253    /// Leaf: tag with this `(axis, key)` is present.
254    Exists {
255        /// Tag key.
256        key: TagKey,
257    },
258    /// Leaf: tag's value matches exactly.
259    Equals {
260        /// Tag key.
261        key: TagKey,
262        /// Required value.
263        value: String,
264    },
265    /// Leaf: tag's value parses to `f64` and is `>= threshold`.
266    NumericAtLeast {
267        /// Tag key.
268        key: TagKey,
269        /// Inclusive lower bound.
270        threshold: f64,
271    },
272    /// Leaf: tag's value parses to `f64` and is `<= threshold`.
273    NumericAtMost {
274        /// Tag key.
275        key: TagKey,
276        /// Inclusive upper bound.
277        threshold: f64,
278    },
279    /// Leaf: tag's value parses to `f64` and lies in `[min, max]`.
280    NumericInRange {
281        /// Tag key.
282        key: TagKey,
283        /// Inclusive lower bound.
284        min: f64,
285        /// Inclusive upper bound.
286        max: f64,
287    },
288    /// Leaf: tag's value parses to a semver triple and is `>= version`.
289    SemverAtLeast {
290        /// Tag key.
291        key: TagKey,
292        /// Reference version.
293        version: String,
294    },
295    /// Leaf: tag's value parses to a semver triple and is `<= version`.
296    SemverAtMost {
297        /// Tag key.
298        key: TagKey,
299        /// Reference version.
300        version: String,
301    },
302    /// Leaf: tag's value parses to a semver triple and is in the
303    /// same compatibility band as `version`.
304    SemverCompatible {
305        /// Tag key.
306        key: TagKey,
307        /// Reference version.
308        version: String,
309    },
310    /// Leaf: tag's value starts with `prefix`.
311    StringPrefix {
312        /// Tag key.
313        key: TagKey,
314        /// Prefix to match.
315        prefix: String,
316    },
317    /// Leaf: tag's value contains `pattern` as a substring.
318    StringMatches {
319        /// Tag key.
320        key: TagKey,
321        /// Substring pattern.
322        pattern: String,
323    },
324    /// Leaf: metadata key is present.
325    MetadataExists {
326        /// Metadata key.
327        key: String,
328    },
329    /// Leaf: metadata value matches exactly.
330    MetadataEquals {
331        /// Metadata key.
332        key: String,
333        /// Required value.
334        value: String,
335    },
336    /// Leaf: metadata value contains `pattern` as a substring.
337    MetadataMatches {
338        /// Metadata key.
339        key: String,
340        /// Substring pattern.
341        pattern: String,
342    },
343    /// Leaf: metadata value parses to `f64` and is `>= threshold`.
344    MetadataNumericAtLeast {
345        /// Metadata key.
346        key: String,
347        /// Inclusive lower bound.
348        threshold: f64,
349    },
350    /// Composite: conjunction of children at the named indices.
351    And {
352        /// Child indices into the parent `PredicateWire::nodes`.
353        children: Vec<u32>,
354    },
355    /// Composite: disjunction of children at the named indices.
356    Or {
357        /// Child indices into the parent `PredicateWire::nodes`.
358        children: Vec<u32>,
359    },
360    /// Composite: negation of the child at the named index.
361    Not {
362        /// Child index into the parent `PredicateWire::nodes`.
363        child: u32,
364    },
365}
366
367/// Wire format for [`Predicate`]. Flat node table with index
368/// references for `And`/`Or`/`Not` children.
369///
370/// Phase 5 of `CAPABILITY_ENHANCEMENTS_PLAN.md`. Crosses the
371/// nRPC envelope as serde-encoded bytes (postcard for cross-binding,
372/// JSON for debug fixtures); the substrate's capability
373/// announcement path is unchanged.
374///
375/// Build via [`Predicate::to_wire`]; rebuild via
376/// [`PredicateWire::into_predicate`].
377#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
378pub struct PredicateWire {
379    /// Flat node table. Children always live at lower indices
380    /// than their parents.
381    pub nodes: Vec<PredicateNodeWire>,
382    /// Index of the root node within `nodes`. Always
383    /// `nodes.len() - 1` for a freshly-emitted `to_wire()` output;
384    /// callers receiving an externally-built wire payload should
385    /// not assume that.
386    pub root_idx: u32,
387}
388
389/// Errors raised by [`PredicateWire::into_predicate`].
390#[derive(Debug, Clone, PartialEq, thiserror::Error)]
391pub enum PredicateWireError {
392    /// Wire payload had an empty `nodes` table.
393    #[error("predicate wire has empty nodes table")]
394    Empty,
395    /// `root_idx` was out of bounds for the `nodes` table.
396    #[error("predicate wire root_idx {root_idx} >= nodes len {len}")]
397    RootOutOfBounds {
398        /// The provided `root_idx`.
399        root_idx: u32,
400        /// Length of the `nodes` table.
401        len: usize,
402    },
403    /// A composite node referenced a child index that was out of
404    /// bounds.
405    #[error("predicate wire child index {child} out of bounds for nodes len {len}")]
406    ChildOutOfBounds {
407        /// The malformed child index.
408        child: u32,
409        /// Length of the `nodes` table.
410        len: usize,
411    },
412    /// A composite node referenced a child index that was greater
413    /// than or equal to its own. Catches index cycles introduced
414    /// by malformed / malicious wire payloads.
415    #[error("predicate wire child index {child} >= parent index {parent} (cycle)")]
416    CycleDetected {
417        /// Parent node index.
418        parent: u32,
419        /// Offending child index.
420        child: u32,
421    },
422}
423
424impl Predicate {
425    /// Convert to the flat wire format. Post-order serialization:
426    /// leaves land first, the root has the highest index.
427    ///
428    /// Output is byte-stable across calls — two `to_wire()`s on
429    /// equal predicates produce identical `PredicateWire` values
430    /// (and identical bytes through any serde encoder).
431    pub fn to_wire(&self) -> PredicateWire {
432        let mut nodes = Vec::new();
433        let root_idx = self.append_to_wire(&mut nodes);
434        PredicateWire { nodes, root_idx }
435    }
436
437    /// Iterative helper: append `self` (and any sub-tree) into
438    /// `nodes`, returning the index of the root of the sub-tree.
439    ///
440    /// Implemented as a heap-allocated work stack rather than
441    /// straight recursion. A deeply-nested predicate
442    /// (`Not(Not(Not(...)))` 10k deep, or `And([many And([...])])`)
443    /// otherwise overflows the thread stack — caller-controlled
444    /// input from the FFI shims can build arbitrarily-deep
445    /// `Predicate` trees via typed factories. Output is identical
446    /// to the prior recursive implementation: post-order
447    /// serialization, children at lower indices than their
448    /// parents.
449    #[expect(
450        clippy::expect_used,
451        reason = "predicate tree-walk invariants: every FinishX step is preceded by a matching number of Begin steps that push children; the final pop yields the root that the walk always pushes"
452    )]
453    fn append_to_wire(&self, nodes: &mut Vec<PredicateNodeWire>) -> u32 {
454        enum Step<'a> {
455            /// Visit a predicate. Leaves emit immediately;
456            /// composites schedule a matching `Finish*` after
457            /// pushing their children.
458            Visit(&'a Predicate),
459            /// Pop `n` child indices off `child_stack` and emit
460            /// an `And` referring to them, in the order the
461            /// children were visited (left to right).
462            FinishAnd(usize),
463            /// As `FinishAnd` but for `Or`.
464            FinishOr(usize),
465            /// Pop one child index off `child_stack` and emit a
466            /// `Not`.
467            FinishNot,
468        }
469
470        let mut work: Vec<Step<'_>> = Vec::with_capacity(8);
471        work.push(Step::Visit(self));
472        // Each child push records the node index it landed at;
473        // composite `Finish*` steps drain the trailing N entries.
474        let mut child_stack: Vec<u32> = Vec::with_capacity(8);
475
476        // Helper: emit a leaf node, push its index on the
477        // child_stack so the enclosing composite picks it up.
478        let emit = |nodes: &mut Vec<PredicateNodeWire>,
479                    child_stack: &mut Vec<u32>,
480                    node: PredicateNodeWire| {
481            let idx = nodes.len() as u32;
482            nodes.push(node);
483            child_stack.push(idx);
484        };
485
486        while let Some(step) = work.pop() {
487            match step {
488                Step::Visit(p) => match p {
489                    Self::Exists { key } => emit(
490                        nodes,
491                        &mut child_stack,
492                        PredicateNodeWire::Exists { key: key.clone() },
493                    ),
494                    Self::Equals { key, value } => emit(
495                        nodes,
496                        &mut child_stack,
497                        PredicateNodeWire::Equals {
498                            key: key.clone(),
499                            value: value.clone(),
500                        },
501                    ),
502                    Self::NumericAtLeast { key, threshold } => emit(
503                        nodes,
504                        &mut child_stack,
505                        PredicateNodeWire::NumericAtLeast {
506                            key: key.clone(),
507                            threshold: *threshold,
508                        },
509                    ),
510                    Self::NumericAtMost { key, threshold } => emit(
511                        nodes,
512                        &mut child_stack,
513                        PredicateNodeWire::NumericAtMost {
514                            key: key.clone(),
515                            threshold: *threshold,
516                        },
517                    ),
518                    Self::NumericInRange { key, min, max } => emit(
519                        nodes,
520                        &mut child_stack,
521                        PredicateNodeWire::NumericInRange {
522                            key: key.clone(),
523                            min: *min,
524                            max: *max,
525                        },
526                    ),
527                    Self::SemverAtLeast { key, version } => emit(
528                        nodes,
529                        &mut child_stack,
530                        PredicateNodeWire::SemverAtLeast {
531                            key: key.clone(),
532                            version: version.clone(),
533                        },
534                    ),
535                    Self::SemverAtMost { key, version } => emit(
536                        nodes,
537                        &mut child_stack,
538                        PredicateNodeWire::SemverAtMost {
539                            key: key.clone(),
540                            version: version.clone(),
541                        },
542                    ),
543                    Self::SemverCompatible { key, version } => emit(
544                        nodes,
545                        &mut child_stack,
546                        PredicateNodeWire::SemverCompatible {
547                            key: key.clone(),
548                            version: version.clone(),
549                        },
550                    ),
551                    Self::StringPrefix { key, prefix } => emit(
552                        nodes,
553                        &mut child_stack,
554                        PredicateNodeWire::StringPrefix {
555                            key: key.clone(),
556                            prefix: prefix.clone(),
557                        },
558                    ),
559                    Self::StringMatches { key, pattern } => emit(
560                        nodes,
561                        &mut child_stack,
562                        PredicateNodeWire::StringMatches {
563                            key: key.clone(),
564                            pattern: pattern.clone(),
565                        },
566                    ),
567                    Self::MetadataExists { key } => emit(
568                        nodes,
569                        &mut child_stack,
570                        PredicateNodeWire::MetadataExists { key: key.clone() },
571                    ),
572                    Self::MetadataEquals { key, value } => emit(
573                        nodes,
574                        &mut child_stack,
575                        PredicateNodeWire::MetadataEquals {
576                            key: key.clone(),
577                            value: value.clone(),
578                        },
579                    ),
580                    Self::MetadataMatches { key, pattern } => emit(
581                        nodes,
582                        &mut child_stack,
583                        PredicateNodeWire::MetadataMatches {
584                            key: key.clone(),
585                            pattern: pattern.clone(),
586                        },
587                    ),
588                    Self::MetadataNumericAtLeast { key, threshold } => emit(
589                        nodes,
590                        &mut child_stack,
591                        PredicateNodeWire::MetadataNumericAtLeast {
592                            key: key.clone(),
593                            threshold: *threshold,
594                        },
595                    ),
596                    Self::And(children) => {
597                        work.push(Step::FinishAnd(children.len()));
598                        // Push children in reverse so that the
599                        // leftmost child is popped first; this
600                        // preserves the left-to-right child
601                        // ordering of the recursive version.
602                        for c in children.iter().rev() {
603                            work.push(Step::Visit(c));
604                        }
605                    }
606                    Self::Or(children) => {
607                        work.push(Step::FinishOr(children.len()));
608                        for c in children.iter().rev() {
609                            work.push(Step::Visit(c));
610                        }
611                    }
612                    Self::Not(inner) => {
613                        work.push(Step::FinishNot);
614                        work.push(Step::Visit(inner));
615                    }
616                },
617                Step::FinishAnd(n) => {
618                    let start = child_stack.len() - n;
619                    let kids: Vec<u32> = child_stack.drain(start..).collect();
620                    let idx = nodes.len() as u32;
621                    nodes.push(PredicateNodeWire::And { children: kids });
622                    child_stack.push(idx);
623                }
624                Step::FinishOr(n) => {
625                    let start = child_stack.len() - n;
626                    let kids: Vec<u32> = child_stack.drain(start..).collect();
627                    let idx = nodes.len() as u32;
628                    nodes.push(PredicateNodeWire::Or { children: kids });
629                    child_stack.push(idx);
630                }
631                Step::FinishNot => {
632                    let child = child_stack
633                        .pop()
634                        .expect("Not body must emit one child before FinishNot");
635                    let idx = nodes.len() as u32;
636                    nodes.push(PredicateNodeWire::Not { child });
637                    child_stack.push(idx);
638                }
639            }
640        }
641
642        child_stack
643            .pop()
644            .expect("predicate must produce at least one node")
645    }
646}
647
648impl PredicateWire {
649    /// Rebuild a [`Predicate`] AST from the flat wire format.
650    ///
651    /// Validates structural integrity: empty tables, out-of-bounds
652    /// indices, and child-index cycles are surfaced as typed
653    /// [`PredicateWireError`] rather than panicking. A successful
654    /// rebuild is byte-equal to the input of the matching
655    /// [`Predicate::to_wire`] call.
656    pub fn into_predicate(self) -> Result<Predicate, PredicateWireError> {
657        if self.nodes.is_empty() {
658            return Err(PredicateWireError::Empty);
659        }
660        let len = self.nodes.len();
661        if (self.root_idx as usize) >= len {
662            return Err(PredicateWireError::RootOutOfBounds {
663                root_idx: self.root_idx,
664                len,
665            });
666        }
667        rebuild_predicate(&self.nodes, self.root_idx)
668    }
669}
670
671/// Recursive rebuild helper. Walks the flat node table from `idx`,
672/// validating child indices and cycles as it goes.
673fn rebuild_predicate(
674    nodes: &[PredicateNodeWire],
675    idx: u32,
676) -> Result<Predicate, PredicateWireError> {
677    let len = nodes.len();
678    let node = nodes
679        .get(idx as usize)
680        .ok_or(PredicateWireError::ChildOutOfBounds { child: idx, len })?;
681    let result = match node {
682        PredicateNodeWire::Exists { key } => Predicate::Exists { key: key.clone() },
683        PredicateNodeWire::Equals { key, value } => Predicate::Equals {
684            key: key.clone(),
685            value: value.clone(),
686        },
687        PredicateNodeWire::NumericAtLeast { key, threshold } => Predicate::NumericAtLeast {
688            key: key.clone(),
689            threshold: *threshold,
690        },
691        PredicateNodeWire::NumericAtMost { key, threshold } => Predicate::NumericAtMost {
692            key: key.clone(),
693            threshold: *threshold,
694        },
695        PredicateNodeWire::NumericInRange { key, min, max } => Predicate::NumericInRange {
696            key: key.clone(),
697            min: *min,
698            max: *max,
699        },
700        PredicateNodeWire::SemverAtLeast { key, version } => Predicate::SemverAtLeast {
701            key: key.clone(),
702            version: version.clone(),
703        },
704        PredicateNodeWire::SemverAtMost { key, version } => Predicate::SemverAtMost {
705            key: key.clone(),
706            version: version.clone(),
707        },
708        PredicateNodeWire::SemverCompatible { key, version } => Predicate::SemverCompatible {
709            key: key.clone(),
710            version: version.clone(),
711        },
712        PredicateNodeWire::StringPrefix { key, prefix } => Predicate::StringPrefix {
713            key: key.clone(),
714            prefix: prefix.clone(),
715        },
716        PredicateNodeWire::StringMatches { key, pattern } => Predicate::StringMatches {
717            key: key.clone(),
718            pattern: pattern.clone(),
719        },
720        PredicateNodeWire::MetadataExists { key } => Predicate::MetadataExists { key: key.clone() },
721        PredicateNodeWire::MetadataEquals { key, value } => Predicate::MetadataEquals {
722            key: key.clone(),
723            value: value.clone(),
724        },
725        PredicateNodeWire::MetadataMatches { key, pattern } => Predicate::MetadataMatches {
726            key: key.clone(),
727            pattern: pattern.clone(),
728        },
729        PredicateNodeWire::MetadataNumericAtLeast { key, threshold } => {
730            Predicate::MetadataNumericAtLeast {
731                key: key.clone(),
732                threshold: *threshold,
733            }
734        }
735        PredicateNodeWire::And { children } => {
736            check_children_below(children, idx)?;
737            let kids: Result<Vec<_>, _> = children
738                .iter()
739                .map(|&c| rebuild_predicate(nodes, c))
740                .collect();
741            Predicate::And(kids?)
742        }
743        PredicateNodeWire::Or { children } => {
744            check_children_below(children, idx)?;
745            let kids: Result<Vec<_>, _> = children
746                .iter()
747                .map(|&c| rebuild_predicate(nodes, c))
748                .collect();
749            Predicate::Or(kids?)
750        }
751        PredicateNodeWire::Not { child } => {
752            if *child >= idx {
753                return Err(PredicateWireError::CycleDetected {
754                    parent: idx,
755                    child: *child,
756                });
757            }
758            Predicate::Not(Box::new(rebuild_predicate(nodes, *child)?))
759        }
760    };
761    Ok(result)
762}
763
764/// Validate that every child index in `children` is strictly less
765/// than `parent`. Catches cycles introduced by malformed wire
766/// payloads.
767fn check_children_below(children: &[u32], parent: u32) -> Result<(), PredicateWireError> {
768    for &child in children {
769        if child >= parent {
770            return Err(PredicateWireError::CycleDetected { parent, child });
771        }
772    }
773    Ok(())
774}
775
776// =============================================================================
777// nRPC envelope integration — Phase 5.B of CAPABILITY_ENHANCEMENTS_PLAN.md.
778//
779// The cleanest place to attach a `where:` filter to an nRPC call
780// is the existing request-headers slot. Headers already carry
781// out-of-band metadata (trace context, idempotency keys,
782// content-type) and are typed as `(String, Vec<u8>)` — binary-safe,
783// per-header capped at `MAX_RPC_HEADER_VALUE_LEN` (4 KB), passed
784// through opaquely by the substrate.
785//
786// Predicate-handling code uses two helpers:
787//
788//   `predicate_to_rpc_header(&pred)` — JSON-encodes a `PredicateWire`
789//                                      into the canonical
790//                                      `net-where` header.
791//   `predicate_from_rpc_headers(headers)` — locates the header in
792//                                           a request's headers,
793//                                           decodes back to
794//                                           `Predicate`.
795//
796// Service handlers that opt in look for the header; services that
797// don't ignore it. The substrate (cortex/rpc) itself never
798// inspects the header — `eternal-rule §4: no semantic growth at
799// the substrate`. Per-binding API exposure lives in the SDK layer
800// (Phase 9b of `CAPABILITY_SYSTEM_SDK_PLAN.md`).
801//
802// JSON wire format (vs. postcard) trades ~2-3× size for human
803// readability + diff-able cross-binding fixtures. Predicates that
804// fit a typical service filter are ~200-500 bytes JSON, well
805// under the header cap.
806// =============================================================================
807
808/// Canonical header name for a predicate-pushdown filter on an
809/// nRPC request. Lowercase per HTTP-style convention; the substrate
810/// `cortex/rpc` codec passes header names through unchanged, but
811/// this constant is the one downstream callers must agree on.
812pub const RPC_WHERE_HEADER: &str = "net-where";
813
814/// Maximum size of the JSON-encoded `PredicateWire` header value.
815/// Mirrors `cortex::rpc::MAX_RPC_HEADER_VALUE_LEN`; redeclared here
816/// so the predicate helper can reject oversize encodings without
817/// pulling in the `cortex` feature gate.
818pub const MAX_PREDICATE_RPC_HEADER_VALUE_LEN: usize = 4096;
819
820/// Errors raised by [`predicate_to_rpc_header`].
821#[derive(Debug, thiserror::Error)]
822pub enum PredicateRpcEncodeError {
823    /// `serde_json::to_vec` failed on the wire-form predicate.
824    #[error("predicate wire encode failed: {0}")]
825    Encode(#[from] serde_json::Error),
826    /// The encoded payload exceeds the header-value cap.
827    #[error("predicate wire encoding {actual} bytes exceeds header cap {limit}")]
828    TooLarge {
829        /// Encoded byte length.
830        actual: usize,
831        /// Maximum permitted (`MAX_PREDICATE_RPC_HEADER_VALUE_LEN`).
832        limit: usize,
833    },
834}
835
836/// Errors raised by [`predicate_from_rpc_headers`].
837#[derive(Debug, thiserror::Error)]
838pub enum PredicateRpcDecodeError {
839    /// JSON parse failed on the header value.
840    #[error("predicate wire decode failed: {0}")]
841    Json(#[from] serde_json::Error),
842    /// Wire payload was structurally malformed (cycle, OOB index,
843    /// empty table).
844    #[error("predicate wire malformed: {0}")]
845    Wire(#[from] PredicateWireError),
846    /// Header value bytes exceeded the negotiated cap
847    /// (`MAX_PREDICATE_RPC_HEADER_VALUE_LEN`). Mirrors the encode
848    /// side's `TooLarge`; rejects parse-bombs before serde walks
849    /// the payload.
850    #[error("predicate wire payload {actual} bytes exceeds header cap {limit}")]
851    Oversize {
852        /// Observed payload size in bytes.
853        actual: usize,
854        /// Maximum permitted (`MAX_PREDICATE_RPC_HEADER_VALUE_LEN`).
855        limit: usize,
856    },
857}
858
859/// Encode a [`Predicate`] for transport in an nRPC request header.
860///
861/// Returns the canonical header tuple `(name, json_bytes)`. The
862/// service handler reading the request looks up the header by
863/// name (`RPC_WHERE_HEADER`) and decodes via
864/// [`predicate_from_rpc_headers`].
865///
866/// Phase 5.B of `CAPABILITY_ENHANCEMENTS_PLAN.md`. Round-trip
867/// pinned in `predicate_rpc_header_round_trip_*` tests.
868pub fn predicate_to_rpc_header(
869    pred: &Predicate,
870) -> Result<(String, Vec<u8>), PredicateRpcEncodeError> {
871    let wire = pred.to_wire();
872    let bytes = serde_json::to_vec(&wire)?;
873    if bytes.len() > MAX_PREDICATE_RPC_HEADER_VALUE_LEN {
874        return Err(PredicateRpcEncodeError::TooLarge {
875            actual: bytes.len(),
876            limit: MAX_PREDICATE_RPC_HEADER_VALUE_LEN,
877        });
878    }
879    Ok((RPC_WHERE_HEADER.to_string(), bytes))
880}
881
882/// Extract and decode a [`Predicate`] from a request's headers,
883/// if a `net-where` header is present.
884///
885/// Returns:
886///
887/// - `None` — no `net-where` header. Service should default
888///   to "no filter" (return all rows).
889/// - `Some(Ok(pred))` — header present, decoded cleanly. Service
890///   filters its result stream against `pred`.
891/// - `Some(Err(_))` — header present but malformed JSON or
892///   structurally invalid wire payload. Service should reject the
893///   request with a typed error rather than silently ignoring;
894///   silent skip would let a misencoded filter return more rows
895///   than the caller expected, which is a confidentiality concern
896///   in some workloads.
897///
898/// The first matching header wins — duplicate headers under the
899/// same name are not coalesced.
900///
901/// Phase 5.B of `CAPABILITY_ENHANCEMENTS_PLAN.md`.
902pub fn predicate_from_rpc_headers<H>(
903    headers: &[H],
904) -> Option<Result<Predicate, PredicateRpcDecodeError>>
905where
906    H: AsRpcHeader,
907{
908    let value = headers
909        .iter()
910        .find(|h| h.name() == RPC_WHERE_HEADER)?
911        .value();
912    // N-2 second pass: enforce the cap symmetrically with the encode
913    // side. The encode path rejects oversize payloads at line 735;
914    // pre-fix the decode path had no length check, so a peer that
915    // submitted an attacker-shaped JSON of arbitrary size let
916    // `serde_json::from_slice` plus `rebuild_predicate` walk a
917    // payload whose recursion depth was bounded only by the input
918    // size — a cheap parse-bomb DoS shape if a transport cap were
919    // ever relaxed for unrelated reasons.
920    if value.len() > MAX_PREDICATE_RPC_HEADER_VALUE_LEN {
921        return Some(Err(PredicateRpcDecodeError::Oversize {
922            actual: value.len(),
923            limit: MAX_PREDICATE_RPC_HEADER_VALUE_LEN,
924        }));
925    }
926    let result = serde_json::from_slice::<PredicateWire>(value)
927        .map_err(PredicateRpcDecodeError::Json)
928        .and_then(|wire| wire.into_predicate().map_err(PredicateRpcDecodeError::Wire));
929    Some(result)
930}
931
932/// Adapter trait letting [`predicate_from_rpc_headers`] consume any
933/// shape that exposes a `(name, value)` view. Generic over both
934/// `(String, Vec<u8>)` (the substrate's `RpcHeader` alias) and
935/// any binding-side wrapper that exposes name + value accessors.
936pub trait AsRpcHeader {
937    /// Header name (case-sensitive match against `RPC_WHERE_HEADER`).
938    fn name(&self) -> &str;
939    /// Header value bytes.
940    fn value(&self) -> &[u8];
941}
942
943impl AsRpcHeader for (String, Vec<u8>) {
944    fn name(&self) -> &str {
945        &self.0
946    }
947    fn value(&self) -> &[u8] {
948        &self.1
949    }
950}
951
952impl AsRpcHeader for &(String, Vec<u8>) {
953    fn name(&self) -> &str {
954        &self.0
955    }
956    fn value(&self) -> &[u8] {
957        &self.1
958    }
959}
960
961// =============================================================================
962// Service-side row filter ergonomics — Phase 5.B follow-on of
963// CAPABILITY_ENHANCEMENTS_PLAN.md.
964//
965// The Phase 5.B helpers (`predicate_to_rpc_header` /
966// `predicate_from_rpc_headers`) move predicates across the wire,
967// but service handlers still have to manually construct an
968// `EvalContext` per row and dispatch through `Predicate::evaluate`.
969// These helpers close that gap:
970//
971//   - `Predicate::matches_capability_set(caps)` — single-row match
972//     against a `CapabilitySet`.
973//   - `RpcPredicateContext` trait — application rows expose tags +
974//     metadata for predicate evaluation.
975//   - `filter_by_predicate(rows, pred)` — iterator combinator that
976//     skips rows the predicate filters out.
977//
978// All three handle the `Option<&Predicate>` shape returned by
979// `predicate_from_rpc_headers` ergonomically — `None` means "no
980// filter, all rows match".
981// =============================================================================
982
983impl Predicate {
984    /// True if this predicate evaluates to true against the
985    /// given [`super::capability::CapabilitySet`]'s tags + metadata.
986    ///
987    /// Materializes `caps.tags` (a `HashSet<Tag>`) as a `Vec<Tag>`
988    /// for the slice-based `EvalContext`. The cost is a single
989    /// allocation per call; for hot loops over many capability
990    /// sets, callers may prefer to materialize tags once and
991    /// invoke [`Self::evaluate`] directly.
992    pub fn matches_capability_set(
993        &self,
994        caps: &crate::adapter::net::behavior::CapabilitySet,
995    ) -> bool {
996        let tags: Vec<Tag> = caps.tags.iter().cloned().collect();
997        let ctx = EvalContext::new(&tags, &caps.metadata);
998        self.evaluate(&ctx)
999    }
1000}
1001
1002/// Application-row adapter for predicate evaluation.
1003///
1004/// Service handlers that filter custom row types (training jobs,
1005/// documents, sensor readings, …) implement this trait on their
1006/// row to expose tags + metadata to the predicate. The
1007/// [`filter_by_predicate`] helper then provides a one-line
1008/// filter pattern over any iterator of `RpcPredicateContext` rows.
1009///
1010/// Phase 5.B follow-on of `CAPABILITY_ENHANCEMENTS_PLAN.md`.
1011///
1012/// ```ignore
1013/// struct TrainingJob {
1014///     tags: Vec<Tag>,
1015///     metadata: BTreeMap<String, String>,
1016///     payload: ...,
1017/// }
1018///
1019/// impl RpcPredicateContext for TrainingJob {
1020///     fn rpc_predicate_tags(&self) -> &[Tag] { &self.tags }
1021///     fn rpc_predicate_metadata(&self) -> &BTreeMap<String, String> {
1022///         &self.metadata
1023///     }
1024/// }
1025/// ```
1026pub trait RpcPredicateContext {
1027    /// Tags the predicate's axis-keyed clauses match against.
1028    fn rpc_predicate_tags(&self) -> &[Tag];
1029    /// Metadata the predicate's metadata-keyed clauses match against.
1030    fn rpc_predicate_metadata(&self) -> &BTreeMap<String, String>;
1031}
1032
1033/// Filter `rows` by an optional predicate.
1034///
1035/// `pred = None` returns all rows (the no-filter case — the
1036/// caller's request didn't include a `net-where` header).
1037/// `pred = Some(p)` returns only rows where `p` evaluates to true
1038/// against the row's tags + metadata.
1039///
1040/// Service handler usage:
1041///
1042/// ```ignore
1043/// let pred_opt = predicate_from_rpc_headers(&request.headers).transpose()?;
1044/// let matched: Vec<TrainingJob> =
1045///     filter_by_predicate(jobs, pred_opt.as_ref()).collect();
1046/// ```
1047///
1048/// Phase 5.B follow-on of `CAPABILITY_ENHANCEMENTS_PLAN.md`.
1049pub fn filter_by_predicate<R, I>(rows: I, pred: Option<&Predicate>) -> impl Iterator<Item = R> + '_
1050where
1051    R: RpcPredicateContext,
1052    I: IntoIterator<Item = R>,
1053    I::IntoIter: 'static,
1054{
1055    rows.into_iter().filter(move |row| match pred {
1056        None => true,
1057        Some(p) => {
1058            let ctx = EvalContext::new(row.rpc_predicate_tags(), row.rpc_predicate_metadata());
1059            p.evaluate(&ctx)
1060        }
1061    })
1062}
1063
1064// =============================================================================
1065// Constructors
1066// =============================================================================
1067
1068impl Predicate {
1069    /// Build [`Predicate::Exists`] from a [`TagKey`].
1070    pub fn exists(key: TagKey) -> Self {
1071        Self::Exists { key }
1072    }
1073
1074    /// Build [`Predicate::Equals`] from a key + value.
1075    pub fn equals(key: TagKey, value: impl Into<String>) -> Self {
1076        Self::Equals {
1077            key,
1078            value: value.into(),
1079        }
1080    }
1081
1082    /// Build [`Predicate::NumericAtLeast`].
1083    pub fn numeric_at_least(key: TagKey, threshold: f64) -> Self {
1084        Self::NumericAtLeast { key, threshold }
1085    }
1086
1087    /// Build [`Predicate::NumericAtMost`].
1088    pub fn numeric_at_most(key: TagKey, threshold: f64) -> Self {
1089        Self::NumericAtMost { key, threshold }
1090    }
1091
1092    /// Build [`Predicate::NumericInRange`].
1093    pub fn numeric_in_range(key: TagKey, min: f64, max: f64) -> Self {
1094        Self::NumericInRange { key, min, max }
1095    }
1096
1097    /// Build [`Predicate::SemverAtLeast`].
1098    pub fn semver_at_least(key: TagKey, version: impl Into<String>) -> Self {
1099        Self::SemverAtLeast {
1100            key,
1101            version: version.into(),
1102        }
1103    }
1104
1105    /// Build [`Predicate::SemverAtMost`].
1106    pub fn semver_at_most(key: TagKey, version: impl Into<String>) -> Self {
1107        Self::SemverAtMost {
1108            key,
1109            version: version.into(),
1110        }
1111    }
1112
1113    /// Build [`Predicate::SemverCompatible`].
1114    pub fn semver_compatible(key: TagKey, version: impl Into<String>) -> Self {
1115        Self::SemverCompatible {
1116            key,
1117            version: version.into(),
1118        }
1119    }
1120
1121    /// Build [`Predicate::StringPrefix`].
1122    pub fn string_prefix(key: TagKey, prefix: impl Into<String>) -> Self {
1123        Self::StringPrefix {
1124            key,
1125            prefix: prefix.into(),
1126        }
1127    }
1128
1129    /// Build [`Predicate::StringMatches`].
1130    pub fn string_matches(key: TagKey, pattern: impl Into<String>) -> Self {
1131        Self::StringMatches {
1132            key,
1133            pattern: pattern.into(),
1134        }
1135    }
1136
1137    /// Build [`Predicate::MetadataExists`].
1138    pub fn metadata_exists(key: impl Into<String>) -> Self {
1139        Self::MetadataExists { key: key.into() }
1140    }
1141
1142    /// Build [`Predicate::MetadataEquals`].
1143    pub fn metadata_equals(key: impl Into<String>, value: impl Into<String>) -> Self {
1144        Self::MetadataEquals {
1145            key: key.into(),
1146            value: value.into(),
1147        }
1148    }
1149
1150    /// Build [`Predicate::MetadataMatches`].
1151    pub fn metadata_matches(key: impl Into<String>, pattern: impl Into<String>) -> Self {
1152        Self::MetadataMatches {
1153            key: key.into(),
1154            pattern: pattern.into(),
1155        }
1156    }
1157
1158    /// Build [`Predicate::MetadataNumericAtLeast`].
1159    pub fn metadata_numeric_at_least(key: impl Into<String>, threshold: f64) -> Self {
1160        Self::MetadataNumericAtLeast {
1161            key: key.into(),
1162            threshold,
1163        }
1164    }
1165
1166    /// Build [`Predicate::And`] from a `Vec` of clauses.
1167    pub fn and(clauses: Vec<Predicate>) -> Self {
1168        Self::And(clauses)
1169    }
1170
1171    /// Build [`Predicate::Or`] from a `Vec` of clauses.
1172    pub fn or(clauses: Vec<Predicate>) -> Self {
1173        Self::Or(clauses)
1174    }
1175
1176    /// Build [`Predicate::Not`] wrapping a single clause.
1177    ///
1178    /// Named `not` to match `and` / `or` as a constructor —
1179    /// not an `Op<Output = Predicate>` impl. Implementing
1180    /// `std::ops::Not` would force callers to depend on
1181    /// `Predicate: Not` for the `!` operator, which requires
1182    /// `Predicate: Sized + Not<Output = ?>` boilerplate without
1183    /// any expressivity gain over the explicit constructor.
1184    #[allow(clippy::should_implement_trait)]
1185    pub fn not(inner: Predicate) -> Self {
1186        Self::Not(Box::new(inner))
1187    }
1188}
1189
1190// =============================================================================
1191// Evaluation
1192// =============================================================================
1193
1194impl Predicate {
1195    /// Evaluate against `(tags, metadata)`. Pure function.
1196    ///
1197    /// Phase 4 of `CAPABILITY_ENHANCEMENTS_PLAN.md`: at every
1198    /// `And` / `Or` node, children are evaluated in cost-ascending
1199    /// order so cheap+selective clauses short-circuit first. The
1200    /// reordering is a pure local optimization — semantics are
1201    /// identical to [`Self::evaluate_unplanned`]. Pinned by the
1202    /// `planned_evaluate_matches_unplanned_*` property tests.
1203    ///
1204    /// Numeric / semver parse failures yield `false` (a malformed
1205    /// tag value shouldn't fault a federated query).
1206    pub fn evaluate(&self, ctx: &EvalContext<'_>) -> bool {
1207        match self {
1208            Self::And(children) => Self::eval_all_in_cost_order(children, ctx),
1209            Self::Or(children) => Self::eval_any_in_cost_order(children, ctx),
1210            Self::Not(inner) => !inner.evaluate(ctx),
1211            other => other.evaluate_leaf(ctx),
1212        }
1213    }
1214
1215    /// Evaluate without the planner — children of `And` / `Or` run
1216    /// in declaration order.
1217    ///
1218    /// Phase 4 escape hatch for benchmarking and the planner-
1219    /// equivalence property tests. Production callers should use
1220    /// [`Self::evaluate`]; this is a diagnostic surface only.
1221    pub fn evaluate_unplanned(&self, ctx: &EvalContext<'_>) -> bool {
1222        match self {
1223            Self::And(children) => children.iter().all(|c| c.evaluate_unplanned(ctx)),
1224            Self::Or(children) => children.iter().any(|c| c.evaluate_unplanned(ctx)),
1225            Self::Not(inner) => !inner.evaluate_unplanned(ctx),
1226            other => other.evaluate_leaf(ctx),
1227        }
1228    }
1229
1230    /// Evaluate a leaf predicate (anything except `And` / `Or` /
1231    /// `Not`). Shared between [`Self::evaluate`] and
1232    /// [`Self::evaluate_unplanned`] so the leaf logic lives in one
1233    /// place and the two entry points only differ in their
1234    /// composite handling.
1235    fn evaluate_leaf(&self, ctx: &EvalContext<'_>) -> bool {
1236        match self {
1237            // Presence check: matches both `AxisPresent` and
1238            // `AxisValue` for `key`. Cannot route through
1239            // `match_axis_tag` because that helper now skips
1240            // `AxisPresent` (presence-only tags carry no value;
1241            // value predicates would otherwise see `""`).
1242            Self::Exists { key } => ctx
1243                .tags
1244                .iter()
1245                .any(|t| matches!(t.axis_key_ref(), Some((a, k)) if a == key.axis && k == key.key)),
1246            Self::Equals { key, value } => match_axis_tag(ctx.tags, key, |v| v == value.as_str()),
1247            Self::NumericAtLeast { key, threshold } => match_axis_tag(ctx.tags, key, |v| {
1248                v.parse::<f64>().is_ok_and(|n| n >= *threshold)
1249            }),
1250            Self::NumericAtMost { key, threshold } => match_axis_tag(ctx.tags, key, |v| {
1251                v.parse::<f64>().is_ok_and(|n| n <= *threshold)
1252            }),
1253            Self::NumericInRange { key, min, max } => match_axis_tag(ctx.tags, key, |v| {
1254                v.parse::<f64>().is_ok_and(|n| n >= *min && n <= *max)
1255            }),
1256            Self::SemverAtLeast { key, version } => {
1257                let Some(rhs) = parse_semver(version) else {
1258                    return false;
1259                };
1260                match_axis_tag(ctx.tags, key, |v| {
1261                    parse_semver(v).is_some_and(|lhs| lhs >= rhs)
1262                })
1263            }
1264            Self::SemverAtMost { key, version } => {
1265                let Some(rhs) = parse_semver(version) else {
1266                    return false;
1267                };
1268                match_axis_tag(ctx.tags, key, |v| {
1269                    parse_semver(v).is_some_and(|lhs| lhs <= rhs)
1270                })
1271            }
1272            Self::SemverCompatible { key, version } => {
1273                let Some(rhs) = parse_semver(version) else {
1274                    return false;
1275                };
1276                match_axis_tag(ctx.tags, key, |v| {
1277                    parse_semver(v).is_some_and(|lhs| semver_compatible(lhs, rhs))
1278                })
1279            }
1280            Self::StringPrefix { key, prefix } => {
1281                match_axis_tag(ctx.tags, key, |v| v.starts_with(prefix.as_str()))
1282            }
1283            Self::StringMatches { key, pattern } => {
1284                match_axis_tag(ctx.tags, key, |v| v.contains(pattern.as_str()))
1285            }
1286            Self::MetadataExists { key } => ctx.metadata.contains_key(key),
1287            Self::MetadataEquals { key, value } => {
1288                ctx.metadata.get(key).is_some_and(|v| v == value)
1289            }
1290            Self::MetadataMatches { key, pattern } => ctx
1291                .metadata
1292                .get(key)
1293                .is_some_and(|v| v.contains(pattern.as_str())),
1294            Self::MetadataNumericAtLeast { key, threshold } => ctx
1295                .metadata
1296                .get(key)
1297                .and_then(|v| v.parse::<f64>().ok())
1298                .is_some_and(|n| n >= *threshold),
1299            // Composite variants are routed through `evaluate` /
1300            // `evaluate_unplanned`, never reach this leaf-only path.
1301            Self::And(_) | Self::Or(_) | Self::Not(_) => unreachable!(
1302                "evaluate_leaf called with a composite Predicate; \
1303                 routing bug in evaluate / evaluate_unplanned"
1304            ),
1305        }
1306    }
1307
1308    /// `And` short-circuit evaluation in cost-ascending child order.
1309    fn eval_all_in_cost_order(children: &[Predicate], ctx: &EvalContext<'_>) -> bool {
1310        let mut order: Vec<usize> = (0..children.len()).collect();
1311        order.sort_by_key(|&i| children[i].static_cost());
1312        order.into_iter().all(|i| children[i].evaluate(ctx))
1313    }
1314
1315    /// `Or` short-circuit evaluation in cost-ascending child order.
1316    ///
1317    /// CR-18: this uses the same `static_cost` as the And path,
1318    /// not a mirrored Or-specific function. The And-vs-Or
1319    /// asymmetry (And wants rare-true clauses first to fail
1320    /// fast; Or wants often-true clauses first to succeed fast)
1321    /// is encoded in the index-aware path via
1322    /// [`Self::dynamic_cost`] vs [`Self::dynamic_cost_or`], which
1323    /// invert the cardinality direction. Without an index the
1324    /// planner has no per-clause trueness signal, so it falls
1325    /// back to ordering by raw evaluation work — neutral between
1326    /// And and Or, which is the best we can do here.
1327    fn eval_any_in_cost_order(children: &[Predicate], ctx: &EvalContext<'_>) -> bool {
1328        let mut order: Vec<usize> = (0..children.len()).collect();
1329        order.sort_by_key(|&i| children[i].static_cost());
1330        order.into_iter().any(|i| children[i].evaluate(ctx))
1331    }
1332
1333    /// Static cost estimate for the planner. Lower = cheaper to
1334    /// evaluate; planner sorts children ascending.
1335    ///
1336    /// Phase 4 first cut uses fixed-per-variant costs (no index
1337    /// integration). The ordering reflects empirical evaluation
1338    /// cost: hashmap lookups < tag-set scans with simple parses
1339    /// < substring scans < semver parses.
1340    ///
1341    /// Composite costs sum the children's costs, so a deeply
1342    /// nested branch is heavier than a shallow one with the same
1343    /// leaf shape.
1344    fn static_cost(&self) -> u32 {
1345        match self {
1346            // Tier 1: O(1) hashmap lookup.
1347            Self::MetadataExists { .. } => 10,
1348            Self::MetadataEquals { .. } => 11,
1349            // Tier 2: O(N) tag-set scan with cheap value handling.
1350            Self::Exists { .. } => 20,
1351            Self::Equals { .. } => 21,
1352            Self::MetadataNumericAtLeast { .. } => 25,
1353            // Tier 3: O(N) tag-set scan + numeric parse per match.
1354            Self::NumericAtLeast { .. }
1355            | Self::NumericAtMost { .. }
1356            | Self::NumericInRange { .. } => 30,
1357            // Tier 4: O(N) scan + substring / prefix scan.
1358            Self::StringPrefix { .. } => 40,
1359            Self::MetadataMatches { .. } => 45,
1360            Self::StringMatches { .. } => 50,
1361            // Tier 5: semver triple parse (heaviest leaf).
1362            Self::SemverAtLeast { .. }
1363            | Self::SemverAtMost { .. }
1364            | Self::SemverCompatible { .. } => 60,
1365            // Composites: sum of children. Caps avoid u32 overflow
1366            // by saturating at u32::MAX (a predicate this big
1367            // would have a different problem already).
1368            Self::And(c) | Self::Or(c) => c
1369                .iter()
1370                .map(|c| c.static_cost())
1371                .fold(0u32, |a, b| a.saturating_add(b)),
1372            Self::Not(inner) => inner.static_cost(),
1373        }
1374    }
1375
1376    /// Cardinality-aware cost estimate. Refines [`Self::static_cost`]
1377    /// with per-key distinct-value counts from a
1378    /// [`CardinalityProvider`](crate::adapter::net::behavior::CardinalityProvider).
1379    ///
1380    /// Phase 4 follow-on of `CAPABILITY_ENHANCEMENTS_PLAN.md`. A
1381    /// leaf clause keyed on a high-cardinality tag (many distinct
1382    /// values across nodes) is more selective than one keyed on
1383    /// a low-cardinality tag — running it first short-circuits
1384    /// faster on the common-mismatch case.
1385    ///
1386    /// The intuition: an `Equals(key, v)` clause has roughly
1387    /// `1 / cardinality` chance of matching a uniformly-random
1388    /// node, so expected work is `static_cost / cardinality`.
1389    ///
1390    /// Behavior:
1391    ///
1392    /// - Tag-keyed leaves (Exists / Equals / Numeric* / Semver* /
1393    ///   String*): `static_cost / max(1, cardinality)`. A
1394    ///   cardinality of zero (key not yet indexed) falls back to
1395    ///   raw `static_cost` — conservative.
1396    /// - Metadata leaves: `static_cost` unchanged. The
1397    ///   provider trait doesn't track metadata cardinality by
1398    ///   default (Phase D / E may add a metadata index; lands then).
1399    /// - Composites: sum of child dynamic costs (saturating).
1400    /// - `Not`: passes through inner cost.
1401    fn dynamic_cost<P: crate::adapter::net::behavior::CardinalityProvider>(
1402        &self,
1403        index: &P,
1404    ) -> u32 {
1405        match self {
1406            // Tag-keyed leaves: static_cost / cardinality.
1407            Self::Exists { key }
1408            | Self::Equals { key, .. }
1409            | Self::NumericAtLeast { key, .. }
1410            | Self::NumericAtMost { key, .. }
1411            | Self::NumericInRange { key, .. }
1412            | Self::SemverAtLeast { key, .. }
1413            | Self::SemverAtMost { key, .. }
1414            | Self::SemverCompatible { key, .. }
1415            | Self::StringPrefix { key, .. }
1416            | Self::StringMatches { key, .. } => {
1417                let static_c = self.static_cost();
1418                let cardinality = index.axis_cardinality(key);
1419                if cardinality > 0 {
1420                    static_c.saturating_div(u32::try_from(cardinality).unwrap_or(u32::MAX).max(1))
1421                } else {
1422                    // Key absent from the index — could be a brand-new
1423                    // tag the substrate hasn't observed yet. Conservatively
1424                    // keep static_cost so we don't underestimate work.
1425                    static_c
1426                }
1427            }
1428            // Metadata leaves: refine via the index's metadata
1429            // cardinality tracking (mirrors the axis-tag side).
1430            Self::MetadataExists { key }
1431            | Self::MetadataEquals { key, .. }
1432            | Self::MetadataMatches { key, .. }
1433            | Self::MetadataNumericAtLeast { key, .. } => {
1434                let static_c = self.static_cost();
1435                let cardinality = index.metadata_value_cardinality(key);
1436                if cardinality > 0 {
1437                    static_c.saturating_div(u32::try_from(cardinality).unwrap_or(u32::MAX).max(1))
1438                } else {
1439                    // Key absent from index → fall back to static cost.
1440                    static_c
1441                }
1442            }
1443            // Composites: sum of children's dynamic costs.
1444            Self::And(c) | Self::Or(c) => c
1445                .iter()
1446                .map(|c| c.dynamic_cost(index))
1447                .fold(0u32, |a, b| a.saturating_add(b)),
1448            Self::Not(inner) => inner.dynamic_cost(index),
1449        }
1450    }
1451
1452    /// Evaluate against `ctx`, using `index`'s per-key cardinality
1453    /// data to refine the planner's clause ordering at every
1454    /// `And` / `Or` node.
1455    ///
1456    /// Phase 4 follow-on of `CAPABILITY_ENHANCEMENTS_PLAN.md`.
1457    /// Produces the same boolean result as
1458    /// [`Self::evaluate_unplanned`] for any `(ast, ctx)`; the index
1459    /// only changes execution order, not semantics. Pinned in the
1460    /// `index_planner_evaluate_matches_unplanned_*` property tests.
1461    ///
1462    /// When the index is available, prefer this entry point over
1463    /// [`Self::evaluate`] (static-cost planner) — cardinality data
1464    /// catches selective clauses the static planner misses (e.g.,
1465    /// a `MetadataEquals` happens to be the cheapest leaf
1466    /// statically, but a high-cardinality `Equals` on an axis tag
1467    /// is even more selective in this index's data).
1468    ///
1469    /// When the index is unavailable or unhelpful (zero-cardinality
1470    /// for every key — empty index), this falls back to behavior
1471    /// equivalent to [`Self::evaluate`].
1472    pub fn evaluate_with_index<P: crate::adapter::net::behavior::CardinalityProvider>(
1473        &self,
1474        ctx: &EvalContext<'_>,
1475        index: &P,
1476    ) -> bool {
1477        match self {
1478            Self::And(children) => Self::eval_all_with_index(children, ctx, index),
1479            Self::Or(children) => Self::eval_any_with_index(children, ctx, index),
1480            Self::Not(inner) => !inner.evaluate_with_index(ctx, index),
1481            other => other.evaluate_leaf(ctx),
1482        }
1483    }
1484
1485    /// `And` short-circuit evaluation in dynamic-cost-ascending
1486    /// child order.
1487    fn eval_all_with_index<P: crate::adapter::net::behavior::CardinalityProvider>(
1488        children: &[Predicate],
1489        ctx: &EvalContext<'_>,
1490        index: &P,
1491    ) -> bool {
1492        let mut order: Vec<usize> = (0..children.len()).collect();
1493        order.sort_by_key(|&i| children[i].dynamic_cost(index));
1494        order
1495            .into_iter()
1496            .all(|i| children[i].evaluate_with_index(ctx, index))
1497    }
1498
1499    /// `Or` short-circuit evaluation in Or-mode-cost-ascending
1500    /// child order.
1501    ///
1502    /// Phase 4 final close of `CAPABILITY_ENHANCEMENTS_PLAN.md`.
1503    /// Uses [`Self::dynamic_cost_or`] (the inverted formula
1504    /// favoring low-cardinality "often-true" clauses) instead of
1505    /// the And-mode [`Self::dynamic_cost`]. The asymmetry matches
1506    /// short-circuit semantics: And short-circuits on first false
1507    /// (run rare-true clauses first), Or short-circuits on first
1508    /// true (run often-true clauses first).
1509    fn eval_any_with_index<P: crate::adapter::net::behavior::CardinalityProvider>(
1510        children: &[Predicate],
1511        ctx: &EvalContext<'_>,
1512        index: &P,
1513    ) -> bool {
1514        let mut order: Vec<usize> = (0..children.len()).collect();
1515        order.sort_by_key(|&i| children[i].dynamic_cost_or(index));
1516        order
1517            .into_iter()
1518            .any(|i| children[i].evaluate_with_index(ctx, index))
1519    }
1520
1521    /// Or-mode dynamic cost. Inverts the cardinality direction
1522    /// from [`Self::dynamic_cost`] so low-cardinality clauses
1523    /// (likely to match many candidates → often-true) sort first.
1524    ///
1525    /// Phase 4 final close of `CAPABILITY_ENHANCEMENTS_PLAN.md`.
1526    ///
1527    /// Behavior at leaves:
1528    ///
1529    /// - Tag-keyed leaves: `static_cost × max(1, cardinality)`.
1530    ///   High cardinality → many distinct values → each rare → high
1531    ///   Or-cost (run later). Low cardinality → matches concentrated
1532    ///   on few values → each common → low Or-cost (run first).
1533    /// - Metadata leaves: same shape against
1534    ///   `metadata_value_cardinality`.
1535    /// - Cardinality-0 (key absent from index) → fall back to
1536    ///   `static_cost`, conservative.
1537    ///
1538    /// Behavior at composites:
1539    ///
1540    /// - `And(children)` recurses with And-mode `dynamic_cost`
1541    ///   (the And's own internal ordering).
1542    /// - `Or(children)` recurses with Or-mode `dynamic_cost_or`.
1543    /// - `Not(inner)` passes through the same Or-mode recursion.
1544    ///
1545    /// Note: this is a leaf-level asymmetry. A rigorous treatment
1546    /// would also penalize And-as-Or-child with a "rare-true"
1547    /// score (since an And is true only when all children are
1548    /// true), but doing that requires modeling per-clause
1549    /// truthiness probability (a separate piece of work). For
1550    /// typical predicate shapes (mostly leaf-or-mixed, not
1551    /// deeply-nested And-of-Or-of-And), the leaf-level
1552    /// asymmetry catches the load-bearing case.
1553    fn dynamic_cost_or<P: crate::adapter::net::behavior::CardinalityProvider>(
1554        &self,
1555        index: &P,
1556    ) -> u32 {
1557        match self {
1558            Self::Exists { key }
1559            | Self::Equals { key, .. }
1560            | Self::NumericAtLeast { key, .. }
1561            | Self::NumericAtMost { key, .. }
1562            | Self::NumericInRange { key, .. }
1563            | Self::SemverAtLeast { key, .. }
1564            | Self::SemverAtMost { key, .. }
1565            | Self::SemverCompatible { key, .. }
1566            | Self::StringPrefix { key, .. }
1567            | Self::StringMatches { key, .. } => {
1568                let static_c = self.static_cost();
1569                let cardinality = index.axis_cardinality(key);
1570                if cardinality == 0 {
1571                    return static_c;
1572                }
1573                static_c.saturating_mul(u32::try_from(cardinality).unwrap_or(u32::MAX).max(1))
1574            }
1575            Self::MetadataExists { key }
1576            | Self::MetadataEquals { key, .. }
1577            | Self::MetadataMatches { key, .. }
1578            | Self::MetadataNumericAtLeast { key, .. } => {
1579                let static_c = self.static_cost();
1580                let cardinality = index.metadata_value_cardinality(key);
1581                if cardinality == 0 {
1582                    return static_c;
1583                }
1584                static_c.saturating_mul(u32::try_from(cardinality).unwrap_or(u32::MAX).max(1))
1585            }
1586            // Composites: recurse with mode appropriate to the
1587            // composite's own type. This is a leaf-level asymmetry —
1588            // the cost reflects the composite's own internal
1589            // expected work, not its truthiness probability.
1590            Self::And(c) => c
1591                .iter()
1592                .map(|c| c.dynamic_cost(index))
1593                .fold(0u32, |a, b| a.saturating_add(b)),
1594            Self::Or(c) => c
1595                .iter()
1596                .map(|c| c.dynamic_cost_or(index))
1597                .fold(0u32, |a, b| a.saturating_add(b)),
1598            Self::Not(inner) => inner.dynamic_cost_or(index),
1599        }
1600    }
1601}
1602
1603// =============================================================================
1604// Debug session — Phase 6 of CAPABILITY_ENHANCEMENTS_PLAN.md.
1605//
1606// `Predicate::evaluate_with_trace` instruments a single evaluation,
1607// producing a tree of clause traces showing which clauses ran and
1608// what they returned. `PredicateDebugReport` aggregates traces over
1609// a candidate corpus into per-clause hit/miss stats plus a printable
1610// summary.
1611//
1612// Opt-in only — production hot paths use `evaluate()`, never this
1613// path. The instrumentation overhead is dominated by the per-clause
1614// label allocation (`format!`); production-grade ~5% overhead is
1615// achievable but the current implementation favors simplicity.
1616// =============================================================================
1617
1618/// Tree-shaped trace from one debug evaluation against a single
1619/// `EvalContext`. Mirrors the AST of the predicate that was
1620/// evaluated, except `And` / `Or` short-circuits drop unevaluated
1621/// siblings — the trace only carries clauses that actually ran.
1622///
1623/// Phase 6 of `CAPABILITY_ENHANCEMENTS_PLAN.md`. Returned by
1624/// [`Predicate::evaluate_with_trace`].
1625#[derive(Debug, Clone, PartialEq)]
1626pub struct ClauseTrace {
1627    /// One-line summary of the clause (`"Exists(hardware.gpu)"`,
1628    /// `"And(3 clauses)"`, `"MetadataEquals(intent=ml-training)"`).
1629    /// Aggregated stats merge by label, so two structurally-equal
1630    /// leaf clauses share one entry in the report.
1631    pub label: String,
1632    /// Final result of evaluating this clause.
1633    pub result: bool,
1634    /// Children traces in evaluation order. For `And` / `Or` this is
1635    /// the planner-ordered (cost-ascending) sequence of children
1636    /// that actually ran (short-circuited siblings are absent).
1637    /// `Not` has exactly one child. Leaves have an empty children
1638    /// list.
1639    pub children: Vec<ClauseTrace>,
1640}
1641
1642impl Predicate {
1643    /// Evaluate against `ctx`, also producing a tree of per-clause
1644    /// traces.
1645    ///
1646    /// The result equals `self.evaluate(ctx)`; this entry point adds
1647    /// the [`ClauseTrace`] tree as a side channel for debug
1648    /// inspection. Composite clauses retain the planner's
1649    /// short-circuit behavior — descendants that didn't run aren't
1650    /// in the trace.
1651    ///
1652    /// Phase 6 of `CAPABILITY_ENHANCEMENTS_PLAN.md`. Opt-in only;
1653    /// production callers use [`Predicate::evaluate`].
1654    pub fn evaluate_with_trace(&self, ctx: &EvalContext<'_>) -> (bool, ClauseTrace) {
1655        let label = self.debug_label();
1656        match self {
1657            Self::And(children) => {
1658                let mut order: Vec<usize> = (0..children.len()).collect();
1659                order.sort_by_key(|&i| children[i].static_cost());
1660                let mut traces = Vec::with_capacity(order.len());
1661                let mut result = true;
1662                for i in order {
1663                    let (r, t) = children[i].evaluate_with_trace(ctx);
1664                    traces.push(t);
1665                    if !r {
1666                        result = false;
1667                        break;
1668                    }
1669                }
1670                (
1671                    result,
1672                    ClauseTrace {
1673                        label,
1674                        result,
1675                        children: traces,
1676                    },
1677                )
1678            }
1679            Self::Or(children) => {
1680                let mut order: Vec<usize> = (0..children.len()).collect();
1681                order.sort_by_key(|&i| children[i].static_cost());
1682                let mut traces = Vec::with_capacity(order.len());
1683                let mut result = false;
1684                for i in order {
1685                    let (r, t) = children[i].evaluate_with_trace(ctx);
1686                    traces.push(t);
1687                    if r {
1688                        result = true;
1689                        break;
1690                    }
1691                }
1692                (
1693                    result,
1694                    ClauseTrace {
1695                        label,
1696                        result,
1697                        children: traces,
1698                    },
1699                )
1700            }
1701            Self::Not(inner) => {
1702                let (r, t) = inner.evaluate_with_trace(ctx);
1703                (
1704                    !r,
1705                    ClauseTrace {
1706                        label,
1707                        result: !r,
1708                        children: vec![t],
1709                    },
1710                )
1711            }
1712            leaf => {
1713                let result = leaf.evaluate_leaf(ctx);
1714                (
1715                    result,
1716                    ClauseTrace {
1717                        label,
1718                        result,
1719                        children: Vec::new(),
1720                    },
1721                )
1722            }
1723        }
1724    }
1725
1726    /// One-line debug label for this clause. Used by
1727    /// [`ClauseTrace`] and [`PredicateDebugReport`] to identify
1728    /// clauses in human-readable output.
1729    fn debug_label(&self) -> String {
1730        match self {
1731            Self::Exists { key } => format!("Exists({key})"),
1732            Self::Equals { key, value } => format!("Equals({key}={value})"),
1733            Self::NumericAtLeast { key, threshold } => {
1734                format!("NumericAtLeast({key} >= {threshold})")
1735            }
1736            Self::NumericAtMost { key, threshold } => {
1737                format!("NumericAtMost({key} <= {threshold})")
1738            }
1739            Self::NumericInRange { key, min, max } => {
1740                format!("NumericInRange({key} in [{min}, {max}])")
1741            }
1742            Self::SemverAtLeast { key, version } => {
1743                format!("SemverAtLeast({key} >= {version})")
1744            }
1745            Self::SemverAtMost { key, version } => {
1746                format!("SemverAtMost({key} <= {version})")
1747            }
1748            Self::SemverCompatible { key, version } => {
1749                format!("SemverCompatible({key} ~= {version})")
1750            }
1751            Self::StringPrefix { key, prefix } => {
1752                format!("StringPrefix({key} starts with {prefix:?})")
1753            }
1754            Self::StringMatches { key, pattern } => {
1755                format!("StringMatches({key} contains {pattern:?})")
1756            }
1757            Self::MetadataExists { key } => format!("MetadataExists({key})"),
1758            Self::MetadataEquals { key, value } => {
1759                format!("MetadataEquals({key}={value})")
1760            }
1761            Self::MetadataMatches { key, pattern } => {
1762                format!("MetadataMatches({key} contains {pattern:?})")
1763            }
1764            Self::MetadataNumericAtLeast { key, threshold } => {
1765                format!("MetadataNumericAtLeast({key} >= {threshold})")
1766            }
1767            Self::And(c) => format!("And({} clauses)", c.len()),
1768            Self::Or(c) => format!("Or({} clauses)", c.len()),
1769            Self::Not(_) => "Not".to_string(),
1770        }
1771    }
1772}
1773
1774/// Per-clause aggregated stats across a candidate corpus.
1775///
1776/// Merged by `label`: two structurally-equal clauses (same variant,
1777/// same key, same value) share one [`ClauseStats`] entry. This is
1778/// generally what an operator wants — "how often does
1779/// `MetadataEquals(intent=ml-training)` succeed?" doesn't depend on
1780/// where in the AST that clause sits.
1781#[derive(Debug, Clone, Default, PartialEq)]
1782pub struct ClauseStats {
1783    /// Clause label (matches the `label` field on [`ClauseTrace`]).
1784    pub label: String,
1785    /// Number of candidates whose evaluation reached this clause
1786    /// (i.e. wasn't short-circuited away by an earlier sibling).
1787    pub evaluated: usize,
1788    /// Number of those evaluations that returned `true`.
1789    pub matched: usize,
1790}
1791
1792/// Aggregate report from running a [`Predicate`] across a corpus
1793/// of candidate evaluation contexts.
1794///
1795/// Phase 6 of `CAPABILITY_ENHANCEMENTS_PLAN.md`. Built by
1796/// [`PredicateDebugReport::from_evaluations`].
1797///
1798/// The report answers: "given this predicate and these candidates,
1799/// how many matched, and how often did each clause filter?". A
1800/// clause with 1042 evaluations and 12 matches has 1.2% positive
1801/// selectivity — operators use that to spot mismatches between
1802/// their mental model of the data and the actual data.
1803#[derive(Debug, Clone, Default, PartialEq)]
1804pub struct PredicateDebugReport {
1805    /// Number of candidates the predicate was evaluated against.
1806    pub total_candidates: usize,
1807    /// Number of candidates the predicate matched (returned `true`).
1808    pub matched: usize,
1809    /// Per-clause aggregated stats, keyed by the clause's debug
1810    /// label. `BTreeMap` for deterministic iteration order in
1811    /// printed output.
1812    pub clause_stats: std::collections::BTreeMap<String, ClauseStats>,
1813}
1814
1815impl PredicateDebugReport {
1816    /// Run `pred` against each context in `contexts`, accumulating
1817    /// per-clause hit / miss stats.
1818    ///
1819    /// Each context contributes one trace; the trace tree is walked
1820    /// post-order to update the per-label `ClauseStats`. Composite
1821    /// clauses (And / Or / Not) get their own labels too, so an
1822    /// operator can see "the And short-circuited 730/1042 times" at
1823    /// a glance.
1824    pub fn from_evaluations<'a, I>(pred: &Predicate, contexts: I) -> Self
1825    where
1826        I: IntoIterator<Item = EvalContext<'a>>,
1827    {
1828        let mut report = Self::default();
1829        for ctx in contexts {
1830            report.total_candidates += 1;
1831            let (matched, trace) = pred.evaluate_with_trace(&ctx);
1832            if matched {
1833                report.matched += 1;
1834            }
1835            accumulate_trace(&trace, &mut report.clause_stats);
1836        }
1837        report
1838    }
1839
1840    /// Format a human-readable summary suitable for terminal output.
1841    ///
1842    /// Returned as a `String` rather than printed directly so tests
1843    /// can pin the format and callers can route to their own logger.
1844    pub fn render(&self) -> String {
1845        let mut out = String::new();
1846        let pct = |num: usize, denom: usize| -> f64 {
1847            if denom == 0 {
1848                0.0
1849            } else {
1850                100.0 * (num as f64) / (denom as f64)
1851            }
1852        };
1853        out.push_str("Predicate evaluation report\n");
1854        out.push_str("─────────────────────────────────────────\n");
1855        out.push_str(&format!(
1856            "Total candidates: {}\nMatched:          {} ({:.1}%)\n\n",
1857            self.total_candidates,
1858            self.matched,
1859            pct(self.matched, self.total_candidates),
1860        ));
1861        out.push_str("Per-clause stats (alphabetical):\n");
1862        for stats in self.clause_stats.values() {
1863            out.push_str(&format!(
1864                "  {:<60} evaluated {:>5}, matched {:>5} ({:>5.1}%)\n",
1865                stats.label,
1866                stats.evaluated,
1867                stats.matched,
1868                pct(stats.matched, stats.evaluated),
1869            ));
1870        }
1871        out
1872    }
1873}
1874
1875/// Walk a [`ClauseTrace`] tree post-order, updating per-label
1876/// stats in `acc`.
1877fn accumulate_trace(
1878    trace: &ClauseTrace,
1879    acc: &mut std::collections::BTreeMap<String, ClauseStats>,
1880) {
1881    let entry = acc
1882        .entry(trace.label.clone())
1883        .or_insert_with(|| ClauseStats {
1884            label: trace.label.clone(),
1885            evaluated: 0,
1886            matched: 0,
1887        });
1888    entry.evaluated += 1;
1889    if trace.result {
1890        entry.matched += 1;
1891    }
1892    for child in &trace.children {
1893        accumulate_trace(child, acc);
1894    }
1895}
1896
1897/// Find any value-bearing tag in `tags` matching `key` and run
1898/// `value_pred` against its value. [`Tag::AxisPresent`] tags carry
1899/// no value and are skipped — feeding `""` through `value_pred`
1900/// would let an empty-string `Equals` / `StringPrefix` /
1901/// `StringMatches` predicate spuriously match a presence-only tag.
1902/// Use [`Predicate::Exists`] (which goes through a separate
1903/// presence-aware path in `evaluate_leaf`) when key-presence
1904/// without a value is the intended check.
1905fn match_axis_tag(tags: &[Tag], key: &TagKey, value_pred: impl Fn(&str) -> bool) -> bool {
1906    tags.iter().any(|t| match t {
1907        Tag::AxisValue {
1908            axis,
1909            key: k,
1910            value,
1911            ..
1912        } => *axis == key.axis && k == &key.key && value_pred(value),
1913        _ => false,
1914    })
1915}
1916
1917// =============================================================================
1918// Semver — minimal inline parser
1919// =============================================================================
1920
1921/// Semver triple `(major, minor, patch)`. Pre-release / build
1922/// metadata is stripped at parse time; comparing only the triple is
1923/// enough for this plan's `NumericAtLeast` / `Compatible` semantics.
1924type SemverTriple = (u64, u64, u64);
1925
1926/// Parse a `MAJOR.MINOR.PATCH[-prerelease][+build]` string. Returns
1927/// `None` on any malformed input. Lenient on missing components: `1`
1928/// → `(1, 0, 0)`, `1.2` → `(1, 2, 0)` — matches caller expectation
1929/// when applications emit truncated version strings.
1930fn parse_semver(s: &str) -> Option<SemverTriple> {
1931    // Drop pre-release / build suffix.
1932    let core = s
1933        .split_once('-')
1934        .map(|(c, _)| c)
1935        .unwrap_or(s)
1936        .split_once('+')
1937        .map(|(c, _)| c)
1938        .unwrap_or_else(|| s.split_once('-').map(|(c, _)| c).unwrap_or(s));
1939    let mut parts = core.split('.').map(str::trim);
1940    let major = parts.next()?.parse().ok()?;
1941    let minor = parts.next().map(|p| p.parse().ok()).unwrap_or(Some(0))?;
1942    let patch = parts.next().map(|p| p.parse().ok()).unwrap_or(Some(0))?;
1943    if parts.next().is_some() {
1944        return None; // 4+ components is not semver
1945    }
1946    Some((major, minor, patch))
1947}
1948
1949/// `lhs` is caret-compatible with `rhs` per the standard semver
1950/// rule: same major (or same minor for `0.x.y`, exact for `0.0.x`),
1951/// and `lhs >= rhs`. Mirrors cargo's `^` operator semantics.
1952fn semver_compatible(lhs: SemverTriple, rhs: SemverTriple) -> bool {
1953    if lhs < rhs {
1954        return false;
1955    }
1956    if rhs.0 == 0 {
1957        if rhs.1 == 0 {
1958            // 0.0.x — patch is the compatibility band; anything
1959            // other than the exact tuple is a breaking change.
1960            // Combined with the `lhs >= rhs` guard above this
1961            // collapses to lhs == rhs.
1962            lhs == rhs
1963        } else {
1964            // 0.x.y — minor is the compatibility band, AND the
1965            // major must also be 0. Pre-fix `rhs.1 == lhs.1`
1966            // alone admitted `lhs = 1.2.5` as compatible with
1967            // `rhs = 0.2.3` (the lhs >= rhs guard passes since
1968            // 1 > 0, then minors match). 1.2.5 is not `^0.2.3`-
1969            // compatible per Cargo: 0.x.y treats minor as the
1970            // band IFF the band itself is 0.x.y.
1971            lhs.0 == 0 && rhs.1 == lhs.1
1972        }
1973    } else {
1974        rhs.0 == lhs.0
1975    }
1976}
1977
1978// =============================================================================
1979// pred! macro
1980// =============================================================================
1981
1982/// Lightweight macro sugar over [`Predicate`] constructors. Mirrors
1983/// the substrate plan's macro-style examples in §6a; lowers to plain
1984/// constructor calls so the AST stays the single source of truth.
1985///
1986/// ## Forms
1987///
1988/// ```ignore
1989/// pred!(exists "hardware.gpu");
1990/// pred!(equals "software.runtime", "cuda-12.4");
1991/// pred!(num_at_least "hardware.gpu.vram_gb", 24.0);
1992/// pred!(num_at_most "hardware.gpu.vram_gb", 80.0);
1993/// pred!(num_in_range "hardware.cpu_cores", 8.0, 64.0);
1994/// pred!(semver_at_least "software.runtime", "12.0");
1995/// pred!(semver_compatible "software.runtime", "12.0");
1996/// pred!(prefix "software.tool", "ffmpeg");
1997/// pred!(matches "software.daemon", "postgres");
1998/// pred!(metadata_exists "intent");
1999/// pred!(metadata_equals "intent", "ml-training");
2000/// pred!(and [a, b, c]);
2001/// pred!(or  [a, b, c]);
2002/// pred!(not a);
2003/// ```
2004///
2005/// The string forms are `<axis>.<key>` literals; the macro splits
2006/// them into `(axis, key)` via [`crate::adapter::net::behavior::tag::Tag::parse`]
2007/// and panics at construction time on invalid axis prefixes —
2008/// matching the substrate plan's "validates shapes at parse time"
2009/// contract for the macro.
2010#[macro_export]
2011macro_rules! pred {
2012    (exists $key:literal) => {
2013        $crate::adapter::net::behavior::predicate::Predicate::exists(
2014            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
2015        )
2016    };
2017    (equals $key:literal, $value:expr) => {
2018        $crate::adapter::net::behavior::predicate::Predicate::equals(
2019            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
2020            $value,
2021        )
2022    };
2023    (num_at_least $key:literal, $t:expr) => {
2024        $crate::adapter::net::behavior::predicate::Predicate::numeric_at_least(
2025            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
2026            $t,
2027        )
2028    };
2029    (num_at_most $key:literal, $t:expr) => {
2030        $crate::adapter::net::behavior::predicate::Predicate::numeric_at_most(
2031            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
2032            $t,
2033        )
2034    };
2035    (num_in_range $key:literal, $min:expr, $max:expr) => {
2036        $crate::adapter::net::behavior::predicate::Predicate::numeric_in_range(
2037            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
2038            $min,
2039            $max,
2040        )
2041    };
2042    (semver_at_least $key:literal, $v:expr) => {
2043        $crate::adapter::net::behavior::predicate::Predicate::semver_at_least(
2044            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
2045            $v,
2046        )
2047    };
2048    (semver_at_most $key:literal, $v:expr) => {
2049        $crate::adapter::net::behavior::predicate::Predicate::semver_at_most(
2050            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
2051            $v,
2052        )
2053    };
2054    (semver_compatible $key:literal, $v:expr) => {
2055        $crate::adapter::net::behavior::predicate::Predicate::semver_compatible(
2056            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
2057            $v,
2058        )
2059    };
2060    (prefix $key:literal, $p:expr) => {
2061        $crate::adapter::net::behavior::predicate::Predicate::string_prefix(
2062            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
2063            $p,
2064        )
2065    };
2066    (matches $key:literal, $p:expr) => {
2067        $crate::adapter::net::behavior::predicate::Predicate::string_matches(
2068            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
2069            $p,
2070        )
2071    };
2072    (metadata_exists $key:expr) => {
2073        $crate::adapter::net::behavior::predicate::Predicate::metadata_exists($key)
2074    };
2075    (metadata_equals $key:expr, $v:expr) => {
2076        $crate::adapter::net::behavior::predicate::Predicate::metadata_equals($key, $v)
2077    };
2078    (metadata_matches $key:expr, $p:expr) => {
2079        $crate::adapter::net::behavior::predicate::Predicate::metadata_matches($key, $p)
2080    };
2081    (metadata_num_at_least $key:expr, $t:expr) => {
2082        $crate::adapter::net::behavior::predicate::Predicate::metadata_numeric_at_least(
2083            $key, $t,
2084        )
2085    };
2086    (and [ $($clause:expr),* $(,)? ]) => {
2087        $crate::adapter::net::behavior::predicate::Predicate::and(vec![$($clause),*])
2088    };
2089    (or [ $($clause:expr),* $(,)? ]) => {
2090        $crate::adapter::net::behavior::predicate::Predicate::or(vec![$($clause),*])
2091    };
2092    (not $clause:expr) => {
2093        $crate::adapter::net::behavior::predicate::Predicate::not($clause)
2094    };
2095}
2096
2097/// Internal helper used by the [`pred!`] macro to lift an
2098/// `<axis>.<key>` string literal into a [`TagKey`]. Panics on
2099/// unknown axis or empty key — the macro contract is "parse-time
2100/// validation," and violating it at the call site is a programmer
2101/// error caught at the first run (matches the substrate plan's
2102/// macro-validation guarantee).
2103#[doc(hidden)]
2104pub fn __tag_key_from_str(s: &'static str) -> TagKey {
2105    let (axis_str, key) = s
2106        .split_once('.')
2107        .unwrap_or_else(|| panic!("pred! tag key {s:?} must be `<axis>.<key>`"));
2108    let axis = crate::adapter::net::behavior::tag::TaxonomyAxis::from_prefix(axis_str)
2109        .unwrap_or_else(|| {
2110            panic!(
2111                "pred! tag key {s:?} has unknown axis prefix {axis_str:?}; \
2112                 valid axes: hardware, software, devices, dataforts"
2113            )
2114        });
2115    TagKey::new(axis, key.to_string())
2116}
2117
2118// =============================================================================
2119// Tests
2120// =============================================================================
2121
2122#[cfg(test)]
2123mod tests {
2124    use super::*;
2125    use crate::adapter::net::behavior::tag::{Tag, TaxonomyAxis};
2126    use crate::adapter::net::behavior::{CapabilitySet, GpuInfo, GpuVendor, HardwareCapabilities};
2127    fn ctx<'a>(tags: &'a [Tag], metadata: &'a BTreeMap<String, String>) -> EvalContext<'a> {
2128        EvalContext::new(tags, metadata)
2129    }
2130    fn empty_meta() -> BTreeMap<String, String> {
2131        BTreeMap::new()
2132    }
2133    fn axis_present(axis: TaxonomyAxis, key: &str) -> Tag {
2134        Tag::AxisPresent {
2135            axis,
2136            key: key.into(),
2137        }
2138    }
2139    fn axis_eq(axis: TaxonomyAxis, key: &str, value: &str) -> Tag {
2140        Tag::AxisValue {
2141            axis,
2142            key: key.into(),
2143            value: value.into(),
2144            separator: crate::adapter::net::behavior::tag::AxisSeparator::Eq,
2145        }
2146    }
2147    // ---- existence + equality ------------------------------------------
2148
2149    #[test]
2150    fn exists_matches_axis_present_tag() {
2151        let tags = [axis_present(TaxonomyAxis::Hardware, "gpu")];
2152        let meta = empty_meta();
2153        let p = pred!(exists "hardware.gpu");
2154        assert!(p.evaluate(&ctx(&tags, &meta)));
2155    }
2156    #[test]
2157    fn exists_matches_axis_value_tag() {
2158        let tags = [axis_eq(TaxonomyAxis::Hardware, "gpu.vram_gb", "80")];
2159        let meta = empty_meta();
2160        let p = pred!(exists "hardware.gpu.vram_gb");
2161        assert!(p.evaluate(&ctx(&tags, &meta)));
2162    }
2163    #[test]
2164    fn exists_misses_when_axis_differs() {
2165        let tags = [axis_present(TaxonomyAxis::Software, "gpu")];
2166        let meta = empty_meta();
2167        let p = pred!(exists "hardware.gpu");
2168        assert!(!p.evaluate(&ctx(&tags, &meta)));
2169    }
2170    #[test]
2171    fn equals_matches_value_exactly() {
2172        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "cuda-12.4")];
2173        let meta = empty_meta();
2174        assert!(pred!(equals "software.runtime", "cuda-12.4").evaluate(&ctx(&tags, &meta)));
2175        assert!(!pred!(equals "software.runtime", "cuda-11").evaluate(&ctx(&tags, &meta)));
2176    }
2177    // ---- numeric --------------------------------------------------------
2178
2179    #[test]
2180    fn numeric_at_least_compares_value() {
2181        let tags = [axis_eq(TaxonomyAxis::Hardware, "gpu.vram_gb", "80")];
2182        let meta = empty_meta();
2183        assert!(pred!(num_at_least "hardware.gpu.vram_gb", 24.0).evaluate(&ctx(&tags, &meta)));
2184        assert!(pred!(num_at_least "hardware.gpu.vram_gb", 80.0).evaluate(&ctx(&tags, &meta)));
2185        assert!(!pred!(num_at_least "hardware.gpu.vram_gb", 96.0).evaluate(&ctx(&tags, &meta)));
2186    }
2187    #[test]
2188    fn numeric_at_most_and_in_range() {
2189        let tags = [axis_eq(TaxonomyAxis::Hardware, "cpu_cores", "16")];
2190        let meta = empty_meta();
2191        assert!(pred!(num_at_most "hardware.cpu_cores", 32.0).evaluate(&ctx(&tags, &meta)));
2192        assert!(!pred!(num_at_most "hardware.cpu_cores", 8.0).evaluate(&ctx(&tags, &meta)));
2193        assert!(pred!(num_in_range "hardware.cpu_cores", 8.0, 32.0).evaluate(&ctx(&tags, &meta)));
2194        assert!(!pred!(num_in_range "hardware.cpu_cores", 32.0, 64.0).evaluate(&ctx(&tags, &meta)));
2195    }
2196    #[test]
2197    fn numeric_unparseable_value_evaluates_to_false() {
2198        // Pinned: a tag whose value is not numeric must NOT panic
2199        // and must NOT match a numeric predicate. Federated queries
2200        // rely on this — a malformed tag from a peer's binding
2201        // shouldn't fault our query.
2202        let tags = [axis_eq(TaxonomyAxis::Hardware, "cpu_cores", "many")];
2203        let meta = empty_meta();
2204        assert!(!pred!(num_at_least "hardware.cpu_cores", 1.0).evaluate(&ctx(&tags, &meta)));
2205    }
2206    // ---- semver ---------------------------------------------------------
2207
2208    #[test]
2209    fn semver_at_least_basic() {
2210        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "12.4.1")];
2211        let meta = empty_meta();
2212        assert!(pred!(semver_at_least "software.runtime", "12.0.0").evaluate(&ctx(&tags, &meta)));
2213        assert!(pred!(semver_at_least "software.runtime", "12.4.0").evaluate(&ctx(&tags, &meta)));
2214        assert!(!pred!(semver_at_least "software.runtime", "13.0.0").evaluate(&ctx(&tags, &meta)));
2215    }
2216    #[test]
2217    fn semver_compatible_caret_rule() {
2218        // 1.x.y compatibility: same major.
2219        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "1.5.2")];
2220        let meta = empty_meta();
2221        assert!(pred!(semver_compatible "software.runtime", "1.0.0").evaluate(&ctx(&tags, &meta)));
2222        assert!(pred!(semver_compatible "software.runtime", "1.4.0").evaluate(&ctx(&tags, &meta)));
2223        assert!(!pred!(semver_compatible "software.runtime", "0.9.0").evaluate(&ctx(&tags, &meta)));
2224        assert!(!pred!(semver_compatible "software.runtime", "2.0.0").evaluate(&ctx(&tags, &meta)));
2225
2226        // 0.x.y compatibility: same minor.
2227        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "0.5.7")];
2228        assert!(pred!(semver_compatible "software.runtime", "0.5.0").evaluate(&ctx(&tags, &meta)));
2229        assert!(!pred!(semver_compatible "software.runtime", "0.4.0").evaluate(&ctx(&tags, &meta)));
2230    }
2231    /// Regression: `0.0.x` is exact-only under cargo's caret rule.
2232    /// The pre-fix `rhs.0 == 0 → rhs.1 == lhs.1` branch ignored the
2233    /// patch component and admitted any `0.0.y >= 0.0.x` as
2234    /// compatible — concretely, `^0.0.1` would match a peer running
2235    /// `0.0.2`, which is a breaking-change boundary.
2236    #[test]
2237    fn semver_compatible_zero_zero_patch_is_exact_only() {
2238        let meta = empty_meta();
2239
2240        // Exact match passes.
2241        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "0.0.3")];
2242        assert!(pred!(semver_compatible "software.runtime", "0.0.3").evaluate(&ctx(&tags, &meta)));
2243
2244        // Higher patch must NOT match (was admitted pre-fix).
2245        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "0.0.4")];
2246        assert!(!pred!(semver_compatible "software.runtime", "0.0.3").evaluate(&ctx(&tags, &meta)));
2247
2248        // Lower patch fails (already covered by the lhs >= rhs guard).
2249        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "0.0.2")];
2250        assert!(!pred!(semver_compatible "software.runtime", "0.0.3").evaluate(&ctx(&tags, &meta)));
2251
2252        // Cross-band (different minor) still fails.
2253        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "0.1.0")];
2254        assert!(!pred!(semver_compatible "software.runtime", "0.0.3").evaluate(&ctx(&tags, &meta)));
2255    }
2256    /// Q1: a non-zero major `lhs` is NOT compatible with a 0.x.y
2257    /// `rhs` — Cargo's caret rule treats 0.x.y as the band only
2258    /// when the major is also 0. Pre-fix, `rhs.1 == lhs.1` alone
2259    /// passed for `lhs = 1.2.5` against `rhs = 0.2.3` (lhs >= rhs
2260    /// passes since 1 > 0; minors match). 1.x.y running against
2261    /// `^0.2.3` is a major-version regression that should fail
2262    /// the compatibility check.
2263    #[test]
2264    fn semver_compatible_zero_x_band_requires_lhs_major_zero() {
2265        let meta = empty_meta();
2266
2267        // 0.2.x band: same-major-zero, same-minor matches.
2268        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "0.2.5")];
2269        assert!(pred!(semver_compatible "software.runtime", "0.2.3").evaluate(&ctx(&tags, &meta)));
2270
2271        // 0.2.x band: lhs major != 0 must NOT match (was admitted
2272        // pre-fix).
2273        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "1.2.5")];
2274        assert!(!pred!(semver_compatible "software.runtime", "0.2.3").evaluate(&ctx(&tags, &meta)));
2275
2276        // Sanity: 2.2.5 vs 0.2.3 also fails.
2277        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "2.2.5")];
2278        assert!(!pred!(semver_compatible "software.runtime", "0.2.3").evaluate(&ctx(&tags, &meta)));
2279    }
2280    /// Regression: presence-only tags (`Tag::AxisPresent`) must not
2281    /// match value-bearing predicates. Pre-fix, `match_axis_tag` fed
2282    /// `""` through `value_pred`, which let `Equals(_, "")` /
2283    /// `StringPrefix(_, "")` / `StringMatches(_, "")` accept any
2284    /// presence tag. Use `Exists` for key-presence checks.
2285    #[test]
2286    fn axis_present_does_not_satisfy_value_predicates() {
2287        let tags = [axis_present(TaxonomyAxis::Hardware, "gpu")];
2288        let meta = empty_meta();
2289
2290        // Equality with empty string was the worst offender — every
2291        // presence tag matched it pre-fix.
2292        assert!(!pred!(equals "hardware.gpu", "").evaluate(&ctx(&tags, &meta)));
2293        // Equality with any non-empty value also doesn't match a
2294        // presence tag (no value to compare against).
2295        assert!(!pred!(equals "hardware.gpu", "nvidia").evaluate(&ctx(&tags, &meta)));
2296        // String predicates anchored at the empty string used to
2297        // permissively accept presence tags.
2298        assert!(!pred!(prefix "hardware.gpu", "").evaluate(&ctx(&tags, &meta)));
2299        assert!(!pred!(matches "hardware.gpu", "").evaluate(&ctx(&tags, &meta)));
2300
2301        // `Exists` is the correct check for key presence — it still
2302        // matches both `AxisPresent` and `AxisValue` shapes.
2303        assert!(pred!(exists "hardware.gpu").evaluate(&ctx(&tags, &meta)));
2304        let tags = [axis_eq(TaxonomyAxis::Hardware, "gpu", "nvidia")];
2305        assert!(pred!(exists "hardware.gpu").evaluate(&ctx(&tags, &meta)));
2306    }
2307    #[test]
2308    fn semver_lenient_parser() {
2309        // Pinned: the inline parser accepts truncated versions
2310        // (`1` → `(1, 0, 0)`, `1.2` → `(1, 2, 0)`). Applications in
2311        // the wild emit these; the parser shouldn't reject them.
2312        assert_eq!(parse_semver("1"), Some((1, 0, 0)));
2313        assert_eq!(parse_semver("1.2"), Some((1, 2, 0)));
2314        assert_eq!(parse_semver("1.2.3"), Some((1, 2, 3)));
2315        assert_eq!(parse_semver("1.2.3-beta"), Some((1, 2, 3)));
2316        assert_eq!(parse_semver("1.2.3+build.42"), Some((1, 2, 3)));
2317        // Invalid: 4+ components, non-numeric.
2318        assert_eq!(parse_semver("1.2.3.4"), None);
2319        assert_eq!(parse_semver("a.b.c"), None);
2320        assert_eq!(parse_semver(""), None);
2321    }
2322    // ---- string ---------------------------------------------------------
2323
2324    #[test]
2325    fn string_prefix_and_matches() {
2326        let tags = [axis_eq(TaxonomyAxis::Software, "tool", "ffmpeg-7.0")];
2327        let meta = empty_meta();
2328        assert!(pred!(prefix "software.tool", "ffmpeg").evaluate(&ctx(&tags, &meta)));
2329        assert!(!pred!(prefix "software.tool", "imagemagick").evaluate(&ctx(&tags, &meta)));
2330        assert!(pred!(matches "software.tool", "7.0").evaluate(&ctx(&tags, &meta)));
2331        assert!(!pred!(matches "software.tool", "8.0").evaluate(&ctx(&tags, &meta)));
2332    }
2333    // ---- metadata -------------------------------------------------------
2334
2335    #[test]
2336    fn metadata_predicates() {
2337        let tags: Vec<Tag> = vec![];
2338        let mut meta = BTreeMap::new();
2339        meta.insert("intent".into(), "ml-training".into());
2340        meta.insert("priority".into(), "5".into());
2341
2342        assert!(pred!(metadata_exists "intent").evaluate(&ctx(&tags, &meta)));
2343        assert!(!pred!(metadata_exists "missing").evaluate(&ctx(&tags, &meta)));
2344        assert!(pred!(metadata_equals "intent", "ml-training").evaluate(&ctx(&tags, &meta)));
2345        assert!(!pred!(metadata_equals "intent", "billing").evaluate(&ctx(&tags, &meta)));
2346        assert!(pred!(metadata_matches "intent", "training").evaluate(&ctx(&tags, &meta)));
2347        assert!(pred!(metadata_num_at_least "priority", 3.0).evaluate(&ctx(&tags, &meta)));
2348        assert!(!pred!(metadata_num_at_least "priority", 10.0).evaluate(&ctx(&tags, &meta)));
2349    }
2350    // ---- boolean composition --------------------------------------------
2351
2352    #[test]
2353    fn and_or_not_composition() {
2354        let tags = [
2355            axis_present(TaxonomyAxis::Hardware, "gpu"),
2356            axis_eq(TaxonomyAxis::Hardware, "gpu.vram_gb", "80"),
2357        ];
2358        let meta = empty_meta();
2359
2360        // AND: both clauses match.
2361        let p = pred!(and [
2362            pred!(exists "hardware.gpu"),
2363            pred!(num_at_least "hardware.gpu.vram_gb", 24.0),
2364        ]);
2365        assert!(p.evaluate(&ctx(&tags, &meta)));
2366
2367        // AND: one fails.
2368        let p = pred!(and [
2369            pred!(exists "hardware.gpu"),
2370            pred!(num_at_least "hardware.gpu.vram_gb", 96.0),
2371        ]);
2372        assert!(!p.evaluate(&ctx(&tags, &meta)));
2373
2374        // OR: at least one matches.
2375        let p = pred!(or [
2376            pred!(exists "hardware.tpu"),
2377            pred!(exists "hardware.gpu"),
2378        ]);
2379        assert!(p.evaluate(&ctx(&tags, &meta)));
2380
2381        // NOT: inverts.
2382        let p = pred!(not pred!(exists "hardware.tpu"));
2383        assert!(p.evaluate(&ctx(&tags, &meta)));
2384        let p = pred!(not pred!(exists "hardware.gpu"));
2385        assert!(!p.evaluate(&ctx(&tags, &meta)));
2386    }
2387    #[test]
2388    fn empty_and_is_vacuously_true() {
2389        // Standard math/logic convention: `forall` over empty set
2390        // is `true`. Pinned because alternatives surprise readers.
2391        let tags: Vec<Tag> = vec![];
2392        let meta = empty_meta();
2393        assert!(Predicate::and(vec![]).evaluate(&ctx(&tags, &meta)));
2394    }
2395    #[test]
2396    fn empty_or_is_vacuously_false() {
2397        // Dual convention: `exists` over empty set is `false`.
2398        let tags: Vec<Tag> = vec![];
2399        let meta = empty_meta();
2400        assert!(!Predicate::or(vec![]).evaluate(&ctx(&tags, &meta)));
2401    }
2402    // ---- not predicate over unparseable value ---------------------------
2403
2404    #[test]
2405    fn not_does_not_flip_unparseable_to_true() {
2406        // Pinned by the substrate plan's "Predicate::Not(NumericAtLeast)
2407        // against an unparseable value yields `false`, NOT `true`"
2408        // contract. The inner numeric predicate fails (returns
2409        // false); Not(false) = true. But the spec explicitly says
2410        // "predicate failure is a hard miss, not a logical inversion":
2411        // the inner check fails to find any matching tag at all, so
2412        // the inner predicate evaluates to `false`, and `Not(false)`
2413        // evaluates to `true`. This test pins the documented
2414        // behavior so a future change is intentional.
2415        let tags = [axis_eq(TaxonomyAxis::Hardware, "cpu_cores", "many")];
2416        let meta = empty_meta();
2417        // Inner: NumericAtLeast against "many" → false (parse fails).
2418        // Outer: Not(false) → true.
2419        let p = pred!(not pred!(num_at_least "hardware.cpu_cores", 1.0));
2420        assert!(p.evaluate(&ctx(&tags, &meta)));
2421    }
2422    // ---- structural equality ------------------------------------------
2423    //
2424    // Serde wire format is deferred to Phase E (federated query
2425    // primitives) — see the comment on the `Predicate` declaration.
2426    // Phase A pins structural-equality round-trip via Clone + PartialEq
2427    // so a future serde drop-in has a reference behavior to match.
2428
2429    #[test]
2430    fn clone_and_eq_preserve_ast() {
2431        let p = pred!(and [
2432            pred!(exists "hardware.gpu"),
2433            pred!(num_at_least "hardware.gpu.vram_gb", 24.0),
2434            pred!(or [
2435                pred!(equals "software.runtime", "cuda-12.4"),
2436                pred!(semver_compatible "software.runtime", "13.0"),
2437            ]),
2438            pred!(not pred!(metadata_exists "decommissioning")),
2439        ]);
2440        let p2 = p.clone();
2441        assert_eq!(p, p2);
2442    }
2443    // ---- macro ----------------------------------------------------------
2444
2445    #[test]
2446    #[should_panic(expected = "unknown axis prefix")]
2447    fn pred_macro_panics_on_unknown_axis() {
2448        let _ = pred!(exists "bogus.foo");
2449    }
2450    #[test]
2451    #[should_panic(expected = "must be `<axis>.<key>`")]
2452    fn pred_macro_panics_on_missing_dot() {
2453        let _ = pred!(exists "hardware");
2454    }
2455    // ========================================================================
2456    // Query planner — Phase 4 of CAPABILITY_ENHANCEMENTS_PLAN.md.
2457    // ========================================================================
2458
2459    fn meta_with(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
2460        pairs
2461            .iter()
2462            .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
2463            .collect()
2464    }
2465    /// Worst-case AST: high-selectivity metadata-equals clause buried
2466    /// LAST among 5 children. Unplanned eval pays for the four
2467    /// preceding clauses on every false case; planned eval runs the
2468    /// metadata-equals first and short-circuits.
2469    fn worst_case_and() -> Predicate {
2470        Predicate::And(vec![
2471            Predicate::SemverCompatible {
2472                key: TagKey::new(TaxonomyAxis::Software, "runtime.python"),
2473                version: "3.11".into(),
2474            },
2475            Predicate::StringMatches {
2476                key: TagKey::new(TaxonomyAxis::Software, "os"),
2477                pattern: "linux".into(),
2478            },
2479            Predicate::NumericAtLeast {
2480                key: TagKey::new(TaxonomyAxis::Hardware, "memory_gb"),
2481                threshold: 64.0,
2482            },
2483            Predicate::Exists {
2484                key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2485            },
2486            Predicate::MetadataEquals {
2487                key: "intent".into(),
2488                value: "ml-training".into(),
2489            },
2490        ])
2491    }
2492    #[test]
2493    fn planner_reorders_and_children_cheap_first() {
2494        // Pin the planner's ordering on the worst-case AST.
2495        // The cheapest leaf (`MetadataEquals`, cost=11) must run
2496        // before the heaviest (`SemverCompatible`, cost=60).
2497        let ast = worst_case_and();
2498        if let Predicate::And(children) = &ast {
2499            // Verify costs as expected from the static_cost table.
2500            let costs: Vec<u32> = children.iter().map(|c| c.static_cost()).collect();
2501            assert_eq!(costs, vec![60, 50, 30, 20, 11]);
2502        } else {
2503            panic!("worst_case_and produced non-And");
2504        }
2505    }
2506    #[test]
2507    fn planner_preserves_semantics_on_short_circuit_false() {
2508        // Pin: planner-vs-unplanned equivalence on a clearly-false
2509        // input. Both must return false; planner short-circuits
2510        // earlier but the result is identical.
2511        let tags: Vec<Tag> = vec![axis_eq(TaxonomyAxis::Hardware, "memory_gb", "32")];
2512        let meta = empty_meta();
2513        let cx = ctx(&tags, &meta);
2514        let ast = worst_case_and();
2515        // Memory is 32 < 64, so the AND fails. Both paths
2516        // agree.
2517        assert!(!ast.evaluate(&cx));
2518        assert!(!ast.evaluate_unplanned(&cx));
2519    }
2520    #[test]
2521    fn planner_preserves_semantics_on_full_match() {
2522        let tags: Vec<Tag> = vec![
2523            axis_eq(TaxonomyAxis::Hardware, "memory_gb", "128"),
2524            axis_present(TaxonomyAxis::Hardware, "gpu"),
2525            axis_eq(TaxonomyAxis::Software, "os", "linux"),
2526            axis_eq(TaxonomyAxis::Software, "runtime.python", "3.11.5"),
2527        ];
2528        let meta = meta_with(&[("intent", "ml-training")]);
2529        let cx = ctx(&tags, &meta);
2530        let ast = worst_case_and();
2531        assert!(ast.evaluate(&cx));
2532        assert!(ast.evaluate_unplanned(&cx));
2533    }
2534    #[test]
2535    fn planner_preserves_or_short_circuit_semantics() {
2536        // Or with mixed costs: cheap clause that's true should win
2537        // either way (planner runs it first; unplanned still finds
2538        // it eventually).
2539        let ast = Predicate::Or(vec![
2540            Predicate::SemverCompatible {
2541                key: TagKey::new(TaxonomyAxis::Software, "runtime.python"),
2542                version: "9.9".into(),
2543            },
2544            Predicate::MetadataEquals {
2545                key: "intent".into(),
2546                value: "ml-training".into(),
2547            },
2548        ]);
2549        let meta = meta_with(&[("intent", "ml-training")]);
2550        let cx = ctx(&[], &meta);
2551        assert!(ast.evaluate(&cx));
2552        assert!(ast.evaluate_unplanned(&cx));
2553    }
2554    #[test]
2555    fn planner_static_cost_compositees_sum_children() {
2556        // And/Or cost = sum of children. Used to prefer shallow
2557        // branches over deep ones when ordering nested compositions.
2558        let cheap = Predicate::MetadataExists { key: "k".into() };
2559        let expensive = Predicate::SemverCompatible {
2560            key: TagKey::new(TaxonomyAxis::Software, "x"),
2561            version: "1.0".into(),
2562        };
2563        let nested = Predicate::And(vec![cheap.clone(), expensive.clone()]);
2564        let leaf_cost = cheap.static_cost() + expensive.static_cost();
2565        assert_eq!(nested.static_cost(), leaf_cost);
2566
2567        // Not(inner) keeps inner's cost (no overhead for negation).
2568        let negated = Predicate::Not(Box::new(expensive.clone()));
2569        assert_eq!(negated.static_cost(), expensive.static_cost());
2570    }
2571    #[test]
2572    fn planner_handles_empty_and_or_correctly() {
2573        // Empty And is vacuous true; empty Or is vacuous false.
2574        // Planner reordering on empty children is a no-op, but
2575        // pin the contract so a future "ordered eval requires
2576        // children" assertion doesn't slip in.
2577        let meta = BTreeMap::new();
2578        let cx = ctx(&[], &meta);
2579        assert!(Predicate::And(vec![]).evaluate(&cx));
2580        assert!(!Predicate::Or(vec![]).evaluate(&cx));
2581        assert!(Predicate::And(vec![]).evaluate_unplanned(&cx));
2582        assert!(!Predicate::Or(vec![]).evaluate_unplanned(&cx));
2583    }
2584    /// Exhaustive small-input parity: enumerate a handful of small
2585    /// `(ast, ctx)` combinations and assert planned = unplanned.
2586    /// Phase 4 doesn't ship full property-based fuzzing
2587    /// (no proptest dep yet); this hand-rolled equivalence test
2588    /// covers the load-bearing cases.
2589    #[test]
2590    fn planner_evaluate_matches_unplanned_across_canonical_inputs() {
2591        // Build a corpus of N predicates × M contexts and assert
2592        // planned == unplanned for every combination.
2593        let predicates: Vec<Predicate> = vec![
2594            // Simple leaves
2595            Predicate::Exists {
2596                key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2597            },
2598            Predicate::MetadataEquals {
2599                key: "intent".into(),
2600                value: "ml-training".into(),
2601            },
2602            Predicate::NumericAtLeast {
2603                key: TagKey::new(TaxonomyAxis::Hardware, "memory_gb"),
2604                threshold: 64.0,
2605            },
2606            // Composites
2607            worst_case_and(),
2608            Predicate::Or(vec![
2609                Predicate::Exists {
2610                    key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2611                },
2612                Predicate::MetadataEquals {
2613                    key: "intent".into(),
2614                    value: "ml-training".into(),
2615                },
2616            ]),
2617            // Nested And-of-Or-of-And
2618            Predicate::And(vec![
2619                Predicate::Or(vec![
2620                    Predicate::Exists {
2621                        key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2622                    },
2623                    Predicate::And(vec![
2624                        Predicate::NumericAtLeast {
2625                            key: TagKey::new(TaxonomyAxis::Hardware, "memory_gb"),
2626                            threshold: 64.0,
2627                        },
2628                        Predicate::MetadataExists {
2629                            key: "intent".into(),
2630                        },
2631                    ]),
2632                ]),
2633                Predicate::Not(Box::new(Predicate::MetadataEquals {
2634                    key: "decommissioning".into(),
2635                    value: "true".into(),
2636                })),
2637            ]),
2638        ];
2639
2640        let contexts: Vec<(Vec<Tag>, BTreeMap<String, String>)> = vec![
2641            // Empty
2642            (vec![], BTreeMap::new()),
2643            // Hardware match only
2644            (
2645                vec![
2646                    axis_present(TaxonomyAxis::Hardware, "gpu"),
2647                    axis_eq(TaxonomyAxis::Hardware, "memory_gb", "128"),
2648                ],
2649                BTreeMap::new(),
2650            ),
2651            // Metadata match only
2652            (vec![], meta_with(&[("intent", "ml-training")])),
2653            // Full match
2654            (
2655                vec![
2656                    axis_present(TaxonomyAxis::Hardware, "gpu"),
2657                    axis_eq(TaxonomyAxis::Hardware, "memory_gb", "128"),
2658                    axis_eq(TaxonomyAxis::Software, "os", "linux"),
2659                    axis_eq(TaxonomyAxis::Software, "runtime.python", "3.11.5"),
2660                ],
2661                meta_with(&[("intent", "ml-training")]),
2662            ),
2663            // Full match + decommissioning marker (should fail the
2664            // last nested predicate's `Not` clause).
2665            (
2666                vec![
2667                    axis_present(TaxonomyAxis::Hardware, "gpu"),
2668                    axis_eq(TaxonomyAxis::Hardware, "memory_gb", "128"),
2669                ],
2670                meta_with(&[("intent", "ml-training"), ("decommissioning", "true")]),
2671            ),
2672        ];
2673
2674        for (i, ast) in predicates.iter().enumerate() {
2675            for (j, (tags, meta)) in contexts.iter().enumerate() {
2676                let cx = ctx(tags, meta);
2677                let planned = ast.evaluate(&cx);
2678                let unplanned = ast.evaluate_unplanned(&cx);
2679                assert_eq!(
2680                    planned, unplanned,
2681                    "predicate[{i}] ctx[{j}]: planned={planned} != unplanned={unplanned}"
2682                );
2683            }
2684        }
2685    }
2686    // ========================================================================
2687    // Predicate debug session — Phase 6 of CAPABILITY_ENHANCEMENTS_PLAN.md.
2688    // ========================================================================
2689
2690    #[test]
2691    fn evaluate_with_trace_returns_same_result_as_evaluate() {
2692        // Pin: the trace-instrumented evaluation produces the
2693        // same boolean result as `evaluate()`. Trace is a side
2694        // channel; the predicate semantic is unchanged.
2695        let ast = worst_case_and();
2696        let tags: Vec<Tag> = vec![axis_eq(TaxonomyAxis::Hardware, "memory_gb", "32")];
2697        let meta = empty_meta();
2698        let cx = ctx(&tags, &meta);
2699        let plain_result = ast.evaluate(&cx);
2700        let (traced_result, _trace) = ast.evaluate_with_trace(&cx);
2701        assert_eq!(plain_result, traced_result);
2702    }
2703    #[test]
2704    fn evaluate_with_trace_short_circuits_drop_unevaluated_siblings() {
2705        // Pin: when an `And` short-circuits on a false child, the
2706        // trace for the And node only carries the children that
2707        // actually ran. Lets operators see "the metadata clause
2708        // failed; we never got to the GPU check."
2709        let ast = Predicate::And(vec![
2710            // Cheap leaf, false → short-circuit
2711            Predicate::MetadataEquals {
2712                key: "intent".into(),
2713                value: "ml-training".into(),
2714            },
2715            // Heavier leaf — should not be evaluated
2716            Predicate::SemverCompatible {
2717                key: TagKey::new(TaxonomyAxis::Software, "runtime.python"),
2718                version: "3.11".into(),
2719            },
2720        ]);
2721        let meta = empty_meta();
2722        let cx = ctx(&[], &meta); // no metadata → first clause false
2723        let (result, trace) = ast.evaluate_with_trace(&cx);
2724        assert!(!result);
2725        // And's children: only one entry (the metadata clause that
2726        // returned false and short-circuited the rest).
2727        assert_eq!(
2728            trace.children.len(),
2729            1,
2730            "And trace should drop unevaluated siblings; got {trace:?}"
2731        );
2732        assert!(trace.children[0].label.starts_with("MetadataEquals"));
2733        assert!(!trace.children[0].result);
2734    }
2735    #[test]
2736    fn evaluate_with_trace_captures_full_evaluation_when_no_short_circuit() {
2737        // Pin: when no clause short-circuits (all true in an And,
2738        // all false in an Or), the trace covers every child.
2739        let ast = Predicate::And(vec![
2740            Predicate::MetadataExists {
2741                key: "intent".into(),
2742            },
2743            Predicate::Exists {
2744                key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2745            },
2746        ]);
2747        let tags: Vec<Tag> = vec![axis_present(TaxonomyAxis::Hardware, "gpu")];
2748        let meta = meta_with(&[("intent", "ml-training")]);
2749        let cx = ctx(&tags, &meta);
2750        let (result, trace) = ast.evaluate_with_trace(&cx);
2751        assert!(result);
2752        assert_eq!(trace.children.len(), 2);
2753        for child in &trace.children {
2754            assert!(child.result, "all children must have matched: {child:?}");
2755        }
2756    }
2757    #[test]
2758    fn evaluate_with_trace_records_not_inversion() {
2759        // Pin: Not's trace child carries the inner result (pre-
2760        // negation); the Not node carries the post-negation result.
2761        let ast = Predicate::Not(Box::new(Predicate::Exists {
2762            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2763        }));
2764        let meta = empty_meta();
2765        let cx = ctx(&[], &meta); // gpu absent → inner false → Not true
2766        let (result, trace) = ast.evaluate_with_trace(&cx);
2767        assert!(result, "Not(absent) should be true");
2768        assert_eq!(trace.label, "Not");
2769        assert!(trace.result);
2770        assert_eq!(trace.children.len(), 1);
2771        assert!(!trace.children[0].result, "inner Exists should be false");
2772    }
2773    #[test]
2774    fn debug_report_aggregates_match_counts() {
2775        // 3 candidates, 1 matches.
2776        let pred = Predicate::Exists {
2777            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2778        };
2779        let no_gpu_tags: Vec<Tag> = vec![];
2780        let gpu_tags: Vec<Tag> = vec![axis_present(TaxonomyAxis::Hardware, "gpu")];
2781        let meta = empty_meta();
2782
2783        let contexts = vec![
2784            ctx(&no_gpu_tags, &meta),
2785            ctx(&gpu_tags, &meta),
2786            ctx(&no_gpu_tags, &meta),
2787        ];
2788        let report = PredicateDebugReport::from_evaluations(&pred, contexts);
2789        assert_eq!(report.total_candidates, 3);
2790        assert_eq!(report.matched, 1);
2791        // One leaf clause.
2792        assert_eq!(report.clause_stats.len(), 1);
2793        let stats = report.clause_stats.values().next().unwrap();
2794        assert_eq!(stats.evaluated, 3);
2795        assert_eq!(stats.matched, 1);
2796    }
2797    #[test]
2798    fn debug_report_separates_per_clause_stats_in_composite() {
2799        // For an And of two clauses, the report should carry stats
2800        // for the And node + each leaf. Short-circuited clauses
2801        // get fewer evaluations.
2802        let pred = Predicate::And(vec![
2803            Predicate::MetadataEquals {
2804                key: "intent".into(),
2805                value: "ml-training".into(),
2806            }, // cheap, often false
2807            Predicate::Exists {
2808                key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2809            }, // moderate
2810        ]);
2811
2812        // 4 candidates: only one has the right intent + GPU.
2813        let no_meta = empty_meta();
2814        let intent_meta = meta_with(&[("intent", "ml-training")]);
2815        let no_gpu: Vec<Tag> = vec![];
2816        let gpu: Vec<Tag> = vec![axis_present(TaxonomyAxis::Hardware, "gpu")];
2817
2818        let contexts = vec![
2819            ctx(&no_gpu, &no_meta),     // both fail; short-circuit on metadata
2820            ctx(&gpu, &no_meta),        // both fail; short-circuit on metadata
2821            ctx(&no_gpu, &intent_meta), // metadata true, gpu fail
2822            ctx(&gpu, &intent_meta),    // both true → match
2823        ];
2824        let report = PredicateDebugReport::from_evaluations(&pred, contexts);
2825
2826        assert_eq!(report.total_candidates, 4);
2827        assert_eq!(report.matched, 1);
2828
2829        // 3 entries: And node + MetadataEquals leaf + Exists leaf.
2830        assert_eq!(report.clause_stats.len(), 3);
2831
2832        let metadata_stats = report
2833            .clause_stats
2834            .values()
2835            .find(|s| s.label.starts_with("MetadataEquals"))
2836            .expect("MetadataEquals stats present");
2837        assert_eq!(
2838            metadata_stats.evaluated, 4,
2839            "metadata clause runs every time"
2840        );
2841        assert_eq!(metadata_stats.matched, 2, "intent matches in 2 of 4");
2842
2843        let exists_stats = report
2844            .clause_stats
2845            .values()
2846            .find(|s| s.label.starts_with("Exists"))
2847            .expect("Exists stats present");
2848        // Only the 2 candidates with intent_meta got past the
2849        // short-circuit; gpu check ran twice.
2850        assert_eq!(
2851            exists_stats.evaluated, 2,
2852            "gpu clause only runs after metadata passes"
2853        );
2854        assert_eq!(exists_stats.matched, 1);
2855    }
2856    #[test]
2857    fn debug_report_render_includes_summary_and_clauses() {
2858        let pred = Predicate::Exists {
2859            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2860        };
2861        let report = PredicateDebugReport::from_evaluations(&pred, vec![ctx(&[], &empty_meta())]);
2862        let rendered = report.render();
2863        // Pin the load-bearing parts of the format. Operators read
2864        // the report by these markers; CI fails loudly if they drift.
2865        assert!(rendered.contains("Predicate evaluation report"));
2866        assert!(rendered.contains("Total candidates: 1"));
2867        assert!(rendered.contains("Matched:          0"));
2868        assert!(rendered.contains("Per-clause stats"));
2869        assert!(rendered.contains("Exists(hardware.gpu)"));
2870    }
2871    #[test]
2872    fn debug_report_handles_empty_corpus() {
2873        let pred = Predicate::Exists {
2874            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2875        };
2876        let report = PredicateDebugReport::from_evaluations(&pred, Vec::<EvalContext>::new());
2877        assert_eq!(report.total_candidates, 0);
2878        assert_eq!(report.matched, 0);
2879        assert!(report.clause_stats.is_empty());
2880        // Render must not panic on empty.
2881        let rendered = report.render();
2882        assert!(rendered.contains("Total candidates: 0"));
2883    }
2884    // ========================================================================
2885    // PredicateWire (flat-tree IR) — Phase 5 of CAPABILITY_ENHANCEMENTS_PLAN.md.
2886    // ========================================================================
2887
2888    fn sample_complex_predicate() -> Predicate {
2889        // And-of-Or-of-And + Not — exercises every composite variant
2890        // and a sampling of leaf variants.
2891        Predicate::And(vec![
2892            Predicate::Or(vec![
2893                Predicate::Exists {
2894                    key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2895                },
2896                Predicate::And(vec![
2897                    Predicate::NumericAtLeast {
2898                        key: TagKey::new(TaxonomyAxis::Hardware, "memory_gb"),
2899                        threshold: 64.0,
2900                    },
2901                    Predicate::MetadataExists {
2902                        key: "intent".into(),
2903                    },
2904                ]),
2905            ]),
2906            Predicate::Not(Box::new(Predicate::MetadataEquals {
2907                key: "decommissioning".into(),
2908                value: "true".into(),
2909            })),
2910            Predicate::SemverCompatible {
2911                key: TagKey::new(TaxonomyAxis::Software, "runtime.python"),
2912                version: "3.11".into(),
2913            },
2914        ])
2915    }
2916    #[test]
2917    fn wire_round_trip_preserves_complex_predicate() {
2918        // Pin: `Predicate → PredicateWire → Predicate` is identity.
2919        let original = sample_complex_predicate();
2920        let wire = original.to_wire();
2921        let rebuilt = wire.into_predicate().expect("rebuild");
2922        assert_eq!(original, rebuilt);
2923    }
2924    #[test]
2925    fn wire_round_trip_through_serde_json() {
2926        // Pin: the wire format serializes through serde_json
2927        // cleanly (no recursion-limit blowup like raw Predicate).
2928        let original = sample_complex_predicate();
2929        let wire = original.to_wire();
2930        let json = serde_json::to_string(&wire).expect("serialize wire");
2931        let parsed: PredicateWire = serde_json::from_str(&json).expect("deserialize wire");
2932        let rebuilt = parsed.into_predicate().expect("rebuild");
2933        assert_eq!(original, rebuilt);
2934    }
2935    #[test]
2936    fn wire_root_is_at_highest_index_in_post_order_emission() {
2937        // Pin: `to_wire` emits children before parents, so the
2938        // root always sits at `nodes.len() - 1` for a freshly-
2939        // emitted wire payload. The substrate's invariant
2940        // (children at lower indices) leans on this.
2941        let pred = sample_complex_predicate();
2942        let wire = pred.to_wire();
2943        assert_eq!(wire.root_idx as usize, wire.nodes.len() - 1);
2944    }
2945    #[test]
2946    fn wire_round_trip_byte_stable_across_calls() {
2947        // Pin: two `to_wire()` calls on equal predicates produce
2948        // identical wire bytes. Required for cross-binding fixture
2949        // pinning.
2950        let pred = sample_complex_predicate();
2951        let wire_a = pred.to_wire();
2952        let wire_b = pred.to_wire();
2953        assert_eq!(wire_a, wire_b);
2954        let json_a = serde_json::to_string(&wire_a).unwrap();
2955        let json_b = serde_json::to_string(&wire_b).unwrap();
2956        assert_eq!(json_a, json_b);
2957    }
2958    #[test]
2959    fn wire_round_trip_preserves_evaluation_semantics() {
2960        // Pin: a rebuilt predicate produces identical evaluation
2961        // results to the original on a fixed corpus. The serde
2962        // round-trip is semantically transparent.
2963        let original = sample_complex_predicate();
2964        let wire = original.to_wire();
2965        let rebuilt = wire.into_predicate().unwrap();
2966
2967        let no_meta = empty_meta();
2968        let intent_meta = meta_with(&[("intent", "ml-training")]);
2969        let decommission_meta =
2970            meta_with(&[("intent", "ml-training"), ("decommissioning", "true")]);
2971        let no_gpu: Vec<Tag> = vec![];
2972        let gpu: Vec<Tag> = vec![axis_present(TaxonomyAxis::Hardware, "gpu")];
2973        let gpu_with_runtime: Vec<Tag> = vec![
2974            axis_present(TaxonomyAxis::Hardware, "gpu"),
2975            axis_eq(TaxonomyAxis::Software, "runtime.python", "3.11.5"),
2976        ];
2977
2978        let cases: Vec<(&[Tag], &BTreeMap<String, String>)> = vec![
2979            (&no_gpu, &no_meta),
2980            (&gpu, &no_meta),
2981            (&gpu, &intent_meta),
2982            (&gpu_with_runtime, &intent_meta),
2983            (&gpu_with_runtime, &decommission_meta),
2984        ];
2985
2986        for (i, (tags, meta)) in cases.iter().enumerate() {
2987            let cx = ctx(tags, meta);
2988            assert_eq!(
2989                original.evaluate(&cx),
2990                rebuilt.evaluate(&cx),
2991                "case {i}: original vs rebuilt diverged on evaluation",
2992            );
2993        }
2994    }
2995    #[test]
2996    fn wire_from_empty_nodes_table_errors_gracefully() {
2997        let wire = PredicateWire {
2998            nodes: Vec::new(),
2999            root_idx: 0,
3000        };
3001        assert_eq!(wire.into_predicate(), Err(PredicateWireError::Empty));
3002    }
3003    #[test]
3004    fn wire_from_out_of_bounds_root_errors_gracefully() {
3005        let wire = PredicateWire {
3006            nodes: vec![PredicateNodeWire::Exists {
3007                key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3008            }],
3009            root_idx: 5,
3010        };
3011        assert_eq!(
3012            wire.into_predicate(),
3013            Err(PredicateWireError::RootOutOfBounds {
3014                root_idx: 5,
3015                len: 1,
3016            }),
3017        );
3018    }
3019    #[test]
3020    fn wire_from_cycle_in_and_children_errors_gracefully() {
3021        // Malformed: the `And` at index 0 references child index
3022        // 1, which doesn't exist yet (post-order requires
3023        // child < parent). Catches index cycles.
3024        let wire = PredicateWire {
3025            nodes: vec![PredicateNodeWire::And { children: vec![1] }],
3026            root_idx: 0,
3027        };
3028        let err = wire.into_predicate().unwrap_err();
3029        assert!(
3030            matches!(
3031                err,
3032                PredicateWireError::CycleDetected {
3033                    parent: 0,
3034                    child: 1
3035                }
3036            ),
3037            "expected CycleDetected; got {err:?}",
3038        );
3039    }
3040    #[test]
3041    fn wire_from_self_referencing_not_errors_gracefully() {
3042        // `Not` referencing its own index is the simplest cycle.
3043        let wire = PredicateWire {
3044            nodes: vec![PredicateNodeWire::Not { child: 0 }],
3045            root_idx: 0,
3046        };
3047        let err = wire.into_predicate().unwrap_err();
3048        assert!(
3049            matches!(
3050                err,
3051                PredicateWireError::CycleDetected {
3052                    parent: 0,
3053                    child: 0
3054                }
3055            ),
3056            "expected CycleDetected; got {err:?}",
3057        );
3058    }
3059    #[test]
3060    fn wire_simple_leaf_round_trips() {
3061        // Smallest case: a single leaf predicate. nodes has one
3062        // entry; root_idx is 0.
3063        let pred = Predicate::Exists {
3064            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3065        };
3066        let wire = pred.to_wire();
3067        assert_eq!(wire.nodes.len(), 1);
3068        assert_eq!(wire.root_idx, 0);
3069        assert_eq!(wire.into_predicate().unwrap(), pred);
3070    }
3071    #[test]
3072    fn wire_rebuilt_predicate_matches_planner_evaluation() {
3073        // Pin: planner-aware evaluation continues to work after
3074        // round-trip. The flat IR doesn't lose the AST shape;
3075        // `evaluate()` still finds And/Or to reorder.
3076        let original = sample_complex_predicate();
3077        let wire = original.to_wire();
3078        let rebuilt = wire.into_predicate().unwrap();
3079
3080        let tags: Vec<Tag> = vec![
3081            axis_present(TaxonomyAxis::Hardware, "gpu"),
3082            axis_eq(TaxonomyAxis::Software, "runtime.python", "3.11.5"),
3083        ];
3084        let meta = meta_with(&[("intent", "ml-training")]);
3085        let cx = ctx(&tags, &meta);
3086
3087        // Both planned and unplanned must agree, AND match between
3088        // original and rebuilt.
3089        let orig_planned = original.evaluate(&cx);
3090        let orig_unplanned = original.evaluate_unplanned(&cx);
3091        let rebuilt_planned = rebuilt.evaluate(&cx);
3092        let rebuilt_unplanned = rebuilt.evaluate_unplanned(&cx);
3093
3094        assert_eq!(orig_planned, orig_unplanned);
3095        assert_eq!(rebuilt_planned, rebuilt_unplanned);
3096        assert_eq!(orig_planned, rebuilt_planned);
3097    }
3098    // ========================================================================
3099    // nRPC envelope helpers — Phase 5.B of CAPABILITY_ENHANCEMENTS_PLAN.md.
3100    // ========================================================================
3101
3102    #[test]
3103    fn rpc_header_round_trip_preserves_predicate() {
3104        // Pin the canonical happy path: predicate → header → headers
3105        // table on the server side → decoded predicate. Service
3106        // handlers will use exactly this flow.
3107        let original = sample_complex_predicate();
3108        let header = predicate_to_rpc_header(&original).expect("encode");
3109        assert_eq!(header.0, RPC_WHERE_HEADER);
3110
3111        // Receiver: a Vec<RpcHeader>-shaped surface, with our
3112        // `where:` header alongside others (trace context, etc.).
3113        let headers = vec![
3114            ("trace-id".to_string(), b"abc123".to_vec()),
3115            header,
3116            ("idempotency-key".to_string(), b"def456".to_vec()),
3117        ];
3118        let decoded = predicate_from_rpc_headers(&headers)
3119            .expect("header present")
3120            .expect("decode succeeds");
3121        assert_eq!(decoded, original);
3122    }
3123    #[test]
3124    fn rpc_header_missing_returns_none() {
3125        // Service that doesn't see a `net-where` header
3126        // should treat the request as unfiltered. `None` is the
3127        // signal; service defaults to "match all".
3128        let headers = vec![
3129            ("trace-id".to_string(), b"abc123".to_vec()),
3130            ("idempotency-key".to_string(), b"def456".to_vec()),
3131        ];
3132        assert!(predicate_from_rpc_headers(&headers).is_none());
3133    }
3134    #[test]
3135    fn rpc_header_empty_returns_none() {
3136        let headers: Vec<(String, Vec<u8>)> = Vec::new();
3137        assert!(predicate_from_rpc_headers(&headers).is_none());
3138    }
3139    #[test]
3140    fn rpc_header_malformed_json_returns_decode_error() {
3141        // Service receiving a `net-where` header with garbage
3142        // bytes should reject the request, not silently default to
3143        // unfiltered. Silent fallback would let an attacker / bug
3144        // return more rows than the caller intended.
3145        let headers = vec![(RPC_WHERE_HEADER.to_string(), b"not-json".to_vec())];
3146        let result = predicate_from_rpc_headers(&headers).unwrap();
3147        assert!(
3148            matches!(result, Err(PredicateRpcDecodeError::Json(_))),
3149            "expected JSON decode error; got {result:?}",
3150        );
3151    }
3152    #[test]
3153    fn rpc_header_oversize_payload_returns_decode_error() {
3154        // N-6 regression: decode path enforces the
3155        // `MAX_PREDICATE_RPC_HEADER_VALUE_LEN` cap symmetrically with
3156        // the encode path. Pre-fix `predicate_from_rpc_headers` had
3157        // no length check, so an oversize JSON blob walked through
3158        // `serde_json::from_slice` + `rebuild_predicate` with depth
3159        // bounded only by input size — a cheap parse-bomb DoS shape
3160        // if a transport cap was ever relaxed.
3161        let oversize = vec![b' '; MAX_PREDICATE_RPC_HEADER_VALUE_LEN + 1];
3162        let headers = vec![(RPC_WHERE_HEADER.to_string(), oversize)];
3163        let result = predicate_from_rpc_headers(&headers).unwrap();
3164        assert!(
3165            matches!(
3166                result,
3167                Err(PredicateRpcDecodeError::Oversize { actual, limit })
3168                    if actual == MAX_PREDICATE_RPC_HEADER_VALUE_LEN + 1
3169                       && limit == MAX_PREDICATE_RPC_HEADER_VALUE_LEN
3170            ),
3171            "expected Oversize decode error; got {result:?}",
3172        );
3173    }
3174    #[test]
3175    fn rpc_header_cycle_in_payload_returns_decode_error() {
3176        // Defensive: a wire payload with a child-index cycle
3177        // (legal JSON but structurally invalid) is rejected.
3178        let bad_wire = PredicateWire {
3179            nodes: vec![PredicateNodeWire::Not { child: 0 }],
3180            root_idx: 0,
3181        };
3182        let bad_bytes = serde_json::to_vec(&bad_wire).unwrap();
3183        let headers = vec![(RPC_WHERE_HEADER.to_string(), bad_bytes)];
3184        let result = predicate_from_rpc_headers(&headers).unwrap();
3185        assert!(
3186            matches!(
3187                result,
3188                Err(PredicateRpcDecodeError::Wire(
3189                    PredicateWireError::CycleDetected { .. }
3190                ))
3191            ),
3192            "expected wire cycle error; got {result:?}",
3193        );
3194    }
3195    #[test]
3196    fn rpc_header_first_match_wins_on_duplicate_headers() {
3197        // Per the helper's documented contract: duplicate headers
3198        // under `net-where` are not coalesced; the first
3199        // match wins. Pin so a future "merge duplicates" change
3200        // is loud.
3201        let pred_a = Predicate::Exists {
3202            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3203        };
3204        let pred_b = Predicate::MetadataEquals {
3205            key: "intent".into(),
3206            value: "ml-training".into(),
3207        };
3208        let header_a = predicate_to_rpc_header(&pred_a).unwrap();
3209        let header_b = predicate_to_rpc_header(&pred_b).unwrap();
3210        let headers = vec![header_a, header_b];
3211        let decoded = predicate_from_rpc_headers(&headers).unwrap().unwrap();
3212        assert_eq!(decoded, pred_a);
3213    }
3214    #[test]
3215    fn rpc_header_oversize_predicate_rejected_at_encode() {
3216        // A predicate that would exceed the header-value cap is
3217        // rejected by `predicate_to_rpc_header` rather than being
3218        // truncated / silently dropped. Caller decides how to
3219        // surface this (split the predicate, simplify, or fail).
3220        // Build a many-clause Or that overflows the 4 KB cap.
3221        let mut clauses = Vec::new();
3222        // ~30 chars of metadata key per clause; 200 clauses ≈ 6 KB JSON.
3223        for i in 0..200 {
3224            clauses.push(Predicate::MetadataEquals {
3225                key: format!("very-long-metadata-key-{i:04}"),
3226                value: format!("very-long-metadata-value-{i:04}"),
3227            });
3228        }
3229        let huge = Predicate::Or(clauses);
3230        let result = predicate_to_rpc_header(&huge);
3231        assert!(
3232            matches!(result, Err(PredicateRpcEncodeError::TooLarge { actual, limit })
3233                if actual > limit && limit == MAX_PREDICATE_RPC_HEADER_VALUE_LEN),
3234            "expected TooLarge; got {result:?}",
3235        );
3236    }
3237    #[test]
3238    fn rpc_header_typical_predicate_fits_well_under_cap() {
3239        // Sanity bound: a representative predicate (5 leaves +
3240        // some boolean composition) should encode well under
3241        // the 4 KB cap. This is the load-bearing case for
3242        // production use.
3243        let pred = sample_complex_predicate();
3244        let header = predicate_to_rpc_header(&pred).expect("encode");
3245        // Should be well under the cap. Loose upper bound: 1 KB.
3246        assert!(
3247            header.1.len() < 1024,
3248            "encoded predicate is {} bytes, expected < 1024",
3249            header.1.len(),
3250        );
3251    }
3252    #[test]
3253    fn rpc_header_can_be_decoded_via_borrow_or_owned_tuple() {
3254        // Pin: the `AsRpcHeader` trait accepts both `&(String, Vec<u8>)`
3255        // and `(String, Vec<u8>)` so service handlers can iterate
3256        // either an owned vec or a borrowed slice.
3257        let pred = Predicate::Exists {
3258            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3259        };
3260        let header = predicate_to_rpc_header(&pred).unwrap();
3261        let headers = vec![header];
3262
3263        // Owned slice.
3264        let decoded_owned = predicate_from_rpc_headers(&headers).unwrap().unwrap();
3265        assert_eq!(decoded_owned, pred);
3266
3267        // Borrow-collected slice.
3268        let by_ref: Vec<&(String, Vec<u8>)> = headers.iter().collect();
3269        let decoded_borrow = predicate_from_rpc_headers(&by_ref).unwrap().unwrap();
3270        assert_eq!(decoded_borrow, pred);
3271    }
3272    #[test]
3273    fn rpc_header_json_format_is_human_readable() {
3274        // Pin the wire format as JSON (not postcard) so cross-
3275        // binding fixtures and tcpdump captures are diff-able.
3276        // Phase 9b of CAPABILITY_SYSTEM_SDK_PLAN.md uses this same
3277        // shape for the `predicate_nrpc_envelope.json` fixture.
3278        let pred = Predicate::MetadataEquals {
3279            key: "intent".into(),
3280            value: "ml-training".into(),
3281        };
3282        let header = predicate_to_rpc_header(&pred).unwrap();
3283        let json = std::str::from_utf8(&header.1).expect("JSON is UTF-8");
3284        assert!(
3285            json.contains("\"kind\":\"metadata_equals\""),
3286            "unexpected JSON shape: {json}",
3287        );
3288        assert!(json.contains("\"key\":\"intent\""), "missing key: {json}");
3289        assert!(
3290            json.contains("\"value\":\"ml-training\""),
3291            "missing value: {json}",
3292        );
3293    }
3294    /// N-9 regression: `dynamic_cost` and `dynamic_cost_or` must
3295    /// saturate `usize` cardinalities to `u32::MAX` rather than
3296    /// wrapping. Pre-fix the `(cardinality as u32)` cast wrapped
3297    /// modulo `2³²`, so a cardinality of `u32::MAX + 1` divided
3298    /// `static_cost / 0.max(1) = static_cost`, while
3299    /// `u32::MAX + 2` divided by `1`, treating the most-selective
3300    /// key as if it had only 1 distinct value. Real-world trigger:
3301    /// long-running fleets with unbounded-cardinality metadata
3302    /// keys (session id, request id, anything per-call).
3303    #[test]
3304    fn dynamic_cost_saturates_huge_cardinality_to_u32_max() {
3305        struct HugeCardinality;
3306        impl crate::adapter::net::behavior::CardinalityProvider for HugeCardinality {
3307            fn axis_cardinality(&self, _key: &crate::adapter::net::behavior::tag::TagKey) -> usize {
3308                // > u32::MAX. On 64-bit hosts this wraps if cast
3309                // via `as u32`; the fix uses `u32::try_from(...)`
3310                // with a saturating fallback.
3311                (u32::MAX as usize).wrapping_add(2)
3312            }
3313            fn metadata_value_cardinality(&self, _key: &str) -> usize {
3314                (u32::MAX as usize).wrapping_add(2)
3315            }
3316        }
3317
3318        let clause = Predicate::Equals {
3319            key: TagKey::new(TaxonomyAxis::Hardware, "memory_gb"),
3320            value: "1".into(),
3321        };
3322        let dyn_cost = clause.dynamic_cost(&HugeCardinality);
3323        let static_c = clause.static_cost();
3324        // With saturation, cardinality clamps to u32::MAX so
3325        // `static_c / u32::MAX == 0` (since static_c < u32::MAX).
3326        // Pre-fix the cast wrapped to 1, giving `static_c / 1 ==
3327        // static_c` — the bug shape.
3328        assert!(
3329            dyn_cost < static_c,
3330            "dynamic_cost must reflect saturation (got {dyn_cost}, static={static_c})",
3331        );
3332
3333        // Same pin for the Or-side: `static_c.saturating_mul(u32::MAX)`
3334        // saturates to u32::MAX rather than wrapping back to a
3335        // tiny number.
3336        let or_cost = clause.dynamic_cost_or(&HugeCardinality);
3337        assert_eq!(
3338            or_cost,
3339            u32::MAX,
3340            "dynamic_cost_or must saturate to u32::MAX on huge cardinality",
3341        );
3342    }
3343    // ========================================================================
3344    // Service-side row filter ergonomics — Phase 5.B follow-on of
3345    // CAPABILITY_ENHANCEMENTS_PLAN.md.
3346    // ========================================================================
3347
3348    #[test]
3349    fn matches_capability_set_evaluates_against_caps_tags_and_metadata() {
3350        // Pin: `Predicate::matches_capability_set` is a one-line
3351        // entry point for "does this CapabilitySet match this
3352        // predicate?". Internally materializes caps.tags as a Vec
3353        // for the slice-based EvalContext.
3354        let pred = Predicate::And(vec![
3355            Predicate::Exists {
3356                key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3357            },
3358            Predicate::MetadataEquals {
3359                key: "intent".into(),
3360                value: "ml-training".into(),
3361            },
3362        ]);
3363
3364        // Match: caps has both tag and metadata.
3365        let caps_match = CapabilitySet::new()
3366            .with_hardware(HardwareCapabilities::new().with_gpu(GpuInfo::new(
3367                GpuVendor::Nvidia,
3368                "h100",
3369                80,
3370            )))
3371            .with_metadata("intent", "ml-training");
3372        assert!(pred.matches_capability_set(&caps_match));
3373
3374        // Miss on the metadata side.
3375        let caps_miss_meta = CapabilitySet::new().with_hardware(
3376            HardwareCapabilities::new().with_gpu(GpuInfo::new(GpuVendor::Nvidia, "h100", 80)),
3377        );
3378        assert!(!pred.matches_capability_set(&caps_miss_meta));
3379
3380        // Miss on the tag side.
3381        let caps_miss_tag = CapabilitySet::new().with_metadata("intent", "ml-training");
3382        assert!(!pred.matches_capability_set(&caps_miss_tag));
3383
3384        // Empty caps don't match.
3385        assert!(!pred.matches_capability_set(&CapabilitySet::default()));
3386    }
3387    /// Application row type used to exercise `RpcPredicateContext`
3388    /// and `filter_by_predicate`. Mirrors what a service
3389    /// handler's row would look like.
3390    struct TestJob {
3391        id: u64,
3392        tags: Vec<Tag>,
3393        metadata: BTreeMap<String, String>,
3394    }
3395    impl RpcPredicateContext for TestJob {
3396        fn rpc_predicate_tags(&self) -> &[Tag] {
3397            &self.tags
3398        }
3399        fn rpc_predicate_metadata(&self) -> &BTreeMap<String, String> {
3400            &self.metadata
3401        }
3402    }
3403    #[test]
3404    fn filter_by_predicate_returns_all_rows_when_predicate_is_none() {
3405        // Pin: `pred = None` is the no-filter case (request didn't
3406        // include `net-where`). Every row passes through.
3407        let jobs = vec![
3408            TestJob {
3409                id: 1,
3410                tags: vec![],
3411                metadata: BTreeMap::new(),
3412            },
3413            TestJob {
3414                id: 2,
3415                tags: vec![axis_present(TaxonomyAxis::Hardware, "gpu")],
3416                metadata: BTreeMap::new(),
3417            },
3418        ];
3419        let filtered: Vec<u64> = filter_by_predicate(jobs, None).map(|j| j.id).collect();
3420        assert_eq!(filtered, vec![1, 2]);
3421    }
3422    #[test]
3423    fn filter_by_predicate_keeps_only_matching_rows() {
3424        // Pin: with a predicate set, only rows whose tags +
3425        // metadata satisfy it survive the filter.
3426        let pred = Predicate::Exists {
3427            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3428        };
3429        let jobs = vec![
3430            TestJob {
3431                id: 1,
3432                tags: vec![],
3433                metadata: BTreeMap::new(),
3434            },
3435            TestJob {
3436                id: 2,
3437                tags: vec![axis_present(TaxonomyAxis::Hardware, "gpu")],
3438                metadata: BTreeMap::new(),
3439            },
3440            TestJob {
3441                id: 3,
3442                tags: vec![axis_eq(TaxonomyAxis::Hardware, "gpu.vendor", "nvidia")],
3443                metadata: BTreeMap::new(),
3444            },
3445            TestJob {
3446                id: 4,
3447                tags: vec![
3448                    axis_present(TaxonomyAxis::Hardware, "gpu"),
3449                    axis_eq(TaxonomyAxis::Hardware, "memory_gb", "64"),
3450                ],
3451                metadata: BTreeMap::new(),
3452            },
3453        ];
3454        let filtered: Vec<u64> = filter_by_predicate(jobs, Some(&pred))
3455            .map(|j| j.id)
3456            .collect();
3457        // Only ids 2 and 4 have the gpu presence tag.
3458        assert_eq!(filtered, vec![2, 4]);
3459    }
3460    #[test]
3461    fn filter_by_predicate_combined_axis_and_metadata_clauses() {
3462        // Pin: predicates with both axis-tag AND metadata clauses
3463        // work end-to-end through the filter helper. Mirrors the
3464        // canonical "where: gpu AND intent = ml-training" use case.
3465        let pred = Predicate::And(vec![
3466            Predicate::Exists {
3467                key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3468            },
3469            Predicate::MetadataEquals {
3470                key: "intent".into(),
3471                value: "ml-training".into(),
3472            },
3473        ]);
3474        let jobs = vec![
3475            TestJob {
3476                id: 1,
3477                tags: vec![axis_present(TaxonomyAxis::Hardware, "gpu")],
3478                metadata: meta_with(&[("intent", "embedding-cache")]),
3479            },
3480            TestJob {
3481                id: 2,
3482                tags: vec![axis_present(TaxonomyAxis::Hardware, "gpu")],
3483                metadata: meta_with(&[("intent", "ml-training")]),
3484            },
3485            TestJob {
3486                id: 3,
3487                tags: vec![],
3488                metadata: meta_with(&[("intent", "ml-training")]),
3489            },
3490        ];
3491        let filtered: Vec<u64> = filter_by_predicate(jobs, Some(&pred))
3492            .map(|j| j.id)
3493            .collect();
3494        // Only id 2 has both gpu AND intent=ml-training.
3495        assert_eq!(filtered, vec![2]);
3496    }
3497    #[test]
3498    fn filter_by_predicate_empty_input_yields_empty_iterator() {
3499        let pred = Predicate::Exists {
3500            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3501        };
3502        let jobs: Vec<TestJob> = Vec::new();
3503        let filtered: Vec<u64> = filter_by_predicate(jobs, Some(&pred))
3504            .map(|j| j.id)
3505            .collect();
3506        assert!(filtered.is_empty());
3507    }
3508    #[test]
3509    fn end_to_end_predicate_pushdown_flow() {
3510        // Pin the canonical Phase 5.B usage: client builds a
3511        // predicate, encodes to an RPC header, server decodes and
3512        // filters its row stream. This is the load-bearing
3513        // workflow Phase 5.B exists for.
3514
3515        // Client side: build predicate, encode.
3516        let pred = Predicate::And(vec![
3517            Predicate::Exists {
3518                key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3519            },
3520            Predicate::NumericAtLeast {
3521                key: TagKey::new(TaxonomyAxis::Hardware, "memory_gb"),
3522                threshold: 32.0,
3523            },
3524        ]);
3525        let encoded = predicate_to_rpc_header(&pred).expect("encode");
3526
3527        // Server side: receive request with this header alongside
3528        // standard tracing/idempotency keys. Decode the predicate.
3529        let request_headers = vec![
3530            ("trace-id".to_string(), b"abc123".to_vec()),
3531            encoded,
3532            ("idempotency-key".to_string(), b"def456".to_vec()),
3533        ];
3534        let decoded_pred = predicate_from_rpc_headers(&request_headers)
3535            .expect("header present")
3536            .expect("decode");
3537
3538        // Server side: filter the row stream.
3539        let jobs = vec![
3540            TestJob {
3541                id: 1, // No GPU.
3542                tags: vec![axis_eq(TaxonomyAxis::Hardware, "memory_gb", "64")],
3543                metadata: BTreeMap::new(),
3544            },
3545            TestJob {
3546                id: 2, // GPU + 32 GB → matches.
3547                tags: vec![
3548                    axis_present(TaxonomyAxis::Hardware, "gpu"),
3549                    axis_eq(TaxonomyAxis::Hardware, "memory_gb", "32"),
3550                ],
3551                metadata: BTreeMap::new(),
3552            },
3553            TestJob {
3554                id: 3, // GPU + 16 GB → too little memory.
3555                tags: vec![
3556                    axis_present(TaxonomyAxis::Hardware, "gpu"),
3557                    axis_eq(TaxonomyAxis::Hardware, "memory_gb", "16"),
3558                ],
3559                metadata: BTreeMap::new(),
3560            },
3561            TestJob {
3562                id: 4, // GPU + 65 GB → matches.
3563                tags: vec![
3564                    axis_present(TaxonomyAxis::Hardware, "gpu"),
3565                    axis_eq(TaxonomyAxis::Hardware, "memory_gb", "64"),
3566                ],
3567                metadata: BTreeMap::new(),
3568            },
3569        ];
3570        let matched: Vec<u64> = filter_by_predicate(jobs, Some(&decoded_pred))
3571            .map(|j| j.id)
3572            .collect();
3573        assert_eq!(matched, vec![2, 4]);
3574    }
3575    #[test]
3576    fn to_wire_handles_deep_nesting_without_stack_overflow() {
3577        // Regression: the prior recursive append_to_wire would
3578        // blow the thread stack on caller-controlled deeply
3579        // nested predicates. The iterative version uses a
3580        // heap-allocated work stack — depth-unbounded.
3581        //
3582        // Build a 10_000-deep `Not(Not(...Exists))` chain;
3583        // confirm to_wire produces 10_001 nodes (10k Not + 1
3584        // leaf) and that the root index is the topmost Not.
3585        // (The mirror rebuild path `into_predicate` is still
3586        // recursive — out of scope for this fix; the FFI parser
3587        // caps depth at 64, and SDK consumers build trees via
3588        // typed factories where recursion-driven depth is a
3589        // developer-controlled property.)
3590        let leaf = Predicate::Exists {
3591            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3592        };
3593        let mut p = leaf;
3594        for _ in 0..10_000 {
3595            p = Predicate::Not(Box::new(p));
3596        }
3597        let wire = p.to_wire();
3598        assert_eq!(wire.nodes.len(), 10_001);
3599        let root = &wire.nodes[wire.root_idx as usize];
3600        assert!(matches!(root, PredicateNodeWire::Not { .. }));
3601    }
3602    #[test]
3603    fn to_wire_preserves_left_to_right_child_ordering() {
3604        // The iterative walk pushes children in reverse so the
3605        // leftmost child is popped first; pin the output to
3606        // catch any regression that flips order.
3607        let p = Predicate::And(vec![
3608            Predicate::Exists {
3609                key: TagKey::new(TaxonomyAxis::Hardware, "a"),
3610            },
3611            Predicate::Exists {
3612                key: TagKey::new(TaxonomyAxis::Hardware, "b"),
3613            },
3614            Predicate::Exists {
3615                key: TagKey::new(TaxonomyAxis::Hardware, "c"),
3616            },
3617        ]);
3618        let wire = p.to_wire();
3619        // 3 leaves + 1 And = 4 nodes.
3620        assert_eq!(wire.nodes.len(), 4);
3621        // Root is the And.
3622        let PredicateNodeWire::And { children } = &wire.nodes[wire.root_idx as usize] else {
3623            panic!("root should be And");
3624        };
3625        // Children should reference leaves at indices [0,1,2]
3626        // — emitted in input order.
3627        assert_eq!(children.as_slice(), &[0u32, 1, 2]);
3628        // And each leaf should match the expected key.
3629        for (i, key) in ["a", "b", "c"].iter().enumerate() {
3630            let PredicateNodeWire::Exists { key: k } = &wire.nodes[i] else {
3631                panic!("expected Exists at index {i}");
3632            };
3633            assert_eq!(k.key.as_str(), *key);
3634        }
3635    }
3636}