mod hook_dispatch;
mod parallel;
mod recovery;
mod turn;
/// Public cap (3) on forced continuations.
///
/// NOTE(review): the run loop in this file gates `TurnDecision::ContinueWith`
/// through `recovery.forced_continuation_available()`; presumably that check
/// compares against this constant — confirm in the `recovery` module.
pub const MAX_FORCED_CONTINUATIONS: u8 = 3;
fn updated_input_is_valid(
schema: &serde_json::Value,
input: &serde_json::Value,
) -> std::result::Result<(), String> {
let Some(obj) = input.as_object() else {
return Err("rewritten input is not a JSON object".to_string());
};
if let Some(required) = schema.get("required").and_then(|r| r.as_array()) {
for r in required {
if let Some(name) = r.as_str()
&& !obj.contains_key(name)
{
return Err(format!("missing required field `{name}`"));
}
}
}
Ok(())
}
/// Builds the error-flagged tool result reported for a tool call that was
/// refused before dispatch.
///
/// The result carries `tool_use_id`, a single uncached text block holding
/// `text`, and `is_error: true`.
fn denied_tool_result(tool_use_id: &str, text: String) -> ToolResultBlock {
    let explanation = ContentBlock::Text(TextBlock {
        text,
        cache_control: None,
    });
    ToolResultBlock {
        tool_use_id: tool_use_id.to_owned(),
        content: vec![explanation],
        is_error: true,
    }
}
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_stream::try_stream;
use async_trait::async_trait;
use caliban_provider::{
CompletionRequest, ContentBlock, Message, RequestMetadata, Role, StopReason, StreamEvent,
StreamingContentType, StreamingDelta, TextBlock, ToolResultBlock, Usage,
};
use futures::StreamExt as _;
use futures::stream::{FuturesUnordered, Stream};
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use crate::agent::Agent;
use crate::error::Result;
use crate::hooks::{
CompactCtx, CompactOutcome, HookDecision, RunCtx, RunHookOutcome, ToolCtx, TurnCtx,
TurnDecision,
};
use crate::retry::with_retry;
use crate::tool::ToolError;
use hook_dispatch::dispatch_tool;
use parallel::DispatchPlan;
use turn::{ActiveBlock, MessageAccumulator, TurnTiming};
// User-role message the run loop appends to the history after a "degenerate"
// turn: one that consumed output tokens and ended normally but contained
// neither a tool call nor non-blank text. It asks the model to act or answer.
const EMPTY_TURN_NUDGE: &str = "Your previous response made no tool call and gave no answer — \
it contained only internal reasoning. To make progress you must take a concrete action now: \
call a tool to work on the task, or, if the task is genuinely complete, state your final \
answer as plain text.";
/// Events yielded by the agent run stream (`TurnEventStream`).
///
/// Serialized as an internally tagged object: the variant name is stored in a
/// `"type"` field alongside the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum TurnEvent {
/// Emitted when the provider stream reports the start of a message.
TurnStart {
turn_index: u32,
// Message id and model name as reported by the provider's `MessageStart`.
message_id: String,
model: String,
},
/// Incremental assistant text for one content block.
AssistantTextDelta {
turn_index: u32,
// Index of the content block inside the streamed message.
content_block_index: u32,
text: String,
},
/// Incremental assistant "thinking" text for one content block.
AssistantThinkingDelta {
turn_index: u32,
content_block_index: u32,
text: String,
},
/// Emitted when the model opens a tool-use content block.
ToolCallStart {
turn_index: u32,
tool_use_id: String,
name: String,
},
/// A fragment of the tool call's JSON input as it streams in.
ToolCallInputDelta {
turn_index: u32,
tool_use_id: String,
partial_json: String,
},
/// Emitted once per tool call, either when it is denied before dispatch or
/// when its dispatch finishes.
ToolCallEnd {
turn_index: u32,
tool_use_id: String,
is_error: bool,
content: Vec<ContentBlock>,
},
/// Emitted once a turn's assistant message and tool results are finalized.
TurnEnd {
turn_index: u32,
assistant_message: Message,
// User-role messages carrying the tool results (empty when no tool ran).
tool_results: Vec<Message>,
stop_reason: StopReason,
usage: Usage,
// Timing figures taken from `TurnTiming`; presumably time-to-first-token
// and time-between-tokens — confirm in the `turn` module.
ttft: Option<Duration>,
tbt: Option<Duration>,
},
/// Final event of a run.
RunEnd {
final_messages: Vec<Message>,
total_usage: Usage,
turn_count: u32,
stopped_for: StopCondition,
// The two fields below fall back to their default values (0 / false) when
// absent from the serialized form.
#[serde(default)]
turns_without_edit: u32,
#[serde(default)]
no_edit_nudge_emitted: bool,
},
}
/// Summary of one finished turn, handed to the `after_turn` /
/// `after_turn_failure` hooks by the run loop.
#[derive(Debug, Clone)]
pub struct TurnOutcome {
pub assistant_message: Message,
pub tool_results: Vec<Message>,
pub stop_reason: StopReason,
pub usage: Usage,
// The run loop sets this to `stop_reason == StopReason::ToolUse`.
pub continue_loop: bool,
}
/// Aggregate result of a whole run. Its fields mirror those of
/// `TurnEvent::RunEnd`.
///
/// NOTE(review): nothing in this part of the file constructs it; presumably a
/// caller builds it from the `RunEnd` event — confirm at the use site.
#[derive(Debug, Clone)]
pub struct RunOutcome {
pub final_messages: Vec<Message>,
pub turn_count: u32,
pub total_usage: Usage,
pub stopped_for: StopCondition,
pub turns_without_edit: u32,
pub no_edit_nudge_emitted: bool,
}
/// Why a run ended. Reported in `TurnEvent::RunEnd::stopped_for`.
///
/// Uses serde's default enum representation: unit variants serialize as a bare
/// string, payload variants as a single-key object.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StopCondition {
// Normal completion; `surface()` renders nothing for it.
EndOfTurn,
// The turn budget (`config.max_turns`, carried as the payload) ran out.
MaxTurnsReached(u32),
// The cancellation token fired or the provider reported cancellation.
Cancelled,
// A provider error that was not recovered; payload is its display text.
ProviderError(String),
// A hook returned an error, or `after_turn` asked to stop.
HookDenied(String),
// NOTE(review): not produced in this file; presumably raised by `recovery`.
CompactionFailed(String),
MaxTokensExhausted,
Refusal(String),
ContentFilter(String),
// The provider stream reported idleness; payload is the idle duration.
StreamIdle(std::time::Duration),
}
impl StopCondition {
#[must_use]
pub fn is_failure(&self) -> bool {
matches!(
self,
Self::ProviderError(_)
| Self::HookDenied(_)
| Self::CompactionFailed(_)
| Self::MaxTokensExhausted
| Self::Refusal(_)
| Self::ContentFilter(_)
| Self::StreamIdle(_)
)
}
#[must_use]
pub fn surface(&self) -> Option<StopSurface> {
let (body, level) = match self {
Self::EndOfTurn => return None,
Self::ProviderError(msg) => (format!("provider error: {msg}"), StopLevel::Error),
Self::HookDenied(msg) => (format!("hook denied: {msg}"), StopLevel::Error),
Self::CompactionFailed(msg) => (format!("compaction failed: {msg}"), StopLevel::Error),
Self::MaxTurnsReached(n) => (format!("max-turns ({n}) reached"), StopLevel::Info),
Self::Cancelled => ("cancelled".to_string(), StopLevel::Info),
Self::MaxTokensExhausted => (
"max-tokens recovery exhausted \u{2014} try /effort low to reduce reasoning budget"
.to_string(),
StopLevel::Error,
),
Self::Refusal(msg) => (format!("model refusal: {msg}"), StopLevel::Error),
Self::ContentFilter(msg) => (format!("content filter: {msg}"), StopLevel::Error),
Self::StreamIdle(d) => (
format!("stream idle for {}s", d.as_secs()),
StopLevel::Error,
),
};
Some(StopSurface {
line: format!("[caliban: {body}]"),
level,
})
}
}
/// Severity attached to a rendered stop notice (see `StopCondition::surface`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopLevel {
Error,
Info,
}
/// A rendered stop notice: the text line plus its severity, as produced by
/// `StopCondition::surface`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StopSurface {
// Formatted as `[caliban: <body>]`.
pub line: String,
pub level: StopLevel,
}
#[cfg(test)]
mod updated_input_tests {
    use super::*;
    use serde_json::json;

    /// Object schema that requires a single `path` key.
    fn path_schema() -> serde_json::Value {
        json!({"type": "object", "required": ["path"]})
    }

    #[test]
    fn valid_rewrite_passes() {
        let rewritten = json!({"path": "/x", "extra": 1});
        let verdict = updated_input_is_valid(&path_schema(), &rewritten);
        assert!(verdict.is_ok());
    }

    #[test]
    fn non_object_rewrite_is_rejected() {
        let schema = json!({"type": "object"});
        let rewritten = json!("not an object");
        let verdict = updated_input_is_valid(&schema, &rewritten);
        assert!(verdict.is_err());
    }

    #[test]
    fn missing_required_field_is_rejected() {
        let rewritten = json!({"other": 1});
        let err = updated_input_is_valid(&path_schema(), &rewritten).unwrap_err();
        assert!(err.contains("required field `path`"), "got: {err}");
    }
}
#[cfg(test)]
mod stop_condition_tests {
    use super::*;
    use std::time::Duration;

    #[test]
    fn is_failure_classifies_correctly() {
        // Conditions that must NOT be reported as failures.
        let benign = [
            StopCondition::EndOfTurn,
            StopCondition::MaxTurnsReached(5),
            StopCondition::Cancelled,
        ];
        for stop in &benign {
            assert!(!stop.is_failure());
        }

        // Conditions that must be reported as failures.
        let failures = [
            StopCondition::ProviderError("x".into()),
            StopCondition::HookDenied("x".into()),
            StopCondition::CompactionFailed("x".into()),
            StopCondition::MaxTokensExhausted,
            StopCondition::Refusal("x".into()),
            StopCondition::ContentFilter("x".into()),
            StopCondition::StreamIdle(Duration::from_secs(90)),
        ];
        for stop in &failures {
            assert!(stop.is_failure());
        }
    }
}
#[cfg(test)]
mod serde_tests {
    use super::*;
    use caliban_provider::{Message, StopReason, Usage};
    use std::time::Duration;

    /// Serializes `ev`, deserializes it again, re-serializes the result and
    /// asserts both JSON forms are equal. Returns the first JSON form so the
    /// caller can assert on its shape.
    fn round_trips(ev: &TurnEvent) -> serde_json::Value {
        let first_pass = serde_json::to_value(ev).expect("serialize TurnEvent");
        let decoded: TurnEvent =
            serde_json::from_value(first_pass.clone()).expect("deserialize TurnEvent");
        let second_pass = serde_json::to_value(&decoded).expect("re-serialize TurnEvent");
        assert_eq!(first_pass, second_pass, "round-trip JSON mismatch");
        first_pass
    }

    #[test]
    fn text_delta_round_trips_with_type_tag() {
        let wire = round_trips(&TurnEvent::AssistantTextDelta {
            turn_index: 1,
            content_block_index: 0,
            text: "hello".into(),
        });
        assert_eq!(wire["type"], "AssistantTextDelta");
        assert_eq!(wire["text"], "hello");
        assert_eq!(wire["turn_index"], 1);
    }

    #[test]
    fn run_end_round_trips_carrying_a_stop_condition() {
        let wire = round_trips(&TurnEvent::RunEnd {
            final_messages: vec![],
            total_usage: Usage::default(),
            turn_count: 3,
            stopped_for: StopCondition::MaxTurnsReached(3),
            turns_without_edit: 0,
            no_edit_nudge_emitted: false,
        });
        assert_eq!(wire["type"], "RunEnd");
        assert_eq!(wire["turn_count"], 3);
        assert_eq!(wire["stopped_for"]["MaxTurnsReached"], 3);
    }

    #[test]
    fn turn_end_round_trips_with_message_and_duration_shape() {
        let wire = round_trips(&TurnEvent::TurnEnd {
            turn_index: 0,
            assistant_message: Message::assistant_text("done"),
            tool_results: vec![],
            stop_reason: StopReason::EndTurn,
            usage: Usage::default(),
            ttft: Some(Duration::from_millis(123)),
            tbt: None,
        });
        assert_eq!(wire["type"], "TurnEnd");
        assert_eq!(wire["stop_reason"], "end_turn");
        // `Duration` serializes as a `{secs, nanos}` object.
        assert_eq!(wire["ttft"]["secs"], 0);
        assert_eq!(wire["ttft"]["nanos"], 123_000_000);
        assert!(wire["tbt"].is_null());
    }

    #[test]
    fn unit_stop_condition_serializes_as_bare_string() {
        let wire = serde_json::to_value(StopCondition::EndOfTurn).unwrap();
        assert_eq!(wire, serde_json::json!("EndOfTurn"));
        let decoded: StopCondition = serde_json::from_value(wire).unwrap();
        assert!(matches!(decoded, StopCondition::EndOfTurn));
    }
}
/// Boxed, pinned, `Send + 'static` stream of `Result<TurnEvent>` items, as
/// returned by `Agent::stream_until_done*`.
pub type TurnEventStream = Pin<Box<dyn Stream<Item = Result<TurnEvent>> + Send + 'static>>;
/// Asynchronous source of additional input messages for a run.
///
/// The run loop hands the configured provider (if any) to
/// `recovery::on_stop_reason`; a configured provider also disables the
/// empty-turn nudge.
#[async_trait]
pub trait InputProvider: Send + Sync {
/// Waits for the next batch of messages, given the run's cancellation token.
///
/// NOTE(review): the meaning of `None` (presumably "no more input" or
/// "cancelled") is decided by the caller in `recovery` — confirm there.
async fn next_input(
&self,
cancel: &CancellationToken,
) -> Option<Vec<caliban_provider::Message>>;
}
/// Per-run settings passed to `Agent::stream_until_done_with_settings`.
#[derive(Clone)]
pub struct RunSettings {
// Forwarded to run and tool hook contexts and to tool dispatch.
pub session_id: String,
// Forwarded to the run hook context.
pub workspace_root: std::path::PathBuf,
// Forwarded to the run hook context.
pub prompt_index: u32,
// Optional source of follow-up input; handed to `recovery::on_stop_reason`.
pub input_source: Option<std::sync::Arc<dyn InputProvider>>,
}
impl std::fmt::Debug for RunSettings {
    /// Formats the settings as a debug struct. `input_source` is a trait
    /// object, so it is rendered as a `"<set>"` / `"<unset>"` marker instead
    /// of its contents.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let input_source_marker: &str = match self.input_source {
            Some(_) => "<set>",
            None => "<unset>",
        };
        let mut builder = f.debug_struct("RunSettings");
        builder.field("session_id", &self.session_id);
        builder.field("workspace_root", &self.workspace_root);
        builder.field("prompt_index", &self.prompt_index);
        builder.field("input_source", &input_source_marker);
        builder.finish()
    }
}
impl Default for RunSettings {
    /// Empty session id, workspace root `"."`, prompt index 0, no input source.
    fn default() -> Self {
        let workspace_root = std::path::PathBuf::from(".");
        Self {
            session_id: String::default(),
            workspace_root,
            prompt_index: 0,
            input_source: None,
        }
    }
}
impl Agent {
/// Runs threshold-based auto-compaction on `history` when it is enabled and
/// the estimated context utilization has reached the configured threshold.
///
/// Utilization is `estimate_tokens(history) / max_input_tokens` for the
/// currently active model. When compaction is attempted:
/// * `pre_compact` / `post_compact` hooks are invoked; their errors are logged
///   and otherwise ignored;
/// * a compactor error increments `tracking.consecutive_failures` and, once
///   `recovery::MAX_CONSECUTIVE_COMPACT_FAILURES` is reached, sets
///   `tracking.disabled`;
/// * a successful call (whether or not it rewrote the history) resets the
///   failure counter.
///
/// The token estimate and capability lookup are skipped entirely when no
/// threshold is configured or auto-compaction has been disabled, so those
/// runs do not pay a per-turn pass over the history.
async fn maybe_compact(
    &self,
    history: &mut Vec<Message>,
    tracking: &mut recovery::AutoCompactTracking,
) {
    let Some(threshold) = self.config.auto_compact_threshold else {
        return;
    };
    if tracking.disabled {
        return;
    }

    let active_model = self.active_model();
    let caps = self.provider.capabilities(active_model.as_str());
    let token_count_before = crate::compact::estimate_tokens(history);

    // The ratio is only compared against a coarse threshold, so the lossy
    // integer -> f32 conversions are acceptable here. `.max(1)` guards the
    // division against a zero-sized context window.
    #[allow(
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    let utilization = token_count_before as f32 / caps.max_input_tokens.max(1) as f32;
    let threshold_reached = utilization >= threshold;
    if !threshold_reached {
        return;
    }

    let compact_ctx = CompactCtx {
        // NOTE(review): the session id is not available in this method, so
        // compaction hooks always see an empty id — confirm this is intended.
        session_id: "",
        token_count_before,
        strategy: self.compactor.strategy_name(),
    };
    if let Err(e) = self.hooks.pre_compact(&compact_ctx).await {
        tracing::warn!(error = %e, "pre_compact hook error (non-fatal)");
    }

    let outcome = match self.compactor.compact(history, &caps).await {
        Err(e) => {
            tracing::warn!(error = %e, "autocompact failed");
            tracking.consecutive_failures = tracking.consecutive_failures.saturating_add(1);
            if tracking.consecutive_failures >= recovery::MAX_CONSECUTIVE_COMPACT_FAILURES {
                tracking.disabled = true;
                tracing::warn!(
                    "autocompact disabled after {} consecutive failures",
                    recovery::MAX_CONSECUTIVE_COMPACT_FAILURES
                );
            }
            // No post_compact hook on failure.
            return;
        }
        Ok(Some(new)) => {
            let token_count_after = crate::compact::estimate_tokens(&new);
            *history = new;
            CompactOutcome {
                token_count_after,
                compacted: true,
            }
        }
        // The compactor chose not to rewrite anything.
        Ok(None) => CompactOutcome {
            token_count_after: token_count_before,
            compacted: false,
        },
    };

    tracking.consecutive_failures = 0;
    if let Err(e) = self.hooks.post_compact(&compact_ctx, &outcome).await {
        tracing::warn!(error = %e, "post_compact hook error (non-fatal)");
    }
}
/// Assembles the provider request for the next turn.
///
/// Copies `history`, filters the tool list through the MCP wire filter,
/// splices the deferred-tools block into the copied messages, applies prompt
/// caching when enabled, and snapshots the current effort, thinking and active
/// model settings. `effective_max_tokens` becomes the request's `max_tokens`.
fn build_request(&self, history: &[Message], effective_max_tokens: u32) -> CompletionRequest {
    let mut messages = history.to_vec();

    let active_mcp = self.mcp_active.load();
    let filter = crate::wire_filter::WireFilter {
        lazy_mcp: self.config.lazy_mcp,
        active: &active_mcp,
        eager_servers: &self.mcp_eager_servers,
    };
    let filtered = self.tools.to_caliban_tools_filtered(&filter);
    let mut tools = filtered.tools;

    crate::deferred_block::splice_into_messages(
        &mut messages,
        self.config.lazy_mcp,
        filtered.dropped_mcp_count,
    );

    if self.prompt_cache {
        crate::cache::apply_prompt_cache(
            &mut messages,
            &mut tools,
            self.config.min_cache_block_tokens,
        );
    }

    // Snapshot the runtime-adjustable settings once for this request.
    let effort = self.config.effort.load_full();
    let thinking = self.config.thinking.load_full();
    let model = self.active_model();

    let metadata = RequestMetadata {
        user_id: self.config.user_id.clone(),
        purpose: Some(caliban_provider::RequestPurpose::MainLoop),
    };

    CompletionRequest {
        model: model.as_str().to_string(),
        messages,
        tools,
        tool_choice: self.config.tool_choice.clone(),
        max_tokens: effective_max_tokens,
        temperature: self.config.temperature,
        top_p: self.config.top_p,
        top_k: None,
        stop_sequences: self.config.stop_sequences.clone(),
        thinking: *thinking,
        effort: Some(*effort),
        metadata,
    }
}
/// Collects a turn's tool results, caps oversized ones, and appends the turn
/// to `history`.
///
/// * `ordered_results` holds one slot per assistant content block; empty slots
///   are skipped and the remaining results keep their order.
/// * A result counts as a successful edit when it is not an error and its
///   `tool_use_id` is in `file_mutating_tool_ids`.
/// * When `config.tool_result_cap_chars` is non-zero, results are passed
///   through `ToolResultCap`; an I/O error there is logged and the inline
///   content is kept.
///
/// `history` receives the assistant message followed by (at most) one
/// user-role message carrying all tool results.
///
/// Returns the tool-result messages that were appended and whether any
/// file-mutating tool succeeded this turn.
async fn finalize_tool_results(
    &self,
    history: &mut Vec<Message>,
    assistant_message: &Message,
    ordered_results: Vec<Option<ToolResultBlock>>,
    file_mutating_tool_ids: &std::collections::HashSet<String>,
    session_id: &str,
) -> (Vec<Message>, bool) {
    let mut had_successful_edit_this_turn = false;
    let mut tool_result_blocks: Vec<ContentBlock> = Vec::with_capacity(ordered_results.len());
    for slot in ordered_results.into_iter().flatten() {
        if !slot.is_error && file_mutating_tool_ids.contains(&slot.tool_use_id) {
            had_successful_edit_this_turn = true;
        }
        tool_result_blocks.push(ContentBlock::ToolResult(slot));
    }

    if self.config.tool_result_cap_chars > 0 && !tool_result_blocks.is_empty() {
        // Prefer the per-user cache directory. When it cannot be determined,
        // fall back to the platform temp directory rather than a hard-coded
        // `/tmp`, which does not exist on every platform and ignores `TMPDIR`.
        let overflow_dir = directories::ProjectDirs::from("dev", "caliban", "caliban")
            .map_or_else(
                || std::env::temp_dir().join("caliban-tool-overflows"),
                |d| d.cache_dir().join("tool-overflows"),
            );
        let cap = crate::post_process::ToolResultCap {
            max_chars: self.config.tool_result_cap_chars,
            overflow_dir,
            session_id: session_id.to_string(),
        };
        if let Err(e) = cap.cap(&mut tool_result_blocks).await {
            tracing::warn!(
                error = %e,
                "ToolResultCap io error (non-fatal); inline content kept",
            );
        }
    }

    let tool_results: Vec<Message> = if tool_result_blocks.is_empty() {
        Vec::new()
    } else {
        vec![Message {
            role: Role::User,
            content: tool_result_blocks,
        }]
    };

    history.push(assistant_message.clone());
    history.extend(tool_results.iter().cloned());
    (tool_results, had_successful_edit_this_turn)
}
async fn plan_tool_call(
&self,
tu: &caliban_provider::ToolUseBlock,
original_index: usize,
turn_index: u32,
session_id: &str,
) -> ToolPlan {
let tool_is_read_only = self.tools.get(&tu.name).is_some_and(|t| t.is_read_only());
let tool_mutates_files = self.tools.get(&tu.name).is_some_and(|t| t.mutates_files());
let plan_mode_active = self
.plan_mode
.as_ref()
.is_some_and(|f| f.load(std::sync::atomic::Ordering::Relaxed));
if plan_mode_active
&& !(tool_is_read_only || crate::plan_mode::is_plan_control_tool(&tu.name))
{
let result = denied_tool_result(
&tu.id,
format!(
"Tool '{}' is not available in plan mode. Use ExitPlanMode to proceed.",
tu.name
),
);
return ToolPlan::Denied {
original_index,
result,
};
}
let tool_ctx = ToolCtx {
session_id,
turn_index,
tool_use_id: &tu.id,
tool_name: &tu.name,
input: &tu.input,
is_read_only: tool_is_read_only,
};
let decision = match self.hooks.before_tool(&tool_ctx).await {
Ok(d) => d,
Err(e) => {
return ToolPlan::Fatal(StopCondition::HookDenied(format!(
"before_tool hook failed: {e}"
)));
}
};
let decision = if let HookDecision::UpdatedInput(new_input) = &decision {
match self.tools.get(&tu.name) {
Some(tool) => match updated_input_is_valid(tool.input_schema(), new_input) {
Ok(()) => decision,
Err(why) => HookDecision::Deny(format!(
"before_tool hook rewrote input to an invalid shape: {why}"
)),
},
None => decision,
}
} else {
decision
};
match decision {
HookDecision::Deny(msg) | HookDecision::AskDenied(msg) => {
let denial_err =
ToolError::execution(std::io::Error::other(format!("denied: {msg}")));
if let Err(e) = self.hooks.after_tool(&tool_ctx, &Err(denial_err)).await {
tracing::warn!(
tool = %tu.name, error = %e,
"after_tool hook error (non-fatal)"
);
}
ToolPlan::Denied {
original_index,
result: denied_tool_result(&tu.id, format!("Tool call denied: {msg}")),
}
}
HookDecision::Allow | HookDecision::UpdatedInput(_) => {
let input = if let HookDecision::UpdatedInput(new_input) = decision {
tracing::info!(
tool = %tu.name,
tool_use_id = %tu.id,
"hook.updated_input: tool input rewritten by before_tool hook"
);
new_input
} else {
tu.input.clone()
};
let conflict_key = self
.tools
.get(&tu.name)
.and_then(|t| t.parallel_conflict_key(&input));
ToolPlan::Allowed {
plan: DispatchPlan::Allowed {
original_index,
id: tu.id.clone(),
name: tu.name.clone(),
input,
conflict_key,
},
mutates_files: tool_mutates_files,
}
}
}
}
}
// Result of `Agent::plan_tool_call` for one requested tool call.
enum ToolPlan {
// The call was refused; `result` is the error result to report for it and
// `original_index` is its content-block index in the assistant message.
Denied {
original_index: usize,
result: ToolResultBlock,
},
// The call may be dispatched; `mutates_files` records whether the tool
// reports itself as file-mutating.
Allowed {
plan: DispatchPlan,
mutates_files: bool,
},
// Planning failed in a way that must end the whole run.
Fatal(StopCondition),
}
impl Agent {
pub fn stream_until_done(
self: Arc<Self>,
messages: Vec<Message>,
cancel: CancellationToken,
) -> TurnEventStream {
self.stream_until_done_with_settings(messages, cancel, RunSettings::default())
}
/// Runs the agent loop and returns it as a stream of `TurnEvent`s.
///
/// Overall flow:
/// 1. `before_run` hook; an error ends the run immediately with
///    `StopCondition::HookDenied` and no `after_run*` hook.
/// 2. Up to `config.max_turns` turns. Each turn: cancellation check,
///    `before_turn` hook, optional micro-compaction, auto-compaction, request
///    build, provider stream (with retry), event accumulation, tool planning
///    and concurrent dispatch, history update, `after_turn*` hooks, `TurnEnd`
///    event, nudges, and stop-reason recovery.
/// 3. `after_run` / `after_run_failure` hook (errors are only logged) and the
///    final `RunEnd` event.
///
/// Loop labels: `continue 'inner` re-issues the request for the same
/// `turn_index`; `break 'inner` moves on to the next turn; `break 'outer`
/// ends the run with whatever `stopped_for` currently holds.
#[allow(clippy::too_many_lines)]
#[instrument(
skip(self, messages, cancel, settings),
fields(model = %self.active_model(), session = %settings.session_id, prompt = settings.prompt_index)
)]
pub fn stream_until_done_with_settings(
self: Arc<Self>,
messages: Vec<Message>,
cancel: CancellationToken,
settings: RunSettings,
) -> TurnEventStream {
Box::pin(try_stream! {
let mut history = messages;
let mut total_usage = Usage::default();
// The most recent user message is handed to the run hooks.
let user_msg_owned: Option<Message> = history
.iter()
.rev()
.find(|m| m.role == Role::User)
.cloned();
{
let run_ctx = RunCtx {
session_id: &settings.session_id,
workspace_root: &settings.workspace_root,
user_message: user_msg_owned.as_ref(),
prompt_index: settings.prompt_index,
cancel: cancel.clone(),
};
// A failing before_run hook vetoes the whole run.
if let Err(e) = self.hooks.before_run(&run_ctx).await {
yield TurnEvent::RunEnd {
final_messages: history,
total_usage,
turn_count: 0,
stopped_for: StopCondition::HookDenied(format!("before_run: {e}")),
turns_without_edit: 0,
no_edit_nudge_emitted: false,
};
return;
}
}
// Default outcome if the turn loop runs to exhaustion without another stop.
let mut stopped_for = StopCondition::MaxTurnsReached(self.config.max_turns);
let max_turns = self.config.max_turns;
let mut turns_completed: u32 = 0;
let mut recovery = recovery::RecoveryState::default();
// No-edit tracking: current streak, longest streak, and whether the nudge
// may fire (re-armed after a successful edit).
let mut turns_since_last_edit: u32 = 0;
let mut turns_without_edit: u32 = 0;
let mut no_edit_nudge_armed = true;
let mut no_edit_nudge_emitted = false;
let mut empty_turn_nudges: u32 = 0;
'outer: for turn_index in 0..max_turns {
// Counts mid-stream re-issues of this turn's request.
let mut drain_retries: u32 = 0;
'inner: loop {
if cancel.is_cancelled() {
stopped_for = StopCondition::Cancelled;
break 'outer;
}
{
let turn_ctx = TurnCtx {
turn_index,
messages: &history,
config: &self.config,
};
if let Err(e) = self.hooks.before_turn(&turn_ctx).await {
stopped_for = StopCondition::HookDenied(format!("before_turn: {e}"));
break 'outer;
}
}
if self.config.micro_compact_enabled {
use crate::compact::Compactor as _;
// NOTE(review): this looks up capabilities for `config.model`, whereas
// `maybe_compact` and `build_request` use `self.active_model()` — confirm
// whether the active model should be used here as well.
let caps = self.provider.capabilities(&self.config.model);
// Micro-compaction errors and no-op results are silently ignored.
if let Ok(Some(new)) = crate::compact::MicroCompactor::new()
.compact(&history, &caps)
.await
{
let freed = crate::compact::estimate_tokens(&history)
.saturating_sub(crate::compact::estimate_tokens(&new));
tracing::debug!(
target: "caliban::compact",
freed_tokens = freed,
"microcompact",
);
history = new;
}
}
self.maybe_compact(&mut history, recovery.auto_tracking_mut())
.await;
let req = self.build_request(
&history,
recovery.effective_max_tokens(self.config.max_tokens),
);
let mut timing = TurnTiming::start();
let provider = Arc::clone(&self.provider);
let req_clone = req.clone();
let cancel_for_retry = cancel.clone();
// Open the provider stream, retrying according to `self.retry`.
let stream_result = with_retry(&self.retry, &cancel, move || {
let p = Arc::clone(&provider);
let r = req_clone.clone();
let _ = &cancel_for_retry; async move { p.stream(r).await }
})
.await;
let provider_stream = match stream_result {
Ok(s) => s,
Err(e) => match e {
caliban_provider::Error::Cancelled => {
stopped_for = StopCondition::Cancelled;
break 'outer;
}
caliban_provider::Error::StreamIdle(d) => {
stopped_for = StopCondition::StreamIdle(d);
break 'outer;
}
// Context overflow: let recovery compact the history and retry the turn,
// as long as reactive compaction is still available.
caliban_provider::Error::ContextTooLong { .. }
if recovery.reactive_compact_available() =>
{
match recovery.on_context_too_long(&self, &mut history).await {
recovery::RecoveryAction::RetryTurn => continue 'inner,
recovery::RecoveryAction::Surrender(stop) => {
stopped_for = stop;
break 'outer;
}
recovery::RecoveryAction::InjectAndContinue(_) => {
unreachable!(
"on_context_too_long only yields RetryTurn or Surrender"
)
}
}
}
other => {
stopped_for = StopCondition::ProviderError(other.to_string());
break 'outer;
}
},
};
// Wrap the stream in an idle watchdog when a timeout is configured.
let mut provider_stream: Pin<
Box<dyn Stream<Item = caliban_provider::Result<StreamEvent>> + Send>,
> = if self.config.stream_idle_timeout_ms > 0 {
Box::pin(caliban_provider::stream::WatchedStream::new(
provider_stream,
Duration::from_millis(self.config.stream_idle_timeout_ms.into()),
))
} else {
provider_stream
};
let mut acc = MessageAccumulator::new();
// Set once a tool call start or any delta has been seen; after that a
// mid-stream error is no longer retried.
let mut emitted_content_this_turn = false;
while let Some(evt_result) = provider_stream.next().await {
if cancel.is_cancelled() {
stopped_for = StopCondition::Cancelled;
break 'outer;
}
let evt = match evt_result {
Ok(e) => e,
Err(caliban_provider::Error::StreamIdle(d)) => {
stopped_for = StopCondition::StreamIdle(d);
break 'outer;
}
Err(e) => {
// Re-issue the turn only for retryable errors, before any content was
// emitted, and within the retry budget.
if crate::retry::is_retryable(&e)
&& !emitted_content_this_turn
&& drain_retries
< self.retry.max_attempts.saturating_sub(1)
{
drain_retries += 1;
tracing::warn!(
error = %e,
attempt = drain_retries,
"stream interrupted before content; re-issuing turn"
);
continue 'inner;
}
stopped_for = StopCondition::ProviderError(e.to_string());
break 'outer;
}
};
match evt {
StreamEvent::MessageStart { id, model } => {
acc.message_id.clone_from(&id);
acc.model.clone_from(&model);
yield TurnEvent::TurnStart {
turn_index,
message_id: id,
model,
};
}
StreamEvent::ContentBlockStart { index, content_type } => {
let i = index as usize;
acc.ensure_index(i);
match &content_type {
StreamingContentType::Text => {
acc.active[i] =
Some(ActiveBlock::Text { accumulated: String::new() });
}
StreamingContentType::Thinking => {
acc.active[i] =
Some(ActiveBlock::Thinking { accumulated: String::new() });
}
StreamingContentType::ToolUse { id, name } => {
let id = id.clone();
let name = name.clone();
acc.active[i] = Some(ActiveBlock::ToolUse {
id: id.clone(),
name: name.clone(),
json_buf: String::new(),
});
yield TurnEvent::ToolCallStart {
turn_index,
tool_use_id: id,
name,
};
emitted_content_this_turn = true;
}
// Image blocks are not accumulated.
StreamingContentType::Image => {
}
}
}
StreamEvent::Delta { index, delta } => {
timing.observe_delta();
emitted_content_this_turn = true;
let i = index as usize;
// Ignore deltas for a block index that was never started.
if i >= acc.active.len() {
continue;
}
match (&mut acc.active[i], delta) {
(
Some(ActiveBlock::Text { accumulated }),
StreamingDelta::Text(s),
) => {
accumulated.push_str(&s);
yield TurnEvent::AssistantTextDelta {
turn_index,
content_block_index: index,
text: s,
};
}
(
Some(ActiveBlock::Thinking { accumulated }),
StreamingDelta::Thinking(s),
) => {
accumulated.push_str(&s);
yield TurnEvent::AssistantThinkingDelta {
turn_index,
content_block_index: index,
text: s,
};
}
(
Some(ActiveBlock::ToolUse { id, json_buf, .. }),
StreamingDelta::ToolUseInputJson(s),
) => {
json_buf.push_str(&s);
let id_clone = id.clone();
yield TurnEvent::ToolCallInputDelta {
turn_index,
tool_use_id: id_clone,
partial_json: s,
};
}
// A delta whose kind does not match the active block is dropped.
_ => {
}
}
}
StreamEvent::ContentBlockStop { index } => {
acc.finalize_block(index as usize);
}
StreamEvent::MessageDelta {
stop_reason,
usage_delta,
} => {
if let Some(sr) = stop_reason {
acc.stop_reason = Some(sr);
}
if let Some(u) = usage_delta {
acc.usage.merge(u);
}
}
StreamEvent::MessageStop | StreamEvent::Ping => {}
}
}
// A stream that ended without reporting a stop reason is treated as EndTurn.
let turn_stop_reason = acc.stop_reason.unwrap_or(StopReason::EndTurn);
let turn_usage = acc.usage;
let mut assistant_message = acc.into_message();
// Max-tokens recovery may ask to retry the turn before any tool runs; the
// discarded attempt's usage is still counted.
if let Some(action) =
recovery.on_max_tokens_pre_dispatch(&self.config, turn_stop_reason)
{
match action {
recovery::RecoveryAction::RetryTurn => {
total_usage.merge(turn_usage);
continue 'inner;
}
recovery::RecoveryAction::InjectAndContinue(_)
| recovery::RecoveryAction::Surrender(_) => {
unreachable!("on_max_tokens_pre_dispatch only yields RetryTurn")
}
}
}
// Run the text post-processor over every text block, replacing only the
// blocks it actually changed.
for block in &mut assistant_message.content {
if let ContentBlock::Text(t) = block {
let processed = self.post_processor.process(&t.text);
if let std::borrow::Cow::Owned(new_text) = processed {
t.text = new_text;
}
}
}
// Planning phase: decide per tool call whether it is denied or dispatched.
let mut plans: Vec<DispatchPlan> = Vec::new();
let mut file_mutating_tool_ids: std::collections::HashSet<String> =
std::collections::HashSet::new();
for (idx, block) in assistant_message.content.iter().enumerate() {
if cancel.is_cancelled() {
stopped_for = StopCondition::Cancelled;
break 'outer;
}
let ContentBlock::ToolUse(tu) = block else { continue };
match self
.plan_tool_call(tu, idx, turn_index, &settings.session_id)
.await
{
ToolPlan::Denied {
original_index,
result,
} => {
yield TurnEvent::ToolCallEnd {
turn_index,
tool_use_id: result.tool_use_id.clone(),
is_error: true,
content: result.content.clone(),
};
plans.push(DispatchPlan::Denied {
original_index,
result,
});
}
ToolPlan::Allowed {
plan,
mutates_files,
} => {
if mutates_files {
file_mutating_tool_ids.insert(tu.id.clone());
}
plans.push(plan);
}
ToolPlan::Fatal(stop) => {
stopped_for = stop;
break 'outer;
}
}
}
// Dispatch phase: a semaphore bounds concurrency (a single permit when
// parallel tools are off).
let permits = if self.parallel_tools {
self.parallel_tool_limit.get()
} else {
1
};
let sem = Arc::new(Semaphore::new(permits));
let dispatch_started_at = Instant::now();
let agent_ref = &self;
// One slot per assistant content block, filled by content-block index.
let mut ordered_results: Vec<Option<ToolResultBlock>> =
vec![None; assistant_message.content.len()];
let mut denied_count: usize = 0;
let mut dispatched_count: usize = 0;
let conflict_locks = parallel::build_conflict_locks(&plans);
let mut pending = FuturesUnordered::new();
for plan in plans {
match plan {
DispatchPlan::Denied { original_index, result } => {
denied_count += 1;
ordered_results[original_index] = Some(result);
}
DispatchPlan::Allowed {
original_index,
id,
name,
input,
conflict_key,
} => {
// NOTE(review): this `break` only leaves the `for plan` loop; already
// queued tools are still awaited below and the turn is then finalized
// with the remaining slots empty — confirm this is the intended
// cancellation behavior.
if cancel.is_cancelled() {
stopped_for = StopCondition::Cancelled;
break;
}
dispatched_count += 1;
let sem_for_tool = Arc::clone(&sem);
let cancel_for_tool = cancel.clone();
let session_for_tool = settings.session_id.clone();
let lock_for_tool = conflict_key
.as_ref()
.and_then(|k| conflict_locks.get(k).map(Arc::clone));
pending.push(async move {
// Take the per-conflict-key lock (if any) before a semaphore permit.
let _key_guard = match lock_for_tool {
Some(m) => Some(m.lock_owned().await),
None => None,
};
let _permit = sem_for_tool
.acquire_owned()
.await
.expect("semaphore not closed");
let res = dispatch_tool(
agent_ref,
&session_for_tool,
turn_index,
&id,
&name,
input,
&cancel_for_tool,
)
.await;
(original_index, id, res)
});
}
}
}
// Drain all dispatched tools; a fatal stop is remembered but the remaining
// tools are still awaited.
let mut fatal_stop: Option<StopCondition> = None;
while let Some((idx, id, dispatch_res)) = pending.next().await {
match dispatch_res {
Err(stop) => {
fatal_stop = Some(stop);
}
Ok(tool_result) => {
let is_error = tool_result.is_error;
let content = tool_result.content.clone();
yield TurnEvent::ToolCallEnd {
turn_index,
tool_use_id: id,
is_error,
content,
};
ordered_results[idx] = Some(tool_result);
}
}
}
let dispatch_elapsed = dispatch_started_at.elapsed();
tracing::info!(
target: caliban_common::tracing_targets::TARGET_TOOLS,
turn = turn_index,
parallel_tools = self.parallel_tools,
parallel_tool_limit = self.parallel_tool_limit.get(),
dispatched = dispatched_count,
denied = denied_count,
total_wall_ms = u64::try_from(dispatch_elapsed.as_millis())
.unwrap_or(u64::MAX),
"parallel tool dispatch",
);
if let Some(stop) = fatal_stop {
stopped_for = stop;
break 'outer;
}
// Append the assistant message and tool results to the history.
let (tool_results, had_successful_edit_this_turn) = self
.finalize_tool_results(
&mut history,
&assistant_message,
ordered_results,
&file_mutating_tool_ids,
&settings.session_id,
)
.await;
// A "degenerate" turn used output tokens and stopped normally, yet holds
// neither a tool call nor non-blank text.
let produced_actionable_content = assistant_message.content.iter().any(|b| {
matches!(b, ContentBlock::ToolUse(_))
|| matches!(b, ContentBlock::Text(t) if !t.text.trim().is_empty())
});
let turn_was_degenerate = !produced_actionable_content
&& turn_usage.output_tokens > 0
&& matches!(
turn_stop_reason,
StopReason::EndTurn | StopReason::StopSequence
);
#[allow(unused_assignments)]
let mut after_turn_decision_for_loop: TurnDecision = TurnDecision::Continue;
{
let turn_outcome = TurnOutcome {
assistant_message: assistant_message.clone(),
tool_results: tool_results.clone(),
stop_reason: turn_stop_reason,
usage: turn_usage,
continue_loop: turn_stop_reason == StopReason::ToolUse,
};
let turn_ctx = TurnCtx {
turn_index,
messages: &history,
config: &self.config,
};
let turn_is_failure =
recovery.turn_is_failure(&self.config, turn_stop_reason);
// Failed turns notify `after_turn_failure` (errors only logged); other
// turns go through `after_turn`, whose error ends the run.
let after_turn_decision: TurnDecision = if turn_is_failure {
match self.hooks.after_turn_failure(&turn_ctx, &turn_outcome).await {
Ok(()) => TurnDecision::Continue,
Err(e) => {
tracing::warn!(error = %e, "after_turn_failure hook error (non-fatal)");
TurnDecision::Continue
}
}
} else {
match self.hooks.after_turn(&turn_ctx, &turn_outcome).await {
Ok(d) => d,
Err(e) => {
stopped_for =
StopCondition::HookDenied(format!("after_turn: {e}"));
yield TurnEvent::TurnEnd {
turn_index,
assistant_message,
tool_results,
stop_reason: turn_stop_reason,
usage: turn_usage,
ttft: timing.ttft(),
tbt: timing.tbt(),
};
total_usage.merge(turn_usage);
turns_completed += 1;
break 'outer;
}
}
};
after_turn_decision_for_loop = after_turn_decision;
}
let ttft = timing.ttft();
let tbt = timing.tbt();
yield TurnEvent::TurnEnd {
turn_index,
assistant_message,
tool_results,
stop_reason: turn_stop_reason,
usage: turn_usage,
ttft,
tbt,
};
if let Some(t) = ttft {
tracing::info!(
target: caliban_common::tracing_targets::TARGET_TIMING,
ttft_ms = u64::try_from(t.as_millis()).unwrap_or(u64::MAX),
tbt_ms = tbt.map(|d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX)),
delta_count = timing.delta_count,
turn = turn_index,
"turn timing",
);
}
let cache_read = turn_usage.cache_read_input_tokens.unwrap_or(0);
let cache_creation = turn_usage.cache_creation_input_tokens.unwrap_or(0);
if cache_read > 0 || cache_creation > 0 {
tracing::info!(
target: caliban_common::tracing_targets::TARGET_CACHE,
cache_read,
cache_creation,
turn = turn_index,
"prompt cache stats",
);
}
total_usage.merge(turn_usage);
turns_completed += 1;
if had_successful_edit_this_turn {
turns_since_last_edit = 0;
no_edit_nudge_armed = true;
} else {
turns_since_last_edit += 1;
turns_without_edit = turns_without_edit.max(turns_since_last_edit);
let threshold = self.config.no_edit_nudge_threshold;
// NOTE(review): when the nudge fires, `break 'inner` skips both the
// `after_turn` decision handling and `on_stop_reason` below, so a hook's
// `Stop` on the same turn is not honored — confirm this is intended.
if threshold > 0
&& turns_since_last_edit >= threshold
&& no_edit_nudge_armed
{
tracing::info!(turns_since_last_edit, "no-edit nudge injected");
history.push(Message::user_text(format!(
"You have taken {turns_since_last_edit} turns without editing \
any files. If you have already identified the change you need \
to make, make the edit now rather than continuing to \
investigate. If you are still investigating, you can \
disregard this note."
)));
no_edit_nudge_emitted = true;
no_edit_nudge_armed = false;
recovery.reset_for_new_turn();
break 'inner; }
}
// Apply the `after_turn` hook's decision.
match after_turn_decision_for_loop {
TurnDecision::Continue => {
}
TurnDecision::ContinueWith(msgs) => {
if recovery.forced_continuation_available() {
history.extend(msgs);
recovery.record_forced_continuation();
recovery.reset_for_new_turn();
break 'inner; }
tracing::warn!(
forced_continuations = recovery.forced_continuations(),
"after_turn ContinueWith ignored (cap reached)"
);
}
TurnDecision::Stop => {
stopped_for = StopCondition::HookDenied("after_turn: Stop".into());
break 'outer;
}
}
// Empty-turn nudge: only when no input source is configured and the
// nudge budget is non-zero. Any non-degenerate turn resets the counter.
if turn_was_degenerate
&& settings.input_source.is_none()
&& self.config.empty_turn_nudge_max > 0
{
if empty_turn_nudges < self.config.empty_turn_nudge_max {
empty_turn_nudges += 1;
tracing::warn!(
target: "caliban::recovery",
turn = turn_index,
output_tokens = turn_usage.output_tokens,
empty_turn_nudge = empty_turn_nudges,
"empty-turn nudge injected (no tool call / actionable text)"
);
history.push(Message::user_text(EMPTY_TURN_NUDGE));
recovery.reset_for_new_turn();
break 'inner; }
tracing::warn!(
target: "caliban::recovery",
turn = turn_index,
empty_turn_nudges,
"empty-turn nudge budget exhausted; ending run"
);
} else if !turn_was_degenerate {
empty_turn_nudges = 0;
}
// Let recovery translate the stop reason into "continue with these
// messages" or "end the run".
match recovery
.on_stop_reason(
turn_stop_reason,
&self.config,
&mut history,
settings.input_source.as_ref(),
&cancel,
)
.await
{
recovery::RecoveryAction::InjectAndContinue(msgs) => {
history.extend(msgs);
break 'inner;
}
recovery::RecoveryAction::Surrender(stop) => {
stopped_for = stop;
break 'outer;
}
recovery::RecoveryAction::RetryTurn => {
unreachable!("on_stop_reason never yields RetryTurn")
}
}
} }
// Run-level epilogue: notify the matching after_run hook, then emit RunEnd.
{
let is_failure = stopped_for.is_failure();
let outcome = RunHookOutcome {
turn_count: turns_completed,
input_tokens: total_usage.input_tokens,
output_tokens: total_usage.output_tokens,
success: !is_failure,
};
let run_ctx = RunCtx {
session_id: &settings.session_id,
workspace_root: &settings.workspace_root,
user_message: user_msg_owned.as_ref(),
prompt_index: settings.prompt_index,
cancel: cancel.clone(),
};
let dispatch = if is_failure {
self.hooks.after_run_failure(&run_ctx, &outcome).await
} else {
self.hooks.after_run(&run_ctx, &outcome).await
};
if let Err(e) = dispatch {
tracing::warn!(error = %e, "after_run* hook error (non-fatal)");
}
}
yield TurnEvent::RunEnd {
final_messages: history,
total_usage,
turn_count: turns_completed,
stopped_for,
turns_without_edit,
no_edit_nudge_emitted,
};
})
}
}