1use crate::audit::{AuditEventType, AuditLog};
7use crate::config::Config;
8use crate::error::Result;
9use crate::llm::{
10 ChatMessage, Choice, LLMProviderTrait, MultiModelManager, ProviderFallbackChain, TokenBudget,
11};
12use crate::mcp::McpClient;
13use crate::policy::{Decision, PolicyEngine};
14use crate::ravenfabric::RavenFabricClient;
15use crate::sandbox::Sandbox;
16use crate::tools::{ToolCall, ToolRegistry, ToolResult};
17use serde::{Deserialize, Serialize};
18use std::path::PathBuf;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21use tracing::{debug, info, instrument, warn};
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct ConversationMemory {
29 max_messages: usize,
31 messages: Vec<ChatMessage>,
33}
34
35impl ConversationMemory {
36 pub fn new(system_prompt: &str, max_messages: usize) -> Self {
39 Self {
40 max_messages,
41 messages: vec![ChatMessage {
42 role: "system".to_string(),
43 content: system_prompt.to_string(),
44 }],
45 }
46 }
47
48 pub fn add_user_message(&mut self, content: &str) -> &[ChatMessage] {
50 self.messages.push(ChatMessage {
51 role: "user".to_string(),
52 content: content.to_string(),
53 });
54 self.trim_to_max();
55 &self.messages
56 }
57
58 pub fn add_assistant_message(&mut self, content: &str) {
60 self.messages.push(ChatMessage {
61 role: "assistant".to_string(),
62 content: content.to_string(),
63 });
64 self.trim_to_max();
65 }
66
67 pub fn history(&self) -> &[ChatMessage] {
69 &self.messages
70 }
71
72 pub fn from_history(messages: Vec<ChatMessage>, max_messages: usize) -> Self {
75 Self {
76 max_messages,
77 messages,
78 }
79 }
80
81 #[allow(dead_code)]
83 pub fn len(&self) -> usize {
84 self.messages.len()
85 }
86
87 #[allow(dead_code)]
89 pub fn is_empty(&self) -> bool {
90 self.messages.is_empty()
91 }
92
93 fn trim_to_max(&mut self) {
95 if self.max_messages == 0 {
96 return;
97 }
98 while self.messages.len() > self.max_messages {
99 if self.messages.len() > 1 {
101 self.messages.remove(1);
102 } else {
103 break;
104 }
105 }
106 }
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct CheckpointState {
116 pub session_id: String,
118 pub iteration: usize,
120 pub max_iterations: usize,
122 pub messages: Vec<ChatMessage>,
124 pub initial_prompt: String,
126 pub system_prompt: String,
128 pub provider: String,
130 pub model: String,
132 pub enable_tools: bool,
134 pub last_checkpoint: String,
136}
137
138impl CheckpointState {
139 #[allow(clippy::too_many_arguments)]
141 pub fn new(
142 session_id: String,
143 iteration: usize,
144 max_iterations: usize,
145 messages: Vec<ChatMessage>,
146 initial_prompt: &str,
147 system_prompt: &str,
148 provider: &str,
149 model: &str,
150 enable_tools: bool,
151 ) -> Self {
152 Self {
153 session_id,
154 iteration,
155 max_iterations,
156 messages,
157 initial_prompt: initial_prompt.to_string(),
158 system_prompt: system_prompt.to_string(),
159 provider: provider.to_string(),
160 model: model.to_string(),
161 enable_tools,
162 last_checkpoint: chrono::Utc::now().to_rfc3339(),
163 }
164 }
165}
166
167pub fn save_checkpoint(
172 checkpoint_dir: &std::path::Path,
173 state: &CheckpointState,
174) -> std::result::Result<std::path::PathBuf, String> {
175 let path = checkpoint_dir.join(format!("{}.json", state.session_id));
176
177 std::fs::create_dir_all(checkpoint_dir)
179 .map_err(|e| format!("Failed to create checkpoint directory: {}", e))?;
180
181 let content = serde_json::to_string_pretty(state)
182 .map_err(|e| format!("Failed to serialize checkpoint: {}", e))?;
183
184 let tmp_path = path.with_extension("json.tmp");
186 std::fs::write(&tmp_path, &content)
187 .map_err(|e| format!("Failed to write checkpoint: {}", e))?;
188 std::fs::rename(&tmp_path, &path)
189 .map_err(|e| format!("Failed to finalize checkpoint: {}", e))?;
190
191 Ok(path)
192}
193
194pub fn load_checkpoint(
199 checkpoint_dir: &std::path::Path,
200 session_id: &str,
201) -> Option<CheckpointState> {
202 let path = checkpoint_dir.join(format!("{}.json", session_id));
203
204 match std::fs::read_to_string(&path) {
205 Ok(content) => match serde_json::from_str::<CheckpointState>(&content) {
206 Ok(state) => {
207 info!(
208 session_id = %session_id,
209 iteration = state.iteration,
210 max_iterations = state.max_iterations,
211 "Loaded checkpoint"
212 );
213 Some(state)
214 }
215 Err(e) => {
216 warn!(
217 session_id = %session_id,
218 error = %e,
219 "Failed to deserialize checkpoint"
220 );
221 None
222 }
223 },
224 Err(e) => {
225 if e.kind() != std::io::ErrorKind::NotFound {
226 warn!(
227 session_id = %session_id,
228 error = %e,
229 "Failed to read checkpoint"
230 );
231 }
232 None
233 }
234 }
235}
236
237pub fn delete_checkpoint(checkpoint_dir: &std::path::Path, session_id: &str) {
241 let path = checkpoint_dir.join(format!("{}.json", session_id));
242 if path.exists() {
243 if let Err(e) = std::fs::remove_file(&path) {
244 warn!(
245 session_id = %session_id,
246 error = %e,
247 "Failed to delete checkpoint"
248 );
249 } else {
250 debug!(
251 session_id = %session_id,
252 "Deleted checkpoint"
253 );
254 }
255 }
256}
257
/// Settings that control a single agent-loop run.
pub struct AgentLoopConfig {
    /// Exclusive upper bound on the loop's iteration index.
    pub max_iterations: usize,
    /// Whether tool calls in LLM responses are parsed and executed.
    pub enable_tools: bool,
    /// Forwarded to tool execution for structured tool calls; when set, tools
    /// the policy engine marks as needing approval prompt the user first.
    pub require_approval: bool,
    /// Whether each LLM response is screened by the injection detector.
    pub prompt_injection_protection: bool,
    /// Wall-clock limit for the run in seconds; `0` disables the check.
    pub token_lifetime_secs: u64,
    /// When set, a response with no `FINAL:` marker and no tool call is
    /// returned as the result instead of continuing the loop.
    pub no_final_required: bool,
    /// Provider configurations tried when the primary LLM call fails.
    pub fallback_chain: Option<Arc<std::sync::Mutex<ProviderFallbackChain>>>,
    /// Shared token budget; the loop aborts when fewer than 100 tokens remain.
    pub token_budget: Option<Arc<std::sync::Mutex<TokenBudget>>>,
    /// Optional RavenFabric client; its health endpoint is called every
    /// iteration while it reports itself enabled.
    pub ravenfabric: Option<RavenFabricClient>,
    /// Directory where checkpoints are saved and from which they are resumed.
    pub checkpoint_dir: Option<PathBuf>,
    /// Session identifier used as the checkpoint key; a random UUID is
    /// generated by the loop when absent.
    pub session_id: Option<String>,
    /// Called with `(tokens, 0)` after each LLM response and `(0, 1)` after
    /// each executed tool call.
    pub metrics_callback: Option<Box<dyn Fn(u64, u64) + Send + Sync>>,
}
295
296impl Default for AgentLoopConfig {
297 fn default() -> Self {
298 Self {
299 max_iterations: 10,
300 enable_tools: false,
301 require_approval: false,
302 prompt_injection_protection: true,
303 token_lifetime_secs: 0,
304 no_final_required: false,
305 fallback_chain: None,
306 token_budget: None,
307 ravenfabric: None,
308 checkpoint_dir: None,
309 session_id: None,
310 metrics_callback: None,
311 }
312 }
313}
314
315impl std::fmt::Debug for AgentLoopConfig {
317 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
318 f.debug_struct("AgentLoopConfig")
319 .field("max_iterations", &self.max_iterations)
320 .field("enable_tools", &self.enable_tools)
321 .field("require_approval", &self.require_approval)
322 .field(
323 "prompt_injection_protection",
324 &self.prompt_injection_protection,
325 )
326 .field("token_lifetime_secs", &self.token_lifetime_secs)
327 .field("no_final_required", &self.no_final_required)
328 .field("fallback_chain", &self.fallback_chain)
329 .field("token_budget", &self.token_budget)
330 .field("ravenfabric", &self.ravenfabric)
331 .field("checkpoint_dir", &self.checkpoint_dir)
332 .field("session_id", &self.session_id)
333 .field(
334 "metrics_callback",
335 &self.metrics_callback.as_ref().map(|_| "Box<Fn>"),
336 )
337 .finish()
338 }
339}
340
341impl Clone for AgentLoopConfig {
344 fn clone(&self) -> Self {
345 Self {
346 max_iterations: self.max_iterations,
347 enable_tools: self.enable_tools,
348 require_approval: self.require_approval,
349 prompt_injection_protection: self.prompt_injection_protection,
350 token_lifetime_secs: self.token_lifetime_secs,
351 no_final_required: self.no_final_required,
352 fallback_chain: self.fallback_chain.clone(),
353 token_budget: self.token_budget.clone(),
354 ravenfabric: self.ravenfabric.clone(),
355 checkpoint_dir: self.checkpoint_dir.clone(),
356 session_id: self.session_id.clone(),
357 metrics_callback: None,
358 }
359 }
360}
361
/// Runs the agent loop with the default tool registry.
///
/// Delegates to [`run_agent_loop_with_registry`] with no custom registry.
#[instrument(skip_all, fields(provider = %llm.provider_name(), model = %llm.model()))]
pub async fn run_agent_loop(
    llm: Arc<dyn LLMProviderTrait>,
    initial_prompt: &str,
    system_prompt: &str,
    config: AgentLoopConfig,
) -> Result<String> {
    run_agent_loop_with_registry(llm, initial_prompt, system_prompt, config, None).await
}
377
378#[instrument(skip_all, fields(provider = %llm.provider_name(), model = %llm.model()))]
383pub async fn run_agent_loop_with_registry(
384 llm: Arc<dyn LLMProviderTrait>,
385 initial_prompt: &str,
386 system_prompt: &str,
387 config: AgentLoopConfig,
388 tool_registry: Option<ToolRegistry>,
389) -> Result<String> {
390 let registry = tool_registry.unwrap_or_else(ToolRegistry::with_default_tools);
391 run_agent_loop_inner(
392 llm,
393 initial_prompt,
394 system_prompt,
395 config,
396 registry,
397 "security integration",
398 false,
399 )
400 .await
401}
402
403#[allow(clippy::too_many_arguments)]
414#[instrument(skip_all, fields(provider = %llm.provider_name(), model = %llm.model()))]
415async fn run_agent_loop_inner(
416 llm: Arc<dyn LLMProviderTrait>,
417 initial_prompt: &str,
418 system_prompt: &str,
419 config: AgentLoopConfig,
420 registry: ToolRegistry,
421 loop_label: &str,
422 mcp_enabled: bool,
423) -> Result<String> {
424 let policy_engine = PolicyEngine::default_secure();
426 let mut sandbox = Sandbox::default();
427 sandbox.init().await.map_err(|e| {
428 crate::error::RavenClawsError::CommandExecution(format!("Sandbox init failed: {}", e))
429 })?;
430 let audit_log = AuditLog::new(format!("agent-{}", std::process::id()));
431
432 let injection_detector = if config.prompt_injection_protection {
434 Some(crate::policy::InjectionDetector::new())
435 } else {
436 None
437 };
438
439 let session_start = std::time::Instant::now();
441
442 info!(
443 provider = llm.provider_name(),
444 model = llm.model(),
445 max_iterations = config.max_iterations,
446 enable_tools = config.enable_tools,
447 tool_count = registry.len(),
448 require_approval = config.require_approval,
449 prompt_injection_protection = config.prompt_injection_protection,
450 token_lifetime_secs = config.token_lifetime_secs,
451 "Agent loop starting with {}",
452 loop_label
453 );
454
455 let _ = audit_log.append(
457 AuditEventType::AgentStart,
458 "agent",
459 &format!(
460 "Agent loop started with {} (model: {})",
461 llm.provider_name(),
462 llm.model()
463 ),
464 Some(serde_json::json!({
465 "provider": llm.provider_name(),
466 "model": llm.model(),
467 "max_iterations": config.max_iterations,
468 "enable_tools": config.enable_tools,
469 "mcp_enabled": mcp_enabled,
470 "tool_count": registry.len(),
471 "require_approval": config.require_approval,
472 "prompt_injection_protection": config.prompt_injection_protection,
473 "token_lifetime_secs": config.token_lifetime_secs,
474 })),
475 );
476
477 let (mut memory, start_iteration) = if let Some(ref checkpoint_dir) = config.checkpoint_dir {
481 if let Some(ref session_id) = config.session_id {
482 if let Some(checkpoint) = load_checkpoint(checkpoint_dir, session_id) {
483 info!(
484 session_id = %session_id,
485 iteration = checkpoint.iteration,
486 max_iterations = checkpoint.max_iterations,
487 "Resuming agent loop from checkpoint"
488 );
489 (
490 ConversationMemory::from_history(checkpoint.messages, 0),
491 checkpoint.iteration + 1, )
493 } else {
494 info!(
495 session_id = %session_id,
496 "No checkpoint found, starting fresh"
497 );
498 let mut m = ConversationMemory::new(system_prompt, 0);
499 m.add_user_message(initial_prompt);
500 (m, 0)
501 }
502 } else {
503 let mut m = ConversationMemory::new(system_prompt, 0);
504 m.add_user_message(initial_prompt);
505 (m, 0)
506 }
507 } else {
508 let mut m = ConversationMemory::new(system_prompt, 0);
509 m.add_user_message(initial_prompt);
510 (m, 0)
511 };
512
513 let session_id = config
515 .session_id
516 .clone()
517 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
518
519 for iteration in start_iteration..config.max_iterations {
520 if config.token_lifetime_secs > 0 {
522 let elapsed = session_start.elapsed().as_secs();
523 if elapsed >= config.token_lifetime_secs {
524 warn!(
525 iteration = iteration,
526 elapsed_secs = elapsed,
527 token_lifetime_secs = config.token_lifetime_secs,
528 "Agent loop reached token lifetime limit"
529 );
530 let _ = audit_log.append(
531 AuditEventType::SecurityViolation,
532 "token_lifetime",
533 &format!(
534 "Session expired after {} seconds (limit: {}s)",
535 elapsed, config.token_lifetime_secs
536 ),
537 Some(serde_json::json!({
538 "elapsed_secs": elapsed,
539 "token_lifetime_secs": config.token_lifetime_secs,
540 "iteration": iteration,
541 })),
542 );
543 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
545 delete_checkpoint(checkpoint_dir, &session_id);
546 }
547 return Err(crate::error::RavenClawsError::SecurityViolation(format!(
548 "Session token expired after {} seconds (limit: {}s)",
549 elapsed, config.token_lifetime_secs
550 )));
551 }
552 }
553 let messages = memory.history().to_vec();
554
555 if let Some(ref budget) = config.token_budget {
557 let budget = budget.lock().unwrap();
558 if budget.remaining() < 100 {
559 warn!(
560 iteration = iteration,
561 remaining = budget.remaining(),
562 "Token budget exhausted"
563 );
564 let _ = audit_log.append(
565 AuditEventType::SecurityViolation,
566 "token_budget",
567 &format!("Token budget exhausted (remaining: {})", budget.remaining()),
568 Some(serde_json::json!({
569 "remaining": budget.remaining(),
570 "used": budget.used_tokens,
571 "iteration": iteration,
572 })),
573 );
574 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
576 delete_checkpoint(checkpoint_dir, &session_id);
577 }
578 return Err(crate::error::RavenClawsError::SecurityViolation(
579 "Token budget exhausted".to_string(),
580 ));
581 }
582 }
583
584 let response = match llm.chat(messages.clone()).await {
585 Ok(r) => r,
586 Err(e) => {
587 if let Some(ref chain) = config.fallback_chain {
589 warn!(error = %e, "Primary LLM failed, trying fallback chain");
590 let _ = audit_log.append(
591 AuditEventType::Error,
592 "llm",
593 &format!("Primary LLM failed, trying fallback: {}", e),
594 None,
595 );
596 let configs = {
598 let c = chain.lock().unwrap();
599 c.configs.clone()
600 };
601 let mut temp_chain = ProviderFallbackChain::new(configs);
602 match temp_chain.chat_with_fallback(messages).await {
603 Ok(r) => {
604 info!("Fallback chain succeeded");
605 if let Some(ref budget) = config.token_budget {
607 if let Some(usage) = &r.usage {
608 let mut b = budget.lock().unwrap();
609 b.record_usage(usage.total_tokens);
610 }
611 }
612 r
613 }
614 Err(fallback_e) => {
615 warn!(error = %fallback_e, "Fallback chain also failed");
616 let _ = audit_log.append(
617 AuditEventType::Error,
618 "llm",
619 &format!("All providers failed: {}", fallback_e),
620 None,
621 );
622 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
624 delete_checkpoint(checkpoint_dir, &session_id);
625 }
626 return Err(crate::error::RavenClawsError::Llm(fallback_e));
627 }
628 }
629 } else {
630 warn!(error = %e, "LLM request failed");
631 let _ = audit_log.append(
632 AuditEventType::Error,
633 "llm",
634 &format!("LLM request failed: {}", e),
635 None,
636 );
637 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
639 delete_checkpoint(checkpoint_dir, &session_id);
640 }
641 return Err(crate::error::RavenClawsError::Llm(e));
642 }
643 }
644 };
645
646 let mut iteration_tokens: u64 = 0;
648 if let Some(ref budget) = config.token_budget {
649 if let Some(usage) = &response.usage {
650 let mut b = budget.lock().unwrap();
651 b.record_usage(usage.total_tokens);
652 iteration_tokens = usage.total_tokens as u64;
653 debug!(
654 iteration = iteration,
655 tokens_used = usage.total_tokens,
656 total_used = b.used_tokens,
657 remaining = b.remaining(),
658 "Token usage recorded"
659 );
660 }
661 } else if let Some(usage) = &response.usage {
662 iteration_tokens = usage.total_tokens as u64;
663 }
664
665 if let Some(ref cb) = config.metrics_callback {
667 cb(iteration_tokens, 0);
668 }
669
670 if let Some(ref rf) = config.ravenfabric {
672 if rf.is_enabled() {
673 let _ = rf.health().await;
674 info!(
675 iteration = iteration,
676 ravenfabric = true,
677 "RavenFabric health check completed"
678 );
679 }
680 }
681
682 let first_choice = response.choices.first();
683 let content = first_choice
684 .map(|c| c.message.content.clone())
685 .unwrap_or_default();
686
687 debug!(
688 iteration = iteration,
689 response_length = content.len(),
690 response_preview = %content[..content.len().min(500)],
691 "LLM response received"
692 );
693
694 if let Some(ref detector) = injection_detector {
696 match detector.check(&content) {
697 crate::policy::InjectionVerdict::Suspicious(reason) => {
698 warn!(
699 iteration = iteration,
700 reason = %reason,
701 "Prompt-injection detected in LLM response"
702 );
703 let _ = audit_log.append(
704 AuditEventType::SecurityViolation,
705 "injection_detector",
706 &format!("Prompt-injection detected: {}", reason),
707 Some(serde_json::json!({
708 "reason": reason,
709 "iteration": iteration,
710 "content_preview": &content[..content.len().min(200)],
711 })),
712 );
713 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
715 delete_checkpoint(checkpoint_dir, &session_id);
716 }
717 return Err(crate::error::RavenClawsError::SecurityViolation(format!(
718 "LLM response blocked: potential prompt injection ({})",
719 reason
720 )));
721 }
722 crate::policy::InjectionVerdict::Clean => {}
723 }
724 }
725
726 if config.enable_tools {
728 if let Some((tool_name, args)) = first_choice.and_then(parse_structured_tool_call) {
729 info!(tool = %tool_name, "Structured tool call detected");
730
731 if let Some(tool_result) = execute_parsed_tool_call(
733 tool_name,
734 args,
735 ®istry,
736 &policy_engine,
737 &sandbox,
738 &audit_log,
739 config.require_approval,
740 )
741 .await
742 {
743 let observation = if tool_result.success {
744 format!("OBSERVATION: {}", tool_result.output)
745 } else {
746 format!(
747 "OBSERVATION: Tool failed with error: {}",
748 tool_result.error.as_deref().unwrap_or("unknown error")
749 )
750 };
751
752 memory.add_user_message(&observation);
753
754 if let Some(ref cb) = config.metrics_callback {
756 cb(0, 1);
757 }
758
759 info!(
760 iteration = iteration,
761 tool = %tool_result.tool_name,
762 success = tool_result.success,
763 "Structured tool executed"
764 );
765 continue;
766 }
767 }
768 }
769
770 if content.contains("FINAL:") {
772 let final_response = content
773 .split("FINAL:")
774 .nth(1)
775 .unwrap_or("")
776 .trim()
777 .to_string();
778
779 memory.add_assistant_message(&content);
780
781 let _ = audit_log.append(
783 AuditEventType::AgentFinish,
784 "agent",
785 "Agent loop completed successfully",
786 Some(serde_json::json!({
787 "iterations": iteration + 1,
788 "final_response_length": final_response.len(),
789 })),
790 );
791
792 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
794 delete_checkpoint(checkpoint_dir, &session_id);
795 }
796
797 return Ok(final_response);
798 }
799
800 if config.enable_tools {
802 if let Some(tool_result) = execute_tool_call_with_security(
803 &content,
804 ®istry,
805 &policy_engine,
806 &sandbox,
807 &audit_log,
808 )
809 .await
810 {
811 let observation = if tool_result.success {
812 format!("OBSERVATION: {}", tool_result.output)
813 } else {
814 format!(
815 "OBSERVATION: Tool failed with error: {}",
816 tool_result.error.as_deref().unwrap_or("unknown error")
817 )
818 };
819
820 memory.add_assistant_message(&content);
821 memory.add_user_message(&observation);
822
823 if let Some(ref cb) = config.metrics_callback {
825 cb(0, 1);
826 }
827
828 info!(
829 iteration = iteration,
830 tool = %tool_result.tool_name,
831 success = tool_result.success,
832 "Tool executed"
833 );
834 continue;
835 }
836 }
837
838 memory.add_assistant_message(&content);
840
841 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
843 let checkpoint = CheckpointState::new(
844 session_id.clone(),
845 iteration,
846 config.max_iterations,
847 memory.history().to_vec(),
848 initial_prompt,
849 system_prompt,
850 llm.provider_name(),
851 llm.model(),
852 config.enable_tools,
853 );
854 if let Err(e) = save_checkpoint(checkpoint_dir, &checkpoint) {
855 warn!(
856 session_id = %session_id,
857 iteration = iteration,
858 error = %e,
859 "Failed to save checkpoint"
860 );
861 } else {
862 debug!(
863 session_id = %session_id,
864 iteration = iteration,
865 "Checkpoint saved"
866 );
867 }
868 }
869
870 if config.no_final_required {
872 info!(
873 iteration = iteration,
874 response_length = content.len(),
875 "no_final_required: treating response as completion"
876 );
877 let _ = audit_log.append(
878 AuditEventType::AgentFinish,
879 "agent",
880 "Agent loop completed (no_final_required)",
881 Some(serde_json::json!({
882 "iterations": iteration + 1,
883 "final_response_length": content.len(),
884 })),
885 );
886 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
888 delete_checkpoint(checkpoint_dir, &session_id);
889 }
890 return Ok(content);
891 }
892
893 info!(
894 iteration = iteration,
895 thought = %content.lines().find(|l| l.starts_with("THOUGHT:")).unwrap_or("<no thought>"),
896 "Agent loop progress"
897 );
898 }
899
900 warn!(
902 max_iterations = config.max_iterations,
903 "Agent loop reached max iterations"
904 );
905
906 let _ = audit_log.append(
907 AuditEventType::Error,
908 "agent",
909 "Agent loop reached max iterations without completing",
910 Some(serde_json::json!({
911 "max_iterations": config.max_iterations,
912 })),
913 );
914
915 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
917 delete_checkpoint(checkpoint_dir, &session_id);
918 }
919
920 let history = memory.history();
921 if history.len() > 1 {
922 if let Some(last) = history.last() {
923 return Ok(last.content.clone());
924 }
925 }
926
927 Err(crate::error::RavenClawsError::CommandExecution(
928 "Agent loop reached max iterations without completing the task".to_string(),
929 ))
930}
931
/// Runs the agent loop with an optional MCP client and the default tool
/// registry.
///
/// Delegates to [`run_agent_loop_with_mcp_and_registry`] with no custom
/// registry.
#[allow(dead_code)]
#[instrument(skip_all, fields(provider = %llm.provider_name(), model = %llm.model()))]
pub async fn run_agent_loop_with_mcp(
    llm: Arc<dyn LLMProviderTrait>,
    initial_prompt: &str,
    system_prompt: &str,
    config: AgentLoopConfig,
    mcp_client: Option<Arc<RwLock<McpClient>>>,
) -> Result<String> {
    run_agent_loop_with_mcp_and_registry(
        llm,
        initial_prompt,
        system_prompt,
        config,
        mcp_client,
        None,
    )
    .await
}
956
957#[instrument(skip_all, fields(provider = %llm.provider_name(), model = %llm.model()))]
959pub async fn run_agent_loop_with_mcp_and_registry(
960 llm: Arc<dyn LLMProviderTrait>,
961 initial_prompt: &str,
962 system_prompt: &str,
963 config: AgentLoopConfig,
964 mcp_client: Option<Arc<RwLock<McpClient>>>,
965 tool_registry: Option<ToolRegistry>,
966) -> Result<String> {
967 let mut registry = tool_registry.unwrap_or_else(ToolRegistry::with_default_tools);
969
970 if let Some(client) = &mcp_client {
972 match crate::mcp::register_mcp_tools(&mut registry, client.clone()).await {
973 Ok(count) => {
974 info!(count, "MCP tools registered");
975 }
976 Err(e) => {
977 warn!(error = %e, "Failed to register MCP tools");
978 }
979 }
980 }
981
982 let mcp_enabled = mcp_client.is_some();
983 run_agent_loop_inner(
984 llm,
985 initial_prompt,
986 system_prompt,
987 config,
988 registry,
989 "MCP integration",
990 mcp_enabled,
991 )
992 .await
993}
994
995async fn prompt_for_approval(tool_name: &str, args: &serde_json::Value) -> bool {
1000 use std::io::{IsTerminal, Write};
1001
1002 let args_str = serde_json::to_string_pretty(args).unwrap_or_default();
1003
1004 if !std::io::stdin().is_terminal() {
1006 warn!(
1007 tool = %tool_name,
1008 "stdin is not a TTY — auto-approving tool call (use --require-approval only in interactive mode)"
1009 );
1010 return true;
1011 }
1012
1013 eprintln!("\n⚠️ Tool requires approval:");
1015 eprintln!(" Tool: {}", tool_name);
1016 for line in args_str.lines() {
1017 eprintln!(" {}", line);
1018 }
1019 eprint!(" Approve? [y/N] ");
1020 std::io::stderr().flush().ok();
1021
1022 let mut input = String::new();
1023 match std::io::stdin().read_line(&mut input) {
1024 Ok(_) => {
1025 let trimmed = input.trim().to_lowercase();
1026 trimmed == "y" || trimmed == "yes"
1027 }
1028 Err(e) => {
1029 warn!(error = %e, "Failed to read approval input — denying by default");
1030 false
1031 }
1032 }
1033}
1034
/// Test-only variant of the approval prompt that takes the answer as `input`
/// instead of reading stdin. It prints the same prompt and returns `true`
/// only for `y` or `yes` (case-insensitive, surrounding whitespace ignored).
#[cfg(test)]
async fn prompt_for_approval_with_input(
    tool_name: &str,
    args: &serde_json::Value,
    input: &str,
) -> bool {
    use std::io::Write;

    let rendered_args = serde_json::to_string_pretty(args).unwrap_or_default();

    eprintln!("\n⚠️ Tool requires approval:");
    eprintln!(" Tool: {}", tool_name);
    rendered_args
        .lines()
        .for_each(|line| eprintln!(" {}", line));
    eprint!(" Approve? [y/N] ");
    std::io::stderr().flush().ok();

    matches!(input.trim().to_lowercase().as_str(), "y" | "yes")
}
1058
1059async fn execute_parsed_tool_call(
1068 tool_name: String,
1069 args: serde_json::Value,
1070 registry: &ToolRegistry,
1071 policy_engine: &PolicyEngine,
1072 _sandbox: &Sandbox,
1073 audit_log: &AuditLog,
1074 require_approval: bool,
1075) -> Option<ToolResult> {
1076 info!(tool = %tool_name, "Executing parsed tool call");
1077
1078 let _ = audit_log.tool_call(&tool_name, &args);
1080
1081 if require_approval && policy_engine.requires_approval(&tool_name) {
1083 let _ = audit_log.append(
1084 AuditEventType::ApprovalRequested,
1085 "approval",
1086 &format!("Approval required for tool: {}", tool_name),
1087 Some(serde_json::json!({"tool": tool_name, "args": args})),
1088 );
1089
1090 let granted = prompt_for_approval(&tool_name, &args).await;
1092
1093 if !granted {
1094 let _ = audit_log.approval(&tool_name, false, Some("Denied by user"));
1095 warn!(tool = %tool_name, "Tool call denied by user");
1096 return Some(ToolResult {
1097 tool_name: tool_name.clone(),
1098 success: false,
1099 output: String::new(),
1100 error: Some(format!("Approval denied by user for tool: {}", tool_name)),
1101 exit_code: Some(-1),
1102 duration_ms: None,
1103 });
1104 }
1105
1106 let _ = audit_log.approval(&tool_name, true, Some("Approved by user"));
1107 info!(tool = %tool_name, "Tool call approved by user");
1108 }
1109
1110 let policy_decision = policy_engine.check_tool_call(&tool_name, &args);
1112
1113 match &policy_decision {
1115 Decision::Allow => {
1116 let _ = audit_log.policy_decision(&tool_name, true, None);
1117 }
1118 Decision::Deny(reason) => {
1119 let _ = audit_log.policy_decision(&tool_name, false, Some(reason));
1120 warn!(tool = %tool_name, reason = %reason, "Tool call denied by policy");
1121 return Some(ToolResult {
1122 tool_name: tool_name.clone(),
1123 success: false,
1124 output: String::new(),
1125 error: Some(format!("Policy denied: {}", reason)),
1126 exit_code: Some(-1),
1127 duration_ms: None,
1128 });
1129 }
1130 }
1131
1132 let tool_name_clone = tool_name.clone();
1134 let call = ToolCall {
1135 name: tool_name.clone(),
1136 arguments: args,
1137 id: None,
1138 };
1139
1140 let result = match registry.execute(call).await {
1141 Ok(result) => {
1142 let _ = audit_log.append(
1144 AuditEventType::ToolResult,
1145 &tool_name_clone,
1146 &format!(
1147 "Tool executed: {} (success: {})",
1148 tool_name_clone, result.success
1149 ),
1150 Some(serde_json::json!({
1151 "success": result.success,
1152 "exit_code": result.exit_code,
1153 "duration_ms": result.duration_ms,
1154 })),
1155 );
1156 result
1157 }
1158 Err(e) => {
1159 let _ = audit_log.append(
1161 AuditEventType::Error,
1162 &tool_name_clone,
1163 &format!("Tool execution failed: {}", e),
1164 None,
1165 );
1166 ToolResult {
1167 tool_name: tool_name_clone,
1168 success: false,
1169 output: String::new(),
1170 error: Some(e.to_string()),
1171 exit_code: Some(-1),
1172 duration_ms: None,
1173 }
1174 }
1175 };
1176
1177 Some(result)
1178}
1179
1180async fn execute_tool_call_with_security(
1189 content: &str,
1190 registry: &ToolRegistry,
1191 policy_engine: &PolicyEngine,
1192 _sandbox: &Sandbox,
1193 audit_log: &AuditLog,
1194) -> Option<ToolResult> {
1195 let (tool_name, args) = parse_tool_call(content)?;
1197
1198 execute_parsed_tool_call(
1200 tool_name,
1201 args,
1202 registry,
1203 policy_engine,
1204 _sandbox,
1205 audit_log,
1206 false, )
1208 .await
1209}
1210
1211fn parse_structured_tool_call(choice: &Choice) -> Option<(String, serde_json::Value)> {
1215 let tool_calls = choice.tool_calls.as_ref()?;
1216 let first_call = tool_calls.first()?;
1217
1218 let tool_name = first_call.function.name.clone();
1219 let args: serde_json::Value = serde_json::from_str(&first_call.function.arguments).ok()?;
1220
1221 Some((tool_name, args))
1222}
1223
1224fn parse_tool_call(content: &str) -> Option<(String, serde_json::Value)> {
1226 let mut lines = content.lines();
1227 let tool_call_line = lines.find(|l| l.trim().starts_with("TOOL_CALL:"))?;
1228
1229 let tool_name = tool_call_line
1230 .trim()
1231 .strip_prefix("TOOL_CALL:")
1232 .map(|s| s.trim())
1233 .filter(|s| !s.is_empty())?
1234 .to_string();
1235
1236 let args_line = lines.find(|l| l.trim().starts_with("ARGS:"))?;
1238 let args_str = args_line.trim().strip_prefix("ARGS:").map(|s| s.trim())?;
1239
1240 let args: serde_json::Value = serde_json::from_str(args_str).ok()?;
1241
1242 Some((tool_name, args))
1243}
1244
1245pub async fn run_single(
1247 llm: Arc<dyn LLMProviderTrait>,
1248 config: Config,
1249 ravenfabric: Option<RavenFabricClient>,
1250) -> Result<()> {
1251 info!(
1252 "Starting single agent mode with provider: {}",
1253 llm.provider_name()
1254 );
1255
1256 if let Some(ref rf) = ravenfabric {
1258 if rf.is_enabled() {
1259 info!("RavenFabric remote execution available");
1260 match rf.health().await {
1261 Ok(true) => info!("RavenFabric mesh is healthy"),
1262 Ok(false) => warn!("RavenFabric mesh returned unhealthy status"),
1263 Err(e) => warn!(error = %e, "RavenFabric health check failed"),
1264 }
1265 }
1266 }
1267
1268 let system_prompt = &config.llm.system_prompt;
1269
1270 let messages = vec![
1271 ChatMessage {
1272 role: "system".to_string(),
1273 content: system_prompt.to_string(),
1274 },
1275 ChatMessage {
1276 role: "user".to_string(),
1277 content: "Ready. Awaiting instructions.".to_string(),
1278 },
1279 ];
1280
1281 match llm.chat(messages).await {
1282 Ok(response) => {
1283 if let Some(choice) = response.choices.first() {
1284 info!(provider = llm.provider_name(), model = llm.model(), response = %choice.message.content, "Agent response received");
1285
1286 if let Some(ref rf) = ravenfabric {
1288 if rf.is_enabled() {
1289 let preview = choice.message.content.chars().take(500).collect::<String>();
1290 let _ = rf.broadcast(&preview, 30).await;
1291 info!("Agent result broadcast to RavenFabric mesh");
1292 }
1293 }
1294 }
1295 }
1296 Err(e) => {
1297 warn!(error = %e, provider = llm.provider_name(), "LLM request failed");
1298 }
1299 }
1300
1301 Ok(())
1302}
1303
1304pub async fn run_swarm(
1309 llm: Arc<dyn LLMProviderTrait>,
1310 config: Config,
1311 ravenfabric: Option<RavenFabricClient>,
1312) -> Result<()> {
1313 info!("Starting swarm mode (single-provider) — 3 parallel agents");
1314
1315 if let Some(ref rf) = ravenfabric {
1317 if rf.is_enabled() {
1318 info!("RavenFabric remote execution available for swarm coordination");
1319 match rf.health().await {
1320 Ok(true) => info!("RavenFabric mesh is healthy"),
1321 Ok(false) => warn!("RavenFabric mesh returned unhealthy status"),
1322 Err(e) => warn!(error = %e, "RavenFabric health check failed"),
1323 }
1324 }
1325 }
1326
1327 let _system_prompt = &config.llm.system_prompt;
1328 let num_agents = 3;
1329 let mut handles = Vec::new();
1330
1331 let personas = [
1333 "You are an analytical agent. Focus on logic, structure, and precision.",
1334 "You are a creative agent. Focus on innovation, alternatives, and possibilities.",
1335 "You are a pragmatic agent. Focus on simplicity, efficiency, and practicality.",
1336 ];
1337
1338 for (i, persona) in personas.iter().enumerate().take(num_agents) {
1339 let llm_clone = llm.clone();
1340 let persona = persona.to_string();
1341 let task = "Analyze the given task and provide your solution.".to_string();
1342
1343 let handle = tokio::spawn(async move {
1344 let mut memory = ConversationMemory::new(&persona, 10);
1345 memory.add_user_message(&task);
1346
1347 let messages = memory.history().to_vec();
1348 match llm_clone.chat(messages).await {
1349 Ok(response) => {
1350 let content = response
1351 .choices
1352 .first()
1353 .map(|c| c.message.content.clone())
1354 .unwrap_or_default();
1355 Ok((i, content))
1356 }
1357 Err(e) => Err(format!("Agent {} failed: {}", i, e)),
1358 }
1359 });
1360
1361 handles.push(handle);
1362 }
1363
1364 let mut results: Vec<(usize, String)> = Vec::new();
1366 for handle in handles {
1367 match handle.await {
1368 Ok(Ok((idx, result))) => {
1369 info!("Agent {} completed: {} chars", idx, result.len());
1370 results.push((idx, result));
1371 }
1372 Ok(Err(e)) => warn!("Agent failed: {}", e),
1373 Err(e) => warn!("Agent join failed: {}", e),
1374 }
1375 }
1376
1377 println!("\n🐦⬛ Swarm Results ({} agents):", results.len());
1379 for (idx, result) in &results {
1380 println!(
1381 "\n── Agent {} ({}) ──",
1382 idx + 1,
1383 personas[*idx].split('.').next().unwrap_or("Unknown")
1384 );
1385 println!("{}", result);
1386 }
1387
1388 if let Some(ref rf) = ravenfabric {
1390 if rf.is_enabled() {
1391 let summary = format!(
1392 "Swarm completed: {} agents, results: {}",
1393 results.len(),
1394 results
1395 .iter()
1396 .map(|(i, r)| format!("Agent {}: {} chars", i, r.len()))
1397 .collect::<Vec<_>>()
1398 .join(", ")
1399 );
1400 let _ = rf.broadcast(&summary, 30).await;
1401 info!("Swarm results broadcast to RavenFabric mesh");
1402 }
1403 }
1404
1405 Ok(())
1406}
1407
1408pub async fn run_supervisor(
1413 llm: Arc<dyn LLMProviderTrait>,
1414 config: Config,
1415 ravenfabric: Option<RavenFabricClient>,
1416) -> Result<()> {
1417 info!("Starting supervisor mode (single-provider)");
1418
1419 if let Some(ref rf) = ravenfabric {
1421 if rf.is_enabled() {
1422 info!("RavenFabric remote execution available for supervisor coordination");
1423 match rf.health().await {
1424 Ok(true) => info!("RavenFabric mesh is healthy"),
1425 Ok(false) => warn!("RavenFabric mesh returned unhealthy status"),
1426 Err(e) => warn!(error = %e, "RavenFabric health check failed"),
1427 }
1428 }
1429 }
1430
1431 let system_prompt = &config.llm.system_prompt;
1432 let policy_engine = PolicyEngine::default_secure();
1433 let mut sandbox = Sandbox::default();
1434 sandbox.init().await.map_err(|e| {
1435 crate::error::RavenClawsError::CommandExecution(format!("Sandbox init failed: {}", e))
1436 })?;
1437 let audit_log = AuditLog::new(format!("supervisor-{}", std::process::id()));
1438 let registry = ToolRegistry::with_default_tools();
1439
1440 let supervisor_prompt = format!(
1442 "You are a supervisor agent. Your task is to decompose complex tasks into subtasks \
1443 and coordinate sub-agents to complete them. \
1444 \n\nFor each subtask, respond with:\n\
1445 SUBTASK: <description>\n\
1446 AGENT: <agent_number>\n\
1447 \nWhen all subtasks are complete, respond with:\n\
1448 FINAL: <aggregated result>\n\
1449 \nTask: {}",
1450 "Coordinate the completion of the assigned task."
1451 );
1452
1453 let mut memory = ConversationMemory::new(system_prompt, 20);
1454 memory.add_user_message(&supervisor_prompt);
1455
1456 let mut subtask_results: Vec<String> = Vec::new();
1457 let mut iteration = 0;
1458 let max_iterations = 15;
1459
1460 loop {
1461 iteration += 1;
1462 if iteration > max_iterations {
1463 warn!("Supervisor reached max iterations");
1464 break;
1465 }
1466
1467 let messages = memory.history().to_vec();
1468 let response = match llm.chat(messages).await {
1469 Ok(r) => r,
1470 Err(e) => {
1471 warn!(error = %e, "Supervisor LLM request failed");
1472 continue;
1473 }
1474 };
1475
1476 let content = response
1477 .choices
1478 .first()
1479 .map(|c| c.message.content.clone())
1480 .unwrap_or_default();
1481
1482 if content.contains("FINAL:") {
1484 let final_response = content
1485 .split("FINAL:")
1486 .nth(1)
1487 .unwrap_or("")
1488 .trim()
1489 .to_string();
1490 info!("Supervisor completed task: {} chars", final_response.len());
1491
1492 let _ = audit_log.append(
1493 AuditEventType::AgentFinish,
1494 "supervisor",
1495 "Supervisor completed task coordination",
1496 Some(serde_json::json!({
1497 "iterations": iteration,
1498 "subtasks_completed": subtask_results.len(),
1499 })),
1500 );
1501
1502 println!("\n🐦⬛ Supervisor Result:\n{}", final_response);
1503 return Ok(());
1504 }
1505
1506 if content.contains("SUBTASK:") {
1508 let subtask_block = content.split("SUBTASK:").nth(1).unwrap_or("");
1509 let subtask_lines: Vec<&str> = subtask_block.lines().take(3).collect();
1510
1511 let subtask_desc = subtask_lines.first().unwrap_or(&"").trim();
1512 let agent_num = subtask_lines
1513 .iter()
1514 .find(|l| l.starts_with("AGENT:"))
1515 .and_then(|l| l.split(':').nth(1))
1516 .unwrap_or("1")
1517 .trim();
1518
1519 if !subtask_desc.is_empty() {
1520 info!("Subtask {}: {}", agent_num, subtask_desc);
1521
1522 let subtask_result = run_subtask_agent(
1524 llm.clone(),
1525 subtask_desc,
1526 system_prompt,
1527 &policy_engine,
1528 &sandbox,
1529 &audit_log,
1530 ®istry,
1531 )
1532 .await;
1533
1534 match subtask_result {
1535 Ok(result) => {
1536 info!("Subtask {} completed: {} chars", agent_num, result.len());
1537 subtask_results.push(format!("Agent {} result: {}", agent_num, result));
1538
1539 memory.add_assistant_message(&format!(
1540 "Decomposed subtask {}: {}",
1541 agent_num, subtask_desc
1542 ));
1543 memory
1544 .add_user_message(&format!("Subtask {} result: {}", agent_num, result));
1545 }
1546 Err(e) => {
1547 warn!("Subtask {} failed: {}", agent_num, e);
1548 memory
1549 .add_assistant_message(&format!("Subtask {} failed: {}", agent_num, e));
1550 }
1551 }
1552 }
1553 } else {
1554 memory.add_assistant_message(&content);
1555 }
1556 }
1557
1558 if !subtask_results.is_empty() {
1560 let aggregated = subtask_results.join("\n\n");
1561 info!(
1562 "Supervisor aggregated {} subtask results",
1563 subtask_results.len()
1564 );
1565
1566 if let Some(ref rf) = ravenfabric {
1568 if rf.is_enabled() {
1569 let summary = format!(
1570 "Supervisor completed: {} subtasks, result: {} chars",
1571 subtask_results.len(),
1572 aggregated.len()
1573 );
1574 let _ = rf.broadcast(&summary, 30).await;
1575 info!("Supervisor result broadcast to RavenFabric mesh");
1576 }
1577 }
1578
1579 println!("\n🐦⬛ Supervisor Aggregated Result:\n{}", aggregated);
1580 return Ok(());
1581 }
1582
1583 Err(crate::error::RavenClawsError::CommandExecution(
1584 "Supervisor mode completed without results".to_string(),
1585 ))
1586}
1587
1588async fn run_subtask_agent(
1590 llm: Arc<dyn LLMProviderTrait>,
1591 subtask: &str,
1592 system_prompt: &str,
1593 policy_engine: &PolicyEngine,
1594 sandbox: &Sandbox,
1595 audit_log: &AuditLog,
1596 registry: &ToolRegistry,
1597) -> Result<String> {
1598 let mut memory = ConversationMemory::new(system_prompt, 10);
1599 memory.add_user_message(&format!("Execute this subtask: {}", subtask));
1600
1601 for i in 0..5 {
1602 let messages = memory.history().to_vec();
1603 let response = match llm.chat(messages).await {
1604 Ok(r) => r,
1605 Err(e) => {
1606 warn!(error = %e, iteration = i, "Subtask agent LLM failed");
1607 continue;
1608 }
1609 };
1610
1611 let content = response
1612 .choices
1613 .first()
1614 .map(|c| c.message.content.clone())
1615 .unwrap_or_default();
1616
1617 if content.contains("FINAL:") || content.contains("DONE:") {
1618 return Ok(content
1619 .replace("FINAL:", "")
1620 .replace("DONE:", "")
1621 .trim()
1622 .to_string());
1623 }
1624
1625 if let Some(tool_result) =
1627 execute_tool_call_with_security(&content, registry, policy_engine, sandbox, audit_log)
1628 .await
1629 {
1630 memory.add_assistant_message(&content);
1631 memory.add_user_message(&format!("Tool result: {}", tool_result.output));
1632 } else {
1633 memory.add_assistant_message(&content);
1634 memory.add_user_message("Continue with next step.");
1635 }
1636 }
1637
1638 Ok("Subtask completed".to_string())
1639}
1640
1641pub async fn run_single_multi(
1643 multi_llm: MultiModelManager,
1644 config: Config,
1645 ravenfabric: Option<RavenFabricClient>,
1646) -> Result<()> {
1647 info!(
1648 "Starting single agent mode (multi-model) with {} providers",
1649 multi_llm.client_count()
1650 );
1651
1652 if let Some(ref rf) = ravenfabric {
1654 if rf.is_enabled() {
1655 info!("RavenFabric remote execution available");
1656 match rf.health().await {
1657 Ok(true) => info!("RavenFabric mesh is healthy"),
1658 Ok(false) => warn!("RavenFabric mesh returned unhealthy status"),
1659 Err(e) => warn!(error = %e, "RavenFabric health check failed"),
1660 }
1661 }
1662 }
1663
1664 let system_prompt = &config.llm.system_prompt;
1665
1666 let messages = vec![
1667 ChatMessage {
1668 role: "system".to_string(),
1669 content: system_prompt.to_string(),
1670 },
1671 ChatMessage {
1672 role: "user".to_string(),
1673 content: "Ready. Awaiting instructions.".to_string(),
1674 },
1675 ];
1676
1677 let mut last_index = 0;
1679 for i in 0..multi_llm.client_count() {
1680 let client = if i == 0 {
1681 multi_llm.get_client(0)
1682 } else {
1683 multi_llm.next_client(last_index)
1684 };
1685
1686 if let Some(client) = client {
1687 match client.chat(messages.clone()).await {
1688 Ok(response) => {
1689 if let Some(choice) = response.choices.first() {
1690 info!(provider = client.provider_name(), model = client.model(), response = %choice.message.content, "Provider response received");
1691 }
1692 }
1693 Err(e) => {
1694 warn!(error = %e, provider = client.provider_name(), model = client.model(), "Provider request failed");
1695 }
1696 }
1697 last_index = i;
1698 }
1699 }
1700
1701 if let Some(ref rf) = ravenfabric {
1703 if rf.is_enabled() {
1704 let _ = rf
1705 .broadcast("Single agent (multi-model) completed", 30)
1706 .await;
1707 info!("Multi-model result broadcast to RavenFabric mesh");
1708 }
1709 }
1710
1711 Ok(())
1712}
1713
1714pub async fn run_swarm_multi(
1719 multi_llm: MultiModelManager,
1720 config: Config,
1721 ravenfabric: Option<RavenFabricClient>,
1722) -> Result<()> {
1723 info!(
1724 "Starting swarm mode (multi-model) — {} parallel agents",
1725 multi_llm.client_count()
1726 );
1727
1728 if let Some(ref rf) = ravenfabric {
1730 if rf.is_enabled() {
1731 info!("RavenFabric remote execution available for swarm coordination");
1732 match rf.health().await {
1733 Ok(true) => info!("RavenFabric mesh is healthy"),
1734 Ok(false) => warn!("RavenFabric mesh returned unhealthy status"),
1735 Err(e) => warn!(error = %e, "RavenFabric health check failed"),
1736 }
1737 }
1738 }
1739
1740 let _system_prompt = &config.llm.system_prompt;
1741 let num_agents = multi_llm.client_count().min(3); let mut handles = Vec::new();
1743
1744 let personas = [
1746 "You are an analytical agent. Focus on logic, structure, and precision.",
1747 "You are a creative agent. Focus on innovation, alternatives, and possibilities.",
1748 "You are a pragmatic agent. Focus on simplicity, efficiency, and practicality.",
1749 ];
1750
1751 for i in 0..num_agents {
1752 let client = multi_llm.get_client(i).unwrap().clone();
1753 let persona = personas.get(i).unwrap_or(&personas[0]).to_string();
1754 let task = "Analyze the given task and provide your solution.".to_string();
1755
1756 let handle = tokio::spawn(async move {
1757 let mut memory = ConversationMemory::new(&persona, 10);
1758 memory.add_user_message(&task);
1759
1760 let messages = memory.history().to_vec();
1761 match client.chat(messages).await {
1762 Ok(response) => {
1763 let content = response
1764 .choices
1765 .first()
1766 .map(|c| c.message.content.clone())
1767 .unwrap_or_default();
1768 Ok((
1769 i,
1770 client.provider_name().to_string(),
1771 client.model().to_string(),
1772 content,
1773 ))
1774 }
1775 Err(e) => Err(format!("Agent {} failed: {}", i, e)),
1776 }
1777 });
1778
1779 handles.push(handle);
1780 }
1781
1782 let mut results: Vec<(usize, String, String, String)> = Vec::new();
1784 for handle in handles {
1785 match handle.await {
1786 Ok(Ok((idx, provider, model, result))) => {
1787 info!(
1788 "Agent {} ({}:{}) completed: {} chars",
1789 idx,
1790 provider,
1791 model,
1792 result.len()
1793 );
1794 results.push((idx, provider, model, result));
1795 }
1796 Ok(Err(e)) => warn!("Agent failed: {}", e),
1797 Err(e) => warn!("Agent join failed: {}", e),
1798 }
1799 }
1800
1801 println!(
1803 "\n🐦⬛ Swarm Results ({} agents, multi-model):",
1804 results.len()
1805 );
1806 for (idx, provider, model, result) in &results {
1807 println!("\n── Agent {} ({}:{}) ──", idx + 1, provider, model);
1808 println!("{}", result);
1809 }
1810
1811 if let Some(ref rf) = ravenfabric {
1813 if rf.is_enabled() {
1814 let summary = format!("Multi-model swarm completed: {} agents", results.len());
1815 let _ = rf.broadcast(&summary, 30).await;
1816 info!("Multi-model swarm results broadcast to RavenFabric mesh");
1817 }
1818 }
1819
1820 Ok(())
1821}
1822
1823pub async fn run_supervisor_multi(
1828 multi_llm: MultiModelManager,
1829 config: Config,
1830 ravenfabric: Option<RavenFabricClient>,
1831) -> Result<()> {
1832 info!(
1833 "Starting supervisor mode (multi-model) with {} providers",
1834 multi_llm.client_count()
1835 );
1836
1837 if let Some(ref rf) = ravenfabric {
1839 if rf.is_enabled() {
1840 info!("RavenFabric remote execution available for supervisor coordination");
1841 match rf.health().await {
1842 Ok(true) => info!("RavenFabric mesh is healthy"),
1843 Ok(false) => warn!("RavenFabric mesh returned unhealthy status"),
1844 Err(e) => warn!(error = %e, "RavenFabric health check failed"),
1845 }
1846 }
1847 }
1848
1849 let system_prompt = &config.llm.system_prompt;
1850 let policy_engine = PolicyEngine::default_secure();
1851 let mut sandbox = Sandbox::default();
1852 sandbox.init().await.map_err(|e| {
1853 crate::error::RavenClawsError::CommandExecution(format!("Sandbox init failed: {}", e))
1854 })?;
1855 let audit_log = AuditLog::new(format!("supervisor-multi-{}", std::process::id()));
1856 let registry = ToolRegistry::with_default_tools();
1857
1858 let supervisor_prompt = format!(
1860 "You are a supervisor agent coordinating multiple LLM providers. \
1861 Decompose tasks and assign them to appropriate providers based on their strengths. \
1862 \n\nFor each subtask, respond with:\n\
1863 SUBTASK: <description>\n\
1864 PROVIDER: <provider_index 0-{}>\n\
1865 \nWhen complete, respond with:\n\
1866 FINAL: <aggregated result>\n\
1867 \nTask: {}",
1868 multi_llm.client_count() - 1,
1869 "Coordinate the completion of the assigned task using available providers."
1870 );
1871
1872 let mut memory = ConversationMemory::new(system_prompt, 20);
1873 memory.add_user_message(&supervisor_prompt);
1874
1875 let mut subtask_results: Vec<String> = Vec::new();
1876 let mut iteration = 0;
1877 let max_iterations = 15;
1878
1879 loop {
1880 iteration += 1;
1881 if iteration > max_iterations {
1882 warn!("Supervisor reached max iterations");
1883 break;
1884 }
1885
1886 let supervisor_client = multi_llm
1888 .get_client(iteration % multi_llm.client_count())
1889 .or_else(|| multi_llm.get_client(0))
1890 .cloned();
1891
1892 let messages = memory.history().to_vec();
1893 let response =
1894 match supervisor_client.map(|c| tokio::spawn(async move { c.chat(messages).await })) {
1895 Some(handle) => match handle.await {
1896 Ok(Ok(r)) => r,
1897 Ok(Err(e)) => {
1898 warn!(error = %e, "Supervisor LLM request failed");
1899 continue;
1900 }
1901 Err(e) => {
1902 warn!(error = %e, "Supervisor task join failed");
1903 continue;
1904 }
1905 },
1906 None => {
1907 warn!("No LLM clients available");
1908 break;
1909 }
1910 };
1911
1912 let content = response
1913 .choices
1914 .first()
1915 .map(|c| c.message.content.clone())
1916 .unwrap_or_default();
1917
1918 if content.contains("FINAL:") {
1920 let final_response = content
1921 .split("FINAL:")
1922 .nth(1)
1923 .unwrap_or("")
1924 .trim()
1925 .to_string();
1926 info!("Supervisor completed task: {} chars", final_response.len());
1927
1928 let _ = audit_log.append(
1929 AuditEventType::AgentFinish,
1930 "supervisor",
1931 "Supervisor completed task coordination",
1932 Some(serde_json::json!({
1933 "iterations": iteration,
1934 "subtasks_completed": subtask_results.len(),
1935 "providers_used": multi_llm.client_count(),
1936 })),
1937 );
1938
1939 println!("\n🐦⬛ Supervisor Result (multi-model):\n{}", final_response);
1940 return Ok(());
1941 }
1942
1943 if content.contains("SUBTASK:") && content.contains("PROVIDER:") {
1945 let subtask_block = content.split("SUBTASK:").nth(1).unwrap_or("");
1946 let subtask_lines: Vec<&str> = subtask_block.lines().take(4).collect();
1947
1948 let subtask_desc = subtask_lines.first().unwrap_or(&"").trim();
1949 let provider_idx = subtask_lines
1950 .iter()
1951 .find(|l| l.starts_with("PROVIDER:"))
1952 .and_then(|l| l.split(':').nth(1))
1953 .and_then(|s| s.trim().parse::<usize>().ok())
1954 .unwrap_or(0);
1955
1956 if !subtask_desc.is_empty() {
1957 info!("Subtask for provider {}: {}", provider_idx, subtask_desc);
1958
1959 let client = multi_llm
1960 .get_client(provider_idx)
1961 .or_else(|| multi_llm.get_client(0));
1962
1963 if let Some(client) = client {
1964 let subtask_result = run_subtask_agent(
1965 client.clone(),
1966 subtask_desc,
1967 system_prompt,
1968 &policy_engine,
1969 &sandbox,
1970 &audit_log,
1971 ®istry,
1972 )
1973 .await;
1974
1975 match subtask_result {
1976 Ok(result) => {
1977 info!("Subtask {} completed: {} chars", provider_idx, result.len());
1978 subtask_results.push(format!(
1979 "Provider {} ({}): {}",
1980 provider_idx,
1981 client.provider_name(),
1982 result
1983 ));
1984
1985 memory.add_assistant_message(&format!(
1986 "Assigned subtask to provider {}: {}",
1987 provider_idx, subtask_desc
1988 ));
1989 memory.add_user_message(&format!(
1990 "Provider {} result: {}",
1991 provider_idx, result
1992 ));
1993 }
1994 Err(e) => {
1995 warn!("Subtask {} failed: {}", provider_idx, e);
1996 memory.add_assistant_message(&format!(
1997 "Provider {} subtask failed: {}",
1998 provider_idx, e
1999 ));
2000 }
2001 }
2002 }
2003 }
2004 } else {
2005 memory.add_assistant_message(&content);
2006 }
2007 }
2008
2009 if !subtask_results.is_empty() {
2011 let aggregated = subtask_results.join("\n\n");
2012 info!(
2013 "Supervisor aggregated {} subtask results",
2014 subtask_results.len()
2015 );
2016
2017 if let Some(ref rf) = ravenfabric {
2019 if rf.is_enabled() {
2020 let summary = format!(
2021 "Multi-model supervisor completed: {} subtasks, result: {} chars",
2022 subtask_results.len(),
2023 aggregated.len()
2024 );
2025 let _ = rf.broadcast(&summary, 30).await;
2026 info!("Multi-model supervisor result broadcast to RavenFabric mesh");
2027 }
2028 }
2029
2030 println!(
2031 "\n🐦⬛ Supervisor Aggregated Result (multi-model):\n{}",
2032 aggregated
2033 );
2034 return Ok(());
2035 }
2036
2037 Err(crate::error::RavenClawsError::CommandExecution(
2038 "Supervisor mode completed without results".to_string(),
2039 ))
2040}
2041
2042pub async fn run_repl(llm: Arc<dyn LLMProviderTrait>, config: Config) -> Result<()> {
2044 use tokio::io::{AsyncBufReadExt, BufReader};
2045
2046 info!("Starting interactive REPL mode");
2047
2048 let system_prompt = &config.llm.system_prompt;
2049 let mut memory = ConversationMemory::new(system_prompt, 0);
2050
2051 let stdin = BufReader::new(tokio::io::stdin());
2052 let mut lines = stdin.lines();
2053
2054 println!("RavenClaws REPL — type /exit to quit, /reset to clear history");
2055
2056 loop {
2057 print!("\n> ");
2058 use tokio::io::AsyncWriteExt;
2059 tokio::io::stdout().flush().await?;
2060
2061 let line = match lines.next_line().await {
2062 Ok(Some(l)) => l,
2063 Ok(None) => break, Err(e) => {
2065 warn!(error = %e, "REPL read error");
2066 break;
2067 }
2068 };
2069
2070 let input = line.trim();
2071
2072 if input.is_empty() {
2073 continue;
2074 }
2075
2076 match input {
2077 "/exit" | "/quit" => {
2078 println!("Exiting REPL.");
2079 break;
2080 }
2081 "/reset" => {
2082 memory = ConversationMemory::new(system_prompt, 0);
2083 println!("Conversation history reset.");
2084 continue;
2085 }
2086 _ => {}
2087 }
2088
2089 memory.add_user_message(input);
2090 let messages = memory.history().to_vec();
2091
2092 match llm.chat(messages).await {
2093 Ok(response) => {
2094 if let Some(choice) = response.choices.first() {
2095 let content = &choice.message.content;
2096 println!("{}", content);
2097 memory.add_assistant_message(content);
2098 }
2099 }
2100 Err(e) => {
2101 warn!(error = %e, "LLM request failed");
2102 println!("Error: {}", e);
2103 }
2104 }
2105 }
2106
2107 Ok(())
2108}
2109
#[cfg(test)]
mod tests {
    use super::*;

    // --- Entry-point signatures -------------------------------------------

    /// Compile-time check that `run_swarm` keeps its expected parameter list.
    #[test]
    fn test_swarm_function_exists() {
        let _fn_ptr: fn(Arc<dyn LLMProviderTrait>, Config, Option<RavenFabricClient>) -> _ =
            run_swarm;
    }

    /// Compile-time check that `run_supervisor` keeps its expected parameter list.
    #[test]
    fn test_supervisor_function_exists() {
        let _fn_ptr: fn(Arc<dyn LLMProviderTrait>, Config, Option<RavenFabricClient>) -> _ =
            run_supervisor;
    }

    // --- ConversationMemory -----------------------------------------------

    /// A new memory holds exactly the system message.
    #[test]
    fn test_conversation_memory_new() {
        let mem = ConversationMemory::new("system prompt", 10);
        assert_eq!(mem.messages.len(), 1);
        assert_eq!(mem.messages[0].role, "system");
        assert_eq!(mem.messages[0].content, "system prompt");
    }

    /// User messages are appended after the system message.
    #[test]
    fn test_conversation_memory_add_user() {
        let mut mem = ConversationMemory::new("system", 10);
        mem.add_user_message("hello");
        assert_eq!(mem.messages.len(), 2);
        assert_eq!(mem.messages[1].role, "user");
        assert_eq!(mem.messages[1].content, "hello");
    }

    /// History is trimmed to `max_messages` and the system message survives.
    #[test]
    fn test_conversation_memory_trim() {
        let mut mem = ConversationMemory::new("system", 3);
        mem.add_user_message("msg1");
        mem.add_assistant_message("resp1");
        mem.add_user_message("msg2");
        mem.add_assistant_message("resp2");
        assert!(mem.messages.len() <= 3);
        // Trimming removes from index 1, so the system message stays first.
        assert_eq!(mem.messages[0].role, "system");
    }

    // --- parse_tool_call --------------------------------------------------

    #[test]
    fn test_parse_tool_call_valid() {
        let content = "THOUGHT: I need to run a command\nTOOL_CALL: shell_exec\nARGS: {\"command\": \"echo hello\"}";
        let (name, args) = parse_tool_call(content).unwrap();
        assert_eq!(name, "shell_exec");
        assert_eq!(args["command"], "echo hello");
    }

    #[test]
    fn test_parse_tool_call_missing_tool() {
        let content = "THOUGHT: no tool here";
        assert!(parse_tool_call(content).is_none());
    }

    #[test]
    fn test_parse_tool_call_missing_args() {
        let content = "TOOL_CALL: shell_exec\nNo args line";
        assert!(parse_tool_call(content).is_none());
    }

    #[test]
    fn test_parse_tool_call_invalid_json() {
        let content = "TOOL_CALL: shell_exec\nARGS: not valid json";
        assert!(parse_tool_call(content).is_none());
    }

    // --- AgentLoopConfig --------------------------------------------------

    #[test]
    fn test_agent_loop_config_default() {
        let config = AgentLoopConfig::default();
        assert_eq!(config.max_iterations, 10);
        assert!(!config.enable_tools);
        assert!(!config.require_approval);
    }

    #[test]
    fn test_agent_loop_config_require_approval() {
        let config = AgentLoopConfig {
            max_iterations: 5,
            enable_tools: true,
            require_approval: true,
            prompt_injection_protection: true,
            token_lifetime_secs: 0,
            no_final_required: false,
            fallback_chain: None,
            token_budget: None,
            ravenfabric: None,
            checkpoint_dir: None,
            session_id: None,
            metrics_callback: None,
        };
        assert_eq!(config.max_iterations, 5);
        assert!(config.enable_tools);
        assert!(config.require_approval);
        assert!(config.prompt_injection_protection);
        assert_eq!(config.token_lifetime_secs, 0);
    }

    // --- prompt_for_approval_with_input -----------------------------------

    #[test]
    fn test_prompt_for_approval_yes() {
        let args = serde_json::json!({"command": "echo hello"});
        let result = tokio_test::block_on(prompt_for_approval_with_input("shell_exec", &args, "y"));
        assert!(result, "Should approve for 'y'");
    }

    #[test]
    fn test_prompt_for_approval_yes_full() {
        let args = serde_json::json!({"command": "echo hello"});
        let result =
            tokio_test::block_on(prompt_for_approval_with_input("shell_exec", &args, "yes"));
        assert!(result, "Should approve for 'yes'");
    }

    #[test]
    fn test_prompt_for_approval_no() {
        let args = serde_json::json!({"command": "echo hello"});
        let result = tokio_test::block_on(prompt_for_approval_with_input("shell_exec", &args, "n"));
        assert!(!result, "Should deny for 'n'");
    }

    #[test]
    fn test_prompt_for_approval_no_full() {
        let args = serde_json::json!({"command": "echo hello"});
        let result =
            tokio_test::block_on(prompt_for_approval_with_input("shell_exec", &args, "no"));
        assert!(!result, "Should deny for 'no'");
    }

    #[test]
    fn test_prompt_for_approval_empty() {
        let args = serde_json::json!({"command": "echo hello"});
        let result = tokio_test::block_on(prompt_for_approval_with_input("shell_exec", &args, ""));
        assert!(!result, "Should deny for empty input (default N)");
    }

    #[test]
    fn test_prompt_for_approval_uppercase() {
        let args = serde_json::json!({"command": "echo hello"});
        let result = tokio_test::block_on(prompt_for_approval_with_input("shell_exec", &args, "Y"));
        assert!(result, "Should approve for uppercase 'Y'");
    }

    /// Only constructs the approval future; it is dropped without being
    /// polled, so this verifies that the call type-checks and nothing more.
    #[test]
    fn test_prompt_for_approval_auto_approves_non_tty() {
        #[allow(clippy::let_underscore_future)]
        let _ = prompt_for_approval_with_input("test", &serde_json::json!({}), "y");
    }

    // --- execute_parsed_tool_call -----------------------------------------

    /// With the final (approval) flag set to `false`, the call yields a
    /// result for `shell_exec`.
    #[test]
    fn test_execute_parsed_tool_call_skips_approval_when_not_required() {
        let registry = ToolRegistry::with_default_tools();
        let policy_engine = PolicyEngine::default_secure();
        let sandbox = Sandbox::default();
        let audit_log = AuditLog::new("test-session".to_string());

        let args = serde_json::json!({"command": "echo hello"});
        let result = tokio_test::block_on(execute_parsed_tool_call(
            "shell_exec".to_string(),
            args,
            &registry,
            &policy_engine,
            &sandbox,
            &audit_log,
            false,
        ));

        assert!(result.is_some());
        let tool_result = result.unwrap();
        assert_eq!(tool_result.tool_name, "shell_exec");
    }

    /// With the final (approval) flag set to `true`, `read_file` still yields
    /// a result.
    #[test]
    fn test_execute_parsed_tool_call_approval_not_needed_for_read_only_tools() {
        let registry = ToolRegistry::with_default_tools();
        let policy_engine = PolicyEngine::default_secure();
        let sandbox = Sandbox::default();
        let audit_log = AuditLog::new("test-session".to_string());

        let args = serde_json::json!({"path": "/tmp/test.txt"});
        let result = tokio_test::block_on(execute_parsed_tool_call(
            "read_file".to_string(),
            args,
            &registry,
            &policy_engine,
            &sandbox,
            &audit_log,
            true,
        ));

        assert!(result.is_some());
        let tool_result = result.unwrap();
        assert_eq!(tool_result.tool_name, "read_file");
    }

    // --- token_lifetime_secs ----------------------------------------------

    #[test]
    fn test_agent_loop_config_token_lifetime_zero_disabled() {
        let config = AgentLoopConfig {
            max_iterations: 10,
            enable_tools: false,
            require_approval: false,
            prompt_injection_protection: false,
            token_lifetime_secs: 0,
            no_final_required: false,
            fallback_chain: None,
            token_budget: None,
            ravenfabric: None,
            checkpoint_dir: None,
            session_id: None,
            metrics_callback: None,
        };
        assert_eq!(config.token_lifetime_secs, 0);
    }

    #[test]
    fn test_agent_loop_config_token_lifetime_nonzero() {
        let config = AgentLoopConfig {
            max_iterations: 10,
            enable_tools: false,
            require_approval: false,
            prompt_injection_protection: false,
            token_lifetime_secs: 3600,
            no_final_required: false,
            fallback_chain: None,
            token_budget: None,
            ravenfabric: None,
            checkpoint_dir: None,
            session_id: None,
            metrics_callback: None,
        };
        assert_eq!(config.token_lifetime_secs, 3600);
    }

    #[test]
    fn test_agent_loop_config_default_includes_token_lifetime() {
        let config = AgentLoopConfig::default();
        assert_eq!(config.token_lifetime_secs, 0);
    }
}