use anyhow::{anyhow, Result};
use log::{debug, error, info, warn};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::time::Duration;
use uuid::Uuid;
use crate::services::{ai_service::PlannedTool, mcp_service::McpTool};
use super::super::state::AdvancedChatState;
use super::super::types::{AgentState, ChatMessage};
impl AdvancedChatState {
pub async fn process_input_async(&self, session_id: Uuid, input: String) -> Result<()> {
self.set_agent_state(session_id, AgentState::Thinking);
self.add_message_to_session(session_id, ChatMessage::User(input.clone()))?;
self.add_message_to_session(
session_id,
ChatMessage::Processing {
message: "Analyzing your request...".to_string(),
spinner_index: self.spinner_state.load(Ordering::Relaxed),
},
)?;
if let Err(e) = self.refresh_tools_from_mcp().await {
warn!("Failed to refresh tools: {}", e);
}
self.set_agent_state(session_id, AgentState::Planning);
self.add_message_to_session(
session_id,
ChatMessage::Processing {
message: "Creating execution plan...".to_string(),
spinner_index: self.spinner_state.load(Ordering::Relaxed),
},
)?;
let available_tools = self
.available_tools
.read()
.map_err(|e| anyhow!("Failed to read available tools: {}", e))?
.clone();
let tool_plan_result = tokio::time::timeout(
Duration::from_secs(120), self.ai_service.create_tool_plan(&input, &available_tools),
)
.await;
let build_heuristic_plan = |text: &str| -> Vec<PlannedTool> {
let mut tools = Vec::new();
let lc = text.to_lowercase();
if lc.contains("balance") {
tools.push(PlannedTool {
server_id: "local_sim".into(),
tool_name: "get_balance".into(),
args: serde_json::json!({}),
reason: "Heuristic: user asked about balance".into(),
});
}
if lc.contains("transaction") || lc.contains("transactions") || lc.contains("tx") {
tools.push(PlannedTool {
server_id: "local_sim".into(),
tool_name: "get_transactions".into(),
args: serde_json::json!({}),
reason: "Heuristic: user asked about transactions".into(),
});
}
tools
};
let no_configured_tools = available_tools.is_empty();
match tool_plan_result {
Err(_) => {
warn!("AI planning timed out");
self.add_message_to_session(
session_id,
ChatMessage::Error(
"AI planning timed out. Using heuristic fallback.".to_string(),
),
)?;
if no_configured_tools {
self.run_heuristic_fallback(session_id, &input, build_heuristic_plan(&input))
.await?;
} else {
self.simple_response(session_id, &input).await?;
}
}
Ok(Err(e)) => {
error!("AI service failed: {}", e);
self.add_message_to_session(
session_id,
ChatMessage::Error(
"AI planning service failed. Using heuristic fallback.".to_string(),
),
)?;
if no_configured_tools {
self.run_heuristic_fallback(session_id, &input, build_heuristic_plan(&input))
.await?;
} else {
self.simple_response(session_id, &input).await?;
}
}
Ok(Ok(tool_plan)) => {
self.add_message_to_session(
session_id,
ChatMessage::AgentPlan(tool_plan.osvm_tools_to_use.clone()),
)?;
if tool_plan.osvm_tools_to_use.is_empty() {
if no_configured_tools {
let heur = build_heuristic_plan(&input);
if !heur.is_empty() {
self.add_message_to_session(
session_id,
ChatMessage::AgentPlan(heur.clone()),
)?;
self.run_heuristic_fallback(session_id, &input, heur)
.await?;
} else {
self.simple_response(session_id, &input).await?;
}
} else {
self.simple_response(session_id, &input).await?;
}
} else {
let mut executed_tools = Vec::new();
let mut iteration_count = 0;
let max_iterations = 5;
let mut current_tools = tool_plan.osvm_tools_to_use;
while !current_tools.is_empty() && iteration_count < max_iterations {
iteration_count += 1;
for planned_tool in ¤t_tools {
self.execute_planned_tool(session_id, planned_tool.clone())
.await?;
executed_tools.push(planned_tool.clone());
}
let session = self
.get_session_by_id(session_id)
.ok_or_else(|| anyhow!("Session not found"))?;
let recent_results: Vec<(String, Value)> = session
.messages
.iter()
.rev()
.filter_map(|msg| match msg {
ChatMessage::ToolResult {
tool_name, result, ..
} => Some((tool_name.clone(), result.clone())),
_ => None,
})
.take(current_tools.len()) .collect();
if iteration_count < max_iterations && !recent_results.is_empty() {
match self
.check_for_follow_up_actions(
&input,
&recent_results,
&available_tools,
)
.await
{
Ok(follow_up_tools) if !follow_up_tools.is_empty() => {
self.add_message_to_session(
session_id,
ChatMessage::AgentThinking(format!(
"Executing {} follow-up actions...",
follow_up_tools.len()
)),
)?;
current_tools = follow_up_tools;
}
_ => break, }
} else {
break;
}
}
self.generate_final_response(session_id, &input, &tool_plan.expected_outcome)
.await?;
}
}
}
let _ = self.remove_last_processing_message(session_id);
self.set_agent_state(session_id, AgentState::Idle);
let _ = self.generate_reply_suggestions(session_id).await;
Ok(())
}
async fn run_heuristic_fallback(
&self,
session_id: Uuid,
original_input: &str,
tools: Vec<PlannedTool>,
) -> Result<()> {
if tools.is_empty() {
return self.simple_response(session_id, original_input).await;
}
self.add_message_to_session(session_id, ChatMessage::AgentPlan(tools.clone()))?;
for planned_tool in tools {
self.execute_planned_tool(session_id, planned_tool).await?;
}
self.generate_final_response(session_id, original_input, "Heuristic simulated execution")
.await?;
Ok(())
}
async fn execute_planned_tool(
&self,
session_id: Uuid,
planned_tool: PlannedTool,
) -> Result<()> {
let execution_id = Uuid::new_v4().to_string();
self.set_agent_state(
session_id,
AgentState::ExecutingTool(planned_tool.tool_name.clone()),
);
self.add_message_to_session(
session_id,
ChatMessage::ToolCall {
tool_name: planned_tool.tool_name.clone(),
description: planned_tool.reason.clone(),
args: Some(planned_tool.args.clone()),
execution_id: execution_id.clone(),
},
)?;
match self.call_mcp_tool(&planned_tool).await {
Ok(result) => {
self.add_message_to_session(
session_id,
ChatMessage::ToolResult {
tool_name: planned_tool.tool_name,
result,
execution_id,
},
)?;
}
Err(e) => {
error!("Tool execution failed: {}", e);
self.add_message_to_session(
session_id,
ChatMessage::Error(format!("Tool {} failed: {}", planned_tool.tool_name, e)),
)?;
}
}
Ok(())
}
async fn call_mcp_tool(&self, planned_tool: &PlannedTool) -> Result<Value> {
let mut mcp_service = self.mcp_service.lock().await;
if let Some(server_config) = mcp_service.get_server(&planned_tool.server_id) {
if server_config.enabled {
if let Err(e) = mcp_service.initialize_server(&planned_tool.server_id).await {
warn!(
"Failed to initialize MCP server {}: {}",
planned_tool.server_id, e
);
}
match mcp_service
.call_tool(
&planned_tool.server_id,
&planned_tool.tool_name,
Some(planned_tool.args.clone()),
)
.await
{
Ok(result) => return Ok(result),
Err(e) => {
warn!("MCP tool call failed: {}, falling back to simulation", e);
}
}
}
}
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
match planned_tool.tool_name.as_str() {
"get_balance" => Ok(serde_json::json!({"balance": "2.5 SOL", "usd_value": 250.75})),
"get_transactions" => Ok(serde_json::json!({
"transactions": [
{"hash": "abc123", "amount": "0.1 SOL", "type": "sent"},
{"hash": "def456", "amount": "1.0 SOL", "type": "received"}
]
})),
"get_network_status" => Ok(serde_json::json!({
"network": "mainnet-beta",
"slot": 250000000,
"tps": 3000,
"validators": {"active": 1800, "delinquent": 12}
})),
_ => Ok(serde_json::json!({"result": "Tool executed successfully"})),
}
}
async fn generate_final_response(
&self,
session_id: Uuid,
original_input: &str,
expected_outcome: &str,
) -> Result<()> {
let session = self
.get_session_by_id(session_id)
.ok_or_else(|| anyhow!("Session not found"))?;
let tool_results: Vec<(String, Value)> = session
.messages
.iter()
.rev()
.filter_map(|msg| match msg {
ChatMessage::ToolResult {
tool_name, result, ..
} => Some((tool_name.clone(), result.clone())),
_ => None,
})
.collect();
match self
.ai_service
.generate_contextual_response(original_input, &tool_results, expected_outcome)
.await
{
Ok(response) => {
self.add_message_to_session(session_id, ChatMessage::Agent(response))?;
}
Err(e) => {
error!("Failed to generate final response: {}", e);
self.add_message_to_session(session_id, ChatMessage::Agent(
"I've completed the requested operations. Please check the tool results above.".to_string()
))?;
}
}
Ok(())
}
async fn simple_response(&self, session_id: Uuid, input: &str) -> Result<()> {
match self.ai_service.query_with_debug(input, false).await {
Ok(response) => {
self.add_message_to_session(session_id, ChatMessage::Agent(response))?;
}
Err(e) => {
warn!("AI service failed for simple response: {}", e);
let response = if input.to_lowercase().contains("balance") {
"I can help you check your wallet balance. However, I need MCP tools to be properly configured to fetch real data."
} else if input.to_lowercase().contains("transaction") {
"I can help you with transaction history. Please make sure MCP servers are configured for blockchain operations."
} else {
"I understand you're asking about blockchain operations. Please ensure MCP servers are configured so I can assist you with real data."
};
self.add_message_to_session(session_id, ChatMessage::Agent(response.to_string()))?;
}
}
Ok(())
}
async fn check_for_follow_up_actions(
&self,
original_input: &str,
tool_results: &[(String, Value)],
available_tools: &HashMap<String, Vec<McpTool>>,
) -> Result<Vec<PlannedTool>> {
let results_context = tool_results
.iter()
.map(|(name, result)| {
format!(
"{}: {}",
name,
serde_json::to_string(result).unwrap_or_default()
)
})
.collect::<Vec<_>>()
.join("\n");
let follow_up_prompt = format!(
"Based on the user's request: '{}'\n\n\
And these tool execution results:\n{}\n\n\
Do we need any follow-up tools? If yes, create an OSVM plan for the next steps.",
original_input, results_context
);
match tokio::time::timeout(
Duration::from_secs(10),
self.ai_service
.create_tool_plan(&follow_up_prompt, available_tools),
)
.await
{
Ok(Ok(plan)) if !plan.osvm_tools_to_use.is_empty() => Ok(plan.osvm_tools_to_use),
_ => Ok(Vec::new()), }
}
}