pub mod tool_execution;
pub mod tool_result_processor;
use super::{CostTracker, MessageHandler, ToolProcessor};
use crate::config::Config;
use crate::log_debug;
use crate::providers::ThinkingBlock;
use crate::session::chat::assistant_output::print_assistant_response;
use crate::session::chat::display_thinking;
use crate::session::chat::session::ChatSession;
use crate::session::chat::session_continuation;
use crate::session::ProviderExchange;
use anyhow::Result;
use colored::Colorize;
pub struct ResponseProcessingParams<'a> {
pub content: String,
pub exchange: ProviderExchange,
pub tool_calls: Option<Vec<crate::mcp::McpToolCall>>,
pub thinking: Option<ThinkingBlock>,
pub finish_reason: Option<String>,
pub chat_session: &'a mut ChatSession,
pub config: &'a Config,
pub role: &'a str,
pub operation_cancelled: tokio::sync::watch::Receiver<bool>,
pub is_interactive: bool,
}
impl<'a> ResponseProcessingParams<'a> {
#[allow(clippy::too_many_arguments)]
pub fn new(
content: String,
exchange: ProviderExchange,
tool_calls: Option<Vec<crate::mcp::McpToolCall>>,
finish_reason: Option<String>,
chat_session: &'a mut ChatSession,
config: &'a Config,
role: &'a str,
operation_cancelled: tokio::sync::watch::Receiver<bool>,
) -> Self {
Self {
content,
exchange,
tool_calls,
thinking: None,
finish_reason,
chat_session,
config,
role,
operation_cancelled,
is_interactive: true, }
}
pub fn with_thinking(mut self, thinking: Option<ThinkingBlock>) -> Self {
self.thinking = thinking;
self
}
pub fn with_interactive(mut self, is_interactive: bool) -> Self {
self.is_interactive = is_interactive;
self
}
}
fn log_response_debug(
_config: &Config,
finish_reason: &Option<String>,
tool_calls: &Option<Vec<crate::mcp::McpToolCall>>,
) {
if let Some(ref reason) = finish_reason {
log_debug!("Processing response with finish_reason: {}", reason);
}
if let Some(ref calls) = tool_calls {
log_debug!("Processing {} tool calls", calls.len());
}
}
fn handle_final_response(
content: &str,
thinking: &Option<ThinkingBlock>,
chat_session: &mut ChatSession,
config: &Config,
role: &str,
is_interactive: bool,
) -> Result<()> {
if is_interactive {
if let Some(ref thinking_block) = thinking {
display_thinking(thinking_block);
}
}
chat_session.add_assistant_message(content, None, config, role)?;
if is_interactive {
print_assistant_response(content, config, role, thinking);
}
use std::io::IsTerminal;
if !std::io::stdin().is_terminal() {
CostTracker::display_cost_line(chat_session);
}
Ok(())
}
pub async fn get_tool_server_name_async(tool_name: &str, _config: &Config) -> String {
crate::mcp::tool_map::get_tool_server_name(tool_name).unwrap_or_else(|| "unknown".to_string())
}
async fn display_tool_parameters_only(config: &Config, tool_calls: &[crate::mcp::McpToolCall]) {
if !tool_calls.is_empty() {
log_debug!("Found {} tool calls in response", tool_calls.len());
let is_single_tool = tool_calls.len() == 1;
for (index, call) in tool_calls.iter().enumerate() {
let tool_index = index + 1;
let server_name = get_tool_server_name_async(&call.tool_name, config).await;
let title = if is_single_tool {
format!(
" {} | {} ",
call.tool_name.bright_cyan(),
server_name.bright_blue()
)
} else {
format!(
" [{}] {} | {} ",
tool_index,
call.tool_name.bright_cyan(),
server_name.bright_blue()
)
};
let separator_length = 70.max(title.len() + 4);
let dashes = "─".repeat(separator_length - title.len());
let separator = format!("──{}{}──", title, dashes.dimmed());
println!("{}", separator);
if config.get_log_level().is_debug_enabled() || config.get_log_level().is_info_enabled()
{
display_tool_parameters_full(call, config);
}
if index < tool_calls.len() - 1 {
println!();
}
}
println!();
}
}
pub fn display_tool_parameters_full(tool_call: &crate::mcp::McpToolCall, config: &Config) {
crate::session::chat::tool_display::display_tool_parameters_full(tool_call, config);
}
fn resolve_tool_calls(
current_tool_calls_param: &mut Option<Vec<crate::mcp::McpToolCall>>,
current_content: &str,
) -> Vec<crate::mcp::McpToolCall> {
if let Some(calls) = current_tool_calls_param.take() {
if !calls.is_empty() {
calls
} else {
crate::mcp::parse_tool_calls(current_content) }
} else {
crate::mcp::parse_tool_calls(current_content)
}
}
fn check_cancellation(operation_cancelled: &tokio::sync::watch::Receiver<bool>) -> Result<()> {
if *operation_cancelled.borrow() {
println!("{}", "\nOperation cancelled by user.".bright_yellow());
return Err(anyhow::anyhow!("Operation cancelled"));
}
Ok(())
}
fn add_assistant_message_with_tool_calls(
chat_session: &mut ChatSession,
current_content: &str,
current_exchange: &ProviderExchange,
_config: &Config,
_role: &str,
) -> Result<()> {
let original_tool_calls = MessageHandler::extract_original_tool_calls(current_exchange);
let assistant_message = crate::session::Message {
role: "assistant".to_string(),
content: current_content.to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
cached: false,
tool_calls: original_tool_calls, ..Default::default()
};
chat_session.session.messages.push(assistant_message);
chat_session.last_response = current_content.to_string();
let _ = crate::session::logger::log_assistant_response(
&chat_session.session.info.name,
current_content,
);
let _ = crate::session::logger::log_raw_exchange(current_exchange);
Ok(())
}
pub async fn process_response(params: ResponseProcessingParams<'_>) -> Result<()> {
check_cancellation(¶ms.operation_cancelled)?;
log_response_debug(params.config, ¶ms.finish_reason, ¶ms.tool_calls);
let has_tool_calls = params
.tool_calls
.as_ref()
.is_some_and(|calls| !calls.is_empty());
if params.chat_session.continuation_pending
&& session_continuation::process_continuation_response(
params.chat_session,
¶ms.content,
has_tool_calls,
params.config,
params.role,
)
.await?
{
return process_continuation_message_immediately(params).await;
}
let last_message = params.chat_session.session.messages.last();
if last_message.is_none_or(|msg| msg.role != "user") {
println!(
"{}",
"Warning: User message not found in session. This is unexpected.".yellow()
);
}
let mut tool_processor = ToolProcessor::new();
let mut thinking_displayed = false;
let mut current_content = params.content.clone();
let mut current_exchange = params.exchange;
let mut current_tool_calls_param = params.tool_calls.clone(); let operation_cancelled_ref = ¶ms.operation_cancelled;
loop {
check_cancellation(operation_cancelled_ref)?;
if !params.config.mcp.servers.is_empty() {
let current_tool_calls =
resolve_tool_calls(&mut current_tool_calls_param, ¤t_content);
if !current_tool_calls.is_empty() {
if params.is_interactive && !thinking_displayed {
if let Some(ref thinking_block) = params.thinking {
display_thinking(thinking_block);
thinking_displayed = true;
}
}
if params.is_interactive {
print_assistant_response(
¤t_content,
params.config,
params.role,
¶ms.thinking,
);
}
if params.is_interactive {
display_tool_parameters_only(params.config, ¤t_tool_calls).await;
}
let operation_cancelled_clone = params.operation_cancelled.clone();
if *operation_cancelled_clone.borrow() {
println!("{}", "\nOperation cancelled by user.".bright_yellow());
return Ok(());
}
let (tool_results, total_tool_time_ms) =
match tool_execution::execute_tools_parallel(
current_tool_calls.clone(),
params.chat_session,
params.config,
&mut tool_processor,
operation_cancelled_clone.clone(),
)
.await
{
Ok(results) => results,
Err(e) => {
if e.to_string().contains("cancelled")
|| *operation_cancelled_clone.borrow()
{
println!("{}", "\nOperation cancelled by user.".bright_yellow());
return Ok(());
}
return Err(e);
}
};
if *operation_cancelled_clone.borrow() {
println!("{}", "\nTool execution cancelled.".bright_yellow());
return Ok(());
}
add_assistant_message_with_tool_calls(
params.chat_session,
¤t_content,
¤t_exchange,
params.config,
params.role,
)?;
if !tool_results.is_empty() {
if let Some((new_content, new_exchange, new_tool_calls)) =
tool_result_processor::process_tool_results(
tool_results,
total_tool_time_ms,
params.chat_session,
params.config,
params.role,
operation_cancelled_clone.clone(),
)
.await?
{
current_content = new_content;
current_exchange = new_exchange;
current_tool_calls_param = new_tool_calls;
if current_tool_calls_param.is_some()
&& !current_tool_calls_param.as_ref().unwrap().is_empty()
{
continue;
} else {
let more_tools = crate::mcp::parse_tool_calls(¤t_content);
if !more_tools.is_empty() {
log_debug!(
"Found {} more tool calls to process in content",
more_tools.len()
);
continue;
} else {
break;
}
}
} else {
if params.chat_session.continuation_pending {
log_debug!("Tool processing stopped due to continuation trigger - breaking out of tool loop to handle continuation");
break;
}
return Ok(());
}
} else {
let more_tools = crate::mcp::parse_tool_calls(¤t_content);
if !more_tools.is_empty() {
log_debug!(
"Found {} more tool calls to process (no previous tool results)",
more_tools.len()
);
continue;
} else {
break;
}
}
} else {
break;
}
} else {
break;
}
}
if params.chat_session.continuation_pending {
log_debug!("Continuation pending after tool processing - skipping final response, main session loop will handle continuation");
return Ok(());
}
let thinking_for_final = if thinking_displayed {
None
} else {
params.thinking.clone()
};
handle_final_response(
¤t_content,
&thinking_for_final,
params.chat_session,
params.config,
params.role,
params.is_interactive,
)?;
Ok(())
}
pub async fn process_continuation_message_immediately(
params: ResponseProcessingParams<'_>,
) -> Result<()> {
use crate::session::ChatCompletionWithValidationParams;
use crate::{log_debug, log_info};
log_info!("Processing continuation message automatically...");
let continuation_message = params
.chat_session
.session
.messages
.last()
.ok_or_else(|| anyhow::anyhow!("No continuation message found"))?;
if continuation_message.role != "user" {
return Err(anyhow::anyhow!(
"Expected user continuation message, found: {}",
continuation_message.role
));
}
let messages = params.chat_session.session.messages.clone();
let chat_params = ChatCompletionWithValidationParams::new(
&messages,
¶ms.chat_session.model,
params.chat_session.temperature,
params.chat_session.top_p,
params.chat_session.top_k,
params.chat_session.max_tokens,
params.config,
)
.with_max_retries(params.chat_session.max_retries)
.with_cancellation_token(params.operation_cancelled.clone())
.as_continuation_call();
match crate::session::chat_completion_with_validation(chat_params).await {
Ok(response) => {
log_debug!("Continuation API call successful");
let continuation_params = ResponseProcessingParams::new(
response.content,
response.exchange,
response.tool_calls,
response.finish_reason,
params.chat_session,
params.config,
params.role,
params.operation_cancelled.clone(),
)
.with_interactive(params.is_interactive);
Box::pin(process_response(continuation_params)).await
}
Err(e) => {
log_info!(
"Continuation API call failed after exhausting retries: {}",
e
);
params.chat_session.continuation_pending = false;
log_debug!("Continuation state reset due to API failure - breaking continuation loop");
Err(e)
}
}
}