pub use crate::agent::config::{AgentConfig, AgentRole};
#[cfg(feature = "subagent")]
use crate::agent::subagent::SubagentRegistry;
#[cfg(feature = "subagent")]
use crate::agent::subagent::executor::{SubagentExecutor, SubagentExecutorConfig};
use crate::agent::{Agent, AgentEvent, CancellationToken};
use crate::compression::ContextManager;
use crate::error::{LlmError, ReactError, Result};
use crate::guard::GuardManager;
#[cfg(feature = "human-loop")]
use crate::human_loop::{HumanLoopProvider, PermissionService};
use crate::llm::config::LlmConfig;
#[cfg(feature = "mcp")]
use crate::mcp::McpManager;
use crate::memory::checkpointer::{Checkpointer, FileCheckpointer};
use crate::memory::snapshot::{SnapshotManager, StateSnapshot};
use crate::memory::store::{FileStore, Store};
use crate::sandbox::SandboxManager;
use crate::skills::SkillRegistry;
use crate::skills::hooks::HookRegistry;
#[cfg(feature = "tasks")]
use crate::tasks::TaskManager;
use crate::tools::ToolManager;
#[cfg(feature = "subagent")]
use crate::tools::builtin::agent_dispatch::AgentDispatchTool;
use crate::tools::builtin::answer::FinalAnswerTool;
#[cfg(feature = "human-loop")]
use crate::tools::builtin::human_in_loop::HumanInLoop;
use crate::tools::builtin::memory::{ForgetTool, RecallTool, RememberTool, SearchMemoryTool};
#[cfg(feature = "tasks")]
use crate::tools::builtin::plan::PlanTool;
#[cfg(feature = "tasks")]
use crate::tools::builtin::task::{
CreateTaskTool, GetExecutionOrderTool, ListTasksTool, UpdateTaskTool, VisualizeDependenciesTool,
};
use echo_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use reqwest::Client;
use std::sync::Arc;
use tracing::{Instrument, info, info_span, warn};
use crate::agent::react::subsystems::approval::ApprovalSubsystem;
use crate::agent::react::subsystems::guard::GuardSubsystem;
use crate::agent::react::subsystems::memory::MemorySubsystem;
use crate::agent::react::subsystems::tool_exec::ToolExecutionSubsystem;
pub mod builder;
mod capabilities;
mod extract;
#[cfg(feature = "tasks")]
mod planning;
mod run;
pub mod structured;
pub(crate) mod subsystems;
#[cfg(test)]
mod tests;
/// Name of the final-answer tool. The `Agent` impl for `ReactAgent` filters
/// this name out of its `tool_names` and `tool_definitions` listings.
pub(crate) const TOOL_FINAL_ANSWER: &str = "final_answer";
/// Name of the task-creation tool; one of the tools `has_planning_tools` requires.
#[cfg(feature = "tasks")]
pub(crate) const TOOL_CREATE_TASK: &str = "create_task";
/// Name of the planning tool; one of the tools `has_planning_tools` requires.
#[cfg(feature = "tasks")]
pub(crate) const TOOL_PLAN: &str = "plan";
/// Name of the task-update tool; one of the tools `has_planning_tools` requires.
#[cfg(feature = "tasks")]
pub(crate) const TOOL_UPDATE_TASK: &str = "update_task";
/// Reports whether `err` is an LLM failure that is worth retrying.
///
/// Only [`ReactError::Llm`] errors qualify, and of those only network errors
/// and API errors whose status is 429 or at least 500. Everything else is
/// treated as permanent.
pub(crate) fn is_retryable_llm_error(err: &ReactError) -> bool {
    // Anything that is not an LLM error is never retried.
    let ReactError::Llm(llm_err) = err else {
        return false;
    };
    match llm_err.as_ref() {
        LlmError::NetworkError(_) => true,
        LlmError::ApiError { status, .. } => *status >= 500 || *status == 429,
        _ => false,
    }
}
/// Agent defined by this module: its configuration, the subsystems it is
/// composed of, and the handles it uses to reach the LLM.
pub struct ReactAgent {
/// Agent configuration (agent name, model name, system prompt, feature toggles, ...).
pub(crate) config: AgentConfig,
/// Tool manager together with the skill, hook, sandbox and feature-gated registries.
pub(crate) tools: ToolExecutionSubsystem,
/// Optional guard manager, permission policy, audit logger and circuit breaker.
pub(crate) guard: GuardSubsystem,
/// Conversation context, long-term store, checkpointer, snapshots and conversation store.
pub(crate) memory: MemorySubsystem,
/// Human-approval state; its fields only exist with the `human-loop` feature.
pub(crate) approval: ApprovalSubsystem,
/// Shared HTTP client, handed to `crate::llm::chat` when no `llm_client` is set.
client: Arc<Client>,
/// Optional custom LLM client; `chat_multimodal` prefers it over the HTTP client.
llm_client: Option<Arc<dyn crate::llm::LlmClient>>,
/// Optional LLM configuration installed through `with_llm_config` / `set_llm_config`.
llm_config: Option<LlmConfig>,
/// Cancellation token stored by the `*_with_cancel` streaming entry points.
cancel_token: tokio::sync::Mutex<Option<CancellationToken>>,
}
impl ReactAgent {
/// Reports whether task planning is usable: `enable_task` must be set and the
/// plan, create-task and update-task tools must all be registered.
#[cfg(feature = "tasks")]
pub(crate) fn has_planning_tools(&self) -> bool {
    if !self.config.enable_task {
        return false;
    }
    let required = [TOOL_PLAN, TOOL_CREATE_TASK, TOOL_UPDATE_TASK];
    // Planning is available only if none of the required tools is missing.
    !required
        .iter()
        .any(|tool| self.tools.tool_manager.get_tool(tool).is_none())
}
/// Without the `tasks` feature no planning tools exist, so this is always `false`.
#[cfg(not(feature = "tasks"))]
#[allow(dead_code)]
pub(crate) fn has_planning_tools(&self) -> bool {
    false
}
/// Instruction that `build_system_prompt` appends to the system prompt when
/// both `enable_tool` and `enable_cot` are set in the agent config.
const COT_INSTRUCTION: &'static str =
"Before calling any tool, briefly describe your analysis and execution plan.";
/// Builds an agent from `config`, creating its context, tool manager and
/// subsystems.
///
/// The final-answer tool is always registered; sub-agent dispatch,
/// human-in-the-loop, task and memory tools are registered only when the
/// matching cargo feature and config flag are enabled. The LLM client, LLM
/// config, guard components, sandbox manager and conversation store all start
/// out unset.
pub fn new(config: AgentConfig) -> Self {
// Seed the conversation context with the assembled system prompt.
let system_prompt = Self::build_system_prompt(&config);
let context = Arc::new(tokio::sync::Mutex::new(
ContextManager::builder(config.token_limit)
.with_system(system_prompt)
.build(),
));
let mut tool_manager = ToolManager::new_with_config(config.tool_execution.clone());
// HTTP client with a 120-second timeout; if the builder fails, a default
// client is used instead.
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(120))
.build()
.unwrap_or_default();
tool_manager.register(Box::new(FinalAnswerTool));
#[cfg(feature = "tasks")]
let task_manager = Arc::new(TaskManager::default());
#[cfg(feature = "subagent")]
let subagent_registry = Arc::new(SubagentRegistry::new());
#[cfg(feature = "subagent")]
let subagent_executor = Arc::new(SubagentExecutor::new(
subagent_registry.clone(),
SubagentExecutorConfig::default(),
));
#[cfg(feature = "human-loop")]
let approval_provider = crate::human_loop::default_provider();
// The dispatch tool receives a freshly created cancellation token.
#[cfg(feature = "subagent")]
if config.enable_subagent {
tool_manager.register(Box::new(AgentDispatchTool::new(
subagent_executor.clone(),
config.agent_name.clone(),
CancellationToken::new(),
)));
}
#[cfg(feature = "human-loop")]
if config.enable_human_in_loop {
tool_manager.register(Box::new(HumanInLoop::new(approval_provider.clone())));
}
// All task tools share the same task manager instance.
#[cfg(feature = "tasks")]
if config.enable_task {
tool_manager.register(Box::new(PlanTool));
tool_manager.register(Box::new(CreateTaskTool::new(task_manager.clone())));
tool_manager.register(Box::new(UpdateTaskTool::new(task_manager.clone())));
tool_manager.register(Box::new(ListTasksTool::new(task_manager.clone())));
tool_manager.register(Box::new(VisualizeDependenciesTool::new(
task_manager.clone(),
)));
tool_manager.register(Box::new(GetExecutionOrderTool::new(task_manager.clone())));
}
Self::register_feature_gated_tools(&config, &mut tool_manager);
// Both helpers return `None` when the feature is disabled or setup fails.
let store = Self::setup_memory_store(&config, &mut tool_manager);
let checkpointer = Self::setup_checkpointer(&config);
Self {
config,
tools: ToolExecutionSubsystem {
tool_manager,
#[cfg(feature = "subagent")]
subagent_registry,
#[cfg(feature = "tasks")]
task_manager,
skill_registry: SkillRegistry::new(),
progressive_skill_registry: None,
hook_registry: Arc::new(tokio::sync::RwLock::new(HookRegistry::new())),
#[cfg(feature = "mcp")]
mcp_manager: McpManager::new(),
sandbox_manager: None,
},
// Guard components are opt-in and installed through the setters.
guard: GuardSubsystem {
guard_manager: None,
permission_policy: None,
audit_logger: None,
circuit_breaker: None,
},
memory: MemorySubsystem {
context,
store,
checkpointer,
snapshot_manager: Arc::new(std::sync::RwLock::new(None)),
conversation_store: None,
},
approval: ApprovalSubsystem {
#[cfg(feature = "human-loop")]
approval_provider,
#[cfg(feature = "human-loop")]
permission_service: None,
#[cfg(feature = "human-loop")]
pending_permission_rules: std::sync::Mutex::new(Vec::new()),
},
client: Arc::new(client),
llm_client: None,
llm_config: None,
cancel_token: tokio::sync::Mutex::new(None),
}
}
/// Creates an agent from the application config loaded via
/// `crate::config::load_config(path)`, converted to an [`AgentConfig`].
pub fn from_config_file(path: Option<&str>) -> Self {
    Self::new(crate::config::load_config(path).to_agent_config())
}
/// Assembles the system prompt used to seed the conversation context.
///
/// Starts from `config.system_prompt`; when both `enable_tool` and
/// `enable_cot` are set, the prompt is right-trimmed and the CoT instruction
/// is appended after a blank line. With the `project-rules` feature and
/// `auto_project_rules` enabled, the result is passed through
/// `echo_core::project_rules::inject_rules` using the configured working
/// directory (falling back to the current directory).
fn build_system_prompt(config: &AgentConfig) -> String {
    let wants_cot = config.enable_tool && config.enable_cot;
    let assembled = if wants_cot {
        format!(
            "{}\n\n{}",
            config.system_prompt.trim_end(),
            Self::COT_INSTRUCTION,
        )
    } else {
        config.system_prompt.clone()
    };
    #[cfg(feature = "project-rules")]
    let assembled = if config.auto_project_rules {
        let rules_root = config
            .working_dir
            .clone()
            .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
        echo_core::project_rules::inject_rules(&assembled, &rules_root)
    } else {
        assembled
    };
    assembled
}
/// Registers every tool provided by `echo_tools`, but only when
/// `config.enable_tool` is set.
fn register_feature_gated_tools(config: &AgentConfig, tool_manager: &mut ToolManager) {
    if !config.enable_tool {
        return;
    }
    echo_tools::register_all_tools(tool_manager);
}
/// Opens the long-term memory store and registers the memory tools.
///
/// Returns `None` when `config.enable_memory` is off or the file store at
/// `config.memory_path` cannot be created (a warning is logged in that case).
/// On success the remember, recall, search and forget tools are registered
/// under the namespace `[agent_name, "memories"]`, and the (possibly
/// embedding-wrapped) store is returned.
fn setup_memory_store(
    config: &AgentConfig,
    tool_manager: &mut ToolManager,
) -> Option<Arc<dyn Store>> {
    if !config.enable_memory {
        return None;
    }
    let file_store = match FileStore::new(&config.memory_path) {
        Ok(opened) => opened,
        Err(e) => {
            tracing::warn!("Long-term memory Store init failed, memory disabled: {e}");
            return None;
        }
    };
    let store: Arc<dyn Store> =
        Self::wrap_with_embedding_store_if_available(Arc::new(file_store), &config.memory_path);
    let namespace = vec![config.agent_name.clone(), "memories".to_string()];
    // Every memory tool shares the same store handle and namespace.
    tool_manager.register(Box::new(RememberTool::new(
        Arc::clone(&store),
        namespace.clone(),
    )));
    tool_manager.register(Box::new(RecallTool::new(
        Arc::clone(&store),
        namespace.clone(),
    )));
    tool_manager.register(Box::new(SearchMemoryTool::new(
        Arc::clone(&store),
        namespace.clone(),
    )));
    tool_manager.register(Box::new(ForgetTool::new(Arc::clone(&store), namespace)));
    Some(store)
}
/// Wraps `inner` in an embedding-backed store when an embedding API key is
/// configured, otherwise returns `inner` unchanged.
///
/// The wrapper is attempted only if at least one of `EMBEDDING_API_KEY`,
/// `OPENAI_API_KEY` or `EMBEDDING_APIKEY` is set. The vector index is
/// persisted next to the memory file: a single trailing `.json` is removed
/// from `memory_path` and `.vecs.json` is appended. If the embedding store
/// cannot be created, a warning is logged and `inner` is returned.
fn wrap_with_embedding_store_if_available(
    inner: Arc<dyn Store>,
    memory_path: &str,
) -> Arc<dyn Store> {
    use crate::memory::{EmbeddingStore, HttpEmbedder};
    const KEY_VARS: [&str; 3] = ["EMBEDDING_API_KEY", "OPENAI_API_KEY", "EMBEDDING_APIKEY"];
    if KEY_VARS.iter().all(|name| std::env::var(name).is_err()) {
        tracing::info!(
            "Memory Store: keyword-only retrieval (no embedding env vars configured)"
        );
        return inner;
    }
    let embedder = Arc::new(HttpEmbedder::from_env());
    // Strip the extension exactly once. `trim_end_matches` would strip every
    // repeated `.json`, so `a.json` and `a.json.json` would end up sharing one
    // vector file.
    let stem = memory_path.strip_suffix(".json").unwrap_or(memory_path);
    let vec_path = format!("{stem}.vecs.json");
    match EmbeddingStore::with_persistence(Arc::clone(&inner), embedder, &vec_path) {
        Ok(embedding_store) => {
            tracing::info!(
                vec_path = %vec_path,
                "Memory Store: vector index enabled (semantic/hybrid search available)"
            );
            Arc::new(embedding_store)
        }
        Err(e) => {
            tracing::warn!(
                error = %e,
                "EmbeddingStore init failed, falling back to keyword-only retrieval"
            );
            inner
        }
    }
}
/// Creates the file-backed checkpointer used for session resume.
///
/// Returns `None` when no `session_id` is configured, or when the
/// checkpointer at `config.checkpointer_path` cannot be created (a warning is
/// logged in that case).
fn setup_checkpointer(config: &AgentConfig) -> Option<Arc<dyn Checkpointer>> {
    if config.session_id.is_none() {
        return None;
    }
    let checkpointer = FileCheckpointer::new(&config.checkpointer_path)
        .map_err(|e| {
            tracing::warn!("Checkpointer init failed, session resume disabled: {e}");
        })
        .ok()?;
    Some(Arc::new(checkpointer))
}
/// Builder-style variant of `set_llm_config`: installs `config`, mirrors its
/// model into the agent config, and returns the agent.
pub fn with_llm_config(mut self, config: LlmConfig) -> Self {
    self.set_llm_config(config);
    self
}
/// Builder-style variant of `set_llm_client`: installs `client`, mirrors its
/// model name into the agent config, and returns the agent.
pub fn with_llm_client(mut self, client: Arc<dyn crate::llm::LlmClient>) -> Self {
    self.set_llm_client(client);
    self
}
/// Installs `config` as the LLM configuration and copies its `model` into the
/// agent config's `model_name`.
pub fn set_llm_config(&mut self, config: LlmConfig) {
self.config.model_name = config.model.clone();
self.llm_config = Some(config);
}
/// Installs a custom LLM client and copies the client's model name into the
/// agent config's `model_name`.
pub fn set_llm_client(&mut self, client: Arc<dyn crate::llm::LlmClient>) {
self.config.model_name = client.model_name().to_string();
self.llm_client = Some(client);
}
/// Returns the LLM configuration, if one has been installed.
pub fn llm_config(&self) -> Option<&LlmConfig> {
self.llm_config.as_ref()
}
/// Returns the agent configuration.
pub fn config(&self) -> &AgentConfig {
&self.config
}
/// Replaces the long-term store handle only. Unlike `set_memory_store`, this
/// does not register any memory tools.
pub fn set_store(&mut self, store: Arc<dyn Store>) {
self.memory.store = Some(store);
}
/// Installs `store` as the long-term store and registers the remember,
/// recall, search and forget tools against it, using the namespace
/// `[agent_name, "memories"]`.
pub fn set_memory_store(&mut self, store: Arc<dyn Store>) {
    let namespace = vec![self.config.agent_name.clone(), "memories".to_string()];
    let tool_manager = &mut self.tools.tool_manager;
    tool_manager.register(Box::new(RememberTool::new(
        Arc::clone(&store),
        namespace.clone(),
    )));
    tool_manager.register(Box::new(RecallTool::new(
        Arc::clone(&store),
        namespace.clone(),
    )));
    tool_manager.register(Box::new(SearchMemoryTool::new(
        Arc::clone(&store),
        namespace.clone(),
    )));
    tool_manager.register(Box::new(ForgetTool::new(Arc::clone(&store), namespace)));
    self.memory.store = Some(store);
}
/// Returns the long-term store, if one is configured.
pub fn store(&self) -> Option<&Arc<dyn Store>> {
self.memory.store.as_ref()
}
/// Installs `checkpointer` and records `session_id` in the agent config.
pub fn set_checkpointer(&mut self, checkpointer: Arc<dyn Checkpointer>, session_id: String) {
self.memory.checkpointer = Some(checkpointer);
self.config.session_id = Some(session_id);
}
/// Alias for `set_checkpointer`.
pub fn set_thread_store(&mut self, store: Arc<dyn Checkpointer>, session_id: String) {
self.set_checkpointer(store, session_id);
}
/// Returns the checkpointer, if one is configured.
pub fn checkpointer(&self) -> Option<&Arc<dyn Checkpointer>> {
self.memory.checkpointer.as_ref()
}
/// Alias for `checkpointer`: returns the same handle.
pub fn thread_store(&self) -> Option<&Arc<dyn Checkpointer>> {
self.memory.checkpointer.as_ref()
}
/// Stores `conversation_id` in the agent config.
pub fn set_conversation_id(&mut self, conversation_id: impl Into<String>) {
self.config.conversation_id = Some(conversation_id.into());
}
/// Returns the conversation id as reported by `AgentConfig::get_conversation_id`.
pub fn conversation_id(&self) -> Option<&str> {
self.config.get_conversation_id()
}
/// Returns a copy of the messages currently held in the conversation context.
pub async fn get_messages(&self) -> Vec<crate::llm::types::Message> {
self.memory.context.lock().await.messages().to_vec()
}
/// Lists the names of all registered tools, as reported by the tool manager.
/// Unlike the `Agent` trait method of the same name, nothing is filtered out.
pub fn tool_names(&self) -> Vec<&str> {
self.tools.tool_manager.list_tools()
}
/// Lists the names of the skills returned by the skill registry's `list()`.
pub fn skill_names(&self) -> Vec<&str> {
    let mut names = Vec::new();
    for skill in self.tools.skill_registry.list().iter() {
        names.push(skill.name.as_str());
    }
    names
}
/// Lists the server names known to the MCP manager.
#[cfg(feature = "mcp")]
pub fn mcp_server_names(&self) -> Vec<&str> {
    let manager = &self.tools.mcp_manager;
    manager.server_names()
}
/// Without the `mcp` feature there are no servers; always returns an empty list.
#[cfg(not(feature = "mcp"))]
pub fn mcp_server_names(&self) -> Vec<&str> {
    Vec::new()
}
/// Creates a circuit breaker from `config` and installs it in the guard subsystem.
pub fn set_circuit_breaker(&mut self, config: CircuitBreakerConfig) {
self.guard.circuit_breaker = Some(Arc::new(CircuitBreaker::new(config)));
}
/// Installs the guard manager.
pub fn set_guard_manager(&mut self, manager: GuardManager) {
self.guard.guard_manager = Some(manager);
}
/// Installs the tool permission policy. Note that `build_permission_service`
/// later moves this policy out of the guard subsystem.
pub fn set_permission_policy(
&mut self,
policy: Arc<dyn crate::tools::permission::PermissionPolicy>,
) {
self.guard.permission_policy = Some(policy);
}
/// Installs a ready-made permission service.
#[cfg(feature = "human-loop")]
pub fn set_permission_service(&mut self, service: Arc<PermissionService>) {
self.approval.permission_service = Some(service);
}
/// Builds a permission service from the current approval provider and
/// installs it.
///
/// Any permission policy held by the guard subsystem is moved out of it
/// (leaving `None` behind) and attached to the new service as its legacy
/// policy.
#[cfg(feature = "human-loop")]
pub fn build_permission_service(&mut self) {
    use crate::human_loop::service::PermissionService;
    let legacy_policy = self.guard.permission_policy.take();
    let mut service = PermissionService::from_provider(self.approval.approval_provider.clone());
    if let Some(policy) = legacy_policy {
        service = service.with_legacy_policy(policy);
    }
    self.approval.permission_service = Some(Arc::new(service));
}
/// Installs the audit logger in the guard subsystem.
pub fn set_audit_logger(&mut self, logger: Arc<dyn crate::audit::AuditLogger>) {
self.guard.audit_logger = Some(logger);
}
pub fn set_sandbox_manager(&mut self, manager: Arc<SandboxManager>) {
self.tools
.skill_registry
.set_sandbox_manager(manager.clone());
if let Some(shared) = &self.tools.progressive_skill_registry
&& let Ok(mut registry) = shared.try_write()
{
registry.set_sandbox_manager(manager.clone());
}
if let Ok(mut hooks) = self.tools.hook_registry.try_write() {
hooks.set_sandbox_manager(manager.clone());
}
self.tools.sandbox_manager = Some(manager);
}
/// Installs `manager` as the snapshot manager, replacing any previous one.
/// A poisoned lock is recovered rather than treated as an error.
pub fn set_snapshot_manager(&self, manager: SnapshotManager) {
    let mut slot = match self.memory.snapshot_manager.write() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    *slot = Some(manager);
}
/// Captures the current conversation messages in the snapshot manager.
///
/// Returns the value produced by the manager's `capture` call, or `None` when
/// no snapshot manager is installed. The step argument passed to `capture`
/// is always `0`.
pub async fn snapshot(&self) -> Option<String> {
    let context = self.memory.context.lock().await;
    let history = context.messages().to_vec();
    let mut slot = self
        .memory
        .snapshot_manager
        .write()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    let manager = slot.as_mut()?;
    Some(manager.capture(0, &history))
}
/// Rolls the conversation back by `steps_back` snapshots.
///
/// Returns `None` when no snapshot manager is installed or the manager has
/// no matching snapshot. Otherwise the context is cleared, refilled with the
/// snapshot's messages, and the snapshot is returned.
pub async fn rollback(&self, steps_back: usize) -> Option<StateSnapshot> {
    // The std lock guard is confined to this block so it is released before
    // the await below.
    let restored = {
        let mut slot = self
            .memory
            .snapshot_manager
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        match slot.as_mut() {
            Some(manager) => manager.rollback(steps_back),
            None => None,
        }
    };
    let restored = restored?;
    let mut context = self.memory.context.lock().await;
    context.clear();
    for message in restored.messages.iter().cloned() {
        context.push(message);
    }
    Some(restored)
}
/// Rolls the conversation back to the snapshot identified by `snapshot_id`.
///
/// Returns `None` when no snapshot manager is installed or the manager does
/// not return a snapshot for that id. Otherwise the context is cleared,
/// refilled with the snapshot's messages, and the snapshot is returned.
pub async fn rollback_to(&self, snapshot_id: &str) -> Option<StateSnapshot> {
    // The std lock guard is confined to this block so it is released before
    // the await below.
    let restored = {
        let mut slot = self
            .memory
            .snapshot_manager
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        match slot.as_mut() {
            Some(manager) => manager.rollback_to(snapshot_id),
            None => None,
        }
    };
    let restored = restored?;
    let mut context = self.memory.context.lock().await;
    context.clear();
    for message in restored.messages.iter().cloned() {
        context.push(message);
    }
    Some(restored)
}
/// Returns copies of all snapshots held by the snapshot manager, or an empty
/// list when no manager is installed.
pub fn snapshots(&self) -> Vec<StateSnapshot> {
    let slot = self
        .memory
        .snapshot_manager
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    match slot.as_ref() {
        Some(manager) => manager.list().to_vec(),
        None => Vec::new(),
    }
}
/// Returns a copy of the snapshot manager's latest snapshot, or `None` when
/// no manager is installed or it has no snapshot.
pub fn latest_snapshot(&self) -> Option<StateSnapshot> {
    let slot = self
        .memory
        .snapshot_manager
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    let manager = slot.as_ref()?;
    manager.latest().cloned()
}
/// Alias for `set_human_loop_provider`.
#[cfg(feature = "human-loop")]
pub fn set_approval_provider(&mut self, provider: Arc<dyn HumanLoopProvider>) {
    self.set_human_loop_provider(provider);
}
/// Replaces the approval provider.
///
/// If a tool named `human_in_loop` is already registered, it is registered
/// again with the new provider so the tool and the approval subsystem use the
/// same one.
#[cfg(feature = "human-loop")]
pub fn set_human_loop_provider(&mut self, provider: Arc<dyn HumanLoopProvider>) {
    self.approval.approval_provider = Arc::clone(&provider);
    let tool_registered = self.tools.tool_manager.get_tool("human_in_loop").is_some();
    if !tool_registered {
        return;
    }
    self.tools
        .tool_manager
        .register(Box::new(HumanInLoop::new(provider)));
}
/// Installs the conversation store in the memory subsystem.
pub fn set_conversation_store(&mut self, store: Arc<dyn crate::memory::ConversationStore>) {
self.memory.conversation_store = Some(store);
}
/// Hands `messages` to the context manager's `set_messages`.
pub async fn load_messages(&self, messages: Vec<crate::llm::types::Message>) {
self.memory.context.lock().await.set_messages(messages);
}
/// Shuts the agent down: with the `mcp` feature all MCP connections are
/// closed first, then a completion message is logged.
pub async fn shutdown(&self) {
#[cfg(feature = "mcp")]
{
self.tools.mcp_manager.close_all().await;
}
info!(agent = %self.config.agent_name, "Agent shut down complete");
}
/// Sets the maximum number of iterations in the agent config.
///
/// # Panics
///
/// Panics if `max` is `0`.
pub fn set_max_iterations(&mut self, max: usize) {
assert!(max > 0, "max_iterations must be > 0");
self.config.max_iterations = max;
}
/// Delegates `task` to a sub-agent, or handles it locally when none exists.
///
/// The first agent returned by the registry's `list_available` receives the
/// task in `Fork` mode through a freshly created executor, and its output is
/// returned. When the registry reports no agents, the task is answered by
/// this agent's own `chat`.
///
/// # Errors
///
/// Propagates any error from the sub-agent dispatch or from `chat`.
#[cfg(feature = "subagent")]
pub async fn delegate_task(&self, task: &str) -> Result<String> {
    use crate::agent::subagent::executor::DispatchRequest;
    use crate::agent::subagent::types::ExecutionMode;
    let available = self.tools.subagent_registry.list_available().await;
    let Some(target) = available.first() else {
        // No sub-agent registered: handle the task ourselves.
        return <Self as Agent>::chat(self, task).await;
    };
    let request = DispatchRequest {
        agent_name: target.name.clone(),
        task: task.to_string(),
        mode_override: Some(ExecutionMode::Fork),
        cancel: CancellationToken::new(),
        parent_agent: self.config.agent_name.clone(),
        parent_context: None,
        delegate_depth: 0,
    };
    let executor = SubagentExecutor::new(
        self.tools.subagent_registry.clone(),
        SubagentExecutorConfig::default(),
    );
    let outcome = executor.dispatch(request).await?;
    Ok(outcome.output)
}
}
/// Best-effort cleanup of MCP connections when the agent is dropped.
impl Drop for ReactAgent {
    /// With the `mcp` feature, swaps the MCP manager for a fresh one and, if a
    /// Tokio runtime is current, spawns a task that closes the old manager's
    /// connections. Without a runtime the old manager is simply dropped.
    fn drop(&mut self) {
        #[cfg(feature = "mcp")]
        {
            // `drop` only has `&mut self`, so take ownership of the manager by
            // swapping in a fresh one before moving it into the spawned task.
            let detached =
                std::mem::replace(&mut self.tools.mcp_manager, crate::mcp::McpManager::new());
            let Ok(runtime) = tokio::runtime::Handle::try_current() else {
                return;
            };
            runtime.spawn(async move {
                detached.close_all().await;
            });
        }
    }
}
pub use echo_core::agent::StepType;
impl Agent for ReactAgent {
/// Returns the agent name from the config.
fn name(&self) -> &str {
&self.config.agent_name
}
/// Returns the model name from the config.
fn model_name(&self) -> &str {
&self.config.model_name
}
/// Returns the system prompt exactly as stored in the config.
fn system_prompt(&self) -> &str {
&self.config.system_prompt
}
/// Executes `task` inside an `agent_execute` tracing span.
///
/// With the `tasks` feature, the planning path is taken when
/// `has_planning_tools` reports that planning is available; otherwise the
/// task runs directly.
fn execute<'a>(&'a self, task: &'a str) -> BoxFuture<'a, Result<String>> {
    // The span is created eagerly, so the field values are recorded here.
    let span = info_span!(
        "agent_execute",
        agent.name = %self.config.agent_name,
        agent.model = %self.config.model_name
    );
    let work = async move {
        #[cfg(feature = "tasks")]
        if self.has_planning_tools() {
            return self.execute_with_planning(task).await;
        }
        self.run_direct(task).await
    };
    Box::pin(work.instrument(span))
}
/// Streaming variant of `execute`: runs `task` through `run_stream` in
/// `Execute` mode inside an `agent_execute_stream` tracing span.
fn execute_stream<'a>(
    &'a self,
    task: &'a str,
) -> BoxFuture<'a, Result<BoxStream<'a, Result<AgentEvent>>>> {
    let span = info_span!(
        "agent_execute_stream",
        agent.name = %self.config.agent_name,
        agent.model = %self.config.model_name
    );
    let work = async move { self.run_stream(task, run::StreamMode::Execute).await };
    Box::pin(work.instrument(span))
}
/// Answers `message` through `run_chat_direct` inside an `agent_chat`
/// tracing span.
fn chat<'a>(&'a self, message: &'a str) -> BoxFuture<'a, Result<String>> {
    let span = info_span!(
        "agent_chat",
        agent.name = %self.config.agent_name,
        agent.model = %self.config.model_name
    );
    let work = async move { self.run_chat_direct(message).await };
    Box::pin(work.instrument(span))
}
/// Streaming variant of `chat`: runs `message` through `run_stream` in
/// `Chat` mode inside an `agent_chat_stream` tracing span.
fn chat_stream<'a>(
    &'a self,
    message: &'a str,
) -> BoxFuture<'a, Result<BoxStream<'a, Result<AgentEvent>>>> {
    let span = info_span!(
        "agent_chat_stream",
        agent.name = %self.config.agent_name,
        agent.model = %self.config.model_name
    );
    let work = async move { self.run_stream(message, run::StreamMode::Chat).await };
    Box::pin(work.instrument(span))
}
fn chat_stream_with_cancel<'a>(
&'a self,
_message: &'a str,
cancel: CancellationToken,
) -> BoxFuture<'a, Result<BoxStream<'a, Result<AgentEvent>>>> {
let agent = self.config.agent_name.clone();
let model = self.config.model_name.clone();
Box::pin(
async move {
*self.cancel_token.lock().await = Some(cancel.clone());
<Self as Agent>::chat_stream_with_cancel(self, _message, cancel).await
}
.instrument(info_span!("agent_chat_stream_with_cancel", agent.name = %agent, agent.model = %model)),
)
}
fn execute_stream_with_cancel<'a>(
&'a self,
_task: &'a str,
cancel: CancellationToken,
) -> BoxFuture<'a, Result<BoxStream<'a, Result<AgentEvent>>>> {
let agent = self.config.agent_name.clone();
let model = self.config.model_name.clone();
Box::pin(
async move {
*self.cancel_token.lock().await = Some(cancel.clone());
<Self as Agent>::execute_stream_with_cancel(self, _task, cancel).await
}
.instrument(info_span!("agent_execute_stream_with_cancel", agent.name = %agent, agent.model = %model)),
)
}
fn reset(&self) {
match self.memory.context.try_lock() {
Ok(mut ctx) => {
ctx.clear();
ctx.push(crate::llm::types::Message::system(
self.config.system_prompt.clone(),
));
}
Err(_) => {
warn!(
agent = %self.config.agent_name,
"Cannot reset: context locked by active stream"
);
}
}
}
/// Lists the registered tool names, omitting the final-answer tool.
fn tool_names(&self) -> Vec<String> {
    let registered = self.tools.tool_manager.list_tools();
    registered
        .into_iter()
        .filter_map(|name| (name != TOOL_FINAL_ANSWER).then(|| name.to_string()))
        .collect()
}
/// Lists the registered tool definitions, omitting the final-answer tool.
fn tool_definitions(&self) -> Vec<crate::llm::types::ToolDefinition> {
    let definitions = self.tools.tool_manager.get_tool_definitions();
    definitions
        .into_iter()
        .filter(|definition| definition.function.name != TOOL_FINAL_ANSWER)
        .collect()
}
/// Lists skill names: first the skills from the registry's `list()`, then any
/// descriptor names from `list_descriptors()` that are not already present.
fn skill_names(&self) -> Vec<String> {
    let registry = &self.tools.skill_registry;
    let mut names: Vec<String> = registry
        .list()
        .into_iter()
        .map(|skill| skill.name.clone())
        .collect();
    for descriptor in registry.list_descriptors() {
        let already_listed = names.iter().any(|known| *known == descriptor.name);
        if !already_listed {
            names.push(descriptor.name.clone());
        }
    }
    names
}
/// Lists the MCP server names as owned strings; empty without the `mcp` feature.
fn mcp_server_names(&self) -> Vec<String> {
    #[cfg(feature = "mcp")]
    {
        let servers = self.tools.mcp_manager.server_names();
        servers.into_iter().map(str::to_string).collect()
    }
    #[cfg(not(feature = "mcp"))]
    {
        Vec::new()
    }
}
/// Closes all MCP connections when the `mcp` feature is enabled; otherwise
/// the returned future does nothing.
fn close(&self) -> BoxFuture<'_, ()> {
Box::pin(async move {
#[cfg(feature = "mcp")]
self.tools.mcp_manager.close_all().await;
})
}
}
impl ReactAgent {
/// Streams a chat turn for a pre-built `message` by calling
/// `run_stream_with_message` in `Chat` mode.
pub async fn chat_stream_message(
&self,
message: crate::llm::types::Message,
) -> Result<futures::stream::BoxStream<'_, Result<AgentEvent>>> {
self.run_stream_with_message(message, run::StreamMode::Chat)
.await
}
/// Streams a task execution for a pre-built `message` by calling
/// `run_stream_with_message` in `Execute` mode.
pub async fn execute_stream_message(
&self,
message: crate::llm::types::Message,
) -> Result<futures::stream::BoxStream<'_, Result<AgentEvent>>> {
self.run_stream_with_message(message, run::StreamMode::Execute)
.await
}
/// Sends a user turn made of `text` plus the image at `image_url` (no detail
/// hint) and returns the assistant's reply via `chat_multimodal`.
pub async fn chat_with_image_url(&self, text: &str, image_url: &str) -> Result<String> {
    use crate::llm::types::{ContentPart, ImageUrl, Message};
    let text_part = ContentPart::Text {
        text: text.to_string(),
    };
    let image_part = ContentPart::ImageUrl {
        image_url: ImageUrl {
            url: image_url.to_string(),
            detail: None,
        },
    };
    let message = Message::user_multimodal(vec![text_part, image_part]);
    self.chat_multimodal(message).await
}
/// Sends a pre-built (possibly multimodal) `message` to the LLM and returns
/// the assistant's text reply.
///
/// The message is appended to the conversation context; if the context is
/// empty, the configured system prompt is pushed first. The whole history is
/// then sent without tools, through the custom `llm_client` when one is
/// installed and through the shared HTTP client otherwise. The reply is
/// appended to the context as an assistant message. A missing reply text
/// becomes an empty string.
///
/// # Errors
///
/// Propagates any error from the LLM call. The user message stays in the
/// context in that case.
pub async fn chat_multimodal(&self, message: crate::llm::types::Message) -> Result<String> {
    use crate::llm::{ChatRequest, chat};
    // Append and snapshot under one lock acquisition so no other task can
    // slip messages in between the push and the history copy.
    let messages = {
        let mut ctx = self.memory.context.lock().await;
        if ctx.messages().is_empty() {
            ctx.push(crate::llm::types::Message::system(
                self.config.system_prompt.clone(),
            ));
        }
        ctx.push(message);
        ctx.messages().to_vec()
    };
    let content = if let Some(llm_client) = &self.llm_client {
        // `messages` is not needed afterwards on this path, so it is moved
        // into the request instead of being cloned.
        let response = llm_client
            .chat(ChatRequest {
                messages,
                temperature: None,
                max_tokens: None,
                tools: None,
                tool_choice: None,
                response_format: None,
                cancel_token: None,
            })
            .await?;
        response.content().unwrap_or_default()
    } else {
        let response = chat(
            self.client.clone(),
            &self.config.model_name,
            &messages,
            None,
            None,
            Some(false),
            None,
            None,
            None,
        )
        .await?;
        response
            .choices
            .first()
            .and_then(|c| c.message.content.as_text())
            .unwrap_or_default()
    };
    self.memory
        .context
        .lock()
        .await
        .push(crate::llm::types::Message::assistant(content.clone()));
    Ok(content)
}
/// Like `chat_with_image_url`, but calls `reset_messages` first, before
/// `task` and the image at `image_url` are sent.
pub async fn execute_with_image_url(&self, task: &str, image_url: &str) -> Result<String> {
    use crate::llm::types::{ContentPart, ImageUrl, Message};
    self.reset_messages().await;
    let task_part = ContentPart::Text {
        text: task.to_string(),
    };
    let image_part = ContentPart::ImageUrl {
        image_url: ImageUrl {
            url: image_url.to_string(),
            detail: None,
        },
    };
    let message = Message::user_multimodal(vec![task_part, image_part]);
    self.chat_multimodal(message).await
}
}