use crate::config::Config;
use crate::session::chat::session::ChatSession;
use crate::session::ChatCompletionWithValidationParams;
use crate::{log_debug, log_info};
use anyhow::Result;
use colored::Colorize;
/// Processes the results of executed MCP tool calls: appends their output to
/// the session, runs plan-scoped and adaptive compression passes, enforces
/// spending thresholds, and finally issues a follow-up API call so the model
/// can react to the tool output.
///
/// Returns:
/// - `Ok(None)` when the user cancelled or a spending threshold stopped the
///   follow-up request.
/// - `Ok(Some((content, exchange, tool_calls, response_id, thinking)))`
///   otherwise; `tool_calls` is `None` when the conversation should end.
pub async fn process_tool_results(
    tool_results: Vec<crate::mcp::McpToolResult>,
    total_tool_time_ms: u64,
    chat_session: &mut ChatSession,
    config: &Config,
    role: &str,
    operation_cancelled: tokio::sync::watch::Receiver<bool>,
) -> Result<
    Option<(
        String,
        crate::session::ProviderExchange,
        Option<Vec<crate::mcp::McpToolCall>>,
        Option<String>,                          // response_id from follow-up API call
        Option<crate::providers::ThinkingBlock>, // thinking from follow-up API call
    )>,
> {
    // Account the wall-clock time spent executing the tools themselves.
    chat_session.session.info.total_tool_time_ms += total_tool_time_ms;
    // Bail out early if the user already cancelled.
    if *operation_cancelled.borrow() {
        crate::log_debug!("Operation cancelled by user.");
        return Ok(None);
    }
    use crate::session::chat::get_animation_manager;
    let animation_manager = get_animation_manager();
    // Restart the progress animation unless something explicitly suspended it.
    if !animation_manager.is_suspended() {
        animation_manager
            .start_animation(
                &crate::config::with_thread_config(|c| c.output_mode())
                    .unwrap_or(crate::session::output::OutputMode::NonInteractive),
            )
            .await;
    } else {
        log_debug!("Animation suspended during tool result processing - not restarting");
    }
    let processing_start = std::time::Instant::now();
    let cache_manager = crate::session::cache::CacheManager::new();
    let supports_caching = crate::session::model_supports_caching(&chat_session.model);
    let mut cache_check_time = 0u128;
    // Running total of tool-output bytes added since the last interim
    // compression pass inside the loop below.
    let mut accumulated_content_size = 0usize;
    for tool_result in &tool_results {
        let tool_content = extract_tool_content(tool_result);
        // Truncate oversized tool output to the configured token budget.
        let (tool_content, was_truncated) = crate::utils::truncation::truncate_mcp_response_global(
            &tool_content,
            config.mcp_response_tokens_threshold,
        );
        if was_truncated {
            let is_terminal = crate::config::with_thread_config(|c| c.output_mode())
                .unwrap_or(crate::session::output::OutputMode::NonInteractive)
                .is_terminal_mode();
            // Only warn on screen in terminal output modes.
            if is_terminal {
                use colored::Colorize;
                eprintln!(
                    "{}",
                    format!(
                        "⚠️ Tool '{}' response truncated to {} tokens (mcp_response_tokens_threshold)",
                        tool_result.tool_name, config.mcp_response_tokens_threshold
                    )
                    .bright_yellow()
                );
            }
        }
        let content_size = tool_content.len();
        accumulated_content_size += content_size;
        // 50000 bytes is the heuristic trigger for an interim compression pass
        // while still inside the tool loop.
        let accumulated_is_large = accumulated_content_size > 50000;
        chat_session.add_tool_message(
            &tool_content,
            &tool_result.tool_id,
            &tool_result.tool_name,
            config,
        )?;
        if accumulated_is_large {
            // Best-effort: a compression failure must not abort tool processing.
            if let Err(e) =
                crate::session::chat::conversation_compression::check_and_compress_conversation(
                    chat_session,
                    config,
                    operation_cancelled.clone(),
                    false,
                )
                .await
            {
                log_debug!(
                    "Conversation compression failed during large tool output: {}. Continuing.",
                    e
                );
            }
            accumulated_content_size = 0;
        }
    }
    // Detect whether the "plan" tool ran in this batch; it drives the
    // plan-scoped compression bookkeeping below.
    let mut plan_tool_executed = false;
    for tool_result in &tool_results {
        if tool_result.tool_name == "plan" {
            plan_tool_executed = true;
            break;
        }
    }
    if plan_tool_executed {
        if crate::mcp::core::plan::core::get_current_task_start_index().is_none()
            && crate::mcp::core::plan::core::has_active_plan()
        {
            let message_count = chat_session.get_message_count();
            if message_count == 0 {
                crate::log_debug!("Cannot set start_index: no messages in session");
            } else {
                // Mark the last message as the start of the current task's work.
                let start_index = message_count - 1;
                crate::mcp::core::plan::set_current_task_start_index(start_index);
                crate::log_debug!(
                    "Plan task start_index set to: {} (last message index, total messages: {})",
                    start_index,
                    message_count
                );
            }
        }
        if crate::mcp::core::plan::has_pending_compression() {
            // get_and_clear_start_index consumes the stored start index.
            if let Some(start_index) = crate::mcp::core::plan::core::get_and_clear_start_index() {
                let end_index = chat_session.get_message_count() - 1;
                crate::log_debug!(
                    "Setting compression range: start={}, end={} (total messages: {})",
                    start_index,
                    end_index,
                    chat_session.get_message_count()
                );
                if let Err(e) =
                    crate::mcp::core::plan::set_pending_compression_range(start_index, end_index)
                {
                    log_info!(
                        "Compression range could not be set: {}. Compression will be skipped.",
                        e
                    );
                }
            } else {
                log_info!(
                    "Plan compression pending but no start_index found. \
                     This may indicate plan(next) was called before any task work was done."
                );
            }
        }
    }
    if crate::mcp::core::plan::has_pending_project_compression() {
        log_debug!(
            "Deferring plan compression to after assistant response (project compression pending)"
        );
    } else {
        // Task-level plan compression (result currently unused beyond logging,
        // hence the leading underscore).
        let _task_compression_occurred =
            match crate::mcp::core::plan::process_pending_compression(chat_session).await {
                Ok(Some(metrics)) => {
                    chat_session.session.info.compression_stats.add_compression(
                        crate::session::CompressionKind::Task,
                        metrics.messages_removed,
                        metrics.tokens_saved,
                    );
                    crate::session::chat::cost_tracker::CostTracker::display_compression_result(
                        "Task", &metrics,
                    );
                    true
                }
                Ok(None) => false,
                Err(e) => {
                    log_info!(
                        "Task compression skipped: {}. Context was not compressed.",
                        e
                    );
                    false
                }
            };
        // Phase-level plan compression, same pattern as the task pass above.
        let _phase_compression_occurred =
            match crate::mcp::core::plan::process_pending_phase_compression(chat_session).await {
                Ok(Some(metrics)) => {
                    chat_session.session.info.compression_stats.add_compression(
                        crate::session::CompressionKind::Phase,
                        metrics.messages_removed,
                        metrics.tokens_saved,
                    );
                    crate::session::chat::cost_tracker::CostTracker::display_compression_result(
                        "Phase", &metrics,
                    );
                    true
                }
                Ok(None) => false,
                Err(e) => {
                    log_info!(
                        "Phase compression skipped: {}. Context was not compressed.",
                        e
                    );
                    false
                }
            };
    }
    // Adaptive compression pass over the whole conversation (best-effort).
    if let Err(e) = crate::session::chat::conversation_compression::check_and_compress_conversation(
        chat_session,
        config,
        operation_cancelled.clone(),
        false,
    )
    .await
    {
        log_debug!(
            "Adaptive conversation compression failed during tool processing: {}. Continuing session.",
            e
        );
    }
    // A skill may have requested a "forget" compression for this session;
    // take_skill_compress_request consumes that flag.
    let session_id = &chat_session.session.info.name;
    if crate::session::context::take_skill_compress_request(session_id) {
        if let Err(e) =
            crate::session::chat::conversation_compression::check_and_compress_conversation(
                chat_session,
                config,
                operation_cancelled.clone(),
                true,
            )
            .await
        {
            log_debug!("Skill forget compression failed: {}. Continuing.", e);
        }
    }
    // Apply an automatic cache checkpoint before the follow-up request if the
    // auto-cache threshold was crossed while adding tool results.
    let cache_start = std::time::Instant::now();
    if let Ok(true) = cache_manager.check_and_apply_auto_cache_threshold(
        &mut chat_session.session,
        config,
        supports_caching,
        role,
    ) {
        log_debug!("Auto-cache threshold reached after processing all tool results - cache checkpoint applied before follow-up API request.");
    }
    cache_check_time += cache_start.elapsed().as_millis();
    let total_processing_time = processing_start.elapsed().as_millis() as u64;
    chat_session.session.info.total_layer_time_ms += total_processing_time;
    // Only log when processing was noticeably slow (> 100ms).
    if total_processing_time > 100 {
        log_debug!(
            "🔍 Tool result processing took {}ms (cache: {}ms)",
            total_processing_time,
            cache_check_time
        );
    }
    // Session-wide spending threshold: abort the follow-up if exceeded.
    match chat_session.check_spending_threshold(config) {
        Ok(should_continue) => {
            if !should_continue {
                animation_manager.stop_current().await;
                println!(
                    "{}",
                    "✗ Tool follow-up cancelled due to spending threshold.".bright_red()
                );
                return Ok(None);
            }
        }
        Err(e) => {
            use colored::*;
            println!(
                "{}: {}",
                "Warning: Error checking spending threshold".bright_yellow(),
                e
            );
        }
    }
    // Per-request spending threshold, same handling as above.
    match chat_session.check_request_spending_threshold(config) {
        Ok(should_continue) => {
            if !should_continue {
                animation_manager.stop_current().await;
                println!(
                    "{}",
                    "✗ Tool follow-up cancelled due to request spending threshold.".bright_red()
                );
                return Ok(None);
            }
        }
        Err(e) => {
            use colored::*;
            println!(
                "{}: {}",
                "Warning: Error checking request spending threshold".bright_yellow(),
                e
            );
        }
    }
    // Final cancellation check before spending money on the follow-up call.
    if *operation_cancelled.borrow() {
        animation_manager.stop_current().await;
        crate::log_debug!("Operation cancelled by user.");
        return Ok(None);
    }
    // Tool-usage hints accumulated during execution are surfaced to the model
    // as a synthetic user message (drain_hints empties the accumulator).
    let hints = crate::mcp::hint_accumulator::drain_hints();
    if !hints.is_empty() {
        let bullet_list = hints
            .iter()
            .map(|h| format!("• {h}"))
            .collect::<Vec<_>>()
            .join("\n");
        let hint_message = format!(
            "⚠️ Tool usage notice:\n{bullet_list}\n\nPlease prefer the recommended tools going forward."
        );
        chat_session.session.messages.push(crate::session::Message {
            role: "user".to_string(),
            content: hint_message,
            ..Default::default()
        });
    }
    // Follow-up request so the model can respond to the tool output.
    let follow_up_result =
        make_follow_up_api_call(chat_session, config, operation_cancelled.clone()).await;
    use crate::session::chat::cost_tracker::CostTracker;
    CostTracker::display_intermediate_cost_breakdown(chat_session);
    match follow_up_result {
        Ok(response) => {
            // The provider may report tool calls structurally, or embed them
            // in the content where they must be parsed out.
            let has_more_tools = if let Some(ref calls) = response.tool_calls {
                !calls.is_empty()
            } else {
                !crate::mcp::parse_tool_calls(&response.content).is_empty()
            };
            if let Some(ref reason) = response.finish_reason {
                log_debug!("Follow-up finish_reason: {}", reason);
            }
            let should_continue_conversation =
                check_should_continue(&response, config, has_more_tools);
            handle_follow_up_cost_tracking(chat_session, &response.exchange, config);
            // Refresh the cost/context figures shown by the animation.
            let current_cost = chat_session.session.info.total_cost;
            let current_context_tokens = chat_session.get_full_context_tokens(config).await as u64;
            animation_manager.get_state().update_cost(current_cost);
            animation_manager
                .get_state()
                .update_context_tokens(current_context_tokens);
            display_rate_limit_info(&response.exchange);
            if should_continue_conversation {
                Ok(Some((
                    response.content,
                    response.exchange,
                    response.tool_calls,
                    response.response_id,
                    response.thinking,
                )))
            } else {
                // Conversation is done: stop the animation and drop tool calls
                // so the caller does not loop again.
                animation_manager.stop_current().await;
                Ok(Some((
                    response.content,
                    response.exchange,
                    None,
                    response.response_id,
                    response.thinking,
                )))
            }
        }
        Err(e) => {
            // Resolve a human-readable provider name for the error message.
            let provider_name = if let Ok((provider, _)) =
                crate::providers::ProviderFactory::parse_model(&chat_session.model)
            {
                provider
            } else {
                "unknown provider".to_string()
            };
            let error_message =
                crate::session::chat::session::format_provider_error(&provider_name, &e);
            println!(
                "\n{} {}: {}",
                "✗".bright_red(),
                format!("Error calling {}", provider_name).bright_red(),
                error_message
            );
            log_debug!("Model: {}", chat_session.model);
            log_debug!("Temperature: {}", chat_session.temperature);
            animation_manager.stop_current().await;
            Err(e)
        }
    }
}
/// Pulls the textual payload out of a single tool result.
///
/// Thin delegation helper kept as a named function so the extraction point
/// is easy to find and change in one place.
fn extract_tool_content(tool_result: &crate::mcp::McpToolResult) -> String {
    let content = tool_result.extract_content();
    content
}
/// Issues the follow-up chat-completion request after tool results have been
/// appended to the session.
///
/// Forwards the session's sampling settings (temperature, top_p, top_k,
/// max_tokens) and retry budget, and honors the supplied cancellation token.
async fn make_follow_up_api_call(
    chat_session: &ChatSession,
    config: &Config,
    cancellation_token: tokio::sync::watch::Receiver<bool>,
) -> Result<crate::providers::ProviderResponse> {
    let model_name = chat_session.model.clone();
    let sampling_temperature = chat_session.temperature;

    // Assemble the validated request parameters from the current session state.
    let base_params = ChatCompletionWithValidationParams::new(
        &chat_session.session.messages,
        &model_name,
        sampling_temperature,
        chat_session.top_p,
        chat_session.top_k,
        chat_session.max_tokens,
        config,
    );
    let request_params = base_params
        .with_max_retries(chat_session.max_retries)
        .with_cancellation_token(cancellation_token);

    crate::session::chat_completion_with_validation(request_params).await
}
/// Decides whether the conversation loop should continue based on the
/// provider's `finish_reason`.
///
/// - `tool_calls` / `tool_use` → continue (the model wants more tool runs).
/// - `stop` / `length` / `end_turn` → end the conversation.
/// - any other reason → continue, logged as unknown.
/// - no reason at all → fall back to whether more tool calls were detected.
pub fn check_should_continue(
    response: &crate::providers::ProviderResponse,
    _config: &Config,
    has_more_tools: bool,
) -> bool {
    match response.finish_reason.as_deref() {
        Some("tool_calls") | Some("tool_use") => {
            log_debug!("finish_reason is 'tool_calls', continuing conversation");
            true
        }
        // Bind the matched reason directly instead of re-reading
        // `finish_reason` and calling `unwrap()` — same behavior, no
        // latent panic path.
        Some(reason @ ("stop" | "length" | "end_turn")) => {
            log_debug!("finish_reason is '{}', ending conversation", reason);
            false
        }
        Some(other) => {
            log_info!("Unknown finish_reason '{}', continuing conversation", other);
            true
        }
        None => {
            log_debug!("Debug: No finish_reason, checking for tool calls");
            has_more_tools
        }
    }
}
/// Folds the follow-up API call's usage and cost data into the session totals.
///
/// Prefers the structured `usage` on the exchange; when usage exists but
/// carries no cost, falls back to a `usage.cost` field in the raw provider
/// response JSON. Prints an error when no usage data is available at all.
fn handle_follow_up_cost_tracking(
    chat_session: &mut ChatSession,
    exchange: &crate::session::ProviderExchange,
    _config: &Config,
) {
    if let Some(usage) = &exchange.usage {
        chat_session.session.info.total_api_calls += 1;
        // Record token counts (input/output/cache/reasoning) on the session.
        let cache_manager = crate::session::cache::CacheManager::new();
        cache_manager.update_token_tracking(
            &mut chat_session.session,
            usage.input_tokens,
            usage.output_tokens,
            usage.cache_read_tokens,
            usage.cache_write_tokens,
            usage.reasoning_tokens,
        );
        if let Some(api_time_ms) = usage.request_time_ms {
            chat_session.session.info.total_api_time_ms += api_time_ms;
        }
        if let Some(cost) = usage.cost {
            // Provider reported cost directly: add it to the running total
            // and mirror it onto the session's cost estimate.
            chat_session.session.info.total_cost += cost;
            chat_session.estimated_cost = chat_session.session.info.total_cost;
            log_debug!(
                "Adding ${:.5} to total cost (total now: ${:.5})",
                cost,
                chat_session.session.info.total_cost
            );
            log_debug!("Tool response usage detail:");
            if let Ok(usage_str) = serde_json::to_string_pretty(usage) {
                log_debug!("{}", usage_str);
            }
            // Dump the raw usage object plus any cache-related cost fields
            // to help debug provider billing discrepancies.
            if let Some(raw_usage) = exchange.response.get("usage") {
                log_debug!("Raw tool response usage object:");
                if let Ok(raw_str) = serde_json::to_string_pretty(raw_usage) {
                    log_debug!("{}", raw_str);
                }
                if let Some(cache_cost) = raw_usage.get("cache_cost") {
                    log_debug!("Found cache_cost field: {}", cache_cost);
                }
                if let Some(cached_cost) = raw_usage.get("cached_cost") {
                    log_debug!("Found cached_cost field: {}", cached_cost);
                }
                if let Some(any_cache) = raw_usage.get("cached") {
                    log_debug!("Found cached field: {}", any_cache);
                }
            }
        } else {
            // Structured usage had no cost: try to read it from the raw
            // provider response JSON instead.
            let cost_from_raw = exchange
                .response
                .get("usage")
                .and_then(|u| u.get("cost"))
                .and_then(|c| c.as_f64());
            if let Some(cost) = cost_from_raw {
                chat_session.session.info.total_cost += cost;
                chat_session.estimated_cost = chat_session.session.info.total_cost;
                log_debug!(
                    "Using cost ${:.5} from raw response (total now: ${:.5})",
                    cost,
                    chat_session.session.info.total_cost
                );
                let provider_name = &exchange.provider;
                log_debug!(
                    "{} did not provide cost data for tool response API call",
                    provider_name
                );
                // OpenRouter only returns cost data when the request carried
                // usage.include=true; surface whether the flag was set so the
                // misconfiguration is visible in debug logs.
                if provider_name == "openrouter" {
                    let has_usage_flag = exchange
                        .request
                        .get("usage")
                        .and_then(|u| u.get("include"))
                        .and_then(|i| i.as_bool())
                        .unwrap_or(false);
                    log_debug!(
                        "{} request had usage.include flag: {}",
                        provider_name,
                        has_usage_flag
                    );
                    if !has_usage_flag {
                        log_debug!(
                            "Make sure usage.include=true is set for {} to get cost data",
                            provider_name
                        );
                    }
                }
                log_debug!("Raw {} response for debugging:", provider_name);
                if let Ok(resp_str) = serde_json::to_string_pretty(&exchange.response) {
                    log_debug!("Partial response JSON:\n{}", resp_str);
                }
            }
        }
    } else {
        // No usage data at all — cost cannot be tracked for this call.
        println!(
            "{}",
            "ERROR: No usage data for tool response API call".bright_red()
        );
    }
}
/// Logs provider rate-limit details extracted from response headers.
///
/// Anthropic and OpenAI get provider-specific formatting; any other provider
/// gets a generic key/value dump of whatever headers were captured.
fn display_rate_limit_info(exchange: &crate::session::ProviderExchange) {
    // Nothing to report if the exchange captured no rate-limit headers.
    let Some(ref headers) = exchange.rate_limit_headers else {
        return;
    };
    let mut parts: Vec<String> = Vec::new();
    match exchange.provider.as_str() {
        "anthropic" => {
            if let (Some(remaining), Some(limit)) =
                (headers.get("tokens_remaining"), headers.get("tokens_limit"))
            {
                parts.push(format!("Tokens: {}/{}", remaining, limit));
            }
            if let (Some(remaining), Some(limit)) = (
                headers.get("input_tokens_remaining"),
                headers.get("input_tokens_limit"),
            ) {
                parts.push(format!("Input tokens: {}/{}", remaining, limit));
            }
            if let (Some(remaining), Some(limit)) = (
                headers.get("output_tokens_remaining"),
                headers.get("output_tokens_limit"),
            ) {
                parts.push(format!("Output tokens: {}/{}", remaining, limit));
            }
            if !parts.is_empty() {
                crate::log_info!("📊 Anthropic rate limits: {}", parts.join(" | "));
            }
        }
        "openai" => {
            if let (Some(remaining), Some(limit)) = (
                headers.get("requests_remaining"),
                headers.get("requests_limit"),
            ) {
                parts.push(format!("Requests: {}/{}", remaining, limit));
            }
            if let (Some(remaining), Some(limit)) =
                (headers.get("tokens_remaining"), headers.get("tokens_limit"))
            {
                parts.push(format!("Tokens: {}/{}", remaining, limit));
            }
            if let Some(reset) = headers.get("request_reset") {
                parts.push(format!("Request reset: {}", reset));
            }
            if !parts.is_empty() {
                crate::log_info!("📊 OpenAI rate limits: {}", parts.join(" | "));
            }
        }
        _ => {
            // Unknown provider: dump every captured header verbatim.
            if !headers.is_empty() {
                let dump: Vec<String> = headers
                    .iter()
                    .map(|(key, value)| format!("{}: {}", key, value))
                    .collect();
                crate::log_info!("📊 {} rate limits: {}", exchange.provider, dump.join(" | "));
            }
        }
    }
}