Skip to main content

harn_vm/flow/predicates/
executor.rs

1//! Budgeted executor for Flow invariant predicates.
2//!
3//! The executor is intentionally small: callers provide predicate runners, and
4//! this module owns scheduling, per-kind budgets, semantic cheap-judge limits,
5//! and deterministic replay drift detection.
6
7use std::cell::RefCell;
8use std::collections::BTreeMap;
9use std::rc::Rc;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14use async_trait::async_trait;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use sha2::{Digest, Sha256};
18
19use super::compose::verdict_strictness;
20use crate::flow::{InvariantBlockError, InvariantResult, PredicateHash, Slice};
21
/// Default wall-clock budget for one deterministic predicate attempt (50 ms).
const DEFAULT_DETERMINISTIC_BUDGET: Duration = Duration::from_millis(50);
/// Default wall-clock budget for one semantic predicate attempt (2 s).
const DEFAULT_SEMANTIC_BUDGET: Duration = Duration::from_secs(2);
/// Default cap on the tokens one semantic predicate attempt may charge
/// through `cheap_judge`.
const DEFAULT_SEMANTIC_TOKEN_CAP: u64 = 1024;
25
/// Predicate execution mode declared by predicate author annotations.
///
/// Serialized in `snake_case` (`deterministic` / `semantic`). The kind selects
/// which wall-clock budget applies to an attempt and whether the predicate is
/// allowed to call `cheap_judge`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PredicateKind {
    /// Pure Harn predicate. No shell, network, LLM, or host side effects.
    ///
    /// The executor re-runs a non-blocking deterministic result once and
    /// blocks the predicate if the two results hash differently.
    Deterministic,
    /// Semantic predicate. May make one `cheap_judge` call over pre-baked
    /// evidence within the semantic wall-clock and token budgets.
    Semantic,
}
36
/// Request passed to the cheap semantic judge.
///
/// Built by `PredicateContext::cheap_judge` after the call has been admitted
/// against the per-attempt call and token budgets.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CheapJudgeRequest {
    /// Prompt text supplied by the predicate.
    pub prompt: String,
    /// Key of the evidence entry the predicate asked to have judged.
    pub evidence_key: String,
    /// Value stored under `evidence_key` in the predicate's evidence map.
    pub evidence: String,
}
44
/// Response returned by the cheap semantic judge.
///
/// Every field except `passes` is optional on the wire: token counts default
/// to `0` and the identifying strings default to `None`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CheapJudgeResponse {
    /// Judge verdict, handed back to the calling predicate as-is.
    // NOTE(review): presumably `true` means the evidence satisfies the
    // prompt -- confirm against the judge adapters.
    pub passes: bool,
    /// Optional free-form explanation from the judge.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Input tokens reported by the judge; charged against the token cap
    /// together with `output_tokens` once the call returns.
    #[serde(default)]
    pub input_tokens: u64,
    /// Output tokens reported by the judge.
    #[serde(default)]
    pub output_tokens: u64,
    /// Provider identifier, copied into the replay-audit metadata.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_id: Option<String>,
    /// Model identifier, copied into the replay-audit metadata.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_id: Option<String>,
    /// Judge version string, copied into the replay-audit metadata.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cheap_judge_version: Option<String>,
}
62
/// Replay-audit metadata for a semantic predicate's judge call.
///
/// Recorded on the predicate's context when the judge returns a response,
/// including the case where that response then pushes the attempt over the
/// token cap.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SemanticReplayAuditMetadata {
    /// Provider identifier reported by the judge, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_id: Option<String>,
    /// Model identifier reported by the judge, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_id: Option<String>,
    /// `sha256:`-prefixed hex digest of the prompt bytes.
    pub prompt_hash: String,
    /// `sha256:`-prefixed hex digest of every entry in the predicate's
    /// evidence map, keyed by evidence key (not only the judged entry).
    pub evidence_hashes: BTreeMap<String, String>,
    /// Token cap that was in force for the attempt.
    pub token_cap: u64,
    /// Judge version string reported by the judge, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cheap_judge_version: Option<String>,
}
76
/// Host-provided adapter for semantic predicate judging.
///
/// Declared with `#[async_trait(?Send)]`, so the returned future is not
/// required to be `Send`; the executor holds the adapter behind an `Rc`.
#[async_trait(?Send)]
pub trait CheapJudge {
    /// Judge a single request.
    ///
    /// # Errors
    ///
    /// An `Err` is recorded on the calling predicate's context, which makes
    /// the executor report that attempt as blocked.
    async fn cheap_judge(
        &self,
        request: CheapJudgeRequest,
    ) -> Result<CheapJudgeResponse, InvariantBlockError>;
}
85
/// Predicate runner supplied by a collector or Harn adapter.
///
/// Declared with `#[async_trait(?Send)]`; runners are shared as
/// `Rc<dyn PredicateRunner>` and are not required to be `Send`.
#[async_trait(?Send)]
pub trait PredicateRunner {
    /// Identity of this predicate. Execution records are sorted by it and
    /// semantic fallbacks are looked up through it.
    fn hash(&self) -> PredicateHash;

    /// Execution mode; selects the wall-clock budget for each attempt.
    fn kind(&self) -> PredicateKind;

    /// Hash of the deterministic predicate that backs a semantic predicate.
    ///
    /// Defaults to `None`. The executor blocks a semantic predicate with
    /// `fallback_missing` when this is `None` or when the referenced predicate
    /// was not part of the same execution.
    fn fallback_hash(&self) -> Option<PredicateHash> {
        None
    }

    /// Static evidence captured when the predicate was authored. Semantic
    /// predicates may only judge over this map; the executor never fetches
    /// evidence during evaluation.
    ///
    /// Defaults to an empty map.
    fn evidence(&self) -> BTreeMap<String, String> {
        BTreeMap::new()
    }

    /// Evaluate the predicate against `context`.
    ///
    /// The executor calls this a second time for deterministic predicates
    /// whose first result is not blocking, and blocks the predicate when the
    /// two results differ.
    async fn evaluate(&self, context: PredicateContext) -> InvariantResult;
}
104
/// Runtime context made available to a predicate invocation.
///
/// Cloning is cheap: every clone shares the same reference-counted inner
/// state, so budget counters and recorded block errors are visible through
/// all clones.
#[derive(Clone)]
pub struct PredicateContext {
    /// Shared state for one evaluation attempt.
    inner: Rc<PredicateContextInner>,
}
110
/// State shared by all clones of one `PredicateContext`.
struct PredicateContextInner {
    /// Slice being evaluated.
    slice: Rc<Slice>,
    /// Kind of the predicate this context was built for.
    kind: PredicateKind,
    /// Pre-baked evidence map taken from the runner.
    evidence: BTreeMap<String, String>,
    /// Judge adapter; `None` when the executor has no judge installed.
    cheap_judge: Option<Rc<dyn CheapJudge>>,
    /// Cap on estimated request tokens plus judge-reported response tokens.
    semantic_token_cap: u64,
    /// Flag set by the executor when the attempt exceeds its time budget.
    cancel_token: Arc<AtomicBool>,
    /// Interior-mutable judge budget counters and recorded outcomes.
    judge_state: RefCell<JudgeBudgetState>,
}
120
/// Per-attempt bookkeeping for `cheap_judge` usage.
#[derive(Default)]
struct JudgeBudgetState {
    /// Number of `cheap_judge` calls admitted so far.
    calls: u64,
    /// Tokens charged so far: the request estimate plus the token counts the
    /// judge reported in its response.
    tokens: u64,
    /// Most recently recorded block; when set, the executor reports it in
    /// place of whatever the predicate returned.
    block_error: Option<InvariantBlockError>,
    /// Audit metadata captured when the judge returned a response.
    semantic_audit: Option<SemanticReplayAuditMetadata>,
}
128
129impl PredicateContext {
130    fn new(
131        slice: Rc<Slice>,
132        kind: PredicateKind,
133        evidence: BTreeMap<String, String>,
134        cheap_judge: Option<Rc<dyn CheapJudge>>,
135        semantic_token_cap: u64,
136        cancel_token: Arc<AtomicBool>,
137    ) -> Self {
138        Self {
139            inner: Rc::new(PredicateContextInner {
140                slice,
141                kind,
142                evidence,
143                cheap_judge,
144                semantic_token_cap,
145                cancel_token,
146                judge_state: RefCell::new(JudgeBudgetState::default()),
147            }),
148        }
149    }
150
151    pub fn slice(&self) -> &Slice {
152        &self.inner.slice
153    }
154
155    pub fn kind(&self) -> PredicateKind {
156        self.inner.kind
157    }
158
159    pub fn evidence(&self, key: &str) -> Option<&str> {
160        self.inner.evidence.get(key).map(String::as_str)
161    }
162
163    pub fn is_cancelled(&self) -> bool {
164        self.inner.cancel_token.load(Ordering::SeqCst)
165    }
166
167    /// Invoke the semantic cheap judge with executor-enforced limits.
168    ///
169    /// Deterministic predicates cannot call this method. Semantic predicates
170    /// get one call per predicate evaluation attempt, and the request must
171    /// refer to evidence already present in the predicate's evidence map.
172    pub async fn cheap_judge(
173        &self,
174        prompt: impl Into<String>,
175        evidence_key: impl Into<String>,
176    ) -> Result<CheapJudgeResponse, InvariantBlockError> {
177        if self.inner.kind != PredicateKind::Semantic {
178            return Err(self.record_block(InvariantBlockError::new(
179                "side_effect_denied",
180                "deterministic predicates cannot invoke cheap_judge",
181            )));
182        }
183
184        let prompt = prompt.into();
185        let evidence_key = evidence_key.into();
186        let Some(evidence) = self.inner.evidence.get(&evidence_key).cloned() else {
187            return Err(self.record_block(InvariantBlockError::new(
188                "evidence_missing",
189                format!(
190                    "semantic predicate requested evidence key '{evidence_key}' that was not pre-baked"
191                ),
192            )));
193        };
194
195        let estimated_tokens = estimate_tokens(&prompt).saturating_add(estimate_tokens(&evidence));
196        {
197            let mut state = self.inner.judge_state.borrow_mut();
198            if state.calls >= 1 {
199                let error = InvariantBlockError::budget_exceeded(
200                    "semantic predicate exceeded one cheap_judge call",
201                );
202                state.block_error = Some(error.clone());
203                return Err(error);
204            }
205            if state.tokens.saturating_add(estimated_tokens) > self.inner.semantic_token_cap {
206                let error = InvariantBlockError::budget_exceeded(format!(
207                    "semantic predicate cheap_judge request exceeds token cap {}",
208                    self.inner.semantic_token_cap
209                ));
210                state.block_error = Some(error.clone());
211                return Err(error);
212            }
213            state.calls += 1;
214            state.tokens = state.tokens.saturating_add(estimated_tokens);
215        }
216
217        let Some(judge) = self.inner.cheap_judge.clone() else {
218            return Err(self.record_block(InvariantBlockError::new(
219                "llm_unavailable",
220                "semantic predicate cheap_judge was requested but no judge is installed",
221            )));
222        };
223
224        let response = match judge
225            .cheap_judge(CheapJudgeRequest {
226                prompt: prompt.clone(),
227                evidence_key: evidence_key.clone(),
228                evidence: evidence.clone(),
229            })
230            .await
231        {
232            Ok(response) => response,
233            Err(error) => return Err(self.record_block(error)),
234        };
235        {
236            let mut state = self.inner.judge_state.borrow_mut();
237            state.semantic_audit = Some(SemanticReplayAuditMetadata {
238                provider_id: response.provider_id.clone(),
239                model_id: response.model_id.clone(),
240                prompt_hash: stable_hash(prompt.as_bytes()),
241                evidence_hashes: self
242                    .inner
243                    .evidence
244                    .iter()
245                    .map(|(key, value)| (key.clone(), stable_hash(value.as_bytes())))
246                    .collect(),
247                token_cap: self.inner.semantic_token_cap,
248                cheap_judge_version: response.cheap_judge_version.clone(),
249            });
250        }
251        let response_tokens = response.input_tokens.saturating_add(response.output_tokens);
252        {
253            let mut state = self.inner.judge_state.borrow_mut();
254            state.tokens = state.tokens.saturating_add(response_tokens);
255            if state.tokens > self.inner.semantic_token_cap {
256                let error = InvariantBlockError::budget_exceeded(format!(
257                    "semantic predicate cheap_judge response exceeded token cap {}",
258                    self.inner.semantic_token_cap
259                ));
260                state.block_error = Some(error.clone());
261                return Err(error);
262            }
263        }
264        Ok(response)
265    }
266
267    fn cancel(&self) {
268        self.inner.cancel_token.store(true, Ordering::SeqCst);
269    }
270
271    fn block_error(&self) -> Option<InvariantBlockError> {
272        self.inner.judge_state.borrow().block_error.clone()
273    }
274
275    fn semantic_audit(&self) -> Option<SemanticReplayAuditMetadata> {
276        self.inner.judge_state.borrow().semantic_audit.clone()
277    }
278
279    fn record_block(&self, error: InvariantBlockError) -> InvariantBlockError {
280        self.inner.judge_state.borrow_mut().block_error = Some(error.clone());
281        error
282    }
283}
284
/// Executor configuration.
#[derive(Clone, Debug)]
pub struct PredicateExecutorConfig {
    /// Wall-clock budget for each attempt of a deterministic predicate.
    pub deterministic_budget: Duration,
    /// Wall-clock budget for each attempt of a semantic predicate.
    pub semantic_budget: Duration,
    /// Token cap applied to a semantic predicate's `cheap_judge` usage.
    pub semantic_token_cap: u64,
    /// Maximum number of predicates polled concurrently for one slice.
    ///
    /// A value of `0` is treated as `1`.
    pub max_parallel_predicates: usize,
}
294
295impl Default for PredicateExecutorConfig {
296    fn default() -> Self {
297        Self {
298            deterministic_budget: DEFAULT_DETERMINISTIC_BUDGET,
299            semantic_budget: DEFAULT_SEMANTIC_BUDGET,
300            semantic_token_cap: DEFAULT_SEMANTIC_TOKEN_CAP,
301            max_parallel_predicates: usize::MAX,
302        }
303    }
304}
305
/// Budgeted predicate executor.
///
/// Holds its judge behind an `Rc`, so cloning the executor shares the same
/// judge adapter.
#[derive(Clone)]
pub struct PredicateExecutor {
    /// Budgets and parallelism limit applied to every execution.
    config: PredicateExecutorConfig,
    /// Judge handed to semantic predicate contexts; `None` means any
    /// `cheap_judge` call fails with `llm_unavailable`.
    cheap_judge: Option<Rc<dyn CheapJudge>>,
}
312
313impl PredicateExecutor {
314    pub fn new(config: PredicateExecutorConfig) -> Self {
315        Self {
316            config,
317            cheap_judge: None,
318        }
319    }
320
321    pub fn with_cheap_judge(
322        config: PredicateExecutorConfig,
323        cheap_judge: Rc<dyn CheapJudge>,
324    ) -> Self {
325        Self {
326            config,
327            cheap_judge: Some(cheap_judge),
328        }
329    }
330
331    pub async fn execute_slice(
332        &self,
333        slice: &Slice,
334        predicates: &[Rc<dyn PredicateRunner>],
335    ) -> PredicateExecutionReport {
336        let parallelism = self
337            .config
338            .max_parallel_predicates
339            .max(1)
340            .min(predicates.len().max(1));
341        let slice = Rc::new(slice.clone());
342        let records = futures::stream::iter(predicates.iter())
343            .map(|runner| self.execute_one(slice.clone(), runner.as_ref()))
344            .buffer_unordered(parallelism)
345            .collect::<Vec<_>>()
346            .await;
347
348        let mut records = records;
349        self.apply_semantic_fallbacks(&mut records);
350        records.sort_by(|left, right| left.predicate_hash.cmp(&right.predicate_hash));
351        PredicateExecutionReport { records }
352    }
353
354    async fn execute_one(
355        &self,
356        slice: Rc<Slice>,
357        runner: &dyn PredicateRunner,
358    ) -> PredicateExecutionRecord {
359        let started = Instant::now();
360        let predicate_hash = runner.hash();
361        let kind = runner.kind();
362        let first = self.run_attempt(slice.clone(), runner).await;
363        let first_hash = hash_result(&first.result);
364        let mut result = first.result;
365        let mut attempts = 1;
366        let mut second_hash = None;
367        let semantic_replay_audit = first.semantic_audit;
368
369        if kind == PredicateKind::Deterministic && !result.is_blocking() {
370            let second = self.run_attempt(slice, runner).await;
371            attempts = 2;
372            let replay_hash = hash_result(&second.result);
373            second_hash = replay_hash.clone();
374            if second.result.is_blocking() {
375                result = second.result;
376            } else {
377                match (first_hash.as_ref(), replay_hash.as_ref()) {
378                    (Some(left), Some(right)) if left == right => {}
379                    (Some(left), Some(right)) => {
380                        result = InvariantResult::block(InvariantBlockError::nondeterministic_drift(
381                            format!(
382                                "deterministic predicate result drifted across replay: {left} != {right}"
383                            ),
384                        ));
385                    }
386                    _ => {
387                        result = InvariantResult::block(InvariantBlockError::new(
388                            "result_hash_failed",
389                            "failed to hash deterministic predicate replay result",
390                        ));
391                    }
392                }
393            }
394        }
395
396        PredicateExecutionRecord {
397            predicate_hash,
398            kind,
399            fallback_hash: runner.fallback_hash(),
400            result,
401            elapsed_ms: started.elapsed().as_millis() as u64,
402            attempts,
403            replayable: kind == PredicateKind::Deterministic,
404            first_result_hash: first_hash,
405            second_result_hash: second_hash,
406            semantic_replay_audit,
407        }
408    }
409
410    async fn run_attempt(
411        &self,
412        slice: Rc<Slice>,
413        runner: &dyn PredicateRunner,
414    ) -> PredicateAttempt {
415        let kind = runner.kind();
416        let timeout = match kind {
417            PredicateKind::Deterministic => self.config.deterministic_budget,
418            PredicateKind::Semantic => self.config.semantic_budget,
419        };
420        let context = PredicateContext::new(
421            slice,
422            kind,
423            runner.evidence(),
424            self.cheap_judge.clone(),
425            self.config.semantic_token_cap,
426            Arc::new(AtomicBool::new(false)),
427        );
428        match tokio::time::timeout(timeout, runner.evaluate(context.clone())).await {
429            Ok(result) => PredicateAttempt {
430                result: context
431                    .block_error()
432                    .map(InvariantResult::block)
433                    .unwrap_or(result),
434                semantic_audit: context.semantic_audit(),
435            },
436            Err(_) => {
437                context.cancel();
438                PredicateAttempt {
439                    result: InvariantResult::block(InvariantBlockError::budget_exceeded(format!(
440                        "{kind:?} predicate exceeded {}ms budget",
441                        timeout.as_millis()
442                    ))),
443                    semantic_audit: context.semantic_audit(),
444                }
445            }
446        }
447    }
448
449    fn apply_semantic_fallbacks(&self, records: &mut [PredicateExecutionRecord]) {
450        let by_hash = records
451            .iter()
452            .map(|record| {
453                (
454                    record.predicate_hash.clone(),
455                    (record.kind, record.result.clone()),
456                )
457            })
458            .collect::<BTreeMap<_, _>>();
459
460        for record in records {
461            if record.kind != PredicateKind::Semantic {
462                continue;
463            }
464            let Some(fallback_hash) = record.fallback_hash.as_ref() else {
465                record.result = InvariantResult::block(InvariantBlockError::new(
466                    "fallback_missing",
467                    "semantic predicate did not declare a deterministic fallback",
468                ));
469                continue;
470            };
471            let Some((fallback_kind, fallback_result)) = by_hash.get(fallback_hash) else {
472                record.result = InvariantResult::block(InvariantBlockError::new(
473                    "fallback_missing",
474                    format!(
475                        "semantic predicate fallback {} was not evaluated",
476                        fallback_hash.as_str()
477                    ),
478                ));
479                continue;
480            };
481            if *fallback_kind != PredicateKind::Deterministic {
482                record.result = InvariantResult::block(InvariantBlockError::new(
483                    "fallback_not_deterministic",
484                    format!(
485                        "semantic predicate fallback {} is not deterministic",
486                        fallback_hash.as_str()
487                    ),
488                ));
489                continue;
490            }
491            record.result = stricter_result(&record.result, fallback_result);
492        }
493    }
494}
495
496impl Default for PredicateExecutor {
497    fn default() -> Self {
498        Self::new(PredicateExecutorConfig::default())
499    }
500}
501
/// Per-predicate execution metadata.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PredicateExecutionRecord {
    /// Hash reported by the runner.
    pub predicate_hash: PredicateHash,
    /// Kind reported by the runner.
    pub kind: PredicateKind,
    /// Fallback hash reported by the runner, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fallback_hash: Option<PredicateHash>,
    /// Final result after replay drift detection and, for semantic
    /// predicates, combination with the deterministic fallback.
    pub result: InvariantResult,
    /// Wall-clock time spent on this predicate across all attempts.
    pub elapsed_ms: u64,
    /// Number of evaluation attempts: `1`, or `2` when a deterministic
    /// predicate was replayed.
    pub attempts: u8,
    /// `true` exactly when the predicate is deterministic.
    pub replayable: bool,
    /// Hex SHA-256 of the first attempt's JSON-serialized result; `None` when
    /// serialization failed.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub first_result_hash: Option<String>,
    /// Hex SHA-256 of the replay attempt's JSON-serialized result; `None`
    /// when no replay ran or serialization failed.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub second_result_hash: Option<String>,
    /// Judge audit metadata captured during the first attempt, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub semantic_replay_audit: Option<SemanticReplayAuditMetadata>,
}
520
/// Complete result of executing all predicates for one slice.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PredicateExecutionReport {
    /// One record per executed predicate; `execute_slice` returns them sorted
    /// by `predicate_hash`.
    pub records: Vec<PredicateExecutionRecord>,
}
526
527impl PredicateExecutionReport {
528    pub fn invariants_applied(&self) -> Vec<(PredicateHash, InvariantResult)> {
529        self.records
530            .iter()
531            .map(|record| (record.predicate_hash.clone(), record.result.clone()))
532            .collect()
533    }
534}
535
536fn hash_result(result: &InvariantResult) -> Option<String> {
537    let bytes = serde_json::to_vec(result).ok()?;
538    Some(hex::encode(Sha256::digest(bytes)))
539}
540
541fn stable_hash(bytes: &[u8]) -> String {
542    format!("sha256:{}", hex::encode(Sha256::digest(bytes)))
543}
544
545fn stricter_result(left: &InvariantResult, right: &InvariantResult) -> InvariantResult {
546    if verdict_strictness(&left.verdict) >= verdict_strictness(&right.verdict) {
547        left.clone()
548    } else {
549        right.clone()
550    }
551}
552
/// Conservative token estimate used for the pre-flight token-cap check.
///
/// Returns the larger of the whitespace-separated word count and the byte
/// length divided by four (rounded up), and never less than `1`. Counting
/// words alone would charge a single token for arbitrarily large text that
/// contains no whitespace (minified JSON, base64, CJK prose), letting such a
/// request slip under the cap before any judge call is made.
fn estimate_tokens(value: &str) -> u64 {
    // Rough bytes-per-token ratio; only used as a lower bound on the charge.
    const BYTES_PER_TOKEN: usize = 4;

    let by_words = value.split_whitespace().count();
    // Ceiling division written without `len + 3` so it cannot overflow.
    let by_bytes =
        value.len() / BYTES_PER_TOKEN + usize::from(value.len() % BYTES_PER_TOKEN != 0);

    // usize -> u64 is lossless on every supported target.
    by_words.max(by_bytes).max(1) as u64
}
556
/// Outcome of a single evaluation attempt of one predicate.
struct PredicateAttempt {
    /// Result of the attempt: a block recorded on the context, a
    /// budget-exceeded block on timeout, or the predicate's own return value.
    result: InvariantResult,
    /// Judge audit metadata recorded on the attempt's context, if any.
    semantic_audit: Option<SemanticReplayAuditMetadata>,
}
561
562#[cfg(test)]
563mod tests {
564    use super::*;
565    use crate::flow::{Approval, AtomId, PredicateHash, Slice, SliceId, SliceStatus, TestId};
566    use std::cell::Cell;
567
568    fn slice() -> Slice {
569        Slice {
570            id: SliceId([9; 32]),
571            atoms: vec![AtomId([1; 32])],
572            intents: Vec::new(),
573            invariants_applied: Vec::new(),
574            required_tests: vec![TestId::new("test:unit")],
575            approval_chain: Vec::<Approval>::new(),
576            base_ref: AtomId([0; 32]),
577            status: SliceStatus::Ready,
578        }
579    }
580
581    struct StaticPredicate {
582        hash: &'static str,
583        kind: PredicateKind,
584        fallback_hash: Option<&'static str>,
585        result: InvariantResult,
586        delay: Duration,
587    }
588
589    #[async_trait(?Send)]
590    impl PredicateRunner for StaticPredicate {
591        fn hash(&self) -> PredicateHash {
592            PredicateHash::new(self.hash)
593        }
594
595        fn kind(&self) -> PredicateKind {
596            self.kind
597        }
598
599        fn fallback_hash(&self) -> Option<PredicateHash> {
600            self.fallback_hash.map(PredicateHash::new)
601        }
602
603        async fn evaluate(&self, _context: PredicateContext) -> InvariantResult {
604            if !self.delay.is_zero() {
605                tokio::time::sleep(self.delay).await;
606            }
607            self.result.clone()
608        }
609    }
610
611    struct DriftingPredicate {
612        calls: Cell<u64>,
613    }
614
615    #[async_trait(?Send)]
616    impl PredicateRunner for DriftingPredicate {
617        fn hash(&self) -> PredicateHash {
618            PredicateHash::new("drift")
619        }
620
621        fn kind(&self) -> PredicateKind {
622            PredicateKind::Deterministic
623        }
624
625        async fn evaluate(&self, _context: PredicateContext) -> InvariantResult {
626            let calls = self.calls.get();
627            self.calls.set(calls + 1);
628            if calls == 0 {
629                InvariantResult::allow()
630            } else {
631                InvariantResult::warn("changed")
632            }
633        }
634    }
635
636    struct SemanticPredicate {
637        calls: u8,
638        fallback_hash: Option<&'static str>,
639    }
640
641    #[async_trait(?Send)]
642    impl PredicateRunner for SemanticPredicate {
643        fn hash(&self) -> PredicateHash {
644            PredicateHash::new(format!("semantic-{}", self.calls))
645        }
646
647        fn kind(&self) -> PredicateKind {
648            PredicateKind::Semantic
649        }
650
651        fn fallback_hash(&self) -> Option<PredicateHash> {
652            self.fallback_hash.map(PredicateHash::new)
653        }
654
655        fn evidence(&self) -> BTreeMap<String, String> {
656            BTreeMap::from([("case".to_string(), "pre-baked evidence".to_string())])
657        }
658
659        async fn evaluate(&self, context: PredicateContext) -> InvariantResult {
660            for _ in 0..self.calls {
661                let Err(error) = context.cheap_judge("judge the case", "case").await else {
662                    continue;
663                };
664                return InvariantResult::block(error);
665            }
666            InvariantResult::allow()
667        }
668    }
669
670    struct PassingJudge;
671
672    #[async_trait(?Send)]
673    impl CheapJudge for PassingJudge {
674        async fn cheap_judge(
675            &self,
676            _request: CheapJudgeRequest,
677        ) -> Result<CheapJudgeResponse, InvariantBlockError> {
678            Ok(CheapJudgeResponse {
679                passes: true,
680                reason: None,
681                input_tokens: 2,
682                output_tokens: 1,
683                provider_id: Some("mock-provider".to_string()),
684                model_id: Some("mock-model-1".to_string()),
685                cheap_judge_version: Some("cheap-judge-v1".to_string()),
686            })
687        }
688    }
689
690    struct ParallelProbe {
691        hash: &'static str,
692        active: Rc<Cell<usize>>,
693        max_active: Rc<Cell<usize>>,
694    }
695
696    #[async_trait(?Send)]
697    impl PredicateRunner for ParallelProbe {
698        fn hash(&self) -> PredicateHash {
699            PredicateHash::new(self.hash)
700        }
701
702        fn kind(&self) -> PredicateKind {
703            PredicateKind::Semantic
704        }
705
706        async fn evaluate(&self, _context: PredicateContext) -> InvariantResult {
707            let active = self.active.get() + 1;
708            self.active.set(active);
709            self.max_active.set(self.max_active.get().max(active));
710            tokio::time::sleep(Duration::from_millis(10)).await;
711            self.active.set(self.active.get() - 1);
712            InvariantResult::allow()
713        }
714    }
715
716    #[tokio::test]
717    async fn deterministic_predicate_replays_bit_identically() {
718        let executor = PredicateExecutor::default();
719        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(StaticPredicate {
720            hash: "stable",
721            kind: PredicateKind::Deterministic,
722            fallback_hash: None,
723            result: InvariantResult::allow(),
724            delay: Duration::ZERO,
725        })];
726
727        let report = executor.execute_slice(&slice(), &predicates).await;
728
729        assert_eq!(report.records.len(), 1);
730        let record = &report.records[0];
731        assert_eq!(record.result, InvariantResult::allow());
732        assert_eq!(record.attempts, 2);
733        assert_eq!(record.first_result_hash, record.second_result_hash);
734    }
735
736    #[tokio::test]
737    async fn deterministic_drift_blocks_the_predicate() {
738        let executor = PredicateExecutor::default();
739        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(DriftingPredicate {
740            calls: Cell::new(0),
741        })];
742
743        let report = executor.execute_slice(&slice(), &predicates).await;
744
745        let block = report.records[0].result.block_error().expect("blocked");
746        assert_eq!(block.code, "nondeterministic_drift");
747    }
748
749    #[tokio::test]
750    async fn deterministic_budget_overrun_blocks_instead_of_panicking() {
751        let executor = PredicateExecutor::new(PredicateExecutorConfig {
752            deterministic_budget: Duration::from_millis(1),
753            ..PredicateExecutorConfig::default()
754        });
755        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(StaticPredicate {
756            hash: "slow",
757            kind: PredicateKind::Deterministic,
758            fallback_hash: None,
759            result: InvariantResult::allow(),
760            delay: Duration::from_millis(20),
761        })];
762
763        let report = executor.execute_slice(&slice(), &predicates).await;
764
765        let block = report.records[0].result.block_error().expect("blocked");
766        assert_eq!(block.code, "budget_exceeded");
767    }
768
769    #[tokio::test]
770    async fn predicates_are_polled_concurrently_for_a_slice() {
771        let active = Rc::new(Cell::new(0));
772        let max_active = Rc::new(Cell::new(0));
773        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
774            Rc::new(ParallelProbe {
775                hash: "parallel-a",
776                active: active.clone(),
777                max_active: max_active.clone(),
778            }),
779            Rc::new(ParallelProbe {
780                hash: "parallel-b",
781                active,
782                max_active: max_active.clone(),
783            }),
784        ];
785
786        let report = PredicateExecutor::default()
787            .execute_slice(&slice(), &predicates)
788            .await;
789
790        assert_eq!(report.records.len(), 2);
791        assert_eq!(max_active.get(), 2);
792    }
793
794    #[tokio::test]
795    async fn semantic_predicate_gets_one_cheap_judge_call() {
796        let executor = PredicateExecutor::with_cheap_judge(
797            PredicateExecutorConfig::default(),
798            Rc::new(PassingJudge),
799        );
800        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
801            Rc::new(SemanticPredicate {
802                calls: 2,
803                fallback_hash: Some("fallback"),
804            }),
805            Rc::new(StaticPredicate {
806                hash: "fallback",
807                kind: PredicateKind::Deterministic,
808                fallback_hash: None,
809                result: InvariantResult::allow(),
810                delay: Duration::ZERO,
811            }),
812        ];
813
814        let report = executor.execute_slice(&slice(), &predicates).await;
815
816        let semantic = report
817            .records
818            .iter()
819            .find(|record| record.kind == PredicateKind::Semantic)
820            .unwrap();
821        let block = semantic.result.block_error().expect("blocked");
822        assert_eq!(block.code, "budget_exceeded");
823    }
824
825    #[tokio::test]
826    async fn semantic_and_fallback_agree_records_both_results() {
827        let executor = PredicateExecutor::default();
828        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
829            Rc::new(StaticPredicate {
830                hash: "semantic",
831                kind: PredicateKind::Semantic,
832                fallback_hash: Some("fallback"),
833                result: InvariantResult::warn("semantic concern"),
834                delay: Duration::ZERO,
835            }),
836            Rc::new(StaticPredicate {
837                hash: "fallback",
838                kind: PredicateKind::Deterministic,
839                fallback_hash: None,
840                result: InvariantResult::warn("fallback concern"),
841                delay: Duration::ZERO,
842            }),
843        ];
844
845        let report = executor.execute_slice(&slice(), &predicates).await;
846
847        assert_eq!(report.records.len(), 2);
848        assert_eq!(report.invariants_applied().len(), 2);
849        let semantic = report
850            .records
851            .iter()
852            .find(|record| record.predicate_hash == PredicateHash::new("semantic"))
853            .unwrap();
854        assert_eq!(semantic.fallback_hash, Some(PredicateHash::new("fallback")));
855        assert!(matches!(
856            semantic.result.verdict,
857            crate::flow::Verdict::Warn { .. }
858        ));
859    }
860
861    #[tokio::test]
862    async fn semantic_fallback_disagreement_selects_stricter_verdict() {
863        let executor = PredicateExecutor::default();
864        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
865            Rc::new(StaticPredicate {
866                hash: "semantic",
867                kind: PredicateKind::Semantic,
868                fallback_hash: Some("fallback"),
869                result: InvariantResult::allow(),
870                delay: Duration::ZERO,
871            }),
872            Rc::new(StaticPredicate {
873                hash: "fallback",
874                kind: PredicateKind::Deterministic,
875                fallback_hash: None,
876                result: InvariantResult::block(InvariantBlockError::new(
877                    "fallback_policy",
878                    "fallback blocked",
879                )),
880                delay: Duration::ZERO,
881            }),
882        ];
883
884        let report = executor.execute_slice(&slice(), &predicates).await;
885        let semantic = report
886            .records
887            .iter()
888            .find(|record| record.predicate_hash == PredicateHash::new("semantic"))
889            .unwrap();
890
891        let block = semantic
892            .result
893            .block_error()
894            .expect("stricter fallback wins");
895        assert_eq!(block.code, "fallback_policy");
896    }
897
898    #[tokio::test]
899    async fn semantic_missing_fallback_blocks() {
900        let executor = PredicateExecutor::default();
901        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(StaticPredicate {
902            hash: "semantic",
903            kind: PredicateKind::Semantic,
904            fallback_hash: None,
905            result: InvariantResult::allow(),
906            delay: Duration::ZERO,
907        })];
908
909        let report = executor.execute_slice(&slice(), &predicates).await;
910
911        let block = report.records[0].result.block_error().expect("blocked");
912        assert_eq!(block.code, "fallback_missing");
913    }
914
915    #[tokio::test]
916    async fn semantic_predicate_requires_prebaked_evidence() {
917        struct MissingEvidence;
918
919        #[async_trait(?Send)]
920        impl PredicateRunner for MissingEvidence {
921            fn hash(&self) -> PredicateHash {
922                PredicateHash::new("missing-evidence")
923            }
924
925            fn kind(&self) -> PredicateKind {
926                PredicateKind::Semantic
927            }
928
929            fn fallback_hash(&self) -> Option<PredicateHash> {
930                Some(PredicateHash::new("fallback"))
931            }
932
933            async fn evaluate(&self, context: PredicateContext) -> InvariantResult {
934                match context.cheap_judge("judge", "missing").await {
935                    Ok(_) => InvariantResult::allow(),
936                    Err(error) => InvariantResult::block(error),
937                }
938            }
939        }
940
941        let executor = PredicateExecutor::with_cheap_judge(
942            PredicateExecutorConfig::default(),
943            Rc::new(PassingJudge),
944        );
945        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
946            Rc::new(MissingEvidence),
947            Rc::new(StaticPredicate {
948                hash: "fallback",
949                kind: PredicateKind::Deterministic,
950                fallback_hash: None,
951                result: InvariantResult::allow(),
952                delay: Duration::ZERO,
953            }),
954        ];
955
956        let report = executor.execute_slice(&slice(), &predicates).await;
957
958        let semantic = report
959            .records
960            .iter()
961            .find(|record| record.kind == PredicateKind::Semantic)
962            .unwrap();
963        let block = semantic.result.block_error().expect("blocked");
964        assert_eq!(block.code, "evidence_missing");
965    }
966
967    #[tokio::test]
968    async fn semantic_replay_audit_records_judge_hashes() {
969        let executor = PredicateExecutor::with_cheap_judge(
970            PredicateExecutorConfig::default(),
971            Rc::new(PassingJudge),
972        );
973        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
974            Rc::new(SemanticPredicate {
975                calls: 1,
976                fallback_hash: Some("fallback"),
977            }),
978            Rc::new(StaticPredicate {
979                hash: "fallback",
980                kind: PredicateKind::Deterministic,
981                fallback_hash: None,
982                result: InvariantResult::allow(),
983                delay: Duration::ZERO,
984            }),
985        ];
986
987        let report = executor.execute_slice(&slice(), &predicates).await;
988        let semantic = report
989            .records
990            .iter()
991            .find(|record| record.kind == PredicateKind::Semantic)
992            .unwrap();
993        let audit = semantic
994            .semantic_replay_audit
995            .as_ref()
996            .expect("semantic audit metadata");
997
998        assert_eq!(audit.provider_id.as_deref(), Some("mock-provider"));
999        assert_eq!(audit.model_id.as_deref(), Some("mock-model-1"));
1000        assert_eq!(audit.prompt_hash, stable_hash("judge the case".as_bytes()));
1001        let expected_evidence_hash = stable_hash("pre-baked evidence".as_bytes());
1002        assert_eq!(
1003            audit.evidence_hashes.get("case").map(String::as_str),
1004            Some(expected_evidence_hash.as_str())
1005        );
1006        assert_eq!(audit.token_cap, DEFAULT_SEMANTIC_TOKEN_CAP);
1007    }
1008}