use crate::error::MultiError;
use crate::mailbox::Mailbox;
use crate::runner::AgentRunner;
use crate::shared::SharedInfra;
use crate::types::{AgentOutput, AgentSpec};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use tracing::instrument;
/// Verdict an advisor returns for the task or draft it was asked to review.
///
/// Variant names are serialized in `snake_case`. Each variant carries the one
/// free-text field that the consultation prompt pairs with that verdict.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AdvisorVerdict {
/// Verdict `continue`, with the advisor's rationale.
Continue { rationale: String },
/// Verdict `plan`, with the advisor's guidance.
Plan { guidance: String },
/// Verdict `correction`, with the advisor's guidance.
Correction { guidance: String },
/// Verdict `stop`, with the advisor's reason.
Stop { reason: String },
/// Verdict `uncertain`, with the advisor's reason. `parse_verdict` also
/// produces this variant when a response cannot be interpreted.
Uncertain { reason: String },
}
/// Outcome of one successful `Advisor::consult` call.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdvisorResult {
/// Verdict parsed from the advisor's answer.
pub verdict: AdvisorVerdict,
/// Full output the runner returned for the advisor agent.
pub advisor_output: AgentOutput,
/// Consultation count as of this call (the counter value before the call, plus one).
pub used: u32,
/// The advisor's `max_uses` budget, copied for convenience.
pub max_uses: u32,
}
/// An advisor agent with a capped number of consultations.
pub struct Advisor {
/// Spec of the agent that is run for each consultation.
pub advisor: AgentSpec,
/// Maximum number of consultations `consult` allows.
pub max_uses: u32,
// Consultation counter; atomic so it can be updated through `&self`.
used: AtomicU32,
}
impl Advisor {
    /// Creates an advisor that runs `advisor` and permits at most `max_uses`
    /// consultations. The usage counter starts at zero.
    pub fn new(advisor: AgentSpec, max_uses: u32) -> Self {
        Self {
            advisor,
            max_uses,
            used: AtomicU32::new(0),
        }
    }

    /// Returns the number of consultations consumed so far.
    pub fn used(&self) -> u32 {
        self.used.load(Ordering::Relaxed)
    }

    /// Returns the number of consultations still available
    /// (`max_uses - used`, floored at zero).
    pub fn remaining(&self) -> u32 {
        self.max_uses.saturating_sub(self.used())
    }

    /// Asks the advisor agent to review `task` and returns its parsed verdict.
    ///
    /// One unit of budget is reserved before the agent runs and is not
    /// refunded if the run fails. The agent is run through `runner` with a
    /// fresh default [`Mailbox`] and a runtime obtained from `infra`.
    ///
    /// # Errors
    ///
    /// * [`MultiError::AdvisorExhausted`] when all `max_uses` consultations
    ///   have already been consumed. The usage counter is left unchanged.
    /// * [`MultiError::AgentFailed`] when the runner returns an error.
    #[instrument(name = "multi.advisor", skip_all)]
    pub async fn consult(
        &self,
        task: &str,
        runner: &Arc<dyn AgentRunner>,
        infra: &SharedInfra,
    ) -> Result<AdvisorResult, MultiError> {
        // Increment only while budget remains. A plain `fetch_add` would also
        // count rejected calls, letting `used()` drift past `max_uses` and,
        // eventually, wrap around and reopen the budget.
        let prior = self
            .used
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
                if count < self.max_uses {
                    // Cannot overflow: `count < max_uses <= u32::MAX`.
                    Some(count + 1)
                } else {
                    None
                }
            })
            .map_err(|count| MultiError::AdvisorExhausted {
                used: count,
                max_uses: self.max_uses,
            })?;

        // The raw string below is sent verbatim to the agent; its lines are
        // intentionally not indented.
        let advisory_task = format!(
            r#"You are an advisor. The executor remains in control of the loop.
Return exactly one verdict as JSON:
```json
{{
"verdict": "continue|plan|correction|stop|uncertain",
"rationale": "text when verdict is continue",
"guidance": "text when verdict is plan or correction",
"reason": "text when verdict is stop or uncertain"
}}
```
Task or draft to review:
{task}"#
        );

        let mailbox = Mailbox::default();
        let rt = infra.make_runtime();
        let output = runner
            .run(&self.advisor, &advisory_task, &rt, &mailbox)
            .await
            .map_err(|e| {
                MultiError::AgentFailed(
                    self.advisor.name.clone(),
                    format!("advisor consultation failed: {}", e),
                )
            })?;

        let verdict = parse_verdict(&output.answer);
        Ok(AdvisorResult {
            verdict,
            advisor_output: output,
            used: prior + 1,
            max_uses: self.max_uses,
        })
    }
}
/// Risk level attached to a task, serialized in `snake_case`.
///
/// In this file only `High` changes the outcome of
/// `AdvisorTriggerPolicy::evaluate`; `Low` is the deserialization default for
/// `AdvisorTriggerContext::task_risk`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum TaskRisk {
Low,
Medium,
High,
}
/// Signals that `AdvisorTriggerPolicy::evaluate` inspects to decide whether
/// the advisor should be consulted.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AdvisorTriggerContext {
/// Failure count, compared against the policy's `repeated_failure_threshold`.
pub repeated_failures: u32,
/// Set when the executor has signaled uncertainty.
pub explicit_uncertainty: bool,
/// Set when no guidance matched; only honored if the policy enables
/// `consult_on_missing_guidance`.
pub missing_guidance: bool,
/// Advisor calls already made, compared against the policy's `max_calls_per_run`.
pub prior_advisor_calls: u32,
/// Risk level of the task; falls back to `TaskRisk::Low` when absent from
/// the serialized input.
#[serde(default = "default_task_risk")]
pub task_risk: TaskRisk,
/// Free-form labels; empty when absent from the serialized input. Not read
/// by `AdvisorTriggerPolicy::evaluate` in this file.
#[serde(default)]
pub labels: Vec<String>,
}
/// Serde fallback for `AdvisorTriggerContext::task_risk`: a context that does
/// not specify a risk level is treated as low risk.
const fn default_task_risk() -> TaskRisk {
    const FALLBACK: TaskRisk = TaskRisk::Low;
    FALLBACK
}
impl Default for AdvisorTriggerContext {
    /// Builds a context in which nothing has happened yet: zero failures and
    /// advisor calls, no uncertainty or missing-guidance flags, low risk, and
    /// no labels.
    fn default() -> Self {
        let task_risk = TaskRisk::Low;
        let labels = Vec::new();
        Self {
            repeated_failures: 0,
            prior_advisor_calls: 0,
            explicit_uncertainty: false,
            missing_guidance: false,
            task_risk,
            labels,
        }
    }
}
/// Result of `AdvisorTriggerPolicy::evaluate`, serialized in `snake_case`.
///
/// Every variant carries a human-readable `reason` naming the rule that
/// produced it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AdvisorTriggerDecision {
/// No rule fired, or the call budget is already spent.
NoConsult { reason: String },
/// Returned for explicit uncertainty or missing guidance.
OptionalConsult { reason: String },
/// Returned for high-risk tasks or repeated failures.
MustConsult { reason: String },
}
/// Tunable rules for deciding when the advisor should be consulted.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AdvisorTriggerPolicy {
/// Once `prior_advisor_calls` reaches this value, `evaluate` returns `NoConsult`.
pub max_calls_per_run: u32,
/// `repeated_failures` at or above this value yields `MustConsult`.
pub repeated_failure_threshold: u32,
/// Whether `missing_guidance` in the context yields `OptionalConsult`.
pub consult_on_missing_guidance: bool,
/// Whether a `TaskRisk::High` task yields `MustConsult`.
pub require_on_high_risk: bool,
}
impl Default for AdvisorTriggerPolicy {
    /// Default policy: at most three advisor calls per run, mandatory
    /// consultation after two repeated failures or on high-risk tasks, and an
    /// optional consultation when guidance is missing.
    fn default() -> Self {
        let (max_calls_per_run, repeated_failure_threshold) = (3, 2);
        Self {
            max_calls_per_run,
            repeated_failure_threshold,
            consult_on_missing_guidance: true,
            require_on_high_risk: true,
        }
    }
}
impl AdvisorTriggerPolicy {
    /// Decides whether the advisor should be consulted for the given context.
    ///
    /// Rules are checked in priority order and the first match wins:
    ///
    /// 1. call budget spent (`prior_advisor_calls >= max_calls_per_run`): `NoConsult`
    /// 2. high-risk task, when `require_on_high_risk` is set: `MustConsult`
    /// 3. `repeated_failures >= repeated_failure_threshold`: `MustConsult`
    /// 4. `explicit_uncertainty`: `OptionalConsult`
    /// 5. `missing_guidance`, when `consult_on_missing_guidance` is set: `OptionalConsult`
    /// 6. otherwise: `NoConsult`
    pub fn evaluate(&self, ctx: &AdvisorTriggerContext) -> AdvisorTriggerDecision {
        // All conditions are pure, so evaluate them up front and let the
        // chain below express nothing but the priority order.
        let budget_spent = ctx.prior_advisor_calls >= self.max_calls_per_run;
        let high_risk = self.require_on_high_risk && ctx.task_risk == TaskRisk::High;
        let failing_repeatedly = ctx.repeated_failures >= self.repeated_failure_threshold;
        let lacks_guidance = self.consult_on_missing_guidance && ctx.missing_guidance;

        if budget_spent {
            AdvisorTriggerDecision::NoConsult {
                reason: String::from("advisor budget exhausted"),
            }
        } else if high_risk {
            AdvisorTriggerDecision::MustConsult {
                reason: String::from("high-risk task"),
            }
        } else if failing_repeatedly {
            AdvisorTriggerDecision::MustConsult {
                reason: format!(
                    "repeated failures reached threshold ({}/{})",
                    ctx.repeated_failures, self.repeated_failure_threshold
                ),
            }
        } else if ctx.explicit_uncertainty {
            AdvisorTriggerDecision::OptionalConsult {
                reason: String::from("executor signaled uncertainty"),
            }
        } else if lacks_guidance {
            AdvisorTriggerDecision::OptionalConsult {
                reason: String::from("no approved guidance matched"),
            }
        } else {
            AdvisorTriggerDecision::NoConsult {
                reason: String::from("no trigger matched"),
            }
        }
    }
}
/// Interprets an advisor's free-form answer as an [`AdvisorVerdict`].
///
/// Two strategies are tried in order:
///
/// 1. **JSON** - if a JSON object can be extracted from `response` and
///    decoded, its `verdict` field selects the variant and the matching text
///    field (`rationale`, `guidance` or `reason`) supplies the payload. The
///    verdict is matched ignoring case and surrounding whitespace, like the
///    plain-text fallback. An unknown verdict or an empty payload yields
///    `Uncertain` with a fixed explanatory reason.
/// 2. **Plain text** - otherwise the response is searched, ignoring case, for
///    `verdict: continue|stop|correction|plan`, and the whole trimmed response
///    becomes the payload. With no such marker the result is `Uncertain`
///    carrying the trimmed response.
///
/// This function never fails; anything unintelligible maps to `Uncertain`.
fn parse_verdict(response: &str) -> AdvisorVerdict {
    // Trimmed string stored under `key`, or "" when the key is missing or its
    // value is not a JSON string.
    fn text_field<'a>(fields: &'a HashMap<String, serde_json::Value>, key: &str) -> &'a str {
        fields
            .get(key)
            .and_then(|value| value.as_str())
            .map_or("", str::trim)
    }

    if let Some(json_str) = car_ir::json_extract::extract_json_object(response) {
        if let Ok(fields) = serde_json::from_str::<HashMap<String, serde_json::Value>>(&json_str) {
            // Models do not reliably echo the exact lowercase token from the
            // prompt; normalize so `"Stop"` or `" stop "` is not silently
            // downgraded to `Uncertain`.
            let verdict = text_field(&fields, "verdict").to_ascii_lowercase();
            let rationale = text_field(&fields, "rationale");
            let guidance = text_field(&fields, "guidance");
            let reason = text_field(&fields, "reason");

            return match verdict.as_str() {
                "continue" if !rationale.is_empty() => AdvisorVerdict::Continue {
                    rationale: rationale.to_string(),
                },
                "plan" if !guidance.is_empty() => AdvisorVerdict::Plan {
                    guidance: guidance.to_string(),
                },
                "correction" if !guidance.is_empty() => AdvisorVerdict::Correction {
                    guidance: guidance.to_string(),
                },
                "stop" if !reason.is_empty() => AdvisorVerdict::Stop {
                    reason: reason.to_string(),
                },
                "uncertain" if !reason.is_empty() => AdvisorVerdict::Uncertain {
                    reason: reason.to_string(),
                },
                // Unknown verdict, or a known verdict without its payload.
                _ => AdvisorVerdict::Uncertain {
                    reason: "advisor response could not be parsed cleanly".to_string(),
                },
            };
        }
    }

    // No usable JSON: fall back to looking for a "verdict: <word>" marker.
    let text = response.trim();
    let lower = response.to_lowercase();
    if lower.contains("verdict: continue") {
        AdvisorVerdict::Continue {
            rationale: text.to_string(),
        }
    } else if lower.contains("verdict: stop") {
        AdvisorVerdict::Stop {
            reason: text.to_string(),
        }
    } else if lower.contains("verdict: correction") {
        AdvisorVerdict::Correction {
            guidance: text.to_string(),
        }
    } else if lower.contains("verdict: plan") {
        AdvisorVerdict::Plan {
            guidance: text.to_string(),
        }
    } else {
        AdvisorVerdict::Uncertain {
            reason: text.to_string(),
        }
    }
}
// Unit tests for the advisor budget, the trigger policy, and verdict parsing.
#[cfg(test)]
mod tests {
use super::*;
use car_engine::Runtime;
// Test double: a runner that ignores the task and always answers with the
// canned `response`.
struct StaticRunner {
response: String,
}
#[async_trait::async_trait]
impl crate::runner::AgentRunner for StaticRunner {
async fn run(
&self,
spec: &AgentSpec,
_task: &str,
_runtime: &Runtime,
_mailbox: &Mailbox,
) -> Result<AgentOutput, MultiError> {
// Only `name` and `answer` vary; the remaining fields are fixed filler.
Ok(AgentOutput {
name: spec.name.clone(),
answer: self.response.clone(),
turns: 1,
tool_calls: 0,
duration_ms: 3.0,
error: None,
outcome: None,
tokens: None,
tools_used: Vec::new(),
})
}
}
// A JSON answer is parsed into the matching verdict, and one unit of the
// two-call budget is consumed.
#[tokio::test]
async fn consult_returns_parsed_verdict() {
let advisor = Advisor::new(AgentSpec::new("advisor", "review"), 2);
let runner: Arc<dyn crate::runner::AgentRunner> = Arc::new(StaticRunner {
response: r#"{"verdict":"plan","guidance":"check state before editing"}"#.to_string(),
});
let infra = SharedInfra::new();
let result = advisor.consult("help", &runner, &infra).await.unwrap();
assert_eq!(
result.verdict,
AdvisorVerdict::Plan {
guidance: "check state before editing".to_string(),
}
);
assert_eq!(result.used, 1);
assert_eq!(advisor.remaining(), 1);
}
// With a budget of one, the second consultation is rejected with
// `AdvisorExhausted` reporting one call used out of one.
#[tokio::test]
async fn consult_enforces_budget() {
let advisor = Advisor::new(AgentSpec::new("advisor", "review"), 1);
let runner: Arc<dyn crate::runner::AgentRunner> = Arc::new(StaticRunner {
response: r#"{"verdict":"uncertain","reason":"need more evidence"}"#.to_string(),
});
let infra = SharedInfra::new();
advisor.consult("first", &runner, &infra).await.unwrap();
let err = advisor
.consult("second", &runner, &infra)
.await
.unwrap_err();
match err {
MultiError::AdvisorExhausted { used, max_uses } => {
assert_eq!(used, 1);
assert_eq!(max_uses, 1);
}
other => panic!("unexpected error: {other:?}"),
}
}
// The default policy makes consultation mandatory for high-risk tasks.
#[test]
fn trigger_policy_requires_high_risk() {
let policy = AdvisorTriggerPolicy::default();
let ctx = AdvisorTriggerContext {
task_risk: TaskRisk::High,
..Default::default()
};
assert_eq!(
policy.evaluate(&ctx),
AdvisorTriggerDecision::MustConsult {
reason: "high-risk task".to_string(),
}
);
}
// Once the call budget (3 by default) is spent, other triggers such as
// explicit uncertainty no longer cause a consultation.
#[test]
fn trigger_policy_honors_budget() {
let policy = AdvisorTriggerPolicy::default();
let ctx = AdvisorTriggerContext {
prior_advisor_calls: 3,
explicit_uncertainty: true,
..Default::default()
};
assert_eq!(
policy.evaluate(&ctx),
AdvisorTriggerDecision::NoConsult {
reason: "advisor budget exhausted".to_string(),
}
);
}
// Text with neither JSON nor a "verdict:" marker becomes `Uncertain`,
// carrying the text itself as the reason.
#[test]
fn parse_verdict_falls_back_to_uncertain() {
let verdict = parse_verdict("I am not sure.");
assert_eq!(
verdict,
AdvisorVerdict::Uncertain {
reason: "I am not sure.".to_string(),
}
);
}
// A JSON `continue` verdict takes its payload from the `rationale` field.
#[test]
fn parse_verdict_reads_continue_json() {
let verdict = parse_verdict(
r#"{"verdict":"continue","rationale":"current completion claim is supported"}"#,
);
assert_eq!(
verdict,
AdvisorVerdict::Continue {
rationale: "current completion claim is supported".to_string(),
}
);
}
}