1use std::cell::RefCell;
8use std::collections::BTreeMap;
9use std::rc::Rc;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14use async_trait::async_trait;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use sha2::{Digest, Sha256};
18
19use super::compose::verdict_strictness;
20use crate::flow::{InvariantBlockError, InvariantResult, PredicateHash, Slice};
21
/// Wall-clock budget for a single deterministic predicate attempt.
const DEFAULT_DETERMINISTIC_BUDGET: Duration = Duration::from_millis(50);
/// Wall-clock budget for a single semantic predicate attempt, which includes
/// any `cheap_judge` call the predicate makes while evaluating.
const DEFAULT_SEMANTIC_BUDGET: Duration = Duration::from_millis(2_000);
/// Token cap applied to a semantic predicate's `cheap_judge` usage per attempt.
const DEFAULT_SEMANTIC_TOKEN_CAP: u64 = 1 << 10;
25
/// How a predicate is evaluated by the executor.
///
/// The kind selects the time budget for an attempt, whether the predicate is
/// replayed, and whether it may call `cheap_judge`. Serialized in snake_case
/// (`"deterministic"` / `"semantic"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PredicateKind {
    /// Evaluated twice when the first result does not block; the two result
    /// hashes must match. May not call `cheap_judge`.
    Deterministic,
    /// May make one `cheap_judge` call per attempt and must name a
    /// deterministic fallback predicate, whose verdict is combined with its own.
    Semantic,
}
36
/// Request passed to a [`CheapJudge`] on behalf of a semantic predicate.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CheapJudgeRequest {
    /// Prompt text supplied by the predicate.
    pub prompt: String,
    /// Key of the pre-baked evidence entry being judged.
    pub evidence_key: String,
    /// The evidence value stored under `evidence_key`.
    pub evidence: String,
}
44
/// A [`CheapJudge`]'s answer, plus usage and provenance for auditing.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CheapJudgeResponse {
    /// The judge's verdict. The executor does not interpret it; the calling
    /// predicate decides what to do with it.
    pub passes: bool,
    /// Optional free-form explanation from the judge.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Provider-reported prompt tokens; 0 when absent from the payload.
    #[serde(default)]
    pub input_tokens: u64,
    /// Provider-reported completion tokens; 0 when absent from the payload.
    #[serde(default)]
    pub output_tokens: u64,
    /// Copied into the semantic replay audit record.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_id: Option<String>,
    /// Copied into the semantic replay audit record.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_id: Option<String>,
    /// Copied into the semantic replay audit record.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cheap_judge_version: Option<String>,
}
62
/// Audit trail for a semantic predicate's `cheap_judge` call.
///
/// Semantic predicates are not replayed, so this records what the judge saw
/// (as hashes) and which judge answered.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SemanticReplayAuditMetadata {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_id: Option<String>,
    /// `sha256:<hex>` digest of the prompt text.
    pub prompt_hash: String,
    /// `sha256:<hex>` digest of every pre-baked evidence value, keyed by
    /// evidence key (not only the key that was judged).
    pub evidence_hashes: BTreeMap<String, String>,
    /// The token cap that was in force for the call.
    pub token_cap: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cheap_judge_version: Option<String>,
}
76
/// A lightweight model or service that semantic predicates may consult once
/// per attempt through `PredicateContext::cheap_judge`.
///
/// The trait is `?Send`: implementations are shared via `Rc` and awaited on
/// the executor's current task.
#[async_trait(?Send)]
pub trait CheapJudge {
    /// Judges `request`. An `Err` is recorded as the predicate's block error.
    async fn cheap_judge(
        &self,
        request: CheapJudgeRequest,
    ) -> Result<CheapJudgeResponse, InvariantBlockError>;
}
85
/// A single invariant predicate that the executor can evaluate against a slice.
#[async_trait(?Send)]
pub trait PredicateRunner {
    /// Stable identifier. Records are sorted by it, and semantic predicates
    /// reference their fallback by it.
    fn hash(&self) -> PredicateHash;
    /// Selects the time budget, replay behavior and `cheap_judge` permission.
    fn kind(&self) -> PredicateKind;
    /// Hash of the deterministic predicate backing a semantic predicate.
    /// A semantic predicate that returns `None` is blocked with `fallback_missing`.
    fn fallback_hash(&self) -> Option<PredicateHash> {
        None
    }

    /// Pre-baked evidence made available to `PredicateContext::evidence` and
    /// `PredicateContext::cheap_judge`. It is queried once per attempt; keys
    /// not present here cannot be judged.
    fn evidence(&self) -> BTreeMap<String, String> {
        BTreeMap::new()
    }

    /// Evaluates the predicate. The future runs under the kind's time budget
    /// and is dropped if the budget elapses. Deterministic predicates may be
    /// evaluated twice.
    async fn evaluate(&self, context: PredicateContext) -> InvariantResult;
}
104
/// Per-attempt handle given to [`PredicateRunner::evaluate`].
///
/// Cloning shares the same `Rc` inner state, so the executor can inspect the
/// judge budget and block error after the predicate returns. Because of the
/// `Rc`, this type is not `Send`.
#[derive(Clone)]
pub struct PredicateContext {
    inner: Rc<PredicateContextInner>,
}
110
/// Shared state behind a [`PredicateContext`].
struct PredicateContextInner {
    slice: Rc<Slice>,
    kind: PredicateKind,
    /// Pre-baked evidence from [`PredicateRunner::evidence`].
    evidence: BTreeMap<String, String>,
    /// `None` when the executor was built without a judge.
    cheap_judge: Option<Rc<dyn CheapJudge>>,
    semantic_token_cap: u64,
    /// Set by the executor when the attempt exceeds its time budget.
    cancel_token: Arc<AtomicBool>,
    /// Mutable budget/audit bookkeeping; borrows are never held across `.await`.
    judge_state: RefCell<JudgeBudgetState>,
}
120
/// Budget counters and outcomes for one predicate attempt's `cheap_judge` use.
#[derive(Default)]
struct JudgeBudgetState {
    /// Number of `cheap_judge` calls charged so far (at most one is allowed).
    calls: u64,
    /// Tokens charged against the semantic token cap so far.
    tokens: u64,
    /// When set, the executor uses this in place of the predicate's own result.
    block_error: Option<InvariantBlockError>,
    /// Audit metadata recorded after a successful judge response.
    semantic_audit: Option<SemanticReplayAuditMetadata>,
}
128
129impl PredicateContext {
130 fn new(
131 slice: Rc<Slice>,
132 kind: PredicateKind,
133 evidence: BTreeMap<String, String>,
134 cheap_judge: Option<Rc<dyn CheapJudge>>,
135 semantic_token_cap: u64,
136 cancel_token: Arc<AtomicBool>,
137 ) -> Self {
138 Self {
139 inner: Rc::new(PredicateContextInner {
140 slice,
141 kind,
142 evidence,
143 cheap_judge,
144 semantic_token_cap,
145 cancel_token,
146 judge_state: RefCell::new(JudgeBudgetState::default()),
147 }),
148 }
149 }
150
151 pub fn slice(&self) -> &Slice {
152 &self.inner.slice
153 }
154
155 pub fn kind(&self) -> PredicateKind {
156 self.inner.kind
157 }
158
159 pub fn evidence(&self, key: &str) -> Option<&str> {
160 self.inner.evidence.get(key).map(String::as_str)
161 }
162
163 pub fn is_cancelled(&self) -> bool {
164 self.inner.cancel_token.load(Ordering::SeqCst)
165 }
166
167 pub async fn cheap_judge(
173 &self,
174 prompt: impl Into<String>,
175 evidence_key: impl Into<String>,
176 ) -> Result<CheapJudgeResponse, InvariantBlockError> {
177 if self.inner.kind != PredicateKind::Semantic {
178 return Err(self.record_block(InvariantBlockError::new(
179 "side_effect_denied",
180 "deterministic predicates cannot invoke cheap_judge",
181 )));
182 }
183
184 let prompt = prompt.into();
185 let evidence_key = evidence_key.into();
186 let Some(evidence) = self.inner.evidence.get(&evidence_key).cloned() else {
187 return Err(self.record_block(InvariantBlockError::new(
188 "evidence_missing",
189 format!(
190 "semantic predicate requested evidence key '{evidence_key}' that was not pre-baked"
191 ),
192 )));
193 };
194
195 let estimated_tokens = estimate_tokens(&prompt).saturating_add(estimate_tokens(&evidence));
196 {
197 let mut state = self.inner.judge_state.borrow_mut();
198 if state.calls >= 1 {
199 let error = InvariantBlockError::budget_exceeded(
200 "semantic predicate exceeded one cheap_judge call",
201 );
202 state.block_error = Some(error.clone());
203 return Err(error);
204 }
205 if state.tokens.saturating_add(estimated_tokens) > self.inner.semantic_token_cap {
206 let error = InvariantBlockError::budget_exceeded(format!(
207 "semantic predicate cheap_judge request exceeds token cap {}",
208 self.inner.semantic_token_cap
209 ));
210 state.block_error = Some(error.clone());
211 return Err(error);
212 }
213 state.calls += 1;
214 state.tokens = state.tokens.saturating_add(estimated_tokens);
215 }
216
217 let Some(judge) = self.inner.cheap_judge.clone() else {
218 return Err(self.record_block(InvariantBlockError::new(
219 "llm_unavailable",
220 "semantic predicate cheap_judge was requested but no judge is installed",
221 )));
222 };
223
224 let response = match judge
225 .cheap_judge(CheapJudgeRequest {
226 prompt: prompt.clone(),
227 evidence_key: evidence_key.clone(),
228 evidence: evidence.clone(),
229 })
230 .await
231 {
232 Ok(response) => response,
233 Err(error) => return Err(self.record_block(error)),
234 };
235 {
236 let mut state = self.inner.judge_state.borrow_mut();
237 state.semantic_audit = Some(SemanticReplayAuditMetadata {
238 provider_id: response.provider_id.clone(),
239 model_id: response.model_id.clone(),
240 prompt_hash: stable_hash(prompt.as_bytes()),
241 evidence_hashes: self
242 .inner
243 .evidence
244 .iter()
245 .map(|(key, value)| (key.clone(), stable_hash(value.as_bytes())))
246 .collect(),
247 token_cap: self.inner.semantic_token_cap,
248 cheap_judge_version: response.cheap_judge_version.clone(),
249 });
250 }
251 let response_tokens = response.input_tokens.saturating_add(response.output_tokens);
252 {
253 let mut state = self.inner.judge_state.borrow_mut();
254 state.tokens = state.tokens.saturating_add(response_tokens);
255 if state.tokens > self.inner.semantic_token_cap {
256 let error = InvariantBlockError::budget_exceeded(format!(
257 "semantic predicate cheap_judge response exceeded token cap {}",
258 self.inner.semantic_token_cap
259 ));
260 state.block_error = Some(error.clone());
261 return Err(error);
262 }
263 }
264 Ok(response)
265 }
266
267 fn cancel(&self) {
268 self.inner.cancel_token.store(true, Ordering::SeqCst);
269 }
270
271 fn block_error(&self) -> Option<InvariantBlockError> {
272 self.inner.judge_state.borrow().block_error.clone()
273 }
274
275 fn semantic_audit(&self) -> Option<SemanticReplayAuditMetadata> {
276 self.inner.judge_state.borrow().semantic_audit.clone()
277 }
278
279 fn record_block(&self, error: InvariantBlockError) -> InvariantBlockError {
280 self.inner.judge_state.borrow_mut().block_error = Some(error.clone());
281 error
282 }
283}
284
/// Budgets and concurrency limits for [`PredicateExecutor`].
#[derive(Clone, Debug)]
pub struct PredicateExecutorConfig {
    /// Wall-clock timeout for each deterministic predicate attempt.
    pub deterministic_budget: Duration,
    /// Wall-clock timeout for each semantic predicate attempt.
    pub semantic_budget: Duration,
    /// Token cap for a semantic predicate's `cheap_judge` usage per attempt.
    pub semantic_token_cap: u64,
    /// Maximum number of predicates polled at once for a slice. Values below 1
    /// are treated as 1, and it never exceeds the number of predicates.
    pub max_parallel_predicates: usize,
}
294
295impl Default for PredicateExecutorConfig {
296 fn default() -> Self {
297 Self {
298 deterministic_budget: DEFAULT_DETERMINISTIC_BUDGET,
299 semantic_budget: DEFAULT_SEMANTIC_BUDGET,
300 semantic_token_cap: DEFAULT_SEMANTIC_TOKEN_CAP,
301 max_parallel_predicates: usize::MAX,
302 }
303 }
304}
305
/// Evaluates a slice's predicates under time/token budgets.
///
/// Deterministic predicates are replayed to check their results; semantic
/// predicates are reconciled with their deterministic fallbacks.
#[derive(Clone)]
pub struct PredicateExecutor {
    config: PredicateExecutorConfig,
    /// Judge handed to every semantic predicate's context; `None` means
    /// `cheap_judge` calls block with `llm_unavailable`.
    cheap_judge: Option<Rc<dyn CheapJudge>>,
}
312
313impl PredicateExecutor {
314 pub fn new(config: PredicateExecutorConfig) -> Self {
315 Self {
316 config,
317 cheap_judge: None,
318 }
319 }
320
321 pub fn with_cheap_judge(
322 config: PredicateExecutorConfig,
323 cheap_judge: Rc<dyn CheapJudge>,
324 ) -> Self {
325 Self {
326 config,
327 cheap_judge: Some(cheap_judge),
328 }
329 }
330
331 pub async fn execute_slice(
332 &self,
333 slice: &Slice,
334 predicates: &[Rc<dyn PredicateRunner>],
335 ) -> PredicateExecutionReport {
336 let parallelism = self
337 .config
338 .max_parallel_predicates
339 .max(1)
340 .min(predicates.len().max(1));
341 let slice = Rc::new(slice.clone());
342 let records = futures::stream::iter(predicates.iter())
343 .map(|runner| self.execute_one(slice.clone(), runner.as_ref()))
344 .buffer_unordered(parallelism)
345 .collect::<Vec<_>>()
346 .await;
347
348 let mut records = records;
349 self.apply_semantic_fallbacks(&mut records);
350 records.sort_by(|left, right| left.predicate_hash.cmp(&right.predicate_hash));
351 PredicateExecutionReport { records }
352 }
353
354 async fn execute_one(
355 &self,
356 slice: Rc<Slice>,
357 runner: &dyn PredicateRunner,
358 ) -> PredicateExecutionRecord {
359 let started = Instant::now();
360 let predicate_hash = runner.hash();
361 let kind = runner.kind();
362 let first = self.run_attempt(slice.clone(), runner).await;
363 let first_hash = hash_result(&first.result);
364 let mut result = first.result;
365 let mut attempts = 1;
366 let mut second_hash = None;
367 let semantic_replay_audit = first.semantic_audit;
368
369 if kind == PredicateKind::Deterministic && !result.is_blocking() {
370 let second = self.run_attempt(slice, runner).await;
371 attempts = 2;
372 let replay_hash = hash_result(&second.result);
373 second_hash = replay_hash.clone();
374 if second.result.is_blocking() {
375 result = second.result;
376 } else {
377 match (first_hash.as_ref(), replay_hash.as_ref()) {
378 (Some(left), Some(right)) if left == right => {}
379 (Some(left), Some(right)) => {
380 result = InvariantResult::block(InvariantBlockError::nondeterministic_drift(
381 format!(
382 "deterministic predicate result drifted across replay: {left} != {right}"
383 ),
384 ));
385 }
386 _ => {
387 result = InvariantResult::block(InvariantBlockError::new(
388 "result_hash_failed",
389 "failed to hash deterministic predicate replay result",
390 ));
391 }
392 }
393 }
394 }
395
396 PredicateExecutionRecord {
397 predicate_hash,
398 kind,
399 fallback_hash: runner.fallback_hash(),
400 result,
401 elapsed_ms: started.elapsed().as_millis() as u64,
402 attempts,
403 replayable: kind == PredicateKind::Deterministic,
404 first_result_hash: first_hash,
405 second_result_hash: second_hash,
406 semantic_replay_audit,
407 }
408 }
409
410 async fn run_attempt(
411 &self,
412 slice: Rc<Slice>,
413 runner: &dyn PredicateRunner,
414 ) -> PredicateAttempt {
415 let kind = runner.kind();
416 let timeout = match kind {
417 PredicateKind::Deterministic => self.config.deterministic_budget,
418 PredicateKind::Semantic => self.config.semantic_budget,
419 };
420 let context = PredicateContext::new(
421 slice,
422 kind,
423 runner.evidence(),
424 self.cheap_judge.clone(),
425 self.config.semantic_token_cap,
426 Arc::new(AtomicBool::new(false)),
427 );
428 match tokio::time::timeout(timeout, runner.evaluate(context.clone())).await {
429 Ok(result) => PredicateAttempt {
430 result: context
431 .block_error()
432 .map(InvariantResult::block)
433 .unwrap_or(result),
434 semantic_audit: context.semantic_audit(),
435 },
436 Err(_) => {
437 context.cancel();
438 PredicateAttempt {
439 result: InvariantResult::block(InvariantBlockError::budget_exceeded(format!(
440 "{kind:?} predicate exceeded {}ms budget",
441 timeout.as_millis()
442 ))),
443 semantic_audit: context.semantic_audit(),
444 }
445 }
446 }
447 }
448
449 fn apply_semantic_fallbacks(&self, records: &mut [PredicateExecutionRecord]) {
450 let by_hash = records
451 .iter()
452 .map(|record| {
453 (
454 record.predicate_hash.clone(),
455 (record.kind, record.result.clone()),
456 )
457 })
458 .collect::<BTreeMap<_, _>>();
459
460 for record in records {
461 if record.kind != PredicateKind::Semantic {
462 continue;
463 }
464 let Some(fallback_hash) = record.fallback_hash.as_ref() else {
465 record.result = InvariantResult::block(InvariantBlockError::new(
466 "fallback_missing",
467 "semantic predicate did not declare a deterministic fallback",
468 ));
469 continue;
470 };
471 let Some((fallback_kind, fallback_result)) = by_hash.get(fallback_hash) else {
472 record.result = InvariantResult::block(InvariantBlockError::new(
473 "fallback_missing",
474 format!(
475 "semantic predicate fallback {} was not evaluated",
476 fallback_hash.as_str()
477 ),
478 ));
479 continue;
480 };
481 if *fallback_kind != PredicateKind::Deterministic {
482 record.result = InvariantResult::block(InvariantBlockError::new(
483 "fallback_not_deterministic",
484 format!(
485 "semantic predicate fallback {} is not deterministic",
486 fallback_hash.as_str()
487 ),
488 ));
489 continue;
490 }
491 record.result = stricter_result(&record.result, fallback_result);
492 }
493 }
494}
495
496impl Default for PredicateExecutor {
497 fn default() -> Self {
498 Self::new(PredicateExecutorConfig::default())
499 }
500}
501
/// Outcome of evaluating one predicate for a slice.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PredicateExecutionRecord {
    pub predicate_hash: PredicateHash,
    pub kind: PredicateKind,
    /// The deterministic fallback the predicate declared, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fallback_hash: Option<PredicateHash>,
    /// Final result after replay checks and fallback reconciliation.
    pub result: InvariantResult,
    /// Wall-clock time for all attempts, in milliseconds.
    pub elapsed_ms: u64,
    /// 1 or 2; 2 only when a deterministic predicate was replayed.
    pub attempts: u8,
    /// `true` for deterministic predicates.
    pub replayable: bool,
    /// Hex SHA-256 of the first attempt's serialized result.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub first_result_hash: Option<String>,
    /// Hex SHA-256 of the replay attempt's serialized result, when replayed.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub second_result_hash: Option<String>,
    /// Judge audit from the first attempt, when a `cheap_judge` call succeeded.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub semantic_replay_audit: Option<SemanticReplayAuditMetadata>,
}
520
/// All predicate records for a slice, sorted by predicate hash.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PredicateExecutionReport {
    pub records: Vec<PredicateExecutionRecord>,
}
526
527impl PredicateExecutionReport {
528 pub fn invariants_applied(&self) -> Vec<(PredicateHash, InvariantResult)> {
529 self.records
530 .iter()
531 .map(|record| (record.predicate_hash.clone(), record.result.clone()))
532 .collect()
533 }
534}
535
536fn hash_result(result: &InvariantResult) -> Option<String> {
537 let bytes = serde_json::to_vec(result).ok()?;
538 Some(hex::encode(Sha256::digest(bytes)))
539}
540
541fn stable_hash(bytes: &[u8]) -> String {
542 format!("sha256:{}", hex::encode(Sha256::digest(bytes)))
543}
544
545fn stricter_result(left: &InvariantResult, right: &InvariantResult) -> InvariantResult {
546 if verdict_strictness(&left.verdict) >= verdict_strictness(&right.verdict) {
547 left.clone()
548 } else {
549 right.clone()
550 }
551}
552
/// Roughly estimates how many tokens `value` will consume in a judge request.
///
/// Returns the larger of two figures, and never less than 1:
/// - the whitespace-separated word count;
/// - the character count divided by four, rounded up.
///
/// The character-based term stops whitespace-free payloads (minified JSON,
/// base64, long identifiers) from being estimated as a single token, which
/// would let arbitrarily large evidence pass the pre-call token-cap check.
fn estimate_tokens(value: &str) -> u64 {
    // Approximate chars-per-token ratio for English text with common BPE
    // tokenizers; this is a heuristic, not an exact tokenizer count.
    const CHARS_PER_TOKEN: usize = 4;

    let words = value.split_whitespace().count();
    // Count chars rather than bytes so multi-byte UTF-8 text is not overcounted.
    let chars = value.chars().count();
    let by_chars = chars / CHARS_PER_TOKEN + usize::from(chars % CHARS_PER_TOKEN != 0);
    u64::try_from(words.max(by_chars).max(1)).unwrap_or(u64::MAX)
}
556
/// Result of a single predicate attempt, before replay/fallback reconciliation.
struct PredicateAttempt {
    /// The predicate's result, or the context's block error / timeout block.
    result: InvariantResult,
    /// Judge audit recorded during this attempt, if any.
    semantic_audit: Option<SemanticReplayAuditMetadata>,
}
561
// Unit tests for the predicate executor: deterministic replay, time budgets,
// concurrency, the one-call cheap_judge budget, fallback reconciliation and
// semantic audit metadata.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::flow::{Approval, AtomId, PredicateHash, Slice, SliceId, SliceStatus, TestId};
    use std::cell::Cell;

    /// Minimal ready slice used as the evaluation target in every test.
    fn slice() -> Slice {
        Slice {
            id: SliceId([9; 32]),
            atoms: vec![AtomId([1; 32])],
            intents: Vec::new(),
            invariants_applied: Vec::new(),
            required_tests: vec![TestId::new("test:unit")],
            approval_chain: Vec::<Approval>::new(),
            base_ref: AtomId([0; 32]),
            status: SliceStatus::Ready,
        }
    }

    /// Predicate that returns a fixed result, optionally after sleeping `delay`
    /// (used to overrun time budgets).
    struct StaticPredicate {
        hash: &'static str,
        kind: PredicateKind,
        fallback_hash: Option<&'static str>,
        result: InvariantResult,
        delay: Duration,
    }

    #[async_trait(?Send)]
    impl PredicateRunner for StaticPredicate {
        fn hash(&self) -> PredicateHash {
            PredicateHash::new(self.hash)
        }

        fn kind(&self) -> PredicateKind {
            self.kind
        }

        fn fallback_hash(&self) -> Option<PredicateHash> {
            self.fallback_hash.map(PredicateHash::new)
        }

        async fn evaluate(&self, _context: PredicateContext) -> InvariantResult {
            if !self.delay.is_zero() {
                tokio::time::sleep(self.delay).await;
            }
            self.result.clone()
        }
    }

    /// Deterministic predicate that allows on its first call and warns on every
    /// later call, so the replay hash differs.
    struct DriftingPredicate {
        calls: Cell<u64>,
    }

    #[async_trait(?Send)]
    impl PredicateRunner for DriftingPredicate {
        fn hash(&self) -> PredicateHash {
            PredicateHash::new("drift")
        }

        fn kind(&self) -> PredicateKind {
            PredicateKind::Deterministic
        }

        async fn evaluate(&self, _context: PredicateContext) -> InvariantResult {
            let calls = self.calls.get();
            self.calls.set(calls + 1);
            if calls == 0 {
                InvariantResult::allow()
            } else {
                InvariantResult::warn("changed")
            }
        }
    }

    /// Semantic predicate that calls `cheap_judge` `calls` times on the "case"
    /// evidence and blocks on the first error.
    struct SemanticPredicate {
        calls: u8,
        fallback_hash: Option<&'static str>,
    }

    #[async_trait(?Send)]
    impl PredicateRunner for SemanticPredicate {
        fn hash(&self) -> PredicateHash {
            PredicateHash::new(format!("semantic-{}", self.calls))
        }

        fn kind(&self) -> PredicateKind {
            PredicateKind::Semantic
        }

        fn fallback_hash(&self) -> Option<PredicateHash> {
            self.fallback_hash.map(PredicateHash::new)
        }

        fn evidence(&self) -> BTreeMap<String, String> {
            BTreeMap::from([("case".to_string(), "pre-baked evidence".to_string())])
        }

        async fn evaluate(&self, context: PredicateContext) -> InvariantResult {
            for _ in 0..self.calls {
                let Err(error) = context.cheap_judge("judge the case", "case").await else {
                    continue;
                };
                return InvariantResult::block(error);
            }
            InvariantResult::allow()
        }
    }

    /// Judge that always passes and reports small, fixed usage and provenance.
    struct PassingJudge;

    #[async_trait(?Send)]
    impl CheapJudge for PassingJudge {
        async fn cheap_judge(
            &self,
            _request: CheapJudgeRequest,
        ) -> Result<CheapJudgeResponse, InvariantBlockError> {
            Ok(CheapJudgeResponse {
                passes: true,
                reason: None,
                input_tokens: 2,
                output_tokens: 1,
                provider_id: Some("mock-provider".to_string()),
                model_id: Some("mock-model-1".to_string()),
                cheap_judge_version: Some("cheap-judge-v1".to_string()),
            })
        }
    }

    /// Tracks how many probes are inside `evaluate` at once. The 10ms sleep
    /// keeps evaluations overlapping when they are polled concurrently.
    struct ParallelProbe {
        hash: &'static str,
        active: Rc<Cell<usize>>,
        max_active: Rc<Cell<usize>>,
    }

    #[async_trait(?Send)]
    impl PredicateRunner for ParallelProbe {
        fn hash(&self) -> PredicateHash {
            PredicateHash::new(self.hash)
        }

        fn kind(&self) -> PredicateKind {
            PredicateKind::Semantic
        }

        async fn evaluate(&self, _context: PredicateContext) -> InvariantResult {
            let active = self.active.get() + 1;
            self.active.set(active);
            self.max_active.set(self.max_active.get().max(active));
            tokio::time::sleep(Duration::from_millis(10)).await;
            self.active.set(self.active.get() - 1);
            InvariantResult::allow()
        }
    }

    #[tokio::test]
    async fn deterministic_predicate_replays_bit_identically() {
        let executor = PredicateExecutor::default();
        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(StaticPredicate {
            hash: "stable",
            kind: PredicateKind::Deterministic,
            fallback_hash: None,
            result: InvariantResult::allow(),
            delay: Duration::ZERO,
        })];

        let report = executor.execute_slice(&slice(), &predicates).await;

        assert_eq!(report.records.len(), 1);
        let record = &report.records[0];
        assert_eq!(record.result, InvariantResult::allow());
        assert_eq!(record.attempts, 2);
        assert_eq!(record.first_result_hash, record.second_result_hash);
    }

    #[tokio::test]
    async fn deterministic_drift_blocks_the_predicate() {
        let executor = PredicateExecutor::default();
        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(DriftingPredicate {
            calls: Cell::new(0),
        })];

        let report = executor.execute_slice(&slice(), &predicates).await;

        let block = report.records[0].result.block_error().expect("blocked");
        assert_eq!(block.code, "nondeterministic_drift");
    }

    #[tokio::test]
    async fn deterministic_budget_overrun_blocks_instead_of_panicking() {
        // 20ms delay against a 1ms budget: the attempt must time out.
        let executor = PredicateExecutor::new(PredicateExecutorConfig {
            deterministic_budget: Duration::from_millis(1),
            ..PredicateExecutorConfig::default()
        });
        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(StaticPredicate {
            hash: "slow",
            kind: PredicateKind::Deterministic,
            fallback_hash: None,
            result: InvariantResult::allow(),
            delay: Duration::from_millis(20),
        })];

        let report = executor.execute_slice(&slice(), &predicates).await;

        let block = report.records[0].result.block_error().expect("blocked");
        assert_eq!(block.code, "budget_exceeded");
    }

    #[tokio::test]
    async fn predicates_are_polled_concurrently_for_a_slice() {
        let active = Rc::new(Cell::new(0));
        let max_active = Rc::new(Cell::new(0));
        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
            Rc::new(ParallelProbe {
                hash: "parallel-a",
                active: active.clone(),
                max_active: max_active.clone(),
            }),
            Rc::new(ParallelProbe {
                hash: "parallel-b",
                active,
                max_active: max_active.clone(),
            }),
        ];

        let report = PredicateExecutor::default()
            .execute_slice(&slice(), &predicates)
            .await;

        assert_eq!(report.records.len(), 2);
        assert_eq!(max_active.get(), 2);
    }

    #[tokio::test]
    async fn semantic_predicate_gets_one_cheap_judge_call() {
        // The predicate attempts two calls; the second must hit the call budget.
        let executor = PredicateExecutor::with_cheap_judge(
            PredicateExecutorConfig::default(),
            Rc::new(PassingJudge),
        );
        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
            Rc::new(SemanticPredicate {
                calls: 2,
                fallback_hash: Some("fallback"),
            }),
            Rc::new(StaticPredicate {
                hash: "fallback",
                kind: PredicateKind::Deterministic,
                fallback_hash: None,
                result: InvariantResult::allow(),
                delay: Duration::ZERO,
            }),
        ];

        let report = executor.execute_slice(&slice(), &predicates).await;

        let semantic = report
            .records
            .iter()
            .find(|record| record.kind == PredicateKind::Semantic)
            .unwrap();
        let block = semantic.result.block_error().expect("blocked");
        assert_eq!(block.code, "budget_exceeded");
    }

    #[tokio::test]
    async fn semantic_and_fallback_agree_records_both_results() {
        let executor = PredicateExecutor::default();
        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
            Rc::new(StaticPredicate {
                hash: "semantic",
                kind: PredicateKind::Semantic,
                fallback_hash: Some("fallback"),
                result: InvariantResult::warn("semantic concern"),
                delay: Duration::ZERO,
            }),
            Rc::new(StaticPredicate {
                hash: "fallback",
                kind: PredicateKind::Deterministic,
                fallback_hash: None,
                result: InvariantResult::warn("fallback concern"),
                delay: Duration::ZERO,
            }),
        ];

        let report = executor.execute_slice(&slice(), &predicates).await;

        assert_eq!(report.records.len(), 2);
        assert_eq!(report.invariants_applied().len(), 2);
        let semantic = report
            .records
            .iter()
            .find(|record| record.predicate_hash == PredicateHash::new("semantic"))
            .unwrap();
        assert_eq!(semantic.fallback_hash, Some(PredicateHash::new("fallback")));
        assert!(matches!(
            semantic.result.verdict,
            crate::flow::Verdict::Warn { .. }
        ));
    }

    #[tokio::test]
    async fn semantic_fallback_disagreement_selects_stricter_verdict() {
        let executor = PredicateExecutor::default();
        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
            Rc::new(StaticPredicate {
                hash: "semantic",
                kind: PredicateKind::Semantic,
                fallback_hash: Some("fallback"),
                result: InvariantResult::allow(),
                delay: Duration::ZERO,
            }),
            Rc::new(StaticPredicate {
                hash: "fallback",
                kind: PredicateKind::Deterministic,
                fallback_hash: None,
                result: InvariantResult::block(InvariantBlockError::new(
                    "fallback_policy",
                    "fallback blocked",
                )),
                delay: Duration::ZERO,
            }),
        ];

        let report = executor.execute_slice(&slice(), &predicates).await;
        let semantic = report
            .records
            .iter()
            .find(|record| record.predicate_hash == PredicateHash::new("semantic"))
            .unwrap();

        let block = semantic
            .result
            .block_error()
            .expect("stricter fallback wins");
        assert_eq!(block.code, "fallback_policy");
    }

    #[tokio::test]
    async fn semantic_missing_fallback_blocks() {
        let executor = PredicateExecutor::default();
        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(StaticPredicate {
            hash: "semantic",
            kind: PredicateKind::Semantic,
            fallback_hash: None,
            result: InvariantResult::allow(),
            delay: Duration::ZERO,
        })];

        let report = executor.execute_slice(&slice(), &predicates).await;

        let block = report.records[0].result.block_error().expect("blocked");
        assert_eq!(block.code, "fallback_missing");
    }

    #[tokio::test]
    async fn semantic_predicate_requires_prebaked_evidence() {
        // Requests an evidence key the predicate never pre-baked.
        struct MissingEvidence;

        #[async_trait(?Send)]
        impl PredicateRunner for MissingEvidence {
            fn hash(&self) -> PredicateHash {
                PredicateHash::new("missing-evidence")
            }

            fn kind(&self) -> PredicateKind {
                PredicateKind::Semantic
            }

            fn fallback_hash(&self) -> Option<PredicateHash> {
                Some(PredicateHash::new("fallback"))
            }

            async fn evaluate(&self, context: PredicateContext) -> InvariantResult {
                match context.cheap_judge("judge", "missing").await {
                    Ok(_) => InvariantResult::allow(),
                    Err(error) => InvariantResult::block(error),
                }
            }
        }

        let executor = PredicateExecutor::with_cheap_judge(
            PredicateExecutorConfig::default(),
            Rc::new(PassingJudge),
        );
        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
            Rc::new(MissingEvidence),
            Rc::new(StaticPredicate {
                hash: "fallback",
                kind: PredicateKind::Deterministic,
                fallback_hash: None,
                result: InvariantResult::allow(),
                delay: Duration::ZERO,
            }),
        ];

        let report = executor.execute_slice(&slice(), &predicates).await;

        let semantic = report
            .records
            .iter()
            .find(|record| record.kind == PredicateKind::Semantic)
            .unwrap();
        let block = semantic.result.block_error().expect("blocked");
        assert_eq!(block.code, "evidence_missing");
    }

    #[tokio::test]
    async fn semantic_replay_audit_records_judge_hashes() {
        let executor = PredicateExecutor::with_cheap_judge(
            PredicateExecutorConfig::default(),
            Rc::new(PassingJudge),
        );
        let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
            Rc::new(SemanticPredicate {
                calls: 1,
                fallback_hash: Some("fallback"),
            }),
            Rc::new(StaticPredicate {
                hash: "fallback",
                kind: PredicateKind::Deterministic,
                fallback_hash: None,
                result: InvariantResult::allow(),
                delay: Duration::ZERO,
            }),
        ];

        let report = executor.execute_slice(&slice(), &predicates).await;
        let semantic = report
            .records
            .iter()
            .find(|record| record.kind == PredicateKind::Semantic)
            .unwrap();
        let audit = semantic
            .semantic_replay_audit
            .as_ref()
            .expect("semantic audit metadata");

        // Provenance comes from PassingJudge; hashes come from the prompt and
        // the pre-baked "case" evidence used by SemanticPredicate.
        assert_eq!(audit.provider_id.as_deref(), Some("mock-provider"));
        assert_eq!(audit.model_id.as_deref(), Some("mock-model-1"));
        assert_eq!(audit.prompt_hash, stable_hash("judge the case".as_bytes()));
        let expected_evidence_hash = stable_hash("pre-baked evidence".as_bytes());
        assert_eq!(
            audit.evidence_hashes.get("case").map(String::as_str),
            Some(expected_evidence_hash.as_str())
        );
        assert_eq!(audit.token_cap, DEFAULT_SEMANTIC_TOKEN_CAP);
    }
}