Skip to main content

zeph_orchestration/
verify_predicate.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Per-subtask verification predicates (predicate gate).
5//!
6//! Each task in a DAG may carry a `VerifyPredicate` that must be satisfied by
7//! the task's output before downstream tasks may consume it. Evaluation is
8//! LLM-based via `PredicateEvaluator`.
9//!
10//! # Design
11//!
//! - [`VerifyPredicate`] is an enum stored in `TaskNode.verify_predicate`. Only the
//!   `Natural(String)` variant can be evaluated in v1 — `Expression` is reserved, and
//!   [`VerifyPredicate::as_natural`] returns an error for it if the planner ever emits one.
15//! - [`PredicateOutcome`] is persisted on `TaskNode` via `GraphPersistence::save` (wired in
16//!   `zeph-core` scheduler loop and `handle_plan_confirm`). After a crash, rehydrating the
17//!   graph via `/plan resume <id>` restores `predicate_outcome` so the gate is not re-evaluated
18//!   for already-completed tasks.
19//! - [`PredicateEvaluator`] wraps any [`LlmProvider`] and produces [`PredicateOutcome`]
20//!   values. The evaluation prompt is intentionally minimal and model-agnostic.
21
22use std::time::Duration;
23
24use serde::{Deserialize, Serialize};
25use zeph_llm::provider::{LlmProvider, Message, Role};
26use zeph_sanitizer::{ContentSanitizer, ContentSource, ContentSourceKind};
27
28use super::error::OrchestrationError;
29
/// A verification criterion attached to a task node.
///
/// The planner populates this from the `verify_criteria` field in its JSON output.
/// Only `Natural` can be evaluated in v1. `Expression` is a reserved variant: it can
/// still be constructed and (de)serialized, but [`VerifyPredicate::as_natural`] rejects
/// it with `OrchestrationError::PredicateNotSupported`, so the scheduler can surface
/// the problem rather than silently ignoring the criterion.
///
/// On the wire the enum is adjacently tagged: the snake_case variant name is stored
/// under `kind` and the criterion text under `value`.
///
/// # Examples
///
/// ```rust
/// use zeph_orchestration::VerifyPredicate;
///
/// let pred = VerifyPredicate::Natural("output must contain a valid JSON object".to_string());
/// assert!(matches!(pred, VerifyPredicate::Natural(_)));
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
#[non_exhaustive]
pub enum VerifyPredicate {
    /// Free-form natural-language criterion evaluated by the LLM judge.
    Natural(String),
    /// Symbolic expression (reserved, not supported in v1).
    Expression(String),
}
54
55impl VerifyPredicate {
56    /// Returns `Ok(&criterion)` for `Natural` predicates; `Err(PredicateNotSupported)`
57    /// for unsupported variants.
58    ///
59    /// # Errors
60    ///
61    /// Returns [`OrchestrationError::PredicateNotSupported`] when the variant is not
62    /// evaluatable in the current version.
63    pub fn as_natural(&self) -> Result<&str, OrchestrationError> {
64        match self {
65            VerifyPredicate::Natural(s) => Ok(s.as_str()),
66            VerifyPredicate::Expression(s) => Err(OrchestrationError::PredicateNotSupported(
67                format!("Expression predicate '{s}' is not supported in v1; use Natural"),
68            )),
69        }
70    }
71}
72
/// Result of evaluating a [`VerifyPredicate`] against a task's output.
///
/// Stored on `TaskNode::predicate_outcome`. Per the module-level design notes the
/// outcome is persisted together with the graph, so a resumed plan does not
/// re-evaluate tasks whose gate already ran. A `None` value on the task node signals
/// "not yet evaluated"; consumers should re-emit `SchedulerAction::VerifyPredicate`
/// on the next tick.
///
/// # Examples
///
/// ```rust
/// use zeph_orchestration::PredicateOutcome;
///
/// let outcome = PredicateOutcome { passed: true, confidence: 0.9, reason: "output is valid JSON".to_string() };
/// assert!(outcome.passed);
/// assert!(outcome.confidence > 0.8);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PredicateOutcome {
    /// Whether the predicate was satisfied.
    pub passed: bool,
    /// Confidence score in [0.0, 1.0]. Values < 0.5 with `passed = true` log a warn.
    /// Fail-open outcomes produced by the evaluator carry `0.0` here.
    pub confidence: f32,
    /// Human-readable explanation from the LLM judge, or a description of why the
    /// evaluation could not be carried out when the outcome is a fail-open one.
    pub reason: String,
}
97
/// LLM-backed predicate evaluator.
///
/// Evaluates a [`VerifyPredicate`] against task output by calling the configured
/// LLM provider with a judge prompt. Fail-open: evaluation errors produce a
/// permissive `passed = true` outcome with `confidence = 0.0` and log a warning
/// rather than aborting the scheduler.
///
/// Task output is sanitized via [`ContentSanitizer`] before being embedded in the
/// judge prompt, mirroring the same defence used by `PlanVerifier`.
///
/// # Examples
///
/// ```rust,no_run
/// use zeph_orchestration::{PredicateEvaluator, VerifyPredicate};
/// use zeph_sanitizer::{ContentSanitizer, ContentIsolationConfig};
///
/// # async fn example<P: zeph_llm::provider::LlmProvider>(provider: P) {
/// let sanitizer = ContentSanitizer::new(&ContentIsolationConfig::default());
/// let evaluator = PredicateEvaluator::new(provider, sanitizer, 30);
/// let outcome = evaluator
///     .evaluate(
///         &VerifyPredicate::Natural("output must include a summary".to_string()),
///         "Here is the summary: ...",
///         None,
///     )
///     .await;
/// assert!(outcome.confidence >= 0.0);
/// # }
/// ```
pub struct PredicateEvaluator<P: LlmProvider> {
    // Judge model; receives the system + user messages built by `evaluate`.
    provider: P,
    // Applied to the task output before it is placed in the user message.
    sanitizer: ContentSanitizer,
    // Upper bound on a single judge call; exceeding it yields a fail-open outcome.
    timeout: Duration,
}
132
133impl<P: LlmProvider> PredicateEvaluator<P> {
134    /// Create a new evaluator backed by `provider`.
135    ///
136    /// `sanitizer` is applied to task output before it is embedded in the judge prompt.
137    /// `timeout_secs` bounds the LLM call; on timeout the evaluator returns a fail-open
138    /// outcome (`passed = true`, `confidence = 0.0`) and logs a warning.
139    pub fn new(provider: P, sanitizer: ContentSanitizer, timeout_secs: u64) -> Self {
140        Self {
141            provider,
142            sanitizer,
143            timeout: Duration::from_secs(timeout_secs),
144        }
145    }
146
147    /// Evaluate `predicate` against `output`.
148    ///
149    /// `prior_failure_reason` is injected into the prompt on re-runs so the model
150    /// knows why the previous attempt failed. Pass `None` on the first evaluation.
151    ///
152    /// On LLM or parse error, returns a permissive outcome (`passed = true,
153    /// confidence = 0.0`) and logs a warning — fail-open per the orchestration
154    /// error policy.
155    #[tracing::instrument(
156        name = "orchestration.verify_predicate.evaluate",
157        skip(self, predicate, output, prior_failure_reason)
158    )]
159    pub async fn evaluate(
160        &self,
161        predicate: &VerifyPredicate,
162        output: &str,
163        prior_failure_reason: Option<&str>,
164    ) -> PredicateOutcome {
165        let criterion = match predicate.as_natural() {
166            Ok(s) => s,
167            Err(e) => {
168                tracing::warn!(error = %e, "unsupported predicate variant, skipping evaluation (fail-open)");
169                return PredicateOutcome {
170                    passed: true,
171                    confidence: 0.0,
172                    reason: format!("predicate not evaluated: {e}"),
173                };
174            }
175        };
176
177        let prior_note = prior_failure_reason
178            .map(|r| {
179                // Truncate and wrap in XML tags to prevent injection from compromised judge output.
180                let truncated: String = r.chars().take(256).collect();
181                format!(
182                    "\n\n<prior_failure_reason>{truncated}</prior_failure_reason>\n\
183                     Note: a previous evaluation failed with this reason. Take it into account."
184                )
185            })
186            .unwrap_or_default();
187
188        let system = format!(
189            "You are a strict output verifier. Evaluate whether the task output satisfies \
190             the given criterion. Respond with a JSON object: \
191             {{\"passed\": true/false, \"confidence\": 0.0-1.0, \"reason\": \"...\"}}\n\
192             Criterion: {criterion}{prior_note}"
193        );
194
195        // Sanitize task output before embedding it in the judge prompt (prompt-injection defence).
196        let source = ContentSource::new(ContentSourceKind::ToolResult)
197            .with_identifier("predicate-evaluator-input");
198        let sanitized = self.sanitizer.sanitize(output, source);
199        let user = format!("Task output:\n\n{}", sanitized.body);
200
201        let messages = vec![
202            Message::from_legacy(Role::System, system),
203            Message::from_legacy(Role::User, user),
204        ];
205
206        match tokio::time::timeout(
207            self.timeout,
208            self.provider.chat_typed::<EvalResponse>(&messages),
209        )
210        .await
211        {
212            Ok(Ok(resp)) => {
213                let outcome = PredicateOutcome {
214                    passed: resp.passed,
215                    confidence: resp.confidence.clamp(0.0, 1.0),
216                    reason: resp.reason,
217                };
218                if outcome.passed && outcome.confidence < 0.5 {
219                    tracing::warn!(
220                        confidence = outcome.confidence,
221                        reason = %outcome.reason,
222                        "weak predicate pass (confidence < 0.5)"
223                    );
224                }
225                outcome
226            }
227            Ok(Err(e)) => {
228                tracing::warn!(
229                    error = %e,
230                    "predicate evaluation LLM call failed, returning fail-open outcome"
231                );
232                PredicateOutcome {
233                    passed: true,
234                    confidence: 0.0,
235                    reason: format!("evaluation failed: {e}"),
236                }
237            }
238            Err(_elapsed) => {
239                tracing::warn!(
240                    timeout_secs = self.timeout.as_secs(),
241                    "predicate evaluation timed out, returning fail-open outcome"
242                );
243                PredicateOutcome {
244                    passed: true,
245                    confidence: 0.0,
246                    reason: "evaluation timed out".to_string(),
247                }
248            }
249        }
250    }
251}
252
/// Internal response shape for predicate evaluation.
///
/// This is the typed payload requested from the provider via `chat_typed`; it mirrors
/// the JSON object described in the judge's system prompt and is converted into a
/// [`PredicateOutcome`] by `PredicateEvaluator::evaluate`.
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct EvalResponse {
    // Judge verdict: whether the output satisfies the criterion.
    passed: bool,
    // Raw confidence as reported by the judge; clamped to [0.0, 1.0] by the evaluator.
    confidence: f32,
    // Free-text justification, copied verbatim into the outcome.
    reason: String,
}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// A `Natural` predicate hands back its criterion text unchanged.
    #[test]
    fn natural_predicate_as_natural() {
        let criterion = "must contain JSON";
        let predicate = VerifyPredicate::Natural(criterion.to_string());
        assert_eq!(predicate.as_natural().unwrap(), criterion);
    }

    /// The reserved `Expression` variant is rejected by `as_natural`.
    #[test]
    fn expression_predicate_returns_error() {
        let predicate = VerifyPredicate::Expression("len(output) > 0".to_string());
        let result = predicate.as_natural();
        assert!(result.is_err());
    }

    /// Every field of `PredicateOutcome` survives a JSON round trip.
    #[test]
    fn predicate_outcome_serde_roundtrip() {
        let original = PredicateOutcome {
            passed: true,
            confidence: 0.85,
            reason: "looks good".to_string(),
        };
        let encoded = serde_json::to_string(&original).expect("serialize");
        let decoded: PredicateOutcome = serde_json::from_str(&encoded).expect("deserialize");

        assert_eq!(decoded.passed, original.passed);
        let drift = (decoded.confidence - original.confidence).abs();
        assert!(drift < f32::EPSILON);
        assert_eq!(decoded.reason, original.reason);
    }

    /// A `Natural` predicate survives a JSON round trip.
    #[test]
    fn verify_predicate_serde_roundtrip_natural() {
        let original = VerifyPredicate::Natural("criterion".to_string());
        let encoded = serde_json::to_string(&original).expect("serialize");
        let decoded: VerifyPredicate = serde_json::from_str(&encoded).expect("deserialize");
        assert_eq!(original, decoded);
    }

    /// Sanity check on the legacy fixture: a pre-predicate task blob carries neither
    /// predicate field. Real `TaskNode` deserialization is covered in graph.rs tests.
    #[test]
    fn task_node_missing_predicate_fields_deserialize_as_none() {
        // Old JSON blob without predicate fields — #[serde(default)] must handle it.
        let legacy_blob = r#"{
            "id": 0,
            "title": "t",
            "description": "d",
            "agent_hint": null,
            "status": "pending",
            "depends_on": [],
            "result": null,
            "assigned_agent": null,
            "retry_count": 0,
            "failure_strategy": null,
            "max_retries": null
        }"#;
        // TaskNode lives in graph.rs, so only the raw JSON value is inspected here.
        let parsed: serde_json::Value = serde_json::from_str(legacy_blob).expect("parse");
        for absent_key in ["verify_predicate", "predicate_outcome"] {
            assert!(parsed.get(absent_key).is_none());
        }
    }
}