1use crate::agent::ConversationMemory;
39use crate::audit::{AuditEventType, AuditLog};
40use crate::error::{RavenClawsError, Result};
41use crate::llm::{ChatMessage, LLMProviderTrait, MultiModelManager};
42use crate::policy::PolicyEngine;
43use crate::ravenfabric::RavenFabricClient;
44use crate::sandbox::Sandbox;
45use crate::tools::ToolRegistry;
46use serde::{Deserialize, Serialize};
47use std::collections::HashMap;
48use std::sync::Arc;
49use tokio::sync::RwLock;
50use tracing::{info, instrument, warn};
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct AgentMessage {
62 pub id: String,
64
65 pub sender: String,
67
68 pub recipient: String,
70
71 pub msg_type: MessageType,
73
74 pub content: String,
76
77 pub timestamp: String,
79
80 #[serde(default)]
82 pub metadata: HashMap<String, String>,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
87pub enum MessageType {
88 Information,
90 Question,
92 Result,
94 Error,
96 Coordination,
98 Generic,
100}
101
102impl std::fmt::Display for MessageType {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 match self {
105 MessageType::Information => write!(f, "information"),
106 MessageType::Question => write!(f, "question"),
107 MessageType::Result => write!(f, "result"),
108 MessageType::Error => write!(f, "error"),
109 MessageType::Coordination => write!(f, "coordination"),
110 MessageType::Generic => write!(f, "generic"),
111 }
112 }
113}
114
115#[derive(Debug, Clone)]
121pub struct AgentMessageBus {
122 messages: Vec<AgentMessage>,
124 max_messages: usize,
126}
127
128#[allow(dead_code)]
129impl AgentMessageBus {
130 pub fn new(max_messages: usize) -> Self {
132 Self {
133 messages: Vec::new(),
134 max_messages,
135 }
136 }
137
138 pub fn send(
140 &mut self,
141 sender: &str,
142 recipient: &str,
143 msg_type: MessageType,
144 content: &str,
145 metadata: HashMap<String, String>,
146 ) -> String {
147 let id = uuid::Uuid::new_v4().to_string();
148 let timestamp = chrono::Utc::now().to_rfc3339();
149
150 let msg = AgentMessage {
151 id: id.clone(),
152 sender: sender.to_string(),
153 recipient: recipient.to_string(),
154 msg_type,
155 content: content.to_string(),
156 timestamp,
157 metadata,
158 };
159
160 self.messages.push(msg);
161
162 if self.max_messages > 0 && self.messages.len() > self.max_messages {
164 self.messages.remove(0);
165 }
166
167 id
168 }
169
170 pub fn messages_for(&self, role: &str) -> Vec<&AgentMessage> {
172 self.messages
173 .iter()
174 .filter(|m| m.recipient == role || m.recipient == "*")
175 .collect()
176 }
177
178 pub fn messages_from(&self, sender: &str) -> Vec<&AgentMessage> {
180 self.messages
181 .iter()
182 .filter(|m| m.sender == sender)
183 .collect()
184 }
185
186 pub fn messages_of_type(&self, msg_type: &MessageType) -> Vec<&AgentMessage> {
188 self.messages
189 .iter()
190 .filter(|m| m.msg_type == *msg_type)
191 .collect()
192 }
193
194 pub fn all_messages(&self) -> &[AgentMessage] {
196 &self.messages
197 }
198
199 pub fn len(&self) -> usize {
201 self.messages.len()
202 }
203
204 pub fn is_empty(&self) -> bool {
206 self.messages.is_empty()
207 }
208
209 pub fn format_for_prompt(&self, role: &str, max_messages: usize) -> String {
213 let relevant: Vec<&AgentMessage> = self
214 .messages
215 .iter()
216 .filter(|m| m.recipient == role || m.recipient == "*" || m.sender == role)
217 .rev()
218 .take(max_messages)
219 .collect();
220
221 if relevant.is_empty() {
222 return String::new();
223 }
224
225 let mut output = String::from("\n\n--- Inter-Agent Messages ---\n");
226 for msg in relevant.iter().rev() {
227 output.push_str(&format!(
228 "[{}] {} → {} ({}): {}\n",
229 msg.msg_type, msg.sender, msg.recipient, msg.timestamp, msg.content
230 ));
231 }
232 output.push_str("--- End Inter-Agent Messages ---\n");
233 output
234 }
235}
236
237#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
243pub enum WorkerHealthStatus {
244 Healthy,
246 Degraded,
248 Unhealthy,
250 Dead,
252}
253
254impl std::fmt::Display for WorkerHealthStatus {
255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256 match self {
257 WorkerHealthStatus::Healthy => write!(f, "healthy"),
258 WorkerHealthStatus::Degraded => write!(f, "degraded"),
259 WorkerHealthStatus::Unhealthy => write!(f, "unhealthy"),
260 WorkerHealthStatus::Dead => write!(f, "dead"),
261 }
262 }
263}
264
265#[derive(Debug, Clone, Serialize, Deserialize)]
267pub struct WorkerTelemetry {
268 pub role: String,
270 pub status: WorkerHealthStatus,
272 pub tasks_completed: u64,
274 pub tasks_failed: u64,
276 pub error_count: u64,
278 pub avg_duration_ms: f64,
280 pub last_heartbeat: String,
282 pub spawned_at: String,
284 pub messages_sent: u64,
286 pub messages_received: u64,
288 pub iteration: u64,
290}
291
292#[derive(Debug, Clone, Serialize, Deserialize)]
294pub struct SwarmMetrics {
295 pub total_workers: usize,
297 pub healthy_workers: usize,
299 pub degraded_workers: usize,
301 pub unhealthy_workers: usize,
303 pub dead_workers: usize,
305 pub total_tasks_completed: u64,
307 pub total_tasks_failed: u64,
309 pub total_errors: u64,
311 pub overall_avg_duration_ms: f64,
313 pub task_throughput: f64,
315 pub communication_latency_ms: f64,
317 pub worker_utilization: f64,
319 pub error_rate: f64,
321 pub timestamp: String,
323}
324
325#[derive(Debug, Clone)]
327struct WorkerHeartbeat {
328 role: String,
330 spawned_at: chrono::DateTime<chrono::Utc>,
332 last_heartbeat: chrono::DateTime<chrono::Utc>,
334 missed_beats: u32,
336 status: WorkerHealthStatus,
338 tasks_completed: u64,
340 tasks_failed: u64,
342 error_count: u64,
344 total_duration_ms: f64,
346 duration_samples: u64,
348 messages_sent: u64,
350 messages_received: u64,
352 iteration: u64,
354 is_busy: bool,
356 task_started_at: Option<chrono::DateTime<chrono::Utc>>,
358}
359
360#[derive(Debug, Clone)]
380pub struct SwarmHealthMonitor {
381 heartbeats: HashMap<String, WorkerHeartbeat>,
383 heartbeat_interval_secs: u64,
385 max_missed_beats: u32,
387 replacement_timeout_secs: u64,
389 started_at: chrono::DateTime<chrono::Utc>,
391 total_messages_sent: u64,
393 total_messages_received: u64,
395 total_duration_ms: f64,
397 total_tasks_completed: u64,
399}
400
401impl Default for SwarmHealthMonitor {
402 fn default() -> Self {
403 Self {
404 heartbeats: HashMap::new(),
405 heartbeat_interval_secs: 5,
406 max_missed_beats: 3,
407 replacement_timeout_secs: 30,
408 started_at: chrono::Utc::now(),
409 total_messages_sent: 0,
410 total_messages_received: 0,
411 total_duration_ms: 0.0,
412 total_tasks_completed: 0,
413 }
414 }
415}
416
417#[allow(dead_code)]
418impl SwarmHealthMonitor {
419 pub fn new(
421 heartbeat_interval_secs: u64,
422 max_missed_beats: u32,
423 replacement_timeout_secs: u64,
424 ) -> Self {
425 Self {
426 heartbeats: HashMap::new(),
427 heartbeat_interval_secs,
428 max_missed_beats,
429 replacement_timeout_secs,
430 started_at: chrono::Utc::now(),
431 total_messages_sent: 0,
432 total_messages_received: 0,
433 total_duration_ms: 0.0,
434 total_tasks_completed: 0,
435 }
436 }
437
438 pub fn register_worker(&mut self, role: &str) {
440 let now = chrono::Utc::now();
441 self.heartbeats
442 .entry(role.to_string())
443 .or_insert(WorkerHeartbeat {
444 role: role.to_string(),
445 spawned_at: now,
446 last_heartbeat: now,
447 missed_beats: 0,
448 status: WorkerHealthStatus::Healthy,
449 tasks_completed: 0,
450 tasks_failed: 0,
451 error_count: 0,
452 total_duration_ms: 0.0,
453 duration_samples: 0,
454 messages_sent: 0,
455 messages_received: 0,
456 iteration: 0,
457 is_busy: false,
458 task_started_at: None,
459 });
460 }
461
462 pub fn heartbeat(&mut self, role: &str) {
464 if let Some(hb) = self.heartbeats.get_mut(role) {
465 hb.last_heartbeat = chrono::Utc::now();
466 hb.missed_beats = 0;
467 hb.status = WorkerHealthStatus::Healthy;
468 }
469 }
470
471 pub fn task_started(&mut self, role: &str) {
473 if let Some(hb) = self.heartbeats.get_mut(role) {
474 hb.is_busy = true;
475 hb.task_started_at = Some(chrono::Utc::now());
476 hb.iteration += 1;
477 }
478 }
479
480 pub fn task_completed(&mut self, role: &str) {
482 if let Some(hb) = self.heartbeats.get_mut(role) {
483 hb.tasks_completed += 1;
484 hb.is_busy = false;
485
486 if let Some(started) = hb.task_started_at {
487 let duration = (chrono::Utc::now() - started).num_milliseconds() as f64;
488 hb.total_duration_ms += duration;
489 hb.duration_samples += 1;
490 self.total_duration_ms += duration;
491 }
492 self.total_tasks_completed += 1;
493 hb.task_started_at = None;
494 }
495 }
496
497 pub fn task_failed(&mut self, role: &str) {
499 if let Some(hb) = self.heartbeats.get_mut(role) {
500 hb.tasks_failed += 1;
501 hb.error_count += 1;
502 hb.is_busy = false;
503 hb.task_started_at = None;
504 }
505 }
506
507 pub fn record_error(&mut self, role: &str) {
509 if let Some(hb) = self.heartbeats.get_mut(role) {
510 hb.error_count += 1;
511 }
512 }
513
514 pub fn message_sent(&mut self, role: &str) {
516 if let Some(hb) = self.heartbeats.get_mut(role) {
517 hb.messages_sent += 1;
518 }
519 self.total_messages_sent += 1;
520 }
521
522 pub fn message_received(&mut self, role: &str) {
524 if let Some(hb) = self.heartbeats.get_mut(role) {
525 hb.messages_received += 1;
526 }
527 self.total_messages_received += 1;
528 }
529
530 pub fn check_health(&mut self) -> Vec<String> {
533 let now = chrono::Utc::now();
534 let mut dead_workers = Vec::new();
535
536 for hb in self.heartbeats.values_mut() {
537 let elapsed = (now - hb.last_heartbeat).num_seconds();
538
539 if elapsed > (self.heartbeat_interval_secs * self.max_missed_beats as u64 * 2) as i64 {
540 if hb.status != WorkerHealthStatus::Dead {
541 hb.status = WorkerHealthStatus::Dead;
542 dead_workers.push(hb.role.clone());
543 }
544 } else if elapsed > (self.heartbeat_interval_secs * self.max_missed_beats as u64) as i64
545 {
546 hb.status = WorkerHealthStatus::Unhealthy;
547 } else if elapsed > (self.heartbeat_interval_secs * 2) as i64 {
548 hb.status = WorkerHealthStatus::Degraded;
549 }
550 }
551
552 dead_workers
553 }
554
555 pub fn worker_telemetry(&self, role: &str) -> Option<WorkerTelemetry> {
557 self.heartbeats.get(role).map(|hb| {
558 let avg_dur = if hb.duration_samples > 0 {
559 hb.total_duration_ms / hb.duration_samples as f64
560 } else {
561 0.0
562 };
563
564 WorkerTelemetry {
565 role: hb.role.clone(),
566 status: hb.status.clone(),
567 tasks_completed: hb.tasks_completed,
568 tasks_failed: hb.tasks_failed,
569 error_count: hb.error_count,
570 avg_duration_ms: avg_dur,
571 last_heartbeat: hb.last_heartbeat.to_rfc3339(),
572 spawned_at: hb.spawned_at.to_rfc3339(),
573 messages_sent: hb.messages_sent,
574 messages_received: hb.messages_received,
575 iteration: hb.iteration,
576 }
577 })
578 }
579
580 pub fn all_worker_telemetry(&self) -> Vec<WorkerTelemetry> {
582 let mut roles: Vec<String> = self.heartbeats.keys().cloned().collect();
583 roles.sort();
584 roles
585 .iter()
586 .filter_map(|r| self.worker_telemetry(r))
587 .collect()
588 }
589
590 pub fn metrics(&self) -> SwarmMetrics {
592 let mut healthy = 0;
593 let mut degraded = 0;
594 let mut unhealthy = 0;
595 let mut dead = 0;
596 let mut total_completed = 0u64;
597 let mut total_failed = 0u64;
598 let mut total_errors = 0u64;
599 let mut busy_workers = 0u64;
600 let total_workers = self.heartbeats.len();
601
602 for hb in self.heartbeats.values() {
603 match hb.status {
604 WorkerHealthStatus::Healthy => healthy += 1,
605 WorkerHealthStatus::Degraded => degraded += 1,
606 WorkerHealthStatus::Unhealthy => unhealthy += 1,
607 WorkerHealthStatus::Dead => dead += 1,
608 }
609 total_completed += hb.tasks_completed;
610 total_failed += hb.tasks_failed;
611 total_errors += hb.error_count;
612 if hb.is_busy {
613 busy_workers += 1;
614 }
615 }
616
617 let elapsed_secs = (chrono::Utc::now() - self.started_at).num_seconds().max(1) as f64;
618 let task_throughput = self.total_tasks_completed as f64 / elapsed_secs;
619
620 let worker_utilization = if total_workers > 0 {
621 busy_workers as f64 / total_workers as f64
622 } else {
623 0.0
624 };
625
626 let error_rate = if total_completed + total_failed > 0 {
627 total_errors as f64 / (total_completed + total_failed) as f64
628 } else {
629 0.0
630 };
631
632 let overall_avg_duration = if self.total_tasks_completed > 0 {
633 self.total_duration_ms / self.total_tasks_completed as f64
634 } else {
635 0.0
636 };
637
638 let comm_latency = if self.total_messages_sent > 0 {
639 let ratio = self.total_messages_received as f64 / self.total_messages_sent as f64;
642 ratio * (self.heartbeat_interval_secs as f64 * 1000.0) / 2.0
643 } else {
644 0.0
645 };
646
647 SwarmMetrics {
648 total_workers,
649 healthy_workers: healthy,
650 degraded_workers: degraded,
651 unhealthy_workers: unhealthy,
652 dead_workers: dead,
653 total_tasks_completed: total_completed,
654 total_tasks_failed: total_failed,
655 total_errors,
656 overall_avg_duration_ms: overall_avg_duration,
657 task_throughput,
658 communication_latency_ms: comm_latency,
659 worker_utilization,
660 error_rate,
661 timestamp: chrono::Utc::now().to_rfc3339(),
662 }
663 }
664
665 pub fn dead_workers_for_replacement(&self) -> Vec<String> {
667 let now = chrono::Utc::now();
668 self.heartbeats
669 .iter()
670 .filter(|(_, hb)| hb.status == WorkerHealthStatus::Dead)
671 .filter(|(_, hb)| {
672 let elapsed = (now - hb.last_heartbeat).num_seconds();
673 elapsed >= self.replacement_timeout_secs as i64
674 })
675 .map(|(role, _)| role.clone())
676 .collect()
677 }
678
679 pub fn remove_worker(&mut self, role: &str) {
681 self.heartbeats.remove(role);
682 }
683
684 pub fn worker_count(&self) -> usize {
686 self.heartbeats.len()
687 }
688
689 pub fn format_status(&self) -> String {
691 let m = self.metrics();
692 format!(
693 "Swarm Health: {}/{} healthy, {} degraded, {} unhealthy, {} dead | \
694 {} tasks ({:.1}/s) | {:.1}% utilization | {:.2}% error rate",
695 m.healthy_workers,
696 m.total_workers,
697 m.degraded_workers,
698 m.unhealthy_workers,
699 m.dead_workers,
700 m.total_tasks_completed,
701 m.task_throughput,
702 m.worker_utilization * 100.0,
703 m.error_rate * 100.0,
704 )
705 }
706}
707
708#[derive(Debug, Clone, Serialize, Deserialize)]
718pub struct WorkerProfile {
719 pub name: String,
721
722 #[serde(default)]
724 pub description: String,
725
726 pub persona: String,
728
729 #[serde(default)]
731 pub allowed_tools: Vec<String>,
732
733 #[serde(default)]
735 pub provider: Option<String>,
736
737 #[serde(default)]
739 pub model: Option<String>,
740
741 #[serde(default = "default_worker_max_iterations")]
743 pub max_iterations: usize,
744
745 #[serde(default = "default_worker_memory")]
747 pub max_memory_messages: usize,
748
749 #[serde(default = "default_true")]
751 pub can_delegate: bool,
752
753 #[serde(default)]
755 pub resource_limits: ResourceLimits,
756}
757
/// Serde default for `WorkerProfile::max_iterations`.
fn default_worker_max_iterations() -> usize {
    const DEFAULT_MAX_ITERATIONS: usize = 10;
    DEFAULT_MAX_ITERATIONS
}
761
/// Serde default for `WorkerProfile::max_memory_messages`.
fn default_worker_memory() -> usize {
    const DEFAULT_MEMORY_MESSAGES: usize = 20;
    DEFAULT_MEMORY_MESSAGES
}
765
/// Serde default helper for boolean fields that are enabled unless stated otherwise.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
769
770#[derive(Debug, Clone, Serialize, Deserialize)]
772pub struct ResourceLimits {
773 #[serde(default = "default_max_tool_calls")]
775 pub max_tool_calls: usize,
776
777 #[serde(default = "default_max_exec_secs")]
779 pub max_exec_secs: u64,
780}
781
/// Serde default for `ResourceLimits::max_tool_calls`.
fn default_max_tool_calls() -> usize {
    const DEFAULT_MAX_TOOL_CALLS: usize = 50;
    DEFAULT_MAX_TOOL_CALLS
}
785
/// Serde default for `ResourceLimits::max_exec_secs` (five minutes).
fn default_max_exec_secs() -> u64 {
    const DEFAULT_MAX_EXEC_SECS: u64 = 300;
    DEFAULT_MAX_EXEC_SECS
}
789
790impl Default for ResourceLimits {
791 fn default() -> Self {
792 Self {
793 max_tool_calls: default_max_tool_calls(),
794 max_exec_secs: default_max_exec_secs(),
795 }
796 }
797}
798
799impl Default for WorkerProfile {
800 fn default() -> Self {
801 Self {
802 name: "default".to_string(),
803 description: String::new(),
804 persona: "You are a helpful assistant.".to_string(),
805 allowed_tools: Vec::new(),
806 provider: None,
807 model: None,
808 max_iterations: default_worker_max_iterations(),
809 max_memory_messages: default_worker_memory(),
810 can_delegate: default_true(),
811 resource_limits: ResourceLimits::default(),
812 }
813 }
814}
815
816impl WorkerProfile {
821 pub fn researcher() -> Self {
823 Self {
824 name: "researcher".to_string(),
825 description: "Analytical researcher focused on data gathering and analysis".to_string(),
826 persona: "You are an analytical researcher. Focus on gathering data, \
827 verifying facts, and providing well-structured analysis. \
828 Be thorough and cite your sources."
829 .to_string(),
830 allowed_tools: vec![
831 "web_fetch".to_string(),
832 "web_search".to_string(),
833 "read_file".to_string(),
834 ],
835 max_iterations: 15,
836 can_delegate: false,
837 ..Default::default()
838 }
839 }
840
841 pub fn creative() -> Self {
843 Self {
844 name: "creative".to_string(),
845 description: "Creative problem-solver focused on innovation".to_string(),
846 persona: "You are a creative problem-solver. Focus on generating \
847 innovative solutions, exploring alternatives, and thinking \
848 outside the box. Consider multiple perspectives."
849 .to_string(),
850 allowed_tools: vec!["write_file".to_string(), "web_search".to_string()],
851 max_iterations: 10,
852 can_delegate: false,
853 ..Default::default()
854 }
855 }
856
857 pub fn executor() -> Self {
859 Self {
860 name: "executor".to_string(),
861 description: "Pragmatic executor focused on efficient task completion".to_string(),
862 persona: "You are a pragmatic executor. Focus on completing tasks \
863 efficiently and correctly. Prioritize simplicity and \
864 practicality over perfection."
865 .to_string(),
866 allowed_tools: vec![
867 "shell_exec".to_string(),
868 "read_file".to_string(),
869 "write_file".to_string(),
870 "web_fetch".to_string(),
871 ],
872 max_iterations: 8,
873 can_delegate: false,
874 ..Default::default()
875 }
876 }
877
878 pub fn reviewer() -> Self {
880 Self {
881 name: "reviewer".to_string(),
882 description: "Quality assurance reviewer focused on verification".to_string(),
883 persona: "You are a meticulous reviewer. Focus on verifying correctness, \
884 identifying issues, and ensuring quality. Be critical and \
885 constructive. Check for errors, edge cases, and improvements."
886 .to_string(),
887 allowed_tools: vec!["read_file".to_string(), "web_fetch".to_string()],
888 max_iterations: 10,
889 can_delegate: false,
890 ..Default::default()
891 }
892 }
893
894 pub fn supervisor() -> Self {
896 Self {
897 name: "supervisor".to_string(),
898 description: "Supervisor that decomposes tasks and coordinates sub-agents".to_string(),
899 persona: "You are a supervisor agent. Your role is to decompose complex \
900 tasks into subtasks and coordinate sub-agents to complete them. \
901 Analyze the task, break it down, assign work, and aggregate results. \
902 \n\nFor each subtask, respond with:\n\
903 SUBTASK: <description>\n\
904 ROLE: <researcher|creative|executor|reviewer|supervisor>\n\
905 \nWhen all subtasks are complete, respond with:\n\
906 FINAL: <aggregated result>"
907 .to_string(),
908 allowed_tools: Vec::new(),
909 max_iterations: 20,
910 can_delegate: true,
911 ..Default::default()
912 }
913 }
914}
915
916#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
922#[serde(rename_all = "lowercase")]
923pub enum SwarmTopology {
924 #[serde(alias = "flat")]
926 Star,
927 Mesh,
929 Hierarchical,
931 Hybrid,
933}
934
935#[allow(clippy::derivable_impls)]
936impl Default for SwarmTopology {
937 fn default() -> Self {
938 Self::Star
939 }
940}
941
942#[derive(Debug, Clone, Serialize, Deserialize)]
944pub struct SwarmConfig {
945 #[serde(default)]
947 pub topology: SwarmTopology,
948
949 #[serde(default = "default_max_depth")]
951 pub max_depth: usize,
952
953 #[serde(default = "default_max_workers", alias = "agent_count")]
955 pub max_workers: usize,
956
957 #[serde(default, deserialize_with = "deserialize_profiles")]
977 pub profiles: Vec<WorkerProfile>,
978
979 #[serde(default = "default_true")]
981 pub dynamic_role_assignment: bool,
982
983 #[serde(default)]
985 pub enable_agent_communication: bool,
986
987 #[serde(default)]
989 pub enable_health_monitoring: bool,
990}
991
/// Serde default for `SwarmConfig::max_depth`.
fn default_max_depth() -> usize {
    const DEFAULT_MAX_DEPTH: usize = 3;
    DEFAULT_MAX_DEPTH
}
995
/// Serde default for `SwarmConfig::max_workers`.
fn default_max_workers() -> usize {
    const DEFAULT_MAX_WORKERS: usize = 100;
    DEFAULT_MAX_WORKERS
}
999
1000fn deserialize_profiles<'de, D>(
1009 deserializer: D,
1010) -> std::result::Result<Vec<WorkerProfile>, D::Error>
1011where
1012 D: serde::Deserializer<'de>,
1013{
1014 #[derive(Deserialize)]
1017 #[serde(untagged)]
1018 enum ProfilesOrMap {
1019 Array(Vec<WorkerProfile>),
1020 Map(std::collections::HashMap<String, String>),
1021 }
1022
1023 match ProfilesOrMap::deserialize(deserializer) {
1024 Ok(ProfilesOrMap::Array(profiles)) => Ok(profiles),
1025 Ok(ProfilesOrMap::Map(map)) => {
1026 let profiles: Vec<WorkerProfile> = map
1027 .into_iter()
1028 .map(|(name, persona)| WorkerProfile {
1029 name,
1030 persona,
1031 ..WorkerProfile::default()
1032 })
1033 .collect();
1034 Ok(profiles)
1035 }
1036 Err(e) => Err(e),
1037 }
1038}
1039
1040impl Default for SwarmConfig {
1041 fn default() -> Self {
1042 Self {
1043 topology: SwarmTopology::default(),
1044 max_depth: default_max_depth(),
1045 max_workers: default_max_workers(),
1046 profiles: vec![
1047 WorkerProfile::researcher(),
1048 WorkerProfile::creative(),
1049 WorkerProfile::executor(),
1050 WorkerProfile::reviewer(),
1051 WorkerProfile::supervisor(),
1052 ],
1053 dynamic_role_assignment: true,
1054 enable_agent_communication: false,
1055 enable_health_monitoring: false,
1056 }
1057 }
1058}
1059
1060pub struct SwarmOrchestrator {
1067 config: SwarmConfig,
1069 current_depth: usize,
1071 worker_count: usize,
1073 llm: Option<Arc<dyn LLMProviderTrait>>,
1075 multi_llm: Option<MultiModelManager>,
1077 ravenfabric: Option<RavenFabricClient>,
1079 #[allow(dead_code)]
1081 policy_engine: PolicyEngine,
1082 sandbox: Sandbox,
1084 audit_log: AuditLog,
1086 #[allow(dead_code)]
1088 registry: ToolRegistry,
1089 message_bus: Option<Arc<RwLock<AgentMessageBus>>>,
1091 health_monitor: Option<Arc<RwLock<SwarmHealthMonitor>>>,
1094}
1095
1096impl SwarmOrchestrator {
1097 pub fn new(
1099 config: SwarmConfig,
1100 llm: Option<Arc<dyn LLMProviderTrait>>,
1101 multi_llm: Option<MultiModelManager>,
1102 ravenfabric: Option<RavenFabricClient>,
1103 ) -> Self {
1104 let policy_engine = PolicyEngine::default_secure();
1105 let sandbox = Sandbox::default();
1106 let audit_log = AuditLog::new(format!("swarm-{}", std::process::id()));
1107 let registry = ToolRegistry::with_default_tools();
1108
1109 let message_bus = if config.enable_agent_communication {
1111 Some(Arc::new(RwLock::new(AgentMessageBus::new(1000))))
1112 } else {
1113 None
1114 };
1115
1116 let health_monitor = if config.enable_health_monitoring {
1118 Some(Arc::new(RwLock::new(SwarmHealthMonitor::default())))
1119 } else {
1120 None
1121 };
1122
1123 Self {
1124 config,
1125 current_depth: 0,
1126 worker_count: 0,
1127 llm,
1128 multi_llm,
1129 ravenfabric,
1130 policy_engine,
1131 sandbox,
1132 audit_log,
1133 registry,
1134 message_bus,
1135 health_monitor,
1136 }
1137 }
1138
1139 #[allow(dead_code)]
1144 pub fn new_with_bus(
1145 config: SwarmConfig,
1146 llm: Option<Arc<dyn LLMProviderTrait>>,
1147 multi_llm: Option<MultiModelManager>,
1148 ravenfabric: Option<RavenFabricClient>,
1149 message_bus: Option<Arc<RwLock<AgentMessageBus>>>,
1150 ) -> Self {
1151 let policy_engine = PolicyEngine::default_secure();
1152 let sandbox = Sandbox::default();
1153 let audit_log = AuditLog::new(format!("swarm-{}", std::process::id()));
1154 let registry = ToolRegistry::with_default_tools();
1155
1156 let health_monitor = if config.enable_health_monitoring {
1158 Some(Arc::new(RwLock::new(SwarmHealthMonitor::default())))
1159 } else {
1160 None
1161 };
1162
1163 Self {
1164 config,
1165 current_depth: 0,
1166 worker_count: 0,
1167 llm,
1168 multi_llm,
1169 ravenfabric,
1170 policy_engine,
1171 sandbox,
1172 audit_log,
1173 registry,
1174 message_bus,
1175 health_monitor,
1176 }
1177 }
1178
1179 pub async fn init(&mut self) -> Result<()> {
1181 self.sandbox.init().await.map_err(|e| {
1182 RavenClawsError::CommandExecution(format!("Swarm sandbox init failed: {}", e))
1183 })?;
1184 Ok(())
1185 }
1186
1187 #[allow(dead_code)]
1189 pub fn worker_count(&self) -> usize {
1190 self.worker_count
1191 }
1192
1193 #[allow(dead_code)]
1195 pub fn current_depth(&self) -> usize {
1196 self.current_depth
1197 }
1198
1199 #[allow(dead_code)]
1201 pub fn health_metrics(&self) -> Option<SwarmMetrics> {
1202 self.health_monitor
1203 .as_ref()
1204 .and_then(|hm| hm.try_read().ok())
1205 .map(|hm| hm.metrics())
1206 }
1207
1208 #[allow(dead_code)]
1210 pub fn worker_telemetry(&self) -> Option<Vec<WorkerTelemetry>> {
1211 self.health_monitor
1212 .as_ref()
1213 .and_then(|hm| hm.try_read().ok())
1214 .map(|hm| hm.all_worker_telemetry())
1215 }
1216
1217 #[instrument(skip(self, task), fields(depth = self.current_depth, workers = self.worker_count))]
1222 pub async fn orchestrate(&mut self, task: &str) -> Result<String> {
1223 self.orchestrate_impl(task).await
1225 }
1226
1227 async fn orchestrate_impl(&mut self, task: &str) -> Result<String> {
1229 info!(
1230 depth = self.current_depth,
1231 max_depth = self.config.max_depth,
1232 "Orchestrating task"
1233 );
1234
1235 if let Some(ref hm) = self.health_monitor {
1237 if let Ok(hm_guard) = hm.try_read() {
1238 info!(health = %hm_guard.format_status(), "Swarm health at start");
1239 }
1240 }
1241
1242 if self.current_depth >= self.config.max_depth {
1244 warn!(
1245 depth = self.current_depth,
1246 max_depth = self.config.max_depth,
1247 "Max recursion depth reached, executing directly"
1248 );
1249 return self.execute_direct(task).await;
1250 }
1251
1252 if self.worker_count >= self.config.max_workers {
1254 warn!(
1255 workers = self.worker_count,
1256 max_workers = self.config.max_workers,
1257 "Max workers reached, executing directly"
1258 );
1259 return self.execute_direct(task).await;
1260 }
1261
1262 let roles = if self.config.dynamic_role_assignment {
1264 self.analyze_task_roles(task).await?
1265 } else {
1266 vec!["supervisor".to_string()]
1268 };
1269
1270 info!(roles = ?roles, "Assigned roles for task");
1271
1272 if roles.len() == 1 && roles[0] != "supervisor" {
1274 return self.execute_with_profile(task, &roles[0]).await;
1275 }
1276
1277 if roles.contains(&"supervisor".to_string()) || roles.len() > 1 {
1279 return self.recursive_supervise_impl(task, &roles).await;
1280 }
1281
1282 self.execute_direct(task).await
1284 }
1285
1286 async fn analyze_task_roles(&self, task: &str) -> Result<Vec<String>> {
1288 if let Some(ref llm) = self.llm {
1290 let analysis_prompt = format!(
1291 "Analyze this task and determine which roles are needed to complete it. \
1292 Available roles: researcher, creative, executor, reviewer, supervisor. \
1293 \n\nTask: {}\n\n\
1294 Respond with a comma-separated list of roles needed, nothing else. \
1295 Example: researcher, executor, reviewer",
1296 task
1297 );
1298
1299 let messages = vec![
1300 ChatMessage::new(
1301 "system",
1302 "You are a task analysis expert. Respond only with a comma-separated list of roles.",
1303 ),
1304 ChatMessage::new("user", analysis_prompt),
1305 ];
1306
1307 match llm.chat(messages).await {
1308 Ok(response) => {
1309 let content = response
1310 .choices
1311 .first()
1312 .map(|c| c.message.content.clone())
1313 .unwrap_or_default();
1314
1315 let roles: Vec<String> = content
1316 .split(',')
1317 .map(|r| r.trim().to_lowercase())
1318 .filter(|r| {
1319 matches!(
1320 r.as_str(),
1321 "researcher" | "creative" | "executor" | "reviewer" | "supervisor"
1322 )
1323 })
1324 .collect();
1325
1326 if roles.is_empty() {
1327 Ok(vec!["executor".to_string()])
1328 } else {
1329 Ok(roles)
1330 }
1331 }
1332 Err(e) => {
1333 warn!(error = %e, "Task analysis failed, using default roles");
1334 Ok(vec!["executor".to_string()])
1335 }
1336 }
1337 } else {
1338 Ok(vec!["executor".to_string()])
1340 }
1341 }
1342
1343 async fn execute_with_profile(&self, task: &str, role: &str) -> Result<String> {
1345 let profile = self
1346 .config
1347 .profiles
1348 .iter()
1349 .find(|p| p.name == role)
1350 .cloned()
1351 .unwrap_or_else(|| {
1352 if role == "supervisor" {
1353 WorkerProfile::supervisor()
1354 } else {
1355 WorkerProfile::executor()
1356 }
1357 });
1358
1359 info!(role = %role, profile = %profile.name, "Executing task with profile");
1360
1361 if let Some(ref hm) = self.health_monitor {
1363 if let Ok(mut hm_guard) = hm.try_write() {
1364 hm_guard.register_worker(role);
1365 hm_guard.task_started(role);
1366 }
1367 }
1368
1369 let llm = self.llm.as_ref().ok_or_else(|| {
1370 RavenClawsError::CommandExecution("No LLM provider available for worker".to_string())
1371 })?;
1372
1373 let mut memory = ConversationMemory::new(&profile.persona, profile.max_memory_messages);
1374
1375 let enriched_task = if let Some(ref bus) = self.message_bus {
1377 if let Ok(bus_guard) = bus.try_read() {
1378 let msg_context = bus_guard.format_for_prompt(role, 20);
1379 format!("{}{}", task, msg_context)
1380 } else {
1381 task.to_string()
1382 }
1383 } else {
1384 task.to_string()
1385 };
1386
1387 memory.add_user_message(&enriched_task);
1388
1389 let messages = memory.history().to_vec();
1390 let response = llm.chat(messages).await.map_err(|e| {
1391 if let Some(ref hm) = self.health_monitor {
1393 if let Ok(mut hm_guard) = hm.try_write() {
1394 hm_guard.task_failed(role);
1395 }
1396 }
1397 RavenClawsError::CommandExecution(format!("Worker {} failed: {}", role, e))
1398 })?;
1399
1400 let content = response
1401 .choices
1402 .first()
1403 .map(|c| c.message.content.clone())
1404 .unwrap_or_default();
1405
1406 if let Some(ref hm) = self.health_monitor {
1408 if let Ok(mut hm_guard) = hm.try_write() {
1409 hm_guard.task_completed(role);
1410 hm_guard.heartbeat(role);
1411 }
1412 }
1413
1414 if let Some(ref bus) = self.message_bus {
1416 if let Ok(mut bus_guard) = bus.try_write() {
1417 bus_guard.send(
1418 role,
1419 "*",
1420 MessageType::Result,
1421 &format!(
1422 "Completed task. Result ({} chars): {}",
1423 content.len(),
1424 &content[..content.len().min(500)]
1425 ),
1426 HashMap::new(),
1427 );
1428 }
1429 if let Some(ref hm) = self.health_monitor {
1431 if let Ok(mut hm_guard) = hm.try_write() {
1432 hm_guard.message_sent(role);
1433 }
1434 }
1435 }
1436
1437 let _ = self.audit_log.append(
1438 AuditEventType::AgentFinish,
1439 &format!("worker-{}", role),
1440 &format!("Worker {} completed task", role),
1441 Some(serde_json::json!({
1442 "role": role,
1443 "task_length": task.len(),
1444 "response_length": content.len(),
1445 })),
1446 );
1447
1448 Ok(content)
1449 }
1450
1451 async fn execute_direct(&self, task: &str) -> Result<String> {
1453 self.execute_with_profile(task, "executor").await
1454 }
1455
1456 #[allow(dead_code)]
1461 async fn recursive_supervise(&self, task: &str, roles: &[String]) -> Result<String> {
1462 let task = task.to_string();
1463 let roles = roles.to_vec();
1464 let this: &SwarmOrchestrator = self;
1465 Box::pin(async move { this.recursive_supervise_impl(&task, &roles).await }).await
1466 }
1467
1468 async fn recursive_supervise_impl(&self, task: &str, roles: &[String]) -> Result<String> {
1470 let llm = self.llm.as_ref().ok_or_else(|| {
1471 RavenClawsError::CommandExecution(
1472 "No LLM provider available for supervisor".to_string(),
1473 )
1474 })?;
1475
1476 if let Some(ref hm) = self.health_monitor {
1478 if let Ok(mut hm_guard) = hm.try_write() {
1479 hm_guard.register_worker("supervisor");
1480 hm_guard.task_started("supervisor");
1481 }
1482 }
1483
1484 let supervisor_profile = WorkerProfile::supervisor();
1485 let mut memory = ConversationMemory::new(
1486 &supervisor_profile.persona,
1487 supervisor_profile.max_memory_messages,
1488 );
1489
1490 let role_list = roles.join(", ");
1491
1492 let msg_context = if let Some(ref bus) = self.message_bus {
1494 if let Ok(bus_guard) = bus.try_read() {
1495 bus_guard.format_for_prompt("supervisor", 20)
1496 } else {
1497 String::new()
1498 }
1499 } else {
1500 String::new()
1501 };
1502
1503 let supervise_prompt = format!(
1504 "Decompose this task into subtasks and assign each to the most appropriate role.\n\
1505 Available roles: {}\n\n\
1506 Task: {}\n\n\
1507 For each subtask, respond with:\n\
1508 SUBTASK: <description>\n\
1509 ROLE: <role>\n\n\
1510 When all subtasks are complete, respond with:\n\
1511 FINAL: <aggregated result>\n\
1512 {}",
1513 role_list, task, msg_context
1514 );
1515
1516 memory.add_user_message(&supervise_prompt);
1517
1518 let mut subtask_results: Vec<String> = Vec::new();
1519 let mut iteration = 0;
1520 let max_iterations = supervisor_profile.max_iterations;
1521
1522 loop {
1523 iteration += 1;
1524 if iteration > max_iterations {
1525 warn!("Supervisor reached max iterations");
1526 break;
1527 }
1528
1529 let messages = memory.history().to_vec();
1530 let response = match llm.chat(messages).await {
1531 Ok(r) => r,
1532 Err(e) => {
1533 warn!(error = %e, "Supervisor LLM request failed");
1534 continue;
1535 }
1536 };
1537
1538 let content = response
1539 .choices
1540 .first()
1541 .map(|c| c.message.content.clone())
1542 .unwrap_or_default();
1543
1544 if iteration % 3 == 0 {
1546 if let Some(ref hm) = self.health_monitor {
1547 if let Ok(hm_guard) = hm.try_read() {
1548 let status = hm_guard.format_status();
1549 info!(health = %status, "Swarm health check");
1550 let dead = hm_guard.dead_workers_for_replacement();
1552 if !dead.is_empty() {
1553 warn!(dead_workers = ?dead, "Dead workers detected");
1554 }
1555 }
1556 }
1557 }
1558
1559 if content.contains("FINAL:") {
1561 let final_response = content
1562 .split("FINAL:")
1563 .nth(1)
1564 .unwrap_or("")
1565 .trim()
1566 .to_string();
1567 info!(
1568 iteration = iteration,
1569 subtasks = subtask_results.len(),
1570 "Supervisor completed"
1571 );
1572
1573 if let Some(ref hm) = self.health_monitor {
1575 if let Ok(mut hm_guard) = hm.try_write() {
1576 hm_guard.task_completed("supervisor");
1577 hm_guard.heartbeat("supervisor");
1578 }
1579 }
1580
1581 let _ = self.audit_log.append(
1582 AuditEventType::AgentFinish,
1583 "supervisor",
1584 "Supervisor completed recursive decomposition",
1585 Some(serde_json::json!({
1586 "iterations": iteration,
1587 "subtasks_completed": subtask_results.len(),
1588 "depth": self.current_depth,
1589 })),
1590 );
1591
1592 if !subtask_results.is_empty() {
1593 let aggregated = subtask_results.join("\n\n");
1594 return Ok(format!(
1595 "{}\n\n## Aggregated Results\n\n{}",
1596 final_response, aggregated
1597 ));
1598 }
1599 return Ok(final_response);
1600 }
1601
1602 if content.contains("SUBTASK:") {
1604 let subtask_block = content.split("SUBTASK:").nth(1).unwrap_or("");
1605 let subtask_lines: Vec<&str> = subtask_block.lines().take(4).collect();
1606
1607 let subtask_desc = subtask_lines.first().unwrap_or(&"").trim();
1608 let role = subtask_lines
1609 .iter()
1610 .find(|l| l.starts_with("ROLE:"))
1611 .and_then(|l| l.split(':').nth(1))
1612 .unwrap_or("executor")
1613 .trim()
1614 .to_lowercase();
1615
1616 if !subtask_desc.is_empty() {
1617 info!(role = %role, subtask = %subtask_desc, "Delegating subtask");
1618
1619 if let Some(ref bus) = self.message_bus {
1621 if let Ok(mut bus_guard) = bus.try_write() {
1622 bus_guard.send(
1623 "supervisor",
1624 &role,
1625 MessageType::Coordination,
1626 &format!("Delegating subtask: {}", subtask_desc),
1627 HashMap::new(),
1628 );
1629 }
1630 }
1631
1632 let result =
1633 if role == "supervisor" && self.current_depth < self.config.max_depth {
1634 let config = self.config.clone();
1636 let current_depth = self.current_depth + 1;
1637 let worker_count = self.worker_count + 1;
1638 let llm = self.llm.clone();
1639 let multi_llm = self.multi_llm.clone();
1640 let ravenfabric = self.ravenfabric.clone();
1641 let subtask = subtask_desc.to_string();
1642 let message_bus = self.message_bus.clone();
1643 let health_monitor = self.health_monitor.clone();
1644
1645 Box::pin(async move {
1646 let mut sub_orchestrator = SwarmOrchestrator {
1647 config,
1648 current_depth,
1649 worker_count,
1650 llm,
1651 multi_llm,
1652 ravenfabric,
1653 policy_engine: PolicyEngine::default_secure(),
1654 sandbox: Sandbox::default(),
1655 audit_log: AuditLog::new(format!(
1656 "sub-swarm-{}-{}",
1657 current_depth,
1658 std::process::id()
1659 )),
1660 registry: ToolRegistry::with_default_tools(),
1661 message_bus,
1662 health_monitor,
1663 };
1664
1665 let _ = sub_orchestrator.init().await;
1667 sub_orchestrator.orchestrate(&subtask).await
1668 })
1669 .await
1670 } else {
1671 self.execute_with_profile(subtask_desc, &role).await
1673 };
1674
1675 match result {
1676 Ok(result) => {
1677 info!(
1678 role = %role,
1679 chars = result.len(),
1680 "Subtask completed"
1681 );
1682 subtask_results.push(format!("[{}] {}", role, result));
1683
1684 memory.add_assistant_message(&format!(
1685 "Delegated subtask to {}: {}",
1686 role, subtask_desc
1687 ));
1688 memory.add_user_message(&format!("Result from {}: {}", role, result));
1689 }
1690 Err(e) => {
1691 warn!(role = %role, error = %e, "Subtask failed");
1692 if let Some(ref hm) = self.health_monitor {
1694 if let Ok(mut hm_guard) = hm.try_write() {
1695 hm_guard.task_failed(&role);
1696 hm_guard.record_error(&role);
1697 }
1698 }
1699 memory.add_assistant_message(&format!(
1700 "Subtask for {} failed: {}",
1701 role, e
1702 ));
1703 }
1704 }
1705 }
1706 } else {
1707 memory.add_assistant_message(&content);
1708 }
1709 }
1710
1711 if !subtask_results.is_empty() {
1713 let aggregated = subtask_results.join("\n\n");
1714 info!(
1715 "Supervisor aggregated {} subtask results",
1716 subtask_results.len()
1717 );
1718 return Ok(aggregated);
1719 }
1720
1721 Err(RavenClawsError::CommandExecution(
1722 "Supervisor completed without results".to_string(),
1723 ))
1724 }
1725}
1726
#[cfg(test)]
mod tests {
    use super::*;

    // ---------------------------------------------------------------------
    // Shared fixtures
    // ---------------------------------------------------------------------

    /// Builds an orchestrator from `config`, passing `None` for every optional dependency.
    fn orchestrator_from(config: SwarmConfig) -> SwarmOrchestrator {
        SwarmOrchestrator::new(config, None, None, None)
    }

    /// Returns a default health monitor with each role in `roles` registered, in order.
    fn monitor_with(roles: &[&str]) -> SwarmHealthMonitor {
        let mut monitor = SwarmHealthMonitor::default();
        for &role in roles {
            monitor.register_worker(role);
        }
        monitor
    }

    /// Sends a message with empty metadata and returns the id produced by the bus.
    fn post(
        bus: &mut AgentMessageBus,
        from: &str,
        to: &str,
        kind: MessageType,
        body: &str,
    ) -> String {
        bus.send(from, to, kind, body, HashMap::new())
    }

    /// Moves a registered worker's last heartbeat `secs_ago` seconds into the past.
    fn age_heartbeat(monitor: &mut SwarmHealthMonitor, role: &str, secs_ago: i64) {
        if let Some(entry) = monitor.heartbeats.get_mut(role) {
            entry.last_heartbeat = chrono::Utc::now() - chrono::Duration::seconds(secs_ago);
        }
    }

    /// True when `profile` lists `tool` among its allowed tools.
    fn allows_tool(profile: &WorkerProfile, tool: &str) -> bool {
        profile.allowed_tools.iter().any(|t| t == tool)
    }

    // ---------------------------------------------------------------------
    // Worker profiles and swarm configuration
    // ---------------------------------------------------------------------

    #[test]
    fn test_worker_profile_default() {
        let defaults = WorkerProfile::default();
        assert_eq!(defaults.name, "default");
        assert!(defaults.can_delegate);
        assert_eq!(defaults.max_iterations, 10);
        assert_eq!(defaults.max_memory_messages, 20);
    }

    #[test]
    fn test_worker_profile_researcher() {
        let researcher = WorkerProfile::researcher();
        assert_eq!(researcher.name, "researcher");
        assert!(!researcher.can_delegate);
        assert!(allows_tool(&researcher, "web_fetch"));
        assert!(allows_tool(&researcher, "web_search"));
    }

    #[test]
    fn test_worker_profile_creative() {
        let creative = WorkerProfile::creative();
        assert_eq!(creative.name, "creative");
        assert!(!creative.can_delegate);
    }

    #[test]
    fn test_worker_profile_executor() {
        let executor = WorkerProfile::executor();
        assert_eq!(executor.name, "executor");
        assert!(!executor.can_delegate);
        assert!(allows_tool(&executor, "shell_exec"));
    }

    #[test]
    fn test_worker_profile_reviewer() {
        let reviewer = WorkerProfile::reviewer();
        assert_eq!(reviewer.name, "reviewer");
        assert!(!reviewer.can_delegate);
    }

    #[test]
    fn test_worker_profile_supervisor() {
        let supervisor = WorkerProfile::supervisor();
        assert_eq!(supervisor.name, "supervisor");
        assert!(supervisor.can_delegate);
        // The persona must advertise both markers the supervisor loop looks for.
        for marker in ["SUBTASK:", "FINAL:"] {
            assert!(supervisor.persona.contains(marker));
        }
    }

    #[test]
    fn test_swarm_config_default() {
        let defaults = SwarmConfig::default();
        assert_eq!(defaults.topology, SwarmTopology::Star);
        assert_eq!(defaults.max_depth, 3);
        assert_eq!(defaults.max_workers, 100);
        assert!(defaults.dynamic_role_assignment);
        assert_eq!(defaults.profiles.len(), 5);
    }

    #[test]
    fn test_swarm_topology_serde() {
        let every_topology = [
            SwarmTopology::Star,
            SwarmTopology::Mesh,
            SwarmTopology::Hierarchical,
            SwarmTopology::Hybrid,
        ];

        // Each variant must survive a JSON round trip unchanged.
        for topology in &every_topology {
            let encoded = serde_json::to_string(topology).unwrap();
            let decoded: SwarmTopology = serde_json::from_str(&encoded).unwrap();
            assert_eq!(*topology, decoded);
        }
    }

    #[test]
    fn test_swarm_config_serde() {
        let original = SwarmConfig::default();
        let encoded = serde_json::to_string_pretty(&original).unwrap();
        let restored: SwarmConfig = serde_json::from_str(&encoded).unwrap();

        assert_eq!(original.topology, restored.topology);
        assert_eq!(original.max_depth, restored.max_depth);
        assert_eq!(original.max_workers, restored.max_workers);
        assert_eq!(original.profiles.len(), restored.profiles.len());
    }

    #[test]
    fn test_resource_limits_default() {
        let defaults = ResourceLimits::default();
        assert_eq!(defaults.max_tool_calls, 50);
        assert_eq!(defaults.max_exec_secs, 300);
    }

    #[test]
    fn test_swarm_orchestrator_new() {
        let fresh = orchestrator_from(SwarmConfig::default());
        assert_eq!(fresh.current_depth(), 0);
        assert_eq!(fresh.worker_count(), 0);
    }

    #[test]
    fn test_swarm_orchestrator_depth_limit() {
        let shallow = SwarmConfig {
            max_depth: 0,
            ..SwarmConfig::default()
        };
        let mut orchestrator = orchestrator_from(shallow);
        orchestrator.current_depth = 0;

        let at_limit = orchestrator.current_depth >= orchestrator.config.max_depth;
        assert!(at_limit);
    }

    #[tokio::test]
    async fn test_analyze_task_roles_fallback() {
        let orchestrator = orchestrator_from(SwarmConfig::default());

        // With no LLM configured the analysis is still expected to succeed.
        let outcome = orchestrator.analyze_task_roles("test task").await;
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_worker_profile_custom() {
        let limits = ResourceLimits {
            max_tool_calls: 10,
            max_exec_secs: 60,
        };
        let custom = WorkerProfile {
            name: "custom".to_string(),
            description: "Custom worker".to_string(),
            persona: "You are a custom worker.".to_string(),
            allowed_tools: vec!["read_file".to_string()],
            provider: Some("openai".to_string()),
            model: Some("gpt-4".to_string()),
            max_iterations: 5,
            max_memory_messages: 10,
            can_delegate: false,
            resource_limits: limits,
        };

        assert_eq!(custom.name, "custom");
        assert_eq!(custom.provider, Some("openai".to_string()));
        assert_eq!(custom.model, Some("gpt-4".to_string()));
        assert_eq!(custom.max_iterations, 5);
        assert_eq!(custom.resource_limits.max_tool_calls, 10);
    }

    #[test]
    fn test_swarm_config_custom_profiles() {
        let custom = SwarmConfig {
            profiles: vec![WorkerProfile::researcher(), WorkerProfile::executor()],
            topology: SwarmTopology::Hierarchical,
            max_depth: 5,
            max_workers: 50,
            ..SwarmConfig::default()
        };

        assert_eq!(custom.profiles.len(), 2);
        assert_eq!(custom.topology, SwarmTopology::Hierarchical);
        assert_eq!(custom.max_depth, 5);
        assert_eq!(custom.max_workers, 50);
    }

    // ---------------------------------------------------------------------
    // Agent message bus
    // ---------------------------------------------------------------------

    #[test]
    fn test_message_bus_new() {
        let empty_bus = AgentMessageBus::new(100);
        assert!(empty_bus.is_empty());
        assert_eq!(empty_bus.len(), 0);
    }

    #[test]
    fn test_message_bus_send_and_receive() {
        let mut bus = AgentMessageBus::new(100);

        let message_id = post(
            &mut bus,
            "researcher",
            "executor",
            MessageType::Information,
            "Found relevant data",
        );

        assert!(!message_id.is_empty());
        assert_eq!(bus.len(), 1);

        let inbox = bus.messages_for("executor");
        assert_eq!(inbox.len(), 1);
        assert_eq!(inbox[0].content, "Found relevant data");
        assert_eq!(inbox[0].sender, "researcher");
    }

    #[test]
    fn test_message_bus_broadcast() {
        let mut bus = AgentMessageBus::new(100);

        post(
            &mut bus,
            "supervisor",
            "*",
            MessageType::Coordination,
            "All workers proceed",
        );

        // A "*" recipient must reach every role.
        for recipient in ["researcher", "executor", "reviewer"] {
            assert_eq!(bus.messages_for(recipient).len(), 1);
        }
    }

    #[test]
    fn test_message_bus_filter_by_type() {
        let mut bus = AgentMessageBus::new(100);

        post(&mut bus, "researcher", "*", MessageType::Information, "Data found");
        post(&mut bus, "executor", "supervisor", MessageType::Result, "Task done");
        post(&mut bus, "executor", "supervisor", MessageType::Error, "Failed");

        let error_messages = bus.messages_of_type(&MessageType::Error);
        assert_eq!(error_messages.len(), 1);
        assert_eq!(error_messages[0].content, "Failed");

        let result_messages = bus.messages_of_type(&MessageType::Result);
        assert_eq!(result_messages.len(), 1);
    }

    #[test]
    fn test_message_bus_max_messages() {
        let mut bus = AgentMessageBus::new(5);
        for n in 0..10 {
            post(
                &mut bus,
                "worker",
                "*",
                MessageType::Generic,
                &format!("Message {}", n),
            );
        }

        // Only the five most recent messages are retained, oldest first.
        assert_eq!(bus.len(), 5);
        let retained = bus.all_messages();
        assert_eq!(retained[0].content, "Message 5");
        assert_eq!(retained[4].content, "Message 9");
    }

    #[test]
    fn test_message_bus_format_for_prompt() {
        let mut bus = AgentMessageBus::new(100);

        post(
            &mut bus,
            "researcher",
            "*",
            MessageType::Information,
            "Found key insight",
        );
        post(
            &mut bus,
            "executor",
            "supervisor",
            MessageType::Result,
            "Implementation complete",
        );

        let rendered = bus.format_for_prompt("supervisor", 10);
        for fragment in [
            "Inter-Agent Messages",
            "researcher",
            "executor",
            "Found key insight",
        ] {
            assert!(rendered.contains(fragment));
        }
    }

    #[test]
    fn test_message_bus_empty_format() {
        let empty_bus = AgentMessageBus::new(100);
        let rendered = empty_bus.format_for_prompt("supervisor", 10);
        assert!(rendered.is_empty());
    }

    #[test]
    fn test_message_type_display() {
        let expectations = [
            (MessageType::Information, "information"),
            (MessageType::Question, "question"),
            (MessageType::Result, "result"),
            (MessageType::Error, "error"),
            (MessageType::Coordination, "coordination"),
            (MessageType::Generic, "generic"),
        ];

        for (kind, text) in &expectations {
            assert_eq!(kind.to_string(), *text);
        }
    }

    #[test]
    fn test_message_bus_messages_from() {
        let mut bus = AgentMessageBus::new(100);

        post(&mut bus, "researcher", "*", MessageType::Information, "Data A");
        post(
            &mut bus,
            "researcher",
            "executor",
            MessageType::Information,
            "Data B",
        );
        post(&mut bus, "executor", "supervisor", MessageType::Result, "Done");

        let sent_by_researcher = bus.messages_from("researcher");
        assert_eq!(sent_by_researcher.len(), 2);

        let sent_by_executor = bus.messages_from("executor");
        assert_eq!(sent_by_executor.len(), 1);
    }

    #[test]
    fn test_orchestrator_new_with_communication() {
        let orchestrator = orchestrator_from(SwarmConfig {
            enable_agent_communication: true,
            ..SwarmConfig::default()
        });
        assert!(orchestrator.message_bus.is_some());
    }

    #[test]
    fn test_orchestrator_new_without_communication() {
        let orchestrator = orchestrator_from(SwarmConfig {
            enable_agent_communication: false,
            ..SwarmConfig::default()
        });
        assert!(orchestrator.message_bus.is_none());
    }

    #[test]
    fn test_swarm_config_communication_default() {
        let defaults = SwarmConfig::default();
        assert!(!defaults.enable_agent_communication);
        assert!(!defaults.enable_health_monitoring);
    }

    // ---------------------------------------------------------------------
    // Swarm health monitor
    // ---------------------------------------------------------------------

    #[test]
    fn test_health_status_display() {
        let expectations = [
            (WorkerHealthStatus::Healthy, "healthy"),
            (WorkerHealthStatus::Degraded, "degraded"),
            (WorkerHealthStatus::Unhealthy, "unhealthy"),
            (WorkerHealthStatus::Dead, "dead"),
        ];

        for (status, text) in &expectations {
            assert_eq!(status.to_string(), *text);
        }
    }

    #[test]
    fn test_health_monitor_default() {
        let monitor = SwarmHealthMonitor::default();
        assert_eq!(monitor.heartbeat_interval_secs, 5);
        assert_eq!(monitor.max_missed_beats, 3);
        assert_eq!(monitor.replacement_timeout_secs, 30);
        assert_eq!(monitor.worker_count(), 0);
    }

    #[test]
    fn test_health_monitor_register_worker() {
        let monitor = monitor_with(&["researcher"]);
        assert_eq!(monitor.worker_count(), 1);

        let snapshot = monitor.worker_telemetry("researcher");
        assert!(snapshot.is_some());
        assert_eq!(snapshot.unwrap().role, "researcher");
    }

    #[test]
    fn test_health_monitor_heartbeat() {
        let mut monitor = monitor_with(&["executor"]);
        monitor.heartbeat("executor");

        let snapshot = monitor.worker_telemetry("executor").unwrap();
        assert_eq!(snapshot.status, WorkerHealthStatus::Healthy);
    }

    #[test]
    fn test_health_monitor_task_lifecycle() {
        let mut monitor = monitor_with(&["executor"]);
        monitor.task_started("executor");
        monitor.task_completed("executor");

        let snapshot = monitor.worker_telemetry("executor").unwrap();
        assert_eq!(snapshot.tasks_completed, 1);
        assert_eq!(snapshot.tasks_failed, 0);
    }

    #[test]
    fn test_health_monitor_task_failure() {
        let mut monitor = monitor_with(&["executor"]);
        monitor.task_started("executor");
        monitor.task_failed("executor");

        let snapshot = monitor.worker_telemetry("executor").unwrap();
        assert_eq!(snapshot.tasks_completed, 0);
        assert_eq!(snapshot.tasks_failed, 1);
        assert_eq!(snapshot.error_count, 1);
    }

    #[test]
    fn test_health_monitor_metrics_empty() {
        let monitor = SwarmHealthMonitor::default();
        let summary = monitor.metrics();
        assert_eq!(summary.total_workers, 0);
        assert_eq!(summary.healthy_workers, 0);
        assert_eq!(summary.total_tasks_completed, 0);
        assert_eq!(summary.task_throughput, 0.0);
    }

    #[test]
    fn test_health_monitor_metrics_with_workers() {
        let mut monitor = monitor_with(&["researcher", "executor"]);
        for role in ["executor", "researcher"] {
            monitor.task_started(role);
            monitor.task_completed(role);
        }

        let summary = monitor.metrics();
        assert_eq!(summary.total_workers, 2);
        assert_eq!(summary.healthy_workers, 2);
        assert_eq!(summary.total_tasks_completed, 2);
    }

    #[test]
    fn test_health_monitor_dead_worker_detection() {
        let mut monitor = SwarmHealthMonitor {
            replacement_timeout_secs: 0,
            max_missed_beats: 1,
            heartbeat_interval_secs: 1,
            ..SwarmHealthMonitor::default()
        };

        monitor.register_worker("executor");
        age_heartbeat(&mut monitor, "executor", 10);

        let dead_roles = monitor.check_health();
        assert!(!dead_roles.is_empty());
        assert_eq!(dead_roles[0], "executor");
    }

    #[test]
    fn test_health_monitor_degraded_detection() {
        let mut monitor = SwarmHealthMonitor {
            max_missed_beats: 3,
            heartbeat_interval_secs: 1,
            ..SwarmHealthMonitor::default()
        };

        monitor.register_worker("executor");
        age_heartbeat(&mut monitor, "executor", 3);

        let _dead_roles = monitor.check_health();
        let snapshot = monitor.worker_telemetry("executor").unwrap();
        assert_eq!(snapshot.status, WorkerHealthStatus::Degraded);
    }

    #[test]
    fn test_health_monitor_message_tracking() {
        let mut monitor = monitor_with(&["researcher"]);
        for _ in 0..2 {
            monitor.message_sent("researcher");
        }
        monitor.message_received("researcher");

        let snapshot = monitor.worker_telemetry("researcher").unwrap();
        assert_eq!(snapshot.messages_sent, 2);
        assert_eq!(snapshot.messages_received, 1);
    }

    #[test]
    fn test_health_monitor_error_tracking() {
        let mut monitor = monitor_with(&["executor"]);
        for _ in 0..3 {
            monitor.record_error("executor");
        }

        let snapshot = monitor.worker_telemetry("executor").unwrap();
        assert_eq!(snapshot.error_count, 3);
    }

    #[test]
    fn test_health_monitor_format_status() {
        let monitor = SwarmHealthMonitor::default();
        let rendered = monitor.format_status();
        assert!(rendered.contains("Swarm Health:"));
        assert!(rendered.contains("healthy"));
    }

    #[test]
    fn test_health_monitor_all_worker_telemetry() {
        let monitor = monitor_with(&["researcher", "executor", "reviewer"]);

        let snapshots = monitor.all_worker_telemetry();
        assert_eq!(snapshots.len(), 3);
    }

    #[test]
    fn test_health_monitor_remove_worker() {
        let mut monitor = monitor_with(&["executor"]);
        assert_eq!(monitor.worker_count(), 1);
        monitor.remove_worker("executor");
        assert_eq!(monitor.worker_count(), 0);
    }

    #[test]
    fn test_orchestrator_new_with_health_monitoring() {
        let orchestrator = orchestrator_from(SwarmConfig {
            enable_health_monitoring: true,
            ..SwarmConfig::default()
        });
        assert!(orchestrator.health_monitor.is_some());
    }

    #[test]
    fn test_orchestrator_new_without_health_monitoring() {
        let orchestrator = orchestrator_from(SwarmConfig {
            enable_health_monitoring: false,
            ..SwarmConfig::default()
        });
        assert!(orchestrator.health_monitor.is_none());
    }

    #[test]
    fn test_health_metrics_accessor() {
        let orchestrator = orchestrator_from(SwarmConfig {
            enable_health_monitoring: true,
            ..SwarmConfig::default()
        });

        let summary = orchestrator.health_metrics();
        assert!(summary.is_some());
        assert_eq!(summary.unwrap().total_workers, 0);
    }

    #[test]
    fn test_worker_telemetry_accessor() {
        let orchestrator = orchestrator_from(SwarmConfig {
            enable_health_monitoring: true,
            ..SwarmConfig::default()
        });

        let snapshots = orchestrator.worker_telemetry();
        assert!(snapshots.is_some());
        assert!(snapshots.unwrap().is_empty());
    }

    #[test]
    fn test_health_monitor_new_custom() {
        let monitor = SwarmHealthMonitor::new(10, 5, 60);
        assert_eq!(monitor.heartbeat_interval_secs, 10);
        assert_eq!(monitor.max_missed_beats, 5);
        assert_eq!(monitor.replacement_timeout_secs, 60);
    }

    #[test]
    fn test_health_monitor_dead_workers_for_replacement() {
        let mut monitor = SwarmHealthMonitor {
            replacement_timeout_secs: 0,
            max_missed_beats: 1,
            heartbeat_interval_secs: 1,
            ..SwarmHealthMonitor::default()
        };

        monitor.register_worker("executor");
        // Force the worker into the dead state with a stale heartbeat.
        if let Some(entry) = monitor.heartbeats.get_mut("executor") {
            entry.last_heartbeat = chrono::Utc::now() - chrono::Duration::seconds(30);
            entry.status = WorkerHealthStatus::Dead;
        }

        let replaceable = monitor.dead_workers_for_replacement();
        assert_eq!(replaceable.len(), 1);
        assert_eq!(replaceable[0], "executor");
    }

    #[test]
    fn test_health_monitor_metrics_error_rate() {
        let mut monitor = monitor_with(&["executor"]);
        monitor.task_started("executor");
        monitor.task_completed("executor");
        monitor.task_started("executor");
        monitor.task_failed("executor");

        let summary = monitor.metrics();
        assert_eq!(summary.total_tasks_completed, 1);
        assert_eq!(summary.total_tasks_failed, 1);
        assert!(summary.error_rate > 0.0);
    }

    #[test]
    fn test_health_monitor_metrics_utilization() {
        let mut monitor = monitor_with(&["busy_worker", "idle_worker"]);

        if let Some(entry) = monitor.heartbeats.get_mut("busy_worker") {
            entry.is_busy = true;
        }

        // One busy worker out of two registered.
        let summary = monitor.metrics();
        assert_eq!(summary.total_workers, 2);
        assert!((summary.worker_utilization - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_health_monitor_iteration_tracking() {
        let mut monitor = monitor_with(&["executor"]);
        // Two finished tasks followed by a third that is only started.
        for _ in 0..2 {
            monitor.task_started("executor");
            monitor.task_completed("executor");
        }
        monitor.task_started("executor");

        let snapshot = monitor.worker_telemetry("executor").unwrap();
        assert_eq!(snapshot.iteration, 3);
        assert_eq!(snapshot.tasks_completed, 2);
    }
}