1use crate::audit::{AuditEventType, AuditLog};
7use crate::config::Config;
8use crate::error::Result;
9use crate::llm::{
10 ChatMessage, Choice, LLMProviderTrait, MultiModelManager, ProviderFallbackChain, TokenBudget,
11};
12use crate::mcp::McpClient;
13use crate::policy::{Decision, PolicyEngine};
14use crate::ravenfabric::RavenFabricClient;
15use crate::sandbox::Sandbox;
16use crate::tools::{ToolCall, ToolRegistry, ToolResult};
17use serde::{Deserialize, Serialize};
18use std::path::PathBuf;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21use tracing::{debug, info, instrument, warn};
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct ConversationMemory {
29 max_messages: usize,
31 messages: Vec<ChatMessage>,
33}
34
35impl ConversationMemory {
36 pub fn new(system_prompt: &str, max_messages: usize) -> Self {
39 Self {
40 max_messages,
41 messages: vec![ChatMessage::new("system", system_prompt.to_string())],
42 }
43 }
44
45 pub fn add_user_message(&mut self, content: &str) -> &[ChatMessage] {
47 self.messages
48 .push(ChatMessage::new("user", content.to_string()));
49 self.trim_to_max();
50 &self.messages
51 }
52
53 pub fn add_user_message_with_images(
55 &mut self,
56 text: &str,
57 image_data_uris: Vec<String>,
58 ) -> &[ChatMessage] {
59 self.messages.push(ChatMessage::with_images(
60 "user",
61 text.to_string(),
62 image_data_uris,
63 ));
64 self.trim_to_max();
65 &self.messages
66 }
67
68 pub fn add_assistant_message(&mut self, content: &str) {
70 self.messages
71 .push(ChatMessage::new("assistant", content.to_string()));
72 self.trim_to_max();
73 }
74
75 pub fn history(&self) -> &[ChatMessage] {
77 &self.messages
78 }
79
80 pub fn from_history(messages: Vec<ChatMessage>, max_messages: usize) -> Self {
83 Self {
84 max_messages,
85 messages,
86 }
87 }
88
89 #[allow(dead_code)]
91 pub fn len(&self) -> usize {
92 self.messages.len()
93 }
94
95 #[allow(dead_code)]
97 pub fn is_empty(&self) -> bool {
98 self.messages.is_empty()
99 }
100
101 fn trim_to_max(&mut self) {
103 if self.max_messages == 0 {
104 return;
105 }
106 while self.messages.len() > self.max_messages {
107 if self.messages.len() > 1 {
109 self.messages.remove(1);
110 } else {
111 break;
112 }
113 }
114 }
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct CheckpointState {
124 pub session_id: String,
126 pub iteration: usize,
128 pub max_iterations: usize,
130 pub messages: Vec<ChatMessage>,
132 pub initial_prompt: String,
134 pub system_prompt: String,
136 pub provider: String,
138 pub model: String,
140 pub enable_tools: bool,
142 pub last_checkpoint: String,
144}
145
146impl CheckpointState {
147 #[allow(clippy::too_many_arguments)]
149 pub fn new(
150 session_id: String,
151 iteration: usize,
152 max_iterations: usize,
153 messages: Vec<ChatMessage>,
154 initial_prompt: &str,
155 system_prompt: &str,
156 provider: &str,
157 model: &str,
158 enable_tools: bool,
159 ) -> Self {
160 Self {
161 session_id,
162 iteration,
163 max_iterations,
164 messages,
165 initial_prompt: initial_prompt.to_string(),
166 system_prompt: system_prompt.to_string(),
167 provider: provider.to_string(),
168 model: model.to_string(),
169 enable_tools,
170 last_checkpoint: chrono::Utc::now().to_rfc3339(),
171 }
172 }
173}
174
175pub fn save_checkpoint(
180 checkpoint_dir: &std::path::Path,
181 state: &CheckpointState,
182) -> std::result::Result<std::path::PathBuf, String> {
183 let path = checkpoint_dir.join(format!("{}.json", state.session_id));
184
185 std::fs::create_dir_all(checkpoint_dir)
187 .map_err(|e| format!("Failed to create checkpoint directory: {}", e))?;
188
189 let content = serde_json::to_string_pretty(state)
190 .map_err(|e| format!("Failed to serialize checkpoint: {}", e))?;
191
192 let tmp_path = path.with_extension("json.tmp");
194 std::fs::write(&tmp_path, &content)
195 .map_err(|e| format!("Failed to write checkpoint: {}", e))?;
196 std::fs::rename(&tmp_path, &path)
197 .map_err(|e| format!("Failed to finalize checkpoint: {}", e))?;
198
199 Ok(path)
200}
201
202pub fn load_checkpoint(
207 checkpoint_dir: &std::path::Path,
208 session_id: &str,
209) -> Option<CheckpointState> {
210 let path = checkpoint_dir.join(format!("{}.json", session_id));
211
212 match std::fs::read_to_string(&path) {
213 Ok(content) => match serde_json::from_str::<CheckpointState>(&content) {
214 Ok(state) => {
215 info!(
216 session_id = %session_id,
217 iteration = state.iteration,
218 max_iterations = state.max_iterations,
219 "Loaded checkpoint"
220 );
221 Some(state)
222 }
223 Err(e) => {
224 warn!(
225 session_id = %session_id,
226 error = %e,
227 "Failed to deserialize checkpoint"
228 );
229 None
230 }
231 },
232 Err(e) => {
233 if e.kind() != std::io::ErrorKind::NotFound {
234 warn!(
235 session_id = %session_id,
236 error = %e,
237 "Failed to read checkpoint"
238 );
239 }
240 None
241 }
242 }
243}
244
245pub fn delete_checkpoint(checkpoint_dir: &std::path::Path, session_id: &str) {
249 let path = checkpoint_dir.join(format!("{}.json", session_id));
250 if path.exists() {
251 if let Err(e) = std::fs::remove_file(&path) {
252 warn!(
253 session_id = %session_id,
254 error = %e,
255 "Failed to delete checkpoint"
256 );
257 } else {
258 debug!(
259 session_id = %session_id,
260 "Deleted checkpoint"
261 );
262 }
263 }
264}
265
266pub struct AgentLoopConfig {
271 pub max_iterations: usize,
273 pub enable_tools: bool,
275 pub require_approval: bool,
277 pub prompt_injection_protection: bool,
279 pub token_lifetime_secs: u64,
283 pub no_final_required: bool,
285 pub fallback_chain: Option<Arc<std::sync::Mutex<ProviderFallbackChain>>>,
287 pub token_budget: Option<Arc<std::sync::Mutex<TokenBudget>>>,
289 pub ravenfabric: Option<RavenFabricClient>,
291 pub checkpoint_dir: Option<PathBuf>,
295 pub session_id: Option<String>,
298 pub metrics_callback: Option<Box<dyn Fn(u64, u64) + Send + Sync>>,
302
303 pub load_manager: Option<Arc<crate::load::LoadManager>>,
306}
307
308impl Default for AgentLoopConfig {
309 fn default() -> Self {
310 Self {
311 max_iterations: 10,
312 enable_tools: false,
313 require_approval: false,
314 prompt_injection_protection: true,
315 token_lifetime_secs: 0,
316 no_final_required: true,
317 fallback_chain: None,
318 token_budget: None,
319 ravenfabric: None,
320 checkpoint_dir: None,
321 session_id: None,
322 metrics_callback: None,
323 load_manager: None,
324 }
325 }
326}
327
328impl std::fmt::Debug for AgentLoopConfig {
330 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
331 f.debug_struct("AgentLoopConfig")
332 .field("max_iterations", &self.max_iterations)
333 .field("enable_tools", &self.enable_tools)
334 .field("require_approval", &self.require_approval)
335 .field(
336 "prompt_injection_protection",
337 &self.prompt_injection_protection,
338 )
339 .field("token_lifetime_secs", &self.token_lifetime_secs)
340 .field("no_final_required", &self.no_final_required)
341 .field("fallback_chain", &self.fallback_chain)
342 .field("token_budget", &self.token_budget)
343 .field("ravenfabric", &self.ravenfabric)
344 .field("checkpoint_dir", &self.checkpoint_dir)
345 .field("session_id", &self.session_id)
346 .field(
347 "metrics_callback",
348 &self.metrics_callback.as_ref().map(|_| "Box<Fn>"),
349 )
350 .field(
351 "load_manager",
352 &self.load_manager.as_ref().map(|_| "Arc<LoadManager>"),
353 )
354 .finish()
355 }
356}
357
358impl Clone for AgentLoopConfig {
361 fn clone(&self) -> Self {
362 Self {
363 max_iterations: self.max_iterations,
364 enable_tools: self.enable_tools,
365 require_approval: self.require_approval,
366 prompt_injection_protection: self.prompt_injection_protection,
367 token_lifetime_secs: self.token_lifetime_secs,
368 no_final_required: self.no_final_required,
369 fallback_chain: self.fallback_chain.clone(),
370 token_budget: self.token_budget.clone(),
371 ravenfabric: self.ravenfabric.clone(),
372 checkpoint_dir: self.checkpoint_dir.clone(),
373 session_id: self.session_id.clone(),
374 metrics_callback: None,
375 load_manager: self.load_manager.clone(),
376 }
377 }
378}
379
380#[instrument(skip_all, fields(provider = %llm.provider_name(), model = %llm.model()))]
387pub async fn run_agent_loop(
388 llm: Arc<dyn LLMProviderTrait>,
389 initial_prompt: &str,
390 system_prompt: &str,
391 config: AgentLoopConfig,
392) -> Result<String> {
393 run_agent_loop_with_registry(llm, initial_prompt, system_prompt, config, None).await
394}
395
396#[instrument(skip_all, fields(provider = %llm.provider_name(), model = %llm.model()))]
401pub async fn run_agent_loop_with_registry(
402 llm: Arc<dyn LLMProviderTrait>,
403 initial_prompt: &str,
404 system_prompt: &str,
405 config: AgentLoopConfig,
406 tool_registry: Option<ToolRegistry>,
407) -> Result<String> {
408 let registry = tool_registry.unwrap_or_else(ToolRegistry::with_default_tools);
409 run_agent_loop_inner(
410 llm,
411 initial_prompt,
412 system_prompt,
413 config,
414 registry,
415 "security integration",
416 false,
417 Vec::new(),
418 )
419 .await
420}
421
422#[allow(dead_code)]
427#[instrument(skip_all, fields(provider = %llm.provider_name(), model = %llm.model(), image_count = image_data_uris.len()))]
428pub async fn run_agent_loop_with_images(
429 llm: Arc<dyn LLMProviderTrait>,
430 initial_prompt: &str,
431 system_prompt: &str,
432 config: AgentLoopConfig,
433 tool_registry: Option<ToolRegistry>,
434 image_data_uris: Vec<String>,
435) -> Result<String> {
436 let registry = tool_registry.unwrap_or_else(ToolRegistry::with_default_tools);
437 run_agent_loop_inner(
438 llm,
439 initial_prompt,
440 system_prompt,
441 config,
442 registry,
443 "security integration",
444 false,
445 image_data_uris,
446 )
447 .await
448}
449
450#[allow(clippy::too_many_arguments)]
462#[instrument(skip_all, fields(provider = %llm.provider_name(), model = %llm.model()))]
463async fn run_agent_loop_inner(
464 llm: Arc<dyn LLMProviderTrait>,
465 initial_prompt: &str,
466 system_prompt: &str,
467 config: AgentLoopConfig,
468 registry: ToolRegistry,
469 loop_label: &str,
470 mcp_enabled: bool,
471 image_data_uris: Vec<String>,
472) -> Result<String> {
473 let policy_engine = PolicyEngine::default_secure();
475 let mut sandbox = Sandbox::default();
476 sandbox.init().await.map_err(|e| {
477 crate::error::RavenClawsError::CommandExecution(format!("Sandbox init failed: {}", e))
478 })?;
479 let audit_log = AuditLog::new(format!("agent-{}", std::process::id()));
480
481 let injection_detector = if config.prompt_injection_protection {
483 Some(crate::policy::InjectionDetector::new())
484 } else {
485 None
486 };
487
488 let session_start = std::time::Instant::now();
490
491 info!(
492 provider = llm.provider_name(),
493 model = llm.model(),
494 max_iterations = config.max_iterations,
495 enable_tools = config.enable_tools,
496 tool_count = registry.len(),
497 require_approval = config.require_approval,
498 prompt_injection_protection = config.prompt_injection_protection,
499 token_lifetime_secs = config.token_lifetime_secs,
500 "Agent loop starting with {}",
501 loop_label
502 );
503
504 let _ = audit_log.append(
506 AuditEventType::AgentStart,
507 "agent",
508 &format!(
509 "Agent loop started with {} (model: {})",
510 llm.provider_name(),
511 llm.model()
512 ),
513 Some(serde_json::json!({
514 "provider": llm.provider_name(),
515 "model": llm.model(),
516 "max_iterations": config.max_iterations,
517 "enable_tools": config.enable_tools,
518 "mcp_enabled": mcp_enabled,
519 "tool_count": registry.len(),
520 "require_approval": config.require_approval,
521 "prompt_injection_protection": config.prompt_injection_protection,
522 "token_lifetime_secs": config.token_lifetime_secs,
523 })),
524 );
525
526 let (mut memory, start_iteration) = if let Some(ref checkpoint_dir) = config.checkpoint_dir {
530 if let Some(ref session_id) = config.session_id {
531 if let Some(checkpoint) = load_checkpoint(checkpoint_dir, session_id) {
532 info!(
533 session_id = %session_id,
534 iteration = checkpoint.iteration,
535 max_iterations = checkpoint.max_iterations,
536 "Resuming agent loop from checkpoint"
537 );
538 (
539 ConversationMemory::from_history(checkpoint.messages, 0),
540 checkpoint.iteration + 1, )
542 } else {
543 info!(
544 session_id = %session_id,
545 "No checkpoint found, starting fresh"
546 );
547 let mut m = ConversationMemory::new(system_prompt, 0);
548 if image_data_uris.is_empty() {
549 m.add_user_message(initial_prompt);
550 } else {
551 m.add_user_message_with_images(initial_prompt, image_data_uris.clone());
552 }
553 (m, 0)
554 }
555 } else {
556 let mut m = ConversationMemory::new(system_prompt, 0);
557 if image_data_uris.is_empty() {
558 m.add_user_message(initial_prompt);
559 } else {
560 m.add_user_message_with_images(initial_prompt, image_data_uris.clone());
561 }
562 (m, 0)
563 }
564 } else {
565 let mut m = ConversationMemory::new(system_prompt, 0);
566 if image_data_uris.is_empty() {
567 m.add_user_message(initial_prompt);
568 } else {
569 m.add_user_message_with_images(initial_prompt, image_data_uris.clone());
570 }
571 (m, 0)
572 };
573
574 let session_id = config
576 .session_id
577 .clone()
578 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
579
580 for iteration in start_iteration..config.max_iterations {
581 if config.token_lifetime_secs > 0 {
583 let elapsed = session_start.elapsed().as_secs();
584 if elapsed >= config.token_lifetime_secs {
585 warn!(
586 iteration = iteration,
587 elapsed_secs = elapsed,
588 token_lifetime_secs = config.token_lifetime_secs,
589 "Agent loop reached token lifetime limit"
590 );
591 let _ = audit_log.append(
592 AuditEventType::SecurityViolation,
593 "token_lifetime",
594 &format!(
595 "Session expired after {} seconds (limit: {}s)",
596 elapsed, config.token_lifetime_secs
597 ),
598 Some(serde_json::json!({
599 "elapsed_secs": elapsed,
600 "token_lifetime_secs": config.token_lifetime_secs,
601 "iteration": iteration,
602 })),
603 );
604 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
606 delete_checkpoint(checkpoint_dir, &session_id);
607 }
608 return Err(crate::error::RavenClawsError::SecurityViolation(format!(
609 "Session token expired after {} seconds (limit: {}s)",
610 elapsed, config.token_lifetime_secs
611 )));
612 }
613 }
614 let messages = memory.history().to_vec();
615
616 if let Some(ref budget) = config.token_budget {
618 let budget = budget.lock().unwrap();
619 if budget.remaining() < 100 {
620 warn!(
621 iteration = iteration,
622 remaining = budget.remaining(),
623 "Token budget exhausted"
624 );
625 let _ = audit_log.append(
626 AuditEventType::SecurityViolation,
627 "token_budget",
628 &format!("Token budget exhausted (remaining: {})", budget.remaining()),
629 Some(serde_json::json!({
630 "remaining": budget.remaining(),
631 "used": budget.used_tokens,
632 "iteration": iteration,
633 })),
634 );
635 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
637 delete_checkpoint(checkpoint_dir, &session_id);
638 }
639 return Err(crate::error::RavenClawsError::SecurityViolation(
640 "Token budget exhausted".to_string(),
641 ));
642 }
643 }
644
645 if let Some(ref load_manager) = config.load_manager {
647 let admission = load_manager.check_admission();
648 if !admission.is_allowed() {
649 warn!(
650 ?admission,
651 iteration = iteration,
652 "Admission denied before LLM call"
653 );
654 let _ = audit_log.append(
655 AuditEventType::Error,
656 "load_manager",
657 &format!("Admission denied: {:?}", admission),
658 None,
659 );
660 load_manager.record_outcome(crate::load::RequestOutcome::Failure);
661 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
663 delete_checkpoint(checkpoint_dir, &session_id);
664 }
665 return Err(crate::error::RavenClawsError::SecurityViolation(format!(
666 "Admission denied: {:?}",
667 admission
668 )));
669 }
670 }
671
672 let response = match llm.chat(messages.clone()).await {
673 Ok(r) => {
674 if let Some(ref load_manager) = config.load_manager {
676 load_manager.record_outcome(crate::load::RequestOutcome::Success);
677 }
678 r
679 }
680 Err(e) => {
681 if let Some(ref load_manager) = config.load_manager {
683 load_manager.record_outcome(crate::load::RequestOutcome::Failure);
684 }
685 if let Some(ref chain) = config.fallback_chain {
687 warn!(error = %e, "Primary LLM failed, trying fallback chain");
688 let _ = audit_log.append(
689 AuditEventType::Error,
690 "llm",
691 &format!("Primary LLM failed, trying fallback: {}", e),
692 None,
693 );
694 let configs = {
696 let c = chain.lock().unwrap();
697 c.configs.clone()
698 };
699 let mut temp_chain = ProviderFallbackChain::new(configs);
700 match temp_chain.chat_with_fallback(messages).await {
701 Ok(r) => {
702 info!("Fallback chain succeeded");
703 if let Some(ref budget) = config.token_budget {
705 if let Some(usage) = &r.usage {
706 let mut b = budget.lock().unwrap();
707 b.record_usage(usage.total_tokens);
708 }
709 }
710 r
711 }
712 Err(fallback_e) => {
713 warn!(error = %fallback_e, "Fallback chain also failed");
714 let _ = audit_log.append(
715 AuditEventType::Error,
716 "llm",
717 &format!("All providers failed: {}", fallback_e),
718 None,
719 );
720 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
722 delete_checkpoint(checkpoint_dir, &session_id);
723 }
724 return Err(crate::error::RavenClawsError::Llm(fallback_e));
725 }
726 }
727 } else {
728 warn!(error = %e, "LLM request failed");
729 let _ = audit_log.append(
730 AuditEventType::Error,
731 "llm",
732 &format!("LLM request failed: {}", e),
733 None,
734 );
735 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
737 delete_checkpoint(checkpoint_dir, &session_id);
738 }
739 return Err(crate::error::RavenClawsError::Llm(e));
740 }
741 }
742 };
743
744 let mut iteration_tokens: u64 = 0;
746 if let Some(ref budget) = config.token_budget {
747 if let Some(usage) = &response.usage {
748 let mut b = budget.lock().unwrap();
749 b.record_usage(usage.total_tokens);
750 iteration_tokens = usage.total_tokens as u64;
751 debug!(
752 iteration = iteration,
753 tokens_used = usage.total_tokens,
754 total_used = b.used_tokens,
755 remaining = b.remaining(),
756 "Token usage recorded"
757 );
758 }
759 } else if let Some(usage) = &response.usage {
760 iteration_tokens = usage.total_tokens as u64;
761 }
762
763 if let Some(ref cb) = config.metrics_callback {
765 cb(iteration_tokens, 0);
766 }
767
768 if let Some(ref rf) = config.ravenfabric {
770 if rf.is_enabled() {
771 let _ = rf.health().await;
772 info!(
773 iteration = iteration,
774 ravenfabric = true,
775 "RavenFabric health check completed"
776 );
777 }
778 }
779
780 let first_choice = response.choices.first();
781 let content = first_choice
782 .map(|c| c.message.content.clone())
783 .unwrap_or_default();
784
785 debug!(
786 iteration = iteration,
787 response_length = content.len(),
788 response_preview = %content[..content.len().min(500)],
789 "LLM response received"
790 );
791
792 if let Some(ref detector) = injection_detector {
794 match detector.check(&content) {
795 crate::policy::InjectionVerdict::Suspicious(reason) => {
796 warn!(
797 iteration = iteration,
798 reason = %reason,
799 "Prompt-injection detected in LLM response"
800 );
801 let _ = audit_log.append(
802 AuditEventType::SecurityViolation,
803 "injection_detector",
804 &format!("Prompt-injection detected: {}", reason),
805 Some(serde_json::json!({
806 "reason": reason,
807 "iteration": iteration,
808 "content_preview": &content[..content.len().min(200)],
809 })),
810 );
811 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
813 delete_checkpoint(checkpoint_dir, &session_id);
814 }
815 return Err(crate::error::RavenClawsError::SecurityViolation(format!(
816 "LLM response blocked: potential prompt injection ({})",
817 reason
818 )));
819 }
820 crate::policy::InjectionVerdict::Clean => {}
821 }
822 }
823
824 if config.enable_tools {
826 if let Some((tool_name, args)) = first_choice.and_then(parse_structured_tool_call) {
827 info!(tool = %tool_name, "Structured tool call detected");
828
829 if let Some(tool_result) = execute_parsed_tool_call(
831 tool_name,
832 args,
833 ®istry,
834 &policy_engine,
835 &sandbox,
836 &audit_log,
837 config.require_approval,
838 )
839 .await
840 {
841 let observation = if tool_result.success {
842 format!("OBSERVATION: {}", tool_result.output)
843 } else {
844 format!(
845 "OBSERVATION: Tool failed with error: {}",
846 tool_result.error.as_deref().unwrap_or("unknown error")
847 )
848 };
849
850 memory.add_user_message(&observation);
851
852 if let Some(ref cb) = config.metrics_callback {
854 cb(0, 1);
855 }
856
857 info!(
858 iteration = iteration,
859 tool = %tool_result.tool_name,
860 success = tool_result.success,
861 "Structured tool executed"
862 );
863 continue;
864 }
865 }
866 }
867
868 if content.contains("FINAL:") {
870 let final_response = content
871 .split("FINAL:")
872 .nth(1)
873 .unwrap_or("")
874 .trim()
875 .to_string();
876
877 memory.add_assistant_message(&content);
878
879 let _ = audit_log.append(
881 AuditEventType::AgentFinish,
882 "agent",
883 "Agent loop completed successfully",
884 Some(serde_json::json!({
885 "iterations": iteration + 1,
886 "final_response_length": final_response.len(),
887 })),
888 );
889
890 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
892 delete_checkpoint(checkpoint_dir, &session_id);
893 }
894
895 return Ok(final_response);
896 }
897
898 if config.enable_tools {
900 if let Some(tool_result) = execute_tool_call_with_security(
901 &content,
902 ®istry,
903 &policy_engine,
904 &sandbox,
905 &audit_log,
906 )
907 .await
908 {
909 let observation = if tool_result.success {
910 format!("OBSERVATION: {}", tool_result.output)
911 } else {
912 format!(
913 "OBSERVATION: Tool failed with error: {}",
914 tool_result.error.as_deref().unwrap_or("unknown error")
915 )
916 };
917
918 memory.add_assistant_message(&content);
919 memory.add_user_message(&observation);
920
921 if let Some(ref cb) = config.metrics_callback {
923 cb(0, 1);
924 }
925
926 info!(
927 iteration = iteration,
928 tool = %tool_result.tool_name,
929 success = tool_result.success,
930 "Tool executed"
931 );
932 continue;
933 }
934 }
935
936 memory.add_assistant_message(&content);
938
939 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
941 let checkpoint = CheckpointState::new(
942 session_id.clone(),
943 iteration,
944 config.max_iterations,
945 memory.history().to_vec(),
946 initial_prompt,
947 system_prompt,
948 llm.provider_name(),
949 llm.model(),
950 config.enable_tools,
951 );
952 if let Err(e) = save_checkpoint(checkpoint_dir, &checkpoint) {
953 warn!(
954 session_id = %session_id,
955 iteration = iteration,
956 error = %e,
957 "Failed to save checkpoint"
958 );
959 } else {
960 debug!(
961 session_id = %session_id,
962 iteration = iteration,
963 "Checkpoint saved"
964 );
965 }
966 }
967
968 if config.no_final_required {
970 info!(
971 iteration = iteration,
972 response_length = content.len(),
973 "no_final_required: treating response as completion"
974 );
975 let _ = audit_log.append(
976 AuditEventType::AgentFinish,
977 "agent",
978 "Agent loop completed (no_final_required)",
979 Some(serde_json::json!({
980 "iterations": iteration + 1,
981 "final_response_length": content.len(),
982 })),
983 );
984 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
986 delete_checkpoint(checkpoint_dir, &session_id);
987 }
988 return Ok(content);
989 }
990
991 info!(
992 iteration = iteration,
993 thought = %content.lines().find(|l| l.starts_with("THOUGHT:")).unwrap_or("<no thought>"),
994 "Agent loop progress"
995 );
996 }
997
998 warn!(
1000 max_iterations = config.max_iterations,
1001 "Agent loop reached max iterations"
1002 );
1003
1004 let _ = audit_log.append(
1005 AuditEventType::Error,
1006 "agent",
1007 "Agent loop reached max iterations without completing",
1008 Some(serde_json::json!({
1009 "max_iterations": config.max_iterations,
1010 })),
1011 );
1012
1013 if let Some(ref checkpoint_dir) = config.checkpoint_dir {
1015 delete_checkpoint(checkpoint_dir, &session_id);
1016 }
1017
1018 let history = memory.history();
1019 if history.len() > 1 {
1020 if let Some(last) = history.last() {
1021 return Ok(last.content.clone());
1022 }
1023 }
1024
1025 Err(crate::error::RavenClawsError::CommandExecution(
1026 "Agent loop reached max iterations without completing the task".to_string(),
1027 ))
1028}
1029
1030#[allow(dead_code)]
1036#[instrument(skip_all, fields(provider = %llm.provider_name(), model = %llm.model()))]
1037pub async fn run_agent_loop_with_mcp(
1038 llm: Arc<dyn LLMProviderTrait>,
1039 initial_prompt: &str,
1040 system_prompt: &str,
1041 config: AgentLoopConfig,
1042 mcp_client: Option<Arc<RwLock<McpClient>>>,
1043) -> Result<String> {
1044 run_agent_loop_with_mcp_and_registry(
1045 llm,
1046 initial_prompt,
1047 system_prompt,
1048 config,
1049 mcp_client,
1050 None,
1051 )
1052 .await
1053}
1054
1055#[instrument(skip_all, fields(provider = %llm.provider_name(), model = %llm.model()))]
1057pub async fn run_agent_loop_with_mcp_and_registry(
1058 llm: Arc<dyn LLMProviderTrait>,
1059 initial_prompt: &str,
1060 system_prompt: &str,
1061 config: AgentLoopConfig,
1062 mcp_client: Option<Arc<RwLock<McpClient>>>,
1063 tool_registry: Option<ToolRegistry>,
1064) -> Result<String> {
1065 let mut registry = tool_registry.unwrap_or_else(ToolRegistry::with_default_tools);
1067
1068 if let Some(client) = &mcp_client {
1070 match crate::mcp::register_mcp_tools(&mut registry, client.clone()).await {
1071 Ok(count) => {
1072 info!(count, "MCP tools registered");
1073 }
1074 Err(e) => {
1075 warn!(error = %e, "Failed to register MCP tools");
1076 }
1077 }
1078 }
1079
1080 let mcp_enabled = mcp_client.is_some();
1081 run_agent_loop_inner(
1082 llm,
1083 initial_prompt,
1084 system_prompt,
1085 config,
1086 registry,
1087 "MCP integration",
1088 mcp_enabled,
1089 Vec::new(),
1090 )
1091 .await
1092}
1093
1094#[instrument(skip_all, fields(provider = %llm.provider_name(), model = %llm.model(), image_count = image_data_uris.len()))]
1096pub async fn run_agent_loop_with_mcp_and_images(
1097 llm: Arc<dyn LLMProviderTrait>,
1098 initial_prompt: &str,
1099 system_prompt: &str,
1100 config: AgentLoopConfig,
1101 mcp_client: Option<Arc<RwLock<McpClient>>>,
1102 tool_registry: Option<ToolRegistry>,
1103 image_data_uris: Vec<String>,
1104) -> Result<String> {
1105 let mut registry = tool_registry.unwrap_or_else(ToolRegistry::with_default_tools);
1106
1107 if let Some(client) = &mcp_client {
1108 match crate::mcp::register_mcp_tools(&mut registry, client.clone()).await {
1109 Ok(count) => {
1110 info!(count, "MCP tools registered");
1111 }
1112 Err(e) => {
1113 warn!(error = %e, "Failed to register MCP tools");
1114 }
1115 }
1116 }
1117
1118 let mcp_enabled = mcp_client.is_some();
1119 run_agent_loop_inner(
1120 llm,
1121 initial_prompt,
1122 system_prompt,
1123 config,
1124 registry,
1125 "MCP integration",
1126 mcp_enabled,
1127 image_data_uris,
1128 )
1129 .await
1130}
1131
1132async fn prompt_for_approval(tool_name: &str, args: &serde_json::Value) -> bool {
1137 use std::io::{IsTerminal, Write};
1138
1139 let args_str = serde_json::to_string_pretty(args).unwrap_or_default();
1140
1141 if !std::io::stdin().is_terminal() {
1143 warn!(
1144 tool = %tool_name,
1145 "stdin is not a TTY — auto-approving tool call (use --require-approval only in interactive mode)"
1146 );
1147 return true;
1148 }
1149
1150 eprintln!("\n⚠️ Tool requires approval:");
1152 eprintln!(" Tool: {}", tool_name);
1153 for line in args_str.lines() {
1154 eprintln!(" {}", line);
1155 }
1156 eprint!(" Approve? [y/N] ");
1157 std::io::stderr().flush().ok();
1158
1159 let mut input = String::new();
1160 match std::io::stdin().read_line(&mut input) {
1161 Ok(_) => {
1162 let trimmed = input.trim().to_lowercase();
1163 trimmed == "y" || trimmed == "yes"
1164 }
1165 Err(e) => {
1166 warn!(error = %e, "Failed to read approval input — denying by default");
1167 false
1168 }
1169 }
1170}
1171
/// Test-only variant of `prompt_for_approval` that takes the operator's answer
/// as `input` instead of reading stdin.
///
/// It prints the same prompt to stderr. Returns `true` only for `y` / `yes`
/// (case-insensitive, surrounding whitespace ignored).
#[cfg(test)]
async fn prompt_for_approval_with_input(
    tool_name: &str,
    args: &serde_json::Value,
    input: &str,
) -> bool {
    use std::io::Write;

    let rendered_args = serde_json::to_string_pretty(args).unwrap_or_default();

    eprintln!("\n⚠️ Tool requires approval:");
    eprintln!(" Tool: {}", tool_name);
    rendered_args
        .lines()
        .for_each(|arg_line| eprintln!(" {}", arg_line));
    eprint!(" Approve? [y/N] ");
    std::io::stderr().flush().ok();

    let answer = input.trim().to_lowercase();
    matches!(answer.as_str(), "y" | "yes")
}
1195
1196async fn execute_parsed_tool_call(
1205 tool_name: String,
1206 args: serde_json::Value,
1207 registry: &ToolRegistry,
1208 policy_engine: &PolicyEngine,
1209 _sandbox: &Sandbox,
1210 audit_log: &AuditLog,
1211 require_approval: bool,
1212) -> Option<ToolResult> {
1213 info!(tool = %tool_name, "Executing parsed tool call");
1214
1215 let _ = audit_log.tool_call(&tool_name, &args);
1217
1218 if require_approval && policy_engine.requires_approval(&tool_name) {
1220 let _ = audit_log.append(
1221 AuditEventType::ApprovalRequested,
1222 "approval",
1223 &format!("Approval required for tool: {}", tool_name),
1224 Some(serde_json::json!({"tool": tool_name, "args": args})),
1225 );
1226
1227 let granted = prompt_for_approval(&tool_name, &args).await;
1229
1230 if !granted {
1231 let _ = audit_log.approval(&tool_name, false, Some("Denied by user"));
1232 warn!(tool = %tool_name, "Tool call denied by user");
1233 return Some(ToolResult {
1234 tool_name: tool_name.clone(),
1235 success: false,
1236 output: String::new(),
1237 error: Some(format!("Approval denied by user for tool: {}", tool_name)),
1238 exit_code: Some(-1),
1239 duration_ms: None,
1240 });
1241 }
1242
1243 let _ = audit_log.approval(&tool_name, true, Some("Approved by user"));
1244 info!(tool = %tool_name, "Tool call approved by user");
1245 }
1246
1247 let policy_decision = policy_engine.check_tool_call(&tool_name, &args);
1249
1250 match &policy_decision {
1252 Decision::Allow => {
1253 let _ = audit_log.policy_decision(&tool_name, true, None);
1254 }
1255 Decision::Deny(reason) => {
1256 let _ = audit_log.policy_decision(&tool_name, false, Some(reason));
1257 warn!(tool = %tool_name, reason = %reason, "Tool call denied by policy");
1258 return Some(ToolResult {
1259 tool_name: tool_name.clone(),
1260 success: false,
1261 output: String::new(),
1262 error: Some(format!("Policy denied: {}", reason)),
1263 exit_code: Some(-1),
1264 duration_ms: None,
1265 });
1266 }
1267 }
1268
1269 let tool_name_clone = tool_name.clone();
1271 let call = ToolCall {
1272 name: tool_name.clone(),
1273 arguments: args,
1274 id: None,
1275 };
1276
1277 let result = match registry.execute(call).await {
1278 Ok(result) => {
1279 let _ = audit_log.append(
1281 AuditEventType::ToolResult,
1282 &tool_name_clone,
1283 &format!(
1284 "Tool executed: {} (success: {})",
1285 tool_name_clone, result.success
1286 ),
1287 Some(serde_json::json!({
1288 "success": result.success,
1289 "exit_code": result.exit_code,
1290 "duration_ms": result.duration_ms,
1291 })),
1292 );
1293 result
1294 }
1295 Err(e) => {
1296 let _ = audit_log.append(
1298 AuditEventType::Error,
1299 &tool_name_clone,
1300 &format!("Tool execution failed: {}", e),
1301 None,
1302 );
1303 ToolResult {
1304 tool_name: tool_name_clone,
1305 success: false,
1306 output: String::new(),
1307 error: Some(e.to_string()),
1308 exit_code: Some(-1),
1309 duration_ms: None,
1310 }
1311 }
1312 };
1313
1314 Some(result)
1315}
1316
1317async fn execute_tool_call_with_security(
1326 content: &str,
1327 registry: &ToolRegistry,
1328 policy_engine: &PolicyEngine,
1329 _sandbox: &Sandbox,
1330 audit_log: &AuditLog,
1331) -> Option<ToolResult> {
1332 let (tool_name, args) = parse_tool_call(content)?;
1334
1335 execute_parsed_tool_call(
1337 tool_name,
1338 args,
1339 registry,
1340 policy_engine,
1341 _sandbox,
1342 audit_log,
1343 false, )
1345 .await
1346}
1347
1348fn parse_structured_tool_call(choice: &Choice) -> Option<(String, serde_json::Value)> {
1352 let tool_calls = choice.tool_calls.as_ref()?;
1353 let first_call = tool_calls.first()?;
1354
1355 let tool_name = first_call.function.name.clone();
1356 let args: serde_json::Value = serde_json::from_str(&first_call.function.arguments).ok()?;
1357
1358 Some((tool_name, args))
1359}
1360
1361fn parse_tool_call(content: &str) -> Option<(String, serde_json::Value)> {
1363 let mut lines = content.lines();
1364 let tool_call_line = lines.find(|l| l.trim().starts_with("TOOL_CALL:"))?;
1365
1366 let tool_name = tool_call_line
1367 .trim()
1368 .strip_prefix("TOOL_CALL:")
1369 .map(|s| s.trim())
1370 .filter(|s| !s.is_empty())?
1371 .to_string();
1372
1373 let args_line = lines.find(|l| l.trim().starts_with("ARGS:"))?;
1375 let args_str = args_line.trim().strip_prefix("ARGS:").map(|s| s.trim())?;
1376
1377 let args: serde_json::Value = serde_json::from_str(args_str).ok()?;
1378
1379 Some((tool_name, args))
1380}
1381
1382pub async fn run_single(
1384 llm: Arc<dyn LLMProviderTrait>,
1385 config: Config,
1386 ravenfabric: Option<RavenFabricClient>,
1387) -> Result<()> {
1388 info!(
1389 "Starting single agent mode with provider: {}",
1390 llm.provider_name()
1391 );
1392
1393 if let Some(ref rf) = ravenfabric {
1395 if rf.is_enabled() {
1396 info!("RavenFabric remote execution available");
1397 match rf.health().await {
1398 Ok(true) => info!("RavenFabric mesh is healthy"),
1399 Ok(false) => warn!("RavenFabric mesh returned unhealthy status"),
1400 Err(e) => warn!(error = %e, "RavenFabric health check failed"),
1401 }
1402 }
1403 }
1404
1405 let system_prompt = &config.llm.system_prompt;
1406
1407 let messages = vec![
1408 ChatMessage::new("system", system_prompt.to_string()),
1409 ChatMessage::new("user", "Ready. Awaiting instructions."),
1410 ];
1411
1412 match llm.chat(messages).await {
1413 Ok(response) => {
1414 if let Some(choice) = response.choices.first() {
1415 info!(provider = llm.provider_name(), model = llm.model(), response = %choice.message.content, "Agent response received");
1416
1417 if let Some(ref rf) = ravenfabric {
1419 if rf.is_enabled() {
1420 let preview = choice.message.content.chars().take(500).collect::<String>();
1421 let _ = rf.broadcast(&preview, 30).await;
1422 info!("Agent result broadcast to RavenFabric mesh");
1423 }
1424 }
1425 }
1426 }
1427 Err(e) => {
1428 warn!(error = %e, provider = llm.provider_name(), "LLM request failed");
1429 }
1430 }
1431
1432 Ok(())
1433}
1434
1435pub async fn run_swarm(
1440 llm: Arc<dyn LLMProviderTrait>,
1441 config: Config,
1442 ravenfabric: Option<RavenFabricClient>,
1443) -> Result<()> {
1444 info!("Starting swarm mode (single-provider) — 3 parallel agents");
1445
1446 if let Some(ref rf) = ravenfabric {
1448 if rf.is_enabled() {
1449 info!("RavenFabric remote execution available for swarm coordination");
1450 match rf.health().await {
1451 Ok(true) => info!("RavenFabric mesh is healthy"),
1452 Ok(false) => warn!("RavenFabric mesh returned unhealthy status"),
1453 Err(e) => warn!(error = %e, "RavenFabric health check failed"),
1454 }
1455 }
1456 }
1457
1458 let _system_prompt = &config.llm.system_prompt;
1459 let num_agents = 3;
1460 let mut handles = Vec::new();
1461
1462 let personas = [
1464 "You are an analytical agent. Focus on logic, structure, and precision.",
1465 "You are a creative agent. Focus on innovation, alternatives, and possibilities.",
1466 "You are a pragmatic agent. Focus on simplicity, efficiency, and practicality.",
1467 ];
1468
1469 for (i, persona) in personas.iter().enumerate().take(num_agents) {
1470 let llm_clone = llm.clone();
1471 let persona = persona.to_string();
1472 let task = "Analyze the given task and provide your solution.".to_string();
1473
1474 let handle = tokio::spawn(async move {
1475 let mut memory = ConversationMemory::new(&persona, 10);
1476 memory.add_user_message(&task);
1477
1478 let messages = memory.history().to_vec();
1479 match llm_clone.chat(messages).await {
1480 Ok(response) => {
1481 let content = response
1482 .choices
1483 .first()
1484 .map(|c| c.message.content.clone())
1485 .unwrap_or_default();
1486 Ok((i, content))
1487 }
1488 Err(e) => Err(format!("Agent {} failed: {}", i, e)),
1489 }
1490 });
1491
1492 handles.push(handle);
1493 }
1494
1495 let mut results: Vec<(usize, String)> = Vec::new();
1497 for handle in handles {
1498 match handle.await {
1499 Ok(Ok((idx, result))) => {
1500 info!("Agent {} completed: {} chars", idx, result.len());
1501 results.push((idx, result));
1502 }
1503 Ok(Err(e)) => warn!("Agent failed: {}", e),
1504 Err(e) => warn!("Agent join failed: {}", e),
1505 }
1506 }
1507
1508 println!("\n🐦⬛ Swarm Results ({} agents):", results.len());
1510 for (idx, result) in &results {
1511 println!(
1512 "\n── Agent {} ({}) ──",
1513 idx + 1,
1514 personas[*idx].split('.').next().unwrap_or("Unknown")
1515 );
1516 println!("{}", result);
1517 }
1518
1519 if let Some(ref rf) = ravenfabric {
1521 if rf.is_enabled() {
1522 let summary = format!(
1523 "Swarm completed: {} agents, results: {}",
1524 results.len(),
1525 results
1526 .iter()
1527 .map(|(i, r)| format!("Agent {}: {} chars", i, r.len()))
1528 .collect::<Vec<_>>()
1529 .join(", ")
1530 );
1531 let _ = rf.broadcast(&summary, 30).await;
1532 info!("Swarm results broadcast to RavenFabric mesh");
1533 }
1534 }
1535
1536 Ok(())
1537}
1538
1539pub async fn run_supervisor(
1544 llm: Arc<dyn LLMProviderTrait>,
1545 config: Config,
1546 ravenfabric: Option<RavenFabricClient>,
1547) -> Result<()> {
1548 info!("Starting supervisor mode (single-provider)");
1549
1550 if let Some(ref rf) = ravenfabric {
1552 if rf.is_enabled() {
1553 info!("RavenFabric remote execution available for supervisor coordination");
1554 match rf.health().await {
1555 Ok(true) => info!("RavenFabric mesh is healthy"),
1556 Ok(false) => warn!("RavenFabric mesh returned unhealthy status"),
1557 Err(e) => warn!(error = %e, "RavenFabric health check failed"),
1558 }
1559 }
1560 }
1561
1562 let system_prompt = &config.llm.system_prompt;
1563 let policy_engine = PolicyEngine::default_secure();
1564 let mut sandbox = Sandbox::default();
1565 sandbox.init().await.map_err(|e| {
1566 crate::error::RavenClawsError::CommandExecution(format!("Sandbox init failed: {}", e))
1567 })?;
1568 let audit_log = AuditLog::new(format!("supervisor-{}", std::process::id()));
1569 let registry = ToolRegistry::with_default_tools();
1570
1571 let supervisor_prompt = format!(
1573 "You are a supervisor agent. Your task is to decompose complex tasks into subtasks \
1574 and coordinate sub-agents to complete them. \
1575 \n\nFor each subtask, respond with:\n\
1576 SUBTASK: <description>\n\
1577 AGENT: <agent_number>\n\
1578 \nWhen all subtasks are complete, respond with:\n\
1579 FINAL: <aggregated result>\n\
1580 \nTask: {}",
1581 "Coordinate the completion of the assigned task."
1582 );
1583
1584 let mut memory = ConversationMemory::new(system_prompt, 20);
1585 memory.add_user_message(&supervisor_prompt);
1586
1587 let mut subtask_results: Vec<String> = Vec::new();
1588 let mut iteration = 0;
1589 let max_iterations = 15;
1590
1591 loop {
1592 iteration += 1;
1593 if iteration > max_iterations {
1594 warn!("Supervisor reached max iterations");
1595 break;
1596 }
1597
1598 let messages = memory.history().to_vec();
1599 let response = match llm.chat(messages).await {
1600 Ok(r) => r,
1601 Err(e) => {
1602 warn!(error = %e, "Supervisor LLM request failed");
1603 continue;
1604 }
1605 };
1606
1607 let content = response
1608 .choices
1609 .first()
1610 .map(|c| c.message.content.clone())
1611 .unwrap_or_default();
1612
1613 if content.contains("FINAL:") {
1615 let final_response = content
1616 .split("FINAL:")
1617 .nth(1)
1618 .unwrap_or("")
1619 .trim()
1620 .to_string();
1621 info!("Supervisor completed task: {} chars", final_response.len());
1622
1623 let _ = audit_log.append(
1624 AuditEventType::AgentFinish,
1625 "supervisor",
1626 "Supervisor completed task coordination",
1627 Some(serde_json::json!({
1628 "iterations": iteration,
1629 "subtasks_completed": subtask_results.len(),
1630 })),
1631 );
1632
1633 println!("\n🐦⬛ Supervisor Result:\n{}", final_response);
1634 return Ok(());
1635 }
1636
1637 if content.contains("SUBTASK:") {
1639 let subtask_block = content.split("SUBTASK:").nth(1).unwrap_or("");
1640 let subtask_lines: Vec<&str> = subtask_block.lines().take(3).collect();
1641
1642 let subtask_desc = subtask_lines.first().unwrap_or(&"").trim();
1643 let agent_num = subtask_lines
1644 .iter()
1645 .find(|l| l.starts_with("AGENT:"))
1646 .and_then(|l| l.split(':').nth(1))
1647 .unwrap_or("1")
1648 .trim();
1649
1650 if !subtask_desc.is_empty() {
1651 info!("Subtask {}: {}", agent_num, subtask_desc);
1652
1653 let subtask_result = run_subtask_agent(
1655 llm.clone(),
1656 subtask_desc,
1657 system_prompt,
1658 &policy_engine,
1659 &sandbox,
1660 &audit_log,
1661 ®istry,
1662 )
1663 .await;
1664
1665 match subtask_result {
1666 Ok(result) => {
1667 info!("Subtask {} completed: {} chars", agent_num, result.len());
1668 subtask_results.push(format!("Agent {} result: {}", agent_num, result));
1669
1670 memory.add_assistant_message(&format!(
1671 "Decomposed subtask {}: {}",
1672 agent_num, subtask_desc
1673 ));
1674 memory
1675 .add_user_message(&format!("Subtask {} result: {}", agent_num, result));
1676 }
1677 Err(e) => {
1678 warn!("Subtask {} failed: {}", agent_num, e);
1679 memory
1680 .add_assistant_message(&format!("Subtask {} failed: {}", agent_num, e));
1681 }
1682 }
1683 }
1684 } else {
1685 memory.add_assistant_message(&content);
1686 }
1687 }
1688
1689 if !subtask_results.is_empty() {
1691 let aggregated = subtask_results.join("\n\n");
1692 info!(
1693 "Supervisor aggregated {} subtask results",
1694 subtask_results.len()
1695 );
1696
1697 if let Some(ref rf) = ravenfabric {
1699 if rf.is_enabled() {
1700 let summary = format!(
1701 "Supervisor completed: {} subtasks, result: {} chars",
1702 subtask_results.len(),
1703 aggregated.len()
1704 );
1705 let _ = rf.broadcast(&summary, 30).await;
1706 info!("Supervisor result broadcast to RavenFabric mesh");
1707 }
1708 }
1709
1710 println!("\n🐦⬛ Supervisor Aggregated Result:\n{}", aggregated);
1711 return Ok(());
1712 }
1713
1714 Err(crate::error::RavenClawsError::CommandExecution(
1715 "Supervisor mode completed without results".to_string(),
1716 ))
1717}
1718
1719async fn run_subtask_agent(
1721 llm: Arc<dyn LLMProviderTrait>,
1722 subtask: &str,
1723 system_prompt: &str,
1724 policy_engine: &PolicyEngine,
1725 sandbox: &Sandbox,
1726 audit_log: &AuditLog,
1727 registry: &ToolRegistry,
1728) -> Result<String> {
1729 let mut memory = ConversationMemory::new(system_prompt, 10);
1730 memory.add_user_message(&format!("Execute this subtask: {}", subtask));
1731
1732 for i in 0..5 {
1733 let messages = memory.history().to_vec();
1734 let response = match llm.chat(messages).await {
1735 Ok(r) => r,
1736 Err(e) => {
1737 warn!(error = %e, iteration = i, "Subtask agent LLM failed");
1738 continue;
1739 }
1740 };
1741
1742 let content = response
1743 .choices
1744 .first()
1745 .map(|c| c.message.content.clone())
1746 .unwrap_or_default();
1747
1748 if content.contains("FINAL:") || content.contains("DONE:") {
1749 return Ok(content
1750 .replace("FINAL:", "")
1751 .replace("DONE:", "")
1752 .trim()
1753 .to_string());
1754 }
1755
1756 if let Some(tool_result) =
1758 execute_tool_call_with_security(&content, registry, policy_engine, sandbox, audit_log)
1759 .await
1760 {
1761 memory.add_assistant_message(&content);
1762 memory.add_user_message(&format!("Tool result: {}", tool_result.output));
1763 } else {
1764 memory.add_assistant_message(&content);
1765 memory.add_user_message("Continue with next step.");
1766 }
1767 }
1768
1769 Ok("Subtask completed".to_string())
1770}
1771
1772pub async fn run_single_multi(
1774 multi_llm: MultiModelManager,
1775 config: Config,
1776 ravenfabric: Option<RavenFabricClient>,
1777) -> Result<()> {
1778 info!(
1779 "Starting single agent mode (multi-model) with {} providers",
1780 multi_llm.client_count()
1781 );
1782
1783 if let Some(ref rf) = ravenfabric {
1785 if rf.is_enabled() {
1786 info!("RavenFabric remote execution available");
1787 match rf.health().await {
1788 Ok(true) => info!("RavenFabric mesh is healthy"),
1789 Ok(false) => warn!("RavenFabric mesh returned unhealthy status"),
1790 Err(e) => warn!(error = %e, "RavenFabric health check failed"),
1791 }
1792 }
1793 }
1794
1795 let system_prompt = &config.llm.system_prompt;
1796
1797 let messages = vec![
1798 ChatMessage::new("system", system_prompt.to_string()),
1799 ChatMessage::new("user", "Ready. Awaiting instructions."),
1800 ];
1801
1802 let mut last_index = 0;
1804 for i in 0..multi_llm.client_count() {
1805 let client = if i == 0 {
1806 multi_llm.get_client(0)
1807 } else {
1808 multi_llm.next_client(last_index)
1809 };
1810
1811 if let Some(client) = client {
1812 match client.chat(messages.clone()).await {
1813 Ok(response) => {
1814 if let Some(choice) = response.choices.first() {
1815 info!(provider = client.provider_name(), model = client.model(), response = %choice.message.content, "Provider response received");
1816 }
1817 }
1818 Err(e) => {
1819 warn!(error = %e, provider = client.provider_name(), model = client.model(), "Provider request failed");
1820 }
1821 }
1822 last_index = i;
1823 }
1824 }
1825
1826 if let Some(ref rf) = ravenfabric {
1828 if rf.is_enabled() {
1829 let _ = rf
1830 .broadcast("Single agent (multi-model) completed", 30)
1831 .await;
1832 info!("Multi-model result broadcast to RavenFabric mesh");
1833 }
1834 }
1835
1836 Ok(())
1837}
1838
1839pub async fn run_swarm_multi(
1844 multi_llm: MultiModelManager,
1845 config: Config,
1846 ravenfabric: Option<RavenFabricClient>,
1847) -> Result<()> {
1848 info!(
1849 "Starting swarm mode (multi-model) — {} parallel agents",
1850 multi_llm.client_count()
1851 );
1852
1853 if let Some(ref rf) = ravenfabric {
1855 if rf.is_enabled() {
1856 info!("RavenFabric remote execution available for swarm coordination");
1857 match rf.health().await {
1858 Ok(true) => info!("RavenFabric mesh is healthy"),
1859 Ok(false) => warn!("RavenFabric mesh returned unhealthy status"),
1860 Err(e) => warn!(error = %e, "RavenFabric health check failed"),
1861 }
1862 }
1863 }
1864
1865 let _system_prompt = &config.llm.system_prompt;
1866 let num_agents = multi_llm.client_count().min(3); let mut handles = Vec::new();
1868
1869 let personas = [
1871 "You are an analytical agent. Focus on logic, structure, and precision.",
1872 "You are a creative agent. Focus on innovation, alternatives, and possibilities.",
1873 "You are a pragmatic agent. Focus on simplicity, efficiency, and practicality.",
1874 ];
1875
1876 for i in 0..num_agents {
1877 let client = multi_llm.get_client(i).unwrap().clone();
1878 let persona = personas.get(i).unwrap_or(&personas[0]).to_string();
1879 let task = "Analyze the given task and provide your solution.".to_string();
1880
1881 let handle = tokio::spawn(async move {
1882 let mut memory = ConversationMemory::new(&persona, 10);
1883 memory.add_user_message(&task);
1884
1885 let messages = memory.history().to_vec();
1886 match client.chat(messages).await {
1887 Ok(response) => {
1888 let content = response
1889 .choices
1890 .first()
1891 .map(|c| c.message.content.clone())
1892 .unwrap_or_default();
1893 Ok((
1894 i,
1895 client.provider_name().to_string(),
1896 client.model().to_string(),
1897 content,
1898 ))
1899 }
1900 Err(e) => Err(format!("Agent {} failed: {}", i, e)),
1901 }
1902 });
1903
1904 handles.push(handle);
1905 }
1906
1907 let mut results: Vec<(usize, String, String, String)> = Vec::new();
1909 for handle in handles {
1910 match handle.await {
1911 Ok(Ok((idx, provider, model, result))) => {
1912 info!(
1913 "Agent {} ({}:{}) completed: {} chars",
1914 idx,
1915 provider,
1916 model,
1917 result.len()
1918 );
1919 results.push((idx, provider, model, result));
1920 }
1921 Ok(Err(e)) => warn!("Agent failed: {}", e),
1922 Err(e) => warn!("Agent join failed: {}", e),
1923 }
1924 }
1925
1926 println!(
1928 "\n🐦⬛ Swarm Results ({} agents, multi-model):",
1929 results.len()
1930 );
1931 for (idx, provider, model, result) in &results {
1932 println!("\n── Agent {} ({}:{}) ──", idx + 1, provider, model);
1933 println!("{}", result);
1934 }
1935
1936 if let Some(ref rf) = ravenfabric {
1938 if rf.is_enabled() {
1939 let summary = format!("Multi-model swarm completed: {} agents", results.len());
1940 let _ = rf.broadcast(&summary, 30).await;
1941 info!("Multi-model swarm results broadcast to RavenFabric mesh");
1942 }
1943 }
1944
1945 Ok(())
1946}
1947
1948pub async fn run_supervisor_multi(
1953 multi_llm: MultiModelManager,
1954 config: Config,
1955 ravenfabric: Option<RavenFabricClient>,
1956) -> Result<()> {
1957 info!(
1958 "Starting supervisor mode (multi-model) with {} providers",
1959 multi_llm.client_count()
1960 );
1961
1962 if let Some(ref rf) = ravenfabric {
1964 if rf.is_enabled() {
1965 info!("RavenFabric remote execution available for supervisor coordination");
1966 match rf.health().await {
1967 Ok(true) => info!("RavenFabric mesh is healthy"),
1968 Ok(false) => warn!("RavenFabric mesh returned unhealthy status"),
1969 Err(e) => warn!(error = %e, "RavenFabric health check failed"),
1970 }
1971 }
1972 }
1973
1974 let system_prompt = &config.llm.system_prompt;
1975 let policy_engine = PolicyEngine::default_secure();
1976 let mut sandbox = Sandbox::default();
1977 sandbox.init().await.map_err(|e| {
1978 crate::error::RavenClawsError::CommandExecution(format!("Sandbox init failed: {}", e))
1979 })?;
1980 let audit_log = AuditLog::new(format!("supervisor-multi-{}", std::process::id()));
1981 let registry = ToolRegistry::with_default_tools();
1982
1983 let supervisor_prompt = format!(
1985 "You are a supervisor agent coordinating multiple LLM providers. \
1986 Decompose tasks and assign them to appropriate providers based on their strengths. \
1987 \n\nFor each subtask, respond with:\n\
1988 SUBTASK: <description>\n\
1989 PROVIDER: <provider_index 0-{}>\n\
1990 \nWhen complete, respond with:\n\
1991 FINAL: <aggregated result>\n\
1992 \nTask: {}",
1993 multi_llm.client_count() - 1,
1994 "Coordinate the completion of the assigned task using available providers."
1995 );
1996
1997 let mut memory = ConversationMemory::new(system_prompt, 20);
1998 memory.add_user_message(&supervisor_prompt);
1999
2000 let mut subtask_results: Vec<String> = Vec::new();
2001 let mut iteration = 0;
2002 let max_iterations = 15;
2003
2004 loop {
2005 iteration += 1;
2006 if iteration > max_iterations {
2007 warn!("Supervisor reached max iterations");
2008 break;
2009 }
2010
2011 let supervisor_client = multi_llm
2013 .get_client(iteration % multi_llm.client_count())
2014 .or_else(|| multi_llm.get_client(0))
2015 .cloned();
2016
2017 let messages = memory.history().to_vec();
2018 let response =
2019 match supervisor_client.map(|c| tokio::spawn(async move { c.chat(messages).await })) {
2020 Some(handle) => match handle.await {
2021 Ok(Ok(r)) => r,
2022 Ok(Err(e)) => {
2023 warn!(error = %e, "Supervisor LLM request failed");
2024 continue;
2025 }
2026 Err(e) => {
2027 warn!(error = %e, "Supervisor task join failed");
2028 continue;
2029 }
2030 },
2031 None => {
2032 warn!("No LLM clients available");
2033 break;
2034 }
2035 };
2036
2037 let content = response
2038 .choices
2039 .first()
2040 .map(|c| c.message.content.clone())
2041 .unwrap_or_default();
2042
2043 if content.contains("FINAL:") {
2045 let final_response = content
2046 .split("FINAL:")
2047 .nth(1)
2048 .unwrap_or("")
2049 .trim()
2050 .to_string();
2051 info!("Supervisor completed task: {} chars", final_response.len());
2052
2053 let _ = audit_log.append(
2054 AuditEventType::AgentFinish,
2055 "supervisor",
2056 "Supervisor completed task coordination",
2057 Some(serde_json::json!({
2058 "iterations": iteration,
2059 "subtasks_completed": subtask_results.len(),
2060 "providers_used": multi_llm.client_count(),
2061 })),
2062 );
2063
2064 println!("\n🐦⬛ Supervisor Result (multi-model):\n{}", final_response);
2065 return Ok(());
2066 }
2067
2068 if content.contains("SUBTASK:") && content.contains("PROVIDER:") {
2070 let subtask_block = content.split("SUBTASK:").nth(1).unwrap_or("");
2071 let subtask_lines: Vec<&str> = subtask_block.lines().take(4).collect();
2072
2073 let subtask_desc = subtask_lines.first().unwrap_or(&"").trim();
2074 let provider_idx = subtask_lines
2075 .iter()
2076 .find(|l| l.starts_with("PROVIDER:"))
2077 .and_then(|l| l.split(':').nth(1))
2078 .and_then(|s| s.trim().parse::<usize>().ok())
2079 .unwrap_or(0);
2080
2081 if !subtask_desc.is_empty() {
2082 info!("Subtask for provider {}: {}", provider_idx, subtask_desc);
2083
2084 let client = multi_llm
2085 .get_client(provider_idx)
2086 .or_else(|| multi_llm.get_client(0));
2087
2088 if let Some(client) = client {
2089 let subtask_result = run_subtask_agent(
2090 client.clone(),
2091 subtask_desc,
2092 system_prompt,
2093 &policy_engine,
2094 &sandbox,
2095 &audit_log,
2096 ®istry,
2097 )
2098 .await;
2099
2100 match subtask_result {
2101 Ok(result) => {
2102 info!("Subtask {} completed: {} chars", provider_idx, result.len());
2103 subtask_results.push(format!(
2104 "Provider {} ({}): {}",
2105 provider_idx,
2106 client.provider_name(),
2107 result
2108 ));
2109
2110 memory.add_assistant_message(&format!(
2111 "Assigned subtask to provider {}: {}",
2112 provider_idx, subtask_desc
2113 ));
2114 memory.add_user_message(&format!(
2115 "Provider {} result: {}",
2116 provider_idx, result
2117 ));
2118 }
2119 Err(e) => {
2120 warn!("Subtask {} failed: {}", provider_idx, e);
2121 memory.add_assistant_message(&format!(
2122 "Provider {} subtask failed: {}",
2123 provider_idx, e
2124 ));
2125 }
2126 }
2127 }
2128 }
2129 } else {
2130 memory.add_assistant_message(&content);
2131 }
2132 }
2133
2134 if !subtask_results.is_empty() {
2136 let aggregated = subtask_results.join("\n\n");
2137 info!(
2138 "Supervisor aggregated {} subtask results",
2139 subtask_results.len()
2140 );
2141
2142 if let Some(ref rf) = ravenfabric {
2144 if rf.is_enabled() {
2145 let summary = format!(
2146 "Multi-model supervisor completed: {} subtasks, result: {} chars",
2147 subtask_results.len(),
2148 aggregated.len()
2149 );
2150 let _ = rf.broadcast(&summary, 30).await;
2151 info!("Multi-model supervisor result broadcast to RavenFabric mesh");
2152 }
2153 }
2154
2155 println!(
2156 "\n🐦⬛ Supervisor Aggregated Result (multi-model):\n{}",
2157 aggregated
2158 );
2159 return Ok(());
2160 }
2161
2162 Err(crate::error::RavenClawsError::CommandExecution(
2163 "Supervisor mode completed without results".to_string(),
2164 ))
2165}
2166
2167pub async fn run_repl(llm: Arc<dyn LLMProviderTrait>, config: Config) -> Result<()> {
2169 use tokio::io::{AsyncBufReadExt, BufReader};
2170
2171 info!("Starting interactive REPL mode");
2172
2173 let system_prompt = &config.llm.system_prompt;
2174 let mut memory = ConversationMemory::new(system_prompt, 0);
2175
2176 let stdin = BufReader::new(tokio::io::stdin());
2177 let mut lines = stdin.lines();
2178
2179 println!("RavenClaws REPL — type /exit to quit, /reset to clear history");
2180
2181 loop {
2182 print!("\n> ");
2183 use tokio::io::AsyncWriteExt;
2184 tokio::io::stdout().flush().await?;
2185
2186 let line = match lines.next_line().await {
2187 Ok(Some(l)) => l,
2188 Ok(None) => break, Err(e) => {
2190 warn!(error = %e, "REPL read error");
2191 break;
2192 }
2193 };
2194
2195 let input = line.trim();
2196
2197 if input.is_empty() {
2198 continue;
2199 }
2200
2201 match input {
2202 "/exit" | "/quit" => {
2203 println!("Exiting REPL.");
2204 break;
2205 }
2206 "/reset" => {
2207 memory = ConversationMemory::new(system_prompt, 0);
2208 println!("Conversation history reset.");
2209 continue;
2210 }
2211 _ => {}
2212 }
2213
2214 memory.add_user_message(input);
2215 let messages = memory.history().to_vec();
2216
2217 match llm.chat(messages).await {
2218 Ok(response) => {
2219 if let Some(choice) = response.choices.first() {
2220 let content = &choice.message.content;
2221 println!("{}", content);
2222 memory.add_assistant_message(content);
2223 }
2224 }
2225 Err(e) => {
2226 warn!(error = %e, "LLM request failed");
2227 println!("Error: {}", e);
2228 }
2229 }
2230 }
2231
2232 Ok(())
2233}
2234
#[cfg(test)]
mod tests {
    //! Unit tests for the agent run modes, conversation memory, text tool-call
    //! parsing, approval prompting and `AgentLoopConfig` construction.

    use super::*;

    // Compile-time signature check only: the async entry point is coerced to
    // a fn pointer with the expected parameter list. Nothing is executed.
    #[test]
    fn test_swarm_function_exists() {
        let _fn_ptr: fn(Arc<dyn LLMProviderTrait>, Config, Option<RavenFabricClient>) -> _ =
            run_swarm;
    }

    // Compile-time signature check only, as above.
    #[test]
    fn test_supervisor_function_exists() {
        let _fn_ptr: fn(Arc<dyn LLMProviderTrait>, Config, Option<RavenFabricClient>) -> _ =
            run_supervisor;
    }

    // A new memory holds exactly the system message.
    #[test]
    fn test_conversation_memory_new() {
        let mem = ConversationMemory::new("system prompt", 10);
        assert_eq!(mem.messages.len(), 1);
        assert_eq!(mem.messages[0].role, "system");
        assert_eq!(mem.messages[0].content, "system prompt");
    }

    // User messages are appended after the system message.
    #[test]
    fn test_conversation_memory_add_user() {
        let mut mem = ConversationMemory::new("system", 10);
        mem.add_user_message("hello");
        assert_eq!(mem.messages.len(), 2);
        assert_eq!(mem.messages[1].role, "user");
        assert_eq!(mem.messages[1].content, "hello");
    }

    // Only the upper bound is asserted. Which messages survive trimming
    // (e.g. whether the system message is kept) is not checked here.
    #[test]
    fn test_conversation_memory_trim() {
        let mut mem = ConversationMemory::new("system", 3);
        mem.add_user_message("msg1");
        mem.add_assistant_message("resp1");
        mem.add_user_message("msg2");
        mem.add_assistant_message("resp2");
        assert!(mem.messages.len() <= 3);
    }

    // Happy path: TOOL_CALL followed by an ARGS line holding a JSON object.
    #[test]
    fn test_parse_tool_call_valid() {
        let content = "THOUGHT: I need to run a command\nTOOL_CALL: shell_exec\nARGS: {\"command\": \"echo hello\"}";
        let (name, args) = parse_tool_call(content).unwrap();
        assert_eq!(name, "shell_exec");
        assert_eq!(args["command"], "echo hello");
    }

    #[test]
    fn test_parse_tool_call_missing_tool() {
        let content = "THOUGHT: no tool here";
        assert!(parse_tool_call(content).is_none());
    }

    #[test]
    fn test_parse_tool_call_missing_args() {
        let content = "TOOL_CALL: shell_exec\nNo args line";
        assert!(parse_tool_call(content).is_none());
    }

    #[test]
    fn test_parse_tool_call_invalid_json() {
        let content = "TOOL_CALL: shell_exec\nARGS: not valid json";
        assert!(parse_tool_call(content).is_none());
    }

    // Pins the documented defaults of `AgentLoopConfig` (defined elsewhere).
    #[test]
    fn test_agent_loop_config_default() {
        let config = AgentLoopConfig::default();
        assert_eq!(config.max_iterations, 10);
        assert!(!config.enable_tools);
        assert!(!config.require_approval);
    }

    // Explicit construction keeps the values it was given.
    #[test]
    fn test_agent_loop_config_require_approval() {
        let config = AgentLoopConfig {
            max_iterations: 5,
            enable_tools: true,
            require_approval: true,
            prompt_injection_protection: true,
            token_lifetime_secs: 0,
            no_final_required: false,
            fallback_chain: None,
            token_budget: None,
            ravenfabric: None,
            checkpoint_dir: None,
            session_id: None,
            metrics_callback: None,
            load_manager: None,
        };
        assert_eq!(config.max_iterations, 5);
        assert!(config.enable_tools);
        assert!(config.require_approval);
        assert!(config.prompt_injection_protection);
        assert_eq!(config.token_lifetime_secs, 0);
    }

    // The approval tests below feed a canned answer to
    // `prompt_for_approval_with_input` (defined elsewhere in this file) and
    // check the resulting approve/deny decision.
    #[test]
    fn test_prompt_for_approval_yes() {
        let args = serde_json::json!({"command": "echo hello"});
        let result = tokio_test::block_on(prompt_for_approval_with_input("shell_exec", &args, "y"));
        assert!(result, "Should approve for 'y'");
    }

    #[test]
    fn test_prompt_for_approval_yes_full() {
        let args = serde_json::json!({"command": "echo hello"});
        let result =
            tokio_test::block_on(prompt_for_approval_with_input("shell_exec", &args, "yes"));
        assert!(result, "Should approve for 'yes'");
    }

    #[test]
    fn test_prompt_for_approval_no() {
        let args = serde_json::json!({"command": "echo hello"});
        let result = tokio_test::block_on(prompt_for_approval_with_input("shell_exec", &args, "n"));
        assert!(!result, "Should deny for 'n'");
    }

    #[test]
    fn test_prompt_for_approval_no_full() {
        let args = serde_json::json!({"command": "echo hello"});
        let result =
            tokio_test::block_on(prompt_for_approval_with_input("shell_exec", &args, "no"));
        assert!(!result, "Should deny for 'no'");
    }

    // Empty input must fall back to the safe default (deny).
    #[test]
    fn test_prompt_for_approval_empty() {
        let args = serde_json::json!({"command": "echo hello"});
        let result = tokio_test::block_on(prompt_for_approval_with_input("shell_exec", &args, ""));
        assert!(!result, "Should deny for empty input (default N)");
    }

    // The answer is matched case-insensitively.
    #[test]
    fn test_prompt_for_approval_uppercase() {
        let args = serde_json::json!({"command": "echo hello"});
        let result = tokio_test::block_on(prompt_for_approval_with_input("shell_exec", &args, "Y"));
        assert!(result, "Should approve for uppercase 'Y'");
    }

    // NOTE(review): the future is built and dropped without being awaited,
    // so this only proves the call compiles. It asserts nothing about the
    // non-TTY behaviour its name describes.
    #[test]
    fn test_prompt_for_approval_auto_approves_non_tty() {
        #[allow(clippy::let_underscore_future)]
        let _ = prompt_for_approval_with_input("test", &serde_json::json!({}), "y");
    }

    // Calls `execute_parsed_tool_call` with its trailing flag set to `false`
    // (presumably the approval flag, per the test name). NOTE(review): this
    // uses the real default registry, so if the policy allows it, `shell_exec`
    // actually runs `echo hello`. Confirm that is acceptable in CI.
    #[test]
    fn test_execute_parsed_tool_call_skips_approval_when_not_required() {
        let registry = ToolRegistry::with_default_tools();
        let policy_engine = PolicyEngine::default_secure();
        let sandbox = Sandbox::default();
        let audit_log = AuditLog::new("test-session".to_string());

        let args = serde_json::json!({"command": "echo hello"});
        let result = tokio_test::block_on(execute_parsed_tool_call(
            "shell_exec".to_string(),
            args,
            &registry,
            &policy_engine,
            &sandbox,
            &audit_log,
            false,
        ));

        assert!(result.is_some());
        let tool_result = result.unwrap();
        assert_eq!(tool_result.tool_name, "shell_exec");
    }

    // With the trailing flag set to `true`, a read-only tool is still
    // expected to yield a result without blocking on a prompt. Only the
    // presence of a result and its tool name are asserted. The file at
    // `/tmp/test.txt` need not exist; a failed read would still yield a result.
    // NOTE(review): confirm.
    #[test]
    fn test_execute_parsed_tool_call_approval_not_needed_for_read_only_tools() {
        let registry = ToolRegistry::with_default_tools();
        let policy_engine = PolicyEngine::default_secure();
        let sandbox = Sandbox::default();
        let audit_log = AuditLog::new("test-session".to_string());

        let args = serde_json::json!({"path": "/tmp/test.txt"});
        let result = tokio_test::block_on(execute_parsed_tool_call(
            "read_file".to_string(),
            args,
            &registry,
            &policy_engine,
            &sandbox,
            &audit_log,
            true,
        ));

        assert!(result.is_some());
        let tool_result = result.unwrap();
        assert_eq!(tool_result.tool_name, "read_file");
    }

    // `token_lifetime_secs: 0` is stored as given (the test name suggests 0
    // means "disabled").
    #[test]
    fn test_agent_loop_config_token_lifetime_zero_disabled() {
        let config = AgentLoopConfig {
            max_iterations: 10,
            enable_tools: false,
            require_approval: false,
            prompt_injection_protection: false,
            token_lifetime_secs: 0,
            no_final_required: false,
            fallback_chain: None,
            token_budget: None,
            ravenfabric: None,
            checkpoint_dir: None,
            session_id: None,
            metrics_callback: None,
            load_manager: None,
        };
        assert_eq!(config.token_lifetime_secs, 0);
    }

    #[test]
    fn test_agent_loop_config_token_lifetime_nonzero() {
        let config = AgentLoopConfig {
            max_iterations: 10,
            enable_tools: false,
            require_approval: false,
            prompt_injection_protection: false,
            token_lifetime_secs: 3600,
            no_final_required: false,
            fallback_chain: None,
            token_budget: None,
            ravenfabric: None,
            checkpoint_dir: None,
            session_id: None,
            metrics_callback: None,
            load_manager: None,
        };
        assert_eq!(config.token_lifetime_secs, 3600);
    }

    // The default config leaves the token lifetime at 0.
    #[test]
    fn test_agent_loop_config_default_includes_token_lifetime() {
        let config = AgentLoopConfig::default();
        assert_eq!(config.token_lifetime_secs, 0);
    }
}