Skip to main content

harn_vm/flow/predicates/
executor.rs

1//! Budgeted executor for Flow invariant predicates.
2//!
3//! The executor is intentionally small: callers provide predicate runners, and
4//! this module owns scheduling, per-kind budgets, semantic cheap-judge limits,
5//! and deterministic replay drift detection.
6
7use std::cell::{Cell, RefCell};
8use std::collections::BTreeMap;
9use std::rc::Rc;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14use async_trait::async_trait;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use sha2::{Digest, Sha256};
18use tokio::sync::Semaphore;
19
20use super::compose::verdict_strictness;
21use crate::flow::{InvariantBlockError, InvariantResult, PredicateHash, Slice};
22
/// Per-attempt wall-clock timeout applied to deterministic predicates.
const DEFAULT_DETERMINISTIC_BUDGET: Duration = Duration::from_millis(50);
/// Per-attempt wall-clock timeout applied to semantic predicates.
const DEFAULT_SEMANTIC_BUDGET: Duration = Duration::from_secs(2);
/// Token ceiling for a semantic predicate's cheap-judge usage.
const DEFAULT_SEMANTIC_TOKEN_CAP: u64 = 1024;
/// Default global cap on concurrently running deterministic predicates.
const DEFAULT_MAX_DETERMINISTIC_LANES: usize = 16;
/// Default global cap on concurrently running semantic predicates.
const DEFAULT_MAX_SEMANTIC_LANES: usize = 2;
/// Default per-slice deterministic cap; `usize::MAX` is later clamped to the
/// semaphore's maximum permit count, i.e. effectively unlimited.
const DEFAULT_MAX_DETERMINISTIC_LANES_PER_SLICE: usize = usize::MAX;
/// Default per-slice cap on concurrently running semantic predicates.
const DEFAULT_MAX_SEMANTIC_LANES_PER_SLICE: usize = 1;
/// Default aggregate wall-clock envelope for one slice's deterministic work.
const DEFAULT_SLICE_DETERMINISTIC_ENVELOPE: Duration = Duration::from_secs(5);
/// Default aggregate wall-clock envelope for one slice's semantic work.
const DEFAULT_SLICE_SEMANTIC_ENVELOPE: Duration = Duration::from_secs(20);
32
33/// Predicate execution mode declared by predicate author annotations.
34#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
35#[serde(rename_all = "snake_case")]
36pub enum PredicateKind {
37    /// Pure Harn predicate. No shell, network, LLM, or host side effects.
38    Deterministic,
39    /// Semantic predicate. May make one `cheap_judge` call over pre-baked
40    /// evidence within the semantic wall-clock and token budgets.
41    Semantic,
42}
43
44/// Request passed to the cheap semantic judge.
45#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
46pub struct CheapJudgeRequest {
47    pub prompt: String,
48    pub evidence_key: String,
49    pub evidence: String,
50}
51
52/// Response returned by the cheap semantic judge.
53#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
54pub struct CheapJudgeResponse {
55    pub passes: bool,
56    #[serde(default, skip_serializing_if = "Option::is_none")]
57    pub reason: Option<String>,
58    #[serde(default)]
59    pub input_tokens: u64,
60    #[serde(default)]
61    pub output_tokens: u64,
62    #[serde(default, skip_serializing_if = "Option::is_none")]
63    pub provider_id: Option<String>,
64    #[serde(default, skip_serializing_if = "Option::is_none")]
65    pub model_id: Option<String>,
66    #[serde(default, skip_serializing_if = "Option::is_none")]
67    pub cheap_judge_version: Option<String>,
68}
69
70/// Replay-audit metadata for a semantic predicate's judge call.
71#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
72pub struct SemanticReplayAuditMetadata {
73    #[serde(default, skip_serializing_if = "Option::is_none")]
74    pub provider_id: Option<String>,
75    #[serde(default, skip_serializing_if = "Option::is_none")]
76    pub model_id: Option<String>,
77    pub prompt_hash: String,
78    pub evidence_hashes: BTreeMap<String, String>,
79    pub token_cap: u64,
80    #[serde(default, skip_serializing_if = "Option::is_none")]
81    pub cheap_judge_version: Option<String>,
82}
83
84/// Host-provided adapter for semantic predicate judging.
85#[async_trait(?Send)]
86pub trait CheapJudge {
87    async fn cheap_judge(
88        &self,
89        request: CheapJudgeRequest,
90    ) -> Result<CheapJudgeResponse, InvariantBlockError>;
91}
92
93/// Predicate runner supplied by a collector or Harn adapter.
94#[async_trait(?Send)]
95pub trait PredicateRunner {
96    fn hash(&self) -> PredicateHash;
97    fn kind(&self) -> PredicateKind;
98    fn fallback_hash(&self) -> Option<PredicateHash> {
99        None
100    }
101
102    /// Static evidence captured when the predicate was authored. Semantic
103    /// predicates may only judge over this map; the executor never fetches
104    /// evidence during evaluation.
105    fn evidence(&self) -> BTreeMap<String, String> {
106        BTreeMap::new()
107    }
108
109    async fn evaluate(&self, context: PredicateContext) -> InvariantResult;
110}
111
112/// Runtime context made available to a predicate invocation.
113#[derive(Clone)]
114pub struct PredicateContext {
115    inner: Rc<PredicateContextInner>,
116}
117
118struct PredicateContextInner {
119    slice: Rc<Slice>,
120    kind: PredicateKind,
121    evidence: BTreeMap<String, String>,
122    cheap_judge: Option<Rc<dyn CheapJudge>>,
123    semantic_token_cap: u64,
124    cancel_token: Arc<AtomicBool>,
125    judge_state: RefCell<JudgeBudgetState>,
126}
127
128#[derive(Default)]
129struct JudgeBudgetState {
130    calls: u64,
131    tokens: u64,
132    block_error: Option<InvariantBlockError>,
133    semantic_audit: Option<SemanticReplayAuditMetadata>,
134}
135
136impl PredicateContext {
137    fn new(
138        slice: Rc<Slice>,
139        kind: PredicateKind,
140        evidence: BTreeMap<String, String>,
141        cheap_judge: Option<Rc<dyn CheapJudge>>,
142        semantic_token_cap: u64,
143        cancel_token: Arc<AtomicBool>,
144    ) -> Self {
145        Self {
146            inner: Rc::new(PredicateContextInner {
147                slice,
148                kind,
149                evidence,
150                cheap_judge,
151                semantic_token_cap,
152                cancel_token,
153                judge_state: RefCell::new(JudgeBudgetState::default()),
154            }),
155        }
156    }
157
158    pub fn slice(&self) -> &Slice {
159        &self.inner.slice
160    }
161
162    pub fn kind(&self) -> PredicateKind {
163        self.inner.kind
164    }
165
166    pub fn evidence(&self, key: &str) -> Option<&str> {
167        self.inner.evidence.get(key).map(String::as_str)
168    }
169
170    pub fn is_cancelled(&self) -> bool {
171        self.inner.cancel_token.load(Ordering::SeqCst)
172    }
173
174    /// Invoke the semantic cheap judge with executor-enforced limits.
175    ///
176    /// Deterministic predicates cannot call this method. Semantic predicates
177    /// get one call per predicate evaluation attempt, and the request must
178    /// refer to evidence already present in the predicate's evidence map.
179    pub async fn cheap_judge(
180        &self,
181        prompt: impl Into<String>,
182        evidence_key: impl Into<String>,
183    ) -> Result<CheapJudgeResponse, InvariantBlockError> {
184        if self.inner.kind != PredicateKind::Semantic {
185            return Err(self.record_block(InvariantBlockError::new(
186                "side_effect_denied",
187                "deterministic predicates cannot invoke cheap_judge",
188            )));
189        }
190
191        let prompt = prompt.into();
192        let evidence_key = evidence_key.into();
193        let Some(evidence) = self.inner.evidence.get(&evidence_key).cloned() else {
194            return Err(self.record_block(InvariantBlockError::new(
195                "evidence_missing",
196                format!(
197                    "semantic predicate requested evidence key '{evidence_key}' that was not pre-baked"
198                ),
199            )));
200        };
201
202        let estimated_tokens = estimate_tokens(&prompt).saturating_add(estimate_tokens(&evidence));
203        {
204            let mut state = self.inner.judge_state.borrow_mut();
205            if state.calls >= 1 {
206                let error = InvariantBlockError::budget_exceeded(
207                    "semantic predicate exceeded one cheap_judge call",
208                );
209                state.block_error = Some(error.clone());
210                return Err(error);
211            }
212            if state.tokens.saturating_add(estimated_tokens) > self.inner.semantic_token_cap {
213                let error = InvariantBlockError::budget_exceeded(format!(
214                    "semantic predicate cheap_judge request exceeds token cap {}",
215                    self.inner.semantic_token_cap
216                ));
217                state.block_error = Some(error.clone());
218                return Err(error);
219            }
220            state.calls += 1;
221            state.tokens = state.tokens.saturating_add(estimated_tokens);
222        }
223
224        let Some(judge) = self.inner.cheap_judge.clone() else {
225            return Err(self.record_block(InvariantBlockError::new(
226                "llm_unavailable",
227                "semantic predicate cheap_judge was requested but no judge is installed",
228            )));
229        };
230
231        let response = match judge
232            .cheap_judge(CheapJudgeRequest {
233                prompt: prompt.clone(),
234                evidence_key: evidence_key.clone(),
235                evidence: evidence.clone(),
236            })
237            .await
238        {
239            Ok(response) => response,
240            Err(error) => return Err(self.record_block(error)),
241        };
242        {
243            let mut state = self.inner.judge_state.borrow_mut();
244            state.semantic_audit = Some(SemanticReplayAuditMetadata {
245                provider_id: response.provider_id.clone(),
246                model_id: response.model_id.clone(),
247                prompt_hash: stable_hash(prompt.as_bytes()),
248                evidence_hashes: self
249                    .inner
250                    .evidence
251                    .iter()
252                    .map(|(key, value)| (key.clone(), stable_hash(value.as_bytes())))
253                    .collect(),
254                token_cap: self.inner.semantic_token_cap,
255                cheap_judge_version: response.cheap_judge_version.clone(),
256            });
257        }
258        let response_tokens = response.input_tokens.saturating_add(response.output_tokens);
259        {
260            let mut state = self.inner.judge_state.borrow_mut();
261            state.tokens = state.tokens.saturating_add(response_tokens);
262            if state.tokens > self.inner.semantic_token_cap {
263                let error = InvariantBlockError::budget_exceeded(format!(
264                    "semantic predicate cheap_judge response exceeded token cap {}",
265                    self.inner.semantic_token_cap
266                ));
267                state.block_error = Some(error.clone());
268                return Err(error);
269            }
270        }
271        Ok(response)
272    }
273
274    fn cancel(&self) {
275        self.inner.cancel_token.store(true, Ordering::SeqCst);
276    }
277
278    fn block_error(&self) -> Option<InvariantBlockError> {
279        self.inner.judge_state.borrow().block_error.clone()
280    }
281
282    fn semantic_audit(&self) -> Option<SemanticReplayAuditMetadata> {
283        self.inner.judge_state.borrow().semantic_audit.clone()
284    }
285
286    fn record_block(&self, error: InvariantBlockError) -> InvariantBlockError {
287        self.inner.judge_state.borrow_mut().block_error = Some(error.clone());
288        error
289    }
290}
291
/// Scheduling knobs shared by every slice the executor evaluates.
///
/// Three independent limits are layered on top of the per-predicate hard
/// timeouts:
///
/// 1. **Global lane caps** (`max_deterministic_lanes`, `max_semantic_lanes`)
///    limit how many predicates of each kind run at once across all queued
///    slices. The two kinds use separate lanes, so deterministic work keeps
///    moving while semantic predicates queue for cheap-judge slots.
/// 2. **Per-slice lane caps**
///    (`max_deterministic_lanes_per_slice`, `max_semantic_lanes_per_slice`)
///    stop a single slice from taking every global lane. With the default
///    semantic per-slice cap of 1, two slices that each hold a permit take
///    turns on the global semantic semaphore (FIFO under tokio).
/// 3. **Aggregate per-slice envelopes**
///    (`slice_deterministic_envelope`, `slice_semantic_envelope`) limit the
///    total wall-clock one slice may spend on each kind. After an envelope
///    is used up, each remaining predicate of that kind in that slice
///    short-circuits to a structured `budget_exceeded` block — never a panic
///    and never an implicit approval.
#[derive(Debug, Clone)]
pub struct PredicateSchedulerConfig {
    /// Maximum deterministic predicates running concurrently across all
    /// slices.
    pub max_deterministic_lanes: usize,
    /// Maximum semantic predicates running concurrently across all slices.
    pub max_semantic_lanes: usize,
    /// Maximum deterministic predicates running concurrently within one
    /// slice.
    pub max_deterministic_lanes_per_slice: usize,
    /// Maximum semantic predicates running concurrently within one slice.
    /// Keep it strictly below `max_semantic_lanes` so fairness holds when
    /// several slices are queued.
    pub max_semantic_lanes_per_slice: usize,
    /// Wall-clock envelope summed over every deterministic attempt in one
    /// slice; the first attempt and the replay attempt both count.
    /// `Duration::ZERO` turns aggregate enforcement off, leaving only the
    /// per-predicate hard timeout.
    pub slice_deterministic_envelope: Duration,
    /// Wall-clock envelope summed over every semantic attempt in one slice.
    /// `Duration::ZERO` turns aggregate enforcement off.
    pub slice_semantic_envelope: Duration,
}
338
339impl Default for PredicateSchedulerConfig {
340    fn default() -> Self {
341        Self {
342            max_deterministic_lanes: DEFAULT_MAX_DETERMINISTIC_LANES,
343            max_semantic_lanes: DEFAULT_MAX_SEMANTIC_LANES,
344            max_deterministic_lanes_per_slice: DEFAULT_MAX_DETERMINISTIC_LANES_PER_SLICE,
345            max_semantic_lanes_per_slice: DEFAULT_MAX_SEMANTIC_LANES_PER_SLICE,
346            slice_deterministic_envelope: DEFAULT_SLICE_DETERMINISTIC_ENVELOPE,
347            slice_semantic_envelope: DEFAULT_SLICE_SEMANTIC_ENVELOPE,
348        }
349    }
350}
351
352/// Executor configuration.
353#[derive(Clone, Debug)]
354pub struct PredicateExecutorConfig {
355    pub deterministic_budget: Duration,
356    pub semantic_budget: Duration,
357    pub semantic_token_cap: u64,
358    /// Cross-slice fairness and aggregate-envelope scheduling knobs.
359    pub scheduler: PredicateSchedulerConfig,
360}
361
362impl Default for PredicateExecutorConfig {
363    fn default() -> Self {
364        Self {
365            deterministic_budget: DEFAULT_DETERMINISTIC_BUDGET,
366            semantic_budget: DEFAULT_SEMANTIC_BUDGET,
367            semantic_token_cap: DEFAULT_SEMANTIC_TOKEN_CAP,
368            scheduler: PredicateSchedulerConfig::default(),
369        }
370    }
371}
372
373/// Budgeted predicate executor.
374#[derive(Clone)]
375pub struct PredicateExecutor {
376    config: PredicateExecutorConfig,
377    cheap_judge: Option<Rc<dyn CheapJudge>>,
378}
379
380impl PredicateExecutor {
381    pub fn new(config: PredicateExecutorConfig) -> Self {
382        Self {
383            config,
384            cheap_judge: None,
385        }
386    }
387
388    pub fn with_cheap_judge(
389        config: PredicateExecutorConfig,
390        cheap_judge: Rc<dyn CheapJudge>,
391    ) -> Self {
392        Self {
393            config,
394            cheap_judge: Some(cheap_judge),
395        }
396    }
397
398    /// Evaluate predicates for a single slice using the scheduler defaults.
399    ///
400    /// This is shorthand for [`execute_slices`](Self::execute_slices) with a
401    /// single-element queue. Even one slice still flows through the cross-slice
402    /// scheduler so the aggregate per-slice envelope and lane caps apply.
403    pub async fn execute_slice(
404        &self,
405        slice: &Slice,
406        predicates: &[Rc<dyn PredicateRunner>],
407    ) -> PredicateExecutionReport {
408        let mut reports = self
409            .execute_slices(vec![(slice.clone(), predicates.to_vec())])
410            .await;
411        reports.pop().unwrap_or(PredicateExecutionReport {
412            records: Vec::new(),
413        })
414    }
415
416    /// Evaluate predicates for several candidate slices fairly.
417    ///
418    /// All slices share the global deterministic and semantic lane semaphores
419    /// configured in [`PredicateSchedulerConfig`]. Each slice additionally
420    /// owns its own per-slice lane semaphores (so one slice cannot occupy all
421    /// global semantic lanes), and each slice independently tracks aggregate
422    /// wall-clock against the per-kind envelopes. Once a slice exhausts an
423    /// envelope, all remaining predicates of that kind for that slice
424    /// short-circuit to a `budget_exceeded` `Block`.
425    ///
426    /// Output preserves input slice order. Within each slice, records are
427    /// sorted by predicate hash so reports are deterministic regardless of
428    /// the order predicates finished.
429    pub async fn execute_slices(
430        &self,
431        slices: Vec<(Slice, Vec<Rc<dyn PredicateRunner>>)>,
432    ) -> Vec<PredicateExecutionReport> {
433        let scheduler = &self.config.scheduler;
434        let det_global = Arc::new(Semaphore::new(clamp_permits(
435            scheduler.max_deterministic_lanes,
436        )));
437        let sem_global = Arc::new(Semaphore::new(clamp_permits(scheduler.max_semantic_lanes)));
438
439        let slice_futures = slices
440            .into_iter()
441            .map(|(slice, predicates)| {
442                let det_global = det_global.clone();
443                let sem_global = sem_global.clone();
444                async move {
445                    self.execute_slice_inner(slice, predicates, det_global, sem_global)
446                        .await
447                }
448            })
449            .collect::<Vec<_>>();
450
451        futures::future::join_all(slice_futures).await
452    }
453
454    async fn execute_slice_inner(
455        &self,
456        slice: Slice,
457        predicates: Vec<Rc<dyn PredicateRunner>>,
458        det_global: Arc<Semaphore>,
459        sem_global: Arc<Semaphore>,
460    ) -> PredicateExecutionReport {
461        let scheduler = &self.config.scheduler;
462        let slice_rc = Rc::new(slice);
463        let lanes = SliceLanes::new(
464            det_global,
465            sem_global,
466            scheduler.max_deterministic_lanes_per_slice,
467            scheduler.max_semantic_lanes_per_slice,
468        );
469        let envelope = SliceEnvelope::new(
470            scheduler.slice_deterministic_envelope,
471            scheduler.slice_semantic_envelope,
472        );
473
474        // The lane semaphores enforce real concurrency. `buffer_unordered`
475        // just decides how many futures we polled-but-not-yet-completed at
476        // once; cap it at the actual predicate count so we don't allocate
477        // internal slots for predicates that don't exist.
478        let buffer = predicates.len().max(1);
479
480        let mut records = futures::stream::iter(predicates)
481            .map(|runner| {
482                let slice = slice_rc.clone();
483                let lanes = lanes.clone();
484                let envelope = envelope.clone();
485                async move { self.execute_one(slice, runner, lanes, envelope).await }
486            })
487            .buffer_unordered(buffer)
488            .collect::<Vec<_>>()
489            .await;
490
491        self.apply_semantic_fallbacks(&mut records);
492        records.sort_by(|left, right| left.predicate_hash.cmp(&right.predicate_hash));
493        PredicateExecutionReport { records }
494    }
495
496    async fn execute_one(
497        &self,
498        slice: Rc<Slice>,
499        runner: Rc<dyn PredicateRunner>,
500        lanes: SliceLanes,
501        envelope: SliceEnvelope,
502    ) -> PredicateExecutionRecord {
503        let started = Instant::now();
504        let predicate_hash = runner.hash();
505        let kind = runner.kind();
506        let first = self
507            .run_attempt(slice.clone(), runner.as_ref(), &lanes, &envelope)
508            .await;
509        let first_hash = hash_result(&first.result);
510        let mut result = first.result;
511        let mut attempts = 1;
512        let mut second_hash = None;
513        let semantic_replay_audit = first.semantic_audit;
514
515        if kind == PredicateKind::Deterministic && !result.is_blocking() {
516            let second = self
517                .run_attempt(slice, runner.as_ref(), &lanes, &envelope)
518                .await;
519            attempts = 2;
520            let replay_hash = hash_result(&second.result);
521            second_hash = replay_hash.clone();
522            if second.result.is_blocking() {
523                result = second.result;
524            } else {
525                match (first_hash.as_ref(), replay_hash.as_ref()) {
526                    (Some(left), Some(right)) if left == right => {}
527                    (Some(left), Some(right)) => {
528                        result = InvariantResult::block(InvariantBlockError::nondeterministic_drift(
529                            format!(
530                                "deterministic predicate result drifted across replay: {left} != {right}"
531                            ),
532                        ));
533                    }
534                    _ => {
535                        result = InvariantResult::block(InvariantBlockError::new(
536                            "result_hash_failed",
537                            "failed to hash deterministic predicate replay result",
538                        ));
539                    }
540                }
541            }
542        }
543
544        PredicateExecutionRecord {
545            predicate_hash,
546            kind,
547            fallback_hash: runner.fallback_hash(),
548            result,
549            elapsed_ms: started.elapsed().as_millis() as u64,
550            attempts,
551            replayable: kind == PredicateKind::Deterministic,
552            first_result_hash: first_hash,
553            second_result_hash: second_hash,
554            semantic_replay_audit,
555        }
556    }
557
558    async fn run_attempt(
559        &self,
560        slice: Rc<Slice>,
561        runner: &dyn PredicateRunner,
562        lanes: &SliceLanes,
563        envelope: &SliceEnvelope,
564    ) -> PredicateAttempt {
565        let kind = runner.kind();
566        let timeout = match kind {
567            PredicateKind::Deterministic => self.config.deterministic_budget,
568            PredicateKind::Semantic => self.config.semantic_budget,
569        };
570
571        if let Some(attempt) =
572            envelope_exhausted_attempt(envelope, kind, "before this predicate started")
573        {
574            return attempt;
575        }
576
577        let _permits = lanes.acquire(kind).await;
578
579        // Re-check after acquiring the lane: the queue may have been long
580        // enough that the slice's envelope drained while we waited.
581        if let Some(attempt) =
582            envelope_exhausted_attempt(envelope, kind, "while waiting for a lane")
583        {
584            return attempt;
585        }
586
587        let context = PredicateContext::new(
588            slice,
589            kind,
590            runner.evidence(),
591            self.cheap_judge.clone(),
592            self.config.semantic_token_cap,
593            Arc::new(AtomicBool::new(false)),
594        );
595        let started = Instant::now();
596        let attempt = match tokio::time::timeout(timeout, runner.evaluate(context.clone())).await {
597            Ok(result) => PredicateAttempt {
598                result: context
599                    .block_error()
600                    .map(InvariantResult::block)
601                    .unwrap_or(result),
602                semantic_audit: context.semantic_audit(),
603            },
604            Err(_) => {
605                context.cancel();
606                PredicateAttempt {
607                    result: InvariantResult::block(InvariantBlockError::budget_exceeded(format!(
608                        "{kind:?} predicate exceeded {}ms budget",
609                        timeout.as_millis()
610                    ))),
611                    semantic_audit: context.semantic_audit(),
612                }
613            }
614        };
615        envelope.charge(kind, started.elapsed());
616        attempt
617    }
618
619    fn apply_semantic_fallbacks(&self, records: &mut [PredicateExecutionRecord]) {
620        let by_hash = records
621            .iter()
622            .map(|record| {
623                (
624                    record.predicate_hash.clone(),
625                    (record.kind, record.result.clone()),
626                )
627            })
628            .collect::<BTreeMap<_, _>>();
629
630        for record in records {
631            if record.kind != PredicateKind::Semantic {
632                continue;
633            }
634            let Some(fallback_hash) = record.fallback_hash.as_ref() else {
635                record.result = InvariantResult::block(InvariantBlockError::new(
636                    "fallback_missing",
637                    "semantic predicate did not declare a deterministic fallback",
638                ));
639                continue;
640            };
641            let Some((fallback_kind, fallback_result)) = by_hash.get(fallback_hash) else {
642                record.result = InvariantResult::block(InvariantBlockError::new(
643                    "fallback_missing",
644                    format!(
645                        "semantic predicate fallback {} was not evaluated",
646                        fallback_hash.as_str()
647                    ),
648                ));
649                continue;
650            };
651            if *fallback_kind != PredicateKind::Deterministic {
652                record.result = InvariantResult::block(InvariantBlockError::new(
653                    "fallback_not_deterministic",
654                    format!(
655                        "semantic predicate fallback {} is not deterministic",
656                        fallback_hash.as_str()
657                    ),
658                ));
659                continue;
660            }
661            record.result = stricter_result(&record.result, fallback_result);
662        }
663    }
664}
665
666impl Default for PredicateExecutor {
667    fn default() -> Self {
668        Self::new(PredicateExecutorConfig::default())
669    }
670}
671
672/// Per-predicate execution metadata.
673#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
674pub struct PredicateExecutionRecord {
675    pub predicate_hash: PredicateHash,
676    pub kind: PredicateKind,
677    #[serde(default, skip_serializing_if = "Option::is_none")]
678    pub fallback_hash: Option<PredicateHash>,
679    pub result: InvariantResult,
680    pub elapsed_ms: u64,
681    pub attempts: u8,
682    pub replayable: bool,
683    #[serde(default, skip_serializing_if = "Option::is_none")]
684    pub first_result_hash: Option<String>,
685    #[serde(default, skip_serializing_if = "Option::is_none")]
686    pub second_result_hash: Option<String>,
687    #[serde(default, skip_serializing_if = "Option::is_none")]
688    pub semantic_replay_audit: Option<SemanticReplayAuditMetadata>,
689}
690
691/// Complete result of executing all predicates for one slice.
692#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
693pub struct PredicateExecutionReport {
694    pub records: Vec<PredicateExecutionRecord>,
695}
696
697impl PredicateExecutionReport {
698    pub fn invariants_applied(&self) -> Vec<(PredicateHash, InvariantResult)> {
699        self.records
700            .iter()
701            .map(|record| (record.predicate_hash.clone(), record.result.clone()))
702            .collect()
703    }
704}
705
706fn hash_result(result: &InvariantResult) -> Option<String> {
707    let bytes = serde_json::to_vec(result).ok()?;
708    Some(hex::encode(Sha256::digest(bytes)))
709}
710
711fn stable_hash(bytes: &[u8]) -> String {
712    format!("sha256:{}", hex::encode(Sha256::digest(bytes)))
713}
714
715fn stricter_result(left: &InvariantResult, right: &InvariantResult) -> InvariantResult {
716    if verdict_strictness(&left.verdict) >= verdict_strictness(&right.verdict) {
717        left.clone()
718    } else {
719        right.clone()
720    }
721}
722
/// Rough token estimate: the number of whitespace-separated words in
/// `value`, never less than 1 (so empty or all-whitespace text counts as 1).
fn estimate_tokens(value: &str) -> u64 {
    let words = value.split_whitespace().count();
    std::cmp::max(words, 1) as u64
}
726
727fn clamp_permits(value: usize) -> usize {
728    value.clamp(1, Semaphore::MAX_PERMITS)
729}
730
731/// Short-circuit return for predicates whose slice envelope is already
732/// exhausted. Returns `None` when the envelope still has headroom (or is
733/// disabled by a zero budget).
734fn envelope_exhausted_attempt(
735    envelope: &SliceEnvelope,
736    kind: PredicateKind,
737    when: &str,
738) -> Option<PredicateAttempt> {
739    let remaining = envelope.remaining(kind)?;
740    if !remaining.is_zero() {
741        return None;
742    }
743    Some(PredicateAttempt {
744        result: InvariantResult::block(InvariantBlockError::budget_exceeded(format!(
745            "slice {kind:?} envelope exhausted {when}"
746        ))),
747        semantic_audit: None,
748    })
749}
750
751struct PredicateAttempt {
752    result: InvariantResult,
753    semantic_audit: Option<SemanticReplayAuditMetadata>,
754}
755
756/// Per-slice lane semaphores. Pairs a global (cross-slice) semaphore with a
757/// per-slice semaphore so a single slice cannot occupy every global lane.
758#[derive(Clone)]
759struct SliceLanes {
760    deterministic_global: Arc<Semaphore>,
761    deterministic_local: Arc<Semaphore>,
762    semantic_global: Arc<Semaphore>,
763    semantic_local: Arc<Semaphore>,
764}
765
766impl SliceLanes {
767    fn new(
768        deterministic_global: Arc<Semaphore>,
769        semantic_global: Arc<Semaphore>,
770        deterministic_per_slice: usize,
771        semantic_per_slice: usize,
772    ) -> Self {
773        Self {
774            deterministic_global,
775            deterministic_local: Arc::new(Semaphore::new(clamp_permits(deterministic_per_slice))),
776            semantic_global,
777            semantic_local: Arc::new(Semaphore::new(clamp_permits(semantic_per_slice))),
778        }
779    }
780
781    async fn acquire(&self, kind: PredicateKind) -> LaneTickets {
782        let (global, local) = match kind {
783            PredicateKind::Deterministic => (&self.deterministic_global, &self.deterministic_local),
784            PredicateKind::Semantic => (&self.semantic_global, &self.semantic_local),
785        };
786        // Take per-slice first to preserve global FIFO admission. Acquiring
787        // the global permit first would let one slice park its waiters at the
788        // head of the global queue and starve other slices' lanes.
789        let local_ticket = local
790            .clone()
791            .acquire_owned()
792            .await
793            .expect("predicate lane semaphore closed");
794        let global_ticket = global
795            .clone()
796            .acquire_owned()
797            .await
798            .expect("predicate lane semaphore closed");
799        LaneTickets {
800            _local: local_ticket,
801            _global: global_ticket,
802        }
803    }
804}
805
806/// RAII permit holder. Both permits release when the value drops.
807struct LaneTickets {
808    _local: tokio::sync::OwnedSemaphorePermit,
809    _global: tokio::sync::OwnedSemaphorePermit,
810}
811
812/// Aggregate per-kind wall-clock counters for one slice's predicate envelope.
813#[derive(Clone)]
814struct SliceEnvelope {
815    deterministic_used: Rc<Cell<Duration>>,
816    semantic_used: Rc<Cell<Duration>>,
817    deterministic_budget: Duration,
818    semantic_budget: Duration,
819}
820
821impl SliceEnvelope {
822    fn new(deterministic_budget: Duration, semantic_budget: Duration) -> Self {
823        Self {
824            deterministic_used: Rc::new(Cell::new(Duration::ZERO)),
825            semantic_used: Rc::new(Cell::new(Duration::ZERO)),
826            deterministic_budget,
827            semantic_budget,
828        }
829    }
830
831    fn cell(&self, kind: PredicateKind) -> &Cell<Duration> {
832        match kind {
833            PredicateKind::Deterministic => &self.deterministic_used,
834            PredicateKind::Semantic => &self.semantic_used,
835        }
836    }
837
838    fn budget(&self, kind: PredicateKind) -> Duration {
839        match kind {
840            PredicateKind::Deterministic => self.deterministic_budget,
841            PredicateKind::Semantic => self.semantic_budget,
842        }
843    }
844
845    fn remaining(&self, kind: PredicateKind) -> Option<Duration> {
846        let budget = self.budget(kind);
847        if budget.is_zero() {
848            return None;
849        }
850        let used = self.cell(kind).get();
851        Some(budget.saturating_sub(used))
852    }
853
854    fn charge(&self, kind: PredicateKind, elapsed: Duration) {
855        let cell = self.cell(kind);
856        cell.set(cell.get().saturating_add(elapsed));
857    }
858}
859
860#[cfg(test)]
861mod tests {
862    use super::*;
863    use crate::flow::{Approval, AtomId, PredicateHash, Slice, SliceId, SliceStatus, TestId};
864    use std::cell::Cell;
865
866    fn slice() -> Slice {
867        Slice {
868            id: SliceId([9; 32]),
869            atoms: vec![AtomId([1; 32])],
870            intents: Vec::new(),
871            invariants_applied: Vec::new(),
872            required_tests: vec![TestId::new("test:unit")],
873            approval_chain: Vec::<Approval>::new(),
874            base_ref: AtomId([0; 32]),
875            status: SliceStatus::Ready,
876        }
877    }
878
879    struct StaticPredicate {
880        hash: &'static str,
881        kind: PredicateKind,
882        fallback_hash: Option<&'static str>,
883        result: InvariantResult,
884        delay: Duration,
885    }
886
887    #[async_trait(?Send)]
888    impl PredicateRunner for StaticPredicate {
889        fn hash(&self) -> PredicateHash {
890            PredicateHash::new(self.hash)
891        }
892
893        fn kind(&self) -> PredicateKind {
894            self.kind
895        }
896
897        fn fallback_hash(&self) -> Option<PredicateHash> {
898            self.fallback_hash.map(PredicateHash::new)
899        }
900
901        async fn evaluate(&self, _context: PredicateContext) -> InvariantResult {
902            if !self.delay.is_zero() {
903                tokio::time::sleep(self.delay).await;
904            }
905            self.result.clone()
906        }
907    }
908
909    struct DriftingPredicate {
910        calls: Cell<u64>,
911    }
912
913    #[async_trait(?Send)]
914    impl PredicateRunner for DriftingPredicate {
915        fn hash(&self) -> PredicateHash {
916            PredicateHash::new("drift")
917        }
918
919        fn kind(&self) -> PredicateKind {
920            PredicateKind::Deterministic
921        }
922
923        async fn evaluate(&self, _context: PredicateContext) -> InvariantResult {
924            let calls = self.calls.get();
925            self.calls.set(calls + 1);
926            if calls == 0 {
927                InvariantResult::allow()
928            } else {
929                InvariantResult::warn("changed")
930            }
931        }
932    }
933
934    struct SemanticPredicate {
935        calls: u8,
936        fallback_hash: Option<&'static str>,
937    }
938
939    #[async_trait(?Send)]
940    impl PredicateRunner for SemanticPredicate {
941        fn hash(&self) -> PredicateHash {
942            PredicateHash::new(format!("semantic-{}", self.calls))
943        }
944
945        fn kind(&self) -> PredicateKind {
946            PredicateKind::Semantic
947        }
948
949        fn fallback_hash(&self) -> Option<PredicateHash> {
950            self.fallback_hash.map(PredicateHash::new)
951        }
952
953        fn evidence(&self) -> BTreeMap<String, String> {
954            BTreeMap::from([("case".to_string(), "pre-baked evidence".to_string())])
955        }
956
957        async fn evaluate(&self, context: PredicateContext) -> InvariantResult {
958            for _ in 0..self.calls {
959                let Err(error) = context.cheap_judge("judge the case", "case").await else {
960                    continue;
961                };
962                return InvariantResult::block(error);
963            }
964            InvariantResult::allow()
965        }
966    }
967
968    struct PassingJudge;
969
970    #[async_trait(?Send)]
971    impl CheapJudge for PassingJudge {
972        async fn cheap_judge(
973            &self,
974            _request: CheapJudgeRequest,
975        ) -> Result<CheapJudgeResponse, InvariantBlockError> {
976            Ok(CheapJudgeResponse {
977                passes: true,
978                reason: None,
979                input_tokens: 2,
980                output_tokens: 1,
981                provider_id: Some("mock-provider".to_string()),
982                model_id: Some("mock-model-1".to_string()),
983                cheap_judge_version: Some("cheap-judge-v1".to_string()),
984            })
985        }
986    }
987
988    struct ParallelProbe {
989        hash: &'static str,
990        kind: PredicateKind,
991        active: Rc<Cell<usize>>,
992        max_active: Rc<Cell<usize>>,
993    }
994
995    #[async_trait(?Send)]
996    impl PredicateRunner for ParallelProbe {
997        fn hash(&self) -> PredicateHash {
998            PredicateHash::new(self.hash)
999        }
1000
1001        fn kind(&self) -> PredicateKind {
1002            self.kind
1003        }
1004
1005        async fn evaluate(&self, _context: PredicateContext) -> InvariantResult {
1006            let active = self.active.get() + 1;
1007            self.active.set(active);
1008            self.max_active.set(self.max_active.get().max(active));
1009            tokio::time::sleep(Duration::from_millis(10)).await;
1010            self.active.set(self.active.get() - 1);
1011            InvariantResult::allow()
1012        }
1013    }
1014
1015    #[tokio::test]
1016    async fn deterministic_predicate_replays_bit_identically() {
1017        let executor = PredicateExecutor::default();
1018        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(StaticPredicate {
1019            hash: "stable",
1020            kind: PredicateKind::Deterministic,
1021            fallback_hash: None,
1022            result: InvariantResult::allow(),
1023            delay: Duration::ZERO,
1024        })];
1025
1026        let report = executor.execute_slice(&slice(), &predicates).await;
1027
1028        assert_eq!(report.records.len(), 1);
1029        let record = &report.records[0];
1030        assert_eq!(record.result, InvariantResult::allow());
1031        assert_eq!(record.attempts, 2);
1032        assert_eq!(record.first_result_hash, record.second_result_hash);
1033    }
1034
1035    #[tokio::test]
1036    async fn deterministic_drift_blocks_the_predicate() {
1037        let executor = PredicateExecutor::default();
1038        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(DriftingPredicate {
1039            calls: Cell::new(0),
1040        })];
1041
1042        let report = executor.execute_slice(&slice(), &predicates).await;
1043
1044        let block = report.records[0].result.block_error().expect("blocked");
1045        assert_eq!(block.code, "nondeterministic_drift");
1046    }
1047
1048    #[tokio::test]
1049    async fn deterministic_budget_overrun_blocks_instead_of_panicking() {
1050        let executor = PredicateExecutor::new(PredicateExecutorConfig {
1051            deterministic_budget: Duration::from_millis(1),
1052            ..PredicateExecutorConfig::default()
1053        });
1054        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(StaticPredicate {
1055            hash: "slow",
1056            kind: PredicateKind::Deterministic,
1057            fallback_hash: None,
1058            result: InvariantResult::allow(),
1059            delay: Duration::from_millis(20),
1060        })];
1061
1062        let report = executor.execute_slice(&slice(), &predicates).await;
1063
1064        let block = report.records[0].result.block_error().expect("blocked");
1065        assert_eq!(block.code, "budget_exceeded");
1066    }
1067
1068    #[tokio::test]
1069    async fn predicates_are_polled_concurrently_for_a_slice() {
1070        let active = Rc::new(Cell::new(0));
1071        let max_active = Rc::new(Cell::new(0));
1072        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1073            Rc::new(ParallelProbe {
1074                hash: "parallel-a",
1075                kind: PredicateKind::Deterministic,
1076                active: active.clone(),
1077                max_active: max_active.clone(),
1078            }),
1079            Rc::new(ParallelProbe {
1080                hash: "parallel-b",
1081                kind: PredicateKind::Deterministic,
1082                active,
1083                max_active: max_active.clone(),
1084            }),
1085        ];
1086
1087        let report = PredicateExecutor::default()
1088            .execute_slice(&slice(), &predicates)
1089            .await;
1090
1091        assert_eq!(report.records.len(), 2);
1092        assert_eq!(max_active.get(), 2);
1093    }
1094
1095    #[tokio::test]
1096    async fn semantic_predicate_gets_one_cheap_judge_call() {
1097        let executor = PredicateExecutor::with_cheap_judge(
1098            PredicateExecutorConfig::default(),
1099            Rc::new(PassingJudge),
1100        );
1101        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1102            Rc::new(SemanticPredicate {
1103                calls: 2,
1104                fallback_hash: Some("fallback"),
1105            }),
1106            Rc::new(StaticPredicate {
1107                hash: "fallback",
1108                kind: PredicateKind::Deterministic,
1109                fallback_hash: None,
1110                result: InvariantResult::allow(),
1111                delay: Duration::ZERO,
1112            }),
1113        ];
1114
1115        let report = executor.execute_slice(&slice(), &predicates).await;
1116
1117        let semantic = report
1118            .records
1119            .iter()
1120            .find(|record| record.kind == PredicateKind::Semantic)
1121            .unwrap();
1122        let block = semantic.result.block_error().expect("blocked");
1123        assert_eq!(block.code, "budget_exceeded");
1124    }
1125
1126    #[tokio::test]
1127    async fn semantic_and_fallback_agree_records_both_results() {
1128        let executor = PredicateExecutor::default();
1129        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1130            Rc::new(StaticPredicate {
1131                hash: "semantic",
1132                kind: PredicateKind::Semantic,
1133                fallback_hash: Some("fallback"),
1134                result: InvariantResult::warn("semantic concern"),
1135                delay: Duration::ZERO,
1136            }),
1137            Rc::new(StaticPredicate {
1138                hash: "fallback",
1139                kind: PredicateKind::Deterministic,
1140                fallback_hash: None,
1141                result: InvariantResult::warn("fallback concern"),
1142                delay: Duration::ZERO,
1143            }),
1144        ];
1145
1146        let report = executor.execute_slice(&slice(), &predicates).await;
1147
1148        assert_eq!(report.records.len(), 2);
1149        assert_eq!(report.invariants_applied().len(), 2);
1150        let semantic = report
1151            .records
1152            .iter()
1153            .find(|record| record.predicate_hash == PredicateHash::new("semantic"))
1154            .unwrap();
1155        assert_eq!(semantic.fallback_hash, Some(PredicateHash::new("fallback")));
1156        assert!(matches!(
1157            semantic.result.verdict,
1158            crate::flow::Verdict::Warn { .. }
1159        ));
1160    }
1161
1162    #[tokio::test]
1163    async fn semantic_fallback_disagreement_selects_stricter_verdict() {
1164        let executor = PredicateExecutor::default();
1165        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1166            Rc::new(StaticPredicate {
1167                hash: "semantic",
1168                kind: PredicateKind::Semantic,
1169                fallback_hash: Some("fallback"),
1170                result: InvariantResult::allow(),
1171                delay: Duration::ZERO,
1172            }),
1173            Rc::new(StaticPredicate {
1174                hash: "fallback",
1175                kind: PredicateKind::Deterministic,
1176                fallback_hash: None,
1177                result: InvariantResult::block(InvariantBlockError::new(
1178                    "fallback_policy",
1179                    "fallback blocked",
1180                )),
1181                delay: Duration::ZERO,
1182            }),
1183        ];
1184
1185        let report = executor.execute_slice(&slice(), &predicates).await;
1186        let semantic = report
1187            .records
1188            .iter()
1189            .find(|record| record.predicate_hash == PredicateHash::new("semantic"))
1190            .unwrap();
1191
1192        let block = semantic
1193            .result
1194            .block_error()
1195            .expect("stricter fallback wins");
1196        assert_eq!(block.code, "fallback_policy");
1197    }
1198
1199    #[tokio::test]
1200    async fn semantic_missing_fallback_blocks() {
1201        let executor = PredicateExecutor::default();
1202        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(StaticPredicate {
1203            hash: "semantic",
1204            kind: PredicateKind::Semantic,
1205            fallback_hash: None,
1206            result: InvariantResult::allow(),
1207            delay: Duration::ZERO,
1208        })];
1209
1210        let report = executor.execute_slice(&slice(), &predicates).await;
1211
1212        let block = report.records[0].result.block_error().expect("blocked");
1213        assert_eq!(block.code, "fallback_missing");
1214    }
1215
1216    #[tokio::test]
1217    async fn semantic_predicate_requires_prebaked_evidence() {
1218        struct MissingEvidence;
1219
1220        #[async_trait(?Send)]
1221        impl PredicateRunner for MissingEvidence {
1222            fn hash(&self) -> PredicateHash {
1223                PredicateHash::new("missing-evidence")
1224            }
1225
1226            fn kind(&self) -> PredicateKind {
1227                PredicateKind::Semantic
1228            }
1229
1230            fn fallback_hash(&self) -> Option<PredicateHash> {
1231                Some(PredicateHash::new("fallback"))
1232            }
1233
1234            async fn evaluate(&self, context: PredicateContext) -> InvariantResult {
1235                match context.cheap_judge("judge", "missing").await {
1236                    Ok(_) => InvariantResult::allow(),
1237                    Err(error) => InvariantResult::block(error),
1238                }
1239            }
1240        }
1241
1242        let executor = PredicateExecutor::with_cheap_judge(
1243            PredicateExecutorConfig::default(),
1244            Rc::new(PassingJudge),
1245        );
1246        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1247            Rc::new(MissingEvidence),
1248            Rc::new(StaticPredicate {
1249                hash: "fallback",
1250                kind: PredicateKind::Deterministic,
1251                fallback_hash: None,
1252                result: InvariantResult::allow(),
1253                delay: Duration::ZERO,
1254            }),
1255        ];
1256
1257        let report = executor.execute_slice(&slice(), &predicates).await;
1258
1259        let semantic = report
1260            .records
1261            .iter()
1262            .find(|record| record.kind == PredicateKind::Semantic)
1263            .unwrap();
1264        let block = semantic.result.block_error().expect("blocked");
1265        assert_eq!(block.code, "evidence_missing");
1266    }
1267
1268    #[tokio::test]
1269    async fn semantic_replay_audit_records_judge_hashes() {
1270        let executor = PredicateExecutor::with_cheap_judge(
1271            PredicateExecutorConfig::default(),
1272            Rc::new(PassingJudge),
1273        );
1274        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1275            Rc::new(SemanticPredicate {
1276                calls: 1,
1277                fallback_hash: Some("fallback"),
1278            }),
1279            Rc::new(StaticPredicate {
1280                hash: "fallback",
1281                kind: PredicateKind::Deterministic,
1282                fallback_hash: None,
1283                result: InvariantResult::allow(),
1284                delay: Duration::ZERO,
1285            }),
1286        ];
1287
1288        let report = executor.execute_slice(&slice(), &predicates).await;
1289        let semantic = report
1290            .records
1291            .iter()
1292            .find(|record| record.kind == PredicateKind::Semantic)
1293            .unwrap();
1294        let audit = semantic
1295            .semantic_replay_audit
1296            .as_ref()
1297            .expect("semantic audit metadata");
1298
1299        assert_eq!(audit.provider_id.as_deref(), Some("mock-provider"));
1300        assert_eq!(audit.model_id.as_deref(), Some("mock-model-1"));
1301        assert_eq!(audit.prompt_hash, stable_hash("judge the case".as_bytes()));
1302        let expected_evidence_hash = stable_hash("pre-baked evidence".as_bytes());
1303        assert_eq!(
1304            audit.evidence_hashes.get("case").map(String::as_str),
1305            Some(expected_evidence_hash.as_str())
1306        );
1307        assert_eq!(audit.token_cap, DEFAULT_SEMANTIC_TOKEN_CAP);
1308    }
1309
1310    fn slice_with_id(id: u8) -> Slice {
1311        let mut value = slice();
1312        value.id = SliceId([id; 32]);
1313        value
1314    }
1315
1316    /// Probe that records the wall-clock instant it became active, so a test
1317    /// can compare which slice actually got a lane first.
1318    struct FinishingProbe {
1319        hash: &'static str,
1320        kind: PredicateKind,
1321        delay: Duration,
1322        // Per-slice list of (predicate_hash, finish_micros_since_start).
1323        finish_log: Rc<RefCell<Vec<(String, u128)>>>,
1324        epoch: Instant,
1325    }
1326
1327    #[async_trait(?Send)]
1328    impl PredicateRunner for FinishingProbe {
1329        fn hash(&self) -> PredicateHash {
1330            PredicateHash::new(self.hash)
1331        }
1332
1333        fn kind(&self) -> PredicateKind {
1334            self.kind
1335        }
1336
1337        async fn evaluate(&self, _context: PredicateContext) -> InvariantResult {
1338            tokio::time::sleep(self.delay).await;
1339            self.finish_log
1340                .borrow_mut()
1341                .push((self.hash.to_string(), self.epoch.elapsed().as_micros()));
1342            InvariantResult::allow()
1343        }
1344    }
1345
1346    #[tokio::test]
1347    async fn execute_slices_returns_one_report_per_slice_in_input_order() {
1348        let executor = PredicateExecutor::default();
1349        let slice_a = slice_with_id(1);
1350        let slice_b = slice_with_id(2);
1351
1352        let predicates_a: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(StaticPredicate {
1353            hash: "alpha",
1354            kind: PredicateKind::Deterministic,
1355            fallback_hash: None,
1356            result: InvariantResult::allow(),
1357            delay: Duration::ZERO,
1358        })];
1359        let predicates_b: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(StaticPredicate {
1360            hash: "beta",
1361            kind: PredicateKind::Deterministic,
1362            fallback_hash: None,
1363            result: InvariantResult::allow(),
1364            delay: Duration::ZERO,
1365        })];
1366
1367        let reports = executor
1368            .execute_slices(vec![(slice_a, predicates_a), (slice_b, predicates_b)])
1369            .await;
1370
1371        assert_eq!(reports.len(), 2);
1372        assert_eq!(
1373            reports[0].records[0].predicate_hash,
1374            PredicateHash::new("alpha")
1375        );
1376        assert_eq!(
1377            reports[1].records[0].predicate_hash,
1378            PredicateHash::new("beta")
1379        );
1380    }
1381
1382    #[tokio::test]
1383    async fn semantic_lane_fairness_prevents_monopolization() {
1384        // Two slices each enqueue four semantic predicates. With one global
1385        // semantic lane plus a per-slice cap of one, slice B's first
1386        // predicate must execute before slice A's last predicate, even
1387        // though slice A submitted its predicates first.
1388        let config = PredicateExecutorConfig {
1389            scheduler: PredicateSchedulerConfig {
1390                max_semantic_lanes: 1,
1391                max_semantic_lanes_per_slice: 1,
1392                slice_semantic_envelope: Duration::from_secs(60),
1393                ..PredicateSchedulerConfig::default()
1394            },
1395            ..PredicateExecutorConfig::default()
1396        };
1397        let executor = PredicateExecutor::with_cheap_judge(config, Rc::new(PassingJudge));
1398
1399        let log_a = Rc::new(RefCell::new(Vec::new()));
1400        let log_b = Rc::new(RefCell::new(Vec::new()));
1401        let epoch = Instant::now();
1402
1403        let predicates_a: Vec<Rc<dyn PredicateRunner>> = (0..4)
1404            .map(|i| -> Rc<dyn PredicateRunner> {
1405                Rc::new(FinishingProbe {
1406                    hash: ["a0", "a1", "a2", "a3"][i],
1407                    kind: PredicateKind::Semantic,
1408                    delay: Duration::from_millis(20),
1409                    finish_log: log_a.clone(),
1410                    epoch,
1411                })
1412            })
1413            .collect();
1414        let predicates_b: Vec<Rc<dyn PredicateRunner>> = (0..4)
1415            .map(|i| -> Rc<dyn PredicateRunner> {
1416                Rc::new(FinishingProbe {
1417                    hash: ["b0", "b1", "b2", "b3"][i],
1418                    kind: PredicateKind::Semantic,
1419                    delay: Duration::from_millis(20),
1420                    finish_log: log_b.clone(),
1421                    epoch,
1422                })
1423            })
1424            .collect();
1425
1426        let _ = executor
1427            .execute_slices(vec![
1428                (slice_with_id(1), predicates_a),
1429                (slice_with_id(2), predicates_b),
1430            ])
1431            .await;
1432
1433        let log_a_snapshot = log_a.borrow().clone();
1434        let log_b_snapshot = log_b.borrow().clone();
1435        assert_eq!(log_a_snapshot.len(), 4);
1436        assert_eq!(log_b_snapshot.len(), 4);
1437        let earliest_b = log_b_snapshot
1438            .iter()
1439            .map(|(_, micros)| *micros)
1440            .min()
1441            .unwrap();
1442        let latest_a = log_a_snapshot
1443            .iter()
1444            .map(|(_, micros)| *micros)
1445            .max()
1446            .unwrap();
1447        assert!(
1448            earliest_b < latest_a,
1449            "fair scheduler should interleave slices: B's first finished at {earliest_b}us, A's last at {latest_a}us",
1450        );
1451    }
1452
1453    #[tokio::test]
1454    async fn deterministic_progress_continues_while_semantic_work_waits() {
1455        // Single global semantic lane with a slow semantic predicate that
1456        // dominates the scheduler. Deterministic predicates must complete
1457        // before the semantic predicate because deterministic and semantic
1458        // lanes are independent.
1459        let config = PredicateExecutorConfig {
1460            semantic_budget: Duration::from_secs(10),
1461            scheduler: PredicateSchedulerConfig {
1462                max_semantic_lanes: 1,
1463                max_semantic_lanes_per_slice: 1,
1464                slice_semantic_envelope: Duration::from_secs(60),
1465                ..PredicateSchedulerConfig::default()
1466            },
1467            ..PredicateExecutorConfig::default()
1468        };
1469        let executor = PredicateExecutor::with_cheap_judge(config, Rc::new(PassingJudge));
1470
1471        let log = Rc::new(RefCell::new(Vec::new()));
1472        let epoch = Instant::now();
1473
1474        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1475            Rc::new(FinishingProbe {
1476                hash: "slow-semantic",
1477                kind: PredicateKind::Semantic,
1478                delay: Duration::from_millis(80),
1479                finish_log: log.clone(),
1480                epoch,
1481            }),
1482            Rc::new(FinishingProbe {
1483                hash: "det-1",
1484                kind: PredicateKind::Deterministic,
1485                delay: Duration::from_millis(5),
1486                finish_log: log.clone(),
1487                epoch,
1488            }),
1489            Rc::new(FinishingProbe {
1490                hash: "det-2",
1491                kind: PredicateKind::Deterministic,
1492                delay: Duration::from_millis(5),
1493                finish_log: log.clone(),
1494                epoch,
1495            }),
1496        ];
1497
1498        let _ = executor.execute_slice(&slice(), &predicates).await;
1499        let snapshot = log.borrow().clone();
1500        let det_finish = snapshot
1501            .iter()
1502            .filter_map(|(name, micros)| name.starts_with("det-").then_some(*micros))
1503            .max()
1504            .expect("deterministic finished");
1505        let semantic_finish = snapshot
1506            .iter()
1507            .find(|(name, _)| name == "slow-semantic")
1508            .map(|(_, micros)| *micros)
1509            .expect("semantic finished");
1510
1511        assert!(
1512            det_finish < semantic_finish,
1513            "deterministic ({det_finish}us) should finish before slow semantic ({semantic_finish}us)",
1514        );
1515    }
1516
1517    #[tokio::test]
1518    async fn slice_deterministic_envelope_blocks_remaining_predicates() {
1519        // Tight aggregate envelope with predicates that each consume
1520        // measurable wall-clock. After the first runs, the envelope is
1521        // exhausted and the rest must short-circuit to budget_exceeded.
1522        let config = PredicateExecutorConfig {
1523            scheduler: PredicateSchedulerConfig {
1524                max_deterministic_lanes_per_slice: 1,
1525                slice_deterministic_envelope: Duration::from_millis(15),
1526                ..PredicateSchedulerConfig::default()
1527            },
1528            ..PredicateExecutorConfig::default()
1529        };
1530        let executor = PredicateExecutor::new(config);
1531        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1532            Rc::new(StaticPredicate {
1533                hash: "first",
1534                kind: PredicateKind::Deterministic,
1535                fallback_hash: None,
1536                result: InvariantResult::allow(),
1537                delay: Duration::from_millis(20),
1538            }),
1539            Rc::new(StaticPredicate {
1540                hash: "second",
1541                kind: PredicateKind::Deterministic,
1542                fallback_hash: None,
1543                result: InvariantResult::allow(),
1544                delay: Duration::ZERO,
1545            }),
1546            Rc::new(StaticPredicate {
1547                hash: "third",
1548                kind: PredicateKind::Deterministic,
1549                fallback_hash: None,
1550                result: InvariantResult::allow(),
1551                delay: Duration::ZERO,
1552            }),
1553        ];
1554
1555        let report = executor.execute_slice(&slice(), &predicates).await;
1556
1557        let by_hash: BTreeMap<_, _> = report
1558            .records
1559            .iter()
1560            .map(|record| (record.predicate_hash.as_str().to_string(), record))
1561            .collect();
1562        // The first predicate runs (and may itself block on the per-predicate
1563        // 50ms timeout — irrelevant for envelope semantics). The second and
1564        // third must be denied admission with structured budget_exceeded
1565        // blocks rather than silently allowing them through.
1566        let second = by_hash.get("second").expect("second present");
1567        let third = by_hash.get("third").expect("third present");
1568        let second_block = second.result.block_error().expect("second blocked");
1569        assert_eq!(second_block.code, "budget_exceeded");
1570        let third_block = third.result.block_error().expect("third blocked");
1571        assert_eq!(third_block.code, "budget_exceeded");
1572    }
1573
1574    #[tokio::test]
1575    async fn slice_semantic_envelope_blocks_remaining_predicates() {
1576        let config = PredicateExecutorConfig {
1577            scheduler: PredicateSchedulerConfig {
1578                max_semantic_lanes: 1,
1579                max_semantic_lanes_per_slice: 1,
1580                slice_semantic_envelope: Duration::from_millis(15),
1581                ..PredicateSchedulerConfig::default()
1582            },
1583            ..PredicateExecutorConfig::default()
1584        };
1585        let executor = PredicateExecutor::with_cheap_judge(config, Rc::new(PassingJudge));
1586
1587        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1588            Rc::new(StaticPredicate {
1589                hash: "sem-first",
1590                kind: PredicateKind::Semantic,
1591                fallback_hash: Some("sem-fallback"),
1592                result: InvariantResult::allow(),
1593                delay: Duration::from_millis(30),
1594            }),
1595            Rc::new(StaticPredicate {
1596                hash: "sem-second",
1597                kind: PredicateKind::Semantic,
1598                fallback_hash: Some("sem-fallback"),
1599                result: InvariantResult::allow(),
1600                delay: Duration::ZERO,
1601            }),
1602            Rc::new(StaticPredicate {
1603                hash: "sem-fallback",
1604                kind: PredicateKind::Deterministic,
1605                fallback_hash: None,
1606                result: InvariantResult::allow(),
1607                delay: Duration::ZERO,
1608            }),
1609        ];
1610
1611        let report = executor.execute_slice(&slice(), &predicates).await;
1612        let second = report
1613            .records
1614            .iter()
1615            .find(|record| record.predicate_hash == PredicateHash::new("sem-second"))
1616            .expect("sem-second present");
1617        let block = second.result.block_error().expect("blocked");
1618        assert_eq!(block.code, "budget_exceeded");
1619    }
1620
1621    #[tokio::test]
1622    async fn slice_envelopes_are_independent_across_slices() {
1623        // Slice A blows its semantic envelope. Slice B's semantic predicate
1624        // must still run normally; envelopes are per-slice, not global.
1625        let config = PredicateExecutorConfig {
1626            scheduler: PredicateSchedulerConfig {
1627                max_semantic_lanes: 2,
1628                max_semantic_lanes_per_slice: 1,
1629                slice_semantic_envelope: Duration::from_millis(15),
1630                ..PredicateSchedulerConfig::default()
1631            },
1632            ..PredicateExecutorConfig::default()
1633        };
1634        let executor = PredicateExecutor::with_cheap_judge(config, Rc::new(PassingJudge));
1635
1636        let slice_a_preds: Vec<Rc<dyn PredicateRunner>> = vec![
1637            Rc::new(StaticPredicate {
1638                hash: "a-slow",
1639                kind: PredicateKind::Semantic,
1640                fallback_hash: Some("a-fallback"),
1641                result: InvariantResult::allow(),
1642                delay: Duration::from_millis(30),
1643            }),
1644            Rc::new(StaticPredicate {
1645                hash: "a-second",
1646                kind: PredicateKind::Semantic,
1647                fallback_hash: Some("a-fallback"),
1648                result: InvariantResult::allow(),
1649                delay: Duration::ZERO,
1650            }),
1651            Rc::new(StaticPredicate {
1652                hash: "a-fallback",
1653                kind: PredicateKind::Deterministic,
1654                fallback_hash: None,
1655                result: InvariantResult::allow(),
1656                delay: Duration::ZERO,
1657            }),
1658        ];
1659        let slice_b_preds: Vec<Rc<dyn PredicateRunner>> = vec![
1660            Rc::new(StaticPredicate {
1661                hash: "b-fast",
1662                kind: PredicateKind::Semantic,
1663                fallback_hash: Some("b-fallback"),
1664                result: InvariantResult::allow(),
1665                delay: Duration::from_millis(2),
1666            }),
1667            Rc::new(StaticPredicate {
1668                hash: "b-fallback",
1669                kind: PredicateKind::Deterministic,
1670                fallback_hash: None,
1671                result: InvariantResult::allow(),
1672                delay: Duration::ZERO,
1673            }),
1674        ];
1675
1676        let reports = executor
1677            .execute_slices(vec![
1678                (slice_with_id(1), slice_a_preds),
1679                (slice_with_id(2), slice_b_preds),
1680            ])
1681            .await;
1682
1683        let slice_b = &reports[1];
1684        let b_fast = slice_b
1685            .records
1686            .iter()
1687            .find(|record| record.predicate_hash == PredicateHash::new("b-fast"))
1688            .unwrap();
1689        // b-fast must not be blocked by budget_exceeded — slice A's envelope
1690        // exhaustion must not cross the slice boundary.
1691        assert!(b_fast.result.block_error().is_none());
1692    }
1693
1694    #[tokio::test]
1695    async fn output_ordering_is_deterministic_across_random_finish_order() {
1696        // Predicates with varying delays finish in non-deterministic order.
1697        // The report must still sort by predicate hash so two replays of the
1698        // same scheduler produce bit-identical record orderings.
1699        let make_predicates = || -> Vec<Rc<dyn PredicateRunner>> {
1700            vec![
1701                Rc::new(StaticPredicate {
1702                    hash: "z-last",
1703                    kind: PredicateKind::Deterministic,
1704                    fallback_hash: None,
1705                    result: InvariantResult::allow(),
1706                    delay: Duration::from_millis(15),
1707                }),
1708                Rc::new(StaticPredicate {
1709                    hash: "a-first",
1710                    kind: PredicateKind::Deterministic,
1711                    fallback_hash: None,
1712                    result: InvariantResult::allow(),
1713                    delay: Duration::ZERO,
1714                }),
1715                Rc::new(StaticPredicate {
1716                    hash: "m-mid",
1717                    kind: PredicateKind::Deterministic,
1718                    fallback_hash: None,
1719                    result: InvariantResult::allow(),
1720                    delay: Duration::from_millis(7),
1721                }),
1722            ]
1723        };
1724
1725        let executor = PredicateExecutor::default();
1726        let report_one = executor.execute_slice(&slice(), &make_predicates()).await;
1727        let report_two = executor.execute_slice(&slice(), &make_predicates()).await;
1728        let order_one: Vec<_> = report_one
1729            .records
1730            .iter()
1731            .map(|record| record.predicate_hash.as_str().to_string())
1732            .collect();
1733        let order_two: Vec<_> = report_two
1734            .records
1735            .iter()
1736            .map(|record| record.predicate_hash.as_str().to_string())
1737            .collect();
1738
1739        assert_eq!(order_one, vec!["a-first", "m-mid", "z-last"]);
1740        assert_eq!(order_one, order_two);
1741    }
1742}