use std::sync::atomic::Ordering;
use async_trait::async_trait;
use car_engine::ToolExecutor;
use car_inference::tasks::generate::Message;
use car_inference::{GenerateParams, GenerateRequest, InferenceEngine, InferenceResult};
use serde_json::Value;
use super::contract::{evaluate_contract, CheckResult, OutcomeContract};
use super::session::{CancelFlag, CoderEventKind, EventSink};
use super::shell_tool::WorktreeExecutor;
use super::skill_memory::{FailureSignature, RepairMemory};
/// Source of model turns for the native loop.
///
/// `run_native_loop` talks to the model only through this trait, so a real
/// `InferenceEngine` and any other implementation are interchangeable.
#[async_trait]
pub trait TurnGenerator: Send + Sync {
/// Runs one generation request and returns its result, or the failure
/// rendered as a message string.
async fn generate(&self, req: GenerateRequest) -> Result<InferenceResult, String>;
}
/// Production implementation: forwards each turn to the inference engine.
#[async_trait]
impl TurnGenerator for InferenceEngine {
    /// Delegates to `generate_tracked` and flattens the engine's error into
    /// its `to_string()` text.
    async fn generate(&self, req: GenerateRequest) -> Result<InferenceResult, String> {
        let tracked = self.generate_tracked(req).await;
        tracked.map_err(|err| err.to_string())
    }
}
/// Channel for putting a question to the human user.
///
/// `run_native_loop` only advertises the `ask_user` tool to the model when a
/// handler implementing this trait is supplied.
#[async_trait]
pub trait AskUser: Send + Sync {
/// Presents `prompt` and resolves to the user's reply, or to an error
/// message when no answer was obtained (the tool description names a
/// timeout as one such case).
async fn ask(&self, prompt: &str) -> Result<String, String>;
}
/// Tool name under which the ask-the-user capability is advertised to the
/// model and recognised when dispatching tool calls.
pub const ASK_USER_TOOL: &str = "ask_user";
/// Builds the JSON tool definition advertised to the model for [`ASK_USER_TOOL`].
///
/// The definition declares one required string argument, `prompt`.
fn ask_user_tool_def() -> Value {
    // Guidance shown to the model; the `\` continuations strip the leading
    // whitespace of the following line, so this is one single-line string.
    let description = "Ask the human user a question and wait for their reply. \
        Use ONLY when you genuinely cannot proceed without a \
        decision or missing fact the user alone can supply (an \
        ambiguous requirement, a destructive choice, a missing \
        credential). Do not use it for things you can determine \
        by reading the repo or running commands. The call blocks \
        until the user answers or a timeout elapses; on timeout \
        you receive an error and should proceed with your best \
        judgment.";

    // JSON-schema style description of the tool's arguments.
    let parameters = serde_json::json!({
        "type": "object",
        "properties": {
            "prompt": {
                "type": "string",
                "description": "The question to show the user, phrased so a short reply answers it."
            }
        },
        "required": ["prompt"]
    });

    serde_json::json!({
        "name": ASK_USER_TOOL,
        "description": description,
        "parameters": parameters
    })
}
/// Budgets and model selection for one `run_native_loop` invocation.
#[derive(Debug, Clone)]
pub struct NativeLoopConfig {
/// Model identifier copied into every `GenerateRequest`.
/// NOTE(review): `None` presumably lets the engine pick a model — confirm.
pub model: Option<String>,
/// Upper bound on edit → verify rounds before the loop gives up.
pub max_iterations: u32,
/// Upper bound on model turns inside a single round.
pub max_turns_per_iteration: u32,
/// Passed as `max_tokens` in each request's generation parameters.
pub max_tokens_per_turn: usize,
}
impl Default for NativeLoopConfig {
fn default() -> Self {
Self {
model: None,
max_iterations: 8,
max_turns_per_iteration: 24,
max_tokens_per_turn: 4096,
}
}
}
/// Final report returned by `run_native_loop`.
#[derive(Debug, Clone)]
pub struct LoopOutcome {
/// `true` only when every contract check passed.
pub passed: bool,
/// Iteration count reported at the point the loop returned.
pub iterations: u32,
/// Results of the most recent contract evaluation (empty if none ran).
pub last_results: Vec<CheckResult>,
/// Reason the loop stopped early (cancellation or repeated inference
/// failure); `None` on success and when the iteration budget ran out.
pub error: Option<String>,
}
/// Returns `s` unchanged when it is at most `max` bytes long; otherwise the
/// longest prefix of at most `max` bytes that ends on a UTF-8 character
/// boundary, followed by `…`.
fn preview(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_owned();
    }
    // Walk back from `max` to the nearest boundary; offset 0 always is one,
    // so the search cannot come up empty.
    let cut = (0..=max)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    let mut out = String::with_capacity(cut + '…'.len_utf8());
    out.push_str(&s[..cut]);
    out.push('…');
    out
}
/// Builds the system message: the agent's standing rules followed by the
/// rendered outcome contract.
fn system_prompt(contract: &OutcomeContract) -> String {
    let contract_text = contract.render();
    format!(
        "You are CAR Coder, an autonomous coding agent working in an isolated git worktree \
         of the user's repository. The worktree root is your working directory; all relative \
         paths resolve against it.\n\n\
         Rules:\n\
         - Inspect before you edit: read the relevant files first.\n\
         - Use the shell tool for builds and tests. Policy denies git push, sudo, and \
         destructive operations outside the worktree — do not attempt them.\n\
         - Do not git commit; the runtime handles version control.\n\
         - When you believe the work is complete, reply with a brief plain-text summary and \
         STOP calling tools. The runtime then verifies the outcome contract itself; if \
         checks fail you will be re-invoked with their output.\n\n\
         OUTCOME CONTRACT (the runtime runs these to decide done):\n{contract_text}"
    )
}
/// Renders the feedback message for the next iteration: a fixed header
/// followed by one section per failed check (name, exit code, output tail).
/// Passing checks are omitted.
fn failure_feedback(results: &[CheckResult]) -> String {
    let header =
        "The outcome contract was evaluated and some checks FAILED. Fix the code so they pass.\n\n";
    let sections: String = results
        .iter()
        .filter(|check| !check.passed)
        .map(|check| {
            format!(
                "FAILED {} (exit {:?}):\n{}\n\n",
                check.name, check.exit_code, check.output_tail
            )
        })
        .collect();
    [header, sections.as_str()].concat()
}
/// Derives a failure signature from the first failing check, or returns
/// `None` when every check passed.
fn primary_failure(results: &[CheckResult]) -> Option<FailureSignature> {
    let first_red = results.iter().find(|check| !check.passed)?;
    Some(FailureSignature::from_check(first_red))
}
/// Appends a recalled repair hint to `prompt`: a fixed lead-in line, the
/// hint text, then a trailing newline.
fn append_recall_hint(prompt: &mut String, hint: &str) {
    const LEAD_IN: &str =
        "\nHINT — a prior session resolved this same failure signature with this approach; \
         use it as a lead, verify it still applies:\n";
    prompt.reserve(LEAD_IN.len() + hint.len() + 1);
    for piece in [LEAD_IN, hint, "\n"] {
        prompt.push_str(piece);
    }
}
/// Chooses the text stored as the "approach that worked" for `sig`.
///
/// Uses the model's closing summary (trimmed, capped at 1024 bytes via
/// `preview`) when it has one; otherwise falls back to a generic sentence
/// naming the failure class and check.
fn winning_approach(sig: &FailureSignature, plan_text: &str) -> String {
    let summary = plan_text.trim();
    if !summary.is_empty() {
        return preview(summary, 1024);
    }
    let class = &sig.error_class;
    let check = &sig.check;
    format!("Re-attempted the edit; the '{class}' failure of check '{check}' cleared after repair.")
}
/// Runs the native edit → verify → repair loop against a worktree.
///
/// Each iteration opens a fresh conversation (the system prompt plus one user
/// message holding the task, the previous round's failure feedback and, when
/// memory has one, a recalled repair hint). The model may then call tools for
/// up to `cfg.max_turns_per_iteration` turns; a reply without tool calls ends
/// the turn loop. Afterwards the outcome contract is evaluated:
///
/// * all checks green → returns `passed: true`; if this round repaired an
///   earlier failure, the closing summary is recorded in `memory`;
/// * otherwise the failures become feedback for the next iteration and the
///   first failing check's signature is recorded in `memory`.
///
/// # Parameters
/// * `inference` – produces model turns.
/// * `executor` – executes every tool call except `ask_user`, and is handed to
///   the contract evaluation.
/// * `intent` – the task text placed in the user message.
/// * `contract` – checks that decide success.
/// * `sink` – receives progress events.
/// * `cancel` – checked at the start of every iteration and every turn.
/// * `cfg` – model choice and budgets.
/// * `memory` – repair memory used for recall and recording.
/// * `ask` – optional human-in-the-loop handler; the `ask_user` tool is only
///   advertised when this is `Some`.
///
/// # Returns
/// A [`LoopOutcome`]. `error` is `Some("cancelled")` on cancellation,
/// `Some("inference failed repeatedly: …")` after three consecutive inference
/// failures, and `None` both on success and when `cfg.max_iterations` is
/// exhausted with checks still failing.
// Every collaborator is passed in separately, hence the long parameter list.
#[allow(clippy::too_many_arguments)]
pub async fn run_native_loop(
    inference: &dyn TurnGenerator,
    executor: &WorktreeExecutor,
    intent: &str,
    contract: &OutcomeContract,
    sink: &EventSink,
    cancel: &CancelFlag,
    cfg: &NativeLoopConfig,
    memory: &RepairMemory,
    ask: Option<&dyn AskUser>,
) -> LoopOutcome {
    let mut tools = WorktreeExecutor::tool_defs();
    if ask.is_some() {
        tools.push(ask_user_tool_def());
    }
    let system = system_prompt(contract);
    let mut feedback: Option<String> = None;
    let mut last_results: Vec<CheckResult> = Vec::new();
    // Deliberately not reset per iteration: only a successful turn clears it.
    let mut consecutive_inference_failures = 0u32;
    // Signature of the previous iteration's first failing check, if any.
    let mut prior_sig: Option<FailureSignature> = None;

    for iteration in 1..=cfg.max_iterations {
        if cancel.load(Ordering::SeqCst) {
            return LoopOutcome {
                passed: false,
                // This iteration never started, so report the completed count.
                iterations: iteration - 1,
                last_results,
                error: Some("cancelled".into()),
            };
        }
        sink.emit(CoderEventKind::IterationStarted {
            n: iteration,
            max: cfg.max_iterations,
        });

        // Build the opening user message for this iteration.
        let mut user = format!("Task:\n{intent}\n");
        if let Some(fb) = &feedback {
            user.push('\n');
            user.push_str(fb);
        }
        if let Some(sig) = &prior_sig {
            if let Some(hint) = memory.recall(sig).await {
                append_recall_hint(&mut user, &hint);
            }
        }
        let mut messages = vec![
            Message::System {
                content: system.clone(),
            },
            Message::User { content: user },
        ];
        // Last plain-text reply of this iteration; stored as the winning
        // approach if the iteration turns out to be a successful repair.
        let mut closing_plan = String::new();

        for turn in 1..=cfg.max_turns_per_iteration {
            if cancel.load(Ordering::SeqCst) {
                return LoopOutcome {
                    passed: false,
                    iterations: iteration,
                    last_results,
                    error: Some("cancelled".into()),
                };
            }
            let req = GenerateRequest {
                prompt: intent.to_string(),
                model: cfg.model.clone(),
                params: GenerateParams {
                    temperature: 0.0,
                    max_tokens: cfg.max_tokens_per_turn,
                    ..Default::default()
                },
                tools: Some(tools.clone()),
                messages: Some(messages.clone()),
                intent: Some(car_inference::IntentHint {
                    task: Some(car_inference::TaskHint::Code),
                    ..Default::default()
                }),
                ..Default::default()
            };
            let result = match inference.generate(req).await {
                Ok(r) => {
                    consecutive_inference_failures = 0;
                    r
                }
                Err(e) => {
                    consecutive_inference_failures += 1;
                    sink.emit(CoderEventKind::Error {
                        message: format!("inference failed (turn {turn}): {e}"),
                    });
                    if consecutive_inference_failures >= 3 {
                        return LoopOutcome {
                            passed: false,
                            iterations: iteration,
                            last_results,
                            error: Some(format!("inference failed repeatedly: {e}")),
                        };
                    }
                    // Retry with the unchanged conversation; the failed
                    // attempt still counts against the turn budget.
                    continue;
                }
            };

            // No tool calls means the model considers the work done.
            if result.tool_calls.is_empty() {
                if !result.text.trim().is_empty() {
                    closing_plan = result.text.clone();
                    sink.emit(CoderEventKind::PlanText {
                        text: result.text.clone(),
                    });
                }
                break;
            }

            // Give every call an id so each tool result can reference it.
            let mut calls = result.tool_calls.clone();
            for (i, call) in calls.iter_mut().enumerate() {
                if call.id.is_none() {
                    call.id = Some(format!("call_{iteration}_{turn}_{i}"));
                }
            }
            messages.push(Message::Assistant {
                content: result.text.clone(),
                tool_calls: calls.clone(),
            });

            for call in &calls {
                let params = Value::Object(call.arguments.clone().into_iter().collect());
                sink.emit(CoderEventKind::ToolCall {
                    tool: call.name.clone(),
                    params_preview: preview(&params.to_string(), 400),
                });
                let (ok, content) = if call.name == ASK_USER_TOOL {
                    match ask {
                        Some(asker) => {
                            let prompt = params
                                .get("prompt")
                                .and_then(Value::as_str)
                                .unwrap_or("");
                            if prompt.trim().is_empty() {
                                // `prompt` is a required argument; bounce a
                                // malformed call back to the model rather than
                                // blocking the human on an empty question.
                                (
                                    false,
                                    "ERROR: ask_user requires a non-empty 'prompt' argument"
                                        .to_string(),
                                )
                            } else {
                                match asker.ask(prompt).await {
                                    Ok(answer) => (true, answer),
                                    Err(e) => (false, format!("ERROR: {e}")),
                                }
                            }
                        }
                        None => (
                            false,
                            "ERROR: ask_user is not available in this session".to_string(),
                        ),
                    }
                } else {
                    match executor.execute(&call.name, &params).await {
                        Ok(v) => (true, v.to_string()),
                        Err(e) => (false, format!("ERROR: {e}")),
                    }
                };
                sink.emit(CoderEventKind::ToolResult {
                    tool: call.name.clone(),
                    ok,
                    preview: preview(&content, 400),
                });
                // The model sees a longer (16 KiB) slice than the event preview.
                messages.push(Message::ToolResult {
                    tool_use_id: call.id.clone().expect("assigned above"),
                    content: preview(&content, 16 * 1024),
                });
            }
        }

        // Verification is the runtime's job, whether the model stopped on its
        // own or ran out of turns.
        last_results = evaluate_contract(contract, executor, sink).await;
        if last_results.iter().all(|r| r.passed) {
            // Only a green run that follows a red one is a learned repair.
            if let Some(sig) = &prior_sig {
                memory
                    .record_success(sig, &winning_approach(sig, &closing_plan))
                    .await;
            }
            return LoopOutcome {
                passed: true,
                iterations: iteration,
                last_results,
                error: None,
            };
        }

        feedback = Some(failure_feedback(&last_results));
        if let Some(sig) = primary_failure(&last_results) {
            memory.record_failure(&sig).await;
            prior_sig = Some(sig);
        } else {
            prior_sig = None;
        }
    }

    // Iteration budget exhausted with checks still failing.
    LoopOutcome {
        passed: false,
        iterations: cfg.max_iterations,
        last_results,
        error: None,
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::coder::contract::ContractCheck;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
// Scripted stand-in for the model: replays `turns` in order, one per
// `generate` call, with `cursor` tracking the next index to serve.
struct Script {
turns: Vec<InferenceResult>,
cursor: AtomicUsize,
}
// Builds one scripted `InferenceResult` from reply text and a JSON array of
// tool calls; the remaining fields are fixed placeholder values. Panics if
// the JSON does not deserialize into `InferenceResult`.
fn turn(text: &str, tool_calls: serde_json::Value) -> InferenceResult {
    let scripted = serde_json::json!({
        "text": text,
        "tool_calls": tool_calls,
        "trace_id": "t",
        "model_used": "scripted",
        "latency_ms": 0,
    });
    serde_json::from_value(scripted).expect("scripted InferenceResult shape")
}
#[async_trait]
impl TurnGenerator for Script {
    // Serves the next scripted turn and ignores the request; once the script
    // runs out every further call fails with "script exhausted".
    async fn generate(&self, _req: GenerateRequest) -> Result<InferenceResult, String> {
        let index = self.cursor.fetch_add(1, Ordering::SeqCst);
        match self.turns.get(index) {
            Some(scripted) => Ok(scripted.clone()),
            None => Err("script exhausted".to_string()),
        }
    }
}
// Happy path: the model writes a file, stops, and the contract passes on the
// first iteration. Also pins the exact order of emitted events.
#[tokio::test]
async fn scripted_loop_edits_verifies_and_passes() {
let dir = tempfile::tempdir().unwrap();
let executor = WorktreeExecutor::new(dir.path());
let (sink, collected) = EventSink::collecting("coder-native");
let cancel: CancelFlag = Arc::new(std::sync::atomic::AtomicBool::new(false));
// Turn 1 calls write_file; turn 2 replies with plain text, ending the turn loop.
let script = Script {
turns: vec![
turn(
"creating the file",
serde_json::json!([{
"id": "c1",
"name": "write_file",
"arguments": {"path": "hello.txt", "content": "hello coder"}
}]),
),
turn("done — file created", serde_json::json!([])),
],
cursor: AtomicUsize::new(0),
};
// The single check passes only if the file holds the expected text.
let contract = OutcomeContract {
description: "hello.txt exists with content".into(),
checks: vec![ContractCheck {
name: "exists".into(),
command: "grep -q 'hello coder' hello.txt".into(),
expect_exit_zero: true,
output_contains: None,
timeout_secs: 10,
}],
};
let outcome = run_native_loop(
&script,
&executor,
"create hello.txt containing 'hello coder'",
&contract,
&sink,
&cancel,
&NativeLoopConfig::default(),
&RepairMemory::disabled(),
None,
)
.await;
assert!(outcome.passed, "outcome: {outcome:?}");
assert_eq!(outcome.iterations, 1);
assert!(dir.path().join("hello.txt").exists());
// Reduce each event to a coarse label so the sequence can be compared.
let events = collected.lock().unwrap();
let types: Vec<&str> = events
.iter()
.map(|e| match &e.kind {
CoderEventKind::IterationStarted { .. } => "iteration",
CoderEventKind::ToolCall { .. } => "tool_call",
CoderEventKind::ToolResult { .. } => "tool_result",
CoderEventKind::PlanText { .. } => "plan",
CoderEventKind::CheckStarted { .. } => "check_started",
CoderEventKind::CheckCompleted { .. } => "check_completed",
_ => "other",
})
.collect();
assert_eq!(
types,
vec!["iteration", "tool_call", "tool_result", "plan", "check_started", "check_completed"]
);
}
// Repair path: the first iteration writes the wrong content and fails the
// check; the second iteration writes the right content and passes.
#[tokio::test]
async fn scripted_loop_repairs_after_red_checks() {
let dir = tempfile::tempdir().unwrap();
let executor = WorktreeExecutor::new(dir.path());
let sink = EventSink::test_sink();
let cancel: CancelFlag = Arc::new(std::sync::atomic::AtomicBool::new(false));
// Turns 1-2 form iteration 1 (wrong write, then stop); turns 3-4 form
// iteration 2 (correct write, then stop).
let script = Script {
turns: vec![
turn(
"",
serde_json::json!([{
"id": "c1", "name": "write_file",
"arguments": {"path": "x.txt", "content": "wrong"}
}]),
),
turn("done", serde_json::json!([])),
turn(
"",
serde_json::json!([{
"id": "c2", "name": "write_file",
"arguments": {"path": "x.txt", "content": "right"}
}]),
),
turn("fixed", serde_json::json!([])),
],
cursor: AtomicUsize::new(0),
};
let contract = OutcomeContract {
description: "x.txt says right".into(),
checks: vec![ContractCheck {
name: "content".into(),
command: "grep -q right x.txt".into(),
expect_exit_zero: true,
output_contains: None,
timeout_secs: 10,
}],
};
let outcome = run_native_loop(
&script, &executor, "write right into x.txt", &contract, &sink, &cancel,
&NativeLoopConfig::default(), &RepairMemory::disabled(), None,
)
.await;
assert!(outcome.passed);
assert_eq!(outcome.iterations, 2, "one repair round expected");
}
// Two sessions share one `RepairMemory`. Session 1 fails, repairs, and should
// store its approach; session 2 hits the same failure and should see a HINT
// in its repair prompt.
#[tokio::test]
async fn repair_round_learns_and_recalls_across_sessions() {
use crate::coder::skill_memory::FailureSignature;
use car_memgine::MemgineEngine;
use tokio::sync::Mutex as AsyncMutex;
let memory = RepairMemory::new(Some(Arc::new(AsyncMutex::new(MemgineEngine::new(None)))));
// On failure the check prints 'assertion failed' and exits 1.
let contract = OutcomeContract {
description: "x.txt says right".into(),
checks: vec![ContractCheck {
name: "content".into(),
command: "grep -q right x.txt || { echo 'assertion failed'; exit 1; }".into(),
expect_exit_zero: true,
output_contains: None,
timeout_secs: 10,
}],
};
// Signature the failing check is expected to map to; used for the recall below.
let sig = FailureSignature {
check: "content".into(),
error_class: "test_failure".into(),
};
// --- Session 1: wrong write, red check, then the repairing write. ---
let dir1 = tempfile::tempdir().unwrap();
let exec1 = WorktreeExecutor::new(dir1.path());
let sink = EventSink::test_sink();
let cancel: CancelFlag = Arc::new(std::sync::atomic::AtomicBool::new(false));
let script1 = Script {
turns: vec![
turn(
"",
serde_json::json!([{
"id": "c1", "name": "write_file",
"arguments": {"path": "x.txt", "content": "wrong"}
}]),
),
turn("nothing useful yet", serde_json::json!([])),
turn(
"",
serde_json::json!([{
"id": "c2", "name": "write_file",
"arguments": {"path": "x.txt", "content": "right"}
}]),
),
turn("wrote 'right' into x.txt to satisfy the grep", serde_json::json!([])),
],
cursor: AtomicUsize::new(0),
};
let outcome1 = run_native_loop(
&script1, &exec1, "write right into x.txt", &contract, &sink, &cancel,
&NativeLoopConfig::default(), &memory, None,
)
.await;
assert!(outcome1.passed);
// The closing summary of the repairing iteration should now be recallable.
let recalled = memory.recall(&sig).await.expect("session 1 should have learned");
assert!(recalled.contains("right"), "approach captured: {recalled}");
// --- Session 2: fresh worktree, same memory. ---
let dir2 = tempfile::tempdir().unwrap();
let exec2 = WorktreeExecutor::new(dir2.path());
let (sink2, collected) = EventSink::collecting("coder-learn");
let seen_hint = Arc::new(std::sync::atomic::AtomicBool::new(false));
// Generator that records whether any user message in a request contains "HINT".
struct HintWatcher {
seen: Arc<std::sync::atomic::AtomicBool>,
cursor: AtomicUsize,
}
#[async_trait]
impl TurnGenerator for HintWatcher {
async fn generate(&self, req: GenerateRequest) -> Result<InferenceResult, String> {
let i = self.cursor.fetch_add(1, Ordering::SeqCst);
let saw_hint = req
.messages
.as_ref()
.map(|ms| {
ms.iter().any(|m| {
matches!(m, Message::User { content } if content.contains("HINT"))
})
})
.unwrap_or(false);
if saw_hint {
self.seen.store(true, Ordering::SeqCst);
}
// Call 0 does nothing (iteration 1 goes red); call 1 applies the fix;
// every later call stops.
Ok(match i {
0 => turn("did nothing", serde_json::json!([])),
1 => turn(
"",
serde_json::json!([{
"id": "c1", "name": "write_file",
"arguments": {"path": "x.txt", "content": "right"}
}]),
),
_ => turn("applied the recalled fix", serde_json::json!([])),
})
}
}
let script2 = HintWatcher {
seen: seen_hint.clone(),
cursor: AtomicUsize::new(0),
};
let outcome2 = run_native_loop(
&script2, &exec2, "write right into x.txt", &contract, &sink2, &cancel,
&NativeLoopConfig::default(), &memory, None,
)
.await;
assert!(outcome2.passed, "session 2 should pass: {outcome2:?}");
assert!(
seen_hint.load(Ordering::SeqCst),
"the recalled hint must have been injected into the repair prompt"
);
// The collected events are not inspected by this test.
drop(collected);
}
// With an `AskUser` handler wired in, an `ask_user` call must reach the
// handler and its answer must come back to the model as a tool result.
#[tokio::test]
async fn ask_user_tool_routes_to_handler_and_answer_reaches_model() {
use std::sync::Mutex as StdMutex;
let dir = tempfile::tempdir().unwrap();
let executor = WorktreeExecutor::new(dir.path());
let (sink, collected) = EventSink::collecting("coder-ask");
let cancel: CancelFlag = Arc::new(std::sync::atomic::AtomicBool::new(false));
// Handler that records the prompt it was asked and returns a canned answer.
struct CannedAsker {
seen_prompt: Arc<StdMutex<Option<String>>>,
answer: String,
}
#[async_trait]
impl AskUser for CannedAsker {
async fn ask(&self, prompt: &str) -> Result<String, String> {
*self.seen_prompt.lock().unwrap() = Some(prompt.to_string());
Ok(self.answer.clone())
}
}
let seen_prompt = Arc::new(StdMutex::new(None));
let asker = CannedAsker {
seen_prompt: seen_prompt.clone(),
answer: "use port 8080".to_string(),
};
// Generator: call 0 asks the user, call 1 writes the most recent tool
// result into answer.txt, later calls stop.
struct AskThenWrite {
cursor: AtomicUsize,
}
#[async_trait]
impl TurnGenerator for AskThenWrite {
async fn generate(&self, req: GenerateRequest) -> Result<InferenceResult, String> {
let i = self.cursor.fetch_add(1, Ordering::SeqCst);
match i {
0 => Ok(turn(
"",
serde_json::json!([{
"id": "a1", "name": "ask_user",
"arguments": {"prompt": "which port?"}
}]),
)),
1 => {
// Take the latest ToolResult in the conversation: the user's answer.
let answer = req
.messages
.as_ref()
.and_then(|ms| {
ms.iter().rev().find_map(|m| match m {
Message::ToolResult { content, .. } => Some(content.clone()),
_ => None,
})
})
.unwrap_or_default();
Ok(turn(
"",
serde_json::json!([{
"id": "w1", "name": "write_file",
"arguments": {"path": "answer.txt", "content": answer}
}]),
))
}
_ => Ok(turn("done", serde_json::json!([]))),
}
}
}
let contract = OutcomeContract {
description: "answer.txt records the chosen port".into(),
checks: vec![ContractCheck {
name: "has_port".into(),
command: "grep -q 8080 answer.txt".into(),
expect_exit_zero: true,
output_contains: None,
timeout_secs: 10,
}],
};
let outcome = run_native_loop(
&AskThenWrite { cursor: AtomicUsize::new(0) },
&executor,
"pick a port and record it",
&contract,
&sink,
&cancel,
&NativeLoopConfig::default(),
&RepairMemory::disabled(),
Some(&asker),
)
.await;
assert!(outcome.passed, "outcome: {outcome:?}");
// The handler saw the model's question verbatim...
assert_eq!(seen_prompt.lock().unwrap().as_deref(), Some("which port?"));
// ...and the answer reached the model unmodified.
assert_eq!(
std::fs::read_to_string(dir.path().join("answer.txt")).unwrap(),
"use port 8080"
);
let events = collected.lock().unwrap();
assert!(events.iter().any(|e| matches!(
&e.kind,
CoderEventKind::ToolCall { tool, .. } if tool == ASK_USER_TOOL
)));
}
// Without an `AskUser` handler the `ask_user` tool must not appear in the
// tool list sent to the model. The loop outcome itself is not asserted here.
#[tokio::test]
async fn ask_user_without_handler_is_a_recoverable_error() {
let dir = tempfile::tempdir().unwrap();
let executor = WorktreeExecutor::new(dir.path());
let (sink, _collected) = EventSink::collecting("coder-noask");
let cancel: CancelFlag = Arc::new(std::sync::atomic::AtomicBool::new(false));
// Generator that records whether the request's tools include `ask_user`,
// then stops immediately.
struct ToolPeek {
offered: Arc<std::sync::atomic::AtomicBool>,
}
#[async_trait]
impl TurnGenerator for ToolPeek {
async fn generate(&self, req: GenerateRequest) -> Result<InferenceResult, String> {
let has_ask = req
.tools
.as_ref()
.map(|ts| ts.iter().any(|t| t["name"] == ASK_USER_TOOL))
.unwrap_or(false);
self.offered.store(has_ask, Ordering::SeqCst);
Ok(turn("done", serde_json::json!([])))
}
}
let offered_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
// The check command is `true`, which exits zero.
let contract = OutcomeContract {
description: "noop".into(),
checks: vec![ContractCheck {
name: "ok".into(),
command: "true".into(),
expect_exit_zero: true,
output_contains: None,
timeout_secs: 10,
}],
};
let _ = run_native_loop(
&ToolPeek { offered: offered_flag.clone() },
&executor,
"x",
&contract,
&sink,
&cancel,
&NativeLoopConfig::default(),
&RepairMemory::disabled(),
None,
)
.await;
assert!(
!offered_flag.load(Ordering::SeqCst),
"ask_user must not be offered when no handler is wired"
);
}
// With the cancel flag set before the loop starts, the loop must return
// "cancelled" with zero iterations.
#[tokio::test]
async fn cancellation_stops_the_loop() {
    let worktree = tempfile::tempdir().unwrap();
    let executor = WorktreeExecutor::new(worktree.path());
    let sink = EventSink::test_sink();
    // Already cancelled.
    let cancel: CancelFlag = Arc::new(std::sync::atomic::AtomicBool::new(true));
    // An empty script: any inference call would fail with "script exhausted".
    let empty_script = Script {
        turns: vec![],
        cursor: AtomicUsize::new(0),
    };
    let never_run = ContractCheck {
        name: "never".into(),
        command: "true".into(),
        expect_exit_zero: true,
        output_contains: None,
        timeout_secs: 10,
    };
    let contract = OutcomeContract {
        description: "d".into(),
        checks: vec![never_run],
    };
    let cfg = NativeLoopConfig::default();
    let memory = RepairMemory::disabled();

    let outcome = run_native_loop(
        &empty_script,
        &executor,
        "x",
        &contract,
        &sink,
        &cancel,
        &cfg,
        &memory,
        None,
    )
    .await;

    assert_eq!(outcome.error.as_deref(), Some("cancelled"));
    assert_eq!(outcome.iterations, 0);
}
// Feedback must mention failed checks (name and output) and omit passing ones.
#[test]
fn failure_feedback_lists_only_failures() {
    let passing = CheckResult {
        name: "good".into(),
        passed: true,
        exit_code: Some(0),
        output_tail: "ok".into(),
        duration_ms: 1,
    };
    let failing = CheckResult {
        name: "bad".into(),
        passed: false,
        exit_code: Some(1),
        output_tail: "assertion failed".into(),
        duration_ms: 1,
    };

    let feedback = failure_feedback(&[passing, failing]);

    assert!(feedback.contains("FAILED bad"));
    assert!(feedback.contains("assertion failed"));
    assert!(!feedback.contains("FAILED good"));
}
// The system prompt must embed the contract's check command and the
// stop-calling-tools rule.
#[test]
fn system_prompt_carries_the_contract() {
    let tests_check = ContractCheck {
        name: "tests".into(),
        command: "cargo test -p demo".into(),
        expect_exit_zero: true,
        output_contains: None,
        timeout_secs: 300,
    };
    let contract = OutcomeContract {
        description: "make the tests pass".into(),
        checks: vec![tests_check],
    };

    let prompt = system_prompt(&contract);

    assert!(prompt.contains("cargo test -p demo"));
    assert!(prompt.contains("STOP calling tools"));
}
// Short input passes through untouched; long multi-byte input is cut on a
// character boundary and marked with an ellipsis.
#[test]
fn preview_truncates_on_char_boundary() {
    assert_eq!(preview("short", 10), "short");

    // 'é' is two bytes in UTF-8, so a 5-byte cap falls inside a character.
    let accented = "é".repeat(300);
    let clipped = preview(&accented, 5);
    let has_ellipsis = clipped.ends_with('…');
    let short_enough = clipped.chars().count() <= 4;
    assert!(has_ellipsis && short_enough);
}
}