1use crate::agent::ConversationMemory;
39use crate::audit::{AuditEventType, AuditLog};
40use crate::error::{RavenClawsError, Result};
41use crate::llm::{ChatMessage, LLMProviderTrait, MultiModelManager};
42use crate::policy::PolicyEngine;
43use crate::ravenfabric::RavenFabricClient;
44use crate::sandbox::Sandbox;
45use crate::tools::ToolRegistry;
46use serde::{Deserialize, Serialize};
47use std::collections::HashMap;
48use std::sync::Arc;
49use tokio::sync::RwLock;
50use tracing::{info, instrument, warn};
51
/// A single message exchanged between swarm workers on an [`AgentMessageBus`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentMessage {
    /// Unique identifier; `AgentMessageBus::send` assigns a random UUID v4 string.
    pub id: String,

    /// Role name of the sending worker.
    pub sender: String,

    /// Role name of the intended recipient, or `"*"` to address every worker.
    pub recipient: String,

    /// Category of the message.
    pub msg_type: MessageType,

    /// Free-form message body.
    pub content: String,

    /// Send time; `AgentMessageBus::send` stores it as an RFC 3339 UTC string.
    pub timestamp: String,

    /// Arbitrary key/value annotations; empty when absent during deserialization.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
}
84
/// Category of an [`AgentMessage`].
///
/// `Display` renders the variant name in lowercase. Serde uses the variant names
/// unchanged (there is no `rename_all`), e.g. `"Information"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MessageType {
    Information,
    Question,
    /// Used by workers to broadcast a finished task's result.
    Result,
    Error,
    Coordination,
    Generic,
}
101
102impl std::fmt::Display for MessageType {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 match self {
105 MessageType::Information => write!(f, "information"),
106 MessageType::Question => write!(f, "question"),
107 MessageType::Result => write!(f, "result"),
108 MessageType::Error => write!(f, "error"),
109 MessageType::Coordination => write!(f, "coordination"),
110 MessageType::Generic => write!(f, "generic"),
111 }
112 }
113}
114
/// In-memory log of [`AgentMessage`]s shared by swarm workers.
///
/// When `max_messages` is non-zero the bus keeps at most that many messages,
/// evicting the oldest first; `0` means unbounded.
#[derive(Debug, Clone)]
pub struct AgentMessageBus {
    /// Messages in send order, oldest first.
    messages: Vec<AgentMessage>,
    /// Capacity cap; `0` disables eviction.
    max_messages: usize,
}
127
128#[allow(dead_code)]
129impl AgentMessageBus {
130 pub fn new(max_messages: usize) -> Self {
132 Self {
133 messages: Vec::new(),
134 max_messages,
135 }
136 }
137
138 pub fn send(
140 &mut self,
141 sender: &str,
142 recipient: &str,
143 msg_type: MessageType,
144 content: &str,
145 metadata: HashMap<String, String>,
146 ) -> String {
147 let id = uuid::Uuid::new_v4().to_string();
148 let timestamp = chrono::Utc::now().to_rfc3339();
149
150 let msg = AgentMessage {
151 id: id.clone(),
152 sender: sender.to_string(),
153 recipient: recipient.to_string(),
154 msg_type,
155 content: content.to_string(),
156 timestamp,
157 metadata,
158 };
159
160 self.messages.push(msg);
161
162 if self.max_messages > 0 && self.messages.len() > self.max_messages {
164 self.messages.remove(0);
165 }
166
167 id
168 }
169
170 pub fn messages_for(&self, role: &str) -> Vec<&AgentMessage> {
172 self.messages
173 .iter()
174 .filter(|m| m.recipient == role || m.recipient == "*")
175 .collect()
176 }
177
178 pub fn messages_from(&self, sender: &str) -> Vec<&AgentMessage> {
180 self.messages
181 .iter()
182 .filter(|m| m.sender == sender)
183 .collect()
184 }
185
186 pub fn messages_of_type(&self, msg_type: &MessageType) -> Vec<&AgentMessage> {
188 self.messages
189 .iter()
190 .filter(|m| m.msg_type == *msg_type)
191 .collect()
192 }
193
194 pub fn all_messages(&self) -> &[AgentMessage] {
196 &self.messages
197 }
198
199 pub fn len(&self) -> usize {
201 self.messages.len()
202 }
203
204 pub fn is_empty(&self) -> bool {
206 self.messages.is_empty()
207 }
208
209 pub fn format_for_prompt(&self, role: &str, max_messages: usize) -> String {
213 let relevant: Vec<&AgentMessage> = self
214 .messages
215 .iter()
216 .filter(|m| m.recipient == role || m.recipient == "*" || m.sender == role)
217 .rev()
218 .take(max_messages)
219 .collect();
220
221 if relevant.is_empty() {
222 return String::new();
223 }
224
225 let mut output = String::from("\n\n--- Inter-Agent Messages ---\n");
226 for msg in relevant.iter().rev() {
227 output.push_str(&format!(
228 "[{}] {} → {} ({}): {}\n",
229 msg.msg_type, msg.sender, msg.recipient, msg.timestamp, msg.content
230 ));
231 }
232 output.push_str("--- End Inter-Agent Messages ---\n");
233 output
234 }
235}
236
/// Health classification of a swarm worker, derived from how long ago it last
/// sent a heartbeat (see `SwarmHealthMonitor::check_health`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum WorkerHealthStatus {
    /// Recently heard from; also the state set by every heartbeat.
    Healthy,
    /// Silent for more than two heartbeat intervals.
    Degraded,
    /// Silent for more than `heartbeat_interval * max_missed_beats`.
    Unhealthy,
    /// Silent for more than twice the unhealthy threshold.
    Dead,
}
253
254impl std::fmt::Display for WorkerHealthStatus {
255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256 match self {
257 WorkerHealthStatus::Healthy => write!(f, "healthy"),
258 WorkerHealthStatus::Degraded => write!(f, "degraded"),
259 WorkerHealthStatus::Unhealthy => write!(f, "unhealthy"),
260 WorkerHealthStatus::Dead => write!(f, "dead"),
261 }
262 }
263}
264
/// Point-in-time snapshot of one worker's health and counters, as produced by
/// `SwarmHealthMonitor::worker_telemetry`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerTelemetry {
    /// Role name the worker is tracked under.
    pub role: String,
    /// Health classification at snapshot time.
    pub status: WorkerHealthStatus,
    /// Tasks completed successfully.
    pub tasks_completed: u64,
    /// Tasks that failed.
    pub tasks_failed: u64,
    /// Failed tasks plus errors recorded via `record_error`.
    pub error_count: u64,
    /// Mean duration (ms) of completed tasks that had a recorded start; 0.0 if none.
    pub avg_duration_ms: f64,
    /// Last heartbeat time, RFC 3339.
    pub last_heartbeat: String,
    /// Registration time, RFC 3339.
    pub spawned_at: String,
    /// Messages this worker sent.
    pub messages_sent: u64,
    /// Messages this worker received.
    pub messages_received: u64,
    /// Number of tasks started.
    pub iteration: u64,
}
291
/// Swarm-wide health and throughput summary produced by `SwarmHealthMonitor::metrics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmMetrics {
    /// Number of currently tracked workers.
    pub total_workers: usize,
    /// Workers currently `Healthy`.
    pub healthy_workers: usize,
    /// Workers currently `Degraded`.
    pub degraded_workers: usize,
    /// Workers currently `Unhealthy`.
    pub unhealthy_workers: usize,
    /// Workers currently `Dead`.
    pub dead_workers: usize,
    /// Completed tasks summed over currently tracked workers.
    pub total_tasks_completed: u64,
    /// Failed tasks summed over currently tracked workers.
    pub total_tasks_failed: u64,
    /// Error counts summed over currently tracked workers.
    pub total_errors: u64,
    /// Monitor-wide measured task time divided by monitor-wide completed tasks (ms).
    pub overall_avg_duration_ms: f64,
    /// Completed tasks per second since the monitor was created.
    pub task_throughput: f64,
    /// Heuristic estimate from the received/sent message ratio and the heartbeat
    /// interval; not a measured latency.
    pub communication_latency_ms: f64,
    /// Fraction of tracked workers that are busy (0.0–1.0).
    pub worker_utilization: f64,
    /// `total_errors / (completed + failed)`; can exceed 1.0 because errors recorded
    /// outside tasks are included.
    pub error_rate: f64,
    /// Snapshot time, RFC 3339.
    pub timestamp: String,
}
324
/// Per-worker bookkeeping kept by [`SwarmHealthMonitor`].
#[derive(Debug, Clone)]
struct WorkerHeartbeat {
    /// Role name the worker was registered under.
    role: String,
    /// When the worker was first registered.
    spawned_at: chrono::DateTime<chrono::Utc>,
    /// Time of the most recent heartbeat (initially the registration time).
    last_heartbeat: chrono::DateTime<chrono::Utc>,
    /// Missed-heartbeat counter; reset to 0 whenever a heartbeat is received.
    missed_beats: u32,
    /// Current health classification.
    status: WorkerHealthStatus,
    /// Tasks completed successfully.
    tasks_completed: u64,
    /// Tasks that failed.
    tasks_failed: u64,
    /// One per failed task plus each explicit `record_error` call.
    error_count: u64,
    /// Sum of measured task durations in milliseconds.
    total_duration_ms: f64,
    /// Number of completed tasks whose duration was measured.
    duration_samples: u64,
    /// Messages sent by this worker.
    messages_sent: u64,
    /// Messages received by this worker.
    messages_received: u64,
    /// Number of tasks started.
    iteration: u64,
    /// True between `task_started` and `task_completed`/`task_failed`.
    is_busy: bool,
    /// Start time of the in-flight task, if any.
    task_started_at: Option<chrono::DateTime<chrono::Utc>>,
}
359
/// Tracks worker heartbeats and task/message counters for a swarm, and derives
/// per-worker health plus swarm-wide [`SwarmMetrics`] from them.
///
/// The `Default` configuration uses a 5 s heartbeat interval, 3 missed beats and
/// a 30 s replacement timeout.
#[derive(Debug, Clone)]
pub struct SwarmHealthMonitor {
    /// Per-role worker state.
    heartbeats: HashMap<String, WorkerHeartbeat>,
    /// Expected seconds between heartbeats.
    heartbeat_interval_secs: u64,
    /// Interval multiplier used for the unhealthy (and, doubled, dead) threshold.
    max_missed_beats: u32,
    /// Minimum silence in seconds before a dead worker is reported for replacement.
    replacement_timeout_secs: u64,
    /// Monitor creation time; the basis for throughput.
    started_at: chrono::DateTime<chrono::Utc>,
    /// Monitor-wide counters below are not reset by `remove_worker`.
    total_messages_sent: u64,
    total_messages_received: u64,
    total_duration_ms: f64,
    total_tasks_completed: u64,
}
400
401impl Default for SwarmHealthMonitor {
402 fn default() -> Self {
403 Self {
404 heartbeats: HashMap::new(),
405 heartbeat_interval_secs: 5,
406 max_missed_beats: 3,
407 replacement_timeout_secs: 30,
408 started_at: chrono::Utc::now(),
409 total_messages_sent: 0,
410 total_messages_received: 0,
411 total_duration_ms: 0.0,
412 total_tasks_completed: 0,
413 }
414 }
415}
416
417#[allow(dead_code)]
418impl SwarmHealthMonitor {
419 pub fn new(
421 heartbeat_interval_secs: u64,
422 max_missed_beats: u32,
423 replacement_timeout_secs: u64,
424 ) -> Self {
425 Self {
426 heartbeats: HashMap::new(),
427 heartbeat_interval_secs,
428 max_missed_beats,
429 replacement_timeout_secs,
430 started_at: chrono::Utc::now(),
431 total_messages_sent: 0,
432 total_messages_received: 0,
433 total_duration_ms: 0.0,
434 total_tasks_completed: 0,
435 }
436 }
437
438 pub fn register_worker(&mut self, role: &str) {
440 let now = chrono::Utc::now();
441 self.heartbeats
442 .entry(role.to_string())
443 .or_insert(WorkerHeartbeat {
444 role: role.to_string(),
445 spawned_at: now,
446 last_heartbeat: now,
447 missed_beats: 0,
448 status: WorkerHealthStatus::Healthy,
449 tasks_completed: 0,
450 tasks_failed: 0,
451 error_count: 0,
452 total_duration_ms: 0.0,
453 duration_samples: 0,
454 messages_sent: 0,
455 messages_received: 0,
456 iteration: 0,
457 is_busy: false,
458 task_started_at: None,
459 });
460 }
461
462 pub fn heartbeat(&mut self, role: &str) {
464 if let Some(hb) = self.heartbeats.get_mut(role) {
465 hb.last_heartbeat = chrono::Utc::now();
466 hb.missed_beats = 0;
467 hb.status = WorkerHealthStatus::Healthy;
468 }
469 }
470
471 pub fn task_started(&mut self, role: &str) {
473 if let Some(hb) = self.heartbeats.get_mut(role) {
474 hb.is_busy = true;
475 hb.task_started_at = Some(chrono::Utc::now());
476 hb.iteration += 1;
477 }
478 }
479
480 pub fn task_completed(&mut self, role: &str) {
482 if let Some(hb) = self.heartbeats.get_mut(role) {
483 hb.tasks_completed += 1;
484 hb.is_busy = false;
485
486 if let Some(started) = hb.task_started_at {
487 let duration = (chrono::Utc::now() - started).num_milliseconds() as f64;
488 hb.total_duration_ms += duration;
489 hb.duration_samples += 1;
490 self.total_duration_ms += duration;
491 }
492 self.total_tasks_completed += 1;
493 hb.task_started_at = None;
494 }
495 }
496
497 pub fn task_failed(&mut self, role: &str) {
499 if let Some(hb) = self.heartbeats.get_mut(role) {
500 hb.tasks_failed += 1;
501 hb.error_count += 1;
502 hb.is_busy = false;
503 hb.task_started_at = None;
504 }
505 }
506
507 pub fn record_error(&mut self, role: &str) {
509 if let Some(hb) = self.heartbeats.get_mut(role) {
510 hb.error_count += 1;
511 }
512 }
513
514 pub fn message_sent(&mut self, role: &str) {
516 if let Some(hb) = self.heartbeats.get_mut(role) {
517 hb.messages_sent += 1;
518 }
519 self.total_messages_sent += 1;
520 }
521
522 pub fn message_received(&mut self, role: &str) {
524 if let Some(hb) = self.heartbeats.get_mut(role) {
525 hb.messages_received += 1;
526 }
527 self.total_messages_received += 1;
528 }
529
530 pub fn check_health(&mut self) -> Vec<String> {
533 let now = chrono::Utc::now();
534 let mut dead_workers = Vec::new();
535
536 for hb in self.heartbeats.values_mut() {
537 let elapsed = (now - hb.last_heartbeat).num_seconds();
538
539 if elapsed > (self.heartbeat_interval_secs * self.max_missed_beats as u64 * 2) as i64 {
540 if hb.status != WorkerHealthStatus::Dead {
541 hb.status = WorkerHealthStatus::Dead;
542 dead_workers.push(hb.role.clone());
543 }
544 } else if elapsed > (self.heartbeat_interval_secs * self.max_missed_beats as u64) as i64
545 {
546 hb.status = WorkerHealthStatus::Unhealthy;
547 } else if elapsed > (self.heartbeat_interval_secs * 2) as i64 {
548 hb.status = WorkerHealthStatus::Degraded;
549 }
550 }
551
552 dead_workers
553 }
554
555 pub fn worker_telemetry(&self, role: &str) -> Option<WorkerTelemetry> {
557 self.heartbeats.get(role).map(|hb| {
558 let avg_dur = if hb.duration_samples > 0 {
559 hb.total_duration_ms / hb.duration_samples as f64
560 } else {
561 0.0
562 };
563
564 WorkerTelemetry {
565 role: hb.role.clone(),
566 status: hb.status.clone(),
567 tasks_completed: hb.tasks_completed,
568 tasks_failed: hb.tasks_failed,
569 error_count: hb.error_count,
570 avg_duration_ms: avg_dur,
571 last_heartbeat: hb.last_heartbeat.to_rfc3339(),
572 spawned_at: hb.spawned_at.to_rfc3339(),
573 messages_sent: hb.messages_sent,
574 messages_received: hb.messages_received,
575 iteration: hb.iteration,
576 }
577 })
578 }
579
580 pub fn all_worker_telemetry(&self) -> Vec<WorkerTelemetry> {
582 let mut roles: Vec<String> = self.heartbeats.keys().cloned().collect();
583 roles.sort();
584 roles
585 .iter()
586 .filter_map(|r| self.worker_telemetry(r))
587 .collect()
588 }
589
590 pub fn metrics(&self) -> SwarmMetrics {
592 let mut healthy = 0;
593 let mut degraded = 0;
594 let mut unhealthy = 0;
595 let mut dead = 0;
596 let mut total_completed = 0u64;
597 let mut total_failed = 0u64;
598 let mut total_errors = 0u64;
599 let mut busy_workers = 0u64;
600 let total_workers = self.heartbeats.len();
601
602 for hb in self.heartbeats.values() {
603 match hb.status {
604 WorkerHealthStatus::Healthy => healthy += 1,
605 WorkerHealthStatus::Degraded => degraded += 1,
606 WorkerHealthStatus::Unhealthy => unhealthy += 1,
607 WorkerHealthStatus::Dead => dead += 1,
608 }
609 total_completed += hb.tasks_completed;
610 total_failed += hb.tasks_failed;
611 total_errors += hb.error_count;
612 if hb.is_busy {
613 busy_workers += 1;
614 }
615 }
616
617 let elapsed_secs = (chrono::Utc::now() - self.started_at).num_seconds().max(1) as f64;
618 let task_throughput = self.total_tasks_completed as f64 / elapsed_secs;
619
620 let worker_utilization = if total_workers > 0 {
621 busy_workers as f64 / total_workers as f64
622 } else {
623 0.0
624 };
625
626 let error_rate = if total_completed + total_failed > 0 {
627 total_errors as f64 / (total_completed + total_failed) as f64
628 } else {
629 0.0
630 };
631
632 let overall_avg_duration = if self.total_tasks_completed > 0 {
633 self.total_duration_ms / self.total_tasks_completed as f64
634 } else {
635 0.0
636 };
637
638 let comm_latency = if self.total_messages_sent > 0 {
639 let ratio = self.total_messages_received as f64 / self.total_messages_sent as f64;
642 ratio * (self.heartbeat_interval_secs as f64 * 1000.0) / 2.0
643 } else {
644 0.0
645 };
646
647 SwarmMetrics {
648 total_workers,
649 healthy_workers: healthy,
650 degraded_workers: degraded,
651 unhealthy_workers: unhealthy,
652 dead_workers: dead,
653 total_tasks_completed: total_completed,
654 total_tasks_failed: total_failed,
655 total_errors,
656 overall_avg_duration_ms: overall_avg_duration,
657 task_throughput,
658 communication_latency_ms: comm_latency,
659 worker_utilization,
660 error_rate,
661 timestamp: chrono::Utc::now().to_rfc3339(),
662 }
663 }
664
665 pub fn dead_workers_for_replacement(&self) -> Vec<String> {
667 let now = chrono::Utc::now();
668 self.heartbeats
669 .iter()
670 .filter(|(_, hb)| hb.status == WorkerHealthStatus::Dead)
671 .filter(|(_, hb)| {
672 let elapsed = (now - hb.last_heartbeat).num_seconds();
673 elapsed >= self.replacement_timeout_secs as i64
674 })
675 .map(|(role, _)| role.clone())
676 .collect()
677 }
678
679 pub fn remove_worker(&mut self, role: &str) {
681 self.heartbeats.remove(role);
682 }
683
684 pub fn worker_count(&self) -> usize {
686 self.heartbeats.len()
687 }
688
689 pub fn format_status(&self) -> String {
691 let m = self.metrics();
692 format!(
693 "Swarm Health: {}/{} healthy, {} degraded, {} unhealthy, {} dead | \
694 {} tasks ({:.1}/s) | {:.1}% utilization | {:.2}% error rate",
695 m.healthy_workers,
696 m.total_workers,
697 m.degraded_workers,
698 m.unhealthy_workers,
699 m.dead_workers,
700 m.total_tasks_completed,
701 m.task_throughput,
702 m.worker_utilization * 100.0,
703 m.error_rate * 100.0,
704 )
705 }
706}
707
/// Configuration for one kind of swarm worker: its system persona, tool list,
/// model selection and limits.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerProfile {
    /// Role name; matched against the roles chosen for a task.
    pub name: String,

    /// Human-readable summary (empty by default).
    #[serde(default)]
    pub description: String,

    /// System prompt used to seed the worker's conversation memory.
    pub persona: String,

    /// Tool names listed for this profile (empty by default).
    #[serde(default)]
    pub allowed_tools: Vec<String>,

    /// Optional provider name for this profile (presumably overrides the
    /// orchestrator's default provider — TODO confirm).
    #[serde(default)]
    pub provider: Option<String>,

    /// Optional model name for this profile (presumably overrides the default
    /// model — TODO confirm).
    #[serde(default)]
    pub model: Option<String>,

    /// Iteration budget (default 10).
    #[serde(default = "default_worker_max_iterations")]
    pub max_iterations: usize,

    /// Capacity passed to `ConversationMemory::new` (default 20).
    #[serde(default = "default_worker_memory")]
    pub max_memory_messages: usize,

    /// Whether this worker may delegate (default true).
    #[serde(default = "default_true")]
    pub can_delegate: bool,

    /// Per-worker resource caps (see [`ResourceLimits`]).
    #[serde(default)]
    pub resource_limits: ResourceLimits,
}
757
/// Serde default for `WorkerProfile::max_iterations`.
fn default_worker_max_iterations() -> usize {
    10_usize
}
761
/// Serde default for `WorkerProfile::max_memory_messages`.
fn default_worker_memory() -> usize {
    20_usize
}
765
/// Serde default for boolean flags that are enabled unless configured otherwise.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
769
/// Per-worker resource caps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceLimits {
    /// Maximum number of tool calls (default 50).
    #[serde(default = "default_max_tool_calls")]
    pub max_tool_calls: usize,

    /// Maximum execution time in seconds (default 300).
    #[serde(default = "default_max_exec_secs")]
    pub max_exec_secs: u64,
}
781
/// Serde default for `ResourceLimits::max_tool_calls`.
fn default_max_tool_calls() -> usize {
    50_usize
}
785
/// Serde default for `ResourceLimits::max_exec_secs` (five minutes).
fn default_max_exec_secs() -> u64 {
    5 * 60
}
789
790impl Default for ResourceLimits {
791 fn default() -> Self {
792 Self {
793 max_tool_calls: default_max_tool_calls(),
794 max_exec_secs: default_max_exec_secs(),
795 }
796 }
797}
798
799impl Default for WorkerProfile {
800 fn default() -> Self {
801 Self {
802 name: "default".to_string(),
803 description: String::new(),
804 persona: "You are a helpful assistant.".to_string(),
805 allowed_tools: Vec::new(),
806 provider: None,
807 model: None,
808 max_iterations: default_worker_max_iterations(),
809 max_memory_messages: default_worker_memory(),
810 can_delegate: default_true(),
811 resource_limits: ResourceLimits::default(),
812 }
813 }
814}
815
816impl WorkerProfile {
821 pub fn researcher() -> Self {
823 Self {
824 name: "researcher".to_string(),
825 description: "Analytical researcher focused on data gathering and analysis".to_string(),
826 persona: "You are an analytical researcher. Focus on gathering data, \
827 verifying facts, and providing well-structured analysis. \
828 Be thorough and cite your sources."
829 .to_string(),
830 allowed_tools: vec![
831 "web_fetch".to_string(),
832 "web_search".to_string(),
833 "read_file".to_string(),
834 ],
835 max_iterations: 15,
836 can_delegate: false,
837 ..Default::default()
838 }
839 }
840
841 pub fn creative() -> Self {
843 Self {
844 name: "creative".to_string(),
845 description: "Creative problem-solver focused on innovation".to_string(),
846 persona: "You are a creative problem-solver. Focus on generating \
847 innovative solutions, exploring alternatives, and thinking \
848 outside the box. Consider multiple perspectives."
849 .to_string(),
850 allowed_tools: vec!["write_file".to_string(), "web_search".to_string()],
851 max_iterations: 10,
852 can_delegate: false,
853 ..Default::default()
854 }
855 }
856
857 pub fn executor() -> Self {
859 Self {
860 name: "executor".to_string(),
861 description: "Pragmatic executor focused on efficient task completion".to_string(),
862 persona: "You are a pragmatic executor. Focus on completing tasks \
863 efficiently and correctly. Prioritize simplicity and \
864 practicality over perfection."
865 .to_string(),
866 allowed_tools: vec![
867 "shell_exec".to_string(),
868 "read_file".to_string(),
869 "write_file".to_string(),
870 "web_fetch".to_string(),
871 ],
872 max_iterations: 8,
873 can_delegate: false,
874 ..Default::default()
875 }
876 }
877
878 pub fn reviewer() -> Self {
880 Self {
881 name: "reviewer".to_string(),
882 description: "Quality assurance reviewer focused on verification".to_string(),
883 persona: "You are a meticulous reviewer. Focus on verifying correctness, \
884 identifying issues, and ensuring quality. Be critical and \
885 constructive. Check for errors, edge cases, and improvements."
886 .to_string(),
887 allowed_tools: vec!["read_file".to_string(), "web_fetch".to_string()],
888 max_iterations: 10,
889 can_delegate: false,
890 ..Default::default()
891 }
892 }
893
894 pub fn supervisor() -> Self {
896 Self {
897 name: "supervisor".to_string(),
898 description: "Supervisor that decomposes tasks and coordinates sub-agents".to_string(),
899 persona: "You are a supervisor agent. Your role is to decompose complex \
900 tasks into subtasks and coordinate sub-agents to complete them. \
901 Analyze the task, break it down, assign work, and aggregate results. \
902 \n\nFor each subtask, respond with:\n\
903 SUBTASK: <description>\n\
904 ROLE: <researcher|creative|executor|reviewer|supervisor>\n\
905 \nWhen all subtasks are complete, respond with:\n\
906 FINAL: <aggregated result>"
907 .to_string(),
908 allowed_tools: Vec::new(),
909 max_iterations: 20,
910 can_delegate: true,
911 ..Default::default()
912 }
913 }
914}
915
/// Swarm communication topology.
///
/// Serialized in lowercase (`"star"`, `"mesh"`, `"hierarchical"`, `"hybrid"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum SwarmTopology {
    /// Default topology; `"flat"` is accepted as an alias when deserializing.
    #[serde(alias = "flat")]
    Star,
    Mesh,
    Hierarchical,
    Hybrid,
}
934
935#[allow(clippy::derivable_impls)]
936impl Default for SwarmTopology {
937 fn default() -> Self {
938 Self::Star
939 }
940}
941
942#[derive(Debug, Clone, Serialize, Deserialize)]
944pub struct SwarmConfig {
945 #[serde(default)]
947 pub topology: SwarmTopology,
948
949 #[serde(default = "default_max_depth")]
951 pub max_depth: usize,
952
953 #[serde(default = "default_max_workers", alias = "agent_count")]
955 pub max_workers: usize,
956
957 #[serde(default, deserialize_with = "deserialize_profiles")]
977 pub profiles: Vec<WorkerProfile>,
978
979 #[serde(default = "default_true")]
981 pub dynamic_role_assignment: bool,
982
983 #[serde(default)]
985 pub enable_agent_communication: bool,
986
987 #[serde(default)]
989 pub enable_health_monitoring: bool,
990}
991
/// Serde default for `SwarmConfig::max_depth`.
fn default_max_depth() -> usize {
    3_usize
}
995
/// Serde default for `SwarmConfig::max_workers`.
fn default_max_workers() -> usize {
    100_usize
}
999
1000fn deserialize_profiles<'de, D>(
1009 deserializer: D,
1010) -> std::result::Result<Vec<WorkerProfile>, D::Error>
1011where
1012 D: serde::Deserializer<'de>,
1013{
1014 #[derive(Deserialize)]
1017 #[serde(untagged)]
1018 enum ProfilesOrMap {
1019 Array(Vec<WorkerProfile>),
1020 Map(std::collections::HashMap<String, String>),
1021 }
1022
1023 match ProfilesOrMap::deserialize(deserializer) {
1024 Ok(ProfilesOrMap::Array(profiles)) => Ok(profiles),
1025 Ok(ProfilesOrMap::Map(map)) => {
1026 let profiles: Vec<WorkerProfile> = map
1027 .into_iter()
1028 .map(|(name, persona)| WorkerProfile {
1029 name,
1030 persona,
1031 ..WorkerProfile::default()
1032 })
1033 .collect();
1034 Ok(profiles)
1035 }
1036 Err(e) => Err(e),
1037 }
1038}
1039
1040impl Default for SwarmConfig {
1041 fn default() -> Self {
1042 Self {
1043 topology: SwarmTopology::default(),
1044 max_depth: default_max_depth(),
1045 max_workers: default_max_workers(),
1046 profiles: vec![
1047 WorkerProfile::researcher(),
1048 WorkerProfile::creative(),
1049 WorkerProfile::executor(),
1050 WorkerProfile::reviewer(),
1051 WorkerProfile::supervisor(),
1052 ],
1053 dynamic_role_assignment: true,
1054 enable_agent_communication: false,
1055 enable_health_monitoring: false,
1056 }
1057 }
1058}
1059
/// Coordinates a swarm of LLM-backed workers.
///
/// It picks roles for a task and runs single-role tasks with the matching worker
/// profile. Multi-role (or supervisor) tasks go to a supervisor loop.
pub struct SwarmOrchestrator {
    /// Swarm configuration (limits, profiles, feature toggles).
    config: SwarmConfig,
    /// Recursion depth; compared against `config.max_depth` before decomposing.
    current_depth: usize,
    /// Worker counter; compared against `config.max_workers` before decomposing.
    worker_count: usize,
    /// Primary LLM provider; required for worker and supervisor execution.
    llm: Option<Arc<dyn LLMProviderTrait>>,
    /// Optional multi-model manager.
    multi_llm: Option<MultiModelManager>,
    /// Optional RavenFabric client.
    ravenfabric: Option<RavenFabricClient>,
    // Built with `PolicyEngine::default_secure()`; currently unread, hence the allow.
    #[allow(dead_code)]
    policy_engine: PolicyEngine,
    /// Sandbox, initialized by `init`.
    sandbox: Sandbox,
    /// Audit trail; worker completions are appended here.
    audit_log: AuditLog,
    // Default tool registry; currently unread, hence the allow.
    #[allow(dead_code)]
    registry: ToolRegistry,
    /// Shared inter-agent message bus, when agent communication is enabled or a
    /// bus was supplied to `new_with_bus`.
    message_bus: Option<Arc<RwLock<AgentMessageBus>>>,
    /// Health monitor, present when `config.enable_health_monitoring` is set.
    health_monitor: Option<Arc<RwLock<SwarmHealthMonitor>>>,
}
1095
1096impl SwarmOrchestrator {
1097 pub fn new(
1099 config: SwarmConfig,
1100 llm: Option<Arc<dyn LLMProviderTrait>>,
1101 multi_llm: Option<MultiModelManager>,
1102 ravenfabric: Option<RavenFabricClient>,
1103 ) -> Self {
1104 let policy_engine = PolicyEngine::default_secure();
1105 let sandbox = Sandbox::default();
1106 let audit_log = AuditLog::new(format!("swarm-{}", std::process::id()));
1107 let registry = ToolRegistry::with_default_tools();
1108
1109 let message_bus = if config.enable_agent_communication {
1111 Some(Arc::new(RwLock::new(AgentMessageBus::new(1000))))
1112 } else {
1113 None
1114 };
1115
1116 let health_monitor = if config.enable_health_monitoring {
1118 Some(Arc::new(RwLock::new(SwarmHealthMonitor::default())))
1119 } else {
1120 None
1121 };
1122
1123 Self {
1124 config,
1125 current_depth: 0,
1126 worker_count: 0,
1127 llm,
1128 multi_llm,
1129 ravenfabric,
1130 policy_engine,
1131 sandbox,
1132 audit_log,
1133 registry,
1134 message_bus,
1135 health_monitor,
1136 }
1137 }
1138
1139 #[allow(dead_code)]
1144 pub fn new_with_bus(
1145 config: SwarmConfig,
1146 llm: Option<Arc<dyn LLMProviderTrait>>,
1147 multi_llm: Option<MultiModelManager>,
1148 ravenfabric: Option<RavenFabricClient>,
1149 message_bus: Option<Arc<RwLock<AgentMessageBus>>>,
1150 ) -> Self {
1151 let policy_engine = PolicyEngine::default_secure();
1152 let sandbox = Sandbox::default();
1153 let audit_log = AuditLog::new(format!("swarm-{}", std::process::id()));
1154 let registry = ToolRegistry::with_default_tools();
1155
1156 let health_monitor = if config.enable_health_monitoring {
1158 Some(Arc::new(RwLock::new(SwarmHealthMonitor::default())))
1159 } else {
1160 None
1161 };
1162
1163 Self {
1164 config,
1165 current_depth: 0,
1166 worker_count: 0,
1167 llm,
1168 multi_llm,
1169 ravenfabric,
1170 policy_engine,
1171 sandbox,
1172 audit_log,
1173 registry,
1174 message_bus,
1175 health_monitor,
1176 }
1177 }
1178
1179 pub async fn init(&mut self) -> Result<()> {
1181 self.sandbox.init().await.map_err(|e| {
1182 RavenClawsError::CommandExecution(format!("Swarm sandbox init failed: {}", e))
1183 })?;
1184 Ok(())
1185 }
1186
1187 #[allow(dead_code)]
1189 pub fn worker_count(&self) -> usize {
1190 self.worker_count
1191 }
1192
1193 #[allow(dead_code)]
1195 pub fn current_depth(&self) -> usize {
1196 self.current_depth
1197 }
1198
1199 #[allow(dead_code)]
1201 pub fn health_metrics(&self) -> Option<SwarmMetrics> {
1202 self.health_monitor
1203 .as_ref()
1204 .and_then(|hm| hm.try_read().ok())
1205 .map(|hm| hm.metrics())
1206 }
1207
1208 #[allow(dead_code)]
1210 pub fn worker_telemetry(&self) -> Option<Vec<WorkerTelemetry>> {
1211 self.health_monitor
1212 .as_ref()
1213 .and_then(|hm| hm.try_read().ok())
1214 .map(|hm| hm.all_worker_telemetry())
1215 }
1216
1217 #[instrument(skip(self, task), fields(depth = self.current_depth, workers = self.worker_count))]
1222 pub async fn orchestrate(&mut self, task: &str) -> Result<String> {
1223 self.orchestrate_impl(task).await
1225 }
1226
1227 async fn orchestrate_impl(&mut self, task: &str) -> Result<String> {
1229 info!(
1230 depth = self.current_depth,
1231 max_depth = self.config.max_depth,
1232 "Orchestrating task"
1233 );
1234
1235 if let Some(ref hm) = self.health_monitor {
1237 if let Ok(hm_guard) = hm.try_read() {
1238 info!(health = %hm_guard.format_status(), "Swarm health at start");
1239 }
1240 }
1241
1242 if self.current_depth >= self.config.max_depth {
1244 warn!(
1245 depth = self.current_depth,
1246 max_depth = self.config.max_depth,
1247 "Max recursion depth reached, executing directly"
1248 );
1249 return self.execute_direct(task).await;
1250 }
1251
1252 if self.worker_count >= self.config.max_workers {
1254 warn!(
1255 workers = self.worker_count,
1256 max_workers = self.config.max_workers,
1257 "Max workers reached, executing directly"
1258 );
1259 return self.execute_direct(task).await;
1260 }
1261
1262 let roles = if self.config.dynamic_role_assignment {
1264 self.analyze_task_roles(task).await?
1265 } else {
1266 vec!["supervisor".to_string()]
1268 };
1269
1270 info!(roles = ?roles, "Assigned roles for task");
1271
1272 if roles.len() == 1 && roles[0] != "supervisor" {
1274 return self.execute_with_profile(task, &roles[0]).await;
1275 }
1276
1277 if roles.contains(&"supervisor".to_string()) || roles.len() > 1 {
1279 return self.recursive_supervise_impl(task, &roles).await;
1280 }
1281
1282 self.execute_direct(task).await
1284 }
1285
1286 async fn analyze_task_roles(&self, task: &str) -> Result<Vec<String>> {
1288 if let Some(ref llm) = self.llm {
1290 let analysis_prompt = format!(
1291 "Analyze this task and determine which roles are needed to complete it. \
1292 Available roles: researcher, creative, executor, reviewer, supervisor. \
1293 \n\nTask: {}\n\n\
1294 Respond with a comma-separated list of roles needed, nothing else. \
1295 Example: researcher, executor, reviewer",
1296 task
1297 );
1298
1299 let messages = vec![
1300 ChatMessage {
1301 role: "system".to_string(),
1302 content: "You are a task analysis expert. Respond only with a comma-separated list of roles."
1303 .to_string(),
1304 },
1305 ChatMessage {
1306 role: "user".to_string(),
1307 content: analysis_prompt,
1308 },
1309 ];
1310
1311 match llm.chat(messages).await {
1312 Ok(response) => {
1313 let content = response
1314 .choices
1315 .first()
1316 .map(|c| c.message.content.clone())
1317 .unwrap_or_default();
1318
1319 let roles: Vec<String> = content
1320 .split(',')
1321 .map(|r| r.trim().to_lowercase())
1322 .filter(|r| {
1323 matches!(
1324 r.as_str(),
1325 "researcher" | "creative" | "executor" | "reviewer" | "supervisor"
1326 )
1327 })
1328 .collect();
1329
1330 if roles.is_empty() {
1331 Ok(vec!["executor".to_string()])
1332 } else {
1333 Ok(roles)
1334 }
1335 }
1336 Err(e) => {
1337 warn!(error = %e, "Task analysis failed, using default roles");
1338 Ok(vec!["executor".to_string()])
1339 }
1340 }
1341 } else {
1342 Ok(vec!["executor".to_string()])
1344 }
1345 }
1346
1347 async fn execute_with_profile(&self, task: &str, role: &str) -> Result<String> {
1349 let profile = self
1350 .config
1351 .profiles
1352 .iter()
1353 .find(|p| p.name == role)
1354 .cloned()
1355 .unwrap_or_else(|| {
1356 if role == "supervisor" {
1357 WorkerProfile::supervisor()
1358 } else {
1359 WorkerProfile::executor()
1360 }
1361 });
1362
1363 info!(role = %role, profile = %profile.name, "Executing task with profile");
1364
1365 if let Some(ref hm) = self.health_monitor {
1367 if let Ok(mut hm_guard) = hm.try_write() {
1368 hm_guard.register_worker(role);
1369 hm_guard.task_started(role);
1370 }
1371 }
1372
1373 let llm = self.llm.as_ref().ok_or_else(|| {
1374 RavenClawsError::CommandExecution("No LLM provider available for worker".to_string())
1375 })?;
1376
1377 let mut memory = ConversationMemory::new(&profile.persona, profile.max_memory_messages);
1378
1379 let enriched_task = if let Some(ref bus) = self.message_bus {
1381 if let Ok(bus_guard) = bus.try_read() {
1382 let msg_context = bus_guard.format_for_prompt(role, 20);
1383 format!("{}{}", task, msg_context)
1384 } else {
1385 task.to_string()
1386 }
1387 } else {
1388 task.to_string()
1389 };
1390
1391 memory.add_user_message(&enriched_task);
1392
1393 let messages = memory.history().to_vec();
1394 let response = llm.chat(messages).await.map_err(|e| {
1395 if let Some(ref hm) = self.health_monitor {
1397 if let Ok(mut hm_guard) = hm.try_write() {
1398 hm_guard.task_failed(role);
1399 }
1400 }
1401 RavenClawsError::CommandExecution(format!("Worker {} failed: {}", role, e))
1402 })?;
1403
1404 let content = response
1405 .choices
1406 .first()
1407 .map(|c| c.message.content.clone())
1408 .unwrap_or_default();
1409
1410 if let Some(ref hm) = self.health_monitor {
1412 if let Ok(mut hm_guard) = hm.try_write() {
1413 hm_guard.task_completed(role);
1414 hm_guard.heartbeat(role);
1415 }
1416 }
1417
1418 if let Some(ref bus) = self.message_bus {
1420 if let Ok(mut bus_guard) = bus.try_write() {
1421 bus_guard.send(
1422 role,
1423 "*",
1424 MessageType::Result,
1425 &format!(
1426 "Completed task. Result ({} chars): {}",
1427 content.len(),
1428 &content[..content.len().min(500)]
1429 ),
1430 HashMap::new(),
1431 );
1432 }
1433 if let Some(ref hm) = self.health_monitor {
1435 if let Ok(mut hm_guard) = hm.try_write() {
1436 hm_guard.message_sent(role);
1437 }
1438 }
1439 }
1440
1441 let _ = self.audit_log.append(
1442 AuditEventType::AgentFinish,
1443 &format!("worker-{}", role),
1444 &format!("Worker {} completed task", role),
1445 Some(serde_json::json!({
1446 "role": role,
1447 "task_length": task.len(),
1448 "response_length": content.len(),
1449 })),
1450 );
1451
1452 Ok(content)
1453 }
1454
1455 async fn execute_direct(&self, task: &str) -> Result<String> {
1457 self.execute_with_profile(task, "executor").await
1458 }
1459
1460 #[allow(dead_code)]
1465 async fn recursive_supervise(&self, task: &str, roles: &[String]) -> Result<String> {
1466 let task = task.to_string();
1467 let roles = roles.to_vec();
1468 let this: &SwarmOrchestrator = self;
1469 Box::pin(async move { this.recursive_supervise_impl(&task, &roles).await }).await
1470 }
1471
1472 async fn recursive_supervise_impl(&self, task: &str, roles: &[String]) -> Result<String> {
1474 let llm = self.llm.as_ref().ok_or_else(|| {
1475 RavenClawsError::CommandExecution(
1476 "No LLM provider available for supervisor".to_string(),
1477 )
1478 })?;
1479
1480 if let Some(ref hm) = self.health_monitor {
1482 if let Ok(mut hm_guard) = hm.try_write() {
1483 hm_guard.register_worker("supervisor");
1484 hm_guard.task_started("supervisor");
1485 }
1486 }
1487
1488 let supervisor_profile = WorkerProfile::supervisor();
1489 let mut memory = ConversationMemory::new(
1490 &supervisor_profile.persona,
1491 supervisor_profile.max_memory_messages,
1492 );
1493
1494 let role_list = roles.join(", ");
1495
1496 let msg_context = if let Some(ref bus) = self.message_bus {
1498 if let Ok(bus_guard) = bus.try_read() {
1499 bus_guard.format_for_prompt("supervisor", 20)
1500 } else {
1501 String::new()
1502 }
1503 } else {
1504 String::new()
1505 };
1506
1507 let supervise_prompt = format!(
1508 "Decompose this task into subtasks and assign each to the most appropriate role.\n\
1509 Available roles: {}\n\n\
1510 Task: {}\n\n\
1511 For each subtask, respond with:\n\
1512 SUBTASK: <description>\n\
1513 ROLE: <role>\n\n\
1514 When all subtasks are complete, respond with:\n\
1515 FINAL: <aggregated result>\n\
1516 {}",
1517 role_list, task, msg_context
1518 );
1519
1520 memory.add_user_message(&supervise_prompt);
1521
1522 let mut subtask_results: Vec<String> = Vec::new();
1523 let mut iteration = 0;
1524 let max_iterations = supervisor_profile.max_iterations;
1525
1526 loop {
1527 iteration += 1;
1528 if iteration > max_iterations {
1529 warn!("Supervisor reached max iterations");
1530 break;
1531 }
1532
1533 let messages = memory.history().to_vec();
1534 let response = match llm.chat(messages).await {
1535 Ok(r) => r,
1536 Err(e) => {
1537 warn!(error = %e, "Supervisor LLM request failed");
1538 continue;
1539 }
1540 };
1541
1542 let content = response
1543 .choices
1544 .first()
1545 .map(|c| c.message.content.clone())
1546 .unwrap_or_default();
1547
1548 if iteration % 3 == 0 {
1550 if let Some(ref hm) = self.health_monitor {
1551 if let Ok(hm_guard) = hm.try_read() {
1552 let status = hm_guard.format_status();
1553 info!(health = %status, "Swarm health check");
1554 let dead = hm_guard.dead_workers_for_replacement();
1556 if !dead.is_empty() {
1557 warn!(dead_workers = ?dead, "Dead workers detected");
1558 }
1559 }
1560 }
1561 }
1562
1563 if content.contains("FINAL:") {
1565 let final_response = content
1566 .split("FINAL:")
1567 .nth(1)
1568 .unwrap_or("")
1569 .trim()
1570 .to_string();
1571 info!(
1572 iteration = iteration,
1573 subtasks = subtask_results.len(),
1574 "Supervisor completed"
1575 );
1576
1577 if let Some(ref hm) = self.health_monitor {
1579 if let Ok(mut hm_guard) = hm.try_write() {
1580 hm_guard.task_completed("supervisor");
1581 hm_guard.heartbeat("supervisor");
1582 }
1583 }
1584
1585 let _ = self.audit_log.append(
1586 AuditEventType::AgentFinish,
1587 "supervisor",
1588 "Supervisor completed recursive decomposition",
1589 Some(serde_json::json!({
1590 "iterations": iteration,
1591 "subtasks_completed": subtask_results.len(),
1592 "depth": self.current_depth,
1593 })),
1594 );
1595
1596 if !subtask_results.is_empty() {
1597 let aggregated = subtask_results.join("\n\n");
1598 return Ok(format!(
1599 "{}\n\n## Aggregated Results\n\n{}",
1600 final_response, aggregated
1601 ));
1602 }
1603 return Ok(final_response);
1604 }
1605
1606 if content.contains("SUBTASK:") {
1608 let subtask_block = content.split("SUBTASK:").nth(1).unwrap_or("");
1609 let subtask_lines: Vec<&str> = subtask_block.lines().take(4).collect();
1610
1611 let subtask_desc = subtask_lines.first().unwrap_or(&"").trim();
1612 let role = subtask_lines
1613 .iter()
1614 .find(|l| l.starts_with("ROLE:"))
1615 .and_then(|l| l.split(':').nth(1))
1616 .unwrap_or("executor")
1617 .trim()
1618 .to_lowercase();
1619
1620 if !subtask_desc.is_empty() {
1621 info!(role = %role, subtask = %subtask_desc, "Delegating subtask");
1622
1623 if let Some(ref bus) = self.message_bus {
1625 if let Ok(mut bus_guard) = bus.try_write() {
1626 bus_guard.send(
1627 "supervisor",
1628 &role,
1629 MessageType::Coordination,
1630 &format!("Delegating subtask: {}", subtask_desc),
1631 HashMap::new(),
1632 );
1633 }
1634 }
1635
1636 let result =
1637 if role == "supervisor" && self.current_depth < self.config.max_depth {
1638 let config = self.config.clone();
1640 let current_depth = self.current_depth + 1;
1641 let worker_count = self.worker_count + 1;
1642 let llm = self.llm.clone();
1643 let multi_llm = self.multi_llm.clone();
1644 let ravenfabric = self.ravenfabric.clone();
1645 let subtask = subtask_desc.to_string();
1646 let message_bus = self.message_bus.clone();
1647 let health_monitor = self.health_monitor.clone();
1648
1649 Box::pin(async move {
1650 let mut sub_orchestrator = SwarmOrchestrator {
1651 config,
1652 current_depth,
1653 worker_count,
1654 llm,
1655 multi_llm,
1656 ravenfabric,
1657 policy_engine: PolicyEngine::default_secure(),
1658 sandbox: Sandbox::default(),
1659 audit_log: AuditLog::new(format!(
1660 "sub-swarm-{}-{}",
1661 current_depth,
1662 std::process::id()
1663 )),
1664 registry: ToolRegistry::with_default_tools(),
1665 message_bus,
1666 health_monitor,
1667 };
1668
1669 let _ = sub_orchestrator.init().await;
1671 sub_orchestrator.orchestrate(&subtask).await
1672 })
1673 .await
1674 } else {
1675 self.execute_with_profile(subtask_desc, &role).await
1677 };
1678
1679 match result {
1680 Ok(result) => {
1681 info!(
1682 role = %role,
1683 chars = result.len(),
1684 "Subtask completed"
1685 );
1686 subtask_results.push(format!("[{}] {}", role, result));
1687
1688 memory.add_assistant_message(&format!(
1689 "Delegated subtask to {}: {}",
1690 role, subtask_desc
1691 ));
1692 memory.add_user_message(&format!("Result from {}: {}", role, result));
1693 }
1694 Err(e) => {
1695 warn!(role = %role, error = %e, "Subtask failed");
1696 if let Some(ref hm) = self.health_monitor {
1698 if let Ok(mut hm_guard) = hm.try_write() {
1699 hm_guard.task_failed(&role);
1700 hm_guard.record_error(&role);
1701 }
1702 }
1703 memory.add_assistant_message(&format!(
1704 "Subtask for {} failed: {}",
1705 role, e
1706 ));
1707 }
1708 }
1709 }
1710 } else {
1711 memory.add_assistant_message(&content);
1712 }
1713 }
1714
1715 if !subtask_results.is_empty() {
1717 let aggregated = subtask_results.join("\n\n");
1718 info!(
1719 "Supervisor aggregated {} subtask results",
1720 subtask_results.len()
1721 );
1722 return Ok(aggregated);
1723 }
1724
1725 Err(RavenClawsError::CommandExecution(
1726 "Supervisor completed without results".to_string(),
1727 ))
1728 }
1729}
1730
// Unit tests for worker profiles, swarm configuration, the orchestrator
// constructor/accessors, the inter-agent message bus and the health monitor.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_worker_profile_default() {
        let profile = WorkerProfile::default();
        assert_eq!(profile.name, "default");
        assert!(profile.can_delegate);
        assert_eq!(profile.max_iterations, 10);
        assert_eq!(profile.max_memory_messages, 20);
    }

    #[test]
    fn test_worker_profile_researcher() {
        let profile = WorkerProfile::researcher();
        assert_eq!(profile.name, "researcher");
        assert!(!profile.can_delegate);
        assert!(profile.allowed_tools.contains(&"web_fetch".to_string()));
        assert!(profile.allowed_tools.contains(&"web_search".to_string()));
    }

    #[test]
    fn test_worker_profile_creative() {
        let profile = WorkerProfile::creative();
        assert_eq!(profile.name, "creative");
        assert!(!profile.can_delegate);
    }

    #[test]
    fn test_worker_profile_executor() {
        let profile = WorkerProfile::executor();
        assert_eq!(profile.name, "executor");
        assert!(!profile.can_delegate);
        assert!(profile.allowed_tools.contains(&"shell_exec".to_string()));
    }

    #[test]
    fn test_worker_profile_reviewer() {
        let profile = WorkerProfile::reviewer();
        assert_eq!(profile.name, "reviewer");
        assert!(!profile.can_delegate);
    }

    #[test]
    fn test_worker_profile_supervisor() {
        let profile = WorkerProfile::supervisor();
        assert_eq!(profile.name, "supervisor");
        assert!(profile.can_delegate);
        assert!(profile.persona.contains("SUBTASK:"));
        assert!(profile.persona.contains("FINAL:"));
    }

    #[test]
    fn test_swarm_config_default() {
        let config = SwarmConfig::default();
        assert_eq!(config.topology, SwarmTopology::Star);
        assert_eq!(config.max_depth, 3);
        assert_eq!(config.max_workers, 100);
        assert!(config.dynamic_role_assignment);
        assert_eq!(config.profiles.len(), 5);
    }

    #[test]
    fn test_swarm_topology_serde() {
        let topologies = vec![
            SwarmTopology::Star,
            SwarmTopology::Mesh,
            SwarmTopology::Hierarchical,
            SwarmTopology::Hybrid,
        ];

        for t in &topologies {
            let json = serde_json::to_string(t).unwrap();
            let deserialized: SwarmTopology = serde_json::from_str(&json).unwrap();
            assert_eq!(*t, deserialized);
        }
    }

    #[test]
    fn test_swarm_config_serde() {
        let config = SwarmConfig::default();
        let json = serde_json::to_string_pretty(&config).unwrap();
        let deserialized: SwarmConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(config.topology, deserialized.topology);
        assert_eq!(config.max_depth, deserialized.max_depth);
        assert_eq!(config.max_workers, deserialized.max_workers);
        assert_eq!(config.profiles.len(), deserialized.profiles.len());
    }

    #[test]
    fn test_resource_limits_default() {
        let limits = ResourceLimits::default();
        assert_eq!(limits.max_tool_calls, 50);
        assert_eq!(limits.max_exec_secs, 300);
    }

    #[test]
    fn test_swarm_orchestrator_new() {
        let config = SwarmConfig::default();
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        assert_eq!(orchestrator.current_depth(), 0);
        assert_eq!(orchestrator.worker_count(), 0);
    }

    #[test]
    fn test_swarm_orchestrator_depth_limit() {
        // With max_depth = 0 a freshly built (depth 0) orchestrator is already
        // at the limit, so the supervisor's `current_depth < max_depth` guard
        // rejects spawning a nested sub-swarm.
        let config = SwarmConfig {
            max_depth: 0,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);

        assert_eq!(orchestrator.current_depth(), 0);
        assert!(orchestrator.current_depth() >= orchestrator.config.max_depth);
    }

    #[tokio::test]
    async fn test_analyze_task_roles_fallback() {
        let config = SwarmConfig::default();
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);

        let result = orchestrator.analyze_task_roles("test task").await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_worker_profile_custom() {
        let profile = WorkerProfile {
            name: "custom".to_string(),
            description: "Custom worker".to_string(),
            persona: "You are a custom worker.".to_string(),
            allowed_tools: vec!["read_file".to_string()],
            provider: Some("openai".to_string()),
            model: Some("gpt-4".to_string()),
            max_iterations: 5,
            max_memory_messages: 10,
            can_delegate: false,
            resource_limits: ResourceLimits {
                max_tool_calls: 10,
                max_exec_secs: 60,
            },
        };

        assert_eq!(profile.name, "custom");
        assert_eq!(profile.provider, Some("openai".to_string()));
        assert_eq!(profile.model, Some("gpt-4".to_string()));
        assert_eq!(profile.max_iterations, 5);
        assert_eq!(profile.resource_limits.max_tool_calls, 10);
    }

    #[test]
    fn test_swarm_config_custom_profiles() {
        let config = SwarmConfig {
            profiles: vec![WorkerProfile::researcher(), WorkerProfile::executor()],
            topology: SwarmTopology::Hierarchical,
            max_depth: 5,
            max_workers: 50,
            ..SwarmConfig::default()
        };

        assert_eq!(config.profiles.len(), 2);
        assert_eq!(config.topology, SwarmTopology::Hierarchical);
        assert_eq!(config.max_depth, 5);
        assert_eq!(config.max_workers, 50);
    }

    #[test]
    fn test_message_bus_new() {
        let bus = AgentMessageBus::new(100);
        assert!(bus.is_empty());
        assert_eq!(bus.len(), 0);
    }

    #[test]
    fn test_message_bus_send_and_receive() {
        let mut bus = AgentMessageBus::new(100);

        let id = bus.send(
            "researcher",
            "executor",
            MessageType::Information,
            "Found relevant data",
            HashMap::new(),
        );

        assert!(!id.is_empty());
        assert_eq!(bus.len(), 1);

        let msgs = bus.messages_for("executor");
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, "Found relevant data");
        assert_eq!(msgs[0].sender, "researcher");
    }

    #[test]
    fn test_message_bus_broadcast() {
        let mut bus = AgentMessageBus::new(100);

        bus.send(
            "supervisor",
            "*",
            MessageType::Coordination,
            "All workers proceed",
            HashMap::new(),
        );

        // A "*" recipient is visible to every role.
        assert_eq!(bus.messages_for("researcher").len(), 1);
        assert_eq!(bus.messages_for("executor").len(), 1);
        assert_eq!(bus.messages_for("reviewer").len(), 1);
    }

    #[test]
    fn test_message_bus_filter_by_type() {
        let mut bus = AgentMessageBus::new(100);

        bus.send(
            "researcher",
            "*",
            MessageType::Information,
            "Data found",
            HashMap::new(),
        );
        bus.send(
            "executor",
            "supervisor",
            MessageType::Result,
            "Task done",
            HashMap::new(),
        );
        bus.send(
            "executor",
            "supervisor",
            MessageType::Error,
            "Failed",
            HashMap::new(),
        );

        let errors = bus.messages_of_type(&MessageType::Error);
        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].content, "Failed");

        let results = bus.messages_of_type(&MessageType::Result);
        assert_eq!(results.len(), 1);
    }

    #[test]
    fn test_message_bus_max_messages() {
        let mut bus = AgentMessageBus::new(5);

        for i in 0..10 {
            bus.send(
                "worker",
                "*",
                MessageType::Generic,
                &format!("Message {}", i),
                HashMap::new(),
            );
        }

        // Only the newest five messages are retained, oldest first.
        assert_eq!(bus.len(), 5);
        let all = bus.all_messages();
        assert_eq!(all[0].content, "Message 5");
        assert_eq!(all[4].content, "Message 9");
    }

    #[test]
    fn test_message_bus_format_for_prompt() {
        let mut bus = AgentMessageBus::new(100);

        bus.send(
            "researcher",
            "*",
            MessageType::Information,
            "Found key insight",
            HashMap::new(),
        );
        bus.send(
            "executor",
            "supervisor",
            MessageType::Result,
            "Implementation complete",
            HashMap::new(),
        );

        let prompt = bus.format_for_prompt("supervisor", 10);
        assert!(prompt.contains("Inter-Agent Messages"));
        assert!(prompt.contains("researcher"));
        assert!(prompt.contains("executor"));
        assert!(prompt.contains("Found key insight"));
    }

    #[test]
    fn test_message_bus_empty_format() {
        let bus = AgentMessageBus::new(100);
        let prompt = bus.format_for_prompt("supervisor", 10);
        assert!(prompt.is_empty());
    }

    #[test]
    fn test_message_type_display() {
        assert_eq!(format!("{}", MessageType::Information), "information");
        assert_eq!(format!("{}", MessageType::Question), "question");
        assert_eq!(format!("{}", MessageType::Result), "result");
        assert_eq!(format!("{}", MessageType::Error), "error");
        assert_eq!(format!("{}", MessageType::Coordination), "coordination");
        assert_eq!(format!("{}", MessageType::Generic), "generic");
    }

    #[test]
    fn test_message_bus_messages_from() {
        let mut bus = AgentMessageBus::new(100);

        bus.send(
            "researcher",
            "*",
            MessageType::Information,
            "Data A",
            HashMap::new(),
        );
        bus.send(
            "researcher",
            "executor",
            MessageType::Information,
            "Data B",
            HashMap::new(),
        );
        bus.send(
            "executor",
            "supervisor",
            MessageType::Result,
            "Done",
            HashMap::new(),
        );

        let from_researcher = bus.messages_from("researcher");
        assert_eq!(from_researcher.len(), 2);

        let from_executor = bus.messages_from("executor");
        assert_eq!(from_executor.len(), 1);
    }

    #[test]
    fn test_orchestrator_new_with_communication() {
        let config = SwarmConfig {
            enable_agent_communication: true,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        assert!(orchestrator.message_bus.is_some());
    }

    #[test]
    fn test_orchestrator_new_without_communication() {
        let config = SwarmConfig {
            enable_agent_communication: false,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        assert!(orchestrator.message_bus.is_none());
    }

    #[test]
    fn test_swarm_config_communication_default() {
        // Both optional subsystems are opt-in.
        let config = SwarmConfig::default();
        assert!(!config.enable_agent_communication);
        assert!(!config.enable_health_monitoring);
    }

    #[test]
    fn test_health_status_display() {
        assert_eq!(format!("{}", WorkerHealthStatus::Healthy), "healthy");
        assert_eq!(format!("{}", WorkerHealthStatus::Degraded), "degraded");
        assert_eq!(format!("{}", WorkerHealthStatus::Unhealthy), "unhealthy");
        assert_eq!(format!("{}", WorkerHealthStatus::Dead), "dead");
    }

    #[test]
    fn test_health_monitor_default() {
        let hm = SwarmHealthMonitor::default();
        assert_eq!(hm.heartbeat_interval_secs, 5);
        assert_eq!(hm.max_missed_beats, 3);
        assert_eq!(hm.replacement_timeout_secs, 30);
        assert_eq!(hm.worker_count(), 0);
    }

    #[test]
    fn test_health_monitor_register_worker() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("researcher");
        assert_eq!(hm.worker_count(), 1);

        let telemetry = hm.worker_telemetry("researcher");
        assert!(telemetry.is_some());
        assert_eq!(telemetry.unwrap().role, "researcher");
    }

    #[test]
    fn test_health_monitor_heartbeat() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.heartbeat("executor");

        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.status, WorkerHealthStatus::Healthy);
    }

    #[test]
    fn test_health_monitor_task_lifecycle() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.task_started("executor");
        hm.task_completed("executor");

        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.tasks_completed, 1);
        assert_eq!(telemetry.tasks_failed, 0);
    }

    #[test]
    fn test_health_monitor_task_failure() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.task_started("executor");
        hm.task_failed("executor");

        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.tasks_completed, 0);
        assert_eq!(telemetry.tasks_failed, 1);
        assert_eq!(telemetry.error_count, 1);
    }

    #[test]
    fn test_health_monitor_metrics_empty() {
        let hm = SwarmHealthMonitor::default();
        let metrics = hm.metrics();
        assert_eq!(metrics.total_workers, 0);
        assert_eq!(metrics.healthy_workers, 0);
        assert_eq!(metrics.total_tasks_completed, 0);
        assert_eq!(metrics.task_throughput, 0.0);
    }

    #[test]
    fn test_health_monitor_metrics_with_workers() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("researcher");
        hm.register_worker("executor");
        hm.task_started("executor");
        hm.task_completed("executor");
        hm.task_started("researcher");
        hm.task_completed("researcher");

        let metrics = hm.metrics();
        assert_eq!(metrics.total_workers, 2);
        assert_eq!(metrics.healthy_workers, 2);
        assert_eq!(metrics.total_tasks_completed, 2);
    }

    #[test]
    fn test_health_monitor_dead_worker_detection() {
        let mut hm = SwarmHealthMonitor {
            heartbeat_interval_secs: 1,
            max_missed_beats: 1,
            replacement_timeout_secs: 0,
            ..SwarmHealthMonitor::default()
        };

        hm.register_worker("executor");
        // Backdate the last heartbeat well past interval * max_missed_beats.
        if let Some(hb) = hm.heartbeats.get_mut("executor") {
            hb.last_heartbeat = chrono::Utc::now() - chrono::Duration::seconds(10);
        }

        let dead = hm.check_health();
        assert!(!dead.is_empty());
        assert_eq!(dead[0], "executor");
    }

    #[test]
    fn test_health_monitor_degraded_detection() {
        let mut hm = SwarmHealthMonitor {
            heartbeat_interval_secs: 1,
            max_missed_beats: 3,
            ..SwarmHealthMonitor::default()
        };

        hm.register_worker("executor");
        // Late, but not yet beyond the dead threshold.
        if let Some(hb) = hm.heartbeats.get_mut("executor") {
            hb.last_heartbeat = chrono::Utc::now() - chrono::Duration::seconds(3);
        }

        let _dead = hm.check_health();
        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.status, WorkerHealthStatus::Degraded);
    }

    #[test]
    fn test_health_monitor_message_tracking() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("researcher");
        hm.message_sent("researcher");
        hm.message_sent("researcher");
        hm.message_received("researcher");

        let telemetry = hm.worker_telemetry("researcher").unwrap();
        assert_eq!(telemetry.messages_sent, 2);
        assert_eq!(telemetry.messages_received, 1);
    }

    #[test]
    fn test_health_monitor_error_tracking() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.record_error("executor");
        hm.record_error("executor");
        hm.record_error("executor");

        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.error_count, 3);
    }

    #[test]
    fn test_health_monitor_format_status() {
        let hm = SwarmHealthMonitor::default();
        let status = hm.format_status();
        assert!(status.contains("Swarm Health:"));
        assert!(status.contains("healthy"));
    }

    #[test]
    fn test_health_monitor_all_worker_telemetry() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("researcher");
        hm.register_worker("executor");
        hm.register_worker("reviewer");

        let all = hm.all_worker_telemetry();
        assert_eq!(all.len(), 3);
    }

    #[test]
    fn test_health_monitor_remove_worker() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        assert_eq!(hm.worker_count(), 1);
        hm.remove_worker("executor");
        assert_eq!(hm.worker_count(), 0);
    }

    #[test]
    fn test_orchestrator_new_with_health_monitoring() {
        let config = SwarmConfig {
            enable_health_monitoring: true,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        assert!(orchestrator.health_monitor.is_some());
    }

    #[test]
    fn test_orchestrator_new_without_health_monitoring() {
        let config = SwarmConfig {
            enable_health_monitoring: false,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        assert!(orchestrator.health_monitor.is_none());
    }

    #[test]
    fn test_health_metrics_accessor() {
        let config = SwarmConfig {
            enable_health_monitoring: true,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        let metrics = orchestrator.health_metrics();
        assert!(metrics.is_some());
        assert_eq!(metrics.unwrap().total_workers, 0);
    }

    #[test]
    fn test_worker_telemetry_accessor() {
        let config = SwarmConfig {
            enable_health_monitoring: true,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        let telemetry = orchestrator.worker_telemetry();
        assert!(telemetry.is_some());
        assert!(telemetry.unwrap().is_empty());
    }

    #[test]
    fn test_health_monitor_new_custom() {
        let hm = SwarmHealthMonitor::new(10, 5, 60);
        assert_eq!(hm.heartbeat_interval_secs, 10);
        assert_eq!(hm.max_missed_beats, 5);
        assert_eq!(hm.replacement_timeout_secs, 60);
    }

    #[test]
    fn test_health_monitor_dead_workers_for_replacement() {
        let mut hm = SwarmHealthMonitor {
            heartbeat_interval_secs: 1,
            max_missed_beats: 1,
            replacement_timeout_secs: 0,
            ..SwarmHealthMonitor::default()
        };

        hm.register_worker("executor");
        // Force the worker into the Dead state with a stale heartbeat.
        if let Some(hb) = hm.heartbeats.get_mut("executor") {
            hb.last_heartbeat = chrono::Utc::now() - chrono::Duration::seconds(30);
            hb.status = WorkerHealthStatus::Dead;
        }

        let candidates = hm.dead_workers_for_replacement();
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0], "executor");
    }

    #[test]
    fn test_health_monitor_metrics_error_rate() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.task_started("executor");
        hm.task_completed("executor");
        hm.task_started("executor");
        hm.task_failed("executor");

        let metrics = hm.metrics();
        assert_eq!(metrics.total_tasks_completed, 1);
        assert_eq!(metrics.total_tasks_failed, 1);
        assert!(metrics.error_rate > 0.0);
    }

    #[test]
    fn test_health_monitor_metrics_utilization() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("busy_worker");
        hm.register_worker("idle_worker");

        if let Some(hb) = hm.heartbeats.get_mut("busy_worker") {
            hb.is_busy = true;
        }

        let metrics = hm.metrics();
        assert_eq!(metrics.total_workers, 2);
        assert!((metrics.worker_utilization - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_health_monitor_iteration_tracking() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.task_started("executor");
        hm.task_completed("executor");
        hm.task_started("executor");
        hm.task_completed("executor");
        hm.task_started("executor");

        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.iteration, 3);
        assert_eq!(telemetry.tasks_completed, 2);
    }
}