use car_engine::{format_tool_result, Runtime};
use car_inference::tasks::generate::{ContentBlock, Message, ToolCall};
use car_inference::{GenerateParams, GenerateRequest};
use car_ir::{ActionProposal, ActionStatus};
use serde_json::{json, Value};
use crate::coder::native_loop::TurnGenerator;
/// Maximum size, in bytes (16 KiB), of a tool observation before `cap`
/// truncates it and appends a truncation marker.
const OBSERVATION_CAP: usize = 16 * 1024;
/// Progress events handed to the `emit` callback while the assistant loop runs.
pub enum AssistantEvent {
/// Non-blank assistant text that accompanied one or more tool calls in a turn.
Text(String),
/// A tool call the model requested; emitted before the approval check runs.
ToolCall { name: String, params: Value },
/// The outcome of one tool call. `ok` is `false` for refused calls,
/// malformed proposals, and executions that did not succeed.
ToolResult {
name: String,
ok: bool,
content: String,
},
/// The model answered without requesting tools; the loop ends with this text.
Done { text: String },
/// Inference failed; the loop ends with this message.
Error(String),
}
/// Static configuration for one run of the assistant loop.
pub struct AssistantConfig {
/// Model identifier forwarded unchanged on every generate request.
pub model: Option<String>,
/// Upper bound on the number of generation turns before the loop gives up.
pub max_turns: u32,
/// Tool definitions sent with each request; when empty, no tools are sent.
pub tools: Vec<Value>,
/// Tool names that require approval. Only consulted when
/// `approval_policy` is `None`.
pub gated_tools: Vec<String>,
/// Optional per-call policy. When set, it alone decides the approval
/// posture of every tool call and `gated_tools` is ignored.
pub approval_policy: Option<ApprovalPolicyFn>,
}
/// Shared, thread-safe callback that receives a tool name and its parameters
/// and returns the approval posture for that call.
pub type ApprovalPolicyFn =
std::sync::Arc<dyn Fn(&str, &Value) -> ToolApprovalDecision + Send + Sync>;
/// Approval posture for a single tool call, decided before it is executed.
pub enum ToolApprovalDecision {
/// Execute the call without asking anyone.
Allow,
/// Ask the `ApprovalGate`; if the loop was given no gate, the call is refused.
RequireApproval,
/// Refuse the call outright; the string is reported back as the error reason.
Deny(String),
}
/// Answer returned by an `ApprovalGate` for a call that required approval.
pub enum ApprovalDecision {
/// The call may run.
Approved,
/// The call must not run; the string is reported back as the error reason.
Denied(String),
}
/// Asynchronous approver consulted for tool calls whose posture is
/// `ToolApprovalDecision::RequireApproval`.
#[async_trait::async_trait]
pub trait ApprovalGate: Send + Sync {
/// Decide whether `tool` may be invoked with `params`.
async fn request(&self, tool: &str, params: &Value) -> ApprovalDecision;
}
/// Final report returned when the assistant loop stops.
pub struct AssistantOutcome {
/// How the loop ended: `"success"`, `"error"`, `"cancelled"` or `"max_turns"`.
pub status: &'static str,
/// The final assistant text, the error message, `"cancelled"`, or a
/// stopped-after-N-turns notice, depending on `status`.
pub summary: String,
/// Number of generation turns that were started.
pub turns: u32,
/// Names of the tool calls that executed successfully, in call order.
pub tools_called: Vec<String>,
}
/// Bounds an observation to `OBSERVATION_CAP` bytes.
///
/// Strings at or under the cap are returned untouched. Longer strings are cut
/// at the last UTF-8 character boundary that does not exceed the cap, and a
/// truncation marker is appended (so the result may be slightly longer than
/// the cap itself).
fn cap(mut s: String) -> String {
    if s.len() > OBSERVATION_CAP {
        // Index 0 is always a boundary, so the search cannot come up empty.
        let cut = (0..=OBSERVATION_CAP)
            .rev()
            .find(|&idx| s.is_char_boundary(idx))
            .unwrap_or(0);
        s.truncate(cut);
        s.push_str("…[truncated]…");
    }
    s
}
/// Wraps a single model tool call in a one-action `ActionProposal`.
///
/// The proposal is assembled as JSON (`source` plus one `tool_call` action
/// carrying the call's id, tool name and arguments) and then deserialized.
///
/// # Errors
/// Returns `"malformed proposal: …"` when the JSON does not deserialize into
/// an `ActionProposal`.
fn build_proposal(source: &str, call: &ToolCall) -> Result<ActionProposal, String> {
    let action = json!({
        "id": call.id,
        "type": "tool_call",
        "tool": call.name,
        "parameters": call.arguments,
    });
    let envelope = json!({
        "source": source,
        "actions": [action],
    });
    match serde_json::from_value::<ActionProposal>(envelope) {
        Ok(proposal) => Ok(proposal),
        Err(err) => Err(format!("malformed proposal: {err}")),
    }
}
pub async fn run_assistant_loop(
generator: &dyn TurnGenerator,
runtime: &Runtime,
cfg: &AssistantConfig,
messages: &mut Vec<Message>,
emit: impl FnMut(AssistantEvent),
) -> AssistantOutcome {
let never = std::sync::atomic::AtomicBool::new(false);
run_assistant_loop_cancellable(generator, runtime, cfg, messages, &never, None, None, emit).await
}
/// Reports the outcome of one tool call: emits a `ToolResult` event and
/// appends the matching `Message::ToolResult` to the transcript.
fn record_tool_result(
    messages: &mut Vec<Message>,
    emit: &mut impl FnMut(AssistantEvent),
    name: &str,
    tool_use_id: String,
    ok: bool,
    content: String,
) {
    emit(AssistantEvent::ToolResult {
        name: name.to_string(),
        ok,
        content: content.clone(),
    });
    messages.push(Message::ToolResult {
        tool_use_id,
        content,
    });
}

/// Drives the generate → tool-call → observe loop until the model answers
/// without tools, inference fails, `cancel` is set, or `cfg.max_turns` turns
/// have been started.
///
/// # Parameters
/// - `generator`: produces one model turn per request.
/// - `runtime`: executes approved tool calls.
/// - `cfg`: model, turn limit, tool definitions and approval settings.
/// - `messages`: the running transcript. Each turn sends a clone of it; the
///   assistant message and every tool result are appended in place.
/// - `cancel`: checked at the start of every turn; when `true` the loop
///   returns with status `"cancelled"`.
/// - `approval`: gate consulted for calls that require approval. Without a
///   gate such calls are refused.
/// - `images`: attached to the first request only.
/// - `emit`: receives an `AssistantEvent` for each notable step.
///
/// # Returns
/// An `AssistantOutcome` whose `status` is `"success"`, `"error"`,
/// `"cancelled"` or `"max_turns"`. `tools_called` lists only the calls whose
/// execution succeeded.
pub async fn run_assistant_loop_cancellable(
    generator: &dyn TurnGenerator,
    runtime: &Runtime,
    cfg: &AssistantConfig,
    messages: &mut Vec<Message>,
    cancel: &std::sync::atomic::AtomicBool,
    approval: Option<&dyn ApprovalGate>,
    images: Option<&[ContentBlock]>,
    mut emit: impl FnMut(AssistantEvent),
) -> AssistantOutcome {
    use std::sync::atomic::Ordering;

    // An empty tool list is sent as "no tools" rather than as an empty array.
    let tools = if cfg.tools.is_empty() {
        None
    } else {
        Some(cfg.tools.clone())
    };
    let mut tools_called: Vec<String> = Vec::new();
    let mut last_text = String::new();
    let mut turns = 0u32;

    while turns < cfg.max_turns {
        // Cancellation is only observed between turns, so every tool call of
        // a started turn still gets its result recorded in the transcript.
        if cancel.load(Ordering::Relaxed) {
            return AssistantOutcome {
                status: "cancelled",
                summary: "cancelled".to_string(),
                turns,
                tools_called,
            };
        }
        turns += 1;

        let req = GenerateRequest {
            prompt: String::new(),
            model: cfg.model.clone(),
            params: GenerateParams {
                temperature: 0.0,
                ..Default::default()
            },
            context: None,
            context_stable_prefix: None,
            tools: tools.clone(),
            // Images ride along with the first request only.
            images: if turns == 1 {
                images.map(|imgs| imgs.to_vec())
            } else {
                None
            },
            messages: Some(messages.clone()),
            cache_control: false,
            response_format: None,
            intent: None,
        };

        let mut result = match generator.generate(req).await {
            Ok(r) => r,
            Err(e) => {
                let msg = format!("inference failed: {e}");
                emit(AssistantEvent::Error(msg.clone()));
                return AssistantOutcome {
                    status: "error",
                    summary: msg,
                    turns,
                    tools_called,
                };
            }
        };
        result.text = car_inference::tasks::generate::strip_leaked_reasoning(&result.text);

        // A turn without tool calls is the final answer.
        if result.tool_calls.is_empty() {
            last_text = result.text.clone();
            emit(AssistantEvent::Done {
                text: last_text.clone(),
            });
            return AssistantOutcome {
                status: "success",
                summary: last_text,
                turns,
                tools_called,
            };
        }

        if !result.text.trim().is_empty() {
            last_text = result.text.clone();
            emit(AssistantEvent::Text(result.text.clone()));
        }

        // Every call needs an id so its result can be paired with it; invent
        // one for calls that arrived without.
        let mut calls = result.tool_calls.clone();
        for (i, call) in calls.iter_mut().enumerate() {
            if call.id.is_none() {
                call.id = Some(format!("call_{turns}_{i}"));
            }
        }
        messages.push(Message::Assistant {
            content: result.text.clone(),
            tool_calls: calls.clone(),
        });

        for call in &calls {
            let id = call.id.clone().expect("ids assigned above");

            // Serialize the arguments once; the event, the policy and the
            // gate all look at the same value.
            let params_val = serde_json::to_value(&call.arguments).unwrap_or_default();
            emit(AssistantEvent::ToolCall {
                name: call.name.clone(),
                params: params_val.clone(),
            });

            // An explicit policy wins; otherwise fall back to the gated list.
            let posture = match &cfg.approval_policy {
                Some(policy) => policy(&call.name, &params_val),
                None if cfg.gated_tools.contains(&call.name) => {
                    ToolApprovalDecision::RequireApproval
                }
                None => ToolApprovalDecision::Allow,
            };

            let refusal: Option<String> = match posture {
                ToolApprovalDecision::Allow => None,
                ToolApprovalDecision::Deny(reason) => Some(reason),
                ToolApprovalDecision::RequireApproval => {
                    let decision = match approval {
                        Some(gate) => gate.request(&call.name, &params_val).await,
                        // Nobody to ask: fail closed.
                        None => ApprovalDecision::Denied(format!(
                            "'{}' needs approval: re-run with --full-access to allow it on this host, \
                             or use the default sandbox where edits are isolated",
                            call.name
                        )),
                    };
                    match decision {
                        ApprovalDecision::Approved => None,
                        ApprovalDecision::Denied(reason) => Some(reason),
                    }
                }
            };

            // Refusals are fed back to the model as a failed tool result so
            // it can react on the next turn.
            if let Some(reason) = refusal {
                let content = cap(json!({ "error": reason }).to_string());
                record_tool_result(messages, &mut emit, &call.name, id, false, content);
                continue;
            }

            let proposal = match build_proposal(&result.model_used, call) {
                Ok(p) => p,
                Err(e) => {
                    let content = cap(json!({ "error": e }).to_string());
                    record_tool_result(messages, &mut emit, &call.name, id, false, content);
                    continue;
                }
            };

            let exec = runtime.execute(&proposal).await;
            // The proposal holds exactly one action, so only the first result
            // is relevant.
            let action = exec.results.first();
            let ok = action
                .map(|r| matches!(r.status, ActionStatus::Succeeded))
                .unwrap_or(false);
            let content = cap(action
                .map(format_tool_result)
                .unwrap_or_else(|| format!("tool '{}' produced no result", call.name)));
            if ok {
                tools_called.push(call.name.clone());
            }
            record_tool_result(messages, &mut emit, &call.name, id, ok, content);
        }
    }

    AssistantOutcome {
        status: "max_turns",
        summary: if last_text.is_empty() {
            format!("stopped after {} turns without finishing", cfg.max_turns)
        } else {
            last_text
        },
        turns,
        tools_called,
    }
}
// Tests for the assistant loop, driven by scripted generators and a real
// `Runtime` rooted in a temporary directory.
#[cfg(test)]
mod tests {
use super::*;
use crate::assistant::executor::GeneralExecutor;
use async_trait::async_trait;
use car_engine::{LocalSubstrate, Runtime, Substrate, ToolExecutor};
use car_inference::{InferenceEngine, InferenceResult};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
// Builds one scripted model turn by deserializing a JSON object with the
// given text and tool calls; the remaining fields are fixed placeholders.
fn turn(text: &str, tool_calls: Value) -> InferenceResult {
serde_json::from_value(json!({
"text": text,
"tool_calls": tool_calls,
"trace_id": "t",
"model_used": "scripted",
"latency_ms": 0,
}))
.expect("scripted InferenceResult shape")
}
// Generator that replays `turns` in order, one per `generate` call.
struct Script {
turns: Vec<InferenceResult>,
cursor: AtomicUsize,
}
#[async_trait]
impl TurnGenerator for Script {
// Ignores the request and returns the next scripted turn, or the error
// "script exhausted" once all turns have been handed out.
async fn generate(&self, _req: GenerateRequest) -> Result<InferenceResult, String> {
let i = self.cursor.fetch_add(1, Ordering::SeqCst);
self.turns.get(i).cloned().ok_or("script exhausted".into())
}
}
// Builds a `Runtime` wired to a local substrate, a `GeneralExecutor` created
// for `dir`, and a default inference engine, then registers the agent basics
// and the builtin shell tool (marked as having side effects).
// NOTE(review): the meaning of the `true` passed to `GeneralExecutor::new`
// is not visible here — confirm against its definition.
async fn runtime_for(dir: &std::path::Path) -> Runtime {
let substrate: Arc<dyn Substrate> = Arc::new(LocalSubstrate::new());
let exec: Arc<dyn ToolExecutor> =
Arc::new(GeneralExecutor::new(substrate.clone(), dir, true));
let engine = Arc::new(InferenceEngine::new(Default::default()));
let rt = Runtime::new()
.with_inference(engine)
.with_executor(exec)
.with_substrate(substrate);
rt.register_agent_basics().await;
rt.register_tool_entry(
car_engine::ToolEntry::new(car_ir::builtins::shell()).with_side_effects(true),
)
.await;
rt
}
// Baseline config: scripted model, six turns, the executor's tool
// definitions, and no gating or approval policy.
fn cfg() -> AssistantConfig {
AssistantConfig {
model: Some("scripted".into()),
max_turns: 6,
tools: GeneralExecutor::tool_defs(),
gated_tools: Vec::new(),
approval_policy: None,
}
}
// One tool turn followed by a plain answer: the loop must report success,
// return the final text, and record and emit the successful `calculate` call.
#[tokio::test]
async fn loop_runs_a_tool_then_finishes() {
let dir = tempfile::tempdir().unwrap();
let rt = runtime_for(dir.path()).await;
let script = Script {
turns: vec![
turn(
"computing",
json!([{ "id": "c1", "name": "calculate", "arguments": { "expression": "6*7" } }]),
),
turn("The answer is 42.", json!([])),
],
cursor: AtomicUsize::new(0),
};
let mut messages = vec![
Message::System { content: "sys".into() },
Message::User { content: "what is 6*7?".into() },
];
let mut events = Vec::new();
let outcome =
run_assistant_loop(&script, &rt, &cfg(), &mut messages, |e| events.push(e)).await;
assert_eq!(outcome.status, "success");
assert_eq!(outcome.summary, "The answer is 42.");
assert!(outcome.tools_called.contains(&"calculate".to_string()));
assert!(events
.iter()
.any(|e| matches!(e, AssistantEvent::ToolResult { name, ok: true, .. } if name == "calculate")));
}
// Approval gate with a fixed answer: `true` approves every request, `false`
// denies it with "user declined".
struct FixedGate(bool);
#[async_trait]
impl ApprovalGate for FixedGate {
async fn request(&self, _tool: &str, _params: &Value) -> ApprovalDecision {
if self.0 {
ApprovalDecision::Approved
} else {
ApprovalDecision::Denied("user declined".into())
}
}
}
// Generator that records how many images the most recent request carried and
// always answers "done" with no tool calls.
struct CapturingGen {
images_seen: std::sync::Arc<std::sync::Mutex<Option<usize>>>,
}
#[async_trait]
impl TurnGenerator for CapturingGen {
async fn generate(&self, req: GenerateRequest) -> Result<InferenceResult, String> {
*self.images_seen.lock().unwrap() = req.images.as_ref().map(|v| v.len());
Ok(turn("done", json!([]))) }
}
// Images passed to the loop must be present on the first generate request.
#[tokio::test]
async fn images_are_attached_to_the_first_request() {
let dir = tempfile::tempdir().unwrap();
let rt = runtime_for(dir.path()).await;
let seen = std::sync::Arc::new(std::sync::Mutex::new(None));
let generator = CapturingGen {
images_seen: seen.clone(),
};
let img = ContentBlock::ImageUrl {
url: "https://example.com/x.png".into(),
detail: "auto".into(),
};
let mut messages = vec![
Message::System { content: "s".into() },
Message::User { content: "describe".into() },
];
let never = std::sync::atomic::AtomicBool::new(false);
let imgs = [img];
run_assistant_loop_cancellable(
&generator, &rt, &cfg(), &mut messages, &never, None, Some(&imgs), |_| {},
)
.await;
assert_eq!(*seen.lock().unwrap(), Some(1), "the image should reach the first request");
}
// A gated tool with no approval gate available must be refused: the file is
// not written and the call is not listed in `tools_called`.
#[tokio::test]
async fn gated_tool_is_denied_without_a_gate() {
let dir = tempfile::tempdir().unwrap();
let rt = runtime_for(dir.path()).await;
let script = Script {
turns: vec![
turn(
"",
json!([{ "id": "w1", "name": "write_file", "arguments": { "path": "x.txt", "content": "no" } }]),
),
turn("could not write", json!([])),
],
cursor: AtomicUsize::new(0),
};
let mut cfg = cfg();
cfg.gated_tools = vec!["write_file".into()];
let mut messages = vec![
Message::System { content: "s".into() },
Message::User { content: "write x".into() },
];
let never = std::sync::atomic::AtomicBool::new(false);
let outcome = run_assistant_loop_cancellable(
&script, &rt, &cfg, &mut messages, &never, None, None, |_| {},
)
.await;
assert_eq!(outcome.status, "success");
assert!(!dir.path().join("x.txt").exists(), "gated write must not run");
assert!(!outcome.tools_called.contains(&"write_file".to_string()));
}
// A gated tool runs once the gate approves it: the file ends up on disk with
// the requested content.
#[tokio::test]
async fn gated_tool_runs_when_approved() {
let dir = tempfile::tempdir().unwrap();
let rt = runtime_for(dir.path()).await;
let script = Script {
turns: vec![
turn(
"",
json!([{ "id": "w1", "name": "write_file", "arguments": { "path": "ok.txt", "content": "yes" } }]),
),
turn("wrote it", json!([])),
],
cursor: AtomicUsize::new(0),
};
let mut cfg = cfg();
cfg.gated_tools = vec!["write_file".into()];
let gate = FixedGate(true);
let mut messages = vec![
Message::System { content: "s".into() },
Message::User { content: "write ok".into() },
];
let never = std::sync::atomic::AtomicBool::new(false);
let outcome = run_assistant_loop_cancellable(
&script, &rt, &cfg, &mut messages, &never, Some(&gate), None, |_| {},
)
.await;
assert_eq!(outcome.status, "success");
assert_eq!(
std::fs::read_to_string(dir.path().join("ok.txt")).unwrap(),
"yes"
);
}
// With no gating configured, a `write_file` call goes straight through the
// runtime and the file appears in the temp directory.
#[tokio::test]
async fn loop_writes_a_file_through_the_runtime() {
let dir = tempfile::tempdir().unwrap();
let rt = runtime_for(dir.path()).await;
let script = Script {
turns: vec![
turn(
"",
json!([{ "id": "w1", "name": "write_file", "arguments": { "path": "hi.txt", "content": "hello" } }]),
),
turn("Wrote hi.txt.", json!([])),
],
cursor: AtomicUsize::new(0),
};
let mut messages = vec![
Message::System { content: "sys".into() },
Message::User { content: "write hi.txt".into() },
];
let outcome = run_assistant_loop(&script, &rt, &cfg(), &mut messages, |_| {}).await;
assert_eq!(outcome.status, "success");
assert_eq!(
std::fs::read_to_string(dir.path().join("hi.txt")).unwrap(),
"hello"
);
}
}