pub mod tool_execution;
pub mod tool_result_processor;
use super::{CostTracker, MessageHandler, ToolProcessor};
use crate::config::Config;
use crate::providers::ThinkingBlock;
use crate::session::chat::assistant_output::print_assistant_response;
use crate::session::chat::display_thinking;
use crate::session::chat::session::ChatSession;
use crate::session::ProviderExchange;
use crate::{log_debug, log_info};
use anyhow::Result;
use colored::Colorize;
use crate::session::output::{OutputMode, OutputSink};
use crate::websocket::{
AssistantPayload, CostPayload, ServerMessage, ThinkingPayload, ToolResultPayload,
ToolUsePayload,
};
/// Everything [`process_response`] needs to handle one provider reply.
///
/// Groups the reply itself (text, raw exchange, tool calls, thinking, ids)
/// with the session/config handles and the output plumbing used to report
/// progress.
pub struct ResponseProcessingParams<'a, S>
where
    S: OutputSink,
{
    /// Assistant text returned by the provider.
    pub content: String,
    /// Raw provider exchange; the original tool-call payload is extracted
    /// from it when the assistant message is recorded.
    pub exchange: ProviderExchange,
    /// Structured tool calls, if the provider supplied any. When absent or
    /// empty, tool calls are parsed out of `content` instead.
    pub tool_calls: Option<Vec<crate::mcp::McpToolCall>>,
    /// Optional thinking block accompanying the reply.
    pub thinking: Option<ThinkingBlock>,
    /// Provider-reported finish reason; within this module it is only logged.
    pub finish_reason: Option<String>,
    /// Provider response id, stored as the `id` of the recorded message.
    pub response_id: Option<String>,
    /// Session that receives the new messages.
    pub chat_session: &'a mut ChatSession,
    /// Active configuration.
    pub config: &'a Config,
    /// Role name forwarded to rendering and tool-result processing.
    pub role: &'a str,
    /// Watch channel whose value is `true` once the operation was cancelled.
    pub operation_cancelled: tokio::sync::watch::Receiver<bool>,
    /// Destination for structured [`ServerMessage`] events.
    pub sink: S,
    /// Output mode deciding what is printed and what is emitted.
    pub mode: OutputMode,
}
impl<'a, S> ResponseProcessingParams<'a, S>
where
    S: OutputSink,
{
    /// Builder-style setter: replaces the thinking block and returns the params.
    pub fn with_thinking(self, thinking: Option<ThinkingBlock>) -> Self {
        let mut updated = self;
        updated.thinking = thinking;
        updated
    }

    /// Builder-style setter: replaces the output mode and returns the params.
    pub fn with_mode(self, mode: OutputMode) -> Self {
        let mut updated = self;
        updated.mode = mode;
        updated
    }

    /// Forwards `msg` to the configured output sink.
    pub fn emit(&self, msg: ServerMessage) {
        let sink = &self.sink;
        sink.emit(msg);
    }
}
/// Emits a `Thinking` server message carrying a copy of the block's content,
/// tagged with `session_id`.
fn emit_thinking_event<S: OutputSink>(
    params: &ResponseProcessingParams<'_, S>,
    thinking: &ThinkingBlock,
    session_id: &str,
) {
    let payload = ThinkingPayload {
        content: thinking.content.clone(),
        session_id: session_id.to_owned(),
    };
    params.emit(ServerMessage::Thinking(payload));
}
/// Writes debug log lines for the finish reason and the number of tool calls,
/// each only when the corresponding value is present. `_config` is unused.
fn log_response_debug(
    _config: &Config,
    finish_reason: &Option<String>,
    tool_calls: &Option<Vec<crate::mcp::McpToolCall>>,
) {
    if let Some(reason) = finish_reason.as_ref() {
        log_debug!("Processing response with finish_reason: {}", reason);
    }

    if let Some(call_count) = tool_calls.as_ref().map(Vec::len) {
        log_debug!("Processing {} tool calls", call_count);
    }
}
/// Records and presents the final assistant message of a turn.
///
/// Steps, in order:
/// 1. In interactive mode, display the thinking block if one is given.
/// 2. Append an assistant message (stored with `thinking: None`) to the
///    session and remember `content` as the last response.
/// 3. If the session has a session file, append the message to it as JSON.
/// 4. In terminal mode, print the response; additionally print the cost line
///    when stdin is not a terminal.
///
/// # Errors
/// Returns an error if serializing the message or appending it to the session
/// file fails. The message has already been pushed to the session by then.
fn handle_final_response(
    content: &str,
    thinking: &Option<ThinkingBlock>,
    response_id: Option<String>,
    chat_session: &mut ChatSession,
    config: &Config,
    role: &str,
    mode: OutputMode,
) -> Result<()> {
    use std::io::IsTerminal;

    let interactive = mode.is_interactive();
    if let Some(block) = thinking.as_ref().filter(|_| interactive) {
        display_thinking(block);
    }

    // A clock before the Unix epoch falls back to a zero timestamp.
    let now_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();

    let assistant_message = crate::session::Message {
        role: String::from("assistant"),
        content: content.to_owned(),
        timestamp: now_secs,
        cached: false,
        cache_ttl: None,
        tool_call_id: None,
        name: None,
        tool_calls: None,
        images: None,
        videos: None,
        thinking: None,
        id: response_id,
    };

    chat_session
        .session
        .messages
        .push(assistant_message.clone());
    chat_session.last_response = content.to_owned();

    if let Some(path) = chat_session.session.session_file.as_ref() {
        let serialized = serde_json::to_string(&assistant_message)?;
        crate::session::append_to_session_file(path, &serialized)?;
    }

    if mode.is_terminal_mode() {
        print_assistant_response(content, config, role, thinking);
    }

    // The cost line is only printed when input is piped (stdin is not a TTY).
    let stdin_is_piped = !std::io::stdin().is_terminal();
    if stdin_is_piped && mode.is_terminal_mode() {
        CostTracker::display_cost_line(chat_session);
    }

    Ok(())
}
/// Resolves the display name of the server that provides `tool_name`.
///
/// Lookup order: the static tool map, then the dynamic server registry, then
/// the dynamic-agent check (which yields `"agent"`). Falls back to
/// `"unknown"` when nothing matches. `_config` is unused.
pub async fn get_tool_server_name_async(tool_name: &str, _config: &Config) -> String {
    crate::mcp::tool_map::get_tool_server_name(tool_name)
        .or_else(|| crate::mcp::core::dynamic::get_dynamic_server_name_by_tool(tool_name))
        .unwrap_or_else(|| {
            let fallback = if crate::mcp::core::dynamic_agents::is_dynamic_by_tool(tool_name) {
                "agent"
            } else {
                "unknown"
            };
            fallback.to_string()
        })
}
/// Prints a header line for every tool call and, when the log level has debug
/// or info enabled, the call's full parameters.
///
/// A single call gets an unnumbered header; multiple calls are numbered from 1
/// and separated by a blank line. A trailing blank line closes the listing.
/// Nothing is printed for an empty slice.
async fn display_tool_parameters_only(config: &Config, tool_calls: &[crate::mcp::McpToolCall]) {
    let total = tool_calls.len();
    if total == 0 {
        return;
    }
    log_debug!("Found {} tool calls in response", total);

    for (position, call) in tool_calls.iter().enumerate() {
        let ordinal = position + 1;
        let server_name = get_tool_server_name_async(&call.tool_name, config).await;

        // Only multi-call listings carry a "[n] " prefix in front of the tool name.
        let numbering = if total == 1 {
            String::new()
        } else {
            format!("[{}] ", ordinal)
        };
        let title = format!(
            " {}{} | {} ",
            numbering,
            call.tool_name.bright_cyan(),
            server_name.bright_blue()
        );

        // NOTE(review): `title.len()` is a byte count of the formatted string,
        // so any colour escape sequences count towards the width — confirm
        // this is the intended padding.
        let dash_count = 70.max(title.len() + 4) - title.len();
        let dashes = "─".repeat(dash_count);
        println!("──{}{}──", title, dashes.dimmed());

        if config.get_log_level().is_debug_enabled() || config.get_log_level().is_info_enabled() {
            display_tool_parameters_full(call, config);
        }

        // Blank line between entries, but not after the last one.
        if ordinal < total {
            println!();
        }
    }

    println!();
}
/// Public entry point that delegates to
/// `crate::session::chat::tool_display::display_tool_parameters_full`.
pub fn display_tool_parameters_full(tool_call: &crate::mcp::McpToolCall, config: &Config) {
    use crate::session::chat::tool_display;

    tool_display::display_tool_parameters_full(tool_call, config);
}
/// Picks the tool calls to execute for the current iteration.
///
/// Takes the structured calls out of `current_tool_calls_param` (leaving
/// `None` behind) and returns them if there is at least one; otherwise the
/// calls are parsed from `current_content`.
fn resolve_tool_calls(
    current_tool_calls_param: &mut Option<Vec<crate::mcp::McpToolCall>>,
    current_content: &str,
) -> Vec<crate::mcp::McpToolCall> {
    match current_tool_calls_param.take() {
        Some(structured) if !structured.is_empty() => structured,
        _ => crate::mcp::parse_tool_calls(current_content),
    }
}
/// Returns `Err("Operation cancelled")` when the watch channel currently holds
/// `true`, logging the cancellation at debug level; otherwise `Ok(())`.
fn check_cancellation(operation_cancelled: &tokio::sync::watch::Receiver<bool>) -> Result<()> {
    let cancelled = *operation_cancelled.borrow();
    if !cancelled {
        return Ok(());
    }

    crate::log_debug!("Operation cancelled by user.");
    Err(anyhow::anyhow!("Operation cancelled"))
}
/// Records an assistant message that carries tool calls.
///
/// The message keeps the original tool-call payload extracted from
/// `current_exchange` and, when the thinking block serializes successfully,
/// its JSON value (a serialization failure silently stores no thinking).
/// The message is pushed to the session first, then appended to the session
/// file if one is configured, and finally `last_response` is updated.
/// `_config` and `_role` are unused.
///
/// # Errors
/// Returns an error if serializing the message or appending it to the session
/// file fails; in that case the message is already in the session but
/// `last_response` has not been updated.
fn add_assistant_message_with_tool_calls(
    chat_session: &mut ChatSession,
    current_content: &str,
    current_exchange: &ProviderExchange,
    response_id: Option<String>,
    thinking: &Option<ThinkingBlock>,
    _config: &Config,
    _role: &str,
) -> Result<()> {
    let original_tool_calls = MessageHandler::extract_original_tool_calls(current_exchange);

    let thinking_value = match thinking {
        Some(block) => serde_json::to_value(block).ok(),
        None => None,
    };

    // A clock before the Unix epoch falls back to a zero timestamp.
    let now_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();

    let assistant_message = crate::session::Message {
        role: String::from("assistant"),
        content: current_content.to_owned(),
        timestamp: now_secs,
        cached: false,
        cache_ttl: None,
        tool_call_id: None,
        name: None,
        tool_calls: original_tool_calls.clone(),
        images: None,
        videos: None,
        thinking: thinking_value,
        id: response_id,
    };

    let session = &mut chat_session.session;
    session.messages.push(assistant_message);

    if let Some(path) = session.session_file.as_ref() {
        // The message pushed just above is always the last element.
        let stored = session.messages.last().unwrap();
        let serialized = serde_json::to_string(stored)?;
        crate::session::append_to_session_file(path, &serialized)?;
    }

    chat_session.last_response = current_content.to_owned();
    Ok(())
}
pub async fn process_response<S: OutputSink>(
params: ResponseProcessingParams<'_, S>,
) -> Result<()> {
check_cancellation(¶ms.operation_cancelled)?;
log_response_debug(params.config, ¶ms.finish_reason, ¶ms.tool_calls);
let last_message = params.chat_session.session.messages.last();
if params.mode.is_terminal_mode() && last_message.is_none_or(|msg| msg.role != "user") {
println!(
"{}",
"Warning: User message not found in session. This is unexpected.".yellow()
);
}
let mut tool_processor = ToolProcessor::new();
let mut thinking_displayed = false;
let mut current_content = params.content.clone();
let mut current_exchange = params.exchange.clone(); let mut current_tool_calls_param = params.tool_calls.clone(); let mut current_response_id = params.response_id.clone(); let mut current_thinking = params.thinking.clone(); let mut last_emitted_thinking: Option<String> = None;
let operation_cancelled_ref = ¶ms.operation_cancelled;
loop {
check_cancellation(operation_cancelled_ref)?;
if !params.config.mcp.servers.is_empty() {
let current_tool_calls =
resolve_tool_calls(&mut current_tool_calls_param, ¤t_content);
if !current_tool_calls.is_empty() {
let session_id = params.chat_session.session.info.name.clone();
if params.mode.should_suppress_cli_output() {
if let Some(ref thinking_block) = current_thinking {
if last_emitted_thinking.as_deref() != Some(thinking_block.content.as_str())
{
emit_thinking_event(¶ms, thinking_block, &session_id);
last_emitted_thinking = Some(thinking_block.content.clone());
}
}
}
if params.mode.is_interactive() && !thinking_displayed {
if let Some(ref thinking_block) = current_thinking {
display_thinking(thinking_block);
thinking_displayed = true;
}
}
if params.mode.is_interactive() {
print_assistant_response(
¤t_content,
params.config,
params.role,
¤t_thinking,
);
}
if params.mode.is_interactive() {
display_tool_parameters_only(params.config, ¤t_tool_calls).await;
}
if params.mode.is_interactive() {
use crate::session::chat::get_animation_manager;
get_animation_manager().start_animation(¶ms.mode).await;
}
let operation_cancelled_clone = params.operation_cancelled.clone();
if *operation_cancelled_clone.borrow() {
crate::log_debug!("Operation cancelled by user.");
return Ok(());
}
if params.mode.should_suppress_cli_output() {
for call in ¤t_tool_calls {
let server =
get_tool_server_name_async(&call.tool_name, params.config).await;
params.emit(ServerMessage::ToolUse(ToolUsePayload {
tool: call.tool_name.clone(),
tool_id: call.tool_id.clone(),
server,
params: call.parameters.clone(),
session_id: session_id.clone(),
}));
}
}
let (tool_results, total_tool_time_ms) =
match tool_execution::execute_tools_parallel(
current_tool_calls.clone(),
params.chat_session,
params.config,
&mut tool_processor,
operation_cancelled_clone.clone(),
params.mode,
)
.await
{
Ok(results) => results,
Err(e) => {
if e.to_string().contains("cancelled")
|| *operation_cancelled_clone.borrow()
{
crate::log_debug!("Operation cancelled by user.");
return Ok(());
}
return Err(e);
}
};
let session_id = params.chat_session.session.info.name.clone();
for tool_result in &tool_results {
let actual_content = tool_result.extract_content();
let success = !tool_result.is_error();
let tool_msg = ServerMessage::ToolResult(ToolResultPayload {
tool: tool_result.tool_name.clone(),
tool_id: tool_result.tool_id.clone(),
server: crate::session::chat::response::get_tool_server_name_async(
&tool_result.tool_name,
params.config,
)
.await,
content: actual_content,
success,
session_id: session_id.clone(),
});
params.emit(tool_msg);
}
if *operation_cancelled_clone.borrow() {
if params.mode.is_terminal_mode() {
println!("{}", "\nTool execution cancelled.".bright_yellow());
}
return Ok(());
}
add_assistant_message_with_tool_calls(
params.chat_session,
¤t_content,
¤t_exchange,
current_response_id.clone(), ¤t_thinking,
params.config,
params.role,
)?;
if !tool_results.is_empty() {
if let Some((
new_content,
new_exchange,
new_tool_calls,
new_response_id,
new_thinking,
)) = tool_result_processor::process_tool_results(
tool_results,
total_tool_time_ms,
params.chat_session,
params.config,
params.role,
operation_cancelled_clone.clone(),
)
.await?
{
current_content = new_content;
current_exchange = new_exchange;
current_tool_calls_param = new_tool_calls;
current_response_id = new_response_id; current_thinking = new_thinking;
if current_tool_calls_param.is_some()
&& !current_tool_calls_param.as_ref().unwrap().is_empty()
{
continue;
} else {
let more_tools = crate::mcp::parse_tool_calls(¤t_content);
if !more_tools.is_empty() {
log_debug!(
"Found {} more tool calls to process in content",
more_tools.len()
);
continue;
} else {
break;
}
}
} else {
return Ok(());
}
} else {
let more_tools = crate::mcp::parse_tool_calls(¤t_content);
if !more_tools.is_empty() {
log_debug!(
"Found {} more tool calls to process (no previous tool results)",
more_tools.len()
);
continue;
} else {
break;
}
}
} else {
break;
}
} else {
break;
}
}
let session_id = params.chat_session.session.info.name.clone();
let thinking_for_final = if thinking_displayed {
None
} else {
current_thinking.clone()
};
if params.mode.should_suppress_cli_output() {
if let Some(ref thinking_block) = thinking_for_final {
if last_emitted_thinking.as_deref() != Some(thinking_block.content.as_str()) {
emit_thinking_event(¶ms, thinking_block, &session_id);
}
}
}
params.emit(ServerMessage::Assistant(AssistantPayload {
content: current_content.clone(),
session_id: session_id.clone(),
}));
handle_final_response(
¤t_content,
&thinking_for_final,
current_response_id, params.chat_session,
params.config,
params.role,
params.mode,
)?;
{
let workdir = crate::mcp::get_thread_working_directory();
let failures =
crate::mcp::core::skill_auto::run_validators(¤t_content, &workdir).await;
for (skill_name, error) in &failures {
let error_msg = format!(
"Validation failed ({}): {}\nPlease fix the issue.",
skill_name, error
);
params.chat_session.add_user_message(&error_msg)?;
log_info!("Validator '{}' failed on assistant event", skill_name);
}
}
if crate::mcp::core::plan::has_pending_project_compression() {
log_debug!("Processing deferred plan(done) compression after assistant response");
match crate::mcp::core::plan::process_pending_compression(params.chat_session).await {
Ok(Some(metrics)) => {
params
.chat_session
.session
.info
.compression_stats
.add_compression(
crate::session::CompressionKind::Task,
metrics.messages_removed,
metrics.tokens_saved,
);
CostTracker::display_compression_result("Task", &metrics);
}
Ok(None) => {}
Err(e) => {
log_debug!("Deferred task compression failed: {}. Continuing.", e);
}
}
match crate::mcp::core::plan::process_pending_phase_compression(params.chat_session).await {
Ok(Some(metrics)) => {
params
.chat_session
.session
.info
.compression_stats
.add_compression(
crate::session::CompressionKind::Phase,
metrics.messages_removed,
metrics.tokens_saved,
);
CostTracker::display_compression_result("Phase", &metrics);
}
Ok(None) => {}
Err(e) => {
log_debug!("Deferred phase compression failed: {}. Continuing.", e);
}
}
match crate::mcp::core::plan::process_pending_project_compression(params.chat_session).await
{
Ok(Some(metrics)) => {
params
.chat_session
.session
.info
.compression_stats
.add_compression(
crate::session::CompressionKind::Project,
metrics.messages_removed,
metrics.tokens_saved,
);
CostTracker::display_compression_result("Project", &metrics);
}
Ok(None) => {}
Err(e) => {
log_debug!("Deferred project compression failed: {}. Continuing.", e);
}
}
}
let total_tokens = params.chat_session.session.info.input_tokens
+ params.chat_session.session.info.output_tokens
+ params.chat_session.session.info.cache_read_tokens
+ params.chat_session.session.info.cache_write_tokens
+ params.chat_session.session.info.reasoning_tokens;
let cost_msg = ServerMessage::Cost(CostPayload {
session_tokens: total_tokens,
session_cost: params.chat_session.session.info.total_cost,
input_tokens: params.chat_session.session.info.input_tokens,
output_tokens: params.chat_session.session.info.output_tokens,
cache_read_tokens: params.chat_session.session.info.cache_read_tokens,
cache_write_tokens: params.chat_session.session.info.cache_write_tokens,
reasoning_tokens: params.chat_session.session.info.reasoning_tokens,
session_id,
});
params.emit(cost_msg);
if params.mode.is_terminal_mode() {
if let Some(hint) = params.chat_session.get_compression_hint(params.config) {
println!("{}", hint);
}
}
Ok(())
}