Skip to main content

net/adapter/net/behavior/
predicate.rs

1//! Capability predicate AST — Phase A foundation for the federated
2//! query primitives in `CAPABILITY_SYSTEM_PLAN.md` §6a.
3//!
4//! Ships the `Predicate` enum with all 17 variants the substrate plan
5//! pins, an evaluator that takes a `(tags, metadata)` context, and
6//! constructor helpers + the `pred!` macro that the cross-binding
7//! SDK plan exposes language-idiomatic builders for.
8//!
9//! ## Variants
10//!
11//! Existence + equality (axis tags):
12//! - [`Predicate::Exists`] — tag with this `(axis, key)` is present.
13//! - [`Predicate::Equals`] — tag's value matches exactly.
14//!
15//! Numeric (axis tags whose value parses to `f64`):
16//! - [`Predicate::NumericAtLeast`] / [`Predicate::NumericAtMost`] / [`Predicate::NumericInRange`]
17//!
18//! Semver (axis tags whose value parses to `MAJOR.MINOR.PATCH`):
19//! - [`Predicate::SemverAtLeast`] / [`Predicate::SemverAtMost`]
20//! - [`Predicate::SemverCompatible`] — same major-version family
21//!   (or, for `0.x.y`, same minor) per the standard semver
22//!   compatibility rules.
23//!
24//! String (axis tag values):
25//! - [`Predicate::StringPrefix`] — value starts with the prefix.
26//! - [`Predicate::StringMatches`] — value contains the substring.
27//!   Phase E will swap this to regex behind the existing `regex`
28//!   feature gate; semantics today are substring-only.
29//!
30//! Metadata (the `BTreeMap<String, String>` field added in Phase C):
31//! - [`Predicate::MetadataExists`] / [`Predicate::MetadataEquals`]
32//! - [`Predicate::MetadataMatches`] (substring; same Phase-E swap)
33//! - [`Predicate::MetadataNumericAtLeast`]
34//!
35//! Boolean composition:
36//! - [`Predicate::And`] / [`Predicate::Or`] / [`Predicate::Not`]
37//!
38//! ## Evaluation
39//!
40//! `Predicate::evaluate` is a pure function over [`EvalContext`]
41//! (`(tags, metadata)`) — no I/O, no allocation outside what the
42//! pattern variants explicitly need (regex compilation lands with
43//! the Phase E swap). Numeric / semver parse failures evaluate to
44//! `false` rather than panicking; cross-binding queries should not
45//! fault on a malformed tag value.
46
47use std::collections::BTreeMap;
48
49use crate::adapter::net::behavior::tag::{Tag, TagKey};
50
51// =============================================================================
52// EvalContext
53// =============================================================================
54
/// `(tags, metadata)` context passed to [`Predicate::evaluate`].
///
/// Decoupled from `CapabilitySet` so the predicate evaluator works
/// against the substrate's pre-Phase-A.5 capability shape AND the
/// post-migration shape (`tags: HashSet<Tag>`) without churn.
///
/// The context only holds two shared borrows, which is why it can be
/// `Copy` and handed around by value.
#[derive(Debug, Clone, Copy)]
pub struct EvalContext<'a> {
    /// Tag set against which axis predicates evaluate.
    pub tags: &'a [Tag],
    /// Key-value metadata against which metadata predicates evaluate.
    pub metadata: &'a BTreeMap<String, String>,
}
66
67impl<'a> EvalContext<'a> {
68    /// Build a context from explicit slices. The most common
69    /// constructor for callers that hold a `Vec<Tag>` or `&[Tag]`.
70    pub fn new(tags: &'a [Tag], metadata: &'a BTreeMap<String, String>) -> Self {
71        Self { tags, metadata }
72    }
73}
74
75// =============================================================================
76// Predicate
77// =============================================================================
78
/// AST for capability queries. Pure data — cloneable and
/// structurally comparable.
///
/// This type carries no serde derive; it crosses process boundaries
/// through the flat [`PredicateWire`] form produced by
/// [`Predicate::to_wire`] and consumed by
/// [`PredicateWire::into_predicate`].
///
/// See module docs for the variant taxonomy.
// `PartialEq` only because `f64` doesn't implement `Eq` (NaN
// asymmetry). Predicate equality is structural, not hashable —
// we never use it as a HashMap key.
//
// Serde derive intentionally OMITTED on this recursive type. The
// `Box<Predicate>` + `Vec<Predicate>` shape compounds with the
// existing `event::*` serializer monomorphization graph and
// pushes the test-build's recursion-limit / compile-time past
// the project's budget. Serialization therefore goes through the
// flat-tree IR (`PredicateWire` / `PredicateNodeWire`) defined
// further down in this file, whose nodes reference children by
// index instead of by nesting.
#[derive(Debug, Clone, PartialEq)]
pub enum Predicate {
    // ---- Axis tags: existence + equality --------------------------------
    /// Tag with this `(axis, key)` is present (regardless of value).
    Exists {
        /// Tag key to probe.
        key: TagKey,
    },
    /// Tag's value matches exactly. Presence-only tags don't match
    /// (use [`Predicate::Exists`] for that).
    Equals {
        /// Tag key.
        key: TagKey,
        /// Required value (string-equality).
        value: String,
    },

    // ---- Axis tags: numeric ---------------------------------------------
    /// Tag's value parses to `f64` and is `>= threshold`.
    NumericAtLeast {
        /// Tag key.
        key: TagKey,
        /// Inclusive lower bound.
        threshold: f64,
    },
    /// Tag's value parses to `f64` and is `<= threshold`.
    NumericAtMost {
        /// Tag key.
        key: TagKey,
        /// Inclusive upper bound.
        threshold: f64,
    },
    /// Tag's value parses to `f64` and lies in `[min, max]` inclusive.
    NumericInRange {
        /// Tag key.
        key: TagKey,
        /// Inclusive lower bound.
        min: f64,
        /// Inclusive upper bound.
        max: f64,
    },

    // ---- Axis tags: semver ----------------------------------------------
    /// Tag's value parses to `MAJOR.MINOR.PATCH` and is `>= version`.
    SemverAtLeast {
        /// Tag key.
        key: TagKey,
        /// Reference version (kept as the unparsed string).
        version: String,
    },
    /// Tag's value parses to `MAJOR.MINOR.PATCH` and is `<= version`.
    SemverAtMost {
        /// Tag key.
        key: TagKey,
        /// Reference version (kept as the unparsed string).
        version: String,
    },
    /// Tag's value parses to `MAJOR.MINOR.PATCH` and is in the same
    /// compatibility band: same major for `>= 1.0.0`, same minor for
    /// `0.x.y`. Mirrors the standard semver caret-compatibility rule.
    SemverCompatible {
        /// Tag key.
        key: TagKey,
        /// Reference version (kept as the unparsed string).
        version: String,
    },

    // ---- Axis tags: string ----------------------------------------------
    /// Tag's value starts with `prefix`.
    StringPrefix {
        /// Tag key.
        key: TagKey,
        /// Prefix to match.
        prefix: String,
    },
    /// Tag's value contains `pattern` as a substring. Phase E will
    /// upgrade to regex behind the `regex` feature gate; semantics
    /// today are substring-only.
    StringMatches {
        /// Tag key.
        key: TagKey,
        /// Substring pattern.
        pattern: String,
    },

    // ---- Metadata -------------------------------------------------------
    /// Metadata key is present.
    MetadataExists {
        /// Metadata key.
        key: String,
    },
    /// Metadata value matches exactly.
    MetadataEquals {
        /// Metadata key.
        key: String,
        /// Required value (string-equality).
        value: String,
    },
    /// Metadata value contains `pattern` as a substring (same
    /// substring-only semantics as [`Predicate::StringMatches`]).
    MetadataMatches {
        /// Metadata key.
        key: String,
        /// Substring pattern.
        pattern: String,
    },
    /// Metadata value parses to `f64` and is `>= threshold`.
    MetadataNumericAtLeast {
        /// Metadata key.
        key: String,
        /// Inclusive lower bound.
        threshold: f64,
    },

    // ---- Boolean composition --------------------------------------------
    /// Conjunction. Empty `Vec` evaluates to `true` (vacuous match —
    /// matches the standard math/logic convention; pin in tests).
    And(Vec<Predicate>),
    /// Disjunction. Empty `Vec` evaluates to `false` (vacuous miss).
    Or(Vec<Predicate>),
    /// Negation of the boxed inner predicate.
    Not(Box<Predicate>),
}
219
220// =============================================================================
221// Wire format — Phase 5 of CAPABILITY_ENHANCEMENTS_PLAN.md.
222//
// The recursive `Box<Predicate>` + `Vec<Predicate>` shape compounds
// with the existing `event::*` serializer monomorphization graph
// and pushes test-build recursion-limit / compile-time past the
// project's budget (per the comment above the `Predicate` enum).
227//
228// The flat-tree IR below sidesteps that: nodes live in a single
229// `Vec<PredicateNodeWire>`; And/Or/Not reference children via
230// `u32` indices into that table. No variant of `PredicateNodeWire`
231// transitively references `PredicateWire` itself, so serde derive
232// expansion stays bounded.
233//
234// Round-trip:
235//
236//   Predicate::to_wire()        →  PredicateWire
237//   PredicateWire::into_predicate() →  Result<Predicate, _>
238//
239// Pinned in `wire_round_trip_*` tests below.
240// =============================================================================
241
/// One node in the flat predicate wire format. `And`/`Or`/`Not`
/// reference their children via `u32` indices into the parent
/// [`PredicateWire`]'s `nodes` table; every other variant is a leaf
/// that mirrors the same-named [`Predicate`] variant field for field.
///
/// Node ordering invariant: children always appear at lower
/// indices than their parent (post-order serialization). The
/// rebuild path enforces this to catch malformed wire payloads
/// that attempt index cycles.
///
/// Serde shape: internally tagged on a `kind` field with
/// snake_case variant names, e.g. `{"kind":"not","child":0}` in
/// JSON.
// NOTE(review): internally tagged enums (`#[serde(tag = ...)]`)
// generally need a self-describing format to deserialize. The
// `PredicateWire` docs mention postcard as an encoding — confirm a
// postcard round-trip of this enum actually works.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case", tag = "kind")]
pub enum PredicateNodeWire {
    /// Leaf: tag with this `(axis, key)` is present.
    Exists {
        /// Tag key.
        key: TagKey,
    },
    /// Leaf: tag's value matches exactly.
    Equals {
        /// Tag key.
        key: TagKey,
        /// Required value.
        value: String,
    },
    /// Leaf: tag's value parses to `f64` and is `>= threshold`.
    NumericAtLeast {
        /// Tag key.
        key: TagKey,
        /// Inclusive lower bound.
        threshold: f64,
    },
    /// Leaf: tag's value parses to `f64` and is `<= threshold`.
    NumericAtMost {
        /// Tag key.
        key: TagKey,
        /// Inclusive upper bound.
        threshold: f64,
    },
    /// Leaf: tag's value parses to `f64` and lies in `[min, max]`.
    NumericInRange {
        /// Tag key.
        key: TagKey,
        /// Inclusive lower bound.
        min: f64,
        /// Inclusive upper bound.
        max: f64,
    },
    /// Leaf: tag's value parses to a semver triple and is `>= version`.
    SemverAtLeast {
        /// Tag key.
        key: TagKey,
        /// Reference version.
        version: String,
    },
    /// Leaf: tag's value parses to a semver triple and is `<= version`.
    SemverAtMost {
        /// Tag key.
        key: TagKey,
        /// Reference version.
        version: String,
    },
    /// Leaf: tag's value parses to a semver triple and is in the
    /// same compatibility band as `version`.
    SemverCompatible {
        /// Tag key.
        key: TagKey,
        /// Reference version.
        version: String,
    },
    /// Leaf: tag's value starts with `prefix`.
    StringPrefix {
        /// Tag key.
        key: TagKey,
        /// Prefix to match.
        prefix: String,
    },
    /// Leaf: tag's value contains `pattern` as a substring.
    StringMatches {
        /// Tag key.
        key: TagKey,
        /// Substring pattern.
        pattern: String,
    },
    /// Leaf: metadata key is present.
    MetadataExists {
        /// Metadata key.
        key: String,
    },
    /// Leaf: metadata value matches exactly.
    MetadataEquals {
        /// Metadata key.
        key: String,
        /// Required value.
        value: String,
    },
    /// Leaf: metadata value contains `pattern` as a substring.
    MetadataMatches {
        /// Metadata key.
        key: String,
        /// Substring pattern.
        pattern: String,
    },
    /// Leaf: metadata value parses to `f64` and is `>= threshold`.
    MetadataNumericAtLeast {
        /// Metadata key.
        key: String,
        /// Inclusive lower bound.
        threshold: f64,
    },
    /// Composite: conjunction of children at the named indices.
    And {
        /// Child indices into the parent `PredicateWire::nodes`,
        /// in left-to-right operand order. May be empty.
        children: Vec<u32>,
    },
    /// Composite: disjunction of children at the named indices.
    Or {
        /// Child indices into the parent `PredicateWire::nodes`,
        /// in left-to-right operand order. May be empty.
        children: Vec<u32>,
    },
    /// Composite: negation of the child at the named index.
    Not {
        /// Child index into the parent `PredicateWire::nodes`.
        child: u32,
    },
}
366
/// Wire format for [`Predicate`]. Flat node table with index
/// references for `And`/`Or`/`Not` children.
///
/// Phase 5 of `CAPABILITY_ENHANCEMENTS_PLAN.md`. Crosses the
/// nRPC envelope as serde-encoded bytes (postcard for cross-binding,
/// JSON for debug fixtures); the substrate's capability
/// announcement path is unchanged.
///
/// Build via [`Predicate::to_wire`]; rebuild via
/// [`PredicateWire::into_predicate`]. Both fields are public, so a
/// value can also be assembled by hand or deserialized from an
/// external payload — `into_predicate` validates such input instead
/// of trusting it.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct PredicateWire {
    /// Flat node table. Children always live at lower indices
    /// than their parents.
    pub nodes: Vec<PredicateNodeWire>,
    /// Index of the root node within `nodes`. Always
    /// `nodes.len() - 1` for a freshly-emitted `to_wire()` output;
    /// callers receiving an externally-built wire payload should
    /// not assume that.
    pub root_idx: u32,
}
388
/// Errors raised by [`PredicateWire::into_predicate`].
///
/// Each variant describes one structural defect of the wire payload;
/// the rebuild stops at the first defect it encounters.
#[derive(Debug, Clone, PartialEq, thiserror::Error)]
pub enum PredicateWireError {
    /// Wire payload had an empty `nodes` table.
    #[error("predicate wire has empty nodes table")]
    Empty,
    /// `root_idx` was out of bounds for the `nodes` table.
    #[error("predicate wire root_idx {root_idx} >= nodes len {len}")]
    RootOutOfBounds {
        /// The provided `root_idx`.
        root_idx: u32,
        /// Length of the `nodes` table.
        len: usize,
    },
    /// A composite node referenced a child index that was out of
    /// bounds.
    #[error("predicate wire child index {child} out of bounds for nodes len {len}")]
    ChildOutOfBounds {
        /// The malformed child index.
        child: u32,
        /// Length of the `nodes` table.
        len: usize,
    },
    /// A composite node referenced a child index that was greater
    /// than or equal to its own. Catches index cycles introduced
    /// by malformed / malicious wire payloads (a self-reference is
    /// the `child == parent` case).
    #[error("predicate wire child index {child} >= parent index {parent} (cycle)")]
    CycleDetected {
        /// Parent node index.
        parent: u32,
        /// Offending child index.
        child: u32,
    },
}
423
424impl Predicate {
425    /// Convert to the flat wire format. Post-order serialization:
426    /// leaves land first, the root has the highest index.
427    ///
428    /// Output is byte-stable across calls — two `to_wire()`s on
429    /// equal predicates produce identical `PredicateWire` values
430    /// (and identical bytes through any serde encoder).
431    pub fn to_wire(&self) -> PredicateWire {
432        let mut nodes = Vec::new();
433        let root_idx = self.append_to_wire(&mut nodes);
434        PredicateWire { nodes, root_idx }
435    }
436
437    /// Iterative helper: append `self` (and any sub-tree) into
438    /// `nodes`, returning the index of the root of the sub-tree.
439    ///
440    /// Implemented as a heap-allocated work stack rather than
441    /// straight recursion. A deeply-nested predicate
442    /// (`Not(Not(Not(...)))` 10k deep, or `And([many And([...])])`)
443    /// otherwise overflows the thread stack — caller-controlled
444    /// input from the FFI shims can build arbitrarily-deep
445    /// `Predicate` trees via typed factories. Output is identical
446    /// to the prior recursive implementation: post-order
447    /// serialization, children at lower indices than their
448    /// parents.
449    #[expect(
450        clippy::expect_used,
451        reason = "predicate tree-walk invariants: every FinishX step is preceded by a matching number of Begin steps that push children; the final pop yields the root that the walk always pushes"
452    )]
453    fn append_to_wire(&self, nodes: &mut Vec<PredicateNodeWire>) -> u32 {
454        enum Step<'a> {
455            /// Visit a predicate. Leaves emit immediately;
456            /// composites schedule a matching `Finish*` after
457            /// pushing their children.
458            Visit(&'a Predicate),
459            /// Pop `n` child indices off `child_stack` and emit
460            /// an `And` referring to them, in the order the
461            /// children were visited (left to right).
462            FinishAnd(usize),
463            /// As `FinishAnd` but for `Or`.
464            FinishOr(usize),
465            /// Pop one child index off `child_stack` and emit a
466            /// `Not`.
467            FinishNot,
468        }
469
470        let mut work: Vec<Step<'_>> = Vec::with_capacity(8);
471        work.push(Step::Visit(self));
472        // Each child push records the node index it landed at;
473        // composite `Finish*` steps drain the trailing N entries.
474        let mut child_stack: Vec<u32> = Vec::with_capacity(8);
475
476        // Helper: emit a leaf node, push its index on the
477        // child_stack so the enclosing composite picks it up.
478        let emit = |nodes: &mut Vec<PredicateNodeWire>,
479                    child_stack: &mut Vec<u32>,
480                    node: PredicateNodeWire| {
481            let idx = nodes.len() as u32;
482            nodes.push(node);
483            child_stack.push(idx);
484        };
485
486        while let Some(step) = work.pop() {
487            match step {
488                Step::Visit(p) => match p {
489                    Self::Exists { key } => emit(
490                        nodes,
491                        &mut child_stack,
492                        PredicateNodeWire::Exists { key: key.clone() },
493                    ),
494                    Self::Equals { key, value } => emit(
495                        nodes,
496                        &mut child_stack,
497                        PredicateNodeWire::Equals {
498                            key: key.clone(),
499                            value: value.clone(),
500                        },
501                    ),
502                    Self::NumericAtLeast { key, threshold } => emit(
503                        nodes,
504                        &mut child_stack,
505                        PredicateNodeWire::NumericAtLeast {
506                            key: key.clone(),
507                            threshold: *threshold,
508                        },
509                    ),
510                    Self::NumericAtMost { key, threshold } => emit(
511                        nodes,
512                        &mut child_stack,
513                        PredicateNodeWire::NumericAtMost {
514                            key: key.clone(),
515                            threshold: *threshold,
516                        },
517                    ),
518                    Self::NumericInRange { key, min, max } => emit(
519                        nodes,
520                        &mut child_stack,
521                        PredicateNodeWire::NumericInRange {
522                            key: key.clone(),
523                            min: *min,
524                            max: *max,
525                        },
526                    ),
527                    Self::SemverAtLeast { key, version } => emit(
528                        nodes,
529                        &mut child_stack,
530                        PredicateNodeWire::SemverAtLeast {
531                            key: key.clone(),
532                            version: version.clone(),
533                        },
534                    ),
535                    Self::SemverAtMost { key, version } => emit(
536                        nodes,
537                        &mut child_stack,
538                        PredicateNodeWire::SemverAtMost {
539                            key: key.clone(),
540                            version: version.clone(),
541                        },
542                    ),
543                    Self::SemverCompatible { key, version } => emit(
544                        nodes,
545                        &mut child_stack,
546                        PredicateNodeWire::SemverCompatible {
547                            key: key.clone(),
548                            version: version.clone(),
549                        },
550                    ),
551                    Self::StringPrefix { key, prefix } => emit(
552                        nodes,
553                        &mut child_stack,
554                        PredicateNodeWire::StringPrefix {
555                            key: key.clone(),
556                            prefix: prefix.clone(),
557                        },
558                    ),
559                    Self::StringMatches { key, pattern } => emit(
560                        nodes,
561                        &mut child_stack,
562                        PredicateNodeWire::StringMatches {
563                            key: key.clone(),
564                            pattern: pattern.clone(),
565                        },
566                    ),
567                    Self::MetadataExists { key } => emit(
568                        nodes,
569                        &mut child_stack,
570                        PredicateNodeWire::MetadataExists { key: key.clone() },
571                    ),
572                    Self::MetadataEquals { key, value } => emit(
573                        nodes,
574                        &mut child_stack,
575                        PredicateNodeWire::MetadataEquals {
576                            key: key.clone(),
577                            value: value.clone(),
578                        },
579                    ),
580                    Self::MetadataMatches { key, pattern } => emit(
581                        nodes,
582                        &mut child_stack,
583                        PredicateNodeWire::MetadataMatches {
584                            key: key.clone(),
585                            pattern: pattern.clone(),
586                        },
587                    ),
588                    Self::MetadataNumericAtLeast { key, threshold } => emit(
589                        nodes,
590                        &mut child_stack,
591                        PredicateNodeWire::MetadataNumericAtLeast {
592                            key: key.clone(),
593                            threshold: *threshold,
594                        },
595                    ),
596                    Self::And(children) => {
597                        work.push(Step::FinishAnd(children.len()));
598                        // Push children in reverse so that the
599                        // leftmost child is popped first; this
600                        // preserves the left-to-right child
601                        // ordering of the recursive version.
602                        for c in children.iter().rev() {
603                            work.push(Step::Visit(c));
604                        }
605                    }
606                    Self::Or(children) => {
607                        work.push(Step::FinishOr(children.len()));
608                        for c in children.iter().rev() {
609                            work.push(Step::Visit(c));
610                        }
611                    }
612                    Self::Not(inner) => {
613                        work.push(Step::FinishNot);
614                        work.push(Step::Visit(inner));
615                    }
616                },
617                Step::FinishAnd(n) => {
618                    let start = child_stack.len() - n;
619                    let kids: Vec<u32> = child_stack.drain(start..).collect();
620                    let idx = nodes.len() as u32;
621                    nodes.push(PredicateNodeWire::And { children: kids });
622                    child_stack.push(idx);
623                }
624                Step::FinishOr(n) => {
625                    let start = child_stack.len() - n;
626                    let kids: Vec<u32> = child_stack.drain(start..).collect();
627                    let idx = nodes.len() as u32;
628                    nodes.push(PredicateNodeWire::Or { children: kids });
629                    child_stack.push(idx);
630                }
631                Step::FinishNot => {
632                    let child = child_stack
633                        .pop()
634                        .expect("Not body must emit one child before FinishNot");
635                    let idx = nodes.len() as u32;
636                    nodes.push(PredicateNodeWire::Not { child });
637                    child_stack.push(idx);
638                }
639            }
640        }
641
642        child_stack
643            .pop()
644            .expect("predicate must produce at least one node")
645    }
646}
647
648impl PredicateWire {
649    /// Rebuild a [`Predicate`] AST from the flat wire format.
650    ///
651    /// Validates structural integrity: empty tables, out-of-bounds
652    /// indices, and child-index cycles are surfaced as typed
653    /// [`PredicateWireError`] rather than panicking. A successful
654    /// rebuild is byte-equal to the input of the matching
655    /// [`Predicate::to_wire`] call.
656    pub fn into_predicate(self) -> Result<Predicate, PredicateWireError> {
657        if self.nodes.is_empty() {
658            return Err(PredicateWireError::Empty);
659        }
660        let len = self.nodes.len();
661        if (self.root_idx as usize) >= len {
662            return Err(PredicateWireError::RootOutOfBounds {
663                root_idx: self.root_idx,
664                len,
665            });
666        }
667        rebuild_predicate(&self.nodes, self.root_idx)
668    }
669}
670
671/// Recursive rebuild helper. Walks the flat node table from `idx`,
672/// validating child indices and cycles as it goes.
673fn rebuild_predicate(
674    nodes: &[PredicateNodeWire],
675    idx: u32,
676) -> Result<Predicate, PredicateWireError> {
677    let len = nodes.len();
678    let node = nodes
679        .get(idx as usize)
680        .ok_or(PredicateWireError::ChildOutOfBounds { child: idx, len })?;
681    let result = match node {
682        PredicateNodeWire::Exists { key } => Predicate::Exists { key: key.clone() },
683        PredicateNodeWire::Equals { key, value } => Predicate::Equals {
684            key: key.clone(),
685            value: value.clone(),
686        },
687        PredicateNodeWire::NumericAtLeast { key, threshold } => Predicate::NumericAtLeast {
688            key: key.clone(),
689            threshold: *threshold,
690        },
691        PredicateNodeWire::NumericAtMost { key, threshold } => Predicate::NumericAtMost {
692            key: key.clone(),
693            threshold: *threshold,
694        },
695        PredicateNodeWire::NumericInRange { key, min, max } => Predicate::NumericInRange {
696            key: key.clone(),
697            min: *min,
698            max: *max,
699        },
700        PredicateNodeWire::SemverAtLeast { key, version } => Predicate::SemverAtLeast {
701            key: key.clone(),
702            version: version.clone(),
703        },
704        PredicateNodeWire::SemverAtMost { key, version } => Predicate::SemverAtMost {
705            key: key.clone(),
706            version: version.clone(),
707        },
708        PredicateNodeWire::SemverCompatible { key, version } => Predicate::SemverCompatible {
709            key: key.clone(),
710            version: version.clone(),
711        },
712        PredicateNodeWire::StringPrefix { key, prefix } => Predicate::StringPrefix {
713            key: key.clone(),
714            prefix: prefix.clone(),
715        },
716        PredicateNodeWire::StringMatches { key, pattern } => Predicate::StringMatches {
717            key: key.clone(),
718            pattern: pattern.clone(),
719        },
720        PredicateNodeWire::MetadataExists { key } => Predicate::MetadataExists { key: key.clone() },
721        PredicateNodeWire::MetadataEquals { key, value } => Predicate::MetadataEquals {
722            key: key.clone(),
723            value: value.clone(),
724        },
725        PredicateNodeWire::MetadataMatches { key, pattern } => Predicate::MetadataMatches {
726            key: key.clone(),
727            pattern: pattern.clone(),
728        },
729        PredicateNodeWire::MetadataNumericAtLeast { key, threshold } => {
730            Predicate::MetadataNumericAtLeast {
731                key: key.clone(),
732                threshold: *threshold,
733            }
734        }
735        PredicateNodeWire::And { children } => {
736            check_children_below(children, idx)?;
737            let kids: Result<Vec<_>, _> = children
738                .iter()
739                .map(|&c| rebuild_predicate(nodes, c))
740                .collect();
741            Predicate::And(kids?)
742        }
743        PredicateNodeWire::Or { children } => {
744            check_children_below(children, idx)?;
745            let kids: Result<Vec<_>, _> = children
746                .iter()
747                .map(|&c| rebuild_predicate(nodes, c))
748                .collect();
749            Predicate::Or(kids?)
750        }
751        PredicateNodeWire::Not { child } => {
752            if *child >= idx {
753                return Err(PredicateWireError::CycleDetected {
754                    parent: idx,
755                    child: *child,
756                });
757            }
758            Predicate::Not(Box::new(rebuild_predicate(nodes, *child)?))
759        }
760    };
761    Ok(result)
762}
763
764/// Validate that every child index in `children` is strictly less
765/// than `parent`. Catches cycles introduced by malformed wire
766/// payloads.
767fn check_children_below(children: &[u32], parent: u32) -> Result<(), PredicateWireError> {
768    for &child in children {
769        if child >= parent {
770            return Err(PredicateWireError::CycleDetected { parent, child });
771        }
772    }
773    Ok(())
774}
775
776// =============================================================================
777// nRPC envelope integration — Phase 5.B of CAPABILITY_ENHANCEMENTS_PLAN.md.
778//
779// The cleanest place to attach a `where:` filter to an nRPC call
780// is the existing request-headers slot. Headers already carry
781// out-of-band metadata (trace context, idempotency keys,
782// content-type) and are typed as `(String, Vec<u8>)` — binary-safe,
783// per-header capped at `MAX_RPC_HEADER_VALUE_LEN` (4 KB), passed
784// through opaquely by the substrate.
785//
786// Predicate-handling code uses two helpers:
787//
788//   `predicate_to_rpc_header(&pred)` — JSON-encodes a `PredicateWire`
789//                                      into the canonical
790//                                      `net-where` header.
791//   `predicate_from_rpc_headers(headers)` — locates the header in
792//                                           a request's headers,
793//                                           decodes back to
794//                                           `Predicate`.
795//
796// Service handlers that opt in look for the header; services that
797// don't ignore it. The substrate (cortex/rpc) itself never
798// inspects the header — `eternal-rule §4: no semantic growth at
799// the substrate`. Per-binding API exposure lives in the SDK layer
800// (Phase 9b of `CAPABILITY_SYSTEM_SDK_PLAN.md`).
801//
802// JSON wire format (vs. postcard) trades ~2-3× size for human
803// readability + diff-able cross-binding fixtures. Predicates that
804// fit a typical service filter are ~200-500 bytes JSON, well
805// under the header cap.
806// =============================================================================
807
/// Name of the nRPC request header that carries a predicate-pushdown
/// filter.
///
/// Kept lowercase, HTTP-style. Lookups compare header names for exact
/// equality against this constant, so encoders and decoders must both
/// use this spelling.
pub const RPC_WHERE_HEADER: &str = "net-where";

/// Upper bound, in bytes, on the JSON-encoded `PredicateWire` carried
/// in the [`RPC_WHERE_HEADER`] value (4 KiB).
///
/// Intended to mirror `cortex::rpc::MAX_RPC_HEADER_VALUE_LEN`; it is
/// declared again here so the predicate helpers can reject oversize
/// payloads without depending on the `cortex` feature gate.
pub const MAX_PREDICATE_RPC_HEADER_VALUE_LEN: usize = 4 * 1024;
819
820/// Errors raised by [`predicate_to_rpc_header`].
821#[derive(Debug, thiserror::Error)]
822pub enum PredicateRpcEncodeError {
823    /// `serde_json::to_vec` failed on the wire-form predicate.
824    #[error("predicate wire encode failed: {0}")]
825    Encode(#[from] serde_json::Error),
826    /// The encoded payload exceeds the header-value cap.
827    #[error("predicate wire encoding {actual} bytes exceeds header cap {limit}")]
828    TooLarge {
829        /// Encoded byte length.
830        actual: usize,
831        /// Maximum permitted (`MAX_PREDICATE_RPC_HEADER_VALUE_LEN`).
832        limit: usize,
833    },
834}
835
836/// Errors raised by [`predicate_from_rpc_headers`].
837#[derive(Debug, thiserror::Error)]
838pub enum PredicateRpcDecodeError {
839    /// JSON parse failed on the header value.
840    #[error("predicate wire decode failed: {0}")]
841    Json(#[from] serde_json::Error),
842    /// Wire payload was structurally malformed (cycle, OOB index,
843    /// empty table).
844    #[error("predicate wire malformed: {0}")]
845    Wire(#[from] PredicateWireError),
846    /// Header value bytes exceeded the negotiated cap
847    /// (`MAX_PREDICATE_RPC_HEADER_VALUE_LEN`). Mirrors the encode
848    /// side's `TooLarge`; rejects parse-bombs before serde walks
849    /// the payload.
850    #[error("predicate wire payload {actual} bytes exceeds header cap {limit}")]
851    Oversize {
852        /// Observed payload size in bytes.
853        actual: usize,
854        /// Maximum permitted (`MAX_PREDICATE_RPC_HEADER_VALUE_LEN`).
855        limit: usize,
856    },
857}
858
859/// Encode a [`Predicate`] for transport in an nRPC request header.
860///
861/// Returns the canonical header tuple `(name, json_bytes)`. The
862/// service handler reading the request looks up the header by
863/// name (`RPC_WHERE_HEADER`) and decodes via
864/// [`predicate_from_rpc_headers`].
865///
866/// Phase 5.B of `CAPABILITY_ENHANCEMENTS_PLAN.md`. Round-trip
867/// pinned in `predicate_rpc_header_round_trip_*` tests.
868pub fn predicate_to_rpc_header(
869    pred: &Predicate,
870) -> Result<(String, Vec<u8>), PredicateRpcEncodeError> {
871    let wire = pred.to_wire();
872    let bytes = serde_json::to_vec(&wire)?;
873    if bytes.len() > MAX_PREDICATE_RPC_HEADER_VALUE_LEN {
874        return Err(PredicateRpcEncodeError::TooLarge {
875            actual: bytes.len(),
876            limit: MAX_PREDICATE_RPC_HEADER_VALUE_LEN,
877        });
878    }
879    Ok((RPC_WHERE_HEADER.to_string(), bytes))
880}
881
882/// Extract and decode a [`Predicate`] from a request's headers,
883/// if a `net-where` header is present.
884///
885/// Returns:
886///
887/// - `None` — no `net-where` header. Service should default
888///   to "no filter" (return all rows).
889/// - `Some(Ok(pred))` — header present, decoded cleanly. Service
890///   filters its result stream against `pred`.
891/// - `Some(Err(_))` — header present but malformed JSON or
892///   structurally invalid wire payload. Service should reject the
893///   request with a typed error rather than silently ignoring;
894///   silent skip would let a misencoded filter return more rows
895///   than the caller expected, which is a confidentiality concern
896///   in some workloads.
897///
898/// The first matching header wins — duplicate headers under the
899/// same name are not coalesced.
900///
901/// Phase 5.B of `CAPABILITY_ENHANCEMENTS_PLAN.md`.
902pub fn predicate_from_rpc_headers<H>(
903    headers: &[H],
904) -> Option<Result<Predicate, PredicateRpcDecodeError>>
905where
906    H: AsRpcHeader,
907{
908    let value = headers
909        .iter()
910        .find(|h| h.name() == RPC_WHERE_HEADER)?
911        .value();
912    // N-2 second pass: enforce the cap symmetrically with the encode
913    // side. The encode path rejects oversize payloads at line 735;
914    // pre-fix the decode path had no length check, so a peer that
915    // submitted an attacker-shaped JSON of arbitrary size let
916    // `serde_json::from_slice` plus `rebuild_predicate` walk a
917    // payload whose recursion depth was bounded only by the input
918    // size — a cheap parse-bomb DoS shape if a transport cap were
919    // ever relaxed for unrelated reasons.
920    if value.len() > MAX_PREDICATE_RPC_HEADER_VALUE_LEN {
921        return Some(Err(PredicateRpcDecodeError::Oversize {
922            actual: value.len(),
923            limit: MAX_PREDICATE_RPC_HEADER_VALUE_LEN,
924        }));
925    }
926    let result = serde_json::from_slice::<PredicateWire>(value)
927        .map_err(PredicateRpcDecodeError::Json)
928        .and_then(|wire| wire.into_predicate().map_err(PredicateRpcDecodeError::Wire));
929    Some(result)
930}
931
/// Minimal `(name, value)` view of an nRPC header.
///
/// [`predicate_from_rpc_headers`] is generic over this trait so it can
/// read both the plain `(String, Vec<u8>)` tuple (the substrate's
/// `RpcHeader` alias) and any binding-side wrapper type that can hand
/// out a name and a value.
pub trait AsRpcHeader {
    /// The header's name. Compared for exact (case-sensitive) equality
    /// against `RPC_WHERE_HEADER`.
    fn name(&self) -> &str;
    /// The header's raw value bytes.
    fn value(&self) -> &[u8];
}
942
943impl AsRpcHeader for (String, Vec<u8>) {
944    fn name(&self) -> &str {
945        &self.0
946    }
947    fn value(&self) -> &[u8] {
948        &self.1
949    }
950}
951
952impl AsRpcHeader for &(String, Vec<u8>) {
953    fn name(&self) -> &str {
954        &self.0
955    }
956    fn value(&self) -> &[u8] {
957        &self.1
958    }
959}
960
961// =============================================================================
962// Service-side row filter ergonomics — Phase 5.B follow-on of
963// CAPABILITY_ENHANCEMENTS_PLAN.md.
964//
965// The Phase 5.B helpers (`predicate_to_rpc_header` /
966// `predicate_from_rpc_headers`) move predicates across the wire,
967// but service handlers still have to manually construct an
968// `EvalContext` per row and dispatch through `Predicate::evaluate`.
969// These helpers close that gap:
970//
971//   - `Predicate::matches_capability_set(caps)` — single-row match
972//     against a `CapabilitySet`.
973//   - `RpcPredicateContext` trait — application rows expose tags +
974//     metadata for predicate evaluation.
975//   - `filter_by_predicate(rows, pred)` — iterator combinator that
976//     skips rows the predicate filters out.
977//
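// `filter_by_predicate` takes an `Option<&Predicate>` — the shape a
// handler gets from `predicate_from_rpc_headers(..).transpose()?`
// followed by `.as_ref()` — where `None` means "no filter, all rows
// match". The other two operate on an already-decoded `Predicate`.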
981// =============================================================================
982
983impl Predicate {
984    /// True if this predicate evaluates to true against the
985    /// given [`super::capability::CapabilitySet`]'s tags + metadata.
986    ///
987    /// Materializes `caps.tags` (a `HashSet<Tag>`) as a `Vec<Tag>`
988    /// for the slice-based `EvalContext`. The cost is a single
989    /// allocation per call; for hot loops over many capability
990    /// sets, callers may prefer to materialize tags once and
991    /// invoke [`Self::evaluate`] directly.
992    pub fn matches_capability_set(
993        &self,
994        caps: &crate::adapter::net::behavior::CapabilitySet,
995    ) -> bool {
996        let tags: Vec<Tag> = caps.tags.iter().cloned().collect();
997        let ctx = EvalContext::new(&tags, &caps.metadata);
998        self.evaluate(&ctx)
999    }
1000}
1001
1002/// Application-row adapter for predicate evaluation.
1003///
1004/// Service handlers that filter custom row types (training jobs,
1005/// documents, sensor readings, …) implement this trait on their
1006/// row to expose tags + metadata to the predicate. The
1007/// [`filter_by_predicate`] helper then provides a one-line
1008/// filter pattern over any iterator of `RpcPredicateContext` rows.
1009///
1010/// Phase 5.B follow-on of `CAPABILITY_ENHANCEMENTS_PLAN.md`.
1011///
1012/// ```ignore
1013/// struct TrainingJob {
1014///     tags: Vec<Tag>,
1015///     metadata: BTreeMap<String, String>,
1016///     payload: ...,
1017/// }
1018///
1019/// impl RpcPredicateContext for TrainingJob {
1020///     fn rpc_predicate_tags(&self) -> &[Tag] { &self.tags }
1021///     fn rpc_predicate_metadata(&self) -> &BTreeMap<String, String> {
1022///         &self.metadata
1023///     }
1024/// }
1025/// ```
1026pub trait RpcPredicateContext {
1027    /// Tags the predicate's axis-keyed clauses match against.
1028    fn rpc_predicate_tags(&self) -> &[Tag];
1029    /// Metadata the predicate's metadata-keyed clauses match against.
1030    fn rpc_predicate_metadata(&self) -> &BTreeMap<String, String>;
1031}
1032
1033/// Filter `rows` by an optional predicate.
1034///
1035/// `pred = None` returns all rows (the no-filter case — the
1036/// caller's request didn't include a `net-where` header).
1037/// `pred = Some(p)` returns only rows where `p` evaluates to true
1038/// against the row's tags + metadata.
1039///
1040/// Service handler usage:
1041///
1042/// ```ignore
1043/// let pred_opt = predicate_from_rpc_headers(&request.headers).transpose()?;
1044/// let matched: Vec<TrainingJob> =
1045///     filter_by_predicate(jobs, pred_opt.as_ref()).collect();
1046/// ```
1047///
1048/// Phase 5.B follow-on of `CAPABILITY_ENHANCEMENTS_PLAN.md`.
1049pub fn filter_by_predicate<R, I>(rows: I, pred: Option<&Predicate>) -> impl Iterator<Item = R> + '_
1050where
1051    R: RpcPredicateContext,
1052    I: IntoIterator<Item = R>,
1053    I::IntoIter: 'static,
1054{
1055    rows.into_iter().filter(move |row| match pred {
1056        None => true,
1057        Some(p) => {
1058            let ctx = EvalContext::new(row.rpc_predicate_tags(), row.rpc_predicate_metadata());
1059            p.evaluate(&ctx)
1060        }
1061    })
1062}
1063
1064// =============================================================================
1065// Constructors
1066// =============================================================================
1067
1068impl Predicate {
1069    /// Build [`Predicate::Exists`] from a [`TagKey`].
1070    pub fn exists(key: TagKey) -> Self {
1071        Self::Exists { key }
1072    }
1073
1074    /// Build [`Predicate::Equals`] from a key + value.
1075    pub fn equals(key: TagKey, value: impl Into<String>) -> Self {
1076        Self::Equals {
1077            key,
1078            value: value.into(),
1079        }
1080    }
1081
1082    /// Build [`Predicate::NumericAtLeast`].
1083    pub fn numeric_at_least(key: TagKey, threshold: f64) -> Self {
1084        Self::NumericAtLeast { key, threshold }
1085    }
1086
1087    /// Build [`Predicate::NumericAtMost`].
1088    pub fn numeric_at_most(key: TagKey, threshold: f64) -> Self {
1089        Self::NumericAtMost { key, threshold }
1090    }
1091
1092    /// Build [`Predicate::NumericInRange`].
1093    pub fn numeric_in_range(key: TagKey, min: f64, max: f64) -> Self {
1094        Self::NumericInRange { key, min, max }
1095    }
1096
1097    /// Build [`Predicate::SemverAtLeast`].
1098    pub fn semver_at_least(key: TagKey, version: impl Into<String>) -> Self {
1099        Self::SemverAtLeast {
1100            key,
1101            version: version.into(),
1102        }
1103    }
1104
1105    /// Build [`Predicate::SemverAtMost`].
1106    pub fn semver_at_most(key: TagKey, version: impl Into<String>) -> Self {
1107        Self::SemverAtMost {
1108            key,
1109            version: version.into(),
1110        }
1111    }
1112
1113    /// Build [`Predicate::SemverCompatible`].
1114    pub fn semver_compatible(key: TagKey, version: impl Into<String>) -> Self {
1115        Self::SemverCompatible {
1116            key,
1117            version: version.into(),
1118        }
1119    }
1120
1121    /// Build [`Predicate::StringPrefix`].
1122    pub fn string_prefix(key: TagKey, prefix: impl Into<String>) -> Self {
1123        Self::StringPrefix {
1124            key,
1125            prefix: prefix.into(),
1126        }
1127    }
1128
1129    /// Build [`Predicate::StringMatches`].
1130    pub fn string_matches(key: TagKey, pattern: impl Into<String>) -> Self {
1131        Self::StringMatches {
1132            key,
1133            pattern: pattern.into(),
1134        }
1135    }
1136
1137    /// Build [`Predicate::MetadataExists`].
1138    pub fn metadata_exists(key: impl Into<String>) -> Self {
1139        Self::MetadataExists { key: key.into() }
1140    }
1141
1142    /// Build [`Predicate::MetadataEquals`].
1143    pub fn metadata_equals(key: impl Into<String>, value: impl Into<String>) -> Self {
1144        Self::MetadataEquals {
1145            key: key.into(),
1146            value: value.into(),
1147        }
1148    }
1149
1150    /// Build [`Predicate::MetadataMatches`].
1151    pub fn metadata_matches(key: impl Into<String>, pattern: impl Into<String>) -> Self {
1152        Self::MetadataMatches {
1153            key: key.into(),
1154            pattern: pattern.into(),
1155        }
1156    }
1157
1158    /// Build [`Predicate::MetadataNumericAtLeast`].
1159    pub fn metadata_numeric_at_least(key: impl Into<String>, threshold: f64) -> Self {
1160        Self::MetadataNumericAtLeast {
1161            key: key.into(),
1162            threshold,
1163        }
1164    }
1165
1166    /// Build [`Predicate::And`] from a `Vec` of clauses.
1167    pub fn and(clauses: Vec<Predicate>) -> Self {
1168        Self::And(clauses)
1169    }
1170
1171    /// Build [`Predicate::Or`] from a `Vec` of clauses.
1172    pub fn or(clauses: Vec<Predicate>) -> Self {
1173        Self::Or(clauses)
1174    }
1175
1176    /// Build [`Predicate::Not`] wrapping a single clause.
1177    ///
1178    /// Named `not` to match `and` / `or` as a constructor —
1179    /// not an `Op<Output = Predicate>` impl. Implementing
1180    /// `std::ops::Not` would force callers to depend on
1181    /// `Predicate: Not` for the `!` operator, which requires
1182    /// `Predicate: Sized + Not<Output = ?>` boilerplate without
1183    /// any expressivity gain over the explicit constructor.
1184    #[allow(clippy::should_implement_trait)]
1185    pub fn not(inner: Predicate) -> Self {
1186        Self::Not(Box::new(inner))
1187    }
1188}
1189
1190// =============================================================================
1191// Evaluation
1192// =============================================================================
1193
1194impl Predicate {
1195    /// Evaluate against `(tags, metadata)`. Pure function.
1196    ///
1197    /// Phase 4 of `CAPABILITY_ENHANCEMENTS_PLAN.md`: at every
1198    /// `And` / `Or` node, children are evaluated in cost-ascending
1199    /// order so cheap+selective clauses short-circuit first. The
1200    /// reordering is a pure local optimization — semantics are
1201    /// identical to [`Self::evaluate_unplanned`]. Pinned by the
1202    /// `planned_evaluate_matches_unplanned_*` property tests.
1203    ///
1204    /// Numeric / semver parse failures yield `false` (a malformed
1205    /// tag value shouldn't fault a federated query).
1206    pub fn evaluate(&self, ctx: &EvalContext<'_>) -> bool {
1207        match self {
1208            Self::And(children) => Self::eval_all_in_cost_order(children, ctx),
1209            Self::Or(children) => Self::eval_any_in_cost_order(children, ctx),
1210            Self::Not(inner) => !inner.evaluate(ctx),
1211            other => other.evaluate_leaf(ctx),
1212        }
1213    }
1214
1215    /// Evaluate without the planner — children of `And` / `Or` run
1216    /// in declaration order.
1217    ///
1218    /// Phase 4 escape hatch for benchmarking and the planner-
1219    /// equivalence property tests. Production callers should use
1220    /// [`Self::evaluate`]; this is a diagnostic surface only.
1221    pub fn evaluate_unplanned(&self, ctx: &EvalContext<'_>) -> bool {
1222        match self {
1223            Self::And(children) => children.iter().all(|c| c.evaluate_unplanned(ctx)),
1224            Self::Or(children) => children.iter().any(|c| c.evaluate_unplanned(ctx)),
1225            Self::Not(inner) => !inner.evaluate_unplanned(ctx),
1226            other => other.evaluate_leaf(ctx),
1227        }
1228    }
1229
1230    /// Evaluate a leaf predicate (anything except `And` / `Or` /
1231    /// `Not`). Shared between [`Self::evaluate`] and
1232    /// [`Self::evaluate_unplanned`] so the leaf logic lives in one
1233    /// place and the two entry points only differ in their
1234    /// composite handling.
1235    fn evaluate_leaf(&self, ctx: &EvalContext<'_>) -> bool {
1236        match self {
1237            // Presence check: matches both `AxisPresent` and
1238            // `AxisValue` for `key`. Cannot route through
1239            // `match_axis_tag` because that helper now skips
1240            // `AxisPresent` (presence-only tags carry no value;
1241            // value predicates would otherwise see `""`).
1242            Self::Exists { key } => ctx
1243                .tags
1244                .iter()
1245                .any(|t| matches!(t.axis_key_ref(), Some((a, k)) if a == key.axis && k == key.key)),
1246            Self::Equals { key, value } => match_axis_tag(ctx.tags, key, |v| v == value.as_str()),
1247            Self::NumericAtLeast { key, threshold } => match_axis_tag(ctx.tags, key, |v| {
1248                v.parse::<f64>().is_ok_and(|n| n >= *threshold)
1249            }),
1250            Self::NumericAtMost { key, threshold } => match_axis_tag(ctx.tags, key, |v| {
1251                v.parse::<f64>().is_ok_and(|n| n <= *threshold)
1252            }),
1253            Self::NumericInRange { key, min, max } => match_axis_tag(ctx.tags, key, |v| {
1254                v.parse::<f64>().is_ok_and(|n| n >= *min && n <= *max)
1255            }),
1256            Self::SemverAtLeast { key, version } => {
1257                let Some(rhs) = parse_semver(version) else {
1258                    return false;
1259                };
1260                match_axis_tag(ctx.tags, key, |v| {
1261                    parse_semver(v).is_some_and(|lhs| lhs >= rhs)
1262                })
1263            }
1264            Self::SemverAtMost { key, version } => {
1265                let Some(rhs) = parse_semver(version) else {
1266                    return false;
1267                };
1268                match_axis_tag(ctx.tags, key, |v| {
1269                    parse_semver(v).is_some_and(|lhs| lhs <= rhs)
1270                })
1271            }
1272            Self::SemverCompatible { key, version } => {
1273                let Some(rhs) = parse_semver(version) else {
1274                    return false;
1275                };
1276                match_axis_tag(ctx.tags, key, |v| {
1277                    parse_semver(v).is_some_and(|lhs| semver_compatible(lhs, rhs))
1278                })
1279            }
1280            Self::StringPrefix { key, prefix } => {
1281                match_axis_tag(ctx.tags, key, |v| v.starts_with(prefix.as_str()))
1282            }
1283            Self::StringMatches { key, pattern } => {
1284                match_axis_tag(ctx.tags, key, |v| v.contains(pattern.as_str()))
1285            }
1286            Self::MetadataExists { key } => ctx.metadata.contains_key(key),
1287            Self::MetadataEquals { key, value } => {
1288                ctx.metadata.get(key).is_some_and(|v| v == value)
1289            }
1290            Self::MetadataMatches { key, pattern } => ctx
1291                .metadata
1292                .get(key)
1293                .is_some_and(|v| v.contains(pattern.as_str())),
1294            Self::MetadataNumericAtLeast { key, threshold } => ctx
1295                .metadata
1296                .get(key)
1297                .and_then(|v| v.parse::<f64>().ok())
1298                .is_some_and(|n| n >= *threshold),
1299            // Composite variants are routed through `evaluate` /
1300            // `evaluate_unplanned`, never reach this leaf-only path.
1301            Self::And(_) | Self::Or(_) | Self::Not(_) => unreachable!(
1302                "evaluate_leaf called with a composite Predicate; \
1303                 routing bug in evaluate / evaluate_unplanned"
1304            ),
1305        }
1306    }
1307
1308    /// `And` short-circuit evaluation in cost-ascending child order.
1309    ///
1310    /// PERF_AUDIT §4.7 — pre-fix this allocated a fresh
1311    /// `Vec<usize>` per evaluation and called
1312    /// `sort_by_key(|&i| children[i].static_cost())`, which
1313    /// re-runs `static_cost()` on each comparison;
1314    /// `static_cost()` itself recurses through the subtree, so
1315    /// for an N-child And node every row paid
1316    /// O(N · subtree_size · log N) just to plan the walk. With
1317    /// the fixed predicates `find_nodes_matching` evaluates per
1318    /// row, that work is invariant — we want to do it once.
1319    ///
1320    /// The change: precompute each child's `(index, cost)` into a
1321    /// small stack array (common case ≤ 8 children — well within
1322    /// the structural cap callers honor), sort the precomputed
1323    /// tuples, then walk. Cost computation runs exactly once per
1324    /// child per evaluation (vs O(log N) times under the prior
1325    /// `sort_by_key` shape). For deeper And nodes we still
1326    /// allocate, but the typical case avoids both the Vec alloc
1327    /// and the redundant cost recompute.
1328    fn eval_all_in_cost_order(children: &[Predicate], ctx: &EvalContext<'_>) -> bool {
1329        // First false wins; vacuously true — `.all()` semantics.
1330        Self::eval_in_cost_order(children, ctx, false)
1331    }
1332
1333    /// Shared planner core for the And/Or cost-ordered walks.
1334    /// Walks `children` cheapest-first; the first child that
1335    /// evaluates to `stop_on` short-circuits and returns
1336    /// `stop_on`; if none does, returns `!stop_on`. And-semantics
1337    /// = `stop_on == false` (first false wins, vacuously true);
1338    /// Or-semantics = `stop_on == true` (first true wins,
1339    /// vacuously false).
1340    ///
1341    /// Sort key is `(cost, index)` — the index tiebreak keeps
1342    /// equal-cost children in declaration order, matching the
1343    /// pre-§4.7 STABLE `sort_by_key` exactly (an unstable sort
1344    /// on cost alone would reorder ties). Evaluation is pure so
1345    /// results can't differ, but the deterministic walk order
1346    /// keeps timing/short-circuit behavior reproducible.
1347    fn eval_in_cost_order(children: &[Predicate], ctx: &EvalContext<'_>, stop_on: bool) -> bool {
1348        const STACK_CAP: usize = 8;
1349        if children.len() <= STACK_CAP {
1350            let mut tuples: [(usize, u32); STACK_CAP] = [(0, 0); STACK_CAP];
1351            for (i, child) in children.iter().enumerate() {
1352                tuples[i] = (i, child.static_cost());
1353            }
1354            tuples[..children.len()].sort_unstable_by_key(|&(i, c)| (c, i));
1355            for &(i, _) in &tuples[..children.len()] {
1356                if children[i].evaluate(ctx) == stop_on {
1357                    return stop_on;
1358                }
1359            }
1360            !stop_on
1361        } else {
1362            let mut tuples: Vec<(usize, u32)> = children
1363                .iter()
1364                .enumerate()
1365                .map(|(i, c)| (i, c.static_cost()))
1366                .collect();
1367            tuples.sort_unstable_by_key(|&(i, c)| (c, i));
1368            for &(i, _) in &tuples {
1369                if children[i].evaluate(ctx) == stop_on {
1370                    return stop_on;
1371                }
1372            }
1373            !stop_on
1374        }
1375    }
1376
1377    /// `Or` short-circuit evaluation in cost-ascending child order.
1378    ///
1379    /// CR-18: this uses the same `static_cost` as the And path,
1380    /// not a mirrored Or-specific function. The And-vs-Or
1381    /// asymmetry (And wants rare-true clauses first to fail
1382    /// fast; Or wants often-true clauses first to succeed fast)
1383    /// is encoded in the index-aware path via
1384    /// [`Self::dynamic_cost`] vs [`Self::dynamic_cost_or`], which
1385    /// invert the cardinality direction. Without an index the
1386    /// planner has no per-clause trueness signal, so it falls
1387    /// back to ordering by raw evaluation work — neutral between
1388    /// And and Or, which is the best we can do here.
1389    ///
1390    /// PERF_AUDIT §4.7 — same precomputed-tuple walk as
1391    /// `eval_all_in_cost_order`; both delegate to
1392    /// [`Self::eval_in_cost_order`].
1393    fn eval_any_in_cost_order(children: &[Predicate], ctx: &EvalContext<'_>) -> bool {
1394        // First true wins; vacuously false — `.any()` semantics.
1395        Self::eval_in_cost_order(children, ctx, true)
1396    }
1397
1398    /// Static cost estimate for the planner. Lower = cheaper to
1399    /// evaluate; planner sorts children ascending.
1400    ///
1401    /// Phase 4 first cut uses fixed-per-variant costs (no index
1402    /// integration). The ordering reflects empirical evaluation
1403    /// cost: hashmap lookups < tag-set scans with simple parses
1404    /// < substring scans < semver parses.
1405    ///
1406    /// Composite costs sum the children's costs, so a deeply
1407    /// nested branch is heavier than a shallow one with the same
1408    /// leaf shape.
1409    fn static_cost(&self) -> u32 {
1410        match self {
1411            // Tier 1: O(1) hashmap lookup.
1412            Self::MetadataExists { .. } => 10,
1413            Self::MetadataEquals { .. } => 11,
1414            // Tier 2: O(N) tag-set scan with cheap value handling.
1415            Self::Exists { .. } => 20,
1416            Self::Equals { .. } => 21,
1417            Self::MetadataNumericAtLeast { .. } => 25,
1418            // Tier 3: O(N) tag-set scan + numeric parse per match.
1419            Self::NumericAtLeast { .. }
1420            | Self::NumericAtMost { .. }
1421            | Self::NumericInRange { .. } => 30,
1422            // Tier 4: O(N) scan + substring / prefix scan.
1423            Self::StringPrefix { .. } => 40,
1424            Self::MetadataMatches { .. } => 45,
1425            Self::StringMatches { .. } => 50,
1426            // Tier 5: semver triple parse (heaviest leaf).
1427            Self::SemverAtLeast { .. }
1428            | Self::SemverAtMost { .. }
1429            | Self::SemverCompatible { .. } => 60,
1430            // Composites: sum of children. Caps avoid u32 overflow
1431            // by saturating at u32::MAX (a predicate this big
1432            // would have a different problem already).
1433            Self::And(c) | Self::Or(c) => c
1434                .iter()
1435                .map(|c| c.static_cost())
1436                .fold(0u32, |a, b| a.saturating_add(b)),
1437            Self::Not(inner) => inner.static_cost(),
1438        }
1439    }
1440
1441    /// Cardinality-aware cost estimate. Refines [`Self::static_cost`]
1442    /// with per-key distinct-value counts from a
1443    /// [`CardinalityProvider`](crate::adapter::net::behavior::CardinalityProvider).
1444    ///
1445    /// Phase 4 follow-on of `CAPABILITY_ENHANCEMENTS_PLAN.md`. A
1446    /// leaf clause keyed on a high-cardinality tag (many distinct
1447    /// values across nodes) is more selective than one keyed on
1448    /// a low-cardinality tag — running it first short-circuits
1449    /// faster on the common-mismatch case.
1450    ///
1451    /// The intuition: an `Equals(key, v)` clause has roughly
1452    /// `1 / cardinality` chance of matching a uniformly-random
1453    /// node, so expected work is `static_cost / cardinality`.
1454    ///
1455    /// Behavior:
1456    ///
1457    /// - Tag-keyed leaves (Exists / Equals / Numeric* / Semver* /
1458    ///   String*): `static_cost / max(1, cardinality)`. A
1459    ///   cardinality of zero (key not yet indexed) falls back to
1460    ///   raw `static_cost` — conservative.
1461    /// - Metadata leaves: `static_cost` unchanged. The
1462    ///   provider trait doesn't track metadata cardinality by
1463    ///   default (Phase D / E may add a metadata index; lands then).
1464    /// - Composites: sum of child dynamic costs (saturating).
1465    /// - `Not`: passes through inner cost.
1466    fn dynamic_cost<P: crate::adapter::net::behavior::CardinalityProvider>(
1467        &self,
1468        index: &P,
1469    ) -> u32 {
1470        match self {
1471            // Tag-keyed leaves: static_cost / cardinality.
1472            Self::Exists { key }
1473            | Self::Equals { key, .. }
1474            | Self::NumericAtLeast { key, .. }
1475            | Self::NumericAtMost { key, .. }
1476            | Self::NumericInRange { key, .. }
1477            | Self::SemverAtLeast { key, .. }
1478            | Self::SemverAtMost { key, .. }
1479            | Self::SemverCompatible { key, .. }
1480            | Self::StringPrefix { key, .. }
1481            | Self::StringMatches { key, .. } => {
1482                let static_c = self.static_cost();
1483                let cardinality = index.axis_cardinality(key);
1484                if cardinality > 0 {
1485                    static_c.saturating_div(u32::try_from(cardinality).unwrap_or(u32::MAX).max(1))
1486                } else {
1487                    // Key absent from the index — could be a brand-new
1488                    // tag the substrate hasn't observed yet. Conservatively
1489                    // keep static_cost so we don't underestimate work.
1490                    static_c
1491                }
1492            }
1493            // Metadata leaves: refine via the index's metadata
1494            // cardinality tracking (mirrors the axis-tag side).
1495            Self::MetadataExists { key }
1496            | Self::MetadataEquals { key, .. }
1497            | Self::MetadataMatches { key, .. }
1498            | Self::MetadataNumericAtLeast { key, .. } => {
1499                let static_c = self.static_cost();
1500                let cardinality = index.metadata_value_cardinality(key);
1501                if cardinality > 0 {
1502                    static_c.saturating_div(u32::try_from(cardinality).unwrap_or(u32::MAX).max(1))
1503                } else {
1504                    // Key absent from index → fall back to static cost.
1505                    static_c
1506                }
1507            }
1508            // Composites: sum of children's dynamic costs.
1509            Self::And(c) | Self::Or(c) => c
1510                .iter()
1511                .map(|c| c.dynamic_cost(index))
1512                .fold(0u32, |a, b| a.saturating_add(b)),
1513            Self::Not(inner) => inner.dynamic_cost(index),
1514        }
1515    }
1516
1517    /// Evaluate against `ctx`, using `index`'s per-key cardinality
1518    /// data to refine the planner's clause ordering at every
1519    /// `And` / `Or` node.
1520    ///
1521    /// Phase 4 follow-on of `CAPABILITY_ENHANCEMENTS_PLAN.md`.
1522    /// Produces the same boolean result as
1523    /// [`Self::evaluate_unplanned`] for any `(ast, ctx)`; the index
1524    /// only changes execution order, not semantics. Pinned in the
1525    /// `index_planner_evaluate_matches_unplanned_*` property tests.
1526    ///
1527    /// When the index is available, prefer this entry point over
1528    /// [`Self::evaluate`] (static-cost planner) — cardinality data
1529    /// catches selective clauses the static planner misses (e.g.,
1530    /// a `MetadataEquals` happens to be the cheapest leaf
1531    /// statically, but a high-cardinality `Equals` on an axis tag
1532    /// is even more selective in this index's data).
1533    ///
1534    /// When the index is unavailable or unhelpful (zero-cardinality
1535    /// for every key — empty index), this falls back to behavior
1536    /// equivalent to [`Self::evaluate`].
1537    pub fn evaluate_with_index<P: crate::adapter::net::behavior::CardinalityProvider>(
1538        &self,
1539        ctx: &EvalContext<'_>,
1540        index: &P,
1541    ) -> bool {
1542        match self {
1543            Self::And(children) => Self::eval_all_with_index(children, ctx, index),
1544            Self::Or(children) => Self::eval_any_with_index(children, ctx, index),
1545            Self::Not(inner) => !inner.evaluate_with_index(ctx, index),
1546            other => other.evaluate_leaf(ctx),
1547        }
1548    }
1549
1550    /// `And` short-circuit evaluation in dynamic-cost-ascending
1551    /// child order.
1552    fn eval_all_with_index<P: crate::adapter::net::behavior::CardinalityProvider>(
1553        children: &[Predicate],
1554        ctx: &EvalContext<'_>,
1555        index: &P,
1556    ) -> bool {
1557        let mut order: Vec<usize> = (0..children.len()).collect();
1558        order.sort_by_key(|&i| children[i].dynamic_cost(index));
1559        order
1560            .into_iter()
1561            .all(|i| children[i].evaluate_with_index(ctx, index))
1562    }
1563
1564    /// `Or` short-circuit evaluation in Or-mode-cost-ascending
1565    /// child order.
1566    ///
1567    /// Phase 4 final close of `CAPABILITY_ENHANCEMENTS_PLAN.md`.
1568    /// Uses [`Self::dynamic_cost_or`] (the inverted formula
1569    /// favoring low-cardinality "often-true" clauses) instead of
1570    /// the And-mode [`Self::dynamic_cost`]. The asymmetry matches
1571    /// short-circuit semantics: And short-circuits on first false
1572    /// (run rare-true clauses first), Or short-circuits on first
1573    /// true (run often-true clauses first).
1574    fn eval_any_with_index<P: crate::adapter::net::behavior::CardinalityProvider>(
1575        children: &[Predicate],
1576        ctx: &EvalContext<'_>,
1577        index: &P,
1578    ) -> bool {
1579        let mut order: Vec<usize> = (0..children.len()).collect();
1580        order.sort_by_key(|&i| children[i].dynamic_cost_or(index));
1581        order
1582            .into_iter()
1583            .any(|i| children[i].evaluate_with_index(ctx, index))
1584    }
1585
1586    /// Or-mode dynamic cost. Inverts the cardinality direction
1587    /// from [`Self::dynamic_cost`] so low-cardinality clauses
1588    /// (likely to match many candidates → often-true) sort first.
1589    ///
1590    /// Phase 4 final close of `CAPABILITY_ENHANCEMENTS_PLAN.md`.
1591    ///
1592    /// Behavior at leaves:
1593    ///
1594    /// - Tag-keyed leaves: `static_cost × max(1, cardinality)`.
1595    ///   High cardinality → many distinct values → each rare → high
1596    ///   Or-cost (run later). Low cardinality → matches concentrated
1597    ///   on few values → each common → low Or-cost (run first).
1598    /// - Metadata leaves: same shape against
1599    ///   `metadata_value_cardinality`.
1600    /// - Cardinality-0 (key absent from index) → fall back to
1601    ///   `static_cost`, conservative.
1602    ///
1603    /// Behavior at composites:
1604    ///
1605    /// - `And(children)` recurses with And-mode `dynamic_cost`
1606    ///   (the And's own internal ordering).
1607    /// - `Or(children)` recurses with Or-mode `dynamic_cost_or`.
1608    /// - `Not(inner)` passes through the same Or-mode recursion.
1609    ///
1610    /// Note: this is a leaf-level asymmetry. A rigorous treatment
1611    /// would also penalize And-as-Or-child with a "rare-true"
1612    /// score (since an And is true only when all children are
1613    /// true), but doing that requires modeling per-clause
1614    /// truthiness probability (a separate piece of work). For
1615    /// typical predicate shapes (mostly leaf-or-mixed, not
1616    /// deeply-nested And-of-Or-of-And), the leaf-level
1617    /// asymmetry catches the load-bearing case.
1618    fn dynamic_cost_or<P: crate::adapter::net::behavior::CardinalityProvider>(
1619        &self,
1620        index: &P,
1621    ) -> u32 {
1622        match self {
1623            Self::Exists { key }
1624            | Self::Equals { key, .. }
1625            | Self::NumericAtLeast { key, .. }
1626            | Self::NumericAtMost { key, .. }
1627            | Self::NumericInRange { key, .. }
1628            | Self::SemverAtLeast { key, .. }
1629            | Self::SemverAtMost { key, .. }
1630            | Self::SemverCompatible { key, .. }
1631            | Self::StringPrefix { key, .. }
1632            | Self::StringMatches { key, .. } => {
1633                let static_c = self.static_cost();
1634                let cardinality = index.axis_cardinality(key);
1635                if cardinality == 0 {
1636                    return static_c;
1637                }
1638                static_c.saturating_mul(u32::try_from(cardinality).unwrap_or(u32::MAX).max(1))
1639            }
1640            Self::MetadataExists { key }
1641            | Self::MetadataEquals { key, .. }
1642            | Self::MetadataMatches { key, .. }
1643            | Self::MetadataNumericAtLeast { key, .. } => {
1644                let static_c = self.static_cost();
1645                let cardinality = index.metadata_value_cardinality(key);
1646                if cardinality == 0 {
1647                    return static_c;
1648                }
1649                static_c.saturating_mul(u32::try_from(cardinality).unwrap_or(u32::MAX).max(1))
1650            }
1651            // Composites: recurse with mode appropriate to the
1652            // composite's own type. This is a leaf-level asymmetry —
1653            // the cost reflects the composite's own internal
1654            // expected work, not its truthiness probability.
1655            Self::And(c) => c
1656                .iter()
1657                .map(|c| c.dynamic_cost(index))
1658                .fold(0u32, |a, b| a.saturating_add(b)),
1659            Self::Or(c) => c
1660                .iter()
1661                .map(|c| c.dynamic_cost_or(index))
1662                .fold(0u32, |a, b| a.saturating_add(b)),
1663            Self::Not(inner) => inner.dynamic_cost_or(index),
1664        }
1665    }
1666}
1667
1668// =============================================================================
1669// Debug session — Phase 6 of CAPABILITY_ENHANCEMENTS_PLAN.md.
1670//
1671// `Predicate::evaluate_with_trace` instruments a single evaluation,
1672// producing a tree of clause traces showing which clauses ran and
1673// what they returned. `PredicateDebugReport` aggregates traces over
1674// a candidate corpus into per-clause hit/miss stats plus a printable
1675// summary.
1676//
1677// Opt-in only — production hot paths use `evaluate()`, never this
1678// path. The instrumentation overhead is dominated by the per-clause
1679// label allocation (`format!`); production-grade ~5% overhead is
1680// achievable but the current implementation favors simplicity.
1681// =============================================================================
1682
/// Tree-shaped trace from one debug evaluation against a single
/// `EvalContext`. Mirrors the AST of the predicate that was
/// evaluated, except `And` / `Or` short-circuits drop unevaluated
/// siblings — the trace only carries clauses that actually ran.
///
/// Phase 6 of `CAPABILITY_ENHANCEMENTS_PLAN.md`. Returned by
/// [`Predicate::evaluate_with_trace`].
///
/// Every field has a total equality, so the type derives `Eq`
/// alongside `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClauseTrace {
    /// One-line summary of the clause (`"Exists(hardware.gpu)"`,
    /// `"And(3 clauses)"`, `"MetadataEquals(intent=ml-training)"`).
    /// Aggregated stats merge by label, so two structurally-equal
    /// leaf clauses share one entry in the report.
    pub label: String,
    /// Final result of evaluating this clause.
    pub result: bool,
    /// Children traces in evaluation order. For `And` / `Or` this is
    /// the planner-ordered (cost-ascending) sequence of children
    /// that actually ran (short-circuited siblings are absent).
    /// `Not` has exactly one child. Leaves have an empty children
    /// list.
    pub children: Vec<ClauseTrace>,
}
1706
1707impl Predicate {
1708    /// Evaluate against `ctx`, also producing a tree of per-clause
1709    /// traces.
1710    ///
1711    /// The result equals `self.evaluate(ctx)`; this entry point adds
1712    /// the [`ClauseTrace`] tree as a side channel for debug
1713    /// inspection. Composite clauses retain the planner's
1714    /// short-circuit behavior — descendants that didn't run aren't
1715    /// in the trace.
1716    ///
1717    /// Phase 6 of `CAPABILITY_ENHANCEMENTS_PLAN.md`. Opt-in only;
1718    /// production callers use [`Predicate::evaluate`].
1719    pub fn evaluate_with_trace(&self, ctx: &EvalContext<'_>) -> (bool, ClauseTrace) {
1720        let label = self.debug_label();
1721        match self {
1722            Self::And(children) => {
1723                let mut order: Vec<usize> = (0..children.len()).collect();
1724                order.sort_by_key(|&i| children[i].static_cost());
1725                let mut traces = Vec::with_capacity(order.len());
1726                let mut result = true;
1727                for i in order {
1728                    let (r, t) = children[i].evaluate_with_trace(ctx);
1729                    traces.push(t);
1730                    if !r {
1731                        result = false;
1732                        break;
1733                    }
1734                }
1735                (
1736                    result,
1737                    ClauseTrace {
1738                        label,
1739                        result,
1740                        children: traces,
1741                    },
1742                )
1743            }
1744            Self::Or(children) => {
1745                let mut order: Vec<usize> = (0..children.len()).collect();
1746                order.sort_by_key(|&i| children[i].static_cost());
1747                let mut traces = Vec::with_capacity(order.len());
1748                let mut result = false;
1749                for i in order {
1750                    let (r, t) = children[i].evaluate_with_trace(ctx);
1751                    traces.push(t);
1752                    if r {
1753                        result = true;
1754                        break;
1755                    }
1756                }
1757                (
1758                    result,
1759                    ClauseTrace {
1760                        label,
1761                        result,
1762                        children: traces,
1763                    },
1764                )
1765            }
1766            Self::Not(inner) => {
1767                let (r, t) = inner.evaluate_with_trace(ctx);
1768                (
1769                    !r,
1770                    ClauseTrace {
1771                        label,
1772                        result: !r,
1773                        children: vec![t],
1774                    },
1775                )
1776            }
1777            leaf => {
1778                let result = leaf.evaluate_leaf(ctx);
1779                (
1780                    result,
1781                    ClauseTrace {
1782                        label,
1783                        result,
1784                        children: Vec::new(),
1785                    },
1786                )
1787            }
1788        }
1789    }
1790
1791    /// One-line debug label for this clause. Used by
1792    /// [`ClauseTrace`] and [`PredicateDebugReport`] to identify
1793    /// clauses in human-readable output.
1794    fn debug_label(&self) -> String {
1795        match self {
1796            Self::Exists { key } => format!("Exists({key})"),
1797            Self::Equals { key, value } => format!("Equals({key}={value})"),
1798            Self::NumericAtLeast { key, threshold } => {
1799                format!("NumericAtLeast({key} >= {threshold})")
1800            }
1801            Self::NumericAtMost { key, threshold } => {
1802                format!("NumericAtMost({key} <= {threshold})")
1803            }
1804            Self::NumericInRange { key, min, max } => {
1805                format!("NumericInRange({key} in [{min}, {max}])")
1806            }
1807            Self::SemverAtLeast { key, version } => {
1808                format!("SemverAtLeast({key} >= {version})")
1809            }
1810            Self::SemverAtMost { key, version } => {
1811                format!("SemverAtMost({key} <= {version})")
1812            }
1813            Self::SemverCompatible { key, version } => {
1814                format!("SemverCompatible({key} ~= {version})")
1815            }
1816            Self::StringPrefix { key, prefix } => {
1817                format!("StringPrefix({key} starts with {prefix:?})")
1818            }
1819            Self::StringMatches { key, pattern } => {
1820                format!("StringMatches({key} contains {pattern:?})")
1821            }
1822            Self::MetadataExists { key } => format!("MetadataExists({key})"),
1823            Self::MetadataEquals { key, value } => {
1824                format!("MetadataEquals({key}={value})")
1825            }
1826            Self::MetadataMatches { key, pattern } => {
1827                format!("MetadataMatches({key} contains {pattern:?})")
1828            }
1829            Self::MetadataNumericAtLeast { key, threshold } => {
1830                format!("MetadataNumericAtLeast({key} >= {threshold})")
1831            }
1832            Self::And(c) => format!("And({} clauses)", c.len()),
1833            Self::Or(c) => format!("Or({} clauses)", c.len()),
1834            Self::Not(_) => "Not".to_string(),
1835        }
1836    }
1837}
1838
/// Per-clause aggregated stats across a candidate corpus.
///
/// Merged by `label`: two structurally-equal clauses (same variant,
/// same key, same value) share one [`ClauseStats`] entry. This is
/// generally what an operator wants — "how often does
/// `MetadataEquals(intent=ml-training)` succeed?" doesn't depend on
/// where in the AST that clause sits.
///
/// All fields have a total equality, so the type derives `Eq`
/// alongside `PartialEq`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ClauseStats {
    /// Clause label (matches the `label` field on [`ClauseTrace`]).
    pub label: String,
    /// Number of candidates whose evaluation reached this clause
    /// (i.e. wasn't short-circuited away by an earlier sibling).
    pub evaluated: usize,
    /// Number of those evaluations that returned `true`.
    pub matched: usize,
}

/// Aggregate report from running a [`Predicate`] across a corpus
/// of candidate evaluation contexts.
///
/// Phase 6 of `CAPABILITY_ENHANCEMENTS_PLAN.md`. Built by
/// [`PredicateDebugReport::from_evaluations`].
///
/// The report answers: "given this predicate and these candidates,
/// how many matched, and how often did each clause filter?". A
/// clause with 1042 evaluations and 12 matches has 1.2% positive
/// selectivity — operators use that to spot mismatches between
/// their mental model of the data and the actual data.
///
/// Derives `Eq` alongside `PartialEq`; this relies on
/// [`ClauseStats`] being `Eq` as well.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PredicateDebugReport {
    /// Number of candidates the predicate was evaluated against.
    pub total_candidates: usize,
    /// Number of candidates the predicate matched (returned `true`).
    pub matched: usize,
    /// Per-clause aggregated stats, keyed by the clause's debug
    /// label. `BTreeMap` for deterministic iteration order in
    /// printed output.
    pub clause_stats: std::collections::BTreeMap<String, ClauseStats>,
}
1879
1880impl PredicateDebugReport {
1881    /// Run `pred` against each context in `contexts`, accumulating
1882    /// per-clause hit / miss stats.
1883    ///
1884    /// Each context contributes one trace; the trace tree is walked
1885    /// post-order to update the per-label `ClauseStats`. Composite
1886    /// clauses (And / Or / Not) get their own labels too, so an
1887    /// operator can see "the And short-circuited 730/1042 times" at
1888    /// a glance.
1889    pub fn from_evaluations<'a, I>(pred: &Predicate, contexts: I) -> Self
1890    where
1891        I: IntoIterator<Item = EvalContext<'a>>,
1892    {
1893        let mut report = Self::default();
1894        for ctx in contexts {
1895            report.total_candidates += 1;
1896            let (matched, trace) = pred.evaluate_with_trace(&ctx);
1897            if matched {
1898                report.matched += 1;
1899            }
1900            accumulate_trace(&trace, &mut report.clause_stats);
1901        }
1902        report
1903    }
1904
1905    /// Format a human-readable summary suitable for terminal output.
1906    ///
1907    /// Returned as a `String` rather than printed directly so tests
1908    /// can pin the format and callers can route to their own logger.
1909    pub fn render(&self) -> String {
1910        let mut out = String::new();
1911        let pct = |num: usize, denom: usize| -> f64 {
1912            if denom == 0 {
1913                0.0
1914            } else {
1915                100.0 * (num as f64) / (denom as f64)
1916            }
1917        };
1918        out.push_str("Predicate evaluation report\n");
1919        out.push_str("─────────────────────────────────────────\n");
1920        out.push_str(&format!(
1921            "Total candidates: {}\nMatched:          {} ({:.1}%)\n\n",
1922            self.total_candidates,
1923            self.matched,
1924            pct(self.matched, self.total_candidates),
1925        ));
1926        out.push_str("Per-clause stats (alphabetical):\n");
1927        for stats in self.clause_stats.values() {
1928            out.push_str(&format!(
1929                "  {:<60} evaluated {:>5}, matched {:>5} ({:>5.1}%)\n",
1930                stats.label,
1931                stats.evaluated,
1932                stats.matched,
1933                pct(stats.matched, stats.evaluated),
1934            ));
1935        }
1936        out
1937    }
1938}
1939
1940/// Walk a [`ClauseTrace`] tree post-order, updating per-label
1941/// stats in `acc`.
1942fn accumulate_trace(
1943    trace: &ClauseTrace,
1944    acc: &mut std::collections::BTreeMap<String, ClauseStats>,
1945) {
1946    let entry = acc
1947        .entry(trace.label.clone())
1948        .or_insert_with(|| ClauseStats {
1949            label: trace.label.clone(),
1950            evaluated: 0,
1951            matched: 0,
1952        });
1953    entry.evaluated += 1;
1954    if trace.result {
1955        entry.matched += 1;
1956    }
1957    for child in &trace.children {
1958        accumulate_trace(child, acc);
1959    }
1960}
1961
1962/// Find any value-bearing tag in `tags` matching `key` and run
1963/// `value_pred` against its value. [`Tag::AxisPresent`] tags carry
1964/// no value and are skipped — feeding `""` through `value_pred`
1965/// would let an empty-string `Equals` / `StringPrefix` /
1966/// `StringMatches` predicate spuriously match a presence-only tag.
1967/// Use [`Predicate::Exists`] (which goes through a separate
1968/// presence-aware path in `evaluate_leaf`) when key-presence
1969/// without a value is the intended check.
1970fn match_axis_tag(tags: &[Tag], key: &TagKey, value_pred: impl Fn(&str) -> bool) -> bool {
1971    tags.iter().any(|t| match t {
1972        Tag::AxisValue {
1973            axis,
1974            key: k,
1975            value,
1976            ..
1977        } => *axis == key.axis && k == &key.key && value_pred(value),
1978        _ => false,
1979    })
1980}
1981
1982// =============================================================================
1983// Semver — minimal inline parser
1984// =============================================================================
1985
/// Semver triple `(major, minor, patch)`. Pre-release / build
/// metadata is stripped at parse time, so only the numeric triple
/// takes part in comparisons.
type SemverTriple = (u64, u64, u64);

/// Parse a `MAJOR.MINOR.PATCH[-prerelease][+build]` string.
///
/// Everything from the first `-` or `+` onward is discarded before
/// parsing. Components are trimmed of surrounding whitespace and
/// must each parse as `u64`.
///
/// Lenient on missing components: `1` → `(1, 0, 0)`, `1.2` →
/// `(1, 2, 0)` — matches caller expectation when applications emit
/// truncated version strings.
///
/// Returns `None` on any malformed input: an empty or non-numeric
/// component, or more than three components.
fn parse_semver(s: &str) -> Option<SemverTriple> {
    // Drop the pre-release / build suffix: keep only what precedes
    // the first `-` or `+`. `split` always yields at least one piece.
    let core = s.split(['-', '+']).next().unwrap_or(s);

    let mut parts = core.split('.').map(str::trim);
    let major: u64 = parts.next()?.parse().ok()?;
    // A missing component defaults to 0; a present but unparsable
    // one makes the whole version invalid.
    let minor: u64 = match parts.next() {
        Some(part) => part.parse().ok()?,
        None => 0,
    };
    let patch: u64 = match parts.next() {
        Some(part) => part.parse().ok()?,
        None => 0,
    };
    if parts.next().is_some() {
        return None; // 4+ components is not semver
    }
    Some((major, minor, patch))
}
2013
2014/// `lhs` is caret-compatible with `rhs` per the standard semver
2015/// rule: same major (or same minor for `0.x.y`, exact for `0.0.x`),
2016/// and `lhs >= rhs`. Mirrors cargo's `^` operator semantics.
2017fn semver_compatible(lhs: SemverTriple, rhs: SemverTriple) -> bool {
2018    if lhs < rhs {
2019        return false;
2020    }
2021    if rhs.0 == 0 {
2022        if rhs.1 == 0 {
2023            // 0.0.x — patch is the compatibility band; anything
2024            // other than the exact tuple is a breaking change.
2025            // Combined with the `lhs >= rhs` guard above this
2026            // collapses to lhs == rhs.
2027            lhs == rhs
2028        } else {
2029            // 0.x.y — minor is the compatibility band, AND the
2030            // major must also be 0. Pre-fix `rhs.1 == lhs.1`
2031            // alone admitted `lhs = 1.2.5` as compatible with
2032            // `rhs = 0.2.3` (the lhs >= rhs guard passes since
2033            // 1 > 0, then minors match). 1.2.5 is not `^0.2.3`-
2034            // compatible per Cargo: 0.x.y treats minor as the
2035            // band IFF the band itself is 0.x.y.
2036            lhs.0 == 0 && rhs.1 == lhs.1
2037        }
2038    } else {
2039        rhs.0 == lhs.0
2040    }
2041}
2042
2043// =============================================================================
2044// pred! macro
2045// =============================================================================
2046
/// Lightweight macro sugar over [`Predicate`] constructors. Mirrors
/// the substrate plan's macro-style examples in §6a; lowers to plain
/// constructor calls so the AST stays the single source of truth.
///
/// ## Forms
///
/// ```ignore
/// pred!(exists "hardware.gpu");
/// pred!(equals "software.runtime", "cuda-12.4");
/// pred!(num_at_least "hardware.gpu.vram_gb", 24.0);
/// pred!(num_at_most "hardware.gpu.vram_gb", 80.0);
/// pred!(num_in_range "hardware.cpu_cores", 8.0, 64.0);
/// pred!(semver_at_least "software.runtime", "12.0");
/// pred!(semver_at_most "software.runtime", "12.0");
/// pred!(semver_compatible "software.runtime", "12.0");
/// pred!(prefix "software.tool", "ffmpeg");
/// pred!(matches "software.daemon", "postgres");
/// pred!(metadata_exists "intent");
/// pred!(metadata_equals "intent", "ml-training");
/// pred!(metadata_matches "intent", "training");
/// pred!(metadata_num_at_least "priority", 3.0);
/// pred!(and [a, b, c]);
/// pred!(or  [a, b, c]);
/// pred!(not a);
/// ```
///
/// In the tag-keyed forms the first argument must be a string
/// *literal* of the shape `<axis>.<key>`. The macro turns it into a
/// `TagKey` through the hidden helper `__tag_key_from_str`, which
/// splits on the first `.` and panics at construction time on a
/// missing separator or an unknown axis prefix — matching the
/// substrate plan's "validates shapes at parse time" contract for
/// the macro.
///
/// The `metadata_*` forms accept any expression for the key and
/// pass it straight to the corresponding constructor. `and` / `or`
/// take a bracketed, comma-separated list of clause expressions
/// (a trailing comma is allowed); `not` takes a single clause
/// expression.
#[macro_export]
macro_rules! pred {
    (exists $key:literal) => {
        $crate::adapter::net::behavior::predicate::Predicate::exists(
            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
        )
    };
    (equals $key:literal, $value:expr) => {
        $crate::adapter::net::behavior::predicate::Predicate::equals(
            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
            $value,
        )
    };
    (num_at_least $key:literal, $t:expr) => {
        $crate::adapter::net::behavior::predicate::Predicate::numeric_at_least(
            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
            $t,
        )
    };
    (num_at_most $key:literal, $t:expr) => {
        $crate::adapter::net::behavior::predicate::Predicate::numeric_at_most(
            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
            $t,
        )
    };
    (num_in_range $key:literal, $min:expr, $max:expr) => {
        $crate::adapter::net::behavior::predicate::Predicate::numeric_in_range(
            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
            $min,
            $max,
        )
    };
    (semver_at_least $key:literal, $v:expr) => {
        $crate::adapter::net::behavior::predicate::Predicate::semver_at_least(
            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
            $v,
        )
    };
    (semver_at_most $key:literal, $v:expr) => {
        $crate::adapter::net::behavior::predicate::Predicate::semver_at_most(
            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
            $v,
        )
    };
    (semver_compatible $key:literal, $v:expr) => {
        $crate::adapter::net::behavior::predicate::Predicate::semver_compatible(
            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
            $v,
        )
    };
    (prefix $key:literal, $p:expr) => {
        $crate::adapter::net::behavior::predicate::Predicate::string_prefix(
            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
            $p,
        )
    };
    (matches $key:literal, $p:expr) => {
        $crate::adapter::net::behavior::predicate::Predicate::string_matches(
            $crate::adapter::net::behavior::predicate::__tag_key_from_str($key),
            $p,
        )
    };
    (metadata_exists $key:expr) => {
        $crate::adapter::net::behavior::predicate::Predicate::metadata_exists($key)
    };
    (metadata_equals $key:expr, $v:expr) => {
        $crate::adapter::net::behavior::predicate::Predicate::metadata_equals($key, $v)
    };
    (metadata_matches $key:expr, $p:expr) => {
        $crate::adapter::net::behavior::predicate::Predicate::metadata_matches($key, $p)
    };
    (metadata_num_at_least $key:expr, $t:expr) => {
        $crate::adapter::net::behavior::predicate::Predicate::metadata_numeric_at_least(
            $key, $t,
        )
    };
    (and [ $($clause:expr),* $(,)? ]) => {
        $crate::adapter::net::behavior::predicate::Predicate::and(vec![$($clause),*])
    };
    (or [ $($clause:expr),* $(,)? ]) => {
        $crate::adapter::net::behavior::predicate::Predicate::or(vec![$($clause),*])
    };
    (not $clause:expr) => {
        $crate::adapter::net::behavior::predicate::Predicate::not($clause)
    };
}
2161
2162/// Internal helper used by the [`pred!`] macro to lift an
2163/// `<axis>.<key>` string literal into a [`TagKey`]. Panics on
2164/// unknown axis or empty key — the macro contract is "parse-time
2165/// validation," and violating it at the call site is a programmer
2166/// error caught at the first run (matches the substrate plan's
2167/// macro-validation guarantee).
2168#[doc(hidden)]
2169pub fn __tag_key_from_str(s: &'static str) -> TagKey {
2170    let (axis_str, key) = s
2171        .split_once('.')
2172        .unwrap_or_else(|| panic!("pred! tag key {s:?} must be `<axis>.<key>`"));
2173    let axis = crate::adapter::net::behavior::tag::TaxonomyAxis::from_prefix(axis_str)
2174        .unwrap_or_else(|| {
2175            panic!(
2176                "pred! tag key {s:?} has unknown axis prefix {axis_str:?}; \
2177                 valid axes: hardware, software, devices, dataforts"
2178            )
2179        });
2180    TagKey::new(axis, key.to_string())
2181}
2182
2183// =============================================================================
2184// Tests
2185// =============================================================================
2186
2187#[cfg(test)]
2188mod tests {
2189    use super::*;
2190    use crate::adapter::net::behavior::tag::{Tag, TaxonomyAxis};
2191    use crate::adapter::net::behavior::{CapabilitySet, GpuInfo, GpuVendor, HardwareCapabilities};
    /// Test shorthand: bundles a borrowed tag slice and metadata map
    /// into an [`EvalContext`] via `EvalContext::new`.
    fn ctx<'a>(tags: &'a [Tag], metadata: &'a BTreeMap<String, String>) -> EvalContext<'a> {
        EvalContext::new(tags, metadata)
    }
2195    fn empty_meta() -> BTreeMap<String, String> {
2196        BTreeMap::new()
2197    }
2198    fn axis_present(axis: TaxonomyAxis, key: &str) -> Tag {
2199        Tag::AxisPresent {
2200            axis,
2201            key: key.into(),
2202        }
2203    }
2204    fn axis_eq(axis: TaxonomyAxis, key: &str, value: &str) -> Tag {
2205        Tag::AxisValue {
2206            axis,
2207            key: key.into(),
2208            value: value.into(),
2209            separator: crate::adapter::net::behavior::tag::AxisSeparator::Eq,
2210        }
2211    }
2212    // ---- existence + equality ------------------------------------------
2213
2214    #[test]
2215    fn exists_matches_axis_present_tag() {
2216        let tags = [axis_present(TaxonomyAxis::Hardware, "gpu")];
2217        let meta = empty_meta();
2218        let p = pred!(exists "hardware.gpu");
2219        assert!(p.evaluate(&ctx(&tags, &meta)));
2220    }
2221    #[test]
2222    fn exists_matches_axis_value_tag() {
2223        let tags = [axis_eq(TaxonomyAxis::Hardware, "gpu.vram_gb", "80")];
2224        let meta = empty_meta();
2225        let p = pred!(exists "hardware.gpu.vram_gb");
2226        assert!(p.evaluate(&ctx(&tags, &meta)));
2227    }
2228    #[test]
2229    fn exists_misses_when_axis_differs() {
2230        let tags = [axis_present(TaxonomyAxis::Software, "gpu")];
2231        let meta = empty_meta();
2232        let p = pred!(exists "hardware.gpu");
2233        assert!(!p.evaluate(&ctx(&tags, &meta)));
2234    }
2235    #[test]
2236    fn equals_matches_value_exactly() {
2237        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "cuda-12.4")];
2238        let meta = empty_meta();
2239        assert!(pred!(equals "software.runtime", "cuda-12.4").evaluate(&ctx(&tags, &meta)));
2240        assert!(!pred!(equals "software.runtime", "cuda-11").evaluate(&ctx(&tags, &meta)));
2241    }
2242    // ---- numeric --------------------------------------------------------
2243
2244    #[test]
2245    fn numeric_at_least_compares_value() {
2246        let tags = [axis_eq(TaxonomyAxis::Hardware, "gpu.vram_gb", "80")];
2247        let meta = empty_meta();
2248        assert!(pred!(num_at_least "hardware.gpu.vram_gb", 24.0).evaluate(&ctx(&tags, &meta)));
2249        assert!(pred!(num_at_least "hardware.gpu.vram_gb", 80.0).evaluate(&ctx(&tags, &meta)));
2250        assert!(!pred!(num_at_least "hardware.gpu.vram_gb", 96.0).evaluate(&ctx(&tags, &meta)));
2251    }
2252    #[test]
2253    fn numeric_at_most_and_in_range() {
2254        let tags = [axis_eq(TaxonomyAxis::Hardware, "cpu_cores", "16")];
2255        let meta = empty_meta();
2256        assert!(pred!(num_at_most "hardware.cpu_cores", 32.0).evaluate(&ctx(&tags, &meta)));
2257        assert!(!pred!(num_at_most "hardware.cpu_cores", 8.0).evaluate(&ctx(&tags, &meta)));
2258        assert!(pred!(num_in_range "hardware.cpu_cores", 8.0, 32.0).evaluate(&ctx(&tags, &meta)));
2259        assert!(!pred!(num_in_range "hardware.cpu_cores", 32.0, 64.0).evaluate(&ctx(&tags, &meta)));
2260    }
2261    #[test]
2262    fn numeric_unparseable_value_evaluates_to_false() {
2263        // Pinned: a tag whose value is not numeric must NOT panic
2264        // and must NOT match a numeric predicate. Federated queries
2265        // rely on this — a malformed tag from a peer's binding
2266        // shouldn't fault our query.
2267        let tags = [axis_eq(TaxonomyAxis::Hardware, "cpu_cores", "many")];
2268        let meta = empty_meta();
2269        assert!(!pred!(num_at_least "hardware.cpu_cores", 1.0).evaluate(&ctx(&tags, &meta)));
2270    }
2271    // ---- semver ---------------------------------------------------------
2272
2273    #[test]
2274    fn semver_at_least_basic() {
2275        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "12.4.1")];
2276        let meta = empty_meta();
2277        assert!(pred!(semver_at_least "software.runtime", "12.0.0").evaluate(&ctx(&tags, &meta)));
2278        assert!(pred!(semver_at_least "software.runtime", "12.4.0").evaluate(&ctx(&tags, &meta)));
2279        assert!(!pred!(semver_at_least "software.runtime", "13.0.0").evaluate(&ctx(&tags, &meta)));
2280    }
2281    #[test]
2282    fn semver_compatible_caret_rule() {
2283        // 1.x.y compatibility: same major.
2284        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "1.5.2")];
2285        let meta = empty_meta();
2286        assert!(pred!(semver_compatible "software.runtime", "1.0.0").evaluate(&ctx(&tags, &meta)));
2287        assert!(pred!(semver_compatible "software.runtime", "1.4.0").evaluate(&ctx(&tags, &meta)));
2288        assert!(!pred!(semver_compatible "software.runtime", "0.9.0").evaluate(&ctx(&tags, &meta)));
2289        assert!(!pred!(semver_compatible "software.runtime", "2.0.0").evaluate(&ctx(&tags, &meta)));
2290
2291        // 0.x.y compatibility: same minor.
2292        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "0.5.7")];
2293        assert!(pred!(semver_compatible "software.runtime", "0.5.0").evaluate(&ctx(&tags, &meta)));
2294        assert!(!pred!(semver_compatible "software.runtime", "0.4.0").evaluate(&ctx(&tags, &meta)));
2295    }
2296    /// Regression: `0.0.x` is exact-only under cargo's caret rule.
2297    /// The pre-fix `rhs.0 == 0 → rhs.1 == lhs.1` branch ignored the
2298    /// patch component and admitted any `0.0.y >= 0.0.x` as
2299    /// compatible — concretely, `^0.0.1` would match a peer running
2300    /// `0.0.2`, which is a breaking-change boundary.
2301    #[test]
2302    fn semver_compatible_zero_zero_patch_is_exact_only() {
2303        let meta = empty_meta();
2304
2305        // Exact match passes.
2306        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "0.0.3")];
2307        assert!(pred!(semver_compatible "software.runtime", "0.0.3").evaluate(&ctx(&tags, &meta)));
2308
2309        // Higher patch must NOT match (was admitted pre-fix).
2310        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "0.0.4")];
2311        assert!(!pred!(semver_compatible "software.runtime", "0.0.3").evaluate(&ctx(&tags, &meta)));
2312
2313        // Lower patch fails (already covered by the lhs >= rhs guard).
2314        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "0.0.2")];
2315        assert!(!pred!(semver_compatible "software.runtime", "0.0.3").evaluate(&ctx(&tags, &meta)));
2316
2317        // Cross-band (different minor) still fails.
2318        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "0.1.0")];
2319        assert!(!pred!(semver_compatible "software.runtime", "0.0.3").evaluate(&ctx(&tags, &meta)));
2320    }
2321    /// Q1: a non-zero major `lhs` is NOT compatible with a 0.x.y
2322    /// `rhs` — Cargo's caret rule treats 0.x.y as the band only
2323    /// when the major is also 0. Pre-fix, `rhs.1 == lhs.1` alone
2324    /// passed for `lhs = 1.2.5` against `rhs = 0.2.3` (lhs >= rhs
2325    /// passes since 1 > 0; minors match). 1.x.y running against
2326    /// `^0.2.3` is a major-version regression that should fail
2327    /// the compatibility check.
2328    #[test]
2329    fn semver_compatible_zero_x_band_requires_lhs_major_zero() {
2330        let meta = empty_meta();
2331
2332        // 0.2.x band: same-major-zero, same-minor matches.
2333        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "0.2.5")];
2334        assert!(pred!(semver_compatible "software.runtime", "0.2.3").evaluate(&ctx(&tags, &meta)));
2335
2336        // 0.2.x band: lhs major != 0 must NOT match (was admitted
2337        // pre-fix).
2338        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "1.2.5")];
2339        assert!(!pred!(semver_compatible "software.runtime", "0.2.3").evaluate(&ctx(&tags, &meta)));
2340
2341        // Sanity: 2.2.5 vs 0.2.3 also fails.
2342        let tags = [axis_eq(TaxonomyAxis::Software, "runtime", "2.2.5")];
2343        assert!(!pred!(semver_compatible "software.runtime", "0.2.3").evaluate(&ctx(&tags, &meta)));
2344    }
2345    /// Regression: presence-only tags (`Tag::AxisPresent`) must not
2346    /// match value-bearing predicates. Pre-fix, `match_axis_tag` fed
2347    /// `""` through `value_pred`, which let `Equals(_, "")` /
2348    /// `StringPrefix(_, "")` / `StringMatches(_, "")` accept any
2349    /// presence tag. Use `Exists` for key-presence checks.
2350    #[test]
2351    fn axis_present_does_not_satisfy_value_predicates() {
2352        let tags = [axis_present(TaxonomyAxis::Hardware, "gpu")];
2353        let meta = empty_meta();
2354
2355        // Equality with empty string was the worst offender — every
2356        // presence tag matched it pre-fix.
2357        assert!(!pred!(equals "hardware.gpu", "").evaluate(&ctx(&tags, &meta)));
2358        // Equality with any non-empty value also doesn't match a
2359        // presence tag (no value to compare against).
2360        assert!(!pred!(equals "hardware.gpu", "nvidia").evaluate(&ctx(&tags, &meta)));
2361        // String predicates anchored at the empty string used to
2362        // permissively accept presence tags.
2363        assert!(!pred!(prefix "hardware.gpu", "").evaluate(&ctx(&tags, &meta)));
2364        assert!(!pred!(matches "hardware.gpu", "").evaluate(&ctx(&tags, &meta)));
2365
2366        // `Exists` is the correct check for key presence — it still
2367        // matches both `AxisPresent` and `AxisValue` shapes.
2368        assert!(pred!(exists "hardware.gpu").evaluate(&ctx(&tags, &meta)));
2369        let tags = [axis_eq(TaxonomyAxis::Hardware, "gpu", "nvidia")];
2370        assert!(pred!(exists "hardware.gpu").evaluate(&ctx(&tags, &meta)));
2371    }
2372    #[test]
2373    fn semver_lenient_parser() {
2374        // Pinned: the inline parser accepts truncated versions
2375        // (`1` → `(1, 0, 0)`, `1.2` → `(1, 2, 0)`). Applications in
2376        // the wild emit these; the parser shouldn't reject them.
2377        assert_eq!(parse_semver("1"), Some((1, 0, 0)));
2378        assert_eq!(parse_semver("1.2"), Some((1, 2, 0)));
2379        assert_eq!(parse_semver("1.2.3"), Some((1, 2, 3)));
2380        assert_eq!(parse_semver("1.2.3-beta"), Some((1, 2, 3)));
2381        assert_eq!(parse_semver("1.2.3+build.42"), Some((1, 2, 3)));
2382        // Invalid: 4+ components, non-numeric.
2383        assert_eq!(parse_semver("1.2.3.4"), None);
2384        assert_eq!(parse_semver("a.b.c"), None);
2385        assert_eq!(parse_semver(""), None);
2386    }
2387    // ---- string ---------------------------------------------------------
2388
2389    #[test]
2390    fn string_prefix_and_matches() {
2391        let tags = [axis_eq(TaxonomyAxis::Software, "tool", "ffmpeg-7.0")];
2392        let meta = empty_meta();
2393        assert!(pred!(prefix "software.tool", "ffmpeg").evaluate(&ctx(&tags, &meta)));
2394        assert!(!pred!(prefix "software.tool", "imagemagick").evaluate(&ctx(&tags, &meta)));
2395        assert!(pred!(matches "software.tool", "7.0").evaluate(&ctx(&tags, &meta)));
2396        assert!(!pred!(matches "software.tool", "8.0").evaluate(&ctx(&tags, &meta)));
2397    }
2398    // ---- metadata -------------------------------------------------------
2399
2400    #[test]
2401    fn metadata_predicates() {
2402        let tags: Vec<Tag> = vec![];
2403        let mut meta = BTreeMap::new();
2404        meta.insert("intent".into(), "ml-training".into());
2405        meta.insert("priority".into(), "5".into());
2406
2407        assert!(pred!(metadata_exists "intent").evaluate(&ctx(&tags, &meta)));
2408        assert!(!pred!(metadata_exists "missing").evaluate(&ctx(&tags, &meta)));
2409        assert!(pred!(metadata_equals "intent", "ml-training").evaluate(&ctx(&tags, &meta)));
2410        assert!(!pred!(metadata_equals "intent", "billing").evaluate(&ctx(&tags, &meta)));
2411        assert!(pred!(metadata_matches "intent", "training").evaluate(&ctx(&tags, &meta)));
2412        assert!(pred!(metadata_num_at_least "priority", 3.0).evaluate(&ctx(&tags, &meta)));
2413        assert!(!pred!(metadata_num_at_least "priority", 10.0).evaluate(&ctx(&tags, &meta)));
2414    }
2415    // ---- boolean composition --------------------------------------------
2416
2417    #[test]
2418    fn and_or_not_composition() {
2419        let tags = [
2420            axis_present(TaxonomyAxis::Hardware, "gpu"),
2421            axis_eq(TaxonomyAxis::Hardware, "gpu.vram_gb", "80"),
2422        ];
2423        let meta = empty_meta();
2424
2425        // AND: both clauses match.
2426        let p = pred!(and [
2427            pred!(exists "hardware.gpu"),
2428            pred!(num_at_least "hardware.gpu.vram_gb", 24.0),
2429        ]);
2430        assert!(p.evaluate(&ctx(&tags, &meta)));
2431
2432        // AND: one fails.
2433        let p = pred!(and [
2434            pred!(exists "hardware.gpu"),
2435            pred!(num_at_least "hardware.gpu.vram_gb", 96.0),
2436        ]);
2437        assert!(!p.evaluate(&ctx(&tags, &meta)));
2438
2439        // OR: at least one matches.
2440        let p = pred!(or [
2441            pred!(exists "hardware.tpu"),
2442            pred!(exists "hardware.gpu"),
2443        ]);
2444        assert!(p.evaluate(&ctx(&tags, &meta)));
2445
2446        // NOT: inverts.
2447        let p = pred!(not pred!(exists "hardware.tpu"));
2448        assert!(p.evaluate(&ctx(&tags, &meta)));
2449        let p = pred!(not pred!(exists "hardware.gpu"));
2450        assert!(!p.evaluate(&ctx(&tags, &meta)));
2451    }
2452    #[test]
2453    fn empty_and_is_vacuously_true() {
2454        // Standard math/logic convention: `forall` over empty set
2455        // is `true`. Pinned because alternatives surprise readers.
2456        let tags: Vec<Tag> = vec![];
2457        let meta = empty_meta();
2458        assert!(Predicate::and(vec![]).evaluate(&ctx(&tags, &meta)));
2459    }
2460    #[test]
2461    fn empty_or_is_vacuously_false() {
2462        // Dual convention: `exists` over empty set is `false`.
2463        let tags: Vec<Tag> = vec![];
2464        let meta = empty_meta();
2465        assert!(!Predicate::or(vec![]).evaluate(&ctx(&tags, &meta)));
2466    }
2467    // ---- not predicate over unparseable value ---------------------------
2468
2469    #[test]
2470    fn not_does_not_flip_unparseable_to_true() {
2471        // Pinned by the substrate plan's "Predicate::Not(NumericAtLeast)
2472        // against an unparseable value yields `false`, NOT `true`"
2473        // contract. The inner numeric predicate fails (returns
2474        // false); Not(false) = true. But the spec explicitly says
2475        // "predicate failure is a hard miss, not a logical inversion":
2476        // the inner check fails to find any matching tag at all, so
2477        // the inner predicate evaluates to `false`, and `Not(false)`
2478        // evaluates to `true`. This test pins the documented
2479        // behavior so a future change is intentional.
2480        let tags = [axis_eq(TaxonomyAxis::Hardware, "cpu_cores", "many")];
2481        let meta = empty_meta();
2482        // Inner: NumericAtLeast against "many" → false (parse fails).
2483        // Outer: Not(false) → true.
2484        let p = pred!(not pred!(num_at_least "hardware.cpu_cores", 1.0));
2485        assert!(p.evaluate(&ctx(&tags, &meta)));
2486    }
2487    // ---- structural equality ------------------------------------------
2488    //
2489    // Serde wire format is deferred to Phase E (federated query
2490    // primitives) — see the comment on the `Predicate` declaration.
2491    // Phase A pins structural-equality round-trip via Clone + PartialEq
2492    // so a future serde drop-in has a reference behavior to match.
2493
2494    #[test]
2495    fn clone_and_eq_preserve_ast() {
2496        let p = pred!(and [
2497            pred!(exists "hardware.gpu"),
2498            pred!(num_at_least "hardware.gpu.vram_gb", 24.0),
2499            pred!(or [
2500                pred!(equals "software.runtime", "cuda-12.4"),
2501                pred!(semver_compatible "software.runtime", "13.0"),
2502            ]),
2503            pred!(not pred!(metadata_exists "decommissioning")),
2504        ]);
2505        let p2 = p.clone();
2506        assert_eq!(p, p2);
2507    }
2508    // ---- macro ----------------------------------------------------------
2509
2510    #[test]
2511    #[should_panic(expected = "unknown axis prefix")]
2512    fn pred_macro_panics_on_unknown_axis() {
2513        let _ = pred!(exists "bogus.foo");
2514    }
2515    #[test]
2516    #[should_panic(expected = "must be `<axis>.<key>`")]
2517    fn pred_macro_panics_on_missing_dot() {
2518        let _ = pred!(exists "hardware");
2519    }
2520    // ========================================================================
2521    // Query planner — Phase 4 of CAPABILITY_ENHANCEMENTS_PLAN.md.
2522    // ========================================================================
2523
2524    fn meta_with(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
2525        pairs
2526            .iter()
2527            .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
2528            .collect()
2529    }
2530    /// Worst-case AST: high-selectivity metadata-equals clause buried
2531    /// LAST among 5 children. Unplanned eval pays for the four
2532    /// preceding clauses on every false case; planned eval runs the
2533    /// metadata-equals first and short-circuits.
2534    fn worst_case_and() -> Predicate {
2535        Predicate::And(vec![
2536            Predicate::SemverCompatible {
2537                key: TagKey::new(TaxonomyAxis::Software, "runtime.python"),
2538                version: "3.11".into(),
2539            },
2540            Predicate::StringMatches {
2541                key: TagKey::new(TaxonomyAxis::Software, "os"),
2542                pattern: "linux".into(),
2543            },
2544            Predicate::NumericAtLeast {
2545                key: TagKey::new(TaxonomyAxis::Hardware, "memory_gb"),
2546                threshold: 64.0,
2547            },
2548            Predicate::Exists {
2549                key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2550            },
2551            Predicate::MetadataEquals {
2552                key: "intent".into(),
2553                value: "ml-training".into(),
2554            },
2555        ])
2556    }
2557    #[test]
2558    fn planner_reorders_and_children_cheap_first() {
2559        // Pin the planner's ordering on the worst-case AST.
2560        // The cheapest leaf (`MetadataEquals`, cost=11) must run
2561        // before the heaviest (`SemverCompatible`, cost=60).
2562        let ast = worst_case_and();
2563        if let Predicate::And(children) = &ast {
2564            // Verify costs as expected from the static_cost table.
2565            let costs: Vec<u32> = children.iter().map(|c| c.static_cost()).collect();
2566            assert_eq!(costs, vec![60, 50, 30, 20, 11]);
2567        } else {
2568            panic!("worst_case_and produced non-And");
2569        }
2570    }
2571    #[test]
2572    fn planner_preserves_semantics_on_short_circuit_false() {
2573        // Pin: planner-vs-unplanned equivalence on a clearly-false
2574        // input. Both must return false; planner short-circuits
2575        // earlier but the result is identical.
2576        let tags: Vec<Tag> = vec![axis_eq(TaxonomyAxis::Hardware, "memory_gb", "32")];
2577        let meta = empty_meta();
2578        let cx = ctx(&tags, &meta);
2579        let ast = worst_case_and();
2580        // Memory is 32 < 64, so the AND fails. Both paths
2581        // agree.
2582        assert!(!ast.evaluate(&cx));
2583        assert!(!ast.evaluate_unplanned(&cx));
2584    }
2585    #[test]
2586    fn planner_preserves_semantics_on_full_match() {
2587        let tags: Vec<Tag> = vec![
2588            axis_eq(TaxonomyAxis::Hardware, "memory_gb", "128"),
2589            axis_present(TaxonomyAxis::Hardware, "gpu"),
2590            axis_eq(TaxonomyAxis::Software, "os", "linux"),
2591            axis_eq(TaxonomyAxis::Software, "runtime.python", "3.11.5"),
2592        ];
2593        let meta = meta_with(&[("intent", "ml-training")]);
2594        let cx = ctx(&tags, &meta);
2595        let ast = worst_case_and();
2596        assert!(ast.evaluate(&cx));
2597        assert!(ast.evaluate_unplanned(&cx));
2598    }
2599    #[test]
2600    fn planner_preserves_or_short_circuit_semantics() {
2601        // Or with mixed costs: cheap clause that's true should win
2602        // either way (planner runs it first; unplanned still finds
2603        // it eventually).
2604        let ast = Predicate::Or(vec![
2605            Predicate::SemverCompatible {
2606                key: TagKey::new(TaxonomyAxis::Software, "runtime.python"),
2607                version: "9.9".into(),
2608            },
2609            Predicate::MetadataEquals {
2610                key: "intent".into(),
2611                value: "ml-training".into(),
2612            },
2613        ]);
2614        let meta = meta_with(&[("intent", "ml-training")]);
2615        let cx = ctx(&[], &meta);
2616        assert!(ast.evaluate(&cx));
2617        assert!(ast.evaluate_unplanned(&cx));
2618    }
2619    #[test]
2620    fn planner_static_cost_compositees_sum_children() {
2621        // And/Or cost = sum of children. Used to prefer shallow
2622        // branches over deep ones when ordering nested compositions.
2623        let cheap = Predicate::MetadataExists { key: "k".into() };
2624        let expensive = Predicate::SemverCompatible {
2625            key: TagKey::new(TaxonomyAxis::Software, "x"),
2626            version: "1.0".into(),
2627        };
2628        let nested = Predicate::And(vec![cheap.clone(), expensive.clone()]);
2629        let leaf_cost = cheap.static_cost() + expensive.static_cost();
2630        assert_eq!(nested.static_cost(), leaf_cost);
2631
2632        // Not(inner) keeps inner's cost (no overhead for negation).
2633        let negated = Predicate::Not(Box::new(expensive.clone()));
2634        assert_eq!(negated.static_cost(), expensive.static_cost());
2635    }
2636    #[test]
2637    fn planner_handles_empty_and_or_correctly() {
2638        // Empty And is vacuous true; empty Or is vacuous false.
2639        // Planner reordering on empty children is a no-op, but
2640        // pin the contract so a future "ordered eval requires
2641        // children" assertion doesn't slip in.
2642        let meta = BTreeMap::new();
2643        let cx = ctx(&[], &meta);
2644        assert!(Predicate::And(vec![]).evaluate(&cx));
2645        assert!(!Predicate::Or(vec![]).evaluate(&cx));
2646        assert!(Predicate::And(vec![]).evaluate_unplanned(&cx));
2647        assert!(!Predicate::Or(vec![]).evaluate_unplanned(&cx));
2648    }
2649    /// PERF_AUDIT §4.7 — the planner's stack-array fast path
2650    /// covers ≤ 8 children; And/Or nodes past that capacity must
2651    /// spill to the heap path with identical semantics. The
2652    /// verdict-deciding clause sits PAST index 8 in every case, so
2653    /// a planner that silently truncated at the stack capacity
2654    /// (instead of spilling) would flip each assertion.
2655    #[test]
2656    fn planner_over_stack_capacity_spills_to_heap_and_matches_unplanned() {
2657        let n = 12usize;
2658        // And: clause i requires metadata key "k{i}".
2659        let and_ast = Predicate::And(
2660            (0..n)
2661                .map(|i| Predicate::MetadataExists {
2662                    key: format!("k{i}"),
2663                })
2664                .collect(),
2665        );
2666        // Or: clause i matches metadata value "v{i}" under "sel".
2667        let or_ast = Predicate::Or(
2668            (0..n)
2669                .map(|i| Predicate::MetadataEquals {
2670                    key: "sel".into(),
2671                    value: format!("v{i}"),
2672                })
2673                .collect(),
2674        );
2675
2676        // All 12 keys present → And true.
2677        let mut all = BTreeMap::new();
2678        for i in 0..n {
2679            all.insert(format!("k{i}"), "1".to_string());
2680        }
2681        let cx_all = ctx(&[], &all);
2682        assert!(and_ast.evaluate(&cx_all));
2683        assert!(and_ast.evaluate_unplanned(&cx_all));
2684
2685        // k11 — a clause past the stack capacity — missing → And
2686        // false. Truncation at 8 children would return true.
2687        let mut missing_tail = all.clone();
2688        missing_tail.remove("k11");
2689        let cx_missing = ctx(&[], &missing_tail);
2690        assert!(!and_ast.evaluate(&cx_missing));
2691        assert!(!and_ast.evaluate_unplanned(&cx_missing));
2692
2693        // Or: only the 12th alternative matches → true.
2694        // Truncation at 8 children would return false.
2695        let only_last = meta_with(&[("sel", "v11")]);
2696        let cx_last = ctx(&[], &only_last);
2697        assert!(or_ast.evaluate(&cx_last));
2698        assert!(or_ast.evaluate_unplanned(&cx_last));
2699
2700        // No alternative matches → Or false.
2701        let none = meta_with(&[("sel", "nope")]);
2702        let cx_none = ctx(&[], &none);
2703        assert!(!or_ast.evaluate(&cx_none));
2704        assert!(!or_ast.evaluate_unplanned(&cx_none));
2705    }
2706
2707    /// Vacuous-children semantics through the shared cost-order
2708    /// walk: `And([])` is true and `Or([])` is false (`.all()` /
2709    /// `.any()` semantics). Pins the `!stop_on` fall-through in
2710    /// `eval_in_cost_order` now that both composite arms route
2711    /// through one helper — a sign flip there would invert every
2712    /// empty composite while leaving non-empty cases green.
2713    #[test]
2714    fn and_or_empty_children_are_vacuously_true_and_false() {
2715        let empty_meta = BTreeMap::new();
2716        let cx = ctx(&[], &empty_meta);
2717        assert!(Predicate::And(vec![]).evaluate(&cx));
2718        assert!(Predicate::And(vec![]).evaluate_unplanned(&cx));
2719        assert!(!Predicate::Or(vec![]).evaluate(&cx));
2720        assert!(!Predicate::Or(vec![]).evaluate_unplanned(&cx));
2721    }
2722
2723    /// Exhaustive small-input parity: enumerate a handful of small
2724    /// `(ast, ctx)` combinations and assert planned = unplanned.
2725    /// Phase 4 doesn't ship full property-based fuzzing
2726    /// (no proptest dep yet); this hand-rolled equivalence test
2727    /// covers the load-bearing cases.
2728    #[test]
2729    fn planner_evaluate_matches_unplanned_across_canonical_inputs() {
2730        // Build a corpus of N predicates × M contexts and assert
2731        // planned == unplanned for every combination.
2732        let predicates: Vec<Predicate> = vec![
2733            // Simple leaves
2734            Predicate::Exists {
2735                key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2736            },
2737            Predicate::MetadataEquals {
2738                key: "intent".into(),
2739                value: "ml-training".into(),
2740            },
2741            Predicate::NumericAtLeast {
2742                key: TagKey::new(TaxonomyAxis::Hardware, "memory_gb"),
2743                threshold: 64.0,
2744            },
2745            // Composites
2746            worst_case_and(),
2747            Predicate::Or(vec![
2748                Predicate::Exists {
2749                    key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2750                },
2751                Predicate::MetadataEquals {
2752                    key: "intent".into(),
2753                    value: "ml-training".into(),
2754                },
2755            ]),
2756            // Nested And-of-Or-of-And
2757            Predicate::And(vec![
2758                Predicate::Or(vec![
2759                    Predicate::Exists {
2760                        key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2761                    },
2762                    Predicate::And(vec![
2763                        Predicate::NumericAtLeast {
2764                            key: TagKey::new(TaxonomyAxis::Hardware, "memory_gb"),
2765                            threshold: 64.0,
2766                        },
2767                        Predicate::MetadataExists {
2768                            key: "intent".into(),
2769                        },
2770                    ]),
2771                ]),
2772                Predicate::Not(Box::new(Predicate::MetadataEquals {
2773                    key: "decommissioning".into(),
2774                    value: "true".into(),
2775                })),
2776            ]),
2777        ];
2778
2779        let contexts: Vec<(Vec<Tag>, BTreeMap<String, String>)> = vec![
2780            // Empty
2781            (vec![], BTreeMap::new()),
2782            // Hardware match only
2783            (
2784                vec![
2785                    axis_present(TaxonomyAxis::Hardware, "gpu"),
2786                    axis_eq(TaxonomyAxis::Hardware, "memory_gb", "128"),
2787                ],
2788                BTreeMap::new(),
2789            ),
2790            // Metadata match only
2791            (vec![], meta_with(&[("intent", "ml-training")])),
2792            // Full match
2793            (
2794                vec![
2795                    axis_present(TaxonomyAxis::Hardware, "gpu"),
2796                    axis_eq(TaxonomyAxis::Hardware, "memory_gb", "128"),
2797                    axis_eq(TaxonomyAxis::Software, "os", "linux"),
2798                    axis_eq(TaxonomyAxis::Software, "runtime.python", "3.11.5"),
2799                ],
2800                meta_with(&[("intent", "ml-training")]),
2801            ),
2802            // Full match + decommissioning marker (should fail the
2803            // last nested predicate's `Not` clause).
2804            (
2805                vec![
2806                    axis_present(TaxonomyAxis::Hardware, "gpu"),
2807                    axis_eq(TaxonomyAxis::Hardware, "memory_gb", "128"),
2808                ],
2809                meta_with(&[("intent", "ml-training"), ("decommissioning", "true")]),
2810            ),
2811        ];
2812
2813        for (i, ast) in predicates.iter().enumerate() {
2814            for (j, (tags, meta)) in contexts.iter().enumerate() {
2815                let cx = ctx(tags, meta);
2816                let planned = ast.evaluate(&cx);
2817                let unplanned = ast.evaluate_unplanned(&cx);
2818                assert_eq!(
2819                    planned, unplanned,
2820                    "predicate[{i}] ctx[{j}]: planned={planned} != unplanned={unplanned}"
2821                );
2822            }
2823        }
2824    }
2825    // ========================================================================
2826    // Predicate debug session — Phase 6 of CAPABILITY_ENHANCEMENTS_PLAN.md.
2827    // ========================================================================
2828
2829    #[test]
2830    fn evaluate_with_trace_returns_same_result_as_evaluate() {
2831        // Pin: the trace-instrumented evaluation produces the
2832        // same boolean result as `evaluate()`. Trace is a side
2833        // channel; the predicate semantic is unchanged.
2834        let ast = worst_case_and();
2835        let tags: Vec<Tag> = vec![axis_eq(TaxonomyAxis::Hardware, "memory_gb", "32")];
2836        let meta = empty_meta();
2837        let cx = ctx(&tags, &meta);
2838        let plain_result = ast.evaluate(&cx);
2839        let (traced_result, _trace) = ast.evaluate_with_trace(&cx);
2840        assert_eq!(plain_result, traced_result);
2841    }
2842    #[test]
2843    fn evaluate_with_trace_short_circuits_drop_unevaluated_siblings() {
2844        // Pin: when an `And` short-circuits on a false child, the
2845        // trace for the And node only carries the children that
2846        // actually ran. Lets operators see "the metadata clause
2847        // failed; we never got to the GPU check."
2848        let ast = Predicate::And(vec![
2849            // Cheap leaf, false → short-circuit
2850            Predicate::MetadataEquals {
2851                key: "intent".into(),
2852                value: "ml-training".into(),
2853            },
2854            // Heavier leaf — should not be evaluated
2855            Predicate::SemverCompatible {
2856                key: TagKey::new(TaxonomyAxis::Software, "runtime.python"),
2857                version: "3.11".into(),
2858            },
2859        ]);
2860        let meta = empty_meta();
2861        let cx = ctx(&[], &meta); // no metadata → first clause false
2862        let (result, trace) = ast.evaluate_with_trace(&cx);
2863        assert!(!result);
2864        // And's children: only one entry (the metadata clause that
2865        // returned false and short-circuited the rest).
2866        assert_eq!(
2867            trace.children.len(),
2868            1,
2869            "And trace should drop unevaluated siblings; got {trace:?}"
2870        );
2871        assert!(trace.children[0].label.starts_with("MetadataEquals"));
2872        assert!(!trace.children[0].result);
2873    }
2874    #[test]
2875    fn evaluate_with_trace_captures_full_evaluation_when_no_short_circuit() {
2876        // Pin: when no clause short-circuits (all true in an And,
2877        // all false in an Or), the trace covers every child.
2878        let ast = Predicate::And(vec![
2879            Predicate::MetadataExists {
2880                key: "intent".into(),
2881            },
2882            Predicate::Exists {
2883                key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2884            },
2885        ]);
2886        let tags: Vec<Tag> = vec![axis_present(TaxonomyAxis::Hardware, "gpu")];
2887        let meta = meta_with(&[("intent", "ml-training")]);
2888        let cx = ctx(&tags, &meta);
2889        let (result, trace) = ast.evaluate_with_trace(&cx);
2890        assert!(result);
2891        assert_eq!(trace.children.len(), 2);
2892        for child in &trace.children {
2893            assert!(child.result, "all children must have matched: {child:?}");
2894        }
2895    }
2896    #[test]
2897    fn evaluate_with_trace_records_not_inversion() {
2898        // Pin: Not's trace child carries the inner result (pre-
2899        // negation); the Not node carries the post-negation result.
2900        let ast = Predicate::Not(Box::new(Predicate::Exists {
2901            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2902        }));
2903        let meta = empty_meta();
2904        let cx = ctx(&[], &meta); // gpu absent → inner false → Not true
2905        let (result, trace) = ast.evaluate_with_trace(&cx);
2906        assert!(result, "Not(absent) should be true");
2907        assert_eq!(trace.label, "Not");
2908        assert!(trace.result);
2909        assert_eq!(trace.children.len(), 1);
2910        assert!(!trace.children[0].result, "inner Exists should be false");
2911    }
2912    #[test]
2913    fn debug_report_aggregates_match_counts() {
2914        // 3 candidates, 1 matches.
2915        let pred = Predicate::Exists {
2916            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2917        };
2918        let no_gpu_tags: Vec<Tag> = vec![];
2919        let gpu_tags: Vec<Tag> = vec![axis_present(TaxonomyAxis::Hardware, "gpu")];
2920        let meta = empty_meta();
2921
2922        let contexts = vec![
2923            ctx(&no_gpu_tags, &meta),
2924            ctx(&gpu_tags, &meta),
2925            ctx(&no_gpu_tags, &meta),
2926        ];
2927        let report = PredicateDebugReport::from_evaluations(&pred, contexts);
2928        assert_eq!(report.total_candidates, 3);
2929        assert_eq!(report.matched, 1);
2930        // One leaf clause.
2931        assert_eq!(report.clause_stats.len(), 1);
2932        let stats = report.clause_stats.values().next().unwrap();
2933        assert_eq!(stats.evaluated, 3);
2934        assert_eq!(stats.matched, 1);
2935    }
2936    #[test]
2937    fn debug_report_separates_per_clause_stats_in_composite() {
2938        // For an And of two clauses, the report should carry stats
2939        // for the And node + each leaf. Short-circuited clauses
2940        // get fewer evaluations.
2941        let pred = Predicate::And(vec![
2942            Predicate::MetadataEquals {
2943                key: "intent".into(),
2944                value: "ml-training".into(),
2945            }, // cheap, often false
2946            Predicate::Exists {
2947                key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2948            }, // moderate
2949        ]);
2950
2951        // 4 candidates: only one has the right intent + GPU.
2952        let no_meta = empty_meta();
2953        let intent_meta = meta_with(&[("intent", "ml-training")]);
2954        let no_gpu: Vec<Tag> = vec![];
2955        let gpu: Vec<Tag> = vec![axis_present(TaxonomyAxis::Hardware, "gpu")];
2956
2957        let contexts = vec![
2958            ctx(&no_gpu, &no_meta),     // both fail; short-circuit on metadata
2959            ctx(&gpu, &no_meta),        // both fail; short-circuit on metadata
2960            ctx(&no_gpu, &intent_meta), // metadata true, gpu fail
2961            ctx(&gpu, &intent_meta),    // both true → match
2962        ];
2963        let report = PredicateDebugReport::from_evaluations(&pred, contexts);
2964
2965        assert_eq!(report.total_candidates, 4);
2966        assert_eq!(report.matched, 1);
2967
2968        // 3 entries: And node + MetadataEquals leaf + Exists leaf.
2969        assert_eq!(report.clause_stats.len(), 3);
2970
2971        let metadata_stats = report
2972            .clause_stats
2973            .values()
2974            .find(|s| s.label.starts_with("MetadataEquals"))
2975            .expect("MetadataEquals stats present");
2976        assert_eq!(
2977            metadata_stats.evaluated, 4,
2978            "metadata clause runs every time"
2979        );
2980        assert_eq!(metadata_stats.matched, 2, "intent matches in 2 of 4");
2981
2982        let exists_stats = report
2983            .clause_stats
2984            .values()
2985            .find(|s| s.label.starts_with("Exists"))
2986            .expect("Exists stats present");
2987        // Only the 2 candidates with intent_meta got past the
2988        // short-circuit; gpu check ran twice.
2989        assert_eq!(
2990            exists_stats.evaluated, 2,
2991            "gpu clause only runs after metadata passes"
2992        );
2993        assert_eq!(exists_stats.matched, 1);
2994    }
2995    #[test]
2996    fn debug_report_render_includes_summary_and_clauses() {
2997        let pred = Predicate::Exists {
2998            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
2999        };
3000        let report = PredicateDebugReport::from_evaluations(&pred, vec![ctx(&[], &empty_meta())]);
3001        let rendered = report.render();
3002        // Pin the load-bearing parts of the format. Operators read
3003        // the report by these markers; CI fails loudly if they drift.
3004        assert!(rendered.contains("Predicate evaluation report"));
3005        assert!(rendered.contains("Total candidates: 1"));
3006        assert!(rendered.contains("Matched:          0"));
3007        assert!(rendered.contains("Per-clause stats"));
3008        assert!(rendered.contains("Exists(hardware.gpu)"));
3009    }
3010    #[test]
3011    fn debug_report_handles_empty_corpus() {
3012        let pred = Predicate::Exists {
3013            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3014        };
3015        let report = PredicateDebugReport::from_evaluations(&pred, Vec::<EvalContext>::new());
3016        assert_eq!(report.total_candidates, 0);
3017        assert_eq!(report.matched, 0);
3018        assert!(report.clause_stats.is_empty());
3019        // Render must not panic on empty.
3020        let rendered = report.render();
3021        assert!(rendered.contains("Total candidates: 0"));
3022    }
3023    // ========================================================================
3024    // PredicateWire (flat-tree IR) — Phase 5 of CAPABILITY_ENHANCEMENTS_PLAN.md.
3025    // ========================================================================
3026
3027    fn sample_complex_predicate() -> Predicate {
3028        // And-of-Or-of-And + Not — exercises every composite variant
3029        // and a sampling of leaf variants.
3030        Predicate::And(vec![
3031            Predicate::Or(vec![
3032                Predicate::Exists {
3033                    key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3034                },
3035                Predicate::And(vec![
3036                    Predicate::NumericAtLeast {
3037                        key: TagKey::new(TaxonomyAxis::Hardware, "memory_gb"),
3038                        threshold: 64.0,
3039                    },
3040                    Predicate::MetadataExists {
3041                        key: "intent".into(),
3042                    },
3043                ]),
3044            ]),
3045            Predicate::Not(Box::new(Predicate::MetadataEquals {
3046                key: "decommissioning".into(),
3047                value: "true".into(),
3048            })),
3049            Predicate::SemverCompatible {
3050                key: TagKey::new(TaxonomyAxis::Software, "runtime.python"),
3051                version: "3.11".into(),
3052            },
3053        ])
3054    }
3055    #[test]
3056    fn wire_round_trip_preserves_complex_predicate() {
3057        // Pin: `Predicate → PredicateWire → Predicate` is identity.
3058        let original = sample_complex_predicate();
3059        let wire = original.to_wire();
3060        let rebuilt = wire.into_predicate().expect("rebuild");
3061        assert_eq!(original, rebuilt);
3062    }
3063    #[test]
3064    fn wire_round_trip_through_serde_json() {
3065        // Pin: the wire format serializes through serde_json
3066        // cleanly (no recursion-limit blowup like raw Predicate).
3067        let original = sample_complex_predicate();
3068        let wire = original.to_wire();
3069        let json = serde_json::to_string(&wire).expect("serialize wire");
3070        let parsed: PredicateWire = serde_json::from_str(&json).expect("deserialize wire");
3071        let rebuilt = parsed.into_predicate().expect("rebuild");
3072        assert_eq!(original, rebuilt);
3073    }
3074    #[test]
3075    fn wire_root_is_at_highest_index_in_post_order_emission() {
3076        // Pin: `to_wire` emits children before parents, so the
3077        // root always sits at `nodes.len() - 1` for a freshly-
3078        // emitted wire payload. The substrate's invariant
3079        // (children at lower indices) leans on this.
3080        let pred = sample_complex_predicate();
3081        let wire = pred.to_wire();
3082        assert_eq!(wire.root_idx as usize, wire.nodes.len() - 1);
3083    }
3084    #[test]
3085    fn wire_round_trip_byte_stable_across_calls() {
3086        // Pin: two `to_wire()` calls on equal predicates produce
3087        // identical wire bytes. Required for cross-binding fixture
3088        // pinning.
3089        let pred = sample_complex_predicate();
3090        let wire_a = pred.to_wire();
3091        let wire_b = pred.to_wire();
3092        assert_eq!(wire_a, wire_b);
3093        let json_a = serde_json::to_string(&wire_a).unwrap();
3094        let json_b = serde_json::to_string(&wire_b).unwrap();
3095        assert_eq!(json_a, json_b);
3096    }
3097    #[test]
3098    fn wire_round_trip_preserves_evaluation_semantics() {
3099        // Pin: a rebuilt predicate produces identical evaluation
3100        // results to the original on a fixed corpus. The serde
3101        // round-trip is semantically transparent.
3102        let original = sample_complex_predicate();
3103        let wire = original.to_wire();
3104        let rebuilt = wire.into_predicate().unwrap();
3105
3106        let no_meta = empty_meta();
3107        let intent_meta = meta_with(&[("intent", "ml-training")]);
3108        let decommission_meta =
3109            meta_with(&[("intent", "ml-training"), ("decommissioning", "true")]);
3110        let no_gpu: Vec<Tag> = vec![];
3111        let gpu: Vec<Tag> = vec![axis_present(TaxonomyAxis::Hardware, "gpu")];
3112        let gpu_with_runtime: Vec<Tag> = vec![
3113            axis_present(TaxonomyAxis::Hardware, "gpu"),
3114            axis_eq(TaxonomyAxis::Software, "runtime.python", "3.11.5"),
3115        ];
3116
3117        let cases: Vec<(&[Tag], &BTreeMap<String, String>)> = vec![
3118            (&no_gpu, &no_meta),
3119            (&gpu, &no_meta),
3120            (&gpu, &intent_meta),
3121            (&gpu_with_runtime, &intent_meta),
3122            (&gpu_with_runtime, &decommission_meta),
3123        ];
3124
3125        for (i, (tags, meta)) in cases.iter().enumerate() {
3126            let cx = ctx(tags, meta);
3127            assert_eq!(
3128                original.evaluate(&cx),
3129                rebuilt.evaluate(&cx),
3130                "case {i}: original vs rebuilt diverged on evaluation",
3131            );
3132        }
3133    }
3134    #[test]
3135    fn wire_from_empty_nodes_table_errors_gracefully() {
3136        let wire = PredicateWire {
3137            nodes: Vec::new(),
3138            root_idx: 0,
3139        };
3140        assert_eq!(wire.into_predicate(), Err(PredicateWireError::Empty));
3141    }
3142    #[test]
3143    fn wire_from_out_of_bounds_root_errors_gracefully() {
3144        let wire = PredicateWire {
3145            nodes: vec![PredicateNodeWire::Exists {
3146                key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3147            }],
3148            root_idx: 5,
3149        };
3150        assert_eq!(
3151            wire.into_predicate(),
3152            Err(PredicateWireError::RootOutOfBounds {
3153                root_idx: 5,
3154                len: 1,
3155            }),
3156        );
3157    }
3158    #[test]
3159    fn wire_from_cycle_in_and_children_errors_gracefully() {
3160        // Malformed: the `And` at index 0 references child index
3161        // 1, which doesn't exist yet (post-order requires
3162        // child < parent). Catches index cycles.
3163        let wire = PredicateWire {
3164            nodes: vec![PredicateNodeWire::And { children: vec![1] }],
3165            root_idx: 0,
3166        };
3167        let err = wire.into_predicate().unwrap_err();
3168        assert!(
3169            matches!(
3170                err,
3171                PredicateWireError::CycleDetected {
3172                    parent: 0,
3173                    child: 1
3174                }
3175            ),
3176            "expected CycleDetected; got {err:?}",
3177        );
3178    }
3179    #[test]
3180    fn wire_from_self_referencing_not_errors_gracefully() {
3181        // `Not` referencing its own index is the simplest cycle.
3182        let wire = PredicateWire {
3183            nodes: vec![PredicateNodeWire::Not { child: 0 }],
3184            root_idx: 0,
3185        };
3186        let err = wire.into_predicate().unwrap_err();
3187        assert!(
3188            matches!(
3189                err,
3190                PredicateWireError::CycleDetected {
3191                    parent: 0,
3192                    child: 0
3193                }
3194            ),
3195            "expected CycleDetected; got {err:?}",
3196        );
3197    }
3198    #[test]
3199    fn wire_simple_leaf_round_trips() {
3200        // Smallest case: a single leaf predicate. nodes has one
3201        // entry; root_idx is 0.
3202        let pred = Predicate::Exists {
3203            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3204        };
3205        let wire = pred.to_wire();
3206        assert_eq!(wire.nodes.len(), 1);
3207        assert_eq!(wire.root_idx, 0);
3208        assert_eq!(wire.into_predicate().unwrap(), pred);
3209    }
3210    #[test]
3211    fn wire_rebuilt_predicate_matches_planner_evaluation() {
3212        // Pin: planner-aware evaluation continues to work after
3213        // round-trip. The flat IR doesn't lose the AST shape;
3214        // `evaluate()` still finds And/Or to reorder.
3215        let original = sample_complex_predicate();
3216        let wire = original.to_wire();
3217        let rebuilt = wire.into_predicate().unwrap();
3218
3219        let tags: Vec<Tag> = vec![
3220            axis_present(TaxonomyAxis::Hardware, "gpu"),
3221            axis_eq(TaxonomyAxis::Software, "runtime.python", "3.11.5"),
3222        ];
3223        let meta = meta_with(&[("intent", "ml-training")]);
3224        let cx = ctx(&tags, &meta);
3225
3226        // Both planned and unplanned must agree, AND match between
3227        // original and rebuilt.
3228        let orig_planned = original.evaluate(&cx);
3229        let orig_unplanned = original.evaluate_unplanned(&cx);
3230        let rebuilt_planned = rebuilt.evaluate(&cx);
3231        let rebuilt_unplanned = rebuilt.evaluate_unplanned(&cx);
3232
3233        assert_eq!(orig_planned, orig_unplanned);
3234        assert_eq!(rebuilt_planned, rebuilt_unplanned);
3235        assert_eq!(orig_planned, rebuilt_planned);
3236    }
3237    // ========================================================================
3238    // nRPC envelope helpers — Phase 5.B of CAPABILITY_ENHANCEMENTS_PLAN.md.
3239    // ========================================================================
3240
3241    #[test]
3242    fn rpc_header_round_trip_preserves_predicate() {
3243        // Pin the canonical happy path: predicate → header → headers
3244        // table on the server side → decoded predicate. Service
3245        // handlers will use exactly this flow.
3246        let original = sample_complex_predicate();
3247        let header = predicate_to_rpc_header(&original).expect("encode");
3248        assert_eq!(header.0, RPC_WHERE_HEADER);
3249
3250        // Receiver: a Vec<RpcHeader>-shaped surface, with our
3251        // `where:` header alongside others (trace context, etc.).
3252        let headers = vec![
3253            ("trace-id".to_string(), b"abc123".to_vec()),
3254            header,
3255            ("idempotency-key".to_string(), b"def456".to_vec()),
3256        ];
3257        let decoded = predicate_from_rpc_headers(&headers)
3258            .expect("header present")
3259            .expect("decode succeeds");
3260        assert_eq!(decoded, original);
3261    }
3262    #[test]
3263    fn rpc_header_missing_returns_none() {
3264        // Service that doesn't see a `net-where` header
3265        // should treat the request as unfiltered. `None` is the
3266        // signal; service defaults to "match all".
3267        let headers = vec![
3268            ("trace-id".to_string(), b"abc123".to_vec()),
3269            ("idempotency-key".to_string(), b"def456".to_vec()),
3270        ];
3271        assert!(predicate_from_rpc_headers(&headers).is_none());
3272    }
3273    #[test]
3274    fn rpc_header_empty_returns_none() {
3275        let headers: Vec<(String, Vec<u8>)> = Vec::new();
3276        assert!(predicate_from_rpc_headers(&headers).is_none());
3277    }
3278    #[test]
3279    fn rpc_header_malformed_json_returns_decode_error() {
3280        // Service receiving a `net-where` header with garbage
3281        // bytes should reject the request, not silently default to
3282        // unfiltered. Silent fallback would let an attacker / bug
3283        // return more rows than the caller intended.
3284        let headers = vec![(RPC_WHERE_HEADER.to_string(), b"not-json".to_vec())];
3285        let result = predicate_from_rpc_headers(&headers).unwrap();
3286        assert!(
3287            matches!(result, Err(PredicateRpcDecodeError::Json(_))),
3288            "expected JSON decode error; got {result:?}",
3289        );
3290    }
3291    #[test]
3292    fn rpc_header_oversize_payload_returns_decode_error() {
3293        // N-6 regression: decode path enforces the
3294        // `MAX_PREDICATE_RPC_HEADER_VALUE_LEN` cap symmetrically with
3295        // the encode path. Pre-fix `predicate_from_rpc_headers` had
3296        // no length check, so an oversize JSON blob walked through
3297        // `serde_json::from_slice` + `rebuild_predicate` with depth
3298        // bounded only by input size — a cheap parse-bomb DoS shape
3299        // if a transport cap was ever relaxed.
3300        let oversize = vec![b' '; MAX_PREDICATE_RPC_HEADER_VALUE_LEN + 1];
3301        let headers = vec![(RPC_WHERE_HEADER.to_string(), oversize)];
3302        let result = predicate_from_rpc_headers(&headers).unwrap();
3303        assert!(
3304            matches!(
3305                result,
3306                Err(PredicateRpcDecodeError::Oversize { actual, limit })
3307                    if actual == MAX_PREDICATE_RPC_HEADER_VALUE_LEN + 1
3308                       && limit == MAX_PREDICATE_RPC_HEADER_VALUE_LEN
3309            ),
3310            "expected Oversize decode error; got {result:?}",
3311        );
3312    }
3313    #[test]
3314    fn rpc_header_cycle_in_payload_returns_decode_error() {
3315        // Defensive: a wire payload with a child-index cycle
3316        // (legal JSON but structurally invalid) is rejected.
3317        let bad_wire = PredicateWire {
3318            nodes: vec![PredicateNodeWire::Not { child: 0 }],
3319            root_idx: 0,
3320        };
3321        let bad_bytes = serde_json::to_vec(&bad_wire).unwrap();
3322        let headers = vec![(RPC_WHERE_HEADER.to_string(), bad_bytes)];
3323        let result = predicate_from_rpc_headers(&headers).unwrap();
3324        assert!(
3325            matches!(
3326                result,
3327                Err(PredicateRpcDecodeError::Wire(
3328                    PredicateWireError::CycleDetected { .. }
3329                ))
3330            ),
3331            "expected wire cycle error; got {result:?}",
3332        );
3333    }
3334    #[test]
3335    fn rpc_header_first_match_wins_on_duplicate_headers() {
3336        // Per the helper's documented contract: duplicate headers
3337        // under `net-where` are not coalesced; the first
3338        // match wins. Pin so a future "merge duplicates" change
3339        // is loud.
3340        let pred_a = Predicate::Exists {
3341            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3342        };
3343        let pred_b = Predicate::MetadataEquals {
3344            key: "intent".into(),
3345            value: "ml-training".into(),
3346        };
3347        let header_a = predicate_to_rpc_header(&pred_a).unwrap();
3348        let header_b = predicate_to_rpc_header(&pred_b).unwrap();
3349        let headers = vec![header_a, header_b];
3350        let decoded = predicate_from_rpc_headers(&headers).unwrap().unwrap();
3351        assert_eq!(decoded, pred_a);
3352    }
3353    #[test]
3354    fn rpc_header_oversize_predicate_rejected_at_encode() {
3355        // A predicate that would exceed the header-value cap is
3356        // rejected by `predicate_to_rpc_header` rather than being
3357        // truncated / silently dropped. Caller decides how to
3358        // surface this (split the predicate, simplify, or fail).
3359        // Build a many-clause Or that overflows the 4 KB cap.
3360        let mut clauses = Vec::new();
3361        // ~30 chars of metadata key per clause; 200 clauses ≈ 6 KB JSON.
3362        for i in 0..200 {
3363            clauses.push(Predicate::MetadataEquals {
3364                key: format!("very-long-metadata-key-{i:04}"),
3365                value: format!("very-long-metadata-value-{i:04}"),
3366            });
3367        }
3368        let huge = Predicate::Or(clauses);
3369        let result = predicate_to_rpc_header(&huge);
3370        assert!(
3371            matches!(result, Err(PredicateRpcEncodeError::TooLarge { actual, limit })
3372                if actual > limit && limit == MAX_PREDICATE_RPC_HEADER_VALUE_LEN),
3373            "expected TooLarge; got {result:?}",
3374        );
3375    }
3376    #[test]
3377    fn rpc_header_typical_predicate_fits_well_under_cap() {
3378        // Sanity bound: a representative predicate (5 leaves +
3379        // some boolean composition) should encode well under
3380        // the 4 KB cap. This is the load-bearing case for
3381        // production use.
3382        let pred = sample_complex_predicate();
3383        let header = predicate_to_rpc_header(&pred).expect("encode");
3384        // Should be well under the cap. Loose upper bound: 1 KB.
3385        assert!(
3386            header.1.len() < 1024,
3387            "encoded predicate is {} bytes, expected < 1024",
3388            header.1.len(),
3389        );
3390    }
3391    #[test]
3392    fn rpc_header_can_be_decoded_via_borrow_or_owned_tuple() {
3393        // Pin: the `AsRpcHeader` trait accepts both `&(String, Vec<u8>)`
3394        // and `(String, Vec<u8>)` so service handlers can iterate
3395        // either an owned vec or a borrowed slice.
3396        let pred = Predicate::Exists {
3397            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3398        };
3399        let header = predicate_to_rpc_header(&pred).unwrap();
3400        let headers = vec![header];
3401
3402        // Owned slice.
3403        let decoded_owned = predicate_from_rpc_headers(&headers).unwrap().unwrap();
3404        assert_eq!(decoded_owned, pred);
3405
3406        // Borrow-collected slice.
3407        let by_ref: Vec<&(String, Vec<u8>)> = headers.iter().collect();
3408        let decoded_borrow = predicate_from_rpc_headers(&by_ref).unwrap().unwrap();
3409        assert_eq!(decoded_borrow, pred);
3410    }
3411    #[test]
3412    fn rpc_header_json_format_is_human_readable() {
3413        // Pin the wire format as JSON (not postcard) so cross-
3414        // binding fixtures and tcpdump captures are diff-able.
3415        // Phase 9b of CAPABILITY_SYSTEM_SDK_PLAN.md uses this same
3416        // shape for the `predicate_nrpc_envelope.json` fixture.
3417        let pred = Predicate::MetadataEquals {
3418            key: "intent".into(),
3419            value: "ml-training".into(),
3420        };
3421        let header = predicate_to_rpc_header(&pred).unwrap();
3422        let json = std::str::from_utf8(&header.1).expect("JSON is UTF-8");
3423        assert!(
3424            json.contains("\"kind\":\"metadata_equals\""),
3425            "unexpected JSON shape: {json}",
3426        );
3427        assert!(json.contains("\"key\":\"intent\""), "missing key: {json}");
3428        assert!(
3429            json.contains("\"value\":\"ml-training\""),
3430            "missing value: {json}",
3431        );
3432    }
3433    /// N-9 regression: `dynamic_cost` and `dynamic_cost_or` must
3434    /// saturate `usize` cardinalities to `u32::MAX` rather than
3435    /// wrapping. Pre-fix the `(cardinality as u32)` cast wrapped
3436    /// modulo `2³²`, so a cardinality of `u32::MAX + 1` divided
3437    /// `static_cost / 0.max(1) = static_cost`, while
3438    /// `u32::MAX + 2` divided by `1`, treating the most-selective
3439    /// key as if it had only 1 distinct value. Real-world trigger:
3440    /// long-running fleets with unbounded-cardinality metadata
3441    /// keys (session id, request id, anything per-call).
3442    #[test]
3443    fn dynamic_cost_saturates_huge_cardinality_to_u32_max() {
3444        struct HugeCardinality;
3445        impl crate::adapter::net::behavior::CardinalityProvider for HugeCardinality {
3446            fn axis_cardinality(&self, _key: &crate::adapter::net::behavior::tag::TagKey) -> usize {
3447                // > u32::MAX. On 64-bit hosts this wraps if cast
3448                // via `as u32`; the fix uses `u32::try_from(...)`
3449                // with a saturating fallback.
3450                (u32::MAX as usize).wrapping_add(2)
3451            }
3452            fn metadata_value_cardinality(&self, _key: &str) -> usize {
3453                (u32::MAX as usize).wrapping_add(2)
3454            }
3455        }
3456
3457        let clause = Predicate::Equals {
3458            key: TagKey::new(TaxonomyAxis::Hardware, "memory_gb"),
3459            value: "1".into(),
3460        };
3461        let dyn_cost = clause.dynamic_cost(&HugeCardinality);
3462        let static_c = clause.static_cost();
3463        // With saturation, cardinality clamps to u32::MAX so
3464        // `static_c / u32::MAX == 0` (since static_c < u32::MAX).
3465        // Pre-fix the cast wrapped to 1, giving `static_c / 1 ==
3466        // static_c` — the bug shape.
3467        assert!(
3468            dyn_cost < static_c,
3469            "dynamic_cost must reflect saturation (got {dyn_cost}, static={static_c})",
3470        );
3471
3472        // Same pin for the Or-side: `static_c.saturating_mul(u32::MAX)`
3473        // saturates to u32::MAX rather than wrapping back to a
3474        // tiny number.
3475        let or_cost = clause.dynamic_cost_or(&HugeCardinality);
3476        assert_eq!(
3477            or_cost,
3478            u32::MAX,
3479            "dynamic_cost_or must saturate to u32::MAX on huge cardinality",
3480        );
3481    }
3482    // ========================================================================
3483    // Service-side row filter ergonomics — Phase 5.B follow-on of
3484    // CAPABILITY_ENHANCEMENTS_PLAN.md.
3485    // ========================================================================
3486
3487    #[test]
3488    fn matches_capability_set_evaluates_against_caps_tags_and_metadata() {
3489        // Pin: `Predicate::matches_capability_set` is a one-line
3490        // entry point for "does this CapabilitySet match this
3491        // predicate?". Internally materializes caps.tags as a Vec
3492        // for the slice-based EvalContext.
3493        let pred = Predicate::And(vec![
3494            Predicate::Exists {
3495                key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3496            },
3497            Predicate::MetadataEquals {
3498                key: "intent".into(),
3499                value: "ml-training".into(),
3500            },
3501        ]);
3502
3503        // Match: caps has both tag and metadata.
3504        let caps_match = CapabilitySet::new()
3505            .with_hardware(HardwareCapabilities::new().with_gpu(GpuInfo::new(
3506                GpuVendor::Nvidia,
3507                "h100",
3508                80,
3509            )))
3510            .with_metadata("intent", "ml-training");
3511        assert!(pred.matches_capability_set(&caps_match));
3512
3513        // Miss on the metadata side.
3514        let caps_miss_meta = CapabilitySet::new().with_hardware(
3515            HardwareCapabilities::new().with_gpu(GpuInfo::new(GpuVendor::Nvidia, "h100", 80)),
3516        );
3517        assert!(!pred.matches_capability_set(&caps_miss_meta));
3518
3519        // Miss on the tag side.
3520        let caps_miss_tag = CapabilitySet::new().with_metadata("intent", "ml-training");
3521        assert!(!pred.matches_capability_set(&caps_miss_tag));
3522
3523        // Empty caps don't match.
3524        assert!(!pred.matches_capability_set(&CapabilitySet::default()));
3525    }
    /// Application row type used to exercise `RpcPredicateContext`
    /// and `filter_by_predicate`. Mirrors what a service
    /// handler's row would look like.
    struct TestJob {
        // Row identifier; the filter tests collect these ids to assert
        // which rows survived.
        id: u64,
        // Tags returned by `rpc_predicate_tags`.
        tags: Vec<Tag>,
        // Key/value metadata returned by `rpc_predicate_metadata`.
        metadata: BTreeMap<String, String>,
    }
3534    impl RpcPredicateContext for TestJob {
3535        fn rpc_predicate_tags(&self) -> &[Tag] {
3536            &self.tags
3537        }
3538        fn rpc_predicate_metadata(&self) -> &BTreeMap<String, String> {
3539            &self.metadata
3540        }
3541    }
3542    #[test]
3543    fn filter_by_predicate_returns_all_rows_when_predicate_is_none() {
3544        // Pin: `pred = None` is the no-filter case (request didn't
3545        // include `net-where`). Every row passes through.
3546        let jobs = vec![
3547            TestJob {
3548                id: 1,
3549                tags: vec![],
3550                metadata: BTreeMap::new(),
3551            },
3552            TestJob {
3553                id: 2,
3554                tags: vec![axis_present(TaxonomyAxis::Hardware, "gpu")],
3555                metadata: BTreeMap::new(),
3556            },
3557        ];
3558        let filtered: Vec<u64> = filter_by_predicate(jobs, None).map(|j| j.id).collect();
3559        assert_eq!(filtered, vec![1, 2]);
3560    }
3561    #[test]
3562    fn filter_by_predicate_keeps_only_matching_rows() {
3563        // Pin: with a predicate set, only rows whose tags +
3564        // metadata satisfy it survive the filter.
3565        let pred = Predicate::Exists {
3566            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3567        };
3568        let jobs = vec![
3569            TestJob {
3570                id: 1,
3571                tags: vec![],
3572                metadata: BTreeMap::new(),
3573            },
3574            TestJob {
3575                id: 2,
3576                tags: vec![axis_present(TaxonomyAxis::Hardware, "gpu")],
3577                metadata: BTreeMap::new(),
3578            },
3579            TestJob {
3580                id: 3,
3581                tags: vec![axis_eq(TaxonomyAxis::Hardware, "gpu.vendor", "nvidia")],
3582                metadata: BTreeMap::new(),
3583            },
3584            TestJob {
3585                id: 4,
3586                tags: vec![
3587                    axis_present(TaxonomyAxis::Hardware, "gpu"),
3588                    axis_eq(TaxonomyAxis::Hardware, "memory_gb", "64"),
3589                ],
3590                metadata: BTreeMap::new(),
3591            },
3592        ];
3593        let filtered: Vec<u64> = filter_by_predicate(jobs, Some(&pred))
3594            .map(|j| j.id)
3595            .collect();
3596        // Only ids 2 and 4 have the gpu presence tag.
3597        assert_eq!(filtered, vec![2, 4]);
3598    }
3599    #[test]
3600    fn filter_by_predicate_combined_axis_and_metadata_clauses() {
3601        // Pin: predicates with both axis-tag AND metadata clauses
3602        // work end-to-end through the filter helper. Mirrors the
3603        // canonical "where: gpu AND intent = ml-training" use case.
3604        let pred = Predicate::And(vec![
3605            Predicate::Exists {
3606                key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3607            },
3608            Predicate::MetadataEquals {
3609                key: "intent".into(),
3610                value: "ml-training".into(),
3611            },
3612        ]);
3613        let jobs = vec![
3614            TestJob {
3615                id: 1,
3616                tags: vec![axis_present(TaxonomyAxis::Hardware, "gpu")],
3617                metadata: meta_with(&[("intent", "embedding-cache")]),
3618            },
3619            TestJob {
3620                id: 2,
3621                tags: vec![axis_present(TaxonomyAxis::Hardware, "gpu")],
3622                metadata: meta_with(&[("intent", "ml-training")]),
3623            },
3624            TestJob {
3625                id: 3,
3626                tags: vec![],
3627                metadata: meta_with(&[("intent", "ml-training")]),
3628            },
3629        ];
3630        let filtered: Vec<u64> = filter_by_predicate(jobs, Some(&pred))
3631            .map(|j| j.id)
3632            .collect();
3633        // Only id 2 has both gpu AND intent=ml-training.
3634        assert_eq!(filtered, vec![2]);
3635    }
3636    #[test]
3637    fn filter_by_predicate_empty_input_yields_empty_iterator() {
3638        let pred = Predicate::Exists {
3639            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3640        };
3641        let jobs: Vec<TestJob> = Vec::new();
3642        let filtered: Vec<u64> = filter_by_predicate(jobs, Some(&pred))
3643            .map(|j| j.id)
3644            .collect();
3645        assert!(filtered.is_empty());
3646    }
3647    #[test]
3648    fn end_to_end_predicate_pushdown_flow() {
3649        // Pin the canonical Phase 5.B usage: client builds a
3650        // predicate, encodes to an RPC header, server decodes and
3651        // filters its row stream. This is the load-bearing
3652        // workflow Phase 5.B exists for.
3653
3654        // Client side: build predicate, encode.
3655        let pred = Predicate::And(vec![
3656            Predicate::Exists {
3657                key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3658            },
3659            Predicate::NumericAtLeast {
3660                key: TagKey::new(TaxonomyAxis::Hardware, "memory_gb"),
3661                threshold: 32.0,
3662            },
3663        ]);
3664        let encoded = predicate_to_rpc_header(&pred).expect("encode");
3665
3666        // Server side: receive request with this header alongside
3667        // standard tracing/idempotency keys. Decode the predicate.
3668        let request_headers = vec![
3669            ("trace-id".to_string(), b"abc123".to_vec()),
3670            encoded,
3671            ("idempotency-key".to_string(), b"def456".to_vec()),
3672        ];
3673        let decoded_pred = predicate_from_rpc_headers(&request_headers)
3674            .expect("header present")
3675            .expect("decode");
3676
3677        // Server side: filter the row stream.
3678        let jobs = vec![
3679            TestJob {
3680                id: 1, // No GPU.
3681                tags: vec![axis_eq(TaxonomyAxis::Hardware, "memory_gb", "64")],
3682                metadata: BTreeMap::new(),
3683            },
3684            TestJob {
3685                id: 2, // GPU + 32 GB → matches.
3686                tags: vec![
3687                    axis_present(TaxonomyAxis::Hardware, "gpu"),
3688                    axis_eq(TaxonomyAxis::Hardware, "memory_gb", "32"),
3689                ],
3690                metadata: BTreeMap::new(),
3691            },
3692            TestJob {
3693                id: 3, // GPU + 16 GB → too little memory.
3694                tags: vec![
3695                    axis_present(TaxonomyAxis::Hardware, "gpu"),
3696                    axis_eq(TaxonomyAxis::Hardware, "memory_gb", "16"),
3697                ],
3698                metadata: BTreeMap::new(),
3699            },
3700            TestJob {
3701                id: 4, // GPU + 65 GB → matches.
3702                tags: vec![
3703                    axis_present(TaxonomyAxis::Hardware, "gpu"),
3704                    axis_eq(TaxonomyAxis::Hardware, "memory_gb", "64"),
3705                ],
3706                metadata: BTreeMap::new(),
3707            },
3708        ];
3709        let matched: Vec<u64> = filter_by_predicate(jobs, Some(&decoded_pred))
3710            .map(|j| j.id)
3711            .collect();
3712        assert_eq!(matched, vec![2, 4]);
3713    }
3714    #[test]
3715    fn to_wire_handles_deep_nesting_without_stack_overflow() {
3716        // Regression: the prior recursive append_to_wire would
3717        // blow the thread stack on caller-controlled deeply
3718        // nested predicates. The iterative version uses a
3719        // heap-allocated work stack — depth-unbounded.
3720        //
3721        // Build a 10_000-deep `Not(Not(...Exists))` chain;
3722        // confirm to_wire produces 10_001 nodes (10k Not + 1
3723        // leaf) and that the root index is the topmost Not.
3724        // (The mirror rebuild path `into_predicate` is still
3725        // recursive — out of scope for this fix; the FFI parser
3726        // caps depth at 64, and SDK consumers build trees via
3727        // typed factories where recursion-driven depth is a
3728        // developer-controlled property.)
3729        let leaf = Predicate::Exists {
3730            key: TagKey::new(TaxonomyAxis::Hardware, "gpu"),
3731        };
3732        let mut p = leaf;
3733        for _ in 0..10_000 {
3734            p = Predicate::Not(Box::new(p));
3735        }
3736        let wire = p.to_wire();
3737        assert_eq!(wire.nodes.len(), 10_001);
3738        let root = &wire.nodes[wire.root_idx as usize];
3739        assert!(matches!(root, PredicateNodeWire::Not { .. }));
3740    }
3741    #[test]
3742    fn to_wire_preserves_left_to_right_child_ordering() {
3743        // The iterative walk pushes children in reverse so the
3744        // leftmost child is popped first; pin the output to
3745        // catch any regression that flips order.
3746        let p = Predicate::And(vec![
3747            Predicate::Exists {
3748                key: TagKey::new(TaxonomyAxis::Hardware, "a"),
3749            },
3750            Predicate::Exists {
3751                key: TagKey::new(TaxonomyAxis::Hardware, "b"),
3752            },
3753            Predicate::Exists {
3754                key: TagKey::new(TaxonomyAxis::Hardware, "c"),
3755            },
3756        ]);
3757        let wire = p.to_wire();
3758        // 3 leaves + 1 And = 4 nodes.
3759        assert_eq!(wire.nodes.len(), 4);
3760        // Root is the And.
3761        let PredicateNodeWire::And { children } = &wire.nodes[wire.root_idx as usize] else {
3762            panic!("root should be And");
3763        };
3764        // Children should reference leaves at indices [0,1,2]
3765        // — emitted in input order.
3766        assert_eq!(children.as_slice(), &[0u32, 1, 2]);
3767        // And each leaf should match the expected key.
3768        for (i, key) in ["a", "b", "c"].iter().enumerate() {
3769            let PredicateNodeWire::Exists { key: k } = &wire.nodes[i] else {
3770                panic!("expected Exists at index {i}");
3771            };
3772            assert_eq!(k.key.as_str(), *key);
3773        }
3774    }
3775}