1use async_trait::async_trait;
32use mofa_kernel::agent::context::AgentContext;
33use mofa_kernel::agent::error::{AgentError, AgentResult};
34use mofa_kernel::agent::types::{ChatCompletionRequest, ChatMessage, LLMProvider, ToolDefinition};
35use mofa_kernel::agent::{AgentCapabilities, AgentState, MoFAAgent};
36use mofa_kernel::agent::{AgentInput, AgentOutput, InputType, OutputType};
37use serde_json::Value;
38use std::collections::HashMap;
39use std::path::Path;
40use std::sync::Arc;
41use tokio::sync::RwLock;
42
43use crate::agent::base::BaseAgent;
44use crate::agent::context::prompt::PromptContext;
45
46use super::components::tool::SimpleToolRegistry;
47use super::{Session, SessionManager};
48use mofa_kernel::agent::components::tool::{Tool, ToolInput, ToolRegistry};
49
50#[derive(Clone)]
56pub struct AgentExecutorConfig {
57 pub max_iterations: usize,
59 pub session_timeout: Option<std::time::Duration>,
61 pub default_model: Option<String>,
63 pub temperature: Option<f32>,
65 pub max_tokens: Option<u32>,
67}
68
69impl Default for AgentExecutorConfig {
70 fn default() -> Self {
71 Self {
72 max_iterations: 10,
73 session_timeout: None,
74 default_model: None,
75 temperature: None,
76 max_tokens: None,
77 }
78 }
79}
80
81impl AgentExecutorConfig {
82 pub fn new() -> Self {
83 Self::default()
84 }
85
86 pub fn with_max_iterations(mut self, max: usize) -> Self {
87 self.max_iterations = max;
88 self
89 }
90
91 pub fn with_model(mut self, model: impl Into<String>) -> Self {
92 self.default_model = Some(model.into());
93 self
94 }
95
96 pub fn with_temperature(mut self, temp: f32) -> Self {
97 self.temperature = Some(temp);
98 self
99 }
100}
101
102pub struct AgentExecutor {
136 base: BaseAgent,
138
139 llm: Arc<dyn LLMProvider>,
142 context: Arc<RwLock<PromptContext>>,
144 tools: Arc<RwLock<SimpleToolRegistry>>,
146 sessions: Arc<SessionManager>,
148 config: AgentExecutorConfig,
150}
151
152impl AgentExecutor {
153 pub async fn new(llm: Arc<dyn LLMProvider>, workspace: impl AsRef<Path>) -> AgentResult<Self> {
155 let workspace = workspace.as_ref();
156 let context = Arc::new(RwLock::new(PromptContext::new(workspace).await?));
157 let sessions = Arc::new(SessionManager::with_jsonl(workspace).await?);
158 let tools = Arc::new(RwLock::new(SimpleToolRegistry::new()));
159
160 let base = BaseAgent::new(uuid::Uuid::now_v7().to_string(), "LLMExecutor")
162 .with_description("LLM-based agent with tool calling")
163 .with_version("1.0.0")
164 .with_capabilities(
165 AgentCapabilities::builder()
166 .tag("llm")
167 .tag("tool-calling")
168 .input_type(InputType::Text)
169 .output_type(OutputType::Text)
170 .supports_tools(true)
171 .build(),
172 );
173
174 Ok(Self {
175 base,
176 llm,
177 context,
178 tools,
179 sessions,
180 config: AgentExecutorConfig::default(),
181 })
182 }
183
184 pub async fn with_config(
186 llm: Arc<dyn LLMProvider>,
187 workspace: impl AsRef<Path>,
188 config: AgentExecutorConfig,
189 ) -> AgentResult<Self> {
190 let workspace = workspace.as_ref();
191 let context = Arc::new(RwLock::new(PromptContext::new(workspace).await?));
192 let sessions = Arc::new(SessionManager::with_jsonl(workspace).await?);
193 let tools = Arc::new(RwLock::new(SimpleToolRegistry::new()));
194
195 let base = BaseAgent::new(uuid::Uuid::now_v7().to_string(), "LLMExecutor")
197 .with_description("LLM-based agent with tool calling")
198 .with_version("1.0.0")
199 .with_capabilities(
200 AgentCapabilities::builder()
201 .tag("llm")
202 .tag("tool-calling")
203 .input_type(InputType::Text)
204 .output_type(OutputType::Text)
205 .supports_tools(true)
206 .build(),
207 );
208
209 Ok(Self {
210 base,
211 llm,
212 context,
213 tools,
214 sessions,
215 config,
216 })
217 }
218
219 pub async fn register_tool(&self, tool: Arc<dyn Tool>) -> AgentResult<()> {
221 let mut tools = self.tools.write().await;
222 tools.register(tool)
223 }
224
225 pub async fn process_message(
227 &mut self,
228 session_key: &str,
229 message: &str,
230 ) -> AgentResult<String> {
231 let session = self.sessions.get_or_create(session_key).await;
233
234 let system_prompt = {
236 let mut ctx = self.context.write().await;
237 ctx.build_system_prompt().await?
238 };
239
240 let mut messages = self
242 .build_messages(&session, &system_prompt, message)
243 .await?;
244
245 let response = self.run_agent_loop(&mut messages).await?;
247
248 let mut session_updated = session.clone();
250 session_updated.add_message("user", message);
251 session_updated.add_message("assistant", &response);
252 self.sessions.save(&session_updated).await?;
253
254 Ok(response)
255 }
256
257 async fn build_messages(
259 &self,
260 session: &Session,
261 system_prompt: &str,
262 current_message: &str,
263 ) -> AgentResult<Vec<ChatMessage>> {
264 let mut messages = Vec::new();
265
266 messages.push(ChatMessage {
268 role: "system".to_string(),
269 content: Some(system_prompt.to_string()),
270 tool_call_id: None,
271 tool_calls: None,
272 });
273
274 let history = session.get_history(50); for msg in history {
277 messages.push(ChatMessage {
278 role: msg.role,
279 content: Some(msg.content),
280 tool_call_id: None,
281 tool_calls: None,
282 });
283 }
284
285 messages.push(ChatMessage {
287 role: "user".to_string(),
288 content: Some(current_message.to_string()),
289 tool_call_id: None,
290 tool_calls: None,
291 });
292
293 Ok(messages)
294 }
295
296 async fn run_agent_loop(&self, messages: &mut Vec<ChatMessage>) -> AgentResult<String> {
298 for _iteration in 0..self.config.max_iterations {
299 let tools = {
301 let tools_guard = self.tools.read().await;
302 tools_guard.list()
303 };
304
305 let tool_definitions = if tools.is_empty() {
307 None
308 } else {
309 Some(
310 tools
311 .iter()
312 .map(|t| ToolDefinition {
313 name: t.name.clone(),
314 description: t.description.clone(),
315 parameters: t.parameters_schema.clone(),
316 })
317 .collect(),
318 )
319 };
320
321 let request = ChatCompletionRequest {
323 messages: messages.clone(),
324 model: self.config.default_model.clone(),
325 tools: tool_definitions,
326 temperature: self.config.temperature,
327 max_tokens: self.config.max_tokens,
328 };
329
330 let response = self.llm.chat(request).await?;
331
332 if let Some(tool_calls) = response.tool_calls {
334 if tool_calls.is_empty() {
335 return Ok(response.content.unwrap_or_default());
337 }
338
339 messages.push(ChatMessage {
341 role: "assistant".to_string(),
342 content: response.content,
343 tool_call_id: None,
344 tool_calls: Some(tool_calls.clone()),
345 });
346
347 for tool_call in tool_calls {
349 let _args_map: HashMap<String, Value> =
351 if let Value::Object(map) = &tool_call.arguments {
352 map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
353 } else {
354 return Err(AgentError::ExecutionFailed(format!(
355 "Invalid tool arguments for {}: {:?}",
356 tool_call.name, tool_call.arguments
357 )));
358 };
359
360 let result = {
361 let tools_guard = self.tools.read().await;
362 if let Some(tool) = tools_guard.get(&tool_call.name) {
363 let input = ToolInput::from_json(tool_call.arguments.clone());
364 tool.execute(input, &AgentContext::new("executor")).await
365 } else {
366 return Err(AgentError::ExecutionFailed(format!(
367 "Tool not found: {}",
368 tool_call.name
369 )));
370 }
371 };
372
373 let result_str = if result.success {
375 result.to_string_output()
376 } else {
377 format!(
378 "Error: {}",
379 result.error.unwrap_or_else(|| "Unknown error".to_string())
380 )
381 };
382
383 messages.push(ChatMessage {
385 role: "tool".to_string(),
386 content: Some(result_str),
387 tool_call_id: Some(tool_call.id.clone()),
388 tool_calls: None,
389 });
390 }
391 } else {
392 return Ok(response.content.unwrap_or_default());
394 }
395 }
396
397 Ok("I've completed processing but hit the maximum iteration limit.".to_string())
399 }
400
401 pub fn sessions(&self) -> &Arc<SessionManager> {
403 &self.sessions
404 }
405
406 pub fn tools(&self) -> &Arc<RwLock<SimpleToolRegistry>> {
408 &self.tools
409 }
410
411 pub fn context(&self) -> &Arc<RwLock<PromptContext>> {
413 &self.context
414 }
415
416 pub fn llm(&self) -> &Arc<dyn LLMProvider> {
418 &self.llm
419 }
420
421 pub fn config(&self) -> &AgentExecutorConfig {
423 &self.config
424 }
425
426 pub fn base_mut(&mut self) -> &mut BaseAgent {
428 &mut self.base
429 }
430
431 pub fn base(&self) -> &BaseAgent {
433 &self.base
434 }
435}
436
437#[async_trait]
442impl MoFAAgent for AgentExecutor {
443 fn id(&self) -> &str {
444 self.base.id()
445 }
446
447 fn name(&self) -> &str {
448 self.base.name()
449 }
450
451 fn capabilities(&self) -> &AgentCapabilities {
452 self.base.capabilities()
453 }
454
455 fn state(&self) -> AgentState {
456 self.base.state()
457 }
458
459 async fn initialize(&mut self, ctx: &AgentContext) -> AgentResult<()> {
460 self.base.initialize(ctx).await?;
462
463 self.base.transition_to(AgentState::Ready)?;
465
466 Ok(())
467 }
468
469 async fn execute(
470 &mut self,
471 input: AgentInput,
472 _ctx: &AgentContext,
473 ) -> AgentResult<AgentOutput> {
474 let message = input.as_text().unwrap_or("");
476 let session_key = "default"; let response = self.process_message(session_key, message).await?;
480
481 Ok(AgentOutput::text(response))
483 }
484
485 async fn shutdown(&mut self) -> AgentResult<()> {
486 self.base.shutdown().await
488 }
489}