use crate::config::Config;
use crate::mcp::get_available_functions;
use crate::session::chat::animation::show_smart_animation;
use crate::session::chat::session::ChatSession;
use crate::session::estimate_full_context_tokens;
use crate::session::ChatCompletionWithValidationParams;
use crate::{log_debug, log_info};
use anyhow::Result;
use colored::Colorize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
/// Post-processes a completed batch of tool executions and drives the
/// follow-up model request.
///
/// In order, this:
/// 1. records tool timing and bails out early if the user cancelled;
/// 2. starts a status animation plus a watcher task that stops it when the
///    cancellation watch-channel fires;
/// 3. appends each tool result to the session, truncating context eagerly
///    after large outputs and batching small ones for a later pass;
/// 4. wires up and runs any pending plan task/phase/project compression,
///    returning to the main loop whenever a continuation is triggered;
/// 5. applies batch truncation, adaptive conversation compression, the
///    auto-cache checkpoint, and the session/request spending thresholds;
/// 6. makes the follow-up API call and maps the response into
///    `(content, exchange, tool_calls, response_id, thinking)`.
///
/// Returns `Ok(None)` whenever the flow was cancelled, a continuation was
/// triggered, or a spending threshold vetoed the follow-up; `Err` only when
/// the follow-up API call itself failed.
pub async fn process_tool_results(
    tool_results: Vec<crate::mcp::McpToolResult>,
    total_tool_time_ms: u64,
    chat_session: &mut ChatSession,
    config: &Config,
    role: &str,
    operation_cancelled: tokio::sync::watch::Receiver<bool>,
) -> Result<
    Option<(
        String,
        crate::session::ProviderExchange,
        Option<Vec<crate::mcp::McpToolCall>>,
        Option<String>,                          // response_id from follow-up API call
        Option<crate::providers::ThinkingBlock>, // thinking from follow-up API call
    )>,
> {
    // Attribute the tools' wall-clock time to the session totals.
    chat_session.session.info.total_tool_time_ms += total_tool_time_ms;
    // Fast path: nothing to do if the user already cancelled.
    if *operation_cancelled.borrow() {
        crate::log_debug!("Operation cancelled by user.");
        return Ok(None);
    }
    // Shared flag used to stop the animation task; the watcher task below
    // mirrors the watch-channel cancellation signal into this flag.
    let animation_cancel = Arc::new(AtomicBool::new(false));
    let animation_cancel_monitor = animation_cancel.clone();
    let operation_cancelled_monitor = operation_cancelled.clone();
    let _cancel_monitor = tokio::spawn(async move {
        // Poll every 5ms until the animation is cancelled elsewhere or the
        // user cancels the operation.
        while !animation_cancel_monitor.load(Ordering::SeqCst) {
            if *operation_cancelled_monitor.borrow() {
                animation_cancel_monitor.store(true, Ordering::SeqCst);
                break;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
        }
    });
    let animation_cancel_flag = animation_cancel.clone();
    // Snapshot the figures the animation displays (cost + context usage).
    let current_cost = chat_session.session.info.total_cost;
    let max_threshold = config.max_session_tokens_threshold;
    let (_, _, _, _, system_prompt) = config.get_role_config(role);
    let tools = get_available_functions(config).await;
    let current_context_tokens = estimate_full_context_tokens(
        &chat_session.session.messages,
        Some(system_prompt),
        Some(&tools),
    ) as u64;
    let animation_task = tokio::spawn(async move {
        let _ = show_smart_animation(
            animation_cancel_flag,
            current_cost,
            current_context_tokens,
            max_threshold,
        )
        .await;
    });
    let processing_start = std::time::Instant::now();
    let cache_manager = crate::session::cache::CacheManager::new();
    let supports_caching = crate::session::model_supports_caching(&chat_session.model);
    let mut cache_check_time = 0u128;
    let mut truncation_time = 0u128;
    // Track how much tool output has been appended since the last truncation
    // so small outputs can be handled in a single batched pass after the loop.
    let mut accumulated_content_size = 0;
    let mut needs_truncation_check = false;
    for tool_result in &tool_results {
        let tool_content = extract_tool_content(tool_result);
        let content_size = tool_content.len();
        accumulated_content_size += content_size;
        // Thresholds are in bytes of rendered tool output.
        let is_large_output = content_size > 10000;
        let accumulated_is_large = accumulated_content_size > 50000;
        if is_large_output || accumulated_is_large {
            needs_truncation_check = true;
        }
        chat_session.add_tool_message(
            &tool_content,
            &tool_result.tool_id,
            &tool_result.tool_name,
            config,
        )?;
        // Large single outputs trigger an immediate truncation pass instead
        // of waiting for the batched check below.
        if is_large_output {
            let truncation_start = std::time::Instant::now();
            if let Err(e) = crate::session::chat::context_truncation::check_and_truncate_context(
                chat_session,
                config,
                crate::session::chat::TruncationOptions {
                    defer_continuation: true,
                },
            )
            .await
            {
                log_info!("Warning: Error during tool result truncation check: {}", e);
            }
            truncation_time += truncation_start.elapsed().as_millis();
            // The eager pass covered everything accumulated so far.
            needs_truncation_check = false;
            accumulated_content_size = 0;
        }
    }
    // Plan-compression bookkeeping only applies when the "plan" tool ran in
    // this batch.
    let mut plan_tool_executed = false;
    for tool_result in &tool_results {
        if tool_result.tool_name == "plan" {
            plan_tool_executed = true;
            break;
        }
    }
    if plan_tool_executed && crate::mcp::dev::plan::has_pending_compression() {
        // The start index was recorded before the tool ran; the end of the
        // range is the last message appended above.
        if let Some(start_index) = crate::mcp::dev::plan::core::get_and_clear_start_index() {
            let end_index = chat_session.get_message_count() - 1;
            if let Err(e) =
                crate::mcp::dev::plan::set_pending_compression_range(start_index, end_index)
            {
                log_info!(
                    "Failed to set compression range: {}. Compression will be skipped.",
                    e
                );
            }
        } else {
            log_info!(
                "Plan tool executed but no start index found for compression. \
                This may indicate the start index was not set before tool execution."
            );
        }
    }
    // Run any pending plan *task* compression and record its savings.
    // Failure is non-fatal: the session continues uncompressed.
    let task_compression_occurred =
        match crate::mcp::dev::plan::process_pending_compression(chat_session).await {
            Ok(Some(metrics)) => {
                chat_session
                    .session
                    .info
                    .compression_stats
                    .add_task_compression(metrics.messages_removed, metrics.tokens_saved);
                crate::session::chat::cost_tracker::CostTracker::display_compression_result(
                    "Task", &metrics,
                );
                true
            }
            Ok(None) => {
                false
            }
            Err(e) => {
                log_info!(
                    "❌ Task compression failed: {}. Context was not compressed.",
                    e
                );
                false
            }
        };
    if task_compression_occurred && crate::mcp::dev::plan::core::has_active_plan() {
        log_debug!("Task compression completed with active plan - checking if continuation needed");
        use crate::session::chat::session_continuation;
        if session_continuation::check_and_handle_continuation(chat_session, config).await? {
            // Stop the animation before handing control back to the caller.
            animation_cancel.store(true, Ordering::SeqCst);
            let _ = animation_task.await;
            log_info!("Continuation triggered after task compression - returning to main loop");
            return Ok(None);
        }
    }
    // Same flow for *phase*-level compression.
    let phase_compression_occurred =
        match crate::mcp::dev::plan::process_pending_phase_compression(chat_session).await {
            Ok(Some(metrics)) => {
                chat_session
                    .session
                    .info
                    .compression_stats
                    .add_phase_compression(metrics.messages_removed, metrics.tokens_saved);
                crate::session::chat::cost_tracker::CostTracker::display_compression_result(
                    "Phase", &metrics,
                );
                true
            }
            Ok(None) => false,
            Err(e) => {
                log_info!(
                    "❌ Phase compression failed: {}. Context was not compressed.",
                    e
                );
                false
            }
        };
    if phase_compression_occurred && crate::mcp::dev::plan::core::has_active_plan() {
        log_debug!(
            "Phase compression completed with active plan - checking if continuation needed"
        );
        use crate::session::chat::session_continuation;
        if session_continuation::check_and_handle_continuation(chat_session, config).await? {
            animation_cancel.store(true, Ordering::SeqCst);
            let _ = animation_task.await;
            log_info!("Continuation triggered after phase compression - returning to main loop");
            return Ok(None);
        }
    }
    // And for *project*-level compression.
    let project_compression_occurred =
        match crate::mcp::dev::plan::process_pending_project_compression(chat_session).await {
            Ok(Some(metrics)) => {
                chat_session
                    .session
                    .info
                    .compression_stats
                    .add_project_compression(metrics.messages_removed, metrics.tokens_saved);
                crate::session::chat::cost_tracker::CostTracker::display_compression_result(
                    "Project", &metrics,
                );
                true
            }
            Ok(None) => false,
            Err(e) => {
                log_info!(
                    "❌ Project compression failed: {}. Context was not compressed.",
                    e
                );
                false
            }
        };
    if project_compression_occurred && crate::mcp::dev::plan::core::has_active_plan() {
        log_debug!(
            "Project compression completed with active plan - checking if continuation needed"
        );
        use crate::session::chat::session_continuation;
        if session_continuation::check_and_handle_continuation(chat_session, config).await? {
            animation_cancel.store(true, Ordering::SeqCst);
            let _ = animation_task.await;
            log_info!("Continuation triggered after project compression - returning to main loop");
            return Ok(None);
        }
    }
    // Batched truncation pass for accumulated small outputs that never
    // crossed the per-output threshold inside the loop.
    if needs_truncation_check {
        let truncation_start = std::time::Instant::now();
        if let Err(e) = crate::session::chat::context_truncation::check_and_truncate_context(
            chat_session,
            config,
            crate::session::chat::TruncationOptions {
                defer_continuation: true,
            },
        )
        .await
        {
            log_info!("Warning: Error during batch truncation check: {}", e);
        }
        truncation_time += truncation_start.elapsed().as_millis();
    }
    // Best-effort adaptive compression of the whole conversation.
    if let Err(e) = crate::session::chat::conversation_compression::check_and_compress_conversation(
        chat_session,
        config,
    )
    .await
    {
        log_debug!(
            "Adaptive conversation compression failed during tool processing: {}. Continuing session.",
            e
        );
    }
    // Final continuation check before committing to the follow-up call.
    use crate::session::chat::session_continuation;
    if session_continuation::check_and_handle_continuation(chat_session, config).await? {
        animation_cancel.store(true, Ordering::SeqCst);
        let _ = animation_task.await;
        log_info!("Token limit reached after processing all tool results - continuation triggered");
        return Ok(None);
    }
    // Apply an automatic cache checkpoint if the threshold was reached.
    let cache_start = std::time::Instant::now();
    if let Ok(true) = cache_manager.check_and_apply_auto_cache_threshold(
        &mut chat_session.session,
        config,
        supports_caching,
        role,
    ) {
        log_debug!("Auto-cache threshold reached after processing all tool results - cache checkpoint applied before follow-up API request.");
    }
    cache_check_time += cache_start.elapsed().as_millis();
    let total_processing_time = processing_start.elapsed().as_millis() as u64;
    chat_session.session.info.total_layer_time_ms += total_processing_time;
    // Only log a timing breakdown when processing was noticeably slow.
    if total_processing_time > 100 {
        log_debug!(
            "🔍 Tool result processing took {}ms (cache: {}ms, truncation: {}ms)",
            total_processing_time,
            cache_check_time,
            truncation_time
        );
    }
    // Session-level spending threshold: may veto the follow-up entirely.
    // A failed threshold *check* is only a warning and does not block.
    match chat_session.check_spending_threshold(config) {
        Ok(should_continue) => {
            if !should_continue {
                animation_cancel.store(true, Ordering::SeqCst);
                let _ = animation_task.await;
                println!(
                    "{}",
                    "✗ Tool follow-up cancelled due to spending threshold.".bright_red()
                );
                return Ok(None);
            }
        }
        Err(e) => {
            use colored::*;
            println!(
                "{}: {}",
                "Warning: Error checking spending threshold".bright_yellow(),
                e
            );
        }
    }
    // Per-request spending threshold, same semantics as above.
    match chat_session.check_request_spending_threshold(config) {
        Ok(should_continue) => {
            if !should_continue {
                animation_cancel.store(true, Ordering::SeqCst);
                let _ = animation_task.await;
                println!(
                    "{}",
                    "✗ Tool follow-up cancelled due to request spending threshold.".bright_red()
                );
                return Ok(None);
            }
        }
        Err(e) => {
            use colored::*;
            println!(
                "{}: {}",
                "Warning: Error checking request spending threshold".bright_yellow(),
                e
            );
        }
    }
    // Last cancellation check before spending money on the follow-up call.
    if *operation_cancelled.borrow() {
        animation_cancel.store(true, Ordering::SeqCst);
        let _ = animation_task.await;
        crate::log_debug!("Operation cancelled by user.");
        return Ok(None);
    }
    let follow_up_result =
        make_follow_up_api_call(chat_session, config, operation_cancelled.clone()).await;
    // The follow-up finished (or failed): stop the animation either way.
    animation_cancel.store(true, Ordering::SeqCst);
    let _ = animation_task.await;
    use crate::session::chat::cost_tracker::CostTracker;
    CostTracker::display_intermediate_cost_breakdown(chat_session);
    match follow_up_result {
        Ok(response) => {
            // Tool calls may arrive structured or embedded in the content.
            let has_more_tools = if let Some(ref calls) = response.tool_calls {
                !calls.is_empty()
            } else {
                !crate::mcp::parse_tool_calls(&response.content).is_empty()
            };
            if let Some(ref reason) = response.finish_reason {
                log_debug!("Follow-up finish_reason: {}", reason);
            }
            let should_continue_conversation =
                check_should_continue(&response, config, has_more_tools);
            handle_follow_up_cost_tracking(chat_session, &response.exchange, config);
            display_rate_limit_info(&response.exchange);
            if should_continue_conversation {
                Ok(Some((
                    response.content,
                    response.exchange,
                    response.tool_calls,
                    response.response_id,
                    response.thinking,
                )))
            } else {
                // Conversation is ending: drop any further tool calls.
                Ok(Some((
                    response.content,
                    response.exchange,
                    None,
                    response.response_id,
                    response.thinking,
                )))
            }
        }
        Err(e) => {
            // Surface a provider-specific error message to the user, then
            // propagate the original error to the caller.
            let provider_name = if let Ok((provider, _)) =
                crate::providers::ProviderFactory::parse_model(&chat_session.model)
            {
                provider
            } else {
                "unknown provider".to_string()
            };
            let error_message =
                crate::session::chat::session::format_provider_error(&provider_name, &e);
            println!(
                "\n{} {}: {}",
                "✗".bright_red(),
                format!("Error calling {}", provider_name).bright_red(),
                error_message
            );
            log_debug!("Model: {}", chat_session.model);
            log_debug!("Temperature: {}", chat_session.temperature);
            Err(e)
        }
    }
}
/// Renders a tool result as the text appended to the chat as the tool
/// message, trying the richest representation first:
/// 1. a structured `output` field (string as-is, otherwise serialized JSON);
/// 2. a bare string result;
/// 3. an `error` field, prefixed with "Error: ";
/// 4. the whole result serialized to JSON as a last resort.
fn extract_tool_content(tool_result: &crate::mcp::McpToolResult) -> String {
    if let Some(output) = tool_result.result.get("output") {
        if let Some(output_str) = output.as_str() {
            output_str.to_string()
        } else {
            serde_json::to_string(output).unwrap_or_default()
        }
    } else if tool_result.result.is_string() {
        tool_result.result.as_str().unwrap_or("").to_string()
    } else if let Some(error) = tool_result.result.get("error") {
        // Flattened from a nested `else { if … }` (clippy: collapsible_else_if).
        format!("Error: {}", error)
    } else {
        serde_json::to_string(&tool_result.result).unwrap_or_default()
    }
}
/// Issues the follow-up chat-completion request after the tool results have
/// been appended to the session.
///
/// Builds the validation parameters straight from the session's current
/// sampling settings, forwards the caller's cancellation token, and defers to
/// `chat_completion_with_validation` for the actual provider call.
async fn make_follow_up_api_call(
    chat_session: &ChatSession,
    config: &Config,
    cancellation_token: tokio::sync::watch::Receiver<bool>,
) -> Result<crate::providers::ProviderResponse> {
    let params = ChatCompletionWithValidationParams::new(
        &chat_session.session.messages,
        &chat_session.model,
        chat_session.temperature,
        chat_session.top_p,
        chat_session.top_k,
        chat_session.max_tokens,
        config,
    )
    .with_max_retries(chat_session.max_retries)
    .with_cancellation_token(cancellation_token);
    crate::session::chat_completion_with_validation(params).await
}
/// Decides whether the conversation should continue after a follow-up
/// response, based on the provider's `finish_reason`:
/// - "tool_calls" / "tool_use"        → continue (more tools requested);
/// - "stop" / "length" / "end_turn"   → end the turn;
/// - any other reason                 → continue (logged as unknown);
/// - no finish_reason at all          → fall back to `has_more_tools`.
pub fn check_should_continue(
    response: &crate::providers::ProviderResponse,
    _config: &Config,
    has_more_tools: bool,
) -> bool {
    match response.finish_reason.as_deref() {
        Some("tool_calls") | Some("tool_use") => {
            log_debug!("finish_reason is 'tool_calls', continuing conversation");
            true
        }
        // Bind the matched reason directly instead of re-reading the field
        // and calling `.unwrap()` on it (removes a redundant panic path).
        Some(reason @ ("stop" | "length" | "end_turn")) => {
            log_debug!("finish_reason is '{}', ending conversation", reason);
            false
        }
        Some(other) => {
            log_info!("Unknown finish_reason '{}', continuing conversation", other);
            true
        }
        None => {
            log_debug!("Debug: No finish_reason, checking for tool calls");
            has_more_tools
        }
    }
}
/// Folds the follow-up exchange's usage data into the session's accounting:
/// token tracking (cached vs. regular prompt tokens), API timing, and the
/// running cost total. When the parsed usage carries no cost it falls back
/// to the raw response JSON (`usage.cost`), and when no cost can be found at
/// all it reports the provider loudly. Purely accounting/logging — makes no
/// API calls.
fn handle_follow_up_cost_tracking(
    chat_session: &mut ChatSession,
    exchange: &crate::session::ProviderExchange,
    _config: &Config,
) {
    if let Some(usage) = &exchange.usage {
        // Cached prompt tokens are tracked separately; the regular count is
        // the remainder (saturating to guard against inconsistent data).
        let cached_tokens = usage.cached_tokens;
        let regular_prompt_tokens = usage.prompt_tokens.saturating_sub(cached_tokens);
        let cache_manager = crate::session::cache::CacheManager::new();
        cache_manager.update_token_tracking(
            &mut chat_session.session,
            regular_prompt_tokens,
            usage.output_tokens,
            cached_tokens,
        );
        if let Some(api_time_ms) = usage.request_time_ms {
            chat_session.session.info.total_api_time_ms += api_time_ms;
        }
        if let Some(cost) = usage.cost {
            chat_session.session.info.total_cost += cost;
            // Keep the session's estimate in sync with the running total.
            chat_session.estimated_cost = chat_session.session.info.total_cost;
            log_debug!(
                "Adding ${:.5} to total cost (total now: ${:.5})",
                cost,
                chat_session.session.info.total_cost
            );
            // Persisting stats is best-effort; failures are ignored.
            let _ = crate::session::logger::log_session_stats(
                &chat_session.session.info.name,
                &chat_session.session.info,
            );
            log_debug!("Tool response usage detail:");
            if let Ok(usage_str) = serde_json::to_string_pretty(usage) {
                log_debug!("{}", usage_str);
            }
            // Dump the provider's raw usage object plus any cache-related
            // fields to help debug cache cost attribution.
            if let Some(raw_usage) = exchange.response.get("usage") {
                log_debug!("Raw tool response usage object:");
                if let Ok(raw_str) = serde_json::to_string_pretty(raw_usage) {
                    log_debug!("{}", raw_str);
                }
                if let Some(cache_cost) = raw_usage.get("cache_cost") {
                    log_debug!("Found cache_cost field: {}", cache_cost);
                }
                if let Some(cached_cost) = raw_usage.get("cached_cost") {
                    log_debug!("Found cached_cost field: {}", cached_cost);
                }
                if let Some(any_cache) = raw_usage.get("cached") {
                    log_debug!("Found cached field: {}", any_cache);
                }
            }
        } else {
            // Parsed usage had no cost: try to pull it straight from the
            // raw response JSON at `usage.cost`.
            let cost_from_raw = exchange
                .response
                .get("usage")
                .and_then(|u| u.get("cost"))
                .and_then(|c| c.as_f64());
            if let Some(cost) = cost_from_raw {
                chat_session.session.info.total_cost += cost;
                chat_session.estimated_cost = chat_session.session.info.total_cost;
                log_debug!(
                    "Using cost ${:.5} from raw response (total now: ${:.5})",
                    cost,
                    chat_session.session.info.total_cost
                );
                let _ = crate::session::logger::log_session_stats(
                    &chat_session.session.info.name,
                    &chat_session.session.info,
                );
            } else {
                // No cost anywhere: report it. Extra diagnostics for
                // OpenRouter, which (per the hint printed below) needs
                // usage.include=true in the request to return cost data.
                let provider_name = &exchange.provider;
                println!(
                    "{} {} {}",
                    "ERROR:".bright_red(),
                    provider_name.bright_yellow(),
                    "did not provide cost data for tool response API call".bright_red()
                );
                if provider_name == "openrouter" {
                    let has_usage_flag = exchange
                        .request
                        .get("usage")
                        .and_then(|u| u.get("include"))
                        .and_then(|i| i.as_bool())
                        .unwrap_or(false);
                    println!(
                        "{} {}",
                        "Request had usage.include flag:".bright_yellow(),
                        has_usage_flag
                    );
                    if !has_usage_flag {
                        println!(
                            "{}",
                            "Make sure usage.include=true is set for OpenRouter!".bright_yellow()
                        );
                    }
                }
                log_debug!("Raw {} response for debugging:", provider_name);
                if let Ok(resp_str) = serde_json::to_string_pretty(&exchange.response) {
                    log_debug!("Partial response JSON:\n{}", resp_str);
                }
            }
        }
    } else {
        // The exchange carried no usage object at all.
        println!(
            "{}",
            "ERROR: No usage data for tool response API call".bright_red()
        );
    }
}
fn display_rate_limit_info(exchange: &crate::session::ProviderExchange) {
if let Some(ref rate_limit_headers) = exchange.rate_limit_headers {
let mut rate_limit_info = Vec::new();
match exchange.provider.as_str() {
"anthropic" => {
if let (Some(tokens_remaining), Some(tokens_limit)) = (
rate_limit_headers.get("tokens_remaining"),
rate_limit_headers.get("tokens_limit"),
) {
rate_limit_info.push(format!("Tokens: {}/{}", tokens_remaining, tokens_limit));
}
if let (Some(input_remaining), Some(input_limit)) = (
rate_limit_headers.get("input_tokens_remaining"),
rate_limit_headers.get("input_tokens_limit"),
) {
rate_limit_info
.push(format!("Input tokens: {}/{}", input_remaining, input_limit));
}
if let (Some(output_remaining), Some(output_limit)) = (
rate_limit_headers.get("output_tokens_remaining"),
rate_limit_headers.get("output_tokens_limit"),
) {
rate_limit_info.push(format!(
"Output tokens: {}/{}",
output_remaining, output_limit
));
}
if !rate_limit_info.is_empty() {
crate::log_info!("📊 Anthropic rate limits: {}", rate_limit_info.join(" | "));
}
}
"openai" => {
if let (Some(requests_remaining), Some(requests_limit)) = (
rate_limit_headers.get("requests_remaining"),
rate_limit_headers.get("requests_limit"),
) {
rate_limit_info.push(format!(
"Requests: {}/{}",
requests_remaining, requests_limit
));
}
if let (Some(tokens_remaining), Some(tokens_limit)) = (
rate_limit_headers.get("tokens_remaining"),
rate_limit_headers.get("tokens_limit"),
) {
rate_limit_info.push(format!("Tokens: {}/{}", tokens_remaining, tokens_limit));
}
if let Some(request_reset) = rate_limit_headers.get("request_reset") {
rate_limit_info.push(format!("Request reset: {}", request_reset));
}
if !rate_limit_info.is_empty() {
crate::log_info!("📊 OpenAI rate limits: {}", rate_limit_info.join(" | "));
}
}
_ => {
if !rate_limit_headers.is_empty() {
let info: Vec<String> = rate_limit_headers
.iter()
.map(|(k, v)| format!("{}: {}", k, v))
.collect();
crate::log_info!("📊 {} rate limits: {}", exchange.provider, info.join(" | "));
}
}
}
}
}