Skip to main content

aagt_core/agent/
core.rs

1//! Agent system - the core AI agent abstraction
2
3use std::sync::Arc;
4use tokio::sync::broadcast;
5use tracing::{info, instrument, error, debug};
6use anyhow;
7
8use crate::error::{Error, Result};
9use crate::agent::context::ContextInjector;
10use crate::agent::message::{Message, Role, Content};
11use crate::agent::provider::Provider;
12use crate::agent::memory::Memory;
13use crate::agent::session::SessionStatus;
14use crate::skills::tool::{Tool, ToolSet};
15use crate::agent::streaming::StreamingResponse;
16use crate::skills::tool::memory::{SearchHistoryTool, RememberThisTool, TieredSearchTool, FetchDocumentTool}; // Corrected import for memory tools
17use crate::agent::context::{ContextManager, ContextConfig}; // ContextInjector is already imported above
18use crate::agent::multi_agent::{Coordinator, AgentRole, MultiAgent, AgentMessage};
19use crate::agent::personality::{Persona, PersonalityManager};
20use crate::agent::cache::Cache;
21use crate::agent::scheduler::Scheduler;
22use crate::skills::tool::{DelegateTool, CronTool};
23use crate::infra::notification::{Notifier, NotifyChannel};
24use crate::agent::swarm::manager::{SwarmManager, SwarmEvent};
25use tokio::sync::mpsc;
26
/// Configuration for an Agent
///
/// Plain data read by the agent at request time. The baseline values live in the
/// `Default` implementation of this type.
#[derive(Debug, Clone)]
pub struct AgentConfig {
    /// Name of the agent (for logging/identity)
    pub name: String,
    /// Model to use (provider specific string)
    pub model: String,
    /// System prompt / Preamble; sent as the request's `system_prompt`
    pub preamble: String,
    /// Temperature for generation
    pub temperature: Option<f64>,
    /// Max tokens to generate
    pub max_tokens: Option<u64>,
    /// Additional provider-specific parameters, forwarded with every chat request
    pub extra_params: Option<serde_json::Value>,
    /// Policy for risky tools
    pub tool_policy: RiskyToolPolicy,
    /// Max history messages to send to LLM (Sliding window)
    pub max_history_messages: usize,
    /// Max characters allowed in tool output before truncation.
    /// The direct `call_tool` helper compares this against the output's byte length.
    pub max_tool_output_chars: usize,
    /// Enable strict JSON mode (response_format: json_object).
    /// Only injected when `extra_params` does not already carry a `response_format` key.
    pub json_mode: bool,
    /// Optional personality profile
    pub persona: Option<Persona>,
    /// Role of the agent in a multi-agent system
    pub role: AgentRole,
    /// Max parallel tool calls (default: 5)
    pub max_parallel_tools: usize,
    /// Loop detection similarity threshold (0.0 to 1.0, default: 0.8)
    pub loop_similarity_threshold: f64,
    /// Standard Operating Procedure / Mission Statement
    pub sop: Option<String>,
    /// Whether to enable explicit context caching (e.g. Anthropic cache_control)
    pub enable_cache_control: bool,
    /// Whether to use summarization for pruned history instead of truncation
    pub smart_pruning: bool,
}
65
66impl Default for AgentConfig {
67    fn default() -> Self {
68        Self {
69            name: "agent".to_string(),
70            model: "gpt-4o".to_string(),
71            preamble: "You are a helpful AI assistant.".to_string(),
72            temperature: Some(0.7),
73            max_tokens: Some(128000), // Updated to larger context window default
74            extra_params: None,
75            tool_policy: RiskyToolPolicy::default(),
76            max_history_messages: 20,
77            max_tool_output_chars: 8192, // Increased for better tool results
78            json_mode: false,
79            persona: None,
80            role: AgentRole::Assistant,
81            max_parallel_tools: 5,
82            loop_similarity_threshold: 0.8,
83            sop: None,
84            enable_cache_control: false,
85            smart_pruning: false,
86        }
87    }
88}
89
/// Policy for tool execution
///
/// Serialized in snake_case (`auto`, `requires_approval`, `disabled`).
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolPolicy {
    /// Allow execution without approval
    Auto,
    /// Require explicit approval from the configured approval handler
    RequiresApproval,
    /// Disable execution completely; calls fail with a tool-execution error
    Disabled,
}
101
/// Configuration for risky tool policies
///
/// A tool's policy is its entry in `overrides` when present, otherwise
/// `default_policy`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RiskyToolPolicy {
    /// Default policy for all tools
    pub default_policy: ToolPolicy,
    /// Overrides for specific tools, keyed by tool name
    pub overrides: std::collections::HashMap<String, ToolPolicy>,
}
110
111impl Default for RiskyToolPolicy {
112    fn default() -> Self {
113        Self {
114            default_policy: ToolPolicy::Auto,
115            overrides: std::collections::HashMap::new(),
116        }
117    }
118}
119
/// Events emitted by the Agent during execution
///
/// Serialized as an adjacently tagged enum: the snake_case variant name goes in
/// `type` and the variant's fields in `data`.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "type", content = "data", rename_all = "snake_case")]
pub enum AgentEvent {
    /// Agent started thinking (prompt received)
    Thinking { prompt: String },
    /// A new reasoning step/turn started (`step` starts at 1)
    StepStart { step: usize },
    /// Agent decided to use a tool (planning phase)
    ToolCall { tool: String, input: String },
    /// Tool execution actually started (technical event)
    ToolExecutionStart { tool: String, input: String },
    /// Tool execution finished (technical event).
    /// `output_preview` is a shortened output on success and the error text on failure.
    ToolExecutionEnd { tool: String, output_preview: String, duration_ms: u64, success: bool },
    /// Tool execution requires approval
    ApprovalPending { tool: String, input: String },
    /// Tool execution result received (semantic event)
    ToolResult { tool: String, output: String },
    /// Agent generated a final response
    Response { content: String },
    /// Error occurred
    Error { message: String },
}
143
/// Handler for user approvals
///
/// Consulted before running a tool whose effective policy is
/// `ToolPolicy::RequiresApproval`.
#[async_trait::async_trait]
pub trait ApprovalHandler: Send + Sync {
    /// Request approval for a tool call
    ///
    /// Returns `Ok(true)` to let the call run and `Ok(false)` to reject it.
    /// An `Err` is reported by the agent as a failed approval check.
    async fn approve(&self, tool_name: &str, arguments: &str) -> anyhow::Result<bool>;
}
150
151/// A default approval handler that rejects all
152pub struct RejectAllApprovalHandler;
153
154#[async_trait::async_trait]
155impl ApprovalHandler for RejectAllApprovalHandler {
156    async fn approve(&self, _tool: &str, _args: &str) -> anyhow::Result<bool> {
157        Ok(false)
158    }
159}
160
/// Request sent to the channel handler
///
/// Produced by `ChannelApprovalHandler::approve`; the receiving side answers
/// through `responder`.
#[derive(Debug)]
pub struct ApprovalRequest {
    /// Unique ID for this request (a freshly generated UUID v4 string)
    pub id: String,
    /// Tool name
    pub tool_name: String,
    /// Tool arguments
    pub arguments: String,
    /// Responder channel: send `true` to approve, `false` to reject.
    /// Dropping it without sending makes the pending `approve` call fail.
    pub responder: tokio::sync::oneshot::Sender<bool>,
}
173
/// A handler that sends approval requests via a channel
///
/// Each approval is forwarded as an `ApprovalRequest` and resolved by whoever
/// owns the receiving end of `sender`.
pub struct ChannelApprovalHandler {
    // Sending half of the queue on which approval requests are delivered.
    sender: tokio::sync::mpsc::Sender<ApprovalRequest>,
}
178
/// Trait for human-in-the-loop interactions (getting text input)
///
/// Backs the `ask_user` tool, which forwards the model's question to `ask`.
#[async_trait::async_trait]
pub trait InteractionHandler: Send + Sync {
    /// Ask the user a question and get a string response
    ///
    /// The returned string becomes the tool's output; an `Err` fails the tool call.
    async fn ask(&self, question: &str) -> anyhow::Result<String>;
}
185
// Arguments accepted by the `ask_user` tool, parsed from the JSON the model supplies.
//
// Plain `//` comments are used here on purpose: schemars copies `///` doc comments
// into the generated JSON schema as descriptions, so editing them would change the
// tool definition advertised to the model.
#[derive(serde::Deserialize, schemars::JsonSchema)]
struct AskUserArgs {
    /// The question to ask the user
    question: String,
}
191
/// Tool exposed to the model as `ask_user`; relays a question to a human.
struct AskUserTool {
    /// Interaction backend that actually puts the question to the user.
    handler: Arc<dyn InteractionHandler>,
}
195
196#[async_trait::async_trait]
197impl crate::skills::tool::Tool for AskUserTool {
198    fn name(&self) -> String {
199        "ask_user".to_string()
200    }
201
202    async fn definition(&self) -> crate::skills::tool::ToolDefinition {
203        let gen = schemars::gen::SchemaSettings::openapi3().into_generator();
204        let schema = gen.into_root_schema_for::<AskUserArgs>();
205        let schema_json = serde_json::to_value(schema).unwrap_or_default();
206
207        crate::skills::tool::ToolDefinition {
208            name: "ask_user".to_string(),
209            description: "Ask the user for clarification, additional information, or a final decision. Use this when you are stuck or need human input.".to_string(),
210            parameters: schema_json,
211            parameters_ts: Some("interface AskUserArgs {\n  /** The question to ask the user */\n  question: string;\n}".to_string()),
212            is_binary: false,
213            is_verified: true,
214            usage_guidelines: Some("Use this only when you need critical missing information or explicit permission to proceed with a dangerous action (e.g., executing a trade). Avoid asking for obvious or non-essential details.".to_string()),
215        }
216    }
217
218    async fn call(&self, arguments: &str) -> anyhow::Result<String> {
219        let args: AskUserArgs = serde_json::from_str(arguments)?;
220        self.handler.ask(&args.question).await
221    }
222}
223
224impl ChannelApprovalHandler {
225    /// Create a new channel handler
226    pub fn new(sender: tokio::sync::mpsc::Sender<ApprovalRequest>) -> Self {
227        Self { sender }
228    }
229}
230
231#[async_trait::async_trait]
232impl ApprovalHandler for ChannelApprovalHandler {
233    async fn approve(&self, tool_name: &str, arguments: &str) -> anyhow::Result<bool> {
234        let (tx, rx) = tokio::sync::oneshot::channel();
235        
236        let request = ApprovalRequest {
237            id: uuid::Uuid::new_v4().to_string(),
238            tool_name: tool_name.to_string(),
239            arguments: arguments.to_string(),
240            responder: tx,
241        };
242
243        self.sender.send(request).await
244            .map_err(|_| Error::Internal("Approval channel closed".to_string()))?;
245
246        // Wait for response
247        let approved = rx.await
248            .map_err(|_| Error::Internal("Approval responder dropped".to_string()))?;
249            
250        Ok(approved)
251    }
252}
253
254// use crate::infra::notification::{Notifier, NotifyChannel}; // Already imported at top
255
/// The main Agent struct
///
/// Ties a provider to a tool set, configuration and optional collaborators
/// (cache, notifier, memory, swarm).
pub struct Agent<P: Provider> {
    /// Provider used to stream completions.
    provider: Arc<P>,
    /// Tools the model may call.
    tools: ToolSet,
    /// Static configuration (model, prompts, policies, limits).
    config: AgentConfig,
    /// Builds the message list actually sent to the provider on each step.
    context_manager: ContextManager,
    /// Broadcast sender on which `AgentEvent`s are published.
    events: broadcast::Sender<AgentEvent>,
    /// Consulted for tools whose policy requires approval.
    approval_handler: Arc<dyn ApprovalHandler>,
    /// Optional response cache keyed by the message list.
    cache: Option<Arc<dyn Cache>>,
    /// Optional notification backend used by `notify`.
    notifier: Option<Arc<dyn Notifier>>,
    /// Optional session store used by `checkpoint` and `resume`.
    memory: Option<Arc<dyn Memory>>,
    /// Session under which checkpoints are stored; checkpoints are skipped when unset.
    session_id: Option<String>,
    /// Optional swarm manager whose inbox is processed by a background task in `listen`.
    swarm_manager: Option<Arc<tokio::sync::Mutex<SwarmManager>>>,
    /// Optional receiver of swarm commands, locked and read by `listen`.
    /// Kept behind `Arc<Mutex<..>>` so it can be read through `&self`.
    swarm_command_rx: Option<Arc<tokio::sync::Mutex<mpsc::Receiver<SwarmEvent>>>>,
}
272
273impl<P: Provider> Agent<P> {
    /// Create a new agent builder
    ///
    /// Shorthand for `AgentBuilder::new(provider)`.
    pub fn builder(provider: P) -> AgentBuilder<P> {
        AgentBuilder::new(provider)
    }
278
    /// Subscribe to agent events
    ///
    /// Returns a new receiver on the agent's broadcast channel of `AgentEvent`s.
    pub fn subscribe(&self) -> broadcast::Receiver<AgentEvent> {
        self.events.subscribe()
    }
283
284    /// Helper to emit events safely
285    fn emit(&self, event: AgentEvent) {
286        if let Err(e) = self.events.send(event) {
287            tracing::debug!("Failed to emit event (no receivers): {}", e);
288        }
289    }
290    
291    /// Send a notification via the configured notifier
292    pub async fn notify(&self, channel: NotifyChannel, message: &str) -> Result<()> {
293        if let Some(notifier) = &self.notifier {
294             notifier.notify(channel, message).await
295        } else {
296             // If no notifier configured, log warning but don't fail hard
297             tracing::warn!("Agent tried to notify but no notifier is configured: {}", message);
298             Ok(())
299        }
300    }
301
302    /// Save current state to persistent storage
303    pub async fn checkpoint(&self, messages: &[Message], step: usize, status: SessionStatus) -> Result<()> {
304        if let (Some(memory), Some(session_id)) = (&self.memory, &self.session_id) {
305            let session = crate::agent::session::AgentSession {
306                id: session_id.clone(),
307                messages: messages.to_vec(),
308                step,
309                status,
310                updated_at: chrono::Utc::now(),
311            };
312            memory.store_session(session).await?;
313            debug!("Agent checkpoint saved for session: {}", session_id);
314        }
315        Ok(())
316    }
317
318    /// Resume a previously saved session
319    pub async fn resume(&self, session_id: &str) -> Result<String> {
320        if let Some(memory) = &self.memory {
321            if let Some(session) = memory.retrieve_session(session_id).await? {
322                info!("Resuming agent session: {}", session_id);
323                // We restart the chat with the loaded messages
324                return self.chat(session.messages).await;
325            }
326        }
327        Err(Error::Internal(format!("Session not found: {}", session_id)))
328    }
329
330    /// Send a prompt and get a response (non-streaming)
331    #[instrument(skip(self, prompt), fields(model = %self.config.model))]
332    pub async fn prompt(&self, prompt: impl Into<String>) -> Result<String> {
333        let prompt_str = prompt.into();
334        self.emit(AgentEvent::Thinking { prompt: prompt_str.clone() });
335        
336        let messages = vec![
337            Message::user(prompt_str)
338        ];
339        
340        self.chat(messages).await
341    }
342
343    /// Send messages and get a response (non-streaming)
344    #[instrument(skip(self, messages), fields(model = %self.config.model, message_count = messages.len()))]
345    pub async fn chat(&self, mut messages: Vec<Message>) -> Result<String> {
346        let mut steps = 0;
347        const MAX_STEPS: usize = 15;
348        let mut history = crate::agent::history::QueryHistory::new();
349        
350        loop {
351            if steps >= MAX_STEPS {
352                return Err(Error::agent_config("Max agent steps exceeded"));
353            }
354            steps += 1;
355            
356            self.emit(AgentEvent::StepStart { step: steps });
357
358            if let Some(last) = messages.last() {
359                 if last.role == Role::User {
360                    self.emit(AgentEvent::Thinking { prompt: last.content.as_text() });
361                 }
362            }
363
364            // Save checkpoint before thinking
365            self.checkpoint(&messages, steps, SessionStatus::Thinking).await?;
366
367            info!("Agent starting chat completion (step {})", steps);
368
369            // 1. Check Cache (Step-level caching)
370            if let Some(cache) = &self.cache {
371                if let Ok(Some(cached_response)) = cache.get(&messages).await {
372                    info!("Cache hit! Returning cached response.");
373                    return Ok(cached_response);
374                }
375            }
376
377            // Context Window Management via ContextManager
378            let context_messages = self.context_manager.build_context(&messages).await
379                .map_err(|e| Error::agent_config(format!("Failed to build context: {}", e)))?;
380
381            let stream = self.stream_chat(context_messages).await?;
382            
383            let mut full_text = String::new();
384            let mut tool_calls = Vec::new(); // (id, name, args)
385
386            let mut stream_inner = stream.into_inner();
387
388            // Consume the stream
389            use futures::StreamExt;
390            while let Some(chunk) = stream_inner.next().await {
391                match chunk? {
392                    crate::agent::streaming::StreamingChoice::Message(text) => {
393                        full_text.push_str(&text);
394                    }
395                    crate::agent::streaming::StreamingChoice::ToolCall { id, name, arguments } => {
396                        tool_calls.push((id, name, arguments));
397                    }
398                     crate::agent::streaming::StreamingChoice::ParallelToolCalls(map) => {
399                         let mut sorted: Vec<_> = map.into_iter().collect();
400                         sorted.sort_by_key(|(k,_)| *k);
401                         for (_, tc) in sorted {
402                             tool_calls.push((tc.id, tc.name, tc.arguments));
403                         }
404                    }
405                    _ => {}
406                }
407            }
408
409            // If no tool calls, we are done
410            if tool_calls.is_empty() {
411                self.emit(AgentEvent::Response { content: full_text.clone() });
412                
413                // Store in cache
414                if let Some(cache) = &self.cache {
415                    let _ = cache.set(&messages, full_text.clone()).await;
416                }
417                
418                return Ok(full_text);
419            }
420
421            // We have tool calls.
422            // 1. Append Assistant Message (Thought + Calls) to history
423            let mut parts = Vec::new();
424            if !full_text.is_empty() {
425                parts.push(crate::agent::message::ContentPart::Text { text: full_text.clone() });
426            }
427            for (id, name, args) in &tool_calls {
428                parts.push(crate::agent::message::ContentPart::ToolCall {
429                    id: id.clone(),
430                    name: name.clone(),
431                    arguments: args.clone(),
432                });
433            }
434            messages.push(Message {
435                role: Role::Assistant,
436                name: None,
437                content: Content::Parts(parts),
438            });
439
440            // 2. Execute Tools (Parallel with Limit)
441            let tools = &self.tools;
442            let policy = &self.config.tool_policy;
443            let events = &self.events;
444            let approval_handler = &self.approval_handler;
445            let max_parallel = self.config.max_parallel_tools;
446            
447            use futures::stream;
448            
449            let current_messages = Arc::new(messages.clone());
450            let threshold = self.config.loop_similarity_threshold;
451            
452            // Check for potential loops before execution
453            let mut processed_calls = Vec::new();
454            for (id, name, args) in tool_calls {
455                let args_str = args.to_string();
456                if let Some(warning) = history.check_loop(&name, &args_str, threshold) {
457                    tracing::warn!(tool = %name, "Loop detected: {}", warning);
458                    // Inject warning as a tool result immediately instead of calling the tool
459                    messages.push(Message {
460                        role: Role::Tool,
461                        name: None,
462                        content: Content::Parts(vec![crate::agent::message::ContentPart::ToolResult {
463                            tool_call_id: id,
464                            content: format!("Error: Potential loop detected. {}", warning),
465                            name: Some(name),
466                        }]),
467                    });
468                } else {
469                    // Not a loop, proceed with execution
470                    history.record(name.clone(), args_str.clone());
471                    processed_calls.push((id, name, args));
472                }
473            }
474
475            if processed_calls.is_empty() && !messages.is_empty() {
476                // All calls were loops, continue to next iteration where LLM will see the warnings
477                continue;
478            }
479
480            let results: Vec<crate::error::Result<(String, String, String)>> = stream::iter(processed_calls)
481                .map(|(id, name, args)| {
482                    let name_clone = name.clone();
483                    let id_clone = id.clone();
484                    let args_str = args.to_string();
485                    let msgs = Arc::clone(&current_messages);
486                    
487                    async move {
488                        // 1. Get tool definition (cached in ToolSet)
489                        let tool_ref = tools.get(&name_clone).ok_or_else(|| Error::ToolNotFound(name_clone.clone()))?;
490                        
491                        let def = tool_ref.definition().await;
492
493                        // 2. Check policy and security overrides
494                        let mut effective_policy = policy.overrides.get(&name_clone)
495                            .unwrap_or(&policy.default_policy).clone();
496                        
497                        // Binary Safety Override: Unverified binary skills ALWAYS require approval
498                        if def.is_binary && !def.is_verified {
499                            if effective_policy != ToolPolicy::Disabled {
500                                tracing::warn!(tool = %name_clone, "Unverified binary skill detected. Enforcing manual approval.");
501                                effective_policy = ToolPolicy::RequiresApproval;
502                            }
503                        }
504
505                        let start_time = std::time::Instant::now();
506                        let _ = events.send(AgentEvent::ToolExecutionStart { 
507                            tool: name_clone.clone(), 
508                            input: args_str.clone() 
509                        });
510
511                        let result = match effective_policy {
512                            ToolPolicy::Disabled => {
513                                Err(Error::tool_execution(name_clone.clone(), "Tool execution is disabled by policy".to_string()))
514                            }
515                            ToolPolicy::RequiresApproval => {
516                                let _ = events.send(AgentEvent::ApprovalPending { 
517                                    tool: name_clone.clone(), 
518                                    input: args_str.clone() 
519                                });
520                                
521                                // Checkpoint before awaiting approval
522                                self.checkpoint(&msgs, steps, SessionStatus::AwaitingApproval { 
523                                    tool_name: name_clone.clone(), 
524                                    arguments: args_str.clone() 
525                                }).await?;
526
527                                // Ask approval handler
528                                match approval_handler.approve(&name_clone, &args_str).await {
529                                    Ok(true) => {
530                                        tools.call(&name_clone, &args_str).await
531                                            .map_err(|e| Error::tool_execution(name_clone.clone(), e.to_string()))
532                                    }
533                                    Ok(false) => {
534                                        Err(Error::ToolApprovalRequired { tool_name: name_clone.clone() })
535                                    }
536                                    Err(e) => {
537                                        Err(Error::tool_execution(name_clone.clone(), format!("Approval check failed: {}", e)))
538                                    }
539                                }
540                            }
541                            ToolPolicy::Auto => {
542                                tools.call(&name_clone, &args_str).await
543                                    .map_err(|e| Error::tool_execution(name_clone.clone(), e.to_string()))
544                            }
545                        };
546
547                        let duration = start_time.elapsed().as_millis() as u64;
548                        
549                        match result {
550                            Ok(output) => {
551                                let preview = if output.len() > 100 {
552                                    format!("{}...", &output[..100])
553                                } else {
554                                    output.clone()
555                                };
556                                
557                                let _ = events.send(AgentEvent::ToolExecutionEnd { 
558                                    tool: name_clone.clone(), 
559                                    output_preview: preview,
560                                    duration_ms: duration,
561                                    success: true
562                                });
563
564                                let _ = events.send(AgentEvent::ToolResult { 
565                                    tool: name_clone.clone(), 
566                                    output: output.clone() 
567                                });
568                                Ok((id_clone, name_clone, output))
569                            },
570                            Err(e) => {
571                                let _ = events.send(AgentEvent::ToolExecutionEnd { 
572                                    tool: name_clone.clone(), 
573                                    output_preview: e.to_string(),
574                                    duration_ms: duration,
575                                    success: false
576                                });
577
578                                let _ = events.send(AgentEvent::Error { message: e.to_string() });
579                                Ok((id_clone, name_clone, format!("Error: {}", e)))
580                            }
581                        }
582                    }
583                })
584                .buffer_unordered(max_parallel)
585                .collect()
586                .await;
587
588            // 3. Append Tool Results to history
589            for res in results {
590                let (id, name, output) = res.unwrap(); // Safe because we handle Err inside async move
591                 messages.push(Message {
592                    role: Role::Tool,
593                    name: None,
594                    content: Content::Parts(vec![crate::agent::message::ContentPart::ToolResult {
595                        tool_call_id: id,
596                        content: output,
597                        name: Some(name),
598                    }]),
599                });
600            }
601        }
602    }
603
604    /// Stream a prompt response
605    pub async fn stream(&self, prompt: impl Into<String>) -> Result<StreamingResponse> {
606        let messages = vec![Message::user(prompt.into())];
607        self.stream_chat(messages).await
608    }
609
610    /// Stream a chat response
611    pub async fn stream_chat(&self, messages: Vec<Message>) -> Result<StreamingResponse> {
612        let mut extra = self.config.extra_params.clone().unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
613        
614        // Inject JSON mode if enabled
615        if self.config.json_mode {
616            if let serde_json::Value::Object(ref mut map) = extra {
617                if !map.contains_key("response_format") {
618                     map.insert("response_format".to_string(), serde_json::json!({ "type": "json_object" }));
619                }
620            }
621        }
622
623        let request = crate::agent::provider::ChatRequest {
624            model: self.config.model.clone(),
625            system_prompt: Some(self.config.preamble.clone()),
626            messages,
627            tools: self.tools.definitions().await,
628            temperature: self.config.temperature,
629            max_tokens: self.config.max_tokens,
630            extra_params: Some(extra),
631            enable_cache_control: self.config.enable_cache_control,
632        };
633
634        self.provider.stream_completion(request).await
635    }
636
637    /// Call a tool by name (Direct call helper)
638    #[instrument(skip(self, arguments), fields(tool_name = %name))]
639    pub async fn call_tool(&self, name: &str, arguments: &str) -> Result<String> {
640        // 1. Check Policy
641        let policy = self.config.tool_policy.overrides.get(name)
642            .unwrap_or(&self.config.tool_policy.default_policy);
643
644        match policy {
645            ToolPolicy::Disabled => {
646                 return Err(Error::tool_execution(name.to_string(), "Tool execution is disabled by policy".to_string()));
647            }
648            ToolPolicy::RequiresApproval => {
649                self.emit(AgentEvent::ApprovalPending { tool: name.to_string(), input: arguments.to_string() });
650                
651                match self.approval_handler.approve(name, arguments).await {
652                    Ok(true) => {}, // Proceed
653                    Ok(false) => return Err(Error::ToolApprovalRequired { tool_name: name.to_string() }),
654                    Err(e) => return Err(Error::tool_execution(name.to_string(), format!("Approval check failed: {}", e)))
655                }
656            }
657            ToolPolicy::Auto => {} // Proceed
658        }
659
660        self.emit(AgentEvent::ToolCall { tool: name.to_string(), input: arguments.to_string() });
661
662        let result = self.tools.call(name, arguments).await;
663        
664        match result {
665            Ok(mut output) => {
666                // Quota Protection: Truncate tool output if too long
667                if output.len() > self.config.max_tool_output_chars {
668                    let original_len = output.len();
669                    output.truncate(self.config.max_tool_output_chars);
670                    output.push_str(&format!("\n\n(Note: Output truncated from {} to {} chars to save tokens)", 
671                        original_len, self.config.max_tool_output_chars));
672                }
673
674                self.emit(AgentEvent::ToolResult { tool: name.to_string(), output: output.clone() });
675                Ok(output)
676            },
677            Err(e) => {
678                self.emit(AgentEvent::Error { message: e.to_string() });
679                // Map anyhow error to ToolExecution error
680                Err(Error::tool_execution(name.to_string(), e.to_string()))
681            }
682        }
683    }
684
    /// Check if agent has a tool
    ///
    /// Returns whether the agent's tool set contains a tool named `name`.
    pub fn has_tool(&self, name: &str) -> bool {
        self.tools.contains(name)
    }
689
    /// Get the definitions of the agent's tools
    ///
    /// Returns the same list that `stream_chat` sends to the provider.
    pub async fn tool_definitions(&self) -> Vec<crate::skills::tool::ToolDefinition> {
        self.tools.definitions().await
    }
694
    /// Get the agent's configuration
    ///
    /// Borrowed read-only view; no setter is visible in this part of the file.
    pub fn config(&self) -> &AgentConfig {
        &self.config
    }
699
    /// Get the model name
    ///
    /// Returns the provider-specific model string from the configuration.
    pub fn model(&self) -> &str {
        &self.config.model
    }
704
705    /// Start a proactive loop that listens for tasks from multiple sources
706    pub async fn listen(
707        &self, 
708        mut user_input: tokio::sync::mpsc::Receiver<String>,
709        mut external_events: tokio::sync::mpsc::Receiver<AgentMessage>
710    ) -> Result<()> {
711        info!("Agent {} starting proactive loop", self.config.name);
712        
713        // Start Swarm Manager background task
714        if let Some(swarm) = &self.swarm_manager {
715            let swarm = swarm.clone();
716            tokio::spawn(async move {
717                loop {
718                    // Lock and process
719                    {
720                        let mut manager = swarm.lock().await;
721                        if let Err(e) = manager.process_inbox().await {
722                            tracing::debug!("Swarm inbox error: {}", e);
723                        }
724                    }
725                    // Small sleep to prevent busy loop since process_inbox uses try_recv
726                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
727                }
728            });
729            info!("Swarm manager background task started");
730        }
731        
732        // Prepare swarm command rx if available
733        let mut swarm_rx_guard = if let Some(rx_mutex) = &self.swarm_command_rx {
734             Some(rx_mutex.lock().await)
735        } else {
736             None
737        };
738        
739        loop {
740            tokio::select! {
741                // Handle swarm commands
742                Some(cmd) = async {
743                    if let Some(guard) = &mut swarm_rx_guard {
744                        guard.recv().await
745                    } else {
746                        std::future::pending().await
747                    }
748                } => {
749                     match cmd {
750                        SwarmEvent::ExecuteTask { request_id, task, context: _ } => {
751                            info!("Swarm: Executing delegated task {}: {}", request_id, task);
752                            // Execute task using agent's process
753                            let result = match self.process(&task).await {
754                                Ok(output) => {
755                                    info!("Swarm: Task execution success");
756                                    output
757                                },
758                                Err(e) => {
759                                    error!("Swarm: Task execution failed: {}", e);
760                                    format!("Error: {}", e)
761                                }
762                            };
763                            
764                            // Send result back
765                            if let Some(swarm) = &self.swarm_manager {
766                                let mut manager = swarm.lock().await;
767                                let success = !result.starts_with("Error:");
768                                if let Err(e) = manager.send_result(&request_id, result, success).await {
769                                     error!("Failed to send swarm result: {}", e);
770                                }
771                            }
772                        }
773                        SwarmEvent::TaskResult { request_id, result, success } => {
774                             info!("Swarm: Request {} completed. Success: {}. Result: {}", request_id, success, result);
775                        }
776                     }
777                }
778
779                // 1. Handle user input
780                input = user_input.recv() => {
781                    match input {
782                        Some(text) => {
783                            if let Err(e) = self.process(&text).await {
784                                error!("Error in proactive user task: {}", e);
785                            }
786                        }
787                        None => {
788                            info!("User input channel closed, exiting proactive loop");
789                            break;
790                        }
791                    }
792                }
793                
794                // 2. Handle external agent/system messages (e.g. from Scheduler)
795                msg = external_events.recv() => {
796                    match msg {
797                        Some(message) => {
798                            if let Err(e) = self.handle_message(message).await {
799                                error!("Error in proactive external task: {}", e);
800                            }
801                        }
802                        None => {
803                            info!("External events channel closed, exiting proactive loop");
804                            break;
805                        }
806                    }
807                }
808            }
809        }
810        
811        Ok(())
812    }
813}
814
815/// Builder for creating agents
816pub struct AgentBuilder<P: Provider> {
817    provider: P,
818    tools: ToolSet,
819    config: AgentConfig,
820    injectors: Vec<Box<dyn ContextInjector>>,
821    approval_handler: Option<Arc<dyn ApprovalHandler>>,
822    interaction_handler: Option<Arc<dyn InteractionHandler>>,
823    notifier: Option<Arc<dyn Notifier>>,
824    cache: Option<Arc<dyn Cache>>,
825    /// Security: Track if Python Sidecar is enabled (mutually exclusive with DynamicSkill)
826    has_sidecar: bool,
827    /// Security: Track if DynamicSkill is enabled (mutually exclusive with Sidecar)
828    has_dynamic_skill: bool,
829    memory: Option<Arc<dyn Memory>>,
830    session_id: Option<String>,
831    swarm_manager: Option<Arc<tokio::sync::Mutex<SwarmManager>>>,
832    swarm_command_rx: Option<mpsc::Receiver<SwarmEvent>>,
833}
834
835impl<P: Provider> AgentBuilder<P> {
836    /// Create a new builder with a provider
837    pub fn new(provider: P) -> Self {
838        Self {
839            provider,
840            tools: ToolSet::new(),
841            config: AgentConfig::default(),
842            injectors: Vec::new(),
843            approval_handler: None,
844            interaction_handler: None,
845            notifier: None,
846            cache: None,
847            has_sidecar: false,
848            has_dynamic_skill: false,
849            memory: None,
850            session_id: None,
851            swarm_manager: None,
852            swarm_command_rx: None,
853        }
854    }
855
856    /// Set the model to use
857    pub fn model(mut self, model: impl Into<String>) -> Self {
858        self.config.model = model.into();
859        self
860    }
861
862    /// Set the system prompt
863    pub fn system_prompt(mut self, prompt: impl Into<String>) -> Self {
864        self.config.preamble = prompt.into();
865        self
866    }
867
868    /// Alias for system_prompt
869    pub fn preamble(self, prompt: impl Into<String>) -> Self {
870        self.system_prompt(prompt)
871    }
872
873    /// Set the temperature
874    pub fn temperature(mut self, temp: f64) -> Self {
875        self.config.temperature = Some(temp);
876        self
877    }
878
879    /// Set max tokens
880    pub fn max_tokens(mut self, tokens: u64) -> Self {
881        self.config.max_tokens = Some(tokens);
882        self
883    }
884
885    /// Add extra provider-specific parameters
886    pub fn extra_params(mut self, params: serde_json::Value) -> Self {
887        self.config.extra_params = Some(params);
888        self
889    }
890
891    /// Set tool policy
892    pub fn tool_policy(mut self, policy: RiskyToolPolicy) -> Self {
893        self.config.tool_policy = policy;
894        self
895    }
896
897    /// Set external approval handler
898    pub fn approval_handler(mut self, handler: impl ApprovalHandler + 'static) -> Self {
899        self.approval_handler = Some(Arc::new(handler));
900        self
901    }
902
903    /// Set interaction handler (for HITL)
904    pub fn interaction_handler(mut self, handler: impl InteractionHandler + 'static) -> Self {
905        self.interaction_handler = Some(Arc::new(handler));
906        self
907    }
908
909    /// Set max history messages (sliding window)
910    pub fn max_history_messages(mut self, count: usize) -> Self {
911        self.config.max_history_messages = count;
912        self
913    }
914
915    /// Set max tool output characters
916    pub fn max_tool_output_chars(mut self, count: usize) -> Self {
917        self.config.max_tool_output_chars = count;
918        self
919    }
920
921    /// Enable strict JSON mode (enforces response_format: json_object)
922    pub fn json_mode(mut self, enable: bool) -> Self {
923        self.config.json_mode = enable;
924        self
925    }
926    
927    /// Set the agent's personality
928    pub fn persona(mut self, persona: Persona) -> Self {
929        self.config.persona = Some(persona);
930        self
931    }
932    
933    /// Set a notifier
934    pub fn notifier(mut self, notifier: impl Notifier + 'static) -> Self {
935        self.notifier = Some(Arc::new(notifier));
936        self
937    }
938
939    /// Set session ID for persistence
940    pub fn session_id(mut self, id: impl Into<String>) -> Self {
941        self.session_id = Some(id.into());
942        self
943    }
944
945    /// Set the agent's role
946    pub fn role(mut self, role: AgentRole) -> Self {
947        self.config.role = role;
948        self
949    }
950
951    /// Add a context injector
952    pub fn context_injector(mut self, injector: impl ContextInjector + 'static) -> Self {
953        self.injectors.push(Box::new(injector));
954        self
955    }
956
957    /// Add a tool
958    pub fn tool<T: Tool + 'static>(self, tool: T) -> Self {
959        self.tools.add(tool);
960        self
961    }
962
963    /// Add a shared tool
964    pub fn shared_tool(self, tool: Arc<dyn Tool>) -> Self {
965        self.tools.add_shared(tool);
966        self
967    }
968
969    /// Add multiple tools from a toolset
970    pub fn tools(self, tools: ToolSet) -> Self {
971        for (_, tool) in tools.iter() {
972            self.tools.add_shared(tool);
973        }
974        self
975    }
976
977    /// Add memory tools using the provided memory implementation
978    pub fn with_memory(self, memory: Arc<dyn crate::agent::memory::Memory>) -> Self {
979        self.tools.add(SearchHistoryTool::new(memory.clone()));
980        self.tools.add(RememberThisTool::new(memory.clone()));
981        self.tools.add(TieredSearchTool::new(memory.clone()));
982        self.tools.add(FetchDocumentTool::new(memory.clone()));
983        
984        let mut builder = self;
985        builder.memory = Some(memory);
986        builder
987    }
988
989    /// Add DynamicSkill support (ClawHub skills, custom scripts)
990    /// 
991    /// # Security
992    /// 
993    /// **CRITICAL**: DynamicSkill and Python Sidecar are mutually exclusive.
994    /// This method will return an error if Python Sidecar has already been configured.
995    /// 
996    /// **Rationale**: If both are enabled, malicious DynamicSkills can pollute the
997    /// Agent's context with secrets, which may then be used by LLM-generated Python
998    /// code in the unsandboxed Sidecar to exfiltrate data.
999    /// 
1000    /// See SECURITY.md for details.
1001    pub fn with_dynamic_skills(mut self, skill_loader: Arc<crate::skills::SkillLoader>) -> Result<Self> {
1002        // Security check: prevent enabling both Sidecar and DynamicSkill
1003        if self.has_sidecar {
1004            return Err(Error::agent_config(
1005                "Security Error: Cannot enable DynamicSkill when Python Sidecar is configured. \
1006                These are mutually exclusive due to context pollution risks. \
1007                See SECURITY.md for details."
1008            ));
1009        }
1010        
1011        // Add all loaded skills as tools
1012        for skill_ref in skill_loader.skills.iter() {
1013            self.tools.add_shared(Arc::clone(skill_ref.value()) as Arc<dyn crate::skills::tool::Tool>);
1014        }
1015        
1016        // Add ClawHub, ReadSkillDoc and ForgeSkill tools
1017        self.tools.add(crate::skills::ClawHubTool::new(Arc::clone(&skill_loader)));
1018        self.tools.add(crate::skills::ReadSkillDoc::new(Arc::clone(&skill_loader)));
1019        let github_compiler = if let (Ok(token), Ok(repo)) = (std::env::var("GITHUB_TOKEN"), std::env::var("GITHUB_REPO")) {
1020            Some(crate::skills::compiler::GithubCompiler::new(repo, token, self.notifier.clone()))
1021        } else {
1022            None
1023        };
1024
1025        self.tools.add(crate::skills::tool::ForgeSkill::new(
1026            Arc::clone(&skill_loader),
1027            self.tools.clone(),
1028            skill_loader.base_path.clone(),
1029            github_compiler
1030        ));
1031        
1032        self.has_dynamic_skill = true;
1033        
1034        Ok(self)
1035    }
1036
1037    /// Add code interpreter capability using the given sidecar address
1038    /// 
1039    /// # Security
1040    /// 
1041    /// **CRITICAL**: Python Sidecar and DynamicSkill are mutually exclusive.
1042    /// This method will return an error if DynamicSkill has already been configured.
1043    /// 
1044    /// **Rationale**: Python Sidecar has no sandbox isolation. If DynamicSkill is also
1045    /// enabled, malicious skills can pollute the Agent's context, leading to secret
1046    /// exfiltration via LLM-generated Python code in the Sidecar.
1047    /// 
1048    /// See SECURITY.md for details.
1049    pub async fn with_code_interpreter(mut self, address: impl Into<String>) -> Result<Self> {
1050        // Security check: prevent enabling both Sidecar and DynamicSkill
1051        if self.has_dynamic_skill {
1052            return Err(Error::agent_config(
1053                "Security Error: Cannot enable Python Sidecar when DynamicSkill is configured. \
1054                These are mutually exclusive due to context pollution risks. \
1055                See SECURITY.md for details."
1056            ));
1057        }
1058        
1059        let sidecar = crate::skills::capabilities::Sidecar::connect(address.into()).await?;
1060        let shared_sidecar = Arc::new(tokio::sync::Mutex::new(sidecar));
1061        
1062        self.tools.add(crate::skills::tool::code_interpreter::CodeInterpreter::new(shared_sidecar));
1063        self.has_sidecar = true;
1064        
1065        Ok(self)
1066    }
1067
1068    /// Build the agent
1069    /// 
1070    /// # Security Defaults
1071    /// 
1072    /// If neither Python Sidecar nor DynamicSkill has been explicitly configured,
1073    /// this method will automatically enable DynamicSkill with default settings:
1074    /// - Skills directory: `./skills`
1075    /// - Network access: disabled (secure sandbox)
1076    /// 
1077    /// To use Python Sidecar instead, call `.with_code_interpreter()` before `.build()`.
1078    pub fn build(mut self) -> Result<Agent<P>> {
1079        // Validate configuration
1080        if self.config.model.is_empty() {
1081            return Err(Error::agent_config("model name cannot be empty"));
1082        }
1083        if self.config.max_history_messages == 0 {
1084            return Err(Error::agent_config("max_history_messages must be at least 1"));
1085        }
1086
1087        // SECURITY DEFAULT: Auto-enable DynamicSkill if no execution model configured
1088        if !self.has_sidecar && !self.has_dynamic_skill {
1089            info!("No execution model configured. Auto-enabling DynamicSkill (default)...");
1090            
1091            // Try to load skills from default directory
1092            let skill_loader = Arc::new(crate::skills::SkillLoader::new("./skills"));
1093            
1094            // Attempt to load skills (non-fatal if directory doesn't exist)
1095            // Capture handle before entering blocking context
1096            let handle = tokio::runtime::Handle::current();
1097            match tokio::task::block_in_place(|| {
1098                handle.block_on(skill_loader.load_all())
1099            }) {
1100                Ok(_) => {
1101                    info!("Loaded DynamicSkills from ./skills");
1102                    
1103                    // Add all loaded skills as tools
1104                    for skill_ref in skill_loader.skills.iter() {
1105                        self.tools.add_shared(Arc::clone(skill_ref.value()) as Arc<dyn crate::skills::tool::Tool>);
1106                    }
1107                    
1108                    // Add ClawHub, ReadSkillDoc and ForgeSkill tools
1109                    self.tools.add(crate::skills::ClawHubTool::new(Arc::clone(&skill_loader)));
1110                    self.tools.add(crate::skills::ReadSkillDoc::new(Arc::clone(&skill_loader)));
1111                    let github_compiler = if let (Ok(token), Ok(repo)) = (std::env::var("GITHUB_TOKEN"), std::env::var("GITHUB_REPO")) {
1112                        Some(crate::skills::compiler::GithubCompiler::new(repo, token, self.notifier.clone()))
1113                    } else {
1114                        None
1115                    };
1116
1117                    self.tools.add(crate::skills::tool::ForgeSkill::new(
1118                        Arc::clone(&skill_loader),
1119                        self.tools.clone(),
1120                        skill_loader.base_path.clone(),
1121                        github_compiler
1122                    ));
1123                    
1124                    self.has_dynamic_skill = true;
1125                },
1126                Err(e) => {
1127                    // Non-fatal: Skills directory doesn't exist or is empty
1128                    info!("DynamicSkill auto-enable skipped (no skills found): {}", e);
1129                    // Continue without skills - agent will still function with other tools
1130                }
1131            }
1132        }
1133
1134        let (tx, _) = broadcast::channel(1000);
1135
1136        let mut context_config = ContextConfig {
1137            max_tokens: self.config.max_tokens.unwrap_or(128000) as usize,
1138            max_history_messages: self.config.max_history_messages,
1139            response_reserve: 4096, // Use a fixed safe reserve for responses
1140            enable_cache_control: self.config.enable_cache_control,
1141            smart_pruning: self.config.smart_pruning,
1142        };
1143
1144        // Ensure response_reserve doesn't eat more than 50% of context
1145        if context_config.response_reserve > context_config.max_tokens / 2 {
1146            context_config.response_reserve = context_config.max_tokens / 2;
1147        }
1148
1149        let mut context_manager = ContextManager::new(context_config);
1150        context_manager.set_system_prompt(self.config.preamble.clone());
1151        
1152        // Inject all tools as TS interfaces in the system prompt
1153        // This fulfills the 'Replace JSON with TS in Prompt' requirement.
1154        context_manager.add_injector(Box::new(self.tools.clone()));
1155
1156        for injector in self.injectors {
1157            context_manager.add_injector(injector);
1158        }
1159
1160        if let Some(persona) = &self.config.persona {
1161            context_manager.add_injector(Box::new(PersonalityManager::new(persona.clone())));
1162        }
1163
1164        // Auto-register AskUser tool if handler available
1165        let tools = self.tools;
1166        if let Some(handler) = &self.interaction_handler {
1167            tools.add(AskUserTool { handler: Arc::clone(handler) });
1168        }
1169
1170        let mut agent = Agent {
1171            provider: Arc::new(self.provider),
1172            tools,
1173            config: self.config.clone(),
1174            context_manager,
1175            events: tx,
1176            approval_handler: self.approval_handler.unwrap_or_else(|| Arc::new(RejectAllApprovalHandler)),
1177            cache: self.cache,
1178            notifier: self.notifier,
1179            memory: self.memory,
1180            session_id: self.session_id,
1181            swarm_manager: self.swarm_manager,
1182            swarm_command_rx: self.swarm_command_rx.map(|rx| Arc::new(tokio::sync::Mutex::new(rx))),
1183        };
1184
1185        // Inject SOP into system prompt if exists
1186        if let Some(sop) = &agent.config.sop {
1187            agent.context_manager.set_system_prompt(format!("### Standard Operating Procedure (SOP)\n{}\n", sop));
1188            
1189            // Also sync with SwarmManager if it exists
1190            if let Some(swarm) = &agent.swarm_manager {
1191                if let Ok(mut manager) = swarm.try_lock() {
1192                    manager.set_sop(Some(sop.clone()));
1193                }
1194            }
1195        }
1196
1197        Ok(agent)
1198    }
1199
1200    /// Add delegation support using the provided coordinator
1201    pub fn with_delegation(self, coordinator: Arc<Coordinator>) -> Self {
1202        self.tools.add(DelegateTool::new(Arc::downgrade(&coordinator)));
1203        self
1204    }
1205
1206    /// Add scheduling support using the provided scheduler
1207    pub fn with_scheduler(self, scheduler: Arc<Scheduler>) -> Self {
1208        self.tools.add(CronTool::new(Arc::downgrade(&scheduler)));
1209        self
1210    }
1211
1212    /// Enable dynamic swarming capabilities
1213    pub fn with_swarm(mut self, manager: Arc<tokio::sync::Mutex<crate::agent::swarm::manager::SwarmManager>>, cmd_rx: mpsc::Receiver<SwarmEvent>) -> Self {
1214        self.swarm_manager = Some(manager);
1215        self.swarm_command_rx = Some(cmd_rx);
1216        self
1217    }
1218}
1219
1220#[async_trait::async_trait]
1221impl<P: Provider> MultiAgent for Agent<P> {
1222    fn role(&self) -> AgentRole {
1223        self.config.role.clone()
1224    }
1225
1226    async fn handle_message(&self, message: AgentMessage) -> Result<Option<AgentMessage>> {
1227        info!("Agent {:?} handling message from {:?}", self.role(), message.from);
1228        let response = self.prompt(message.content).await?;
1229        
1230        Ok(Some(AgentMessage {
1231            from: self.role(),
1232            to: Some(message.from),
1233            content: response,
1234            msg_type: crate::agent::multi_agent::MessageType::Response,
1235        }))
1236    }
1237
1238    async fn process(&self, input: &str) -> Result<String> {
1239        self.prompt(input).await
1240    }
1241}
1242
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration is expected to name `gpt-4o` with a
    /// 128000-token limit.
    #[test]
    fn test_agent_config_default() {
        let AgentConfig { model, max_tokens, .. } = AgentConfig::default();
        assert_eq!(model, "gpt-4o");
        assert_eq!(max_tokens, Some(128000));
    }
}
1253}