1use std::sync::Arc;
4use tokio::sync::broadcast;
5use tracing::{info, instrument, error, debug};
6use anyhow;
7
8use crate::error::{Error, Result};
9use crate::agent::context::ContextInjector;
10use crate::agent::message::{Message, Role, Content};
11use crate::agent::provider::Provider;
12use crate::agent::memory::Memory;
13use crate::agent::session::SessionStatus;
14use crate::skills::tool::{Tool, ToolSet};
15use crate::agent::streaming::StreamingResponse;
16use crate::skills::tool::memory::{SearchHistoryTool, RememberThisTool, TieredSearchTool, FetchDocumentTool}; use crate::agent::context::{ContextManager, ContextConfig}; use crate::agent::multi_agent::{Coordinator, AgentRole, MultiAgent, AgentMessage};
19use crate::agent::personality::{Persona, PersonalityManager};
20use crate::agent::cache::Cache;
21use crate::agent::scheduler::Scheduler;
22use crate::skills::tool::{DelegateTool, CronTool};
23use crate::infra::notification::{Notifier, NotifyChannel};
24use crate::agent::swarm::manager::{SwarmManager, SwarmEvent};
25use tokio::sync::mpsc;
26
27#[derive(Debug, Clone)]
29pub struct AgentConfig {
30 pub name: String,
32 pub model: String,
34 pub preamble: String,
36 pub temperature: Option<f64>,
38 pub max_tokens: Option<u64>,
40 pub extra_params: Option<serde_json::Value>,
42 pub tool_policy: RiskyToolPolicy,
44 pub max_history_messages: usize,
46 pub max_tool_output_chars: usize,
48 pub json_mode: bool,
50 pub persona: Option<Persona>,
52 pub role: AgentRole,
54 pub max_parallel_tools: usize,
56 pub loop_similarity_threshold: f64,
58 pub sop: Option<String>,
60 pub enable_cache_control: bool,
62 pub smart_pruning: bool,
64}
65
66impl Default for AgentConfig {
67 fn default() -> Self {
68 Self {
69 name: "agent".to_string(),
70 model: "gpt-4o".to_string(),
71 preamble: "You are a helpful AI assistant.".to_string(),
72 temperature: Some(0.7),
73 max_tokens: Some(128000), extra_params: None,
75 tool_policy: RiskyToolPolicy::default(),
76 max_history_messages: 20,
77 max_tool_output_chars: 8192, json_mode: false,
79 persona: None,
80 role: AgentRole::Assistant,
81 max_parallel_tools: 5,
82 loop_similarity_threshold: 0.8,
83 sop: None,
84 enable_cache_control: false,
85 smart_pruning: false,
86 }
87 }
88}
89
90#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
92#[serde(rename_all = "snake_case")]
93pub enum ToolPolicy {
94 Auto,
96 RequiresApproval,
98 Disabled,
100}
101
102#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
104pub struct RiskyToolPolicy {
105 pub default_policy: ToolPolicy,
107 pub overrides: std::collections::HashMap<String, ToolPolicy>,
109}
110
111impl Default for RiskyToolPolicy {
112 fn default() -> Self {
113 Self {
114 default_policy: ToolPolicy::Auto,
115 overrides: std::collections::HashMap::new(),
116 }
117 }
118}
119
120#[derive(Debug, Clone, serde::Serialize)]
122#[serde(tag = "type", content = "data", rename_all = "snake_case")]
123pub enum AgentEvent {
124 Thinking { prompt: String },
126 StepStart { step: usize },
128 ToolCall { tool: String, input: String },
130 ToolExecutionStart { tool: String, input: String },
132 ToolExecutionEnd { tool: String, output_preview: String, duration_ms: u64, success: bool },
134 ApprovalPending { tool: String, input: String },
136 ToolResult { tool: String, output: String },
138 Response { content: String },
140 Error { message: String },
142}
143
144#[async_trait::async_trait]
146pub trait ApprovalHandler: Send + Sync {
147 async fn approve(&self, tool_name: &str, arguments: &str) -> anyhow::Result<bool>;
149}
150
151pub struct RejectAllApprovalHandler;
153
154#[async_trait::async_trait]
155impl ApprovalHandler for RejectAllApprovalHandler {
156 async fn approve(&self, _tool: &str, _args: &str) -> anyhow::Result<bool> {
157 Ok(false)
158 }
159}
160
161#[derive(Debug)]
163pub struct ApprovalRequest {
164 pub id: String,
166 pub tool_name: String,
168 pub arguments: String,
170 pub responder: tokio::sync::oneshot::Sender<bool>,
172}
173
174pub struct ChannelApprovalHandler {
176 sender: tokio::sync::mpsc::Sender<ApprovalRequest>,
177}
178
179#[async_trait::async_trait]
181pub trait InteractionHandler: Send + Sync {
182 async fn ask(&self, question: &str) -> anyhow::Result<String>;
184}
185
186#[derive(serde::Deserialize, schemars::JsonSchema)]
187struct AskUserArgs {
188 question: String,
190}
191
192struct AskUserTool {
193 handler: Arc<dyn InteractionHandler>,
194}
195
196#[async_trait::async_trait]
197impl crate::skills::tool::Tool for AskUserTool {
198 fn name(&self) -> String {
199 "ask_user".to_string()
200 }
201
202 async fn definition(&self) -> crate::skills::tool::ToolDefinition {
203 let gen = schemars::gen::SchemaSettings::openapi3().into_generator();
204 let schema = gen.into_root_schema_for::<AskUserArgs>();
205 let schema_json = serde_json::to_value(schema).unwrap_or_default();
206
207 crate::skills::tool::ToolDefinition {
208 name: "ask_user".to_string(),
209 description: "Ask the user for clarification, additional information, or a final decision. Use this when you are stuck or need human input.".to_string(),
210 parameters: schema_json,
211 parameters_ts: Some("interface AskUserArgs {\n /** The question to ask the user */\n question: string;\n}".to_string()),
212 is_binary: false,
213 is_verified: true,
214 usage_guidelines: Some("Use this only when you need critical missing information or explicit permission to proceed with a dangerous action (e.g., executing a trade). Avoid asking for obvious or non-essential details.".to_string()),
215 }
216 }
217
218 async fn call(&self, arguments: &str) -> anyhow::Result<String> {
219 let args: AskUserArgs = serde_json::from_str(arguments)?;
220 self.handler.ask(&args.question).await
221 }
222}
223
224impl ChannelApprovalHandler {
225 pub fn new(sender: tokio::sync::mpsc::Sender<ApprovalRequest>) -> Self {
227 Self { sender }
228 }
229}
230
231#[async_trait::async_trait]
232impl ApprovalHandler for ChannelApprovalHandler {
233 async fn approve(&self, tool_name: &str, arguments: &str) -> anyhow::Result<bool> {
234 let (tx, rx) = tokio::sync::oneshot::channel();
235
236 let request = ApprovalRequest {
237 id: uuid::Uuid::new_v4().to_string(),
238 tool_name: tool_name.to_string(),
239 arguments: arguments.to_string(),
240 responder: tx,
241 };
242
243 self.sender.send(request).await
244 .map_err(|_| Error::Internal("Approval channel closed".to_string()))?;
245
246 let approved = rx.await
248 .map_err(|_| Error::Internal("Approval responder dropped".to_string()))?;
249
250 Ok(approved)
251 }
252}
253
254pub struct Agent<P: Provider> {
258 provider: Arc<P>,
259 tools: ToolSet,
260 config: AgentConfig,
261 context_manager: ContextManager,
262 events: broadcast::Sender<AgentEvent>,
263 approval_handler: Arc<dyn ApprovalHandler>,
264 cache: Option<Arc<dyn Cache>>,
265 notifier: Option<Arc<dyn Notifier>>,
266 memory: Option<Arc<dyn Memory>>,
267 session_id: Option<String>,
268 swarm_manager: Option<Arc<tokio::sync::Mutex<SwarmManager>>>,
269 swarm_command_rx: Option<Arc<tokio::sync::Mutex<mpsc::Receiver<SwarmEvent>>>>, }
272
273impl<P: Provider> Agent<P> {
274 pub fn builder(provider: P) -> AgentBuilder<P> {
276 AgentBuilder::new(provider)
277 }
278
279 pub fn subscribe(&self) -> broadcast::Receiver<AgentEvent> {
281 self.events.subscribe()
282 }
283
284 fn emit(&self, event: AgentEvent) {
286 if let Err(e) = self.events.send(event) {
287 tracing::debug!("Failed to emit event (no receivers): {}", e);
288 }
289 }
290
291 pub async fn notify(&self, channel: NotifyChannel, message: &str) -> Result<()> {
293 if let Some(notifier) = &self.notifier {
294 notifier.notify(channel, message).await
295 } else {
296 tracing::warn!("Agent tried to notify but no notifier is configured: {}", message);
298 Ok(())
299 }
300 }
301
302 pub async fn checkpoint(&self, messages: &[Message], step: usize, status: SessionStatus) -> Result<()> {
304 if let (Some(memory), Some(session_id)) = (&self.memory, &self.session_id) {
305 let session = crate::agent::session::AgentSession {
306 id: session_id.clone(),
307 messages: messages.to_vec(),
308 step,
309 status,
310 updated_at: chrono::Utc::now(),
311 };
312 memory.store_session(session).await?;
313 debug!("Agent checkpoint saved for session: {}", session_id);
314 }
315 Ok(())
316 }
317
318 pub async fn resume(&self, session_id: &str) -> Result<String> {
320 if let Some(memory) = &self.memory {
321 if let Some(session) = memory.retrieve_session(session_id).await? {
322 info!("Resuming agent session: {}", session_id);
323 return self.chat(session.messages).await;
325 }
326 }
327 Err(Error::Internal(format!("Session not found: {}", session_id)))
328 }
329
330 #[instrument(skip(self, prompt), fields(model = %self.config.model))]
332 pub async fn prompt(&self, prompt: impl Into<String>) -> Result<String> {
333 let prompt_str = prompt.into();
334 self.emit(AgentEvent::Thinking { prompt: prompt_str.clone() });
335
336 let messages = vec![
337 Message::user(prompt_str)
338 ];
339
340 self.chat(messages).await
341 }
342
343 #[instrument(skip(self, messages), fields(model = %self.config.model, message_count = messages.len()))]
345 pub async fn chat(&self, mut messages: Vec<Message>) -> Result<String> {
346 let mut steps = 0;
347 const MAX_STEPS: usize = 15;
348 let mut history = crate::agent::history::QueryHistory::new();
349
350 loop {
351 if steps >= MAX_STEPS {
352 return Err(Error::agent_config("Max agent steps exceeded"));
353 }
354 steps += 1;
355
356 self.emit(AgentEvent::StepStart { step: steps });
357
358 if let Some(last) = messages.last() {
359 if last.role == Role::User {
360 self.emit(AgentEvent::Thinking { prompt: last.content.as_text() });
361 }
362 }
363
364 self.checkpoint(&messages, steps, SessionStatus::Thinking).await?;
366
367 info!("Agent starting chat completion (step {})", steps);
368
369 if let Some(cache) = &self.cache {
371 if let Ok(Some(cached_response)) = cache.get(&messages).await {
372 info!("Cache hit! Returning cached response.");
373 return Ok(cached_response);
374 }
375 }
376
377 let context_messages = self.context_manager.build_context(&messages).await
379 .map_err(|e| Error::agent_config(format!("Failed to build context: {}", e)))?;
380
381 let stream = self.stream_chat(context_messages).await?;
382
383 let mut full_text = String::new();
384 let mut tool_calls = Vec::new(); let mut stream_inner = stream.into_inner();
387
388 use futures::StreamExt;
390 while let Some(chunk) = stream_inner.next().await {
391 match chunk? {
392 crate::agent::streaming::StreamingChoice::Message(text) => {
393 full_text.push_str(&text);
394 }
395 crate::agent::streaming::StreamingChoice::ToolCall { id, name, arguments } => {
396 tool_calls.push((id, name, arguments));
397 }
398 crate::agent::streaming::StreamingChoice::ParallelToolCalls(map) => {
399 let mut sorted: Vec<_> = map.into_iter().collect();
400 sorted.sort_by_key(|(k,_)| *k);
401 for (_, tc) in sorted {
402 tool_calls.push((tc.id, tc.name, tc.arguments));
403 }
404 }
405 _ => {}
406 }
407 }
408
409 if tool_calls.is_empty() {
411 self.emit(AgentEvent::Response { content: full_text.clone() });
412
413 if let Some(cache) = &self.cache {
415 let _ = cache.set(&messages, full_text.clone()).await;
416 }
417
418 return Ok(full_text);
419 }
420
421 let mut parts = Vec::new();
424 if !full_text.is_empty() {
425 parts.push(crate::agent::message::ContentPart::Text { text: full_text.clone() });
426 }
427 for (id, name, args) in &tool_calls {
428 parts.push(crate::agent::message::ContentPart::ToolCall {
429 id: id.clone(),
430 name: name.clone(),
431 arguments: args.clone(),
432 });
433 }
434 messages.push(Message {
435 role: Role::Assistant,
436 name: None,
437 content: Content::Parts(parts),
438 });
439
440 let tools = &self.tools;
442 let policy = &self.config.tool_policy;
443 let events = &self.events;
444 let approval_handler = &self.approval_handler;
445 let max_parallel = self.config.max_parallel_tools;
446
447 use futures::stream;
448
449 let current_messages = Arc::new(messages.clone());
450 let threshold = self.config.loop_similarity_threshold;
451
452 let mut processed_calls = Vec::new();
454 for (id, name, args) in tool_calls {
455 let args_str = args.to_string();
456 if let Some(warning) = history.check_loop(&name, &args_str, threshold) {
457 tracing::warn!(tool = %name, "Loop detected: {}", warning);
458 messages.push(Message {
460 role: Role::Tool,
461 name: None,
462 content: Content::Parts(vec![crate::agent::message::ContentPart::ToolResult {
463 tool_call_id: id,
464 content: format!("Error: Potential loop detected. {}", warning),
465 name: Some(name),
466 }]),
467 });
468 } else {
469 history.record(name.clone(), args_str.clone());
471 processed_calls.push((id, name, args));
472 }
473 }
474
475 if processed_calls.is_empty() && !messages.is_empty() {
476 continue;
478 }
479
480 let results: Vec<crate::error::Result<(String, String, String)>> = stream::iter(processed_calls)
481 .map(|(id, name, args)| {
482 let name_clone = name.clone();
483 let id_clone = id.clone();
484 let args_str = args.to_string();
485 let msgs = Arc::clone(¤t_messages);
486
487 async move {
488 let tool_ref = tools.get(&name_clone).ok_or_else(|| Error::ToolNotFound(name_clone.clone()))?;
490
491 let def = tool_ref.definition().await;
492
493 let mut effective_policy = policy.overrides.get(&name_clone)
495 .unwrap_or(&policy.default_policy).clone();
496
497 if def.is_binary && !def.is_verified {
499 if effective_policy != ToolPolicy::Disabled {
500 tracing::warn!(tool = %name_clone, "Unverified binary skill detected. Enforcing manual approval.");
501 effective_policy = ToolPolicy::RequiresApproval;
502 }
503 }
504
505 let start_time = std::time::Instant::now();
506 let _ = events.send(AgentEvent::ToolExecutionStart {
507 tool: name_clone.clone(),
508 input: args_str.clone()
509 });
510
511 let result = match effective_policy {
512 ToolPolicy::Disabled => {
513 Err(Error::tool_execution(name_clone.clone(), "Tool execution is disabled by policy".to_string()))
514 }
515 ToolPolicy::RequiresApproval => {
516 let _ = events.send(AgentEvent::ApprovalPending {
517 tool: name_clone.clone(),
518 input: args_str.clone()
519 });
520
521 self.checkpoint(&msgs, steps, SessionStatus::AwaitingApproval {
523 tool_name: name_clone.clone(),
524 arguments: args_str.clone()
525 }).await?;
526
527 match approval_handler.approve(&name_clone, &args_str).await {
529 Ok(true) => {
530 tools.call(&name_clone, &args_str).await
531 .map_err(|e| Error::tool_execution(name_clone.clone(), e.to_string()))
532 }
533 Ok(false) => {
534 Err(Error::ToolApprovalRequired { tool_name: name_clone.clone() })
535 }
536 Err(e) => {
537 Err(Error::tool_execution(name_clone.clone(), format!("Approval check failed: {}", e)))
538 }
539 }
540 }
541 ToolPolicy::Auto => {
542 tools.call(&name_clone, &args_str).await
543 .map_err(|e| Error::tool_execution(name_clone.clone(), e.to_string()))
544 }
545 };
546
547 let duration = start_time.elapsed().as_millis() as u64;
548
549 match result {
550 Ok(output) => {
551 let preview = if output.len() > 100 {
552 format!("{}...", &output[..100])
553 } else {
554 output.clone()
555 };
556
557 let _ = events.send(AgentEvent::ToolExecutionEnd {
558 tool: name_clone.clone(),
559 output_preview: preview,
560 duration_ms: duration,
561 success: true
562 });
563
564 let _ = events.send(AgentEvent::ToolResult {
565 tool: name_clone.clone(),
566 output: output.clone()
567 });
568 Ok((id_clone, name_clone, output))
569 },
570 Err(e) => {
571 let _ = events.send(AgentEvent::ToolExecutionEnd {
572 tool: name_clone.clone(),
573 output_preview: e.to_string(),
574 duration_ms: duration,
575 success: false
576 });
577
578 let _ = events.send(AgentEvent::Error { message: e.to_string() });
579 Ok((id_clone, name_clone, format!("Error: {}", e)))
580 }
581 }
582 }
583 })
584 .buffer_unordered(max_parallel)
585 .collect()
586 .await;
587
588 for res in results {
590 let (id, name, output) = res.unwrap(); messages.push(Message {
592 role: Role::Tool,
593 name: None,
594 content: Content::Parts(vec![crate::agent::message::ContentPart::ToolResult {
595 tool_call_id: id,
596 content: output,
597 name: Some(name),
598 }]),
599 });
600 }
601 }
602 }
603
604 pub async fn stream(&self, prompt: impl Into<String>) -> Result<StreamingResponse> {
606 let messages = vec![Message::user(prompt.into())];
607 self.stream_chat(messages).await
608 }
609
610 pub async fn stream_chat(&self, messages: Vec<Message>) -> Result<StreamingResponse> {
612 let mut extra = self.config.extra_params.clone().unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
613
614 if self.config.json_mode {
616 if let serde_json::Value::Object(ref mut map) = extra {
617 if !map.contains_key("response_format") {
618 map.insert("response_format".to_string(), serde_json::json!({ "type": "json_object" }));
619 }
620 }
621 }
622
623 let request = crate::agent::provider::ChatRequest {
624 model: self.config.model.clone(),
625 system_prompt: Some(self.config.preamble.clone()),
626 messages,
627 tools: self.tools.definitions().await,
628 temperature: self.config.temperature,
629 max_tokens: self.config.max_tokens,
630 extra_params: Some(extra),
631 enable_cache_control: self.config.enable_cache_control,
632 };
633
634 self.provider.stream_completion(request).await
635 }
636
637 #[instrument(skip(self, arguments), fields(tool_name = %name))]
639 pub async fn call_tool(&self, name: &str, arguments: &str) -> Result<String> {
640 let policy = self.config.tool_policy.overrides.get(name)
642 .unwrap_or(&self.config.tool_policy.default_policy);
643
644 match policy {
645 ToolPolicy::Disabled => {
646 return Err(Error::tool_execution(name.to_string(), "Tool execution is disabled by policy".to_string()));
647 }
648 ToolPolicy::RequiresApproval => {
649 self.emit(AgentEvent::ApprovalPending { tool: name.to_string(), input: arguments.to_string() });
650
651 match self.approval_handler.approve(name, arguments).await {
652 Ok(true) => {}, Ok(false) => return Err(Error::ToolApprovalRequired { tool_name: name.to_string() }),
654 Err(e) => return Err(Error::tool_execution(name.to_string(), format!("Approval check failed: {}", e)))
655 }
656 }
657 ToolPolicy::Auto => {} }
659
660 self.emit(AgentEvent::ToolCall { tool: name.to_string(), input: arguments.to_string() });
661
662 let result = self.tools.call(name, arguments).await;
663
664 match result {
665 Ok(mut output) => {
666 if output.len() > self.config.max_tool_output_chars {
668 let original_len = output.len();
669 output.truncate(self.config.max_tool_output_chars);
670 output.push_str(&format!("\n\n(Note: Output truncated from {} to {} chars to save tokens)",
671 original_len, self.config.max_tool_output_chars));
672 }
673
674 self.emit(AgentEvent::ToolResult { tool: name.to_string(), output: output.clone() });
675 Ok(output)
676 },
677 Err(e) => {
678 self.emit(AgentEvent::Error { message: e.to_string() });
679 Err(Error::tool_execution(name.to_string(), e.to_string()))
681 }
682 }
683 }
684
685 pub fn has_tool(&self, name: &str) -> bool {
687 self.tools.contains(name)
688 }
689
690 pub async fn tool_definitions(&self) -> Vec<crate::skills::tool::ToolDefinition> {
692 self.tools.definitions().await
693 }
694
695 pub fn config(&self) -> &AgentConfig {
697 &self.config
698 }
699
700 pub fn model(&self) -> &str {
702 &self.config.model
703 }
704
705 pub async fn listen(
707 &self,
708 mut user_input: tokio::sync::mpsc::Receiver<String>,
709 mut external_events: tokio::sync::mpsc::Receiver<AgentMessage>
710 ) -> Result<()> {
711 info!("Agent {} starting proactive loop", self.config.name);
712
713 if let Some(swarm) = &self.swarm_manager {
715 let swarm = swarm.clone();
716 tokio::spawn(async move {
717 loop {
718 {
720 let mut manager = swarm.lock().await;
721 if let Err(e) = manager.process_inbox().await {
722 tracing::debug!("Swarm inbox error: {}", e);
723 }
724 }
725 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
727 }
728 });
729 info!("Swarm manager background task started");
730 }
731
732 let mut swarm_rx_guard = if let Some(rx_mutex) = &self.swarm_command_rx {
734 Some(rx_mutex.lock().await)
735 } else {
736 None
737 };
738
739 loop {
740 tokio::select! {
741 Some(cmd) = async {
743 if let Some(guard) = &mut swarm_rx_guard {
744 guard.recv().await
745 } else {
746 std::future::pending().await
747 }
748 } => {
749 match cmd {
750 SwarmEvent::ExecuteTask { request_id, task, context: _ } => {
751 info!("Swarm: Executing delegated task {}: {}", request_id, task);
752 let result = match self.process(&task).await {
754 Ok(output) => {
755 info!("Swarm: Task execution success");
756 output
757 },
758 Err(e) => {
759 error!("Swarm: Task execution failed: {}", e);
760 format!("Error: {}", e)
761 }
762 };
763
764 if let Some(swarm) = &self.swarm_manager {
766 let mut manager = swarm.lock().await;
767 let success = !result.starts_with("Error:");
768 if let Err(e) = manager.send_result(&request_id, result, success).await {
769 error!("Failed to send swarm result: {}", e);
770 }
771 }
772 }
773 SwarmEvent::TaskResult { request_id, result, success } => {
774 info!("Swarm: Request {} completed. Success: {}. Result: {}", request_id, success, result);
775 }
776 }
777 }
778
779 input = user_input.recv() => {
781 match input {
782 Some(text) => {
783 if let Err(e) = self.process(&text).await {
784 error!("Error in proactive user task: {}", e);
785 }
786 }
787 None => {
788 info!("User input channel closed, exiting proactive loop");
789 break;
790 }
791 }
792 }
793
794 msg = external_events.recv() => {
796 match msg {
797 Some(message) => {
798 if let Err(e) = self.handle_message(message).await {
799 error!("Error in proactive external task: {}", e);
800 }
801 }
802 None => {
803 info!("External events channel closed, exiting proactive loop");
804 break;
805 }
806 }
807 }
808 }
809 }
810
811 Ok(())
812 }
813}
814
815pub struct AgentBuilder<P: Provider> {
817 provider: P,
818 tools: ToolSet,
819 config: AgentConfig,
820 injectors: Vec<Box<dyn ContextInjector>>,
821 approval_handler: Option<Arc<dyn ApprovalHandler>>,
822 interaction_handler: Option<Arc<dyn InteractionHandler>>,
823 notifier: Option<Arc<dyn Notifier>>,
824 cache: Option<Arc<dyn Cache>>,
825 has_sidecar: bool,
827 has_dynamic_skill: bool,
829 memory: Option<Arc<dyn Memory>>,
830 session_id: Option<String>,
831 swarm_manager: Option<Arc<tokio::sync::Mutex<SwarmManager>>>,
832 swarm_command_rx: Option<mpsc::Receiver<SwarmEvent>>,
833}
834
835impl<P: Provider> AgentBuilder<P> {
836 pub fn new(provider: P) -> Self {
838 Self {
839 provider,
840 tools: ToolSet::new(),
841 config: AgentConfig::default(),
842 injectors: Vec::new(),
843 approval_handler: None,
844 interaction_handler: None,
845 notifier: None,
846 cache: None,
847 has_sidecar: false,
848 has_dynamic_skill: false,
849 memory: None,
850 session_id: None,
851 swarm_manager: None,
852 swarm_command_rx: None,
853 }
854 }
855
856 pub fn model(mut self, model: impl Into<String>) -> Self {
858 self.config.model = model.into();
859 self
860 }
861
862 pub fn system_prompt(mut self, prompt: impl Into<String>) -> Self {
864 self.config.preamble = prompt.into();
865 self
866 }
867
868 pub fn preamble(self, prompt: impl Into<String>) -> Self {
870 self.system_prompt(prompt)
871 }
872
873 pub fn temperature(mut self, temp: f64) -> Self {
875 self.config.temperature = Some(temp);
876 self
877 }
878
879 pub fn max_tokens(mut self, tokens: u64) -> Self {
881 self.config.max_tokens = Some(tokens);
882 self
883 }
884
885 pub fn extra_params(mut self, params: serde_json::Value) -> Self {
887 self.config.extra_params = Some(params);
888 self
889 }
890
891 pub fn tool_policy(mut self, policy: RiskyToolPolicy) -> Self {
893 self.config.tool_policy = policy;
894 self
895 }
896
897 pub fn approval_handler(mut self, handler: impl ApprovalHandler + 'static) -> Self {
899 self.approval_handler = Some(Arc::new(handler));
900 self
901 }
902
903 pub fn interaction_handler(mut self, handler: impl InteractionHandler + 'static) -> Self {
905 self.interaction_handler = Some(Arc::new(handler));
906 self
907 }
908
909 pub fn max_history_messages(mut self, count: usize) -> Self {
911 self.config.max_history_messages = count;
912 self
913 }
914
915 pub fn max_tool_output_chars(mut self, count: usize) -> Self {
917 self.config.max_tool_output_chars = count;
918 self
919 }
920
921 pub fn json_mode(mut self, enable: bool) -> Self {
923 self.config.json_mode = enable;
924 self
925 }
926
927 pub fn persona(mut self, persona: Persona) -> Self {
929 self.config.persona = Some(persona);
930 self
931 }
932
933 pub fn notifier(mut self, notifier: impl Notifier + 'static) -> Self {
935 self.notifier = Some(Arc::new(notifier));
936 self
937 }
938
939 pub fn session_id(mut self, id: impl Into<String>) -> Self {
941 self.session_id = Some(id.into());
942 self
943 }
944
945 pub fn role(mut self, role: AgentRole) -> Self {
947 self.config.role = role;
948 self
949 }
950
951 pub fn context_injector(mut self, injector: impl ContextInjector + 'static) -> Self {
953 self.injectors.push(Box::new(injector));
954 self
955 }
956
957 pub fn tool<T: Tool + 'static>(self, tool: T) -> Self {
959 self.tools.add(tool);
960 self
961 }
962
963 pub fn shared_tool(self, tool: Arc<dyn Tool>) -> Self {
965 self.tools.add_shared(tool);
966 self
967 }
968
969 pub fn tools(self, tools: ToolSet) -> Self {
971 for (_, tool) in tools.iter() {
972 self.tools.add_shared(tool);
973 }
974 self
975 }
976
977 pub fn with_memory(self, memory: Arc<dyn crate::agent::memory::Memory>) -> Self {
979 self.tools.add(SearchHistoryTool::new(memory.clone()));
980 self.tools.add(RememberThisTool::new(memory.clone()));
981 self.tools.add(TieredSearchTool::new(memory.clone()));
982 self.tools.add(FetchDocumentTool::new(memory.clone()));
983
984 let mut builder = self;
985 builder.memory = Some(memory);
986 builder
987 }
988
989 pub fn with_dynamic_skills(mut self, skill_loader: Arc<crate::skills::SkillLoader>) -> Result<Self> {
1002 if self.has_sidecar {
1004 return Err(Error::agent_config(
1005 "Security Error: Cannot enable DynamicSkill when Python Sidecar is configured. \
1006 These are mutually exclusive due to context pollution risks. \
1007 See SECURITY.md for details."
1008 ));
1009 }
1010
1011 for skill_ref in skill_loader.skills.iter() {
1013 self.tools.add_shared(Arc::clone(skill_ref.value()) as Arc<dyn crate::skills::tool::Tool>);
1014 }
1015
1016 self.tools.add(crate::skills::ClawHubTool::new(Arc::clone(&skill_loader)));
1018 self.tools.add(crate::skills::ReadSkillDoc::new(Arc::clone(&skill_loader)));
1019 let github_compiler = if let (Ok(token), Ok(repo)) = (std::env::var("GITHUB_TOKEN"), std::env::var("GITHUB_REPO")) {
1020 Some(crate::skills::compiler::GithubCompiler::new(repo, token, self.notifier.clone()))
1021 } else {
1022 None
1023 };
1024
1025 self.tools.add(crate::skills::tool::ForgeSkill::new(
1026 Arc::clone(&skill_loader),
1027 self.tools.clone(),
1028 skill_loader.base_path.clone(),
1029 github_compiler
1030 ));
1031
1032 self.has_dynamic_skill = true;
1033
1034 Ok(self)
1035 }
1036
1037 pub async fn with_code_interpreter(mut self, address: impl Into<String>) -> Result<Self> {
1050 if self.has_dynamic_skill {
1052 return Err(Error::agent_config(
1053 "Security Error: Cannot enable Python Sidecar when DynamicSkill is configured. \
1054 These are mutually exclusive due to context pollution risks. \
1055 See SECURITY.md for details."
1056 ));
1057 }
1058
1059 let sidecar = crate::skills::capabilities::Sidecar::connect(address.into()).await?;
1060 let shared_sidecar = Arc::new(tokio::sync::Mutex::new(sidecar));
1061
1062 self.tools.add(crate::skills::tool::code_interpreter::CodeInterpreter::new(shared_sidecar));
1063 self.has_sidecar = true;
1064
1065 Ok(self)
1066 }
1067
1068 pub fn build(mut self) -> Result<Agent<P>> {
1079 if self.config.model.is_empty() {
1081 return Err(Error::agent_config("model name cannot be empty"));
1082 }
1083 if self.config.max_history_messages == 0 {
1084 return Err(Error::agent_config("max_history_messages must be at least 1"));
1085 }
1086
1087 if !self.has_sidecar && !self.has_dynamic_skill {
1089 info!("No execution model configured. Auto-enabling DynamicSkill (default)...");
1090
1091 let skill_loader = Arc::new(crate::skills::SkillLoader::new("./skills"));
1093
1094 let handle = tokio::runtime::Handle::current();
1097 match tokio::task::block_in_place(|| {
1098 handle.block_on(skill_loader.load_all())
1099 }) {
1100 Ok(_) => {
1101 info!("Loaded DynamicSkills from ./skills");
1102
1103 for skill_ref in skill_loader.skills.iter() {
1105 self.tools.add_shared(Arc::clone(skill_ref.value()) as Arc<dyn crate::skills::tool::Tool>);
1106 }
1107
1108 self.tools.add(crate::skills::ClawHubTool::new(Arc::clone(&skill_loader)));
1110 self.tools.add(crate::skills::ReadSkillDoc::new(Arc::clone(&skill_loader)));
1111 let github_compiler = if let (Ok(token), Ok(repo)) = (std::env::var("GITHUB_TOKEN"), std::env::var("GITHUB_REPO")) {
1112 Some(crate::skills::compiler::GithubCompiler::new(repo, token, self.notifier.clone()))
1113 } else {
1114 None
1115 };
1116
1117 self.tools.add(crate::skills::tool::ForgeSkill::new(
1118 Arc::clone(&skill_loader),
1119 self.tools.clone(),
1120 skill_loader.base_path.clone(),
1121 github_compiler
1122 ));
1123
1124 self.has_dynamic_skill = true;
1125 },
1126 Err(e) => {
1127 info!("DynamicSkill auto-enable skipped (no skills found): {}", e);
1129 }
1131 }
1132 }
1133
1134 let (tx, _) = broadcast::channel(1000);
1135
1136 let mut context_config = ContextConfig {
1137 max_tokens: self.config.max_tokens.unwrap_or(128000) as usize,
1138 max_history_messages: self.config.max_history_messages,
1139 response_reserve: 4096, enable_cache_control: self.config.enable_cache_control,
1141 smart_pruning: self.config.smart_pruning,
1142 };
1143
1144 if context_config.response_reserve > context_config.max_tokens / 2 {
1146 context_config.response_reserve = context_config.max_tokens / 2;
1147 }
1148
1149 let mut context_manager = ContextManager::new(context_config);
1150 context_manager.set_system_prompt(self.config.preamble.clone());
1151
1152 context_manager.add_injector(Box::new(self.tools.clone()));
1155
1156 for injector in self.injectors {
1157 context_manager.add_injector(injector);
1158 }
1159
1160 if let Some(persona) = &self.config.persona {
1161 context_manager.add_injector(Box::new(PersonalityManager::new(persona.clone())));
1162 }
1163
1164 let tools = self.tools;
1166 if let Some(handler) = &self.interaction_handler {
1167 tools.add(AskUserTool { handler: Arc::clone(handler) });
1168 }
1169
1170 let mut agent = Agent {
1171 provider: Arc::new(self.provider),
1172 tools,
1173 config: self.config.clone(),
1174 context_manager,
1175 events: tx,
1176 approval_handler: self.approval_handler.unwrap_or_else(|| Arc::new(RejectAllApprovalHandler)),
1177 cache: self.cache,
1178 notifier: self.notifier,
1179 memory: self.memory,
1180 session_id: self.session_id,
1181 swarm_manager: self.swarm_manager,
1182 swarm_command_rx: self.swarm_command_rx.map(|rx| Arc::new(tokio::sync::Mutex::new(rx))),
1183 };
1184
1185 if let Some(sop) = &agent.config.sop {
1187 agent.context_manager.set_system_prompt(format!("### Standard Operating Procedure (SOP)\n{}\n", sop));
1188
1189 if let Some(swarm) = &agent.swarm_manager {
1191 if let Ok(mut manager) = swarm.try_lock() {
1192 manager.set_sop(Some(sop.clone()));
1193 }
1194 }
1195 }
1196
1197 Ok(agent)
1198 }
1199
1200 pub fn with_delegation(self, coordinator: Arc<Coordinator>) -> Self {
1202 self.tools.add(DelegateTool::new(Arc::downgrade(&coordinator)));
1203 self
1204 }
1205
1206 pub fn with_scheduler(self, scheduler: Arc<Scheduler>) -> Self {
1208 self.tools.add(CronTool::new(Arc::downgrade(&scheduler)));
1209 self
1210 }
1211
1212 pub fn with_swarm(mut self, manager: Arc<tokio::sync::Mutex<crate::agent::swarm::manager::SwarmManager>>, cmd_rx: mpsc::Receiver<SwarmEvent>) -> Self {
1214 self.swarm_manager = Some(manager);
1215 self.swarm_command_rx = Some(cmd_rx);
1216 self
1217 }
1218}
1219
1220#[async_trait::async_trait]
1221impl<P: Provider> MultiAgent for Agent<P> {
1222 fn role(&self) -> AgentRole {
1223 self.config.role.clone()
1224 }
1225
1226 async fn handle_message(&self, message: AgentMessage) -> Result<Option<AgentMessage>> {
1227 info!("Agent {:?} handling message from {:?}", self.role(), message.from);
1228 let response = self.prompt(message.content).await?;
1229
1230 Ok(Some(AgentMessage {
1231 from: self.role(),
1232 to: Some(message.from),
1233 content: response,
1234 msg_type: crate::agent::multi_agent::MessageType::Response,
1235 }))
1236 }
1237
1238 async fn process(&self, input: &str) -> Result<String> {
1239 self.prompt(input).await
1240 }
1241}
1242
1243#[cfg(test)]
1244mod tests {
1245 use super::*;
1246
1247 #[test]
1248 fn test_agent_config_default() {
1249 let config = AgentConfig::default();
1250 assert_eq!(config.model, "gpt-4o");
1251 assert_eq!(config.max_tokens, Some(128000));
1252 }
1253}