zeph_orchestration/verify_predicate.rs
1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Per-subtask verification predicates (predicate gate).
5//!
6//! Each task in a DAG may carry a `VerifyPredicate` that must be satisfied by
7//! the task's output before downstream tasks may consume it. Evaluation is
8//! LLM-based via `PredicateEvaluator`.
9//!
10//! # Design
11//!
//! - [`VerifyPredicate`] is an enum stored in `TaskNode.verify_predicate`. Only the
//!   `Natural(String)` variant can be evaluated in v1 — [`VerifyPredicate::as_natural`]
//!   returns an error for `Expression` if the planner ever emits one.
15//! - [`PredicateOutcome`] is persisted on `TaskNode` via `GraphPersistence::save` (wired in
16//! `zeph-core` scheduler loop and `handle_plan_confirm`). After a crash, rehydrating the
17//! graph via `/plan resume <id>` restores `predicate_outcome` so the gate is not re-evaluated
18//! for already-completed tasks.
19//! - [`PredicateEvaluator`] wraps any [`LlmProvider`] and produces [`PredicateOutcome`]
20//! values. The evaluation prompt is intentionally minimal and model-agnostic.
21
22use std::time::Duration;
23
24use serde::{Deserialize, Serialize};
25use zeph_llm::provider::{LlmProvider, Message, Role};
26use zeph_sanitizer::{ContentSanitizer, ContentSource, ContentSourceKind};
27
28use super::error::OrchestrationError;
29
/// A verification criterion attached to a task node.
///
/// The planner populates this from the `verify_criteria` field in its JSON output.
/// Only `Natural` can be evaluated in v1. `Expression` is a reserved variant: it can
/// still be constructed and (de)serialized, but [`VerifyPredicate::as_natural`]
/// rejects it with `OrchestrationError::PredicateNotSupported` so callers can surface
/// the unsupported criterion rather than silently ignoring it.
///
/// # Serialization
///
/// The enum is adjacently tagged: the snake_case variant name is stored under
/// `kind` and the payload under `value`, e.g.
/// `{"kind": "natural", "value": "output must contain a valid JSON object"}`.
///
/// # Examples
///
/// ```rust
/// use zeph_orchestration::VerifyPredicate;
///
/// let pred = VerifyPredicate::Natural("output must contain a valid JSON object".to_string());
/// assert!(matches!(pred, VerifyPredicate::Natural(_)));
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
#[non_exhaustive]
pub enum VerifyPredicate {
    /// Free-form natural-language criterion evaluated by the LLM judge.
    Natural(String),
    /// Symbolic expression (reserved, not supported in v1).
    Expression(String),
}
54
55impl VerifyPredicate {
56 /// Returns `Ok(&criterion)` for `Natural` predicates; `Err(PredicateNotSupported)`
57 /// for unsupported variants.
58 ///
59 /// # Errors
60 ///
61 /// Returns [`OrchestrationError::PredicateNotSupported`] when the variant is not
62 /// evaluatable in the current version.
63 pub fn as_natural(&self) -> Result<&str, OrchestrationError> {
64 match self {
65 VerifyPredicate::Natural(s) => Ok(s.as_str()),
66 VerifyPredicate::Expression(s) => Err(OrchestrationError::PredicateNotSupported(
67 format!("Expression predicate '{s}' is not supported in v1; use Natural"),
68 )),
69 }
70 }
71}
72
/// Result of evaluating a [`VerifyPredicate`] against a task's output.
///
/// Stored on `TaskNode::predicate_outcome`. The type is serializable and, per the
/// module-level design notes, is persisted together with the task graph so that a
/// resumed plan does not re-run the gate for tasks that already carry an outcome.
/// A `None` value signals "not yet evaluated"; consumers should re-emit
/// `SchedulerAction::VerifyPredicate` on the next tick.
///
/// # Examples
///
/// ```rust
/// use zeph_orchestration::PredicateOutcome;
///
/// let outcome = PredicateOutcome { passed: true, confidence: 0.9, reason: "output is valid JSON".to_string() };
/// assert!(outcome.passed);
/// assert!(outcome.confidence > 0.8);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PredicateOutcome {
    /// Whether the predicate was satisfied.
    pub passed: bool,
    /// Confidence score in [0.0, 1.0]. Values < 0.5 with `passed = true` log a warn.
    /// Fail-open outcomes produced by `PredicateEvaluator` carry `0.0` here.
    pub confidence: f32,
    /// Human-readable explanation from the LLM judge, or a short description of why
    /// the evaluation was skipped or failed (fail-open case).
    pub reason: String,
}
97
/// LLM-backed predicate evaluator.
///
/// Evaluates a [`VerifyPredicate`] against task output by calling the configured
/// LLM provider with a judge prompt. Fail-open: evaluation errors produce a
/// permissive `passed = true` outcome with `confidence = 0.0` and log a warning
/// rather than aborting the scheduler.
///
/// Task output is sanitized via [`ContentSanitizer`] before being embedded in the
/// judge prompt, mirroring the same defence used by `PlanVerifier`.
///
/// # Examples
///
/// ```rust,no_run
/// use zeph_orchestration::{PredicateEvaluator, VerifyPredicate};
/// use zeph_sanitizer::{ContentSanitizer, ContentIsolationConfig};
///
/// # async fn example<P: zeph_llm::provider::LlmProvider>(provider: P) {
/// let sanitizer = ContentSanitizer::new(&ContentIsolationConfig::default());
/// // The last argument is the judge-call timeout in seconds.
/// let evaluator = PredicateEvaluator::new(provider, sanitizer, 30);
/// let outcome = evaluator
///     .evaluate(
///         &VerifyPredicate::Natural("output must include a summary".to_string()),
///         "Here is the summary: ...",
///         None,
///     )
///     .await;
/// assert!(outcome.confidence >= 0.0);
/// # }
/// ```
pub struct PredicateEvaluator<P: LlmProvider> {
    /// Provider that receives the judge prompt and returns the typed verdict.
    provider: P,
    /// Applied to task output before it is embedded in the judge prompt.
    sanitizer: ContentSanitizer,
    /// Upper bound on a single judge call; exceeding it yields a fail-open outcome.
    timeout: Duration,
}
132
133impl<P: LlmProvider> PredicateEvaluator<P> {
134 /// Create a new evaluator backed by `provider`.
135 ///
136 /// `sanitizer` is applied to task output before it is embedded in the judge prompt.
137 /// `timeout_secs` bounds the LLM call; on timeout the evaluator returns a fail-open
138 /// outcome (`passed = true`, `confidence = 0.0`) and logs a warning.
139 pub fn new(provider: P, sanitizer: ContentSanitizer, timeout_secs: u64) -> Self {
140 Self {
141 provider,
142 sanitizer,
143 timeout: Duration::from_secs(timeout_secs),
144 }
145 }
146
147 /// Evaluate `predicate` against `output`.
148 ///
149 /// `prior_failure_reason` is injected into the prompt on re-runs so the model
150 /// knows why the previous attempt failed. Pass `None` on the first evaluation.
151 ///
152 /// On LLM or parse error, returns a permissive outcome (`passed = true,
153 /// confidence = 0.0`) and logs a warning — fail-open per the orchestration
154 /// error policy.
155 #[tracing::instrument(
156 name = "orchestration.verify_predicate.evaluate",
157 skip(self, predicate, output, prior_failure_reason)
158 )]
159 pub async fn evaluate(
160 &self,
161 predicate: &VerifyPredicate,
162 output: &str,
163 prior_failure_reason: Option<&str>,
164 ) -> PredicateOutcome {
165 let criterion = match predicate.as_natural() {
166 Ok(s) => s,
167 Err(e) => {
168 tracing::warn!(error = %e, "unsupported predicate variant, skipping evaluation (fail-open)");
169 return PredicateOutcome {
170 passed: true,
171 confidence: 0.0,
172 reason: format!("predicate not evaluated: {e}"),
173 };
174 }
175 };
176
177 let prior_note = prior_failure_reason
178 .map(|r| {
179 // Truncate and wrap in XML tags to prevent injection from compromised judge output.
180 let truncated: String = r.chars().take(256).collect();
181 format!(
182 "\n\n<prior_failure_reason>{truncated}</prior_failure_reason>\n\
183 Note: a previous evaluation failed with this reason. Take it into account."
184 )
185 })
186 .unwrap_or_default();
187
188 let system = format!(
189 "You are a strict output verifier. Evaluate whether the task output satisfies \
190 the given criterion. Respond with a JSON object: \
191 {{\"passed\": true/false, \"confidence\": 0.0-1.0, \"reason\": \"...\"}}\n\
192 Criterion: {criterion}{prior_note}"
193 );
194
195 // Sanitize task output before embedding it in the judge prompt (prompt-injection defence).
196 let source = ContentSource::new(ContentSourceKind::ToolResult)
197 .with_identifier("predicate-evaluator-input");
198 let sanitized = self.sanitizer.sanitize(output, source);
199 let user = format!("Task output:\n\n{}", sanitized.body);
200
201 let messages = vec![
202 Message::from_legacy(Role::System, system),
203 Message::from_legacy(Role::User, user),
204 ];
205
206 match tokio::time::timeout(
207 self.timeout,
208 self.provider.chat_typed::<EvalResponse>(&messages),
209 )
210 .await
211 {
212 Ok(Ok(resp)) => {
213 let outcome = PredicateOutcome {
214 passed: resp.passed,
215 confidence: resp.confidence.clamp(0.0, 1.0),
216 reason: resp.reason,
217 };
218 if outcome.passed && outcome.confidence < 0.5 {
219 tracing::warn!(
220 confidence = outcome.confidence,
221 reason = %outcome.reason,
222 "weak predicate pass (confidence < 0.5)"
223 );
224 }
225 outcome
226 }
227 Ok(Err(e)) => {
228 tracing::warn!(
229 error = %e,
230 "predicate evaluation LLM call failed, returning fail-open outcome"
231 );
232 PredicateOutcome {
233 passed: true,
234 confidence: 0.0,
235 reason: format!("evaluation failed: {e}"),
236 }
237 }
238 Err(_elapsed) => {
239 tracing::warn!(
240 timeout_secs = self.timeout.as_secs(),
241 "predicate evaluation timed out, returning fail-open outcome"
242 );
243 PredicateOutcome {
244 passed: true,
245 confidence: 0.0,
246 reason: "evaluation timed out".to_string(),
247 }
248 }
249 }
250 }
251}
252
/// Internal response shape for predicate evaluation.
///
/// This is the type requested from the provider via `chat_typed::<EvalResponse>`;
/// its fields mirror the JSON object described in the judge's system prompt.
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct EvalResponse {
    /// Judge verdict: whether the output satisfies the criterion.
    passed: bool,
    /// Raw confidence as reported by the judge; clamped to [0.0, 1.0] by the
    /// evaluator before it is exposed on `PredicateOutcome`.
    confidence: f32,
    /// Judge's explanation for the verdict.
    reason: String,
}
260
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn natural_predicate_as_natural() {
        let pred = VerifyPredicate::Natural("must contain JSON".to_string());
        assert_eq!(pred.as_natural().unwrap(), "must contain JSON");
    }

    #[test]
    fn expression_predicate_returns_error() {
        let pred = VerifyPredicate::Expression("len(output) > 0".to_string());
        // Pin the error variant, not just "some error".
        assert!(matches!(
            pred.as_natural(),
            Err(OrchestrationError::PredicateNotSupported(_))
        ));
    }

    #[test]
    fn predicate_outcome_serde_roundtrip() {
        let o = PredicateOutcome {
            passed: true,
            confidence: 0.85,
            reason: "looks good".to_string(),
        };
        let json = serde_json::to_string(&o).expect("serialize");
        let restored: PredicateOutcome = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(restored.passed, o.passed);
        assert!((restored.confidence - o.confidence).abs() < f32::EPSILON);
        assert_eq!(restored.reason, o.reason);
    }

    #[test]
    fn verify_predicate_serde_roundtrip_natural() {
        let pred = VerifyPredicate::Natural("criterion".to_string());
        let json = serde_json::to_string(&pred).expect("serialize");
        let restored: VerifyPredicate = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(pred, restored);
    }

    #[test]
    fn verify_predicate_serde_roundtrip_expression() {
        // Reserved variants must still survive persistence even though they are not evaluable.
        let pred = VerifyPredicate::Expression("len(output) > 0".to_string());
        let json = serde_json::to_string(&pred).expect("serialize");
        let restored: VerifyPredicate = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(pred, restored);
    }

    #[test]
    fn verify_predicate_wire_format_is_adjacently_tagged() {
        // Persisted graphs depend on this exact shape; compare as `Value` so key order is irrelevant.
        let pred = VerifyPredicate::Natural("criterion".to_string());
        let actual = serde_json::to_value(&pred).expect("serialize");
        let expected: serde_json::Value =
            serde_json::from_str(r#"{"kind": "natural", "value": "criterion"}"#).expect("parse");
        assert_eq!(actual, expected);
    }

    #[test]
    fn task_node_missing_predicate_fields_deserialize_as_none() {
        // Stand-in for the predicate-related fields of `TaskNode` (which lives in graph.rs).
        // Unknown keys in the blob are ignored, so only the two optional fields matter here.
        #[derive(serde::Deserialize)]
        struct PredicateFields {
            #[serde(default)]
            verify_predicate: Option<VerifyPredicate>,
            #[serde(default)]
            predicate_outcome: Option<PredicateOutcome>,
        }

        // Simulate old JSON blob without predicate fields — #[serde(default)] must handle it.
        let json = r#"{
            "id": 0,
            "title": "t",
            "description": "d",
            "agent_hint": null,
            "status": "pending",
            "depends_on": [],
            "result": null,
            "assigned_agent": null,
            "retry_count": 0,
            "failure_strategy": null,
            "max_retries": null
        }"#;

        let node: PredicateFields = serde_json::from_str(json).expect("deserialize legacy blob");
        assert!(node.verify_predicate.is_none());
        assert!(node.predicate_outcome.is_none());
        // Actual TaskNode deserialization is tested in graph.rs tests.
    }
}
322}