use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use serde::Deserialize;
use serde_json::{json, Value};
use crate::handler::JsonRpcMessage;
use crate::parslee_tools::ParsleeToolExecutor;
use crate::session::{ClientSession, ServerState, WsChannel};
use super::contract::{derive_contract, OutcomeContract};
use super::external_loop::{run_external_loop, ExternalLoopConfig};
use super::merge::{publish_branch, stage_and_diff};
use super::native_loop::{
run_native_loop, AskUser, LoopOutcome, NativeLoopConfig, TurnGenerator,
};
use super::config::CoderConfig;
use super::router::{detect_ready_agents, resolve_engine, EngineChoice};
use super::session::{
default_state_dir, CancelFlag, CoderEvent, CoderEventKind, CoderSession, CoderState,
EventEmitter, EventSink, UserInputGate,
};
use super::shell_tool::WorktreeExecutor;
use super::skill_memory::RepairMemory;
pub struct CoderSessionEntry {
pub session: Arc<tokio::sync::Mutex<CoderSession>>,
pub events: Arc<tokio::sync::Mutex<Vec<CoderEvent>>>,
pub cancel: CancelFlag,
pub sink: Arc<EventSink>,
pub generator: Arc<dyn TurnGenerator>,
pub memory: RepairMemory,
pub mcp_endpoint: Option<String>,
pub user_input: Arc<UserInputGate>,
pub task: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
}
pub fn coder_state_dir() -> Result<PathBuf, String> {
if let Some(dir) = std::env::var_os("CAR_CODER_STATE_DIR") {
return Ok(PathBuf::from(dir));
}
default_state_dir()
}
fn now_event_frame(event: &CoderEvent) -> Option<String> {
serde_json::to_string(&json!({
"jsonrpc": "2.0",
"method": "coder.event",
"params": event,
}))
.ok()
}
async fn send_frame(channel: &WsChannel, frame: &str) {
use futures::SinkExt;
use tokio_tungstenite::tungstenite::Message;
let _ = channel
.write
.lock()
.await
.send(Message::Text(frame.to_string().into()))
.await;
}
fn summarize_repo(root: &Path) -> String {
let mut names: Vec<String> = std::fs::read_dir(root)
.map(|entries| {
entries
.flatten()
.filter_map(|e| e.file_name().into_string().ok())
.filter(|n| n != ".git")
.collect()
})
.unwrap_or_default();
names.sort();
names.truncate(40);
let build_hints: Vec<&str> = [
("Cargo.toml", "Rust (cargo)"),
("package.json", "Node (npm)"),
("pyproject.toml", "Python (pyproject)"),
("go.mod", "Go"),
("Makefile", "make"),
("Package.swift", "Swift (SwiftPM)"),
]
.iter()
.filter(|(f, _)| root.join(f).exists())
.map(|(_, hint)| *hint)
.collect();
format!(
"Top-level entries: {}\nBuild systems detected: {}",
names.join(", "),
if build_hints.is_empty() {
"none recognized".to_string()
} else {
build_hints.join(", ")
}
)
}
fn is_git_repo(path: &Path) -> bool {
std::process::Command::new("git")
.arg("-C")
.arg(path)
.args(["rev-parse", "--is-inside-work-tree"])
.output()
.map(|o| o.status.success())
.unwrap_or(false)
}
fn spawn_event_drain(
state: Arc<ServerState>,
session_id: String,
events: Arc<tokio::sync::Mutex<Vec<CoderEvent>>>,
) -> EventEmitter {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<CoderEvent>();
tokio::spawn(async move {
while let Some(event) = rx.recv().await {
let frame = now_event_frame(&event);
let mut buffer = events.lock().await;
buffer.push(event);
if let Some(frame) = &frame {
let subscribers: Vec<Arc<WsChannel>> = state
.coder_subscribers
.lock()
.await
.iter()
.filter(|((sid, _), _)| *sid == session_id)
.map(|(_, ch)| ch.clone())
.collect();
for channel in subscribers {
send_frame(&channel, frame).await;
}
}
drop(buffer);
}
});
Arc::new(move |event| {
let _ = tx.send(event);
})
}
const ASK_USER_TIMEOUT_SECS: u64 = 600;
const ASK_USER_CANCEL_POLL_MS: u64 = 200;
struct GateAsker {
sink: Arc<EventSink>,
gate: Arc<UserInputGate>,
cancel: CancelFlag,
}
#[async_trait::async_trait]
impl AskUser for GateAsker {
async fn ask(&self, prompt: &str) -> Result<String, String> {
let mut rx = self.gate.park();
self.sink.emit(CoderEventKind::UserInputRequested {
prompt: prompt.to_string(),
});
let deadline =
tokio::time::Instant::now() + std::time::Duration::from_secs(ASK_USER_TIMEOUT_SECS);
let poll = std::time::Duration::from_millis(ASK_USER_CANCEL_POLL_MS);
loop {
if self.cancel.load(std::sync::atomic::Ordering::SeqCst) {
self.gate.clear();
return Err("cancelled while awaiting user input".to_string());
}
tokio::select! {
res = &mut rx => {
return match res {
Ok(answer) => Ok(answer),
Err(_) => Err("user-input request was cleared before an answer arrived".to_string()),
};
}
_ = tokio::time::sleep(poll) => {
if tokio::time::Instant::now() >= deadline {
self.gate.clear();
return Err(format!(
"no user response within {ASK_USER_TIMEOUT_SECS}s; proceeding without it"
));
}
}
}
}
}
}
pub struct StartArgs {
pub repo: PathBuf,
pub intent: String,
pub engine: EngineChoice,
pub max_iterations: Option<u32>,
pub state_dir: PathBuf,
pub project: Option<(String, super::project::ProjectKind)>,
}
pub async fn start_session(
state: &Arc<ServerState>,
args: StartArgs,
generator: Arc<dyn TurnGenerator>,
) -> Result<Value, String> {
let repo = args
.repo
.canonicalize()
.map_err(|e| format!("repo path {}: {e}", args.repo.display()))?;
if !is_git_repo(&repo) {
return Err(format!(
"{} is not inside a git repository — the coder works in git worktrees",
repo.display()
));
}
let config = CoderConfig::load();
let resolved = match &args.engine {
EngineChoice::Native => super::router::ResolvedEngine {
engine: EngineChoice::Native,
reason: "explicitly requested".into(),
},
other => {
let detected = detect_ready_agents().await;
resolve_engine(other, &args.intent, &detected, &config.preference_refs())?
}
};
let memory = RepairMemory::new(state.shared_memgine.clone());
let mcp_endpoint = state.mcp_url.get().cloned();
let max_iterations = args
.max_iterations
.unwrap_or(config.default_max_iterations);
let mut session = CoderSession::new(
&repo,
&args.intent,
resolved.engine.clone(),
max_iterations,
Some(args.state_dir.clone()),
);
if let Some((slug, kind)) = &args.project {
session = session.with_project(slug.clone(), *kind);
}
session.keep_workspace_on_failure = config.keep_workspace_on_failure;
let worktree = session.provision_workspace()?;
let session_id = session.id.clone();
let events = Arc::new(tokio::sync::Mutex::new(Vec::new()));
let emitter = spawn_event_drain(state.clone(), session_id.clone(), events.clone());
let sink = Arc::new(EventSink::new(
&session_id,
Some(emitter),
Some(args.state_dir.join(format!("{session_id}.events.jsonl"))),
));
sink.emit(CoderEventKind::EngineSelected {
engine: resolved.engine.label(),
reason: resolved.reason,
});
let is_agent_project = matches!(
args.project,
Some((_, super::project::ProjectKind::Agent))
);
let contract = if is_agent_project {
Ok(OutcomeContract {
description: format!(
"Build an in-daemon agent for: {}. It must pass its own acceptance scenarios.",
args.intent.trim()
),
checks: vec![super::contract::ContractCheck {
name: "agent_scenarios_pass".into(),
command: "(in-daemon scenario evaluation)".into(),
expect_exit_zero: true,
output_contains: None,
timeout_secs: 600,
}],
})
} else {
derive_app_contract(&generator, &args.intent, &worktree).await
};
let contract = match contract {
Ok(c) => c,
Err(e) => {
session.error = Some(e.clone());
let _ = session.transition(CoderState::Failed, &sink);
let entry = Arc::new(CoderSessionEntry {
session: Arc::new(tokio::sync::Mutex::new(session)),
events,
cancel: Arc::new(std::sync::atomic::AtomicBool::new(false)),
sink,
generator,
memory: memory.clone(),
mcp_endpoint: mcp_endpoint.clone(),
user_input: Arc::new(UserInputGate::new()),
task: std::sync::Mutex::new(None),
});
state
.coder_sessions
.lock()
.await
.insert(session_id.clone(), entry);
return Err(format!("contract derivation failed: {e}"));
}
};
session.contract = Some(contract.clone());
session.transition(CoderState::ContractProposed, &sink)?;
sink.emit(CoderEventKind::ContractProposed {
contract: contract.clone(),
});
let response = json!({
"session_id": session_id,
"state": session.state.as_str(),
"engine": session.engine.label(),
"worktree": session.workspace_path,
"contract": contract,
});
let entry = Arc::new(CoderSessionEntry {
session: Arc::new(tokio::sync::Mutex::new(session)),
events,
cancel: Arc::new(std::sync::atomic::AtomicBool::new(false)),
sink,
generator,
memory,
mcp_endpoint,
user_input: Arc::new(UserInputGate::new()),
task: std::sync::Mutex::new(None),
});
state
.coder_sessions
.lock()
.await
.insert(session_id, entry);
Ok(response)
}
async fn derive_app_contract(
generator: &Arc<dyn TurnGenerator>,
intent: &str,
worktree: &Path,
) -> Result<OutcomeContract, String> {
let summary = summarize_repo(worktree);
let gen_for_derive = generator.clone();
derive_contract(
move |prompt| {
let generator = gen_for_derive.clone();
async move {
generator
.generate(car_inference::GenerateRequest {
prompt,
params: car_inference::GenerateParams {
temperature: 0.0,
max_tokens: 2048,
thinking: car_inference::tasks::generate::ThinkingMode::Off,
..Default::default()
},
intent: Some(car_inference::IntentHint {
task: Some(car_inference::TaskHint::Code),
require: vec![car_inference::ModelCapability::Code],
prefer_quality: true,
..Default::default()
}),
..Default::default()
})
.await
.map(|r| r.text)
}
},
intent,
&summary,
3,
)
.await
}
pub async fn confirm_session(
state: &Arc<ServerState>,
session_id: &str,
contract_override: Option<OutcomeContract>,
) -> Result<Value, String> {
let entry = get_entry(state, session_id).await?;
{
let mut session = entry.session.lock().await;
if session.state != CoderState::ContractProposed {
return Err(format!(
"session is {}, expected contract_proposed",
session.state.as_str()
));
}
if let Some(contract) = contract_override {
let issues = contract.validate();
if !issues.is_empty() {
return Err(format!("edited contract is invalid: {}", issues.join("; ")));
}
entry.sink.emit(CoderEventKind::ContractProposed {
contract: contract.clone(),
});
session.contract = Some(contract);
}
session.transition(CoderState::ContractConfirmed, &entry.sink)?;
session.transition(CoderState::Running, &entry.sink)?;
}
let task_entry = entry.clone();
let handle = tokio::spawn(async move {
run_session_loop(task_entry).await;
});
*entry.task.lock().expect("task slot poisoned") = Some(handle);
Ok(json!({ "state": "running" }))
}
async fn run_session_loop(entry: Arc<CoderSessionEntry>) {
let (engine, intent, contract, worktree, max_iterations, project_kind) = {
let session = entry.session.lock().await;
let Some(contract) = session.contract.clone() else {
return; };
let Some(worktree) = session.workspace_path.clone() else {
return;
};
(
session.engine.clone(),
session.intent.clone(),
contract,
worktree,
session.max_iterations,
session.project_kind,
)
};
let executor = WorktreeExecutor::new(&worktree)
.with_delegate(Arc::new(ParsleeToolExecutor), ParsleeToolExecutor::tool_defs());
if matches!(project_kind, Some(super::project::ProjectKind::Agent)) {
let outcome = run_agent_build(&entry, &intent, &worktree, &executor, max_iterations).await;
finalize_outcome(&entry, &worktree, outcome).await;
return;
}
let native_cfg = NativeLoopConfig {
max_iterations,
..Default::default()
};
let asker = GateAsker {
sink: entry.sink.clone(),
gate: entry.user_input.clone(),
cancel: entry.cancel.clone(),
};
let outcome: LoopOutcome = match &engine {
EngineChoice::External(agent_id) if !agent_id.is_empty() => {
run_external_with_native_fallback(
&entry, agent_id, &intent, &contract, &executor, &native_cfg, &asker,
)
.await
}
EngineChoice::Foreman(agent_id) if !agent_id.is_empty() => {
match super::foreman_loop::run_foreman_loop(
agent_id,
&intent,
&contract,
&executor,
&entry.sink,
&entry.cancel,
&entry.generator,
entry.mcp_endpoint.as_deref(),
)
.await
{
Ok(outcome) if outcome.passed || outcome.error.is_some() => outcome,
Ok(_red) => {
entry.sink.emit(CoderEventKind::EngineFallback {
from: format!("foreman:{agent_id}"),
to: "native".into(),
reason: "contract not satisfied after foreman's verified union; \
repairing natively on top of it"
.into(),
});
run_native_loop(
entry.generator.as_ref(),
&executor,
&intent,
&contract,
&entry.sink,
&entry.cancel,
&native_cfg,
&entry.memory,
Some(&asker),
)
.await
}
Err(fallback) => {
entry.sink.emit(CoderEventKind::EngineFallback {
from: format!("foreman:{agent_id}"),
to: format!("external:{agent_id}"),
reason: fallback.reason(),
});
run_external_with_native_fallback(
&entry, agent_id, &intent, &contract, &executor, &native_cfg, &asker,
)
.await
}
}
}
_ => {
run_native_loop(
entry.generator.as_ref(),
&executor,
&intent,
&contract,
&entry.sink,
&entry.cancel,
&native_cfg,
&entry.memory,
Some(&asker),
)
.await
}
};
finalize_outcome(&entry, &worktree, outcome).await;
}
async fn finalize_outcome(entry: &Arc<CoderSessionEntry>, worktree: &Path, outcome: LoopOutcome) {
let mut session = entry.session.lock().await;
session.iterations = outcome.iterations;
session.last_check_results = outcome.last_results.clone();
if outcome.passed {
match stage_and_diff(worktree) {
Ok((stat, patch_truncated)) => {
entry.sink.emit(CoderEventKind::DiffReady {
stat,
patch_truncated,
});
}
Err(e) => {
entry.sink.emit(CoderEventKind::Error {
message: format!("diff generation failed: {e}"),
});
}
}
let _ = session.transition(CoderState::NeedsApproval, &entry.sink);
} else {
session.error = Some(outcome.error.unwrap_or_else(|| {
format!(
"contract not satisfied after {} iteration(s)",
outcome.iterations
)
}));
let to = if session.error.as_deref() == Some("cancelled") {
CoderState::Abandoned
} else {
CoderState::Failed
};
if to == CoderState::Failed && session.keep_workspace_on_failure {
if let Some(path) = &session.workspace_path {
entry.sink.emit(CoderEventKind::Error {
message: format!(
"session failed; worktree retained for postmortem at {} \
(keep_workspace_on_failure)",
path.display()
),
});
}
}
let _ = session.transition(to, &entry.sink);
}
}
async fn run_agent_build(
entry: &Arc<CoderSessionEntry>,
intent: &str,
worktree: &Path,
executor: &WorktreeExecutor,
max_iterations: u32,
) -> LoopOutcome {
use super::declarative::{build_agent, BuildAgentConfig};
if entry.cancel.load(std::sync::atomic::Ordering::SeqCst) {
return LoopOutcome {
passed: false,
iterations: 0,
last_results: Vec::new(),
error: Some("cancelled".into()),
};
}
let agent_id = {
let session = entry.session.lock().await;
session
.project
.clone()
.unwrap_or_else(|| session.short_id().to_string())
};
let mut available_tools: Vec<String> = WorktreeExecutor::tool_defs()
.iter()
.filter_map(|d| d.get("name").and_then(Value::as_str).map(String::from))
.collect();
available_tools.extend(ParsleeToolExecutor::tool_names());
entry.sink.emit(CoderEventKind::PlanText {
text: "Designing the agent and checking it against its scenarios…".into(),
});
let cfg = BuildAgentConfig {
agent_id,
available_tools,
max_attempts: max_iterations.max(3),
};
let built = build_agent(intent, entry.generator.as_ref(), executor, &cfg).await;
if !built.passed {
return LoopOutcome {
passed: false,
iterations: built.attempts,
last_results: Vec::new(),
error: Some(if built.issues.is_empty() {
"could not build an agent that passes its scenarios".into()
} else {
format!("agent did not pass its scenarios: {}", built.issues.join("; "))
}),
};
}
let spec = built.spec.expect("passed build has a spec");
let agent_json = serde_json::to_string_pretty(&spec).unwrap_or_default();
let scenarios_json = serde_json::to_string_pretty(&spec.scenarios).unwrap_or_default();
if let Err(e) = std::fs::write(worktree.join("agent.json"), agent_json)
.and_then(|_| std::fs::write(worktree.join("scenarios.json"), scenarios_json))
{
return LoopOutcome {
passed: false,
iterations: built.attempts,
last_results: Vec::new(),
error: Some(format!("failed to write the agent spec: {e}")),
};
}
entry.sink.emit(CoderEventKind::PlanText {
text: format!(
"Built agent '{}' — {} scenario(s) pass. Tools: {}.",
spec.name,
spec.scenarios.len(),
if spec.tools.is_empty() {
"none".into()
} else {
spec.tools.join(", ")
}
),
});
entry.session.lock().await.built_agent = Some(spec);
LoopOutcome {
passed: true,
iterations: built.attempts,
last_results: Vec::new(),
error: None,
}
}
async fn run_external_with_native_fallback(
entry: &Arc<CoderSessionEntry>,
agent_id: &str,
intent: &str,
contract: &OutcomeContract,
executor: &WorktreeExecutor,
native_cfg: &NativeLoopConfig,
asker: &GateAsker,
) -> LoopOutcome {
let external = run_external_loop(
agent_id,
intent,
contract,
executor,
&entry.sink,
&entry.cancel,
&ExternalLoopConfig::default(),
entry.mcp_endpoint.as_deref(),
)
.await;
let spawn_failed = external.error.as_deref().is_some_and(|e| e != "cancelled");
if spawn_failed {
entry.sink.emit(CoderEventKind::EngineFallback {
from: format!("external:{agent_id}"),
to: "native".into(),
reason: external.error.clone().unwrap_or_default(),
});
run_native_loop(
entry.generator.as_ref(),
executor,
intent,
contract,
&entry.sink,
&entry.cancel,
native_cfg,
&entry.memory,
Some(asker),
)
.await
} else {
external
}
}
pub async fn approve_merge_session(
state: &Arc<ServerState>,
session_id: &str,
approve: bool,
) -> Result<Value, String> {
let entry = get_entry(state, session_id).await?;
let mut session = entry.session.lock().await;
if session.state != CoderState::NeedsApproval {
return Err(format!(
"session is {}, expected needs_approval",
session.state.as_str()
));
}
if !approve {
session.transition(CoderState::Abandoned, &entry.sink)?;
return Ok(json!({ "state": "abandoned" }));
}
let worktree = session
.workspace_path
.clone()
.ok_or("session has no worktree")?;
let contract = session.contract.clone().ok_or("session has no contract")?;
let branch = if session.project.is_some() {
super::merge::commit_to_main(&session.repo, &worktree, &session.intent, &contract)?;
"main".to_string()
} else {
publish_branch(
&session.repo,
&worktree,
session.short_id(),
&session.intent,
&contract,
)?
};
session.result_branch = Some(branch.clone());
let mut registered_agent: Option<String> = None;
if let Some(spec) = session.built_agent.clone() {
match state.declagents().and_then(|r| r.upsert(spec.clone())) {
Ok(()) => {
registered_agent = Some(spec.id.clone());
entry.sink.emit(CoderEventKind::PlanText {
text: format!("Agent '{}' added to your agents and ready to run.", spec.name),
});
}
Err(e) => {
entry.sink.emit(CoderEventKind::Error {
message: format!("agent built and saved, but registration failed: {e}"),
});
}
}
}
entry.sink.emit(CoderEventKind::MergeCompleted {
branch: branch.clone(),
});
session.transition(CoderState::Merged, &entry.sink)?;
Ok(json!({ "state": "merged", "branch": branch, "agent_id": registered_agent }))
}
pub async fn cancel_session(state: &Arc<ServerState>, session_id: &str) -> Result<Value, String> {
let entry = get_entry(state, session_id).await?;
entry.cancel.store(true, std::sync::atomic::Ordering::SeqCst);
entry.user_input.clear();
if let Some(handle) = entry.task.lock().expect("task slot poisoned").take() {
handle.abort();
}
let mut session = entry.session.lock().await;
if !session.state.is_terminal() {
session.transition(CoderState::Abandoned, &entry.sink)?;
}
Ok(json!({ "state": session.state.as_str() }))
}
async fn get_entry(
state: &Arc<ServerState>,
session_id: &str,
) -> Result<Arc<CoderSessionEntry>, String> {
state
.coder_sessions
.lock()
.await
.get(session_id)
.cloned()
.ok_or_else(|| format!("no live coder session '{session_id}'"))
}
fn session_summary(session: &CoderSession, live: bool) -> Value {
json!({
"session_id": session.id,
"state": session.state.as_str(),
"intent": session.intent,
"repo": session.repo,
"engine": session.engine.label(),
"iterations": session.iterations,
"updated_at": session.updated_at,
"live": live,
"error": session.error,
})
}
#[derive(Deserialize)]
struct StartParams {
#[serde(default)]
repo: Option<PathBuf>,
#[serde(default)]
project: Option<String>,
intent: String,
#[serde(default)]
engine: Option<String>,
#[serde(default)]
max_iterations: Option<u32>,
}
pub async fn handle_coder_start(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
) -> Result<Value, String> {
let params: StartParams =
serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
let engine = EngineChoice::parse(params.engine.as_deref().unwrap_or("auto"))?;
let generator: Arc<dyn TurnGenerator> = crate::handler::get_inference_engine(state).clone();
let (repo, project) = match (params.repo, params.project) {
(Some(_), Some(_)) => {
return Err("provide exactly one of `repo` or `project`, not both".into())
}
(None, None) => {
return Err("provide one of `repo` (a git path) or `project` (a managed project)".into())
}
(Some(repo), None) => (repo, None),
(None, Some(slug)) => {
let proj = super::project::load_project(&slug)?;
(proj.repo_path, Some((proj.slug, proj.kind)))
}
};
start_session(
state,
StartArgs {
repo,
intent: params.intent,
engine,
max_iterations: params.max_iterations,
state_dir: coder_state_dir()?,
project,
},
generator,
)
.await
}
#[derive(Deserialize)]
struct ProjectsCreateParams {
name: String,
#[serde(default)]
kind: Option<String>,
}
pub async fn handle_coder_projects_create(
req: &JsonRpcMessage,
_state: &Arc<ServerState>,
) -> Result<Value, String> {
let params: ProjectsCreateParams =
serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
let kind = super::project::ProjectKind::parse(params.kind.as_deref().unwrap_or("app"))?;
let project = super::project::resolve_or_create_project(¶ms.name, kind)?;
serde_json::to_value(&project).map_err(|e| e.to_string())
}
pub async fn handle_coder_projects_list(_state: &Arc<ServerState>) -> Result<Value, String> {
Ok(json!({ "projects": super::project::list_projects() }))
}
#[derive(Deserialize)]
struct ProjectsGetParams {
slug: String,
}
pub async fn handle_coder_projects_get(
req: &JsonRpcMessage,
_state: &Arc<ServerState>,
) -> Result<Value, String> {
let params: ProjectsGetParams =
serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
let project = super::project::load_project(¶ms.slug)?;
serde_json::to_value(&project).map_err(|e| e.to_string())
}
#[derive(Deserialize)]
struct ConfirmParams {
session_id: String,
#[serde(default)]
contract: Option<OutcomeContract>,
}
pub async fn handle_coder_confirm_contract(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
) -> Result<Value, String> {
let params: ConfirmParams =
serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
confirm_session(state, ¶ms.session_id, params.contract).await
}
pub async fn handle_coder_list(state: &Arc<ServerState>) -> Result<Value, String> {
let mut out: Vec<Value> = Vec::new();
let mut live_ids = std::collections::HashSet::new();
{
let sessions = state.coder_sessions.lock().await;
for entry in sessions.values() {
let session = entry.session.lock().await;
live_ids.insert(session.id.clone());
out.push(session_summary(&session, true));
}
}
if let Ok(dir) = coder_state_dir() {
for session in CoderSession::list(&dir) {
if !live_ids.contains(&session.id) {
out.push(session_summary(&session, false));
}
}
}
out.sort_by_key(|v| std::cmp::Reverse(v["updated_at"].as_u64().unwrap_or(0)));
Ok(json!({ "sessions": out }))
}
#[derive(Deserialize)]
struct SessionIdParams {
session_id: String,
}
pub async fn handle_coder_get(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
) -> Result<Value, String> {
let params: SessionIdParams =
serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
if let Ok(entry) = get_entry(state, ¶ms.session_id).await {
let session = entry.session.lock().await;
let mut value = serde_json::to_value(&*session).map_err(|e| e.to_string())?;
value["live"] = json!(true);
value["next_seq"] = json!(entry.events.lock().await.len() as u64);
return Ok(value);
}
let dir = coder_state_dir()?;
let session = CoderSession::load(&dir.join(format!("{}.json", params.session_id)))?;
let mut value = serde_json::to_value(&session).map_err(|e| e.to_string())?;
value["live"] = json!(false);
Ok(value)
}
#[derive(Deserialize)]
struct SubscribeParams {
session_id: String,
#[serde(default)]
from_seq: u64,
}
pub async fn handle_coder_subscribe(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
session: &Arc<ClientSession>,
) -> Result<Value, String> {
let params: SubscribeParams =
serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
let entry = get_entry(state, ¶ms.session_id).await?;
let buffer = entry.events.lock().await;
let mut replayed = 0u64;
for event in buffer.iter().filter(|e| e.seq >= params.from_seq) {
if let Some(frame) = now_event_frame(event) {
send_frame(&session.channel, &frame).await;
replayed += 1;
}
}
state.coder_subscribers.lock().await.insert(
(params.session_id.clone(), session.client_id.clone()),
session.channel.clone(),
);
drop(buffer);
let current_state = entry.session.lock().await.state.as_str().to_string();
Ok(json!({ "state": current_state, "events_replayed": replayed }))
}
pub async fn handle_coder_unsubscribe(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
session: &Arc<ClientSession>,
) -> Result<Value, String> {
let params: SessionIdParams =
serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
state
.coder_subscribers
.lock()
.await
.remove(&(params.session_id, session.client_id.clone()));
Ok(json!({ "ok": true }))
}
#[derive(Deserialize)]
struct RespondParams {
session_id: String,
text: String,
}
pub async fn handle_coder_respond(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
) -> Result<Value, String> {
let params: RespondParams =
serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
let entry = get_entry(state, ¶ms.session_id).await?;
entry.user_input.fulfill(params.text)?;
Ok(json!({ "ok": true }))
}
#[derive(Deserialize)]
struct ApproveParams {
session_id: String,
approve: bool,
}
pub async fn handle_coder_approve_merge(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
) -> Result<Value, String> {
let params: ApproveParams =
serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
approve_merge_session(state, ¶ms.session_id, params.approve).await
}
pub async fn handle_coder_cancel(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
) -> Result<Value, String> {
let params: SessionIdParams =
serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
cancel_session(state, ¶ms.session_id).await
}
pub async fn drop_subscriptions_for_client(state: &ServerState, client_id: &str) {
state
.coder_subscribers
.lock()
.await
.retain(|(_, cid), _| cid != client_id);
}
pub type CoderSessionMap = HashMap<String, Arc<CoderSessionEntry>>;
fn declarative_row(spec: &car_registry::declarative::DeclarativeAgentSpec) -> Value {
json!({
"id": spec.id,
"name": spec.name,
"kind": "declarative",
"enabled": spec.enabled,
"tools": spec.tools,
"scenarios": spec.scenarios.len(),
})
}
pub async fn declarative_agent_rows(state: &Arc<ServerState>) -> Vec<Value> {
match state.declagents() {
Ok(reg) => reg.list().iter().map(declarative_row).collect(),
Err(_) => Vec::new(),
}
}
pub async fn handle_declagents_list(state: &Arc<ServerState>) -> Result<Value, String> {
let reg = state.declagents()?;
Ok(json!({ "agents": reg.list().iter().map(declarative_row).collect::<Vec<_>>() }))
}
#[derive(Deserialize)]
struct DeclAgentIdParams {
id: String,
}
pub async fn handle_declagents_get(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
) -> Result<Value, String> {
let params: DeclAgentIdParams =
serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
let reg = state.declagents()?;
let spec = reg
.get(¶ms.id)
.ok_or_else(|| format!("no declarative agent '{}'", params.id))?;
serde_json::to_value(&spec).map_err(|e| e.to_string())
}
pub async fn handle_declagents_remove(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
) -> Result<Value, String> {
let params: DeclAgentIdParams =
serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
let reg = state.declagents()?;
Ok(json!({ "removed": reg.remove(¶ms.id)? }))
}
#[derive(Deserialize)]
struct DeclAgentEnableParams {
id: String,
enabled: bool,
}
pub async fn handle_declagents_set_enabled(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
) -> Result<Value, String> {
let params: DeclAgentEnableParams =
serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
let reg = state.declagents()?;
reg.set_enabled(¶ms.id, params.enabled)?;
Ok(json!({ "ok": true }))
}
#[derive(Deserialize)]
struct DeclAgentInvokeParams {
id: String,
input: String,
}
async fn run_declarative(
spec: &car_registry::declarative::DeclarativeAgentSpec,
input: &str,
state: &Arc<ServerState>,
) -> Result<super::declarative::AgentRunResult, String> {
let generator: Arc<dyn TurnGenerator> = crate::handler::get_inference_engine(state).clone();
let scratch = tempfile::tempdir().map_err(|e| format!("scratch dir: {e}"))?;
let executor = WorktreeExecutor::new(scratch.path())
.with_delegate(Arc::new(ParsleeToolExecutor), ParsleeToolExecutor::tool_defs());
let runner =
super::declarative::DeclarativeAgentRunner::new(spec, generator.as_ref(), &executor);
Ok(runner.run(input).await)
}
fn run_result_json(result: &super::declarative::AgentRunResult) -> Value {
json!({
"output": result.output,
"turns": result.turns,
"tool_calls": result.tool_calls,
"error": result.error,
})
}
fn run_succeeded(result: &super::declarative::AgentRunResult) -> bool {
result.error.is_none() && !result.output.trim().is_empty()
}
fn run_is_recordable(result: &super::declarative::AgentRunResult) -> bool {
!(result.turns == 0 && result.error.is_some())
}
fn record_routing_outcome(
state: &Arc<ServerState>,
agent_id: &str,
result: &super::declarative::AgentRunResult,
) {
if !run_is_recordable(result) {
return;
}
if let Ok(store) = state.routing() {
let _ = store.record_outcome(agent_id, run_succeeded(result));
}
}
fn record_routing_edge(state: &Arc<ServerState>, from: &str, to: &str, ok: bool) {
if let Ok(store) = state.routing() {
let _ = store.record_edge(from, to, ok);
}
}
fn record_routing_capability(state: &Arc<ServerState>, agent: &str, task_emb: &[f32]) {
if let Ok(store) = state.routing() {
let _ = store.record_capability(agent, task_emb);
}
}
pub async fn handle_declagents_invoke(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
) -> Result<Value, String> {
let params: DeclAgentInvokeParams =
serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
let reg = state.declagents()?;
let spec = reg
.get(¶ms.id)
.ok_or_else(|| format!("no declarative agent '{}'", params.id))?;
if !spec.enabled {
return Err(format!("agent '{}' is disabled", params.id));
}
let result = run_declarative(&spec, ¶ms.input, state).await?;
record_routing_outcome(state, &spec.id, &result);
Ok(run_result_json(&result))
}
fn capability_text(spec: &car_registry::declarative::DeclarativeAgentSpec) -> String {
let mut text = format!("{}. {}", spec.name, spec.identity);
if !spec.standing_goal.is_empty() {
text.push_str(&format!(" Goal: {}.", spec.standing_goal));
}
if !spec.tools.is_empty() {
text.push_str(&format!(" Tools: {}.", spec.tools.join(", ")));
}
text
}
fn cosine(a: &[f32], b: &[f32]) -> f32 {
if a.len() != b.len() {
return 0.0;
}
let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
let na: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
let nb: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
if na == 0.0 || nb == 0.0 {
0.0
} else {
dot / (na * nb)
}
}
const ROUTE_SIMILARITY_WEIGHT: f32 = 0.7;
const ROUTE_EDGE_WEIGHT: f32 = 0.2;
const MAX_ROUTE_HOPS: usize = 4;
const LEARNED_SIM_WEIGHT: f32 = 0.4;
fn blended_similarity(coldstart: f32, learned: Option<f32>) -> f32 {
match learned {
Some(l) => (1.0 - LEARNED_SIM_WEIGHT) * coldstart + LEARNED_SIM_WEIGHT * l,
None => coldstart,
}
}
fn blended_score(similarity: f32, success_prior: f32) -> f32 {
let sim = similarity.max(0.0);
ROUTE_SIMILARITY_WEIGHT * sim + (1.0 - ROUTE_SIMILARITY_WEIGHT) * success_prior
}
fn route_score(similarity: f32, success_prior: f32, edge_weight: f32) -> f32 {
blended_score(similarity, success_prior) + ROUTE_EDGE_WEIGHT * edge_weight
}
fn is_excluded(id: &str, from: Option<&str>, visited: &[String]) -> bool {
from == Some(id) || visited.iter().any(|v| v == id)
}
fn rank_agents(
need_emb: &[f32],
agent_embs: &[Vec<f32>],
agents: &[car_registry::declarative::DeclarativeAgentSpec],
routing: &car_registry::routing::RoutingSnapshot,
from: Option<&str>,
) -> Vec<(usize, f32, f32, f32, f32)> {
let mut ranked: Vec<(usize, f32, f32, f32, f32)> = agent_embs
.iter()
.enumerate()
.map(|(i, e)| {
let coldstart = cosine(need_emb, e);
let learned = routing
.learned_capability(&agents[i].id)
.map(|c| cosine(need_emb, c));
let similarity = blended_similarity(coldstart, learned);
let prior = routing.success_prior(&agents[i].id);
let edge = from.map_or(0.0, |f| routing.edge_weight(f, &agents[i].id));
(i, route_score(similarity, prior, edge), similarity, prior, edge)
})
.collect();
ranked.sort_by(|a, b| {
b.1.total_cmp(&a.1)
.then_with(|| agents[a.0].id.cmp(&agents[b.0].id))
});
ranked
}
#[derive(Deserialize)]
struct DeclAgentRouteParams {
need: String,
#[serde(default)]
invoke: bool,
#[serde(default)]
from: Option<String>,
#[serde(default)]
visited: Vec<String>,
}
const ROUTE_TOP_K: usize = 3;
pub async fn handle_declagents_route(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
) -> Result<Value, String> {
let params: DeclAgentRouteParams =
serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
if params.need.trim().is_empty() {
return Err("need must be a non-empty task description".to_string());
}
if params.visited.len() >= MAX_ROUTE_HOPS {
return Err(format!(
"routing path exceeded {MAX_ROUTE_HOPS} hops (cycle or runaway forward)"
));
}
let from = params.from.as_deref();
let reg = state.declagents()?;
let agents: Vec<_> = reg
.list()
.into_iter()
.filter(|s| s.enabled && !is_excluded(&s.id, from, ¶ms.visited))
.collect();
if agents.is_empty() {
return Err("no eligible declarative agents to route to".to_string());
}
let engine = crate::handler::get_inference_engine(state);
let _permit = state.admission.acquire().await;
let need_embs = engine
.embed(car_inference::EmbedRequest {
texts: vec![params.need.clone()],
model: None,
instruction: Some("Match this task to the agent best able to perform it".to_string()),
is_query: true,
})
.await
.map_err(|e| format!("embed failed: {e}"))?;
let agent_embs = engine
.embed(car_inference::EmbedRequest {
texts: agents.iter().map(capability_text).collect(),
model: None,
instruction: None,
is_query: false,
})
.await
.map_err(|e| format!("embed failed: {e}"))?;
drop(_permit);
let need_emb = need_embs
.first()
.ok_or_else(|| "embedder returned no vectors".to_string())?;
let routing = state
.routing()
.map(|s| s.snapshot())
.unwrap_or_default();
let ranked = rank_agents(need_emb, &agent_embs, &agents, &routing, from);
let candidates: Vec<Value> = ranked
.iter()
.take(ROUTE_TOP_K)
.map(|(i, score, similarity, prior, edge)| {
json!({
"id": agents[*i].id,
"name": agents[*i].name,
"score": score,
"similarity": similarity,
"success_rate": prior,
"edge_weight": edge,
})
})
.collect();
let chosen = &agents[ranked[0].0];
let result = if params.invoke {
let run = run_declarative(chosen, ¶ms.need, state).await?;
record_routing_outcome(state, &chosen.id, &run);
if run_is_recordable(&run) {
if run_succeeded(&run) {
record_routing_capability(state, &chosen.id, need_emb);
}
if let Some(f) = from {
record_routing_edge(state, f, &chosen.id, run_succeeded(&run));
}
}
Some(run_result_json(&run))
} else {
None
};
let mut next_visited = params.visited.clone();
next_visited.push(chosen.id.clone());
Ok(json!({
"chosen": chosen.id,
"candidates": candidates,
"invoked": params.invoke,
"result": result,
"next_visited": next_visited,
}))
}
const DEFAULT_MAX_SUBTASKS: usize = 5;
const MAX_SUBTASKS_CAP: usize = 10;
fn parse_subtasks(raw: &str, need: &str, max: usize) -> Vec<String> {
let subs: Vec<String> = serde_json::from_str::<Value>(raw)
.ok()
.and_then(|v| v.get("subtasks").and_then(|s| s.as_array()).cloned())
.into_iter()
.flatten()
.filter_map(|v| v.as_str().map(|s| s.trim().to_string()))
.filter(|s| !s.is_empty())
.take(max)
.collect();
if subs.is_empty() {
vec![need.to_string()]
} else {
subs
}
}
async fn decompose_need(state: &Arc<ServerState>, need: &str, max: usize) -> Vec<String> {
let prompt = format!(
"You are a task planner. Decompose the request below into at most {max} \
INDEPENDENT subtasks, each handleable by a separate specialist agent. \
If the request is already atomic, return it as a single subtask. \
Respond with JSON only: {{\"subtasks\": [\"...\", \"...\"]}}.\n\n\
Request: {need}"
);
let engine = crate::handler::get_inference_engine(state);
let _permit = state.admission.acquire().await;
let raw = engine
.generate(car_inference::GenerateRequest {
prompt,
response_format: Some(car_inference::ResponseFormat::JsonObject),
..Default::default()
})
.await;
drop(_permit);
match raw {
Ok(text) => parse_subtasks(&text, need, max),
Err(_) => vec![need.to_string()],
}
}
#[derive(Deserialize)]
struct DeclAgentSplitParams {
need: String,
#[serde(default)]
invoke: bool,
#[serde(default)]
max_subtasks: Option<usize>,
}
pub async fn handle_declagents_route_split(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
) -> Result<Value, String> {
let params: DeclAgentSplitParams =
serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
if params.need.trim().is_empty() {
return Err("need must be a non-empty task description".to_string());
}
let max = params
.max_subtasks
.unwrap_or(DEFAULT_MAX_SUBTASKS)
.clamp(1, MAX_SUBTASKS_CAP);
let reg = state.declagents()?;
let agents: Vec<_> = reg.list().into_iter().filter(|s| s.enabled).collect();
if agents.is_empty() {
return Err("no enabled declarative agents to route to".to_string());
}
let subtasks = decompose_need(state, ¶ms.need, max).await;
let engine = crate::handler::get_inference_engine(state);
let _permit = state.admission.acquire().await;
let agent_embs = engine
.embed(car_inference::EmbedRequest {
texts: agents.iter().map(capability_text).collect(),
model: None,
instruction: None,
is_query: false,
})
.await
.map_err(|e| format!("embed failed: {e}"))?;
let sub_embs = engine
.embed(car_inference::EmbedRequest {
texts: subtasks.clone(),
model: None,
instruction: Some("Match this task to the agent best able to perform it".to_string()),
is_query: true,
})
.await
.map_err(|e| format!("embed failed: {e}"))?;
drop(_permit);
let routing = state
.routing()
.map(|s| s.snapshot())
.unwrap_or_default();
let mut routed = Vec::with_capacity(subtasks.len());
for (i, sub) in subtasks.iter().enumerate() {
let Some(need_emb) = sub_embs.get(i) else {
continue;
};
let ranked = rank_agents(need_emb, &agent_embs, &agents, &routing, None);
let (idx, score, ..) = ranked[0]; let chosen = &agents[idx];
let result = if params.invoke {
match run_declarative(chosen, sub, state).await {
Ok(run) => {
record_routing_outcome(state, &chosen.id, &run);
if run_is_recordable(&run) && run_succeeded(&run) {
record_routing_capability(state, &chosen.id, need_emb);
}
Some(run_result_json(&run))
}
Err(e) => Some(json!({ "error": e })),
}
} else {
None
};
routed.push(json!({
"subtask": sub,
"chosen": chosen.id,
"score": score,
"result": result,
}));
}
Ok(json!({
"subtasks": routed,
"count": routed.len(),
"invoked": params.invoke,
}))
}
pub async fn handle_declagents_routing_stats(state: &Arc<ServerState>) -> Result<Value, String> {
let snapshot = state.routing()?.snapshot();
let agents: serde_json::Map<String, Value> = snapshot
.agents
.iter()
.map(|(id, s)| {
(
id.clone(),
json!({
"successes": s.successes,
"failures": s.failures,
"ema_success_rate": s.ema_success_rate,
"learned": !s.learned_vector.is_empty(),
}),
)
})
.collect();
Ok(json!({ "agents": agents, "edges": snapshot.edges }))
}
const DISCOVERY_DEFAULT_LIMIT: usize = 5;
const DISCOVERY_MAX_LIMIT: usize = 50;
const DISCOVERY_PROVIDER_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
const EXTERNAL_DETECT_TTL: std::time::Duration = std::time::Duration::from_secs(60);
#[derive(Deserialize)]
struct DiscoveryResolveParams {
need: String,
#[serde(default)]
limit: Option<usize>,
}
struct DiscoveredService {
identifier: String,
name: String,
kind: String,
protocol: String,
capability_text: String,
agent_id: Option<String>,
}
fn declarative_services(state: &Arc<ServerState>) -> Vec<DiscoveredService> {
let Ok(reg) = state.declagents() else {
return Vec::new();
};
reg.list()
.into_iter()
.filter(|s| s.enabled)
.filter_map(|s| {
let identifier =
car_connectors::discovery::ServiceIdentifier::local("agent", &s.id).ok()?;
let capability = capability_text(&s);
Some(DiscoveredService {
identifier: identifier.to_string(),
name: s.name,
kind: "declarative".to_string(),
protocol: "in-daemon".to_string(),
capability_text: capability,
agent_id: Some(s.id),
})
})
.collect()
}
async fn connector_services(state: &Arc<ServerState>) -> Vec<DiscoveredService> {
state.ensure_connectors_loaded().await;
let mgr = state.connectors();
let mut out = Vec::new();
for status in mgr.list().await {
if !status.connected {
continue;
}
let Ok(tools) = mgr.tools(&status.slug).await else {
continue;
};
for t in tools {
if !t.enabled {
continue;
}
let Ok(identifier) = car_connectors::discovery::ServiceIdentifier::new(
status.slug.clone(),
[String::from("tool")],
t.name.clone(),
) else {
continue;
};
let capability_text = if t.description.is_empty() {
t.name.clone()
} else {
format!("{}. {}", t.name, t.description)
};
out.push(DiscoveredService {
identifier: identifier.to_string(),
name: t.canonical,
kind: "connector".to_string(),
protocol: "mcp".to_string(),
capability_text,
agent_id: None,
});
}
}
out
}
fn external_capability_text(spec: &car_external_agents::ExternalAgentSpec) -> String {
let c = &spec.capabilities;
let feats: Vec<&str> = [
(c.tool_use, "tool use"),
(c.mcp, "MCP"),
(c.hooks, "hooks"),
(c.sessions, "sessions"),
(c.streaming, "streaming"),
]
.into_iter()
.filter_map(|(on, label)| on.then_some(label))
.collect();
let mut text = format!("{}. Agentic coding CLI.", spec.display_name);
if !feats.is_empty() {
text.push_str(&format!(" Capabilities: {}.", feats.join(", ")));
}
text
}
fn external_detect_cache(
) -> &'static tokio::sync::Mutex<Option<(std::time::Instant, Vec<car_external_agents::ExternalAgentSpec>)>>
{
static CACHE: std::sync::OnceLock<
tokio::sync::Mutex<Option<(std::time::Instant, Vec<car_external_agents::ExternalAgentSpec>)>>,
> = std::sync::OnceLock::new();
CACHE.get_or_init(|| tokio::sync::Mutex::new(None))
}
async fn external_agent_services() -> Vec<DiscoveredService> {
let specs = {
let mut guard = external_detect_cache().lock().await;
let fresh = guard
.as_ref()
.is_some_and(|(at, _)| at.elapsed() < EXTERNAL_DETECT_TTL);
if !fresh {
*guard = Some((std::time::Instant::now(), car_external_agents::detect().await));
}
guard.as_ref().map(|(_, s)| s.clone()).unwrap_or_default()
};
specs
.into_iter()
.filter_map(|spec| {
let identifier = car_connectors::discovery::ServiceIdentifier::new(
"external",
[String::from("agent")],
spec.id.clone(),
)
.ok()?;
Some(DiscoveredService {
identifier: identifier.to_string(),
capability_text: external_capability_text(&spec),
name: spec.display_name,
kind: "external".to_string(),
protocol: "cli".to_string(),
agent_id: None,
})
})
.collect()
}
const A2A_CARD_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
const A2A_CARD_TTL: std::time::Duration = std::time::Duration::from_secs(60);
fn a2a_card_cache(
) -> &'static tokio::sync::Mutex<std::collections::HashMap<String, (std::time::Instant, car_a2a::AgentCard)>>
{
static CACHE: std::sync::OnceLock<
tokio::sync::Mutex<std::collections::HashMap<String, (std::time::Instant, car_a2a::AgentCard)>>,
> = std::sync::OnceLock::new();
CACHE.get_or_init(|| tokio::sync::Mutex::new(std::collections::HashMap::new()))
}
async fn peer_card_cached(url: &str) -> Option<car_a2a::AgentCard> {
if let Some((at, card)) = a2a_card_cache().lock().await.get(url) {
if at.elapsed() < A2A_CARD_TTL {
return Some(card.clone());
}
}
let fetched = tokio::time::timeout(
A2A_CARD_TIMEOUT,
car_a2a::A2aClient::new(url.to_string()).agent_card(),
)
.await;
let card = match fetched {
Ok(Ok(c)) => c,
_ => return None,
};
a2a_card_cache()
.lock()
.await
.insert(url.to_string(), (std::time::Instant::now(), card.clone()));
Some(card)
}
async fn a2a_peer_services() -> Vec<DiscoveredService> {
let Ok(reg) = car_a2a::peers::PeerRegistry::user_default() else {
return Vec::new();
};
let peers = reg.list();
{
let live: std::collections::HashSet<&str> = peers.iter().map(|p| p.url.as_str()).collect();
a2a_card_cache().lock().await.retain(|url, _| live.contains(url.as_str()));
}
let fetched = futures::future::join_all(
peers
.into_iter()
.map(|peer| async move { peer_card_cached(&peer.url).await.map(|card| (peer, card)) }),
)
.await;
let mut out = Vec::new();
for (peer, card) in fetched.into_iter().flatten() {
for skill in card.skills {
let identifier = match car_connectors::discovery::ServiceIdentifier::new(
peer.slug.clone(),
[String::from("skill")],
skill.id.clone(),
) {
Ok(id) => id,
Err(_) => {
tracing::debug!(
peer = %peer.slug,
skill = %skill.id,
"discovery: skipping a2a skill with non-identifier id"
);
continue;
}
};
let capability_text = if skill.description.is_empty() {
skill.name.clone()
} else {
format!("{}. {}", skill.name, skill.description)
};
out.push(DiscoveredService {
identifier: identifier.to_string(),
name: skill.name,
kind: "a2a".to_string(),
protocol: "a2a".to_string(),
capability_text,
agent_id: None,
});
}
}
out
}
const AGENTDNS_ROOT_URL_ENV: &str = "CAR_AGENTDNS_ROOT_URL";
const MAX_REMOTE_RECORDS: usize = 100;
const MAX_REMOTE_TEXT_CHARS: usize = 2000;
fn parslee_api_host() -> Option<String> {
let base = std::env::var(crate::parslee_auth::API_BASE_KEY)
.unwrap_or_else(|_| crate::parslee_auth::DEFAULT_API_BASE.to_string());
reqwest::Url::parse(&base)
.ok()
.and_then(|u| u.host_str().map(str::to_string))
}
fn root_host_is_trusted(root_url: &str) -> bool {
let Ok(url) = reqwest::Url::parse(root_url) else {
return false;
};
url.scheme() == "https" && url.host_str() == parslee_api_host().as_deref()
}
async fn trusted_root_bearer(root_url: &str, _state: &Arc<ServerState>) -> Option<String> {
if !root_host_is_trusted(root_url) {
return None;
}
car_auth::access_token_refreshing().await
}
fn truncate_chars(s: &str, max: usize) -> String {
s.chars().take(max).collect()
}
async fn remote_root_services(state: &Arc<ServerState>, need: &str, limit: usize) -> Vec<DiscoveredService> {
let Some(base) = std::env::var_os(AGENTDNS_ROOT_URL_ENV) else {
return Vec::new();
};
let base = base.to_string_lossy().into_owned();
let token = trusted_root_bearer(&base, state).await;
let root = car_connectors::discovery::RemoteRoot::new(base, token);
let records = match root.resolve(need, limit).await {
Ok(r) => r,
Err(e) => {
tracing::warn!(error = %e, "discovery.resolve: remote root resolve failed; skipping");
return Vec::new();
}
};
records
.into_iter()
.take(MAX_REMOTE_RECORDS)
.filter_map(|rec| {
let identifier = car_connectors::discovery::ServiceIdentifier::parse(&rec.identifier)
.ok()?
.to_string();
let raw = if rec.description.is_empty() {
rec.name.clone()
} else {
format!("{}. {}", rec.name, rec.description)
};
Some(DiscoveredService {
identifier,
name: truncate_chars(&rec.name, MAX_REMOTE_TEXT_CHARS),
kind: truncate_chars(&rec.kind, 64),
protocol: truncate_chars(&rec.protocol, 64),
capability_text: truncate_chars(&raw, MAX_REMOTE_TEXT_CHARS),
agent_id: None,
})
})
.collect()
}
fn score_service(
service: &DiscoveredService,
need_emb: &[f32],
cap_emb: &[f32],
routing: &car_registry::routing::RoutingSnapshot,
) -> (f32, f32) {
let coldstart = cosine(need_emb, cap_emb);
let (learned, prior) = match &service.agent_id {
Some(id) => (
routing.learned_capability(id).map(|c| cosine(need_emb, c)),
routing.success_prior(id),
),
None => (None, car_registry::routing::NEUTRAL_PRIOR),
};
let similarity = blended_similarity(coldstart, learned);
(route_score(similarity, prior, 0.0), similarity)
}
pub async fn handle_discovery_resolve(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
) -> Result<Value, String> {
let params: DiscoveryResolveParams =
serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
if params.need.trim().is_empty() {
return Err("need must be a non-empty capability description".to_string());
}
let limit = params
.limit
.unwrap_or(DISCOVERY_DEFAULT_LIMIT)
.clamp(1, DISCOVERY_MAX_LIMIT);
let mut services = declarative_services(state);
match tokio::time::timeout(DISCOVERY_PROVIDER_TIMEOUT, connector_services(state)).await {
Ok(connectors) => services.extend(connectors),
Err(_) => {
tracing::warn!("discovery.resolve: connector provider timed out; skipping")
}
}
match tokio::time::timeout(DISCOVERY_PROVIDER_TIMEOUT, external_agent_services()).await {
Ok(external) => services.extend(external),
Err(_) => {
tracing::warn!("discovery.resolve: external-agent provider timed out; skipping")
}
}
match tokio::time::timeout(DISCOVERY_PROVIDER_TIMEOUT, a2a_peer_services()).await {
Ok(peers) => services.extend(peers),
Err(_) => {
tracing::warn!("discovery.resolve: a2a-peer provider timed out; skipping")
}
}
match tokio::time::timeout(
DISCOVERY_PROVIDER_TIMEOUT,
remote_root_services(state, ¶ms.need, limit),
)
.await
{
Ok(remote) => services.extend(remote),
Err(_) => {
tracing::warn!("discovery.resolve: remote-root provider timed out; skipping")
}
}
let mut seen = std::collections::HashSet::new();
services.retain(|s| seen.insert(s.identifier.clone()));
if services.is_empty() {
return Ok(json!({ "services": [], "count": 0 }));
}
let engine = crate::handler::get_inference_engine(state);
let _permit = state.admission.acquire().await;
let need_embs = engine
.embed(car_inference::EmbedRequest {
texts: vec![params.need.clone()],
model: None,
instruction: Some("Match this need to the service best able to perform it".to_string()),
is_query: true,
})
.await
.map_err(|e| format!("embed failed: {e}"))?;
let cap_embs = engine
.embed(car_inference::EmbedRequest {
texts: services.iter().map(|s| s.capability_text.clone()).collect(),
model: None,
instruction: None,
is_query: false,
})
.await
.map_err(|e| format!("embed failed: {e}"))?;
drop(_permit);
let need_emb = need_embs
.first()
.ok_or_else(|| "embedder returned no vectors".to_string())?;
if cap_embs.len() != services.len() {
return Err(format!(
"embedder returned {} vectors for {} services",
cap_embs.len(),
services.len()
));
}
let routing = state
.routing()
.map(|s| s.snapshot())
.unwrap_or_default();
let mut ranked: Vec<(usize, f32, f32)> = cap_embs
.iter()
.enumerate()
.map(|(i, e)| {
let (score, similarity) = score_service(&services[i], need_emb, e, &routing);
(i, score, similarity)
})
.collect();
ranked.sort_by(|a, b| {
b.1.total_cmp(&a.1)
.then_with(|| services[a.0].identifier.cmp(&services[b.0].identifier))
});
let out: Vec<Value> = ranked
.iter()
.take(limit)
.map(|(i, score, similarity)| {
let s = &services[*i];
json!({
"identifier": s.identifier,
"name": s.name,
"kind": s.kind,
"protocol": s.protocol,
"score": score,
"similarity": similarity,
})
})
.collect();
Ok(json!({ "count": out.len(), "services": out }))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::coder::native_loop::TurnGenerator;
use async_trait::async_trait;
use car_inference::{GenerateRequest, InferenceResult};
use std::sync::atomic::{AtomicUsize, Ordering};
fn spec(id: &str, identity: &str, tools: &[&str]) -> car_registry::declarative::DeclarativeAgentSpec {
car_registry::declarative::DeclarativeAgentSpec {
id: id.to_string(),
name: id.to_string(),
identity: identity.to_string(),
tools: tools.iter().map(|t| t.to_string()).collect(),
denied_tools: vec![],
standing_goal: String::new(),
scenarios: vec![],
enabled: true,
}
}
#[test]
fn cosine_is_one_for_identical_and_zero_for_orthogonal() {
let a = [1.0, 2.0, 3.0];
assert!((cosine(&a, &a) - 1.0).abs() < 1e-6);
assert!((cosine(&[1.0, 0.0], &[0.0, 1.0])).abs() < 1e-6);
}
#[test]
fn cosine_zero_norm_is_zero_not_nan() {
let z = cosine(&[0.0, 0.0], &[1.0, 2.0]);
assert_eq!(z, 0.0);
assert!(!z.is_nan());
}
#[test]
fn capability_text_includes_identity_goal_and_tools() {
let mut s = spec("billing", "Handles invoices.", &["fetch", "parse"]);
s.standing_goal = "Keep ledgers reconciled".to_string();
let text = capability_text(&s);
assert!(text.contains("Handles invoices."));
assert!(text.contains("Keep ledgers reconciled"));
assert!(text.contains("fetch, parse"));
}
#[test]
fn blended_score_keeps_similarity_dominant() {
let strong_unproven = blended_score(0.9, 0.5);
let weak_proven = blended_score(0.2, 1.0);
assert!(strong_unproven > weak_proven);
}
#[test]
fn blended_score_prior_breaks_ties() {
assert!(blended_score(0.8, 1.0) > blended_score(0.8, 0.5));
}
#[test]
fn blended_score_clamps_negative_similarity() {
let s = blended_score(-0.5, 0.5);
assert!((s - (1.0 - ROUTE_SIMILARITY_WEIGHT) * 0.5).abs() < 1e-6);
}
fn run(turns: u32, output: &str, error: Option<&str>) -> super::super::declarative::AgentRunResult {
super::super::declarative::AgentRunResult {
output: output.to_string(),
turns,
tool_calls: 0,
error: error.map(|s| s.to_string()),
}
}
#[test]
fn infra_noise_runs_are_not_recorded() {
assert!(!run_is_recordable(&run(0, "", Some("model load failed"))));
assert!(run_is_recordable(&run(3, "", Some("gave up"))));
assert!(run_is_recordable(&run(2, "done", None)));
}
#[test]
fn run_succeeded_requires_no_error_and_nonempty_output() {
assert!(run_succeeded(&run(2, "hello", None)));
assert!(!run_succeeded(&run(2, " ", None))); assert!(!run_succeeded(&run(2, "hello", Some("boom"))));
}
fn svc(kind: &'static str, agent_id: Option<&str>) -> DiscoveredService {
DiscoveredService {
identifier: format!("agentdns://x/{kind}/y"),
name: "y".into(),
kind: kind.to_string(),
protocol: "p".to_string(),
capability_text: "y".into(),
agent_id: agent_id.map(|s| s.to_string()),
}
}
#[test]
fn external_capability_text_lists_enabled_features() {
let spec = car_external_agents::ExternalAgentSpec {
id: "claude-code".into(),
display_name: "Claude Code".into(),
binary_path: "/usr/local/bin/claude".into(),
version: None,
auth_kind: Default::default(),
capabilities: car_external_agents::Capabilities {
tool_use: true,
mcp: true,
hooks: false,
sessions: true,
streaming: false,
},
detected_at: 0,
health: None,
};
let text = external_capability_text(&spec);
assert!(text.contains("Claude Code"));
assert!(text.contains("tool use, MCP, sessions")); assert!(!text.contains("hooks"));
}
#[test]
fn bearer_only_to_trusted_parslee_https_host() {
assert!(root_host_is_trusted("https://api.parslee.ai/agentdns/resolve"));
assert!(!root_host_is_trusted("http://api.parslee.ai"));
assert!(!root_host_is_trusted("https://attacker.example/agentdns/resolve"));
assert!(!root_host_is_trusted("not a url"));
}
#[test]
fn truncate_chars_is_char_boundary_safe() {
assert_eq!(truncate_chars("hello", 3), "hel");
assert_eq!(truncate_chars("hello", 10), "hello");
assert_eq!(truncate_chars("héllo", 2), "hé");
}
#[test]
fn score_service_uses_neutral_prior_for_non_declarative() {
let routing = car_registry::routing::RoutingSnapshot::default();
let s = svc("connector", None);
let (score, sim) = score_service(&s, &[1.0, 0.0], &[1.0, 0.0], &routing);
assert!((sim - 1.0).abs() < 1e-6);
assert!((score - 0.85).abs() < 1e-6);
}
#[test]
fn score_service_blends_learning_for_proven_declarative() {
let mut routing = car_registry::routing::RoutingSnapshot::default();
routing.agents.insert(
"a".into(),
car_registry::routing::AgentStats {
successes: 4,
failures: 0,
ema_success_rate: 1.0,
learned_vector: vec![],
},
);
let s = svc("declarative", Some("a"));
let (score, _) = score_service(&s, &[1.0, 0.0], &[1.0, 0.0], &routing);
assert!((score - 1.0).abs() < 1e-6);
}
#[test]
fn parse_subtasks_extracts_clean_list() {
let raw = r#"{"subtasks": ["book flight", " reserve hotel ", "", "rent car"]}"#;
let subs = parse_subtasks(raw, "trip", 5);
assert_eq!(subs, vec!["book flight", "reserve hotel", "rent car"]); }
#[test]
fn parse_subtasks_caps_at_max() {
let raw = r#"{"subtasks": ["a","b","c","d"]}"#;
assert_eq!(parse_subtasks(raw, "x", 2), vec!["a", "b"]);
}
#[test]
fn parse_subtasks_falls_back_to_need() {
assert_eq!(parse_subtasks("not json", "do it", 5), vec!["do it"]);
assert_eq!(parse_subtasks(r#"{"other": []}"#, "do it", 5), vec!["do it"]);
assert_eq!(parse_subtasks(r#"{"subtasks": [" "]}"#, "do it", 5), vec!["do it"]);
}
#[test]
fn blended_similarity_falls_back_to_coldstart_without_centroid() {
assert_eq!(blended_similarity(0.6, None), 0.6);
let b = blended_similarity(0.5, Some(1.0));
assert!((b - (0.6 * 0.5 + 0.4 * 1.0)).abs() < 1e-6);
}
#[test]
fn route_score_edge_boost_promotes_forward_target() {
let plain = route_score(0.6, 0.5, 0.0);
let forwarded = route_score(0.6, 0.5, 0.9);
assert!(forwarded > plain);
}
#[test]
fn excludes_delegator_and_visited_path() {
let visited = vec!["a".to_string(), "b".to_string()];
assert!(is_excluded("self", Some("self"), &[])); assert!(is_excluded("a", None, &visited)); assert!(is_excluded("b", Some("self"), &visited));
assert!(!is_excluded("c", Some("self"), &visited)); }
#[test]
fn ranking_prefers_higher_cosine() {
let need = [1.0_f32, 0.0];
let agent_embs = [[0.9_f32, 0.1], [0.0, 1.0]];
let mut ranked: Vec<(usize, f32)> = agent_embs
.iter()
.enumerate()
.map(|(i, e)| (i, cosine(&need, e)))
.collect();
ranked.sort_by(|a, b| b.1.total_cmp(&a.1));
assert_eq!(ranked[0].0, 0);
}
struct Script {
turns: Vec<InferenceResult>,
cursor: AtomicUsize,
}
fn turn(text: &str, tool_calls: Value) -> InferenceResult {
serde_json::from_value(json!({
"text": text,
"tool_calls": tool_calls,
"trace_id": "t",
"model_used": "scripted",
"latency_ms": 0,
}))
.expect("scripted InferenceResult shape")
}
#[async_trait]
impl TurnGenerator for Script {
async fn generate(&self, _req: GenerateRequest) -> Result<InferenceResult, String> {
let i = self.cursor.fetch_add(1, Ordering::SeqCst);
self.turns
.get(i)
.cloned()
.ok_or_else(|| "script exhausted".to_string())
}
}
fn init_repo(dir: &Path) {
for args in [
vec!["init", "-q", "-b", "main"],
vec!["-c", "user.name=t", "-c", "user.email=t@t", "commit", "-q", "--allow-empty", "-m", "init"],
] {
let out = std::process::Command::new("git")
.arg("-C")
.arg(dir)
.args(&args)
.output()
.unwrap();
assert!(out.status.success(), "{}", String::from_utf8_lossy(&out.stderr));
}
}
#[tokio::test]
async fn e2e_start_confirm_run_approve() {
let repo_dir = tempfile::tempdir().unwrap();
init_repo(repo_dir.path());
let state_dir = tempfile::tempdir().unwrap();
let journal = tempfile::tempdir().unwrap();
let state = Arc::new(ServerState::standalone(journal.path().to_path_buf()));
let script: Arc<dyn TurnGenerator> = Arc::new(Script {
turns: vec![
turn(
r#"{"description": "x.txt contains hello",
"checks": [{"name": "content", "command": "grep -q hello x.txt"}]}"#,
json!([]),
),
turn(
"",
json!([{
"id": "c1", "name": "write_file",
"arguments": {"path": "x.txt", "content": "hello from the coder"}
}]),
),
turn("done", json!([])),
],
cursor: AtomicUsize::new(0),
});
let response = start_session(
&state,
StartArgs {
repo: repo_dir.path().to_path_buf(),
intent: "create x.txt containing hello".into(),
engine: EngineChoice::Native,
max_iterations: Some(4),
state_dir: state_dir.path().to_path_buf(),
project: None,
},
script,
)
.await
.unwrap();
let session_id = response["session_id"].as_str().unwrap().to_string();
assert_eq!(response["state"], "contract_proposed");
assert_eq!(response["contract"]["checks"][0]["name"], "content");
confirm_session(&state, &session_id, None).await.unwrap();
let entry = get_entry(&state, &session_id).await.unwrap();
let handle = entry.task.lock().unwrap().take().unwrap();
handle.await.unwrap();
{
let session = entry.session.lock().await;
assert_eq!(session.state, CoderState::NeedsApproval, "error: {:?}", session.error);
assert!(session.last_check_results.iter().all(|r| r.passed));
}
let events = entry.events.lock().await;
let has = |pred: &dyn Fn(&CoderEventKind) -> bool| events.iter().any(|e| pred(&e.kind));
assert!(has(&|k| matches!(k, CoderEventKind::EngineSelected { .. })));
assert!(has(&|k| matches!(k, CoderEventKind::ContractProposed { .. })));
assert!(has(&|k| matches!(k, CoderEventKind::ToolCall { tool, .. } if tool == "write_file")));
assert!(has(&|k| matches!(k, CoderEventKind::CheckCompleted { result } if result.passed)));
assert!(has(&|k| matches!(k, CoderEventKind::DiffReady { stat, .. } if stat.contains("x.txt"))));
drop(events);
let merged = approve_merge_session(&state, &session_id, true).await.unwrap();
assert_eq!(merged["state"], "merged");
let branch = merged["branch"].as_str().unwrap();
let show = std::process::Command::new("git")
.arg("-C")
.arg(repo_dir.path())
.args(["show", &format!("{branch}:x.txt")])
.output()
.unwrap();
assert!(show.status.success());
assert_eq!(String::from_utf8_lossy(&show.stdout), "hello from the coder");
let status = std::process::Command::new("git")
.arg("-C")
.arg(repo_dir.path())
.args(["status", "--porcelain"])
.output()
.unwrap();
assert!(status.stdout.is_empty(), "user checkout dirtied");
assert!(!repo_dir.path().join("x.txt").exists());
}
#[tokio::test]
async fn project_session_commits_to_main_no_branch() {
let projects_dir = tempfile::tempdir().unwrap();
let state_dir = tempfile::tempdir().unwrap();
let journal = tempfile::tempdir().unwrap();
let _guard = crate::coder::project::projects_env_lock()
.lock()
.unwrap_or_else(|e| e.into_inner());
let prev = std::env::var_os("CAR_PROJECTS_DIR");
unsafe {
std::env::set_var("CAR_PROJECTS_DIR", projects_dir.path());
}
let project =
crate::coder::project::resolve_or_create_project("My App", crate::coder::project::ProjectKind::App)
.unwrap();
let state = Arc::new(ServerState::standalone(journal.path().to_path_buf()));
let script: Arc<dyn TurnGenerator> = Arc::new(Script {
turns: vec![
turn(
r#"{"description": "x.txt contains hi", "checks": [{"name": "content", "command": "grep -q hi x.txt"}]}"#,
json!([]),
),
turn(
"",
json!([{"id": "c1", "name": "write_file", "arguments": {"path": "x.txt", "content": "hi project"}}]),
),
turn("done", json!([])),
],
cursor: AtomicUsize::new(0),
});
let response = start_session(
&state,
StartArgs {
repo: project.repo_path.clone(),
intent: "create x.txt containing hi".into(),
engine: EngineChoice::Native,
max_iterations: Some(4),
state_dir: state_dir.path().to_path_buf(),
project: Some((project.slug.clone(), project.kind)),
},
script,
)
.await
.unwrap();
let session_id = response["session_id"].as_str().unwrap().to_string();
confirm_session(&state, &session_id, None).await.unwrap();
let entry = get_entry(&state, &session_id).await.unwrap();
entry.task.lock().unwrap().take().unwrap().await.unwrap();
let merged = approve_merge_session(&state, &session_id, true).await.unwrap();
assert_eq!(merged["state"], "merged");
assert_eq!(merged["branch"], "main", "project sessions deliver to main");
let git = |args: &[&str]| {
std::process::Command::new("git")
.arg("-C")
.arg(&project.repo_path)
.args(args)
.output()
.unwrap()
};
let show = git(&["show", "main:x.txt"]);
assert!(show.status.success());
assert_eq!(String::from_utf8_lossy(&show.stdout), "hi project");
assert_eq!(project.repo_path.join("x.txt").exists(), true, "lands in the checkout");
let branches = git(&["branch", "--list", "car/coder/*"]);
assert!(branches.stdout.is_empty(), "no coder branch for a project session");
unsafe {
match prev {
Some(v) => std::env::set_var("CAR_PROJECTS_DIR", v),
None => std::env::remove_var("CAR_PROJECTS_DIR"),
}
}
}
#[tokio::test]
async fn e2e_agent_project_builds_registers_and_invokes() {
let projects_dir = tempfile::tempdir().unwrap();
let declagents = tempfile::tempdir().unwrap();
let state_dir = tempfile::tempdir().unwrap();
let journal = tempfile::tempdir().unwrap();
let _guard = crate::coder::project::projects_env_lock()
.lock()
.unwrap_or_else(|e| e.into_inner());
let prev_proj = std::env::var_os("CAR_PROJECTS_DIR");
let prev_decl = std::env::var_os("CAR_DECLAGENTS_PATH");
unsafe {
std::env::set_var("CAR_PROJECTS_DIR", projects_dir.path());
std::env::set_var("CAR_DECLAGENTS_PATH", declagents.path().join("declagents.json"));
}
let project = crate::coder::project::resolve_or_create_project(
"Greeter Bot",
crate::coder::project::ProjectKind::Agent,
)
.unwrap();
let state = Arc::new(ServerState::standalone(journal.path().to_path_buf()));
let script: Arc<dyn TurnGenerator> = Arc::new(Script {
turns: vec![
turn(
r#"{"name":"Greeter","identity":"You greet warmly.","tools":[],
"standing_goal":"greet","scenarios":[{"input":"hi","expect":"hello"}]}"#,
json!([]),
),
turn("hello, friend!", json!([])),
],
cursor: AtomicUsize::new(0),
});
let response = start_session(
&state,
StartArgs {
repo: project.repo_path.clone(),
intent: "a friendly greeter".into(),
engine: EngineChoice::Native,
max_iterations: Some(3),
state_dir: state_dir.path().to_path_buf(),
project: Some((project.slug.clone(), project.kind)),
},
script,
)
.await
.unwrap();
let session_id = response["session_id"].as_str().unwrap().to_string();
assert_eq!(response["contract"]["checks"][0]["name"], "agent_scenarios_pass");
confirm_session(&state, &session_id, None).await.unwrap();
let entry = get_entry(&state, &session_id).await.unwrap();
entry.task.lock().unwrap().take().unwrap().await.unwrap();
{
let session = entry.session.lock().await;
assert_eq!(session.state, CoderState::NeedsApproval, "error: {:?}", session.error);
assert!(session.built_agent.is_some(), "spec stashed for registration");
}
let merged = approve_merge_session(&state, &session_id, true).await.unwrap();
assert_eq!(merged["state"], "merged");
assert_eq!(merged["branch"], "main");
assert_eq!(merged["agent_id"].as_str().unwrap(), project.slug);
let reg = state.declagents().unwrap();
let registered = reg.get(&project.slug).unwrap();
assert_eq!(registered.name, "Greeter");
assert_eq!(registered.scenarios.len(), 1);
let show = std::process::Command::new("git")
.arg("-C")
.arg(&project.repo_path)
.args(["show", "main:agent.json"])
.output()
.unwrap();
assert!(show.status.success(), "agent.json on main");
let invoke_script: Arc<dyn TurnGenerator> =
Arc::new(Script { turns: vec![turn("hello again!", json!([]))], cursor: AtomicUsize::new(0) });
let exec_dir = tempfile::tempdir().unwrap();
let exec = WorktreeExecutor::new(exec_dir.path());
let runner = crate::coder::declarative::DeclarativeAgentRunner::new(
®istered,
invoke_script.as_ref(),
&exec,
);
let run = runner.run("hi there").await;
assert!(run.output.contains("hello"), "agent runs in-daemon: {run:?}");
unsafe {
match prev_proj {
Some(v) => std::env::set_var("CAR_PROJECTS_DIR", v),
None => std::env::remove_var("CAR_PROJECTS_DIR"),
}
match prev_decl {
Some(v) => std::env::set_var("CAR_DECLAGENTS_PATH", v),
None => std::env::remove_var("CAR_DECLAGENTS_PATH"),
}
}
}
#[tokio::test]
async fn failing_contract_ends_in_failed_with_results() {
let repo_dir = tempfile::tempdir().unwrap();
init_repo(repo_dir.path());
let state_dir = tempfile::tempdir().unwrap();
let journal = tempfile::tempdir().unwrap();
let state = Arc::new(ServerState::standalone(journal.path().to_path_buf()));
let script: Arc<dyn TurnGenerator> = Arc::new(Script {
turns: vec![
turn(
r#"{"description": "impossible", "checks": [{"name": "missing", "command": "test -f never.txt"}]}"#,
json!([]),
),
turn("i did nothing", json!([])),
turn("still nothing", json!([])),
],
cursor: AtomicUsize::new(0),
});
let response = start_session(
&state,
StartArgs {
repo: repo_dir.path().to_path_buf(),
intent: "impossible task".into(),
engine: EngineChoice::Native,
max_iterations: Some(2),
state_dir: state_dir.path().to_path_buf(),
project: None,
},
script,
)
.await
.unwrap();
let session_id = response["session_id"].as_str().unwrap().to_string();
confirm_session(&state, &session_id, None).await.unwrap();
let entry = get_entry(&state, &session_id).await.unwrap();
let handle = entry.task.lock().unwrap().take().unwrap();
handle.await.unwrap();
let session = entry.session.lock().await;
assert_eq!(session.state, CoderState::Failed);
assert!(session.error.as_deref().unwrap().contains("not satisfied"));
assert!(!session.last_check_results[0].passed);
drop(session);
let err = approve_merge_session(&state, &session_id, true).await.unwrap_err();
assert!(err.contains("expected needs_approval"), "{err}");
}
#[tokio::test]
async fn confirm_with_edited_contract_replaces_proposal() {
let repo_dir = tempfile::tempdir().unwrap();
init_repo(repo_dir.path());
let state_dir = tempfile::tempdir().unwrap();
let journal = tempfile::tempdir().unwrap();
let state = Arc::new(ServerState::standalone(journal.path().to_path_buf()));
let script: Arc<dyn TurnGenerator> = Arc::new(Script {
turns: vec![
turn(
r#"{"description": "original", "checks": [{"name": "a", "command": "true"}]}"#,
json!([]),
),
turn("done", json!([])),
],
cursor: AtomicUsize::new(0),
});
let response = start_session(
&state,
StartArgs {
repo: repo_dir.path().to_path_buf(),
intent: "x".into(),
engine: EngineChoice::Native,
max_iterations: Some(2),
state_dir: state_dir.path().to_path_buf(),
project: None,
},
script,
)
.await
.unwrap();
let session_id = response["session_id"].as_str().unwrap().to_string();
let edited = OutcomeContract {
description: "edited".into(),
checks: vec![crate::coder::contract::ContractCheck {
name: "edited_check".into(),
command: "true".into(),
expect_exit_zero: true,
output_contains: None,
timeout_secs: 10,
}],
};
confirm_session(&state, &session_id, Some(edited)).await.unwrap();
let entry = get_entry(&state, &session_id).await.unwrap();
let handle = entry.task.lock().unwrap().take().unwrap();
handle.await.unwrap();
let session = entry.session.lock().await;
assert_eq!(session.contract.as_ref().unwrap().description, "edited");
assert_eq!(session.state, CoderState::NeedsApproval);
}
#[tokio::test]
async fn cancel_mid_run_abandons_session() {
let repo_dir = tempfile::tempdir().unwrap();
init_repo(repo_dir.path());
let state_dir = tempfile::tempdir().unwrap();
let journal = tempfile::tempdir().unwrap();
let state = Arc::new(ServerState::standalone(journal.path().to_path_buf()));
let script: Arc<dyn TurnGenerator> = Arc::new(Script {
turns: vec![
turn(
r#"{"description": "slow", "checks": [{"name": "n", "command": "test -f done.txt"}]}"#,
json!([]),
),
turn(
"",
json!([{
"id": "c1", "name": "shell",
"arguments": {"command": "sleep 20", "timeout_secs": 30}
}]),
),
turn("done", json!([])),
],
cursor: AtomicUsize::new(0),
});
let response = start_session(
&state,
StartArgs {
repo: repo_dir.path().to_path_buf(),
intent: "slow".into(),
engine: EngineChoice::Native,
max_iterations: Some(2),
state_dir: state_dir.path().to_path_buf(),
project: None,
},
script,
)
.await
.unwrap();
let session_id = response["session_id"].as_str().unwrap().to_string();
confirm_session(&state, &session_id, None).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
let started = std::time::Instant::now();
let result = cancel_session(&state, &session_id).await.unwrap();
assert_eq!(result["state"], "abandoned");
assert!(started.elapsed() < std::time::Duration::from_secs(5));
let entry = get_entry(&state, &session_id).await.unwrap();
let session = entry.session.lock().await;
assert!(session.workspace.is_none());
}
#[tokio::test]
async fn respond_fulfills_a_pending_ask_user_request() {
use car_inference::tasks::generate::Message;
let repo_dir = tempfile::tempdir().unwrap();
init_repo(repo_dir.path());
let state_dir = tempfile::tempdir().unwrap();
let journal = tempfile::tempdir().unwrap();
let state = Arc::new(ServerState::standalone(journal.path().to_path_buf()));
struct AskGen {
cursor: AtomicUsize,
}
#[async_trait]
impl TurnGenerator for AskGen {
async fn generate(&self, req: GenerateRequest) -> Result<InferenceResult, String> {
let i = self.cursor.fetch_add(1, Ordering::SeqCst);
Ok(match i {
0 => turn(
r#"{"description": "ans.txt records the answer",
"checks": [{"name": "c", "command": "grep -q FORTY-TWO ans.txt"}]}"#,
json!([]),
),
1 => turn(
"",
json!([{"id": "a1", "name": "ask_user",
"arguments": {"prompt": "what is the answer?"}}]),
),
2 => {
let answer = req
.messages
.as_ref()
.and_then(|ms| {
ms.iter().rev().find_map(|m| match m {
Message::ToolResult { content, .. } => Some(content.clone()),
_ => None,
})
})
.unwrap_or_default();
turn(
"",
json!([{"id": "w1", "name": "write_file",
"arguments": {"path": "ans.txt", "content": answer}}]),
)
}
_ => turn("done", json!([])),
})
}
}
let response = start_session(
&state,
StartArgs {
repo: repo_dir.path().to_path_buf(),
intent: "record the user's answer".into(),
engine: EngineChoice::Native,
max_iterations: Some(4),
state_dir: state_dir.path().to_path_buf(),
project: None,
},
Arc::new(AskGen { cursor: AtomicUsize::new(0) }),
)
.await
.unwrap();
let session_id = response["session_id"].as_str().unwrap().to_string();
confirm_session(&state, &session_id, None).await.unwrap();
let entry = get_entry(&state, &session_id).await.unwrap();
{
let state = state.clone();
let sid = session_id.clone();
let gate = entry.user_input.clone();
tokio::spawn(async move {
for _ in 0..200 {
if gate.is_pending() {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
}
let req: JsonRpcMessage = serde_json::from_value(json!({
"jsonrpc": "2.0", "id": 1, "method": "coder.respond",
"params": {"session_id": sid, "text": "FORTY-TWO"},
}))
.unwrap();
handle_coder_respond(&req, &state).await.unwrap();
});
}
let handle = entry.task.lock().unwrap().take().unwrap();
handle.await.unwrap();
let session = entry.session.lock().await;
assert_eq!(session.state, CoderState::NeedsApproval, "error: {:?}", session.error);
drop(session);
assert!(entry.events.lock().await.iter().any(|e| matches!(
&e.kind,
CoderEventKind::UserInputRequested { prompt } if prompt == "what is the answer?"
)));
}
#[tokio::test]
async fn respond_errors_when_no_request_pending() {
let repo_dir = tempfile::tempdir().unwrap();
init_repo(repo_dir.path());
let state_dir = tempfile::tempdir().unwrap();
let journal = tempfile::tempdir().unwrap();
let state = Arc::new(ServerState::standalone(journal.path().to_path_buf()));
let script: Arc<dyn TurnGenerator> = Arc::new(Script {
turns: vec![turn(
r#"{"description": "x", "checks": [{"name": "a", "command": "true"}]}"#,
json!([]),
)],
cursor: AtomicUsize::new(0),
});
let response = start_session(
&state,
StartArgs {
repo: repo_dir.path().to_path_buf(),
intent: "x".into(),
engine: EngineChoice::Native,
max_iterations: Some(1),
state_dir: state_dir.path().to_path_buf(),
project: None,
},
script,
)
.await
.unwrap();
let session_id = response["session_id"].as_str().unwrap().to_string();
let req: JsonRpcMessage = serde_json::from_value(json!({
"jsonrpc": "2.0", "id": 1, "method": "coder.respond",
"params": {"session_id": session_id, "text": "unexpected"},
}))
.unwrap();
let err = handle_coder_respond(&req, &state).await.unwrap_err();
assert!(err.contains("no pending user-input request"), "{err}");
}
#[tokio::test]
async fn cancel_unblocks_a_waiting_ask_user_request() {
let repo_dir = tempfile::tempdir().unwrap();
init_repo(repo_dir.path());
let state_dir = tempfile::tempdir().unwrap();
let journal = tempfile::tempdir().unwrap();
let state = Arc::new(ServerState::standalone(journal.path().to_path_buf()));
let script: Arc<dyn TurnGenerator> = Arc::new(Script {
turns: vec![
turn(
r#"{"description": "blocks", "checks": [{"name": "n", "command": "test -f done.txt"}]}"#,
json!([]),
),
turn(
"",
json!([{"id": "a1", "name": "ask_user",
"arguments": {"prompt": "blocking question"}}]),
),
],
cursor: AtomicUsize::new(0),
});
let response = start_session(
&state,
StartArgs {
repo: repo_dir.path().to_path_buf(),
intent: "blocks".into(),
engine: EngineChoice::Native,
max_iterations: Some(2),
state_dir: state_dir.path().to_path_buf(),
project: None,
},
script,
)
.await
.unwrap();
let session_id = response["session_id"].as_str().unwrap().to_string();
confirm_session(&state, &session_id, None).await.unwrap();
let entry = get_entry(&state, &session_id).await.unwrap();
for _ in 0..200 {
if entry.user_input.is_pending() {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
}
assert!(entry.user_input.is_pending(), "ask_user should have parked");
let started = std::time::Instant::now();
let result = cancel_session(&state, &session_id).await.unwrap();
assert_eq!(result["state"], "abandoned");
assert!(started.elapsed() < std::time::Duration::from_secs(5));
let session = entry.session.lock().await;
assert_eq!(session.state, CoderState::Abandoned);
}
fn config_env_lock() -> &'static std::sync::Mutex<()> {
static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
LOCK.get_or_init(|| std::sync::Mutex::new(()))
}
#[tokio::test]
async fn coder_toml_keep_on_failure_and_default_iterations_take_effect() {
let _guard = config_env_lock().lock().unwrap();
let repo_dir = tempfile::tempdir().unwrap();
init_repo(repo_dir.path());
let state_dir = tempfile::tempdir().unwrap();
let journal = tempfile::tempdir().unwrap();
let cfg_dir = tempfile::tempdir().unwrap();
let cfg_path = cfg_dir.path().join("coder.toml");
std::fs::write(
&cfg_path,
"[coder]\nkeep_workspace_on_failure = true\ndefault_max_iterations = 3\n",
)
.unwrap();
std::env::set_var("CAR_CODER_CONFIG", &cfg_path);
let state = Arc::new(ServerState::standalone(journal.path().to_path_buf()));
let script: Arc<dyn TurnGenerator> = Arc::new(Script {
turns: vec![
turn(
r#"{"description": "impossible", "checks": [{"name": "missing", "command": "test -f never.txt"}]}"#,
json!([]),
),
turn("nothing", json!([])),
turn("still nothing", json!([])),
turn("nope", json!([])),
],
cursor: AtomicUsize::new(0),
});
assert_eq!(
CoderConfig::load().default_max_iterations,
3,
"config default_max_iterations should load"
);
let response = start_session(
&state,
StartArgs {
repo: repo_dir.path().to_path_buf(),
intent: "impossible task".into(),
engine: EngineChoice::Native,
max_iterations: None,
state_dir: state_dir.path().to_path_buf(),
project: None,
},
script,
)
.await
.unwrap();
let session_id = response["session_id"].as_str().unwrap().to_string();
let worktree = PathBuf::from(response["worktree"].as_str().unwrap());
confirm_session(&state, &session_id, None).await.unwrap();
let entry = get_entry(&state, &session_id).await.unwrap();
let handle = entry.task.lock().unwrap().take().unwrap();
handle.await.unwrap();
let session = entry.session.lock().await;
assert_eq!(session.state, CoderState::Failed);
assert!(session.keep_workspace_on_failure);
assert_eq!(session.max_iterations, 3);
assert!(worktree.is_dir(), "worktree should survive Failed under keep flag");
assert_eq!(session.workspace_path.as_deref(), Some(worktree.as_path()));
drop(session);
assert!(entry.events.lock().await.iter().any(|e| matches!(
&e.kind,
CoderEventKind::Error { message } if message.contains("retained for postmortem")
)));
std::env::remove_var("CAR_CODER_CONFIG");
let _ = std::process::Command::new("git")
.arg("-C")
.arg(repo_dir.path())
.args(["worktree", "remove", "--force"])
.arg(&worktree)
.output();
}
#[tokio::test]
async fn missing_coder_toml_uses_defaults() {
let _guard = config_env_lock().lock().unwrap();
let repo_dir = tempfile::tempdir().unwrap();
init_repo(repo_dir.path());
let state_dir = tempfile::tempdir().unwrap();
let journal = tempfile::tempdir().unwrap();
let cfg_dir = tempfile::tempdir().unwrap();
std::env::set_var("CAR_CODER_CONFIG", cfg_dir.path().join("absent.toml"));
assert_eq!(CoderConfig::load(), CoderConfig::default());
let state = Arc::new(ServerState::standalone(journal.path().to_path_buf()));
let script: Arc<dyn TurnGenerator> = Arc::new(Script {
turns: vec![
turn(
r#"{"description": "impossible", "checks": [{"name": "missing", "command": "test -f never.txt"}]}"#,
json!([]),
),
turn("nothing", json!([])),
turn("still nothing", json!([])),
],
cursor: AtomicUsize::new(0),
});
let response = start_session(
&state,
StartArgs {
repo: repo_dir.path().to_path_buf(),
intent: "impossible".into(),
engine: EngineChoice::Native,
max_iterations: Some(2),
state_dir: state_dir.path().to_path_buf(),
project: None,
},
script,
)
.await
.unwrap();
let session_id = response["session_id"].as_str().unwrap().to_string();
let worktree = PathBuf::from(response["worktree"].as_str().unwrap());
confirm_session(&state, &session_id, None).await.unwrap();
let entry = get_entry(&state, &session_id).await.unwrap();
let handle = entry.task.lock().unwrap().take().unwrap();
handle.await.unwrap();
let session = entry.session.lock().await;
assert_eq!(session.state, CoderState::Failed);
assert!(!session.keep_workspace_on_failure, "default is not to keep");
assert!(!worktree.exists(), "worktree should be reaped under defaults");
std::env::remove_var("CAR_CODER_CONFIG");
}
#[test]
fn summarize_repo_reports_entries_and_build_system() {
let dir = tempfile::tempdir().unwrap();
std::fs::write(dir.path().join("Cargo.toml"), "[package]").unwrap();
std::fs::write(dir.path().join("main.rs"), "").unwrap();
let s = summarize_repo(dir.path());
assert!(s.contains("Cargo.toml"));
assert!(s.contains("Rust (cargo)"));
}
}