1use std::cell::{Cell, RefCell};
8use std::collections::BTreeMap;
9use std::rc::Rc;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14use async_trait::async_trait;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use sha2::{Digest, Sha256};
18use tokio::sync::Semaphore;
19
20use super::compose::verdict_strictness;
21use crate::flow::{InvariantBlockError, InvariantResult, PredicateHash, Slice};
22
/// Default time limit applied to a single deterministic predicate attempt.
const DEFAULT_DETERMINISTIC_BUDGET: Duration = Duration::from_millis(50);
/// Default time limit applied to a single semantic predicate attempt.
const DEFAULT_SEMANTIC_BUDGET: Duration = Duration::from_secs(2);
/// Default cap on the tokens one semantic attempt may spend through `cheap_judge`.
const DEFAULT_SEMANTIC_TOKEN_CAP: u64 = 1024;
/// Default number of deterministic lanes shared by every slice of one `execute_slices` call.
const DEFAULT_MAX_DETERMINISTIC_LANES: usize = 16;
/// Default number of semantic lanes shared by every slice of one `execute_slices` call.
const DEFAULT_MAX_SEMANTIC_LANES: usize = 2;
/// Default per-slice deterministic lane cap; `usize::MAX` is clamped down to
/// `Semaphore::MAX_PERMITS` when the semaphore is built.
const DEFAULT_MAX_DETERMINISTIC_LANES_PER_SLICE: usize = usize::MAX;
/// Default per-slice semantic lane cap.
const DEFAULT_MAX_SEMANTIC_LANES_PER_SLICE: usize = 1;
/// Default cumulative time a slice may charge to deterministic attempts.
const DEFAULT_SLICE_DETERMINISTIC_ENVELOPE: Duration = Duration::from_secs(5);
/// Default cumulative time a slice may charge to semantic attempts.
const DEFAULT_SLICE_SEMANTIC_ENVELOPE: Duration = Duration::from_secs(20);
32
/// Classifies a predicate; the executor selects the time budget, lane pool,
/// replay behaviour, and fallback rules from this value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PredicateKind {
    /// Re-run once by the executor when the first attempt does not block; a
    /// differing result hash between the two runs blocks the predicate.
    Deterministic,
    /// May call `PredicateContext::cheap_judge`; must name a deterministic
    /// fallback that is evaluated in the same slice.
    Semantic,
}
43
/// Payload handed to a [`CheapJudge`] for one judgement.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CheapJudgeRequest {
    /// Prompt text supplied by the semantic predicate.
    pub prompt: String,
    /// Key under which the evidence was pre-baked by the predicate runner.
    pub evidence_key: String,
    /// Evidence value looked up from the pre-baked map for `evidence_key`.
    pub evidence: String,
}
51
/// Result returned by a [`CheapJudge`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CheapJudgeResponse {
    /// The judge's verdict. This file never reads it; interpreting it is left
    /// to the calling predicate.
    pub passes: bool,
    /// Optional explanation from the judge.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Input tokens reported by the judge; added to the attempt's token usage.
    #[serde(default)]
    pub input_tokens: u64,
    /// Output tokens reported by the judge; added to the attempt's token usage.
    #[serde(default)]
    pub output_tokens: u64,
    /// Provider identifier, copied into the replay audit metadata.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_id: Option<String>,
    /// Model identifier, copied into the replay audit metadata.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_id: Option<String>,
    /// Judge version string, copied into the replay audit metadata.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cheap_judge_version: Option<String>,
}
69
/// Audit trail recorded after a successful `cheap_judge` call.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SemanticReplayAuditMetadata {
    /// Provider identifier reported by the judge response, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider_id: Option<String>,
    /// Model identifier reported by the judge response, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_id: Option<String>,
    /// `sha256:`-prefixed hex digest of the prompt bytes.
    pub prompt_hash: String,
    /// `sha256:`-prefixed hex digest of every pre-baked evidence value, keyed
    /// by evidence key (all entries, not only the one that was judged).
    pub evidence_hashes: BTreeMap<String, String>,
    /// Token cap that was in force for the attempt.
    pub token_cap: u64,
    /// Judge version reported by the judge response, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cheap_judge_version: Option<String>,
}
83
/// Backend that answers a single semantic judgement request.
///
/// Declared with `?Send`, so implementations and their futures need not be
/// `Send`.
#[async_trait(?Send)]
pub trait CheapJudge {
    /// Judges `request`.
    ///
    /// # Errors
    /// An `InvariantBlockError` returned here is recorded on the calling
    /// predicate's context and blocks that predicate's attempt.
    async fn cheap_judge(
        &self,
        request: CheapJudgeRequest,
    ) -> Result<CheapJudgeResponse, InvariantBlockError>;
}
92
/// A predicate the executor can evaluate against a slice.
#[async_trait(?Send)]
pub trait PredicateRunner {
    /// Identity of the predicate; keys execution records and is the target of
    /// other predicates' `fallback_hash`.
    fn hash(&self) -> PredicateHash;
    /// Whether the predicate is deterministic or semantic.
    fn kind(&self) -> PredicateKind;
    /// Hash of the deterministic predicate that backs this one. Defaults to
    /// `None`; a semantic predicate that returns `None` is blocked with
    /// `fallback_missing`.
    fn fallback_hash(&self) -> Option<PredicateHash> {
        None
    }

    /// Pre-baked evidence exposed to the predicate through its context.
    /// Called once per attempt; defaults to an empty map.
    fn evidence(&self) -> BTreeMap<String, String> {
        BTreeMap::new()
    }

    /// Evaluates the predicate. The executor wraps this call in a timeout
    /// chosen by `kind()`.
    async fn evaluate(&self, context: PredicateContext) -> InvariantResult;
}
111
/// Handle passed to a predicate for one attempt. Clones share the same
/// underlying state through an `Rc`.
#[derive(Clone)]
pub struct PredicateContext {
    /// Shared per-attempt state.
    inner: Rc<PredicateContextInner>,
}
117
/// State behind a [`PredicateContext`].
struct PredicateContextInner {
    /// Slice being evaluated.
    slice: Rc<Slice>,
    /// Kind of the predicate that owns this context.
    kind: PredicateKind,
    /// Pre-baked evidence obtained from the runner for this attempt.
    evidence: BTreeMap<String, String>,
    /// Judge backend, if the executor was built with one.
    cheap_judge: Option<Rc<dyn CheapJudge>>,
    /// Maximum tokens the attempt may spend through `cheap_judge`.
    semantic_token_cap: u64,
    /// Flag set by the executor when the attempt times out.
    cancel_token: Arc<AtomicBool>,
    /// Mutable call/token accounting plus any recorded block error and audit.
    judge_state: RefCell<JudgeBudgetState>,
}
127
/// Mutable bookkeeping for one attempt's `cheap_judge` usage.
#[derive(Default)]
struct JudgeBudgetState {
    /// Number of `cheap_judge` calls that passed the budget check.
    calls: u64,
    /// Estimated request tokens plus reported response tokens spent so far.
    tokens: u64,
    /// Most recent block error recorded by `cheap_judge`, if any.
    block_error: Option<InvariantBlockError>,
    /// Audit metadata captured from a successful judge response.
    semantic_audit: Option<SemanticReplayAuditMetadata>,
}
135
136impl PredicateContext {
137 fn new(
138 slice: Rc<Slice>,
139 kind: PredicateKind,
140 evidence: BTreeMap<String, String>,
141 cheap_judge: Option<Rc<dyn CheapJudge>>,
142 semantic_token_cap: u64,
143 cancel_token: Arc<AtomicBool>,
144 ) -> Self {
145 Self {
146 inner: Rc::new(PredicateContextInner {
147 slice,
148 kind,
149 evidence,
150 cheap_judge,
151 semantic_token_cap,
152 cancel_token,
153 judge_state: RefCell::new(JudgeBudgetState::default()),
154 }),
155 }
156 }
157
158 pub fn slice(&self) -> &Slice {
159 &self.inner.slice
160 }
161
162 pub fn kind(&self) -> PredicateKind {
163 self.inner.kind
164 }
165
166 pub fn evidence(&self, key: &str) -> Option<&str> {
167 self.inner.evidence.get(key).map(String::as_str)
168 }
169
170 pub fn is_cancelled(&self) -> bool {
171 self.inner.cancel_token.load(Ordering::SeqCst)
172 }
173
174 pub async fn cheap_judge(
180 &self,
181 prompt: impl Into<String>,
182 evidence_key: impl Into<String>,
183 ) -> Result<CheapJudgeResponse, InvariantBlockError> {
184 if self.inner.kind != PredicateKind::Semantic {
185 return Err(self.record_block(InvariantBlockError::new(
186 "side_effect_denied",
187 "deterministic predicates cannot invoke cheap_judge",
188 )));
189 }
190
191 let prompt = prompt.into();
192 let evidence_key = evidence_key.into();
193 let Some(evidence) = self.inner.evidence.get(&evidence_key).cloned() else {
194 return Err(self.record_block(InvariantBlockError::new(
195 "evidence_missing",
196 format!(
197 "semantic predicate requested evidence key '{evidence_key}' that was not pre-baked"
198 ),
199 )));
200 };
201
202 let estimated_tokens = estimate_tokens(&prompt).saturating_add(estimate_tokens(&evidence));
203 {
204 let mut state = self.inner.judge_state.borrow_mut();
205 if state.calls >= 1 {
206 let error = InvariantBlockError::budget_exceeded(
207 "semantic predicate exceeded one cheap_judge call",
208 );
209 state.block_error = Some(error.clone());
210 return Err(error);
211 }
212 if state.tokens.saturating_add(estimated_tokens) > self.inner.semantic_token_cap {
213 let error = InvariantBlockError::budget_exceeded(format!(
214 "semantic predicate cheap_judge request exceeds token cap {}",
215 self.inner.semantic_token_cap
216 ));
217 state.block_error = Some(error.clone());
218 return Err(error);
219 }
220 state.calls += 1;
221 state.tokens = state.tokens.saturating_add(estimated_tokens);
222 }
223
224 let Some(judge) = self.inner.cheap_judge.clone() else {
225 return Err(self.record_block(InvariantBlockError::new(
226 "llm_unavailable",
227 "semantic predicate cheap_judge was requested but no judge is installed",
228 )));
229 };
230
231 let response = match judge
232 .cheap_judge(CheapJudgeRequest {
233 prompt: prompt.clone(),
234 evidence_key: evidence_key.clone(),
235 evidence: evidence.clone(),
236 })
237 .await
238 {
239 Ok(response) => response,
240 Err(error) => return Err(self.record_block(error)),
241 };
242 {
243 let mut state = self.inner.judge_state.borrow_mut();
244 state.semantic_audit = Some(SemanticReplayAuditMetadata {
245 provider_id: response.provider_id.clone(),
246 model_id: response.model_id.clone(),
247 prompt_hash: stable_hash(prompt.as_bytes()),
248 evidence_hashes: self
249 .inner
250 .evidence
251 .iter()
252 .map(|(key, value)| (key.clone(), stable_hash(value.as_bytes())))
253 .collect(),
254 token_cap: self.inner.semantic_token_cap,
255 cheap_judge_version: response.cheap_judge_version.clone(),
256 });
257 }
258 let response_tokens = response.input_tokens.saturating_add(response.output_tokens);
259 {
260 let mut state = self.inner.judge_state.borrow_mut();
261 state.tokens = state.tokens.saturating_add(response_tokens);
262 if state.tokens > self.inner.semantic_token_cap {
263 let error = InvariantBlockError::budget_exceeded(format!(
264 "semantic predicate cheap_judge response exceeded token cap {}",
265 self.inner.semantic_token_cap
266 ));
267 state.block_error = Some(error.clone());
268 return Err(error);
269 }
270 }
271 Ok(response)
272 }
273
274 fn cancel(&self) {
275 self.inner.cancel_token.store(true, Ordering::SeqCst);
276 }
277
278 fn block_error(&self) -> Option<InvariantBlockError> {
279 self.inner.judge_state.borrow().block_error.clone()
280 }
281
282 fn semantic_audit(&self) -> Option<SemanticReplayAuditMetadata> {
283 self.inner.judge_state.borrow().semantic_audit.clone()
284 }
285
286 fn record_block(&self, error: InvariantBlockError) -> InvariantBlockError {
287 self.inner.judge_state.borrow_mut().block_error = Some(error.clone());
288 error
289 }
290}
291
/// Concurrency and cumulative-time limits applied while executing slices.
#[derive(Clone, Debug)]
pub struct PredicateSchedulerConfig {
    /// Deterministic lanes shared by all slices of one `execute_slices` call.
    /// Clamped to at least 1 when the semaphore is built.
    pub max_deterministic_lanes: usize,
    /// Semantic lanes shared by all slices of one `execute_slices` call.
    /// Clamped to at least 1 when the semaphore is built.
    pub max_semantic_lanes: usize,
    /// Deterministic lanes available to each individual slice (clamped to at
    /// least 1).
    pub max_deterministic_lanes_per_slice: usize,
    /// Semantic lanes available to each individual slice (clamped to at least 1).
    pub max_semantic_lanes_per_slice: usize,
    /// Cumulative attempt time a slice may charge to deterministic predicates
    /// before further attempts are blocked. A zero duration disables the check.
    pub slice_deterministic_envelope: Duration,
    /// Cumulative attempt time a slice may charge to semantic predicates
    /// before further attempts are blocked. A zero duration disables the check.
    pub slice_semantic_envelope: Duration,
}
338
impl Default for PredicateSchedulerConfig {
    /// Populates every field from the module's `DEFAULT_*` constants.
    fn default() -> Self {
        Self {
            max_deterministic_lanes: DEFAULT_MAX_DETERMINISTIC_LANES,
            max_semantic_lanes: DEFAULT_MAX_SEMANTIC_LANES,
            max_deterministic_lanes_per_slice: DEFAULT_MAX_DETERMINISTIC_LANES_PER_SLICE,
            max_semantic_lanes_per_slice: DEFAULT_MAX_SEMANTIC_LANES_PER_SLICE,
            slice_deterministic_envelope: DEFAULT_SLICE_DETERMINISTIC_ENVELOPE,
            slice_semantic_envelope: DEFAULT_SLICE_SEMANTIC_ENVELOPE,
        }
    }
}
351
/// Per-attempt budgets and scheduler settings for a [`PredicateExecutor`].
#[derive(Clone, Debug)]
pub struct PredicateExecutorConfig {
    /// Timeout applied to each deterministic attempt.
    pub deterministic_budget: Duration,
    /// Timeout applied to each semantic attempt.
    pub semantic_budget: Duration,
    /// Token cap handed to each attempt's context for `cheap_judge` accounting.
    pub semantic_token_cap: u64,
    /// Lane and envelope limits.
    pub scheduler: PredicateSchedulerConfig,
}
361
impl Default for PredicateExecutorConfig {
    /// Uses the module's `DEFAULT_*` budgets and the default scheduler config.
    fn default() -> Self {
        Self {
            deterministic_budget: DEFAULT_DETERMINISTIC_BUDGET,
            semantic_budget: DEFAULT_SEMANTIC_BUDGET,
            semantic_token_cap: DEFAULT_SEMANTIC_TOKEN_CAP,
            scheduler: PredicateSchedulerConfig::default(),
        }
    }
}
372
/// Runs predicates against slices under time budgets and lane limits.
#[derive(Clone)]
pub struct PredicateExecutor {
    /// Budgets and scheduler limits.
    config: PredicateExecutorConfig,
    /// Judge backend handed to every attempt's context; `None` makes
    /// `cheap_judge` calls fail with `llm_unavailable`.
    cheap_judge: Option<Rc<dyn CheapJudge>>,
}
379
380impl PredicateExecutor {
381 pub fn new(config: PredicateExecutorConfig) -> Self {
382 Self {
383 config,
384 cheap_judge: None,
385 }
386 }
387
388 pub fn with_cheap_judge(
389 config: PredicateExecutorConfig,
390 cheap_judge: Rc<dyn CheapJudge>,
391 ) -> Self {
392 Self {
393 config,
394 cheap_judge: Some(cheap_judge),
395 }
396 }
397
398 pub async fn execute_slice(
404 &self,
405 slice: &Slice,
406 predicates: &[Rc<dyn PredicateRunner>],
407 ) -> PredicateExecutionReport {
408 let mut reports = self
409 .execute_slices(vec![(slice.clone(), predicates.to_vec())])
410 .await;
411 reports.pop().unwrap_or(PredicateExecutionReport {
412 records: Vec::new(),
413 })
414 }
415
416 pub async fn execute_slices(
430 &self,
431 slices: Vec<(Slice, Vec<Rc<dyn PredicateRunner>>)>,
432 ) -> Vec<PredicateExecutionReport> {
433 let scheduler = &self.config.scheduler;
434 let det_global = Arc::new(Semaphore::new(clamp_permits(
435 scheduler.max_deterministic_lanes,
436 )));
437 let sem_global = Arc::new(Semaphore::new(clamp_permits(scheduler.max_semantic_lanes)));
438
439 let slice_futures = slices
440 .into_iter()
441 .map(|(slice, predicates)| {
442 let det_global = det_global.clone();
443 let sem_global = sem_global.clone();
444 async move {
445 self.execute_slice_inner(slice, predicates, det_global, sem_global)
446 .await
447 }
448 })
449 .collect::<Vec<_>>();
450
451 futures::future::join_all(slice_futures).await
452 }
453
454 async fn execute_slice_inner(
455 &self,
456 slice: Slice,
457 predicates: Vec<Rc<dyn PredicateRunner>>,
458 det_global: Arc<Semaphore>,
459 sem_global: Arc<Semaphore>,
460 ) -> PredicateExecutionReport {
461 let scheduler = &self.config.scheduler;
462 let slice_rc = Rc::new(slice);
463 let lanes = SliceLanes::new(
464 det_global,
465 sem_global,
466 scheduler.max_deterministic_lanes_per_slice,
467 scheduler.max_semantic_lanes_per_slice,
468 );
469 let envelope = SliceEnvelope::new(
470 scheduler.slice_deterministic_envelope,
471 scheduler.slice_semantic_envelope,
472 );
473
474 let buffer = predicates.len().max(1);
479
480 let mut records = futures::stream::iter(predicates)
481 .map(|runner| {
482 let slice = slice_rc.clone();
483 let lanes = lanes.clone();
484 let envelope = envelope.clone();
485 async move { self.execute_one(slice, runner, lanes, envelope).await }
486 })
487 .buffer_unordered(buffer)
488 .collect::<Vec<_>>()
489 .await;
490
491 self.apply_semantic_fallbacks(&mut records);
492 records.sort_by(|left, right| left.predicate_hash.cmp(&right.predicate_hash));
493 PredicateExecutionReport { records }
494 }
495
496 async fn execute_one(
497 &self,
498 slice: Rc<Slice>,
499 runner: Rc<dyn PredicateRunner>,
500 lanes: SliceLanes,
501 envelope: SliceEnvelope,
502 ) -> PredicateExecutionRecord {
503 let started = Instant::now();
504 let predicate_hash = runner.hash();
505 let kind = runner.kind();
506 let first = self
507 .run_attempt(slice.clone(), runner.as_ref(), &lanes, &envelope)
508 .await;
509 let first_hash = hash_result(&first.result);
510 let mut result = first.result;
511 let mut attempts = 1;
512 let mut second_hash = None;
513 let semantic_replay_audit = first.semantic_audit;
514
515 if kind == PredicateKind::Deterministic && !result.is_blocking() {
516 let second = self
517 .run_attempt(slice, runner.as_ref(), &lanes, &envelope)
518 .await;
519 attempts = 2;
520 let replay_hash = hash_result(&second.result);
521 second_hash = replay_hash.clone();
522 if second.result.is_blocking() {
523 result = second.result;
524 } else {
525 match (first_hash.as_ref(), replay_hash.as_ref()) {
526 (Some(left), Some(right)) if left == right => {}
527 (Some(left), Some(right)) => {
528 result = InvariantResult::block(InvariantBlockError::nondeterministic_drift(
529 format!(
530 "deterministic predicate result drifted across replay: {left} != {right}"
531 ),
532 ));
533 }
534 _ => {
535 result = InvariantResult::block(InvariantBlockError::new(
536 "result_hash_failed",
537 "failed to hash deterministic predicate replay result",
538 ));
539 }
540 }
541 }
542 }
543
544 PredicateExecutionRecord {
545 predicate_hash,
546 kind,
547 fallback_hash: runner.fallback_hash(),
548 result,
549 elapsed_ms: started.elapsed().as_millis() as u64,
550 attempts,
551 replayable: kind == PredicateKind::Deterministic,
552 first_result_hash: first_hash,
553 second_result_hash: second_hash,
554 semantic_replay_audit,
555 }
556 }
557
558 async fn run_attempt(
559 &self,
560 slice: Rc<Slice>,
561 runner: &dyn PredicateRunner,
562 lanes: &SliceLanes,
563 envelope: &SliceEnvelope,
564 ) -> PredicateAttempt {
565 let kind = runner.kind();
566 let timeout = match kind {
567 PredicateKind::Deterministic => self.config.deterministic_budget,
568 PredicateKind::Semantic => self.config.semantic_budget,
569 };
570
571 if let Some(attempt) =
572 envelope_exhausted_attempt(envelope, kind, "before this predicate started")
573 {
574 return attempt;
575 }
576
577 let _permits = lanes.acquire(kind).await;
578
579 if let Some(attempt) =
582 envelope_exhausted_attempt(envelope, kind, "while waiting for a lane")
583 {
584 return attempt;
585 }
586
587 let context = PredicateContext::new(
588 slice,
589 kind,
590 runner.evidence(),
591 self.cheap_judge.clone(),
592 self.config.semantic_token_cap,
593 Arc::new(AtomicBool::new(false)),
594 );
595 let started = Instant::now();
596 let attempt = match tokio::time::timeout(timeout, runner.evaluate(context.clone())).await {
597 Ok(result) => PredicateAttempt {
598 result: context
599 .block_error()
600 .map(InvariantResult::block)
601 .unwrap_or(result),
602 semantic_audit: context.semantic_audit(),
603 },
604 Err(_) => {
605 context.cancel();
606 PredicateAttempt {
607 result: InvariantResult::block(InvariantBlockError::budget_exceeded(format!(
608 "{kind:?} predicate exceeded {}ms budget",
609 timeout.as_millis()
610 ))),
611 semantic_audit: context.semantic_audit(),
612 }
613 }
614 };
615 envelope.charge(kind, started.elapsed());
616 attempt
617 }
618
619 fn apply_semantic_fallbacks(&self, records: &mut [PredicateExecutionRecord]) {
620 let by_hash = records
621 .iter()
622 .map(|record| {
623 (
624 record.predicate_hash.clone(),
625 (record.kind, record.result.clone()),
626 )
627 })
628 .collect::<BTreeMap<_, _>>();
629
630 for record in records {
631 if record.kind != PredicateKind::Semantic {
632 continue;
633 }
634 let Some(fallback_hash) = record.fallback_hash.as_ref() else {
635 record.result = InvariantResult::block(InvariantBlockError::new(
636 "fallback_missing",
637 "semantic predicate did not declare a deterministic fallback",
638 ));
639 continue;
640 };
641 let Some((fallback_kind, fallback_result)) = by_hash.get(fallback_hash) else {
642 record.result = InvariantResult::block(InvariantBlockError::new(
643 "fallback_missing",
644 format!(
645 "semantic predicate fallback {} was not evaluated",
646 fallback_hash.as_str()
647 ),
648 ));
649 continue;
650 };
651 if *fallback_kind != PredicateKind::Deterministic {
652 record.result = InvariantResult::block(InvariantBlockError::new(
653 "fallback_not_deterministic",
654 format!(
655 "semantic predicate fallback {} is not deterministic",
656 fallback_hash.as_str()
657 ),
658 ));
659 continue;
660 }
661 record.result = stricter_result(&record.result, fallback_result);
662 }
663 }
664}
665
impl Default for PredicateExecutor {
    /// An executor with the default config and no judge backend.
    fn default() -> Self {
        Self::new(PredicateExecutorConfig::default())
    }
}
671
/// Outcome of executing one predicate against one slice.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PredicateExecutionRecord {
    /// Identity of the predicate that produced this record.
    pub predicate_hash: PredicateHash,
    /// Kind reported by the predicate.
    pub kind: PredicateKind,
    /// Fallback declared by the predicate, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fallback_hash: Option<PredicateHash>,
    /// Final result, after the replay drift check and semantic fallback merge.
    pub result: InvariantResult,
    /// Time spent on all attempts, including time waiting for lanes.
    pub elapsed_ms: u64,
    /// Number of attempts: 1, or 2 when a deterministic replay ran.
    pub attempts: u8,
    /// `true` for deterministic predicates.
    pub replayable: bool,
    /// Hex SHA-256 of the JSON-serialized first attempt result; `None` if
    /// serialization failed.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub first_result_hash: Option<String>,
    /// Same hash for the replay attempt; `None` when no replay ran.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub second_result_hash: Option<String>,
    /// Audit metadata captured by the first attempt's `cheap_judge` call.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub semantic_replay_audit: Option<SemanticReplayAuditMetadata>,
}
690
/// All execution records for one slice, sorted by predicate hash.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PredicateExecutionReport {
    /// One record per predicate that was submitted for the slice.
    pub records: Vec<PredicateExecutionRecord>,
}
696
697impl PredicateExecutionReport {
698 pub fn invariants_applied(&self) -> Vec<(PredicateHash, InvariantResult)> {
699 self.records
700 .iter()
701 .map(|record| (record.predicate_hash.clone(), record.result.clone()))
702 .collect()
703 }
704}
705
706fn hash_result(result: &InvariantResult) -> Option<String> {
707 let bytes = serde_json::to_vec(result).ok()?;
708 Some(hex::encode(Sha256::digest(bytes)))
709}
710
711fn stable_hash(bytes: &[u8]) -> String {
712 format!("sha256:{}", hex::encode(Sha256::digest(bytes)))
713}
714
715fn stricter_result(left: &InvariantResult, right: &InvariantResult) -> InvariantResult {
716 if verdict_strictness(&left.verdict) >= verdict_strictness(&right.verdict) {
717 left.clone()
718 } else {
719 right.clone()
720 }
721}
722
/// Rough token estimate: the number of whitespace-separated words, never less
/// than one (so empty or blank input still costs a token).
fn estimate_tokens(value: &str) -> u64 {
    let words = value.split_whitespace().count();
    let at_least_one = std::cmp::max(words, 1);
    at_least_one as u64
}
726
727fn clamp_permits(value: usize) -> usize {
728 value.clamp(1, Semaphore::MAX_PERMITS)
729}
730
731fn envelope_exhausted_attempt(
735 envelope: &SliceEnvelope,
736 kind: PredicateKind,
737 when: &str,
738) -> Option<PredicateAttempt> {
739 let remaining = envelope.remaining(kind)?;
740 if !remaining.is_zero() {
741 return None;
742 }
743 Some(PredicateAttempt {
744 result: InvariantResult::block(InvariantBlockError::budget_exceeded(format!(
745 "slice {kind:?} envelope exhausted {when}"
746 ))),
747 semantic_audit: None,
748 })
749}
750
/// Outcome of a single evaluation attempt of one predicate.
struct PredicateAttempt {
    /// Result of the attempt, or a block produced by the executor itself.
    result: InvariantResult,
    /// Audit metadata captured by the attempt's context, if any.
    semantic_audit: Option<SemanticReplayAuditMetadata>,
}
755
/// Lane semaphores visible to one slice: the global pools shared across
/// slices plus this slice's own local pools.
#[derive(Clone)]
struct SliceLanes {
    /// Deterministic pool shared by every slice of the `execute_slices` call.
    deterministic_global: Arc<Semaphore>,
    /// Deterministic pool private to this slice.
    deterministic_local: Arc<Semaphore>,
    /// Semantic pool shared by every slice of the `execute_slices` call.
    semantic_global: Arc<Semaphore>,
    /// Semantic pool private to this slice.
    semantic_local: Arc<Semaphore>,
}
765
766impl SliceLanes {
767 fn new(
768 deterministic_global: Arc<Semaphore>,
769 semantic_global: Arc<Semaphore>,
770 deterministic_per_slice: usize,
771 semantic_per_slice: usize,
772 ) -> Self {
773 Self {
774 deterministic_global,
775 deterministic_local: Arc::new(Semaphore::new(clamp_permits(deterministic_per_slice))),
776 semantic_global,
777 semantic_local: Arc::new(Semaphore::new(clamp_permits(semantic_per_slice))),
778 }
779 }
780
781 async fn acquire(&self, kind: PredicateKind) -> LaneTickets {
782 let (global, local) = match kind {
783 PredicateKind::Deterministic => (&self.deterministic_global, &self.deterministic_local),
784 PredicateKind::Semantic => (&self.semantic_global, &self.semantic_local),
785 };
786 let local_ticket = local
790 .clone()
791 .acquire_owned()
792 .await
793 .expect("predicate lane semaphore closed");
794 let global_ticket = global
795 .clone()
796 .acquire_owned()
797 .await
798 .expect("predicate lane semaphore closed");
799 LaneTickets {
800 _local: local_ticket,
801 _global: global_ticket,
802 }
803 }
804}
805
/// Pair of permits held while an attempt runs. The fields are never read;
/// they exist so that dropping the struct releases both lanes.
struct LaneTickets {
    /// Permit from the slice-local pool.
    _local: tokio::sync::OwnedSemaphorePermit,
    /// Permit from the global pool.
    _global: tokio::sync::OwnedSemaphorePermit,
}
811
/// Per-slice accounting of cumulative attempt time, tracked separately for
/// deterministic and semantic predicates. Clones share the same counters.
#[derive(Clone)]
struct SliceEnvelope {
    /// Time charged so far by deterministic attempts.
    deterministic_used: Rc<Cell<Duration>>,
    /// Time charged so far by semantic attempts.
    semantic_used: Rc<Cell<Duration>>,
    /// Deterministic allowance; zero disables the check.
    deterministic_budget: Duration,
    /// Semantic allowance; zero disables the check.
    semantic_budget: Duration,
}
820
821impl SliceEnvelope {
822 fn new(deterministic_budget: Duration, semantic_budget: Duration) -> Self {
823 Self {
824 deterministic_used: Rc::new(Cell::new(Duration::ZERO)),
825 semantic_used: Rc::new(Cell::new(Duration::ZERO)),
826 deterministic_budget,
827 semantic_budget,
828 }
829 }
830
831 fn cell(&self, kind: PredicateKind) -> &Cell<Duration> {
832 match kind {
833 PredicateKind::Deterministic => &self.deterministic_used,
834 PredicateKind::Semantic => &self.semantic_used,
835 }
836 }
837
838 fn budget(&self, kind: PredicateKind) -> Duration {
839 match kind {
840 PredicateKind::Deterministic => self.deterministic_budget,
841 PredicateKind::Semantic => self.semantic_budget,
842 }
843 }
844
845 fn remaining(&self, kind: PredicateKind) -> Option<Duration> {
846 let budget = self.budget(kind);
847 if budget.is_zero() {
848 return None;
849 }
850 let used = self.cell(kind).get();
851 Some(budget.saturating_sub(used))
852 }
853
854 fn charge(&self, kind: PredicateKind, elapsed: Duration) {
855 let cell = self.cell(kind);
856 cell.set(cell.get().saturating_add(elapsed));
857 }
858}
859
860#[cfg(test)]
861mod tests {
862 use super::*;
863 use crate::flow::{Approval, AtomId, PredicateHash, Slice, SliceId, SliceStatus, TestId};
864 use std::cell::Cell;
865
866 fn slice() -> Slice {
867 Slice {
868 id: SliceId([9; 32]),
869 atoms: vec![AtomId([1; 32])],
870 intents: Vec::new(),
871 invariants_applied: Vec::new(),
872 required_tests: vec![TestId::new("test:unit")],
873 approval_chain: Vec::<Approval>::new(),
874 base_ref: AtomId([0; 32]),
875 status: SliceStatus::Ready,
876 }
877 }
878
879 struct StaticPredicate {
880 hash: &'static str,
881 kind: PredicateKind,
882 fallback_hash: Option<&'static str>,
883 result: InvariantResult,
884 delay: Duration,
885 }
886
887 #[async_trait(?Send)]
888 impl PredicateRunner for StaticPredicate {
889 fn hash(&self) -> PredicateHash {
890 PredicateHash::new(self.hash)
891 }
892
893 fn kind(&self) -> PredicateKind {
894 self.kind
895 }
896
897 fn fallback_hash(&self) -> Option<PredicateHash> {
898 self.fallback_hash.map(PredicateHash::new)
899 }
900
901 async fn evaluate(&self, _context: PredicateContext) -> InvariantResult {
902 if !self.delay.is_zero() {
903 tokio::time::sleep(self.delay).await;
904 }
905 self.result.clone()
906 }
907 }
908
909 struct DriftingPredicate {
910 calls: Cell<u64>,
911 }
912
913 #[async_trait(?Send)]
914 impl PredicateRunner for DriftingPredicate {
915 fn hash(&self) -> PredicateHash {
916 PredicateHash::new("drift")
917 }
918
919 fn kind(&self) -> PredicateKind {
920 PredicateKind::Deterministic
921 }
922
923 async fn evaluate(&self, _context: PredicateContext) -> InvariantResult {
924 let calls = self.calls.get();
925 self.calls.set(calls + 1);
926 if calls == 0 {
927 InvariantResult::allow()
928 } else {
929 InvariantResult::warn("changed")
930 }
931 }
932 }
933
934 struct SemanticPredicate {
935 calls: u8,
936 fallback_hash: Option<&'static str>,
937 }
938
939 #[async_trait(?Send)]
940 impl PredicateRunner for SemanticPredicate {
941 fn hash(&self) -> PredicateHash {
942 PredicateHash::new(format!("semantic-{}", self.calls))
943 }
944
945 fn kind(&self) -> PredicateKind {
946 PredicateKind::Semantic
947 }
948
949 fn fallback_hash(&self) -> Option<PredicateHash> {
950 self.fallback_hash.map(PredicateHash::new)
951 }
952
953 fn evidence(&self) -> BTreeMap<String, String> {
954 BTreeMap::from([("case".to_string(), "pre-baked evidence".to_string())])
955 }
956
957 async fn evaluate(&self, context: PredicateContext) -> InvariantResult {
958 for _ in 0..self.calls {
959 let Err(error) = context.cheap_judge("judge the case", "case").await else {
960 continue;
961 };
962 return InvariantResult::block(error);
963 }
964 InvariantResult::allow()
965 }
966 }
967
968 struct PassingJudge;
969
970 #[async_trait(?Send)]
971 impl CheapJudge for PassingJudge {
972 async fn cheap_judge(
973 &self,
974 _request: CheapJudgeRequest,
975 ) -> Result<CheapJudgeResponse, InvariantBlockError> {
976 Ok(CheapJudgeResponse {
977 passes: true,
978 reason: None,
979 input_tokens: 2,
980 output_tokens: 1,
981 provider_id: Some("mock-provider".to_string()),
982 model_id: Some("mock-model-1".to_string()),
983 cheap_judge_version: Some("cheap-judge-v1".to_string()),
984 })
985 }
986 }
987
988 struct ParallelProbe {
989 hash: &'static str,
990 kind: PredicateKind,
991 active: Rc<Cell<usize>>,
992 max_active: Rc<Cell<usize>>,
993 }
994
995 #[async_trait(?Send)]
996 impl PredicateRunner for ParallelProbe {
997 fn hash(&self) -> PredicateHash {
998 PredicateHash::new(self.hash)
999 }
1000
1001 fn kind(&self) -> PredicateKind {
1002 self.kind
1003 }
1004
1005 async fn evaluate(&self, _context: PredicateContext) -> InvariantResult {
1006 let active = self.active.get() + 1;
1007 self.active.set(active);
1008 self.max_active.set(self.max_active.get().max(active));
1009 tokio::time::sleep(Duration::from_millis(10)).await;
1010 self.active.set(self.active.get() - 1);
1011 InvariantResult::allow()
1012 }
1013 }
1014
1015 #[tokio::test]
1016 async fn deterministic_predicate_replays_bit_identically() {
1017 let executor = PredicateExecutor::default();
1018 let predicates: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(StaticPredicate {
1019 hash: "stable",
1020 kind: PredicateKind::Deterministic,
1021 fallback_hash: None,
1022 result: InvariantResult::allow(),
1023 delay: Duration::ZERO,
1024 })];
1025
1026 let report = executor.execute_slice(&slice(), &predicates).await;
1027
1028 assert_eq!(report.records.len(), 1);
1029 let record = &report.records[0];
1030 assert_eq!(record.result, InvariantResult::allow());
1031 assert_eq!(record.attempts, 2);
1032 assert_eq!(record.first_result_hash, record.second_result_hash);
1033 }
1034
1035 #[tokio::test]
1036 async fn deterministic_drift_blocks_the_predicate() {
1037 let executor = PredicateExecutor::default();
1038 let predicates: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(DriftingPredicate {
1039 calls: Cell::new(0),
1040 })];
1041
1042 let report = executor.execute_slice(&slice(), &predicates).await;
1043
1044 let block = report.records[0].result.block_error().expect("blocked");
1045 assert_eq!(block.code, "nondeterministic_drift");
1046 }
1047
1048 #[tokio::test]
1049 async fn deterministic_budget_overrun_blocks_instead_of_panicking() {
1050 let executor = PredicateExecutor::new(PredicateExecutorConfig {
1051 deterministic_budget: Duration::from_millis(1),
1052 ..PredicateExecutorConfig::default()
1053 });
1054 let predicates: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(StaticPredicate {
1055 hash: "slow",
1056 kind: PredicateKind::Deterministic,
1057 fallback_hash: None,
1058 result: InvariantResult::allow(),
1059 delay: Duration::from_millis(20),
1060 })];
1061
1062 let report = executor.execute_slice(&slice(), &predicates).await;
1063
1064 let block = report.records[0].result.block_error().expect("blocked");
1065 assert_eq!(block.code, "budget_exceeded");
1066 }
1067
1068 #[tokio::test]
1069 async fn predicates_are_polled_concurrently_for_a_slice() {
1070 let active = Rc::new(Cell::new(0));
1071 let max_active = Rc::new(Cell::new(0));
1072 let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1073 Rc::new(ParallelProbe {
1074 hash: "parallel-a",
1075 kind: PredicateKind::Deterministic,
1076 active: active.clone(),
1077 max_active: max_active.clone(),
1078 }),
1079 Rc::new(ParallelProbe {
1080 hash: "parallel-b",
1081 kind: PredicateKind::Deterministic,
1082 active,
1083 max_active: max_active.clone(),
1084 }),
1085 ];
1086
1087 let report = PredicateExecutor::default()
1088 .execute_slice(&slice(), &predicates)
1089 .await;
1090
1091 assert_eq!(report.records.len(), 2);
1092 assert_eq!(max_active.get(), 2);
1093 }
1094
1095 #[tokio::test]
1096 async fn semantic_predicate_gets_one_cheap_judge_call() {
1097 let executor = PredicateExecutor::with_cheap_judge(
1098 PredicateExecutorConfig::default(),
1099 Rc::new(PassingJudge),
1100 );
1101 let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1102 Rc::new(SemanticPredicate {
1103 calls: 2,
1104 fallback_hash: Some("fallback"),
1105 }),
1106 Rc::new(StaticPredicate {
1107 hash: "fallback",
1108 kind: PredicateKind::Deterministic,
1109 fallback_hash: None,
1110 result: InvariantResult::allow(),
1111 delay: Duration::ZERO,
1112 }),
1113 ];
1114
1115 let report = executor.execute_slice(&slice(), &predicates).await;
1116
1117 let semantic = report
1118 .records
1119 .iter()
1120 .find(|record| record.kind == PredicateKind::Semantic)
1121 .unwrap();
1122 let block = semantic.result.block_error().expect("blocked");
1123 assert_eq!(block.code, "budget_exceeded");
1124 }
1125
1126 #[tokio::test]
1127 async fn semantic_and_fallback_agree_records_both_results() {
1128 let executor = PredicateExecutor::default();
1129 let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1130 Rc::new(StaticPredicate {
1131 hash: "semantic",
1132 kind: PredicateKind::Semantic,
1133 fallback_hash: Some("fallback"),
1134 result: InvariantResult::warn("semantic concern"),
1135 delay: Duration::ZERO,
1136 }),
1137 Rc::new(StaticPredicate {
1138 hash: "fallback",
1139 kind: PredicateKind::Deterministic,
1140 fallback_hash: None,
1141 result: InvariantResult::warn("fallback concern"),
1142 delay: Duration::ZERO,
1143 }),
1144 ];
1145
1146 let report = executor.execute_slice(&slice(), &predicates).await;
1147
1148 assert_eq!(report.records.len(), 2);
1149 assert_eq!(report.invariants_applied().len(), 2);
1150 let semantic = report
1151 .records
1152 .iter()
1153 .find(|record| record.predicate_hash == PredicateHash::new("semantic"))
1154 .unwrap();
1155 assert_eq!(semantic.fallback_hash, Some(PredicateHash::new("fallback")));
1156 assert!(matches!(
1157 semantic.result.verdict,
1158 crate::flow::Verdict::Warn { .. }
1159 ));
1160 }
1161
1162 #[tokio::test]
1163 async fn semantic_fallback_disagreement_selects_stricter_verdict() {
1164 let executor = PredicateExecutor::default();
1165 let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1166 Rc::new(StaticPredicate {
1167 hash: "semantic",
1168 kind: PredicateKind::Semantic,
1169 fallback_hash: Some("fallback"),
1170 result: InvariantResult::allow(),
1171 delay: Duration::ZERO,
1172 }),
1173 Rc::new(StaticPredicate {
1174 hash: "fallback",
1175 kind: PredicateKind::Deterministic,
1176 fallback_hash: None,
1177 result: InvariantResult::block(InvariantBlockError::new(
1178 "fallback_policy",
1179 "fallback blocked",
1180 )),
1181 delay: Duration::ZERO,
1182 }),
1183 ];
1184
1185 let report = executor.execute_slice(&slice(), &predicates).await;
1186 let semantic = report
1187 .records
1188 .iter()
1189 .find(|record| record.predicate_hash == PredicateHash::new("semantic"))
1190 .unwrap();
1191
1192 let block = semantic
1193 .result
1194 .block_error()
1195 .expect("stricter fallback wins");
1196 assert_eq!(block.code, "fallback_policy");
1197 }
1198
1199 #[tokio::test]
1200 async fn semantic_missing_fallback_blocks() {
1201 let executor = PredicateExecutor::default();
1202 let predicates: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(StaticPredicate {
1203 hash: "semantic",
1204 kind: PredicateKind::Semantic,
1205 fallback_hash: None,
1206 result: InvariantResult::allow(),
1207 delay: Duration::ZERO,
1208 })];
1209
1210 let report = executor.execute_slice(&slice(), &predicates).await;
1211
1212 let block = report.records[0].result.block_error().expect("blocked");
1213 assert_eq!(block.code, "fallback_missing");
1214 }
1215
1216 #[tokio::test]
1217 async fn semantic_predicate_requires_prebaked_evidence() {
1218 struct MissingEvidence;
1219
1220 #[async_trait(?Send)]
1221 impl PredicateRunner for MissingEvidence {
1222 fn hash(&self) -> PredicateHash {
1223 PredicateHash::new("missing-evidence")
1224 }
1225
1226 fn kind(&self) -> PredicateKind {
1227 PredicateKind::Semantic
1228 }
1229
1230 fn fallback_hash(&self) -> Option<PredicateHash> {
1231 Some(PredicateHash::new("fallback"))
1232 }
1233
1234 async fn evaluate(&self, context: PredicateContext) -> InvariantResult {
1235 match context.cheap_judge("judge", "missing").await {
1236 Ok(_) => InvariantResult::allow(),
1237 Err(error) => InvariantResult::block(error),
1238 }
1239 }
1240 }
1241
1242 let executor = PredicateExecutor::with_cheap_judge(
1243 PredicateExecutorConfig::default(),
1244 Rc::new(PassingJudge),
1245 );
1246 let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1247 Rc::new(MissingEvidence),
1248 Rc::new(StaticPredicate {
1249 hash: "fallback",
1250 kind: PredicateKind::Deterministic,
1251 fallback_hash: None,
1252 result: InvariantResult::allow(),
1253 delay: Duration::ZERO,
1254 }),
1255 ];
1256
1257 let report = executor.execute_slice(&slice(), &predicates).await;
1258
1259 let semantic = report
1260 .records
1261 .iter()
1262 .find(|record| record.kind == PredicateKind::Semantic)
1263 .unwrap();
1264 let block = semantic.result.block_error().expect("blocked");
1265 assert_eq!(block.code, "evidence_missing");
1266 }
1267
1268 #[tokio::test]
1269 async fn semantic_replay_audit_records_judge_hashes() {
1270 let executor = PredicateExecutor::with_cheap_judge(
1271 PredicateExecutorConfig::default(),
1272 Rc::new(PassingJudge),
1273 );
1274 let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1275 Rc::new(SemanticPredicate {
1276 calls: 1,
1277 fallback_hash: Some("fallback"),
1278 }),
1279 Rc::new(StaticPredicate {
1280 hash: "fallback",
1281 kind: PredicateKind::Deterministic,
1282 fallback_hash: None,
1283 result: InvariantResult::allow(),
1284 delay: Duration::ZERO,
1285 }),
1286 ];
1287
1288 let report = executor.execute_slice(&slice(), &predicates).await;
1289 let semantic = report
1290 .records
1291 .iter()
1292 .find(|record| record.kind == PredicateKind::Semantic)
1293 .unwrap();
1294 let audit = semantic
1295 .semantic_replay_audit
1296 .as_ref()
1297 .expect("semantic audit metadata");
1298
1299 assert_eq!(audit.provider_id.as_deref(), Some("mock-provider"));
1300 assert_eq!(audit.model_id.as_deref(), Some("mock-model-1"));
1301 assert_eq!(audit.prompt_hash, stable_hash("judge the case".as_bytes()));
1302 let expected_evidence_hash = stable_hash("pre-baked evidence".as_bytes());
1303 assert_eq!(
1304 audit.evidence_hashes.get("case").map(String::as_str),
1305 Some(expected_evidence_hash.as_str())
1306 );
1307 assert_eq!(audit.token_cap, DEFAULT_SEMANTIC_TOKEN_CAP);
1308 }
1309
1310 fn slice_with_id(id: u8) -> Slice {
1311 let mut value = slice();
1312 value.id = SliceId([id; 32]);
1313 value
1314 }
1315
    /// Test predicate that sleeps for `delay`, then appends its completion
    /// time to a shared log and allows.
    struct FinishingProbe {
        /// Name reported as the predicate hash and used as the log label.
        hash: &'static str,
        /// Predicate kind reported by `PredicateRunner::kind`.
        kind: PredicateKind,
        /// How long `evaluate` sleeps before recording completion.
        delay: Duration,
        /// Shared log of `(hash, microseconds elapsed since epoch)` entries,
        /// appended to when `evaluate` finishes.
        finish_log: Rc<RefCell<Vec<(String, u128)>>>,
        /// Reference instant that logged timestamps are measured from.
        epoch: Instant,
    }
1326
1327 #[async_trait(?Send)]
1328 impl PredicateRunner for FinishingProbe {
1329 fn hash(&self) -> PredicateHash {
1330 PredicateHash::new(self.hash)
1331 }
1332
1333 fn kind(&self) -> PredicateKind {
1334 self.kind
1335 }
1336
1337 async fn evaluate(&self, _context: PredicateContext) -> InvariantResult {
1338 tokio::time::sleep(self.delay).await;
1339 self.finish_log
1340 .borrow_mut()
1341 .push((self.hash.to_string(), self.epoch.elapsed().as_micros()));
1342 InvariantResult::allow()
1343 }
1344 }
1345
1346 #[tokio::test]
1347 async fn execute_slices_returns_one_report_per_slice_in_input_order() {
1348 let executor = PredicateExecutor::default();
1349 let slice_a = slice_with_id(1);
1350 let slice_b = slice_with_id(2);
1351
1352 let predicates_a: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(StaticPredicate {
1353 hash: "alpha",
1354 kind: PredicateKind::Deterministic,
1355 fallback_hash: None,
1356 result: InvariantResult::allow(),
1357 delay: Duration::ZERO,
1358 })];
1359 let predicates_b: Vec<Rc<dyn PredicateRunner>> = vec![Rc::new(StaticPredicate {
1360 hash: "beta",
1361 kind: PredicateKind::Deterministic,
1362 fallback_hash: None,
1363 result: InvariantResult::allow(),
1364 delay: Duration::ZERO,
1365 })];
1366
1367 let reports = executor
1368 .execute_slices(vec![(slice_a, predicates_a), (slice_b, predicates_b)])
1369 .await;
1370
1371 assert_eq!(reports.len(), 2);
1372 assert_eq!(
1373 reports[0].records[0].predicate_hash,
1374 PredicateHash::new("alpha")
1375 );
1376 assert_eq!(
1377 reports[1].records[0].predicate_hash,
1378 PredicateHash::new("beta")
1379 );
1380 }
1381
1382 #[tokio::test]
1383 async fn semantic_lane_fairness_prevents_monopolization() {
1384 let config = PredicateExecutorConfig {
1389 scheduler: PredicateSchedulerConfig {
1390 max_semantic_lanes: 1,
1391 max_semantic_lanes_per_slice: 1,
1392 slice_semantic_envelope: Duration::from_secs(60),
1393 ..PredicateSchedulerConfig::default()
1394 },
1395 ..PredicateExecutorConfig::default()
1396 };
1397 let executor = PredicateExecutor::with_cheap_judge(config, Rc::new(PassingJudge));
1398
1399 let log_a = Rc::new(RefCell::new(Vec::new()));
1400 let log_b = Rc::new(RefCell::new(Vec::new()));
1401 let epoch = Instant::now();
1402
1403 let predicates_a: Vec<Rc<dyn PredicateRunner>> = (0..4)
1404 .map(|i| -> Rc<dyn PredicateRunner> {
1405 Rc::new(FinishingProbe {
1406 hash: ["a0", "a1", "a2", "a3"][i],
1407 kind: PredicateKind::Semantic,
1408 delay: Duration::from_millis(20),
1409 finish_log: log_a.clone(),
1410 epoch,
1411 })
1412 })
1413 .collect();
1414 let predicates_b: Vec<Rc<dyn PredicateRunner>> = (0..4)
1415 .map(|i| -> Rc<dyn PredicateRunner> {
1416 Rc::new(FinishingProbe {
1417 hash: ["b0", "b1", "b2", "b3"][i],
1418 kind: PredicateKind::Semantic,
1419 delay: Duration::from_millis(20),
1420 finish_log: log_b.clone(),
1421 epoch,
1422 })
1423 })
1424 .collect();
1425
1426 let _ = executor
1427 .execute_slices(vec![
1428 (slice_with_id(1), predicates_a),
1429 (slice_with_id(2), predicates_b),
1430 ])
1431 .await;
1432
1433 let log_a_snapshot = log_a.borrow().clone();
1434 let log_b_snapshot = log_b.borrow().clone();
1435 assert_eq!(log_a_snapshot.len(), 4);
1436 assert_eq!(log_b_snapshot.len(), 4);
1437 let earliest_b = log_b_snapshot
1438 .iter()
1439 .map(|(_, micros)| *micros)
1440 .min()
1441 .unwrap();
1442 let latest_a = log_a_snapshot
1443 .iter()
1444 .map(|(_, micros)| *micros)
1445 .max()
1446 .unwrap();
1447 assert!(
1448 earliest_b < latest_a,
1449 "fair scheduler should interleave slices: B's first finished at {earliest_b}us, A's last at {latest_a}us",
1450 );
1451 }
1452
1453 #[tokio::test]
1454 async fn deterministic_progress_continues_while_semantic_work_waits() {
1455 let config = PredicateExecutorConfig {
1460 semantic_budget: Duration::from_secs(10),
1461 scheduler: PredicateSchedulerConfig {
1462 max_semantic_lanes: 1,
1463 max_semantic_lanes_per_slice: 1,
1464 slice_semantic_envelope: Duration::from_secs(60),
1465 ..PredicateSchedulerConfig::default()
1466 },
1467 ..PredicateExecutorConfig::default()
1468 };
1469 let executor = PredicateExecutor::with_cheap_judge(config, Rc::new(PassingJudge));
1470
1471 let log = Rc::new(RefCell::new(Vec::new()));
1472 let epoch = Instant::now();
1473
1474 let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1475 Rc::new(FinishingProbe {
1476 hash: "slow-semantic",
1477 kind: PredicateKind::Semantic,
1478 delay: Duration::from_millis(80),
1479 finish_log: log.clone(),
1480 epoch,
1481 }),
1482 Rc::new(FinishingProbe {
1483 hash: "det-1",
1484 kind: PredicateKind::Deterministic,
1485 delay: Duration::from_millis(5),
1486 finish_log: log.clone(),
1487 epoch,
1488 }),
1489 Rc::new(FinishingProbe {
1490 hash: "det-2",
1491 kind: PredicateKind::Deterministic,
1492 delay: Duration::from_millis(5),
1493 finish_log: log.clone(),
1494 epoch,
1495 }),
1496 ];
1497
1498 let _ = executor.execute_slice(&slice(), &predicates).await;
1499 let snapshot = log.borrow().clone();
1500 let det_finish = snapshot
1501 .iter()
1502 .filter_map(|(name, micros)| name.starts_with("det-").then_some(*micros))
1503 .max()
1504 .expect("deterministic finished");
1505 let semantic_finish = snapshot
1506 .iter()
1507 .find(|(name, _)| name == "slow-semantic")
1508 .map(|(_, micros)| *micros)
1509 .expect("semantic finished");
1510
1511 assert!(
1512 det_finish < semantic_finish,
1513 "deterministic ({det_finish}us) should finish before slow semantic ({semantic_finish}us)",
1514 );
1515 }
1516
1517 #[tokio::test]
1518 async fn slice_deterministic_envelope_blocks_remaining_predicates() {
1519 let config = PredicateExecutorConfig {
1523 scheduler: PredicateSchedulerConfig {
1524 max_deterministic_lanes_per_slice: 1,
1525 slice_deterministic_envelope: Duration::from_millis(15),
1526 ..PredicateSchedulerConfig::default()
1527 },
1528 ..PredicateExecutorConfig::default()
1529 };
1530 let executor = PredicateExecutor::new(config);
1531 let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1532 Rc::new(StaticPredicate {
1533 hash: "first",
1534 kind: PredicateKind::Deterministic,
1535 fallback_hash: None,
1536 result: InvariantResult::allow(),
1537 delay: Duration::from_millis(20),
1538 }),
1539 Rc::new(StaticPredicate {
1540 hash: "second",
1541 kind: PredicateKind::Deterministic,
1542 fallback_hash: None,
1543 result: InvariantResult::allow(),
1544 delay: Duration::ZERO,
1545 }),
1546 Rc::new(StaticPredicate {
1547 hash: "third",
1548 kind: PredicateKind::Deterministic,
1549 fallback_hash: None,
1550 result: InvariantResult::allow(),
1551 delay: Duration::ZERO,
1552 }),
1553 ];
1554
1555 let report = executor.execute_slice(&slice(), &predicates).await;
1556
1557 let by_hash: BTreeMap<_, _> = report
1558 .records
1559 .iter()
1560 .map(|record| (record.predicate_hash.as_str().to_string(), record))
1561 .collect();
1562 let second = by_hash.get("second").expect("second present");
1567 let third = by_hash.get("third").expect("third present");
1568 let second_block = second.result.block_error().expect("second blocked");
1569 assert_eq!(second_block.code, "budget_exceeded");
1570 let third_block = third.result.block_error().expect("third blocked");
1571 assert_eq!(third_block.code, "budget_exceeded");
1572 }
1573
1574 #[tokio::test]
1575 async fn slice_semantic_envelope_blocks_remaining_predicates() {
1576 let config = PredicateExecutorConfig {
1577 scheduler: PredicateSchedulerConfig {
1578 max_semantic_lanes: 1,
1579 max_semantic_lanes_per_slice: 1,
1580 slice_semantic_envelope: Duration::from_millis(15),
1581 ..PredicateSchedulerConfig::default()
1582 },
1583 ..PredicateExecutorConfig::default()
1584 };
1585 let executor = PredicateExecutor::with_cheap_judge(config, Rc::new(PassingJudge));
1586
1587 let predicates: Vec<Rc<dyn PredicateRunner>> = vec![
1588 Rc::new(StaticPredicate {
1589 hash: "sem-first",
1590 kind: PredicateKind::Semantic,
1591 fallback_hash: Some("sem-fallback"),
1592 result: InvariantResult::allow(),
1593 delay: Duration::from_millis(30),
1594 }),
1595 Rc::new(StaticPredicate {
1596 hash: "sem-second",
1597 kind: PredicateKind::Semantic,
1598 fallback_hash: Some("sem-fallback"),
1599 result: InvariantResult::allow(),
1600 delay: Duration::ZERO,
1601 }),
1602 Rc::new(StaticPredicate {
1603 hash: "sem-fallback",
1604 kind: PredicateKind::Deterministic,
1605 fallback_hash: None,
1606 result: InvariantResult::allow(),
1607 delay: Duration::ZERO,
1608 }),
1609 ];
1610
1611 let report = executor.execute_slice(&slice(), &predicates).await;
1612 let second = report
1613 .records
1614 .iter()
1615 .find(|record| record.predicate_hash == PredicateHash::new("sem-second"))
1616 .expect("sem-second present");
1617 let block = second.result.block_error().expect("blocked");
1618 assert_eq!(block.code, "budget_exceeded");
1619 }
1620
1621 #[tokio::test]
1622 async fn slice_envelopes_are_independent_across_slices() {
1623 let config = PredicateExecutorConfig {
1626 scheduler: PredicateSchedulerConfig {
1627 max_semantic_lanes: 2,
1628 max_semantic_lanes_per_slice: 1,
1629 slice_semantic_envelope: Duration::from_millis(15),
1630 ..PredicateSchedulerConfig::default()
1631 },
1632 ..PredicateExecutorConfig::default()
1633 };
1634 let executor = PredicateExecutor::with_cheap_judge(config, Rc::new(PassingJudge));
1635
1636 let slice_a_preds: Vec<Rc<dyn PredicateRunner>> = vec![
1637 Rc::new(StaticPredicate {
1638 hash: "a-slow",
1639 kind: PredicateKind::Semantic,
1640 fallback_hash: Some("a-fallback"),
1641 result: InvariantResult::allow(),
1642 delay: Duration::from_millis(30),
1643 }),
1644 Rc::new(StaticPredicate {
1645 hash: "a-second",
1646 kind: PredicateKind::Semantic,
1647 fallback_hash: Some("a-fallback"),
1648 result: InvariantResult::allow(),
1649 delay: Duration::ZERO,
1650 }),
1651 Rc::new(StaticPredicate {
1652 hash: "a-fallback",
1653 kind: PredicateKind::Deterministic,
1654 fallback_hash: None,
1655 result: InvariantResult::allow(),
1656 delay: Duration::ZERO,
1657 }),
1658 ];
1659 let slice_b_preds: Vec<Rc<dyn PredicateRunner>> = vec![
1660 Rc::new(StaticPredicate {
1661 hash: "b-fast",
1662 kind: PredicateKind::Semantic,
1663 fallback_hash: Some("b-fallback"),
1664 result: InvariantResult::allow(),
1665 delay: Duration::from_millis(2),
1666 }),
1667 Rc::new(StaticPredicate {
1668 hash: "b-fallback",
1669 kind: PredicateKind::Deterministic,
1670 fallback_hash: None,
1671 result: InvariantResult::allow(),
1672 delay: Duration::ZERO,
1673 }),
1674 ];
1675
1676 let reports = executor
1677 .execute_slices(vec![
1678 (slice_with_id(1), slice_a_preds),
1679 (slice_with_id(2), slice_b_preds),
1680 ])
1681 .await;
1682
1683 let slice_b = &reports[1];
1684 let b_fast = slice_b
1685 .records
1686 .iter()
1687 .find(|record| record.predicate_hash == PredicateHash::new("b-fast"))
1688 .unwrap();
1689 assert!(b_fast.result.block_error().is_none());
1692 }
1693
1694 #[tokio::test]
1695 async fn output_ordering_is_deterministic_across_random_finish_order() {
1696 let make_predicates = || -> Vec<Rc<dyn PredicateRunner>> {
1700 vec![
1701 Rc::new(StaticPredicate {
1702 hash: "z-last",
1703 kind: PredicateKind::Deterministic,
1704 fallback_hash: None,
1705 result: InvariantResult::allow(),
1706 delay: Duration::from_millis(15),
1707 }),
1708 Rc::new(StaticPredicate {
1709 hash: "a-first",
1710 kind: PredicateKind::Deterministic,
1711 fallback_hash: None,
1712 result: InvariantResult::allow(),
1713 delay: Duration::ZERO,
1714 }),
1715 Rc::new(StaticPredicate {
1716 hash: "m-mid",
1717 kind: PredicateKind::Deterministic,
1718 fallback_hash: None,
1719 result: InvariantResult::allow(),
1720 delay: Duration::from_millis(7),
1721 }),
1722 ]
1723 };
1724
1725 let executor = PredicateExecutor::default();
1726 let report_one = executor.execute_slice(&slice(), &make_predicates()).await;
1727 let report_two = executor.execute_slice(&slice(), &make_predicates()).await;
1728 let order_one: Vec<_> = report_one
1729 .records
1730 .iter()
1731 .map(|record| record.predicate_hash.as_str().to_string())
1732 .collect();
1733 let order_two: Vec<_> = report_two
1734 .records
1735 .iter()
1736 .map(|record| record.predicate_hash.as_str().to_string())
1737 .collect();
1738
1739 assert_eq!(order_one, vec!["a-first", "m-mid", "z-last"]);
1740 assert_eq!(order_one, order_two);
1741 }
1742}