Skip to main content

ravenclaws/
swarm.rs

1//! Swarm orchestration — self-provisioning sub-agents with recursive supervision
2//!
3//! RavenClaws's swarm orchestrator enables truly autonomous multi-agent coordination:
4//! supervisors can spawn sub-supervisors, creating recursive task decomposition
5//! trees of arbitrary depth. Each worker has a declarative profile specifying its
6//! persona, tools, provider, model, and resource limits.
7//!
8//! # Architecture
9//!
10//! ```text
11//! SwarmOrchestrator
12//!   ├── topology: Star | Mesh | Hierarchical | Hybrid
13//!   ├── max_depth: recursion limit (default: 3)
14//!   ├── max_workers: total worker cap (default: 100)
15//!   ├── profiles: Vec<WorkerProfile> — available worker types
16//!   ├── message_bus: AgentMessageBus — inter-agent communication
17//!   └── orchestrate(task):
18//!       1. Analyze task → determine required roles
19//!       2. Assign roles → select profiles
20//!       3. Spawn workers → local or remote via RavenFabric
21//!       4. Coordinate → recursive supervisor if task is complex
22//!       5. Aggregate → collect and merge results
23//! ```
24//!
25//! # Inter-Agent Communication
26//!
27//! When `enable_agent_communication` is set, swarm workers can exchange messages
28//! through a shared `AgentMessageBus`. Messages are routed by role and include
29//! typed payloads (information, question, result, error, coordination).
30//! All messages are audited and timestamped.
31//!
32//! # Integration
33//!
34//! - CLI: `--swarm-topology <star|mesh|hierarchical|hybrid> --max-workers <N>`
35//! - Config: `[swarm]` section in `ravenclaws.toml`
36//! - Supervisor mode: automatically uses orchestrator when `max_depth > 0`
37
38use crate::agent::ConversationMemory;
39use crate::audit::{AuditEventType, AuditLog};
40use crate::error::{RavenClawsError, Result};
41use crate::llm::{ChatMessage, LLMProviderTrait, MultiModelManager};
42use crate::policy::PolicyEngine;
43use crate::ravenfabric::RavenFabricClient;
44use crate::sandbox::Sandbox;
45use crate::tools::ToolRegistry;
46use serde::{Deserialize, Serialize};
47use std::collections::HashMap;
48use std::sync::Arc;
49use tokio::sync::RwLock;
50use tracing::{info, instrument, warn};
51
52// ---------------------------------------------------------------------------
53// Inter-agent communication
54// ---------------------------------------------------------------------------
55
/// A single message exchanged between swarm workers over the `AgentMessageBus`.
///
/// Every message carries a `MessageType` so receivers can filter on it, plus
/// the sender's role and a timestamp so the exchange can be audited afterwards.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentMessage {
    /// Unique message ID (`AgentMessageBus::send` generates a UUID v4 string).
    pub id: String,

    /// Role name of the sending worker (e.g., "researcher", "executor").
    pub sender: String,

    /// Role name of the receiving worker; `"*"` means broadcast to all roles.
    pub recipient: String,

    /// Category of this message.
    pub msg_type: MessageType,

    /// Free-form message body.
    pub content: String,

    /// Creation time as an ISO 8601 / RFC 3339 string
    /// (`AgentMessageBus::send` uses the current UTC time).
    pub timestamp: String,

    /// Optional key/value metadata (e.g., task ID, iteration number).
    /// Defaults to an empty map when the field is absent during deserialization.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
}
84
/// Category of an inter-agent message.
///
/// The `Display` implementation in this file renders each variant as its
/// lowercase name (e.g. `Question` → `question`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MessageType {
    /// Sharing information or findings.
    Information,
    /// Asking a question or requesting input.
    Question,
    /// Reporting a result or completion.
    Result,
    /// Reporting an error or issue.
    Error,
    /// Coordination message (e.g., task re-assignment).
    Coordination,
    /// General-purpose message that fits none of the other categories.
    Generic,
}
101
102impl std::fmt::Display for MessageType {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        match self {
105            MessageType::Information => write!(f, "information"),
106            MessageType::Question => write!(f, "question"),
107            MessageType::Result => write!(f, "result"),
108            MessageType::Error => write!(f, "error"),
109            MessageType::Coordination => write!(f, "coordination"),
110            MessageType::Generic => write!(f, "generic"),
111        }
112    }
113}
114
/// In-memory log of the messages exchanged within one swarm.
///
/// Messages can be addressed to a specific role or broadcast with `"*"`, and
/// can be queried by recipient, sender, or type. Messages are kept until the
/// optional `max_messages` cap is exceeded, at which point `send` drops the
/// oldest one.
///
/// NOTE(review): the bus itself does no locking. It was originally documented
/// as being shared across workers via `Arc<RwLock<_>>` — confirm at the call
/// sites.
#[derive(Debug, Clone)]
pub struct AgentMessageBus {
    /// Messages currently retained, oldest first (new messages are appended).
    messages: Vec<AgentMessage>,
    /// Maximum messages to retain (0 = unlimited)
    max_messages: usize,
}
127
128#[allow(dead_code)]
129impl AgentMessageBus {
130    /// Create a new message bus with the given capacity.
131    pub fn new(max_messages: usize) -> Self {
132        Self {
133            messages: Vec::new(),
134            max_messages,
135        }
136    }
137
138    /// Send a message to a specific recipient (or "*" for broadcast).
139    pub fn send(
140        &mut self,
141        sender: &str,
142        recipient: &str,
143        msg_type: MessageType,
144        content: &str,
145        metadata: HashMap<String, String>,
146    ) -> String {
147        let id = uuid::Uuid::new_v4().to_string();
148        let timestamp = chrono::Utc::now().to_rfc3339();
149
150        let msg = AgentMessage {
151            id: id.clone(),
152            sender: sender.to_string(),
153            recipient: recipient.to_string(),
154            msg_type,
155            content: content.to_string(),
156            timestamp,
157            metadata,
158        };
159
160        self.messages.push(msg);
161
162        // Trim oldest messages if over capacity
163        if self.max_messages > 0 && self.messages.len() > self.max_messages {
164            self.messages.remove(0);
165        }
166
167        id
168    }
169
170    /// Get all messages addressed to a specific role (or "*" for broadcast).
171    pub fn messages_for(&self, role: &str) -> Vec<&AgentMessage> {
172        self.messages
173            .iter()
174            .filter(|m| m.recipient == role || m.recipient == "*")
175            .collect()
176    }
177
178    /// Get all messages from a specific sender.
179    pub fn messages_from(&self, sender: &str) -> Vec<&AgentMessage> {
180        self.messages
181            .iter()
182            .filter(|m| m.sender == sender)
183            .collect()
184    }
185
186    /// Get all messages of a specific type.
187    pub fn messages_of_type(&self, msg_type: &MessageType) -> Vec<&AgentMessage> {
188        self.messages
189            .iter()
190            .filter(|m| m.msg_type == *msg_type)
191            .collect()
192    }
193
194    /// Get all messages in the bus.
195    pub fn all_messages(&self) -> &[AgentMessage] {
196        &self.messages
197    }
198
199    /// Get the number of messages in the bus.
200    pub fn len(&self) -> usize {
201        self.messages.len()
202    }
203
204    /// Check if the bus is empty.
205    pub fn is_empty(&self) -> bool {
206        self.messages.is_empty()
207    }
208
209    /// Format recent messages for inclusion in an LLM prompt.
210    ///
211    /// Returns a string with the last N messages formatted for the agent's context.
212    pub fn format_for_prompt(&self, role: &str, max_messages: usize) -> String {
213        let relevant: Vec<&AgentMessage> = self
214            .messages
215            .iter()
216            .filter(|m| m.recipient == role || m.recipient == "*" || m.sender == role)
217            .rev()
218            .take(max_messages)
219            .collect();
220
221        if relevant.is_empty() {
222            return String::new();
223        }
224
225        let mut output = String::from("\n\n--- Inter-Agent Messages ---\n");
226        for msg in relevant.iter().rev() {
227            output.push_str(&format!(
228                "[{}] {} → {} ({}): {}\n",
229                msg.msg_type, msg.sender, msg.recipient, msg.timestamp, msg.content
230            ));
231        }
232        output.push_str("--- End Inter-Agent Messages ---\n");
233        output
234    }
235}
236
237// ---------------------------------------------------------------------------
238// Swarm health monitoring
239// ---------------------------------------------------------------------------
240
/// Health status of a single swarm worker.
///
/// `SwarmHealthMonitor::check_health` assigns the non-healthy states from the
/// time elapsed since the worker's last heartbeat; `SwarmHealthMonitor::heartbeat`
/// resets a worker to `Healthy`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum WorkerHealthStatus {
    /// Worker is active and responding to heartbeats
    /// (initial state, and the state after every recorded heartbeat).
    Healthy,
    /// Worker has been silent for more than two heartbeat intervals.
    Degraded,
    /// Worker has been silent for more than `max_missed_beats` heartbeat intervals.
    Unhealthy,
    /// Worker has been silent for more than `2 * max_missed_beats` heartbeat
    /// intervals and is considered terminated / in need of replacement.
    Dead,
}
253
254impl std::fmt::Display for WorkerHealthStatus {
255    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256        match self {
257            WorkerHealthStatus::Healthy => write!(f, "healthy"),
258            WorkerHealthStatus::Degraded => write!(f, "degraded"),
259            WorkerHealthStatus::Unhealthy => write!(f, "unhealthy"),
260            WorkerHealthStatus::Dead => write!(f, "dead"),
261        }
262    }
263}
264
/// Point-in-time telemetry snapshot for a single worker, as produced by
/// `SwarmHealthMonitor::worker_telemetry`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerTelemetry {
    /// Worker role name
    pub role: String,
    /// Health status at snapshot time
    pub status: WorkerHealthStatus,
    /// Tasks completed by this worker
    pub tasks_completed: u64,
    /// Tasks that failed
    pub tasks_failed: u64,
    /// Total errors encountered (failed tasks plus separately recorded errors)
    pub error_count: u64,
    /// Mean duration in milliseconds over tasks with a recorded duration;
    /// `0.0` when no duration has been recorded
    pub avg_duration_ms: f64,
    /// Last heartbeat timestamp (RFC 3339 / ISO 8601)
    pub last_heartbeat: String,
    /// When the worker was registered with the monitor (RFC 3339 / ISO 8601)
    pub spawned_at: String,
    /// Number of messages sent via the bus
    pub messages_sent: u64,
    /// Number of messages received via the bus
    pub messages_received: u64,
    /// Current iteration count (incremented each time a task starts)
    pub iteration: u64,
}
291
/// Aggregate swarm health metrics, as computed by `SwarmHealthMonitor::metrics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmMetrics {
    /// Total workers currently tracked by the monitor
    pub total_workers: usize,
    /// Healthy workers
    pub healthy_workers: usize,
    /// Degraded workers
    pub degraded_workers: usize,
    /// Unhealthy workers
    pub unhealthy_workers: usize,
    /// Dead workers
    pub dead_workers: usize,
    /// Tasks completed, summed over the currently tracked workers
    pub total_tasks_completed: u64,
    /// Tasks failed, summed over the currently tracked workers
    pub total_tasks_failed: u64,
    /// Errors, summed over the currently tracked workers
    pub total_errors: u64,
    /// Monitor-wide sum of recorded task durations divided by the monitor-wide
    /// completed-task count, in milliseconds (`0.0` when nothing has completed)
    pub overall_avg_duration_ms: f64,
    /// Completed tasks per second, averaged over the time since the monitor
    /// was created (the elapsed time is treated as at least one second)
    pub task_throughput: f64,
    /// Heuristic latency estimate in milliseconds: the received/sent message
    /// ratio scaled by half the heartbeat interval. This is not a measured
    /// send-to-receive latency.
    pub communication_latency_ms: f64,
    /// Fraction of tracked workers that are busy at snapshot time (0.0–1.0)
    pub worker_utilization: f64,
    /// Errors per finished task (completed + failed); `0.0` when no task has
    /// finished. Can exceed 1.0 because errors may be recorded independently
    /// of task failures.
    pub error_rate: f64,
    /// Timestamp of this metrics snapshot (RFC 3339 / ISO 8601)
    pub timestamp: String,
}
324
/// Per-worker heartbeat tracker and counters, owned by `SwarmHealthMonitor`.
#[derive(Debug, Clone)]
struct WorkerHeartbeat {
    /// Worker role
    role: String,
    /// When the worker was registered
    spawned_at: chrono::DateTime<chrono::Utc>,
    /// Last heartbeat received (initialised to the registration time)
    last_heartbeat: chrono::DateTime<chrono::Utc>,
    /// Number of missed heartbeats.
    /// NOTE(review): `heartbeat()` resets this to 0, but nothing visible in
    /// this file increments it — health is derived from elapsed time instead.
    missed_beats: u32,
    /// Current health status
    status: WorkerHealthStatus,
    /// Tasks completed
    tasks_completed: u64,
    /// Tasks failed
    tasks_failed: u64,
    /// Error count (failed tasks plus separately recorded errors)
    error_count: u64,
    /// Total duration of all completed tasks in milliseconds
    total_duration_ms: f64,
    /// Number of tasks with recorded duration
    duration_samples: u64,
    /// Messages sent
    messages_sent: u64,
    /// Messages received
    messages_received: u64,
    /// Current iteration (incremented each time a task starts)
    iteration: u64,
    /// Whether this worker is currently busy
    is_busy: bool,
    /// When the current task started (`None` while idle)
    task_started_at: Option<chrono::DateTime<chrono::Utc>>,
}
359
/// Swarm health monitor — tracks worker health, detects failures, and
/// provides aggregate telemetry.
///
/// # Heartbeat Protocol
///
/// Workers report liveness through `heartbeat()`. `check_health()` compares
/// the time since each worker's last heartbeat against multiples of
/// `heartbeat_interval_secs`:
///
/// - more than 2 intervals → `Degraded`
/// - more than `max_missed_beats` intervals → `Unhealthy`
/// - more than `max_missed_beats * 2` intervals → `Dead`
///
/// # Dead-Agent Detection
///
/// Nothing in this type schedules scans; callers are expected to invoke
/// `check_health()` themselves. `dead_workers_for_replacement()` then lists
/// `Dead` workers whose last heartbeat is at least `replacement_timeout_secs`
/// old. Note that the timeout is measured from the last heartbeat, not from
/// the moment the worker was marked dead.
///
/// # Metrics
///
/// The monitor tracks task throughput, worker utilization, error rates, and
/// an estimated communication latency. These are exposed via `metrics()`.
#[derive(Debug, Clone)]
pub struct SwarmHealthMonitor {
    /// Per-worker heartbeat trackers, keyed by role name
    heartbeats: HashMap<String, WorkerHeartbeat>,
    /// Heartbeat interval in seconds (default: 5)
    heartbeat_interval_secs: u64,
    /// Max missed heartbeats before marking unhealthy (default: 3)
    max_missed_beats: u32,
    /// Minimum heartbeat silence, in seconds, before a dead worker is offered
    /// for replacement (default: 30)
    replacement_timeout_secs: u64,
    /// When monitoring started (used as the throughput time base)
    started_at: chrono::DateTime<chrono::Utc>,
    /// Total messages sent across all workers (for latency estimation)
    total_messages_sent: u64,
    /// Total messages received across all workers
    total_messages_received: u64,
    /// Sum of all recorded task durations in milliseconds
    /// (numerator of the overall average duration)
    total_duration_ms: f64,
    /// Total tasks completed across all workers; not reduced when a worker is
    /// removed from tracking
    total_tasks_completed: u64,
}
400
401impl Default for SwarmHealthMonitor {
402    fn default() -> Self {
403        Self {
404            heartbeats: HashMap::new(),
405            heartbeat_interval_secs: 5,
406            max_missed_beats: 3,
407            replacement_timeout_secs: 30,
408            started_at: chrono::Utc::now(),
409            total_messages_sent: 0,
410            total_messages_received: 0,
411            total_duration_ms: 0.0,
412            total_tasks_completed: 0,
413        }
414    }
415}
416
417#[allow(dead_code)]
418impl SwarmHealthMonitor {
419    /// Create a new health monitor with custom parameters.
420    pub fn new(
421        heartbeat_interval_secs: u64,
422        max_missed_beats: u32,
423        replacement_timeout_secs: u64,
424    ) -> Self {
425        Self {
426            heartbeats: HashMap::new(),
427            heartbeat_interval_secs,
428            max_missed_beats,
429            replacement_timeout_secs,
430            started_at: chrono::Utc::now(),
431            total_messages_sent: 0,
432            total_messages_received: 0,
433            total_duration_ms: 0.0,
434            total_tasks_completed: 0,
435        }
436    }
437
438    /// Register a new worker for health tracking.
439    pub fn register_worker(&mut self, role: &str) {
440        let now = chrono::Utc::now();
441        self.heartbeats
442            .entry(role.to_string())
443            .or_insert(WorkerHeartbeat {
444                role: role.to_string(),
445                spawned_at: now,
446                last_heartbeat: now,
447                missed_beats: 0,
448                status: WorkerHealthStatus::Healthy,
449                tasks_completed: 0,
450                tasks_failed: 0,
451                error_count: 0,
452                total_duration_ms: 0.0,
453                duration_samples: 0,
454                messages_sent: 0,
455                messages_received: 0,
456                iteration: 0,
457                is_busy: false,
458                task_started_at: None,
459            });
460    }
461
462    /// Record a heartbeat from a worker.
463    pub fn heartbeat(&mut self, role: &str) {
464        if let Some(hb) = self.heartbeats.get_mut(role) {
465            hb.last_heartbeat = chrono::Utc::now();
466            hb.missed_beats = 0;
467            hb.status = WorkerHealthStatus::Healthy;
468        }
469    }
470
471    /// Record that a worker started a task.
472    pub fn task_started(&mut self, role: &str) {
473        if let Some(hb) = self.heartbeats.get_mut(role) {
474            hb.is_busy = true;
475            hb.task_started_at = Some(chrono::Utc::now());
476            hb.iteration += 1;
477        }
478    }
479
480    /// Record that a worker completed a task successfully.
481    pub fn task_completed(&mut self, role: &str) {
482        if let Some(hb) = self.heartbeats.get_mut(role) {
483            hb.tasks_completed += 1;
484            hb.is_busy = false;
485
486            if let Some(started) = hb.task_started_at {
487                let duration = (chrono::Utc::now() - started).num_milliseconds() as f64;
488                hb.total_duration_ms += duration;
489                hb.duration_samples += 1;
490                self.total_duration_ms += duration;
491            }
492            self.total_tasks_completed += 1;
493            hb.task_started_at = None;
494        }
495    }
496
497    /// Record that a worker's task failed.
498    pub fn task_failed(&mut self, role: &str) {
499        if let Some(hb) = self.heartbeats.get_mut(role) {
500            hb.tasks_failed += 1;
501            hb.error_count += 1;
502            hb.is_busy = false;
503            hb.task_started_at = None;
504        }
505    }
506
507    /// Record an error from a worker.
508    pub fn record_error(&mut self, role: &str) {
509        if let Some(hb) = self.heartbeats.get_mut(role) {
510            hb.error_count += 1;
511        }
512    }
513
514    /// Record a message sent by a worker.
515    pub fn message_sent(&mut self, role: &str) {
516        if let Some(hb) = self.heartbeats.get_mut(role) {
517            hb.messages_sent += 1;
518        }
519        self.total_messages_sent += 1;
520    }
521
522    /// Record a message received by a worker.
523    pub fn message_received(&mut self, role: &str) {
524        if let Some(hb) = self.heartbeats.get_mut(role) {
525            hb.messages_received += 1;
526        }
527        self.total_messages_received += 1;
528    }
529
530    /// Scan all workers and update their health status based on heartbeat timing.
531    /// Returns a list of workers that have been detected as dead.
532    pub fn check_health(&mut self) -> Vec<String> {
533        let now = chrono::Utc::now();
534        let mut dead_workers = Vec::new();
535
536        for hb in self.heartbeats.values_mut() {
537            let elapsed = (now - hb.last_heartbeat).num_seconds();
538
539            if elapsed > (self.heartbeat_interval_secs * self.max_missed_beats as u64 * 2) as i64 {
540                if hb.status != WorkerHealthStatus::Dead {
541                    hb.status = WorkerHealthStatus::Dead;
542                    dead_workers.push(hb.role.clone());
543                }
544            } else if elapsed > (self.heartbeat_interval_secs * self.max_missed_beats as u64) as i64
545            {
546                hb.status = WorkerHealthStatus::Unhealthy;
547            } else if elapsed > (self.heartbeat_interval_secs * 2) as i64 {
548                hb.status = WorkerHealthStatus::Degraded;
549            }
550        }
551
552        dead_workers
553    }
554
555    /// Get telemetry for a specific worker.
556    pub fn worker_telemetry(&self, role: &str) -> Option<WorkerTelemetry> {
557        self.heartbeats.get(role).map(|hb| {
558            let avg_dur = if hb.duration_samples > 0 {
559                hb.total_duration_ms / hb.duration_samples as f64
560            } else {
561                0.0
562            };
563
564            WorkerTelemetry {
565                role: hb.role.clone(),
566                status: hb.status.clone(),
567                tasks_completed: hb.tasks_completed,
568                tasks_failed: hb.tasks_failed,
569                error_count: hb.error_count,
570                avg_duration_ms: avg_dur,
571                last_heartbeat: hb.last_heartbeat.to_rfc3339(),
572                spawned_at: hb.spawned_at.to_rfc3339(),
573                messages_sent: hb.messages_sent,
574                messages_received: hb.messages_received,
575                iteration: hb.iteration,
576            }
577        })
578    }
579
580    /// Get telemetry for all workers.
581    pub fn all_worker_telemetry(&self) -> Vec<WorkerTelemetry> {
582        let mut roles: Vec<String> = self.heartbeats.keys().cloned().collect();
583        roles.sort();
584        roles
585            .iter()
586            .filter_map(|r| self.worker_telemetry(r))
587            .collect()
588    }
589
590    /// Get aggregate swarm metrics.
591    pub fn metrics(&self) -> SwarmMetrics {
592        let mut healthy = 0;
593        let mut degraded = 0;
594        let mut unhealthy = 0;
595        let mut dead = 0;
596        let mut total_completed = 0u64;
597        let mut total_failed = 0u64;
598        let mut total_errors = 0u64;
599        let mut busy_workers = 0u64;
600        let total_workers = self.heartbeats.len();
601
602        for hb in self.heartbeats.values() {
603            match hb.status {
604                WorkerHealthStatus::Healthy => healthy += 1,
605                WorkerHealthStatus::Degraded => degraded += 1,
606                WorkerHealthStatus::Unhealthy => unhealthy += 1,
607                WorkerHealthStatus::Dead => dead += 1,
608            }
609            total_completed += hb.tasks_completed;
610            total_failed += hb.tasks_failed;
611            total_errors += hb.error_count;
612            if hb.is_busy {
613                busy_workers += 1;
614            }
615        }
616
617        let elapsed_secs = (chrono::Utc::now() - self.started_at).num_seconds().max(1) as f64;
618        let task_throughput = self.total_tasks_completed as f64 / elapsed_secs;
619
620        let worker_utilization = if total_workers > 0 {
621            busy_workers as f64 / total_workers as f64
622        } else {
623            0.0
624        };
625
626        let error_rate = if total_completed + total_failed > 0 {
627            total_errors as f64 / (total_completed + total_failed) as f64
628        } else {
629            0.0
630        };
631
632        let overall_avg_duration = if self.total_tasks_completed > 0 {
633            self.total_duration_ms / self.total_tasks_completed as f64
634        } else {
635            0.0
636        };
637
638        let comm_latency = if self.total_messages_sent > 0 {
639            // Approximate: average time between send and receive is estimated
640            // from the ratio of received to sent messages × average interval
641            let ratio = self.total_messages_received as f64 / self.total_messages_sent as f64;
642            ratio * (self.heartbeat_interval_secs as f64 * 1000.0) / 2.0
643        } else {
644            0.0
645        };
646
647        SwarmMetrics {
648            total_workers,
649            healthy_workers: healthy,
650            degraded_workers: degraded,
651            unhealthy_workers: unhealthy,
652            dead_workers: dead,
653            total_tasks_completed: total_completed,
654            total_tasks_failed: total_failed,
655            total_errors,
656            overall_avg_duration_ms: overall_avg_duration,
657            task_throughput,
658            communication_latency_ms: comm_latency,
659            worker_utilization,
660            error_rate,
661            timestamp: chrono::Utc::now().to_rfc3339(),
662        }
663    }
664
665    /// Get workers that are candidates for replacement (dead for > replacement_timeout).
666    pub fn dead_workers_for_replacement(&self) -> Vec<String> {
667        let now = chrono::Utc::now();
668        self.heartbeats
669            .iter()
670            .filter(|(_, hb)| hb.status == WorkerHealthStatus::Dead)
671            .filter(|(_, hb)| {
672                let elapsed = (now - hb.last_heartbeat).num_seconds();
673                elapsed >= self.replacement_timeout_secs as i64
674            })
675            .map(|(role, _)| role.clone())
676            .collect()
677    }
678
679    /// Remove a worker from tracking (after replacement).
680    pub fn remove_worker(&mut self, role: &str) {
681        self.heartbeats.remove(role);
682    }
683
684    /// Get the number of tracked workers.
685    pub fn worker_count(&self) -> usize {
686        self.heartbeats.len()
687    }
688
689    /// Format health status for logging.
690    pub fn format_status(&self) -> String {
691        let m = self.metrics();
692        format!(
693            "Swarm Health: {}/{} healthy, {} degraded, {} unhealthy, {} dead | \
694             {} tasks ({:.1}/s) | {:.1}% utilization | {:.2}% error rate",
695            m.healthy_workers,
696            m.total_workers,
697            m.degraded_workers,
698            m.unhealthy_workers,
699            m.dead_workers,
700            m.total_tasks_completed,
701            m.task_throughput,
702            m.worker_utilization * 100.0,
703            m.error_rate * 100.0,
704        )
705    }
706}
707
708// ---------------------------------------------------------------------------
709// Worker profile
710// ---------------------------------------------------------------------------
711
/// Declarative profile for a swarm worker agent.
///
/// Each worker has a unique combination of persona, capabilities, and resource
/// limits. The built-in constructors in this file (`researcher()`,
/// `supervisor()`, …) start from `WorkerProfile::default()` and override
/// individual fields.
///
/// NOTE(review): this type was originally described as inheriting from a base
/// profile; no inheritance mechanism beyond struct-update syntax is visible
/// here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerProfile {
    /// Unique name for this profile (e.g., "researcher", "coder", "reviewer")
    pub name: String,

    /// Human-readable description of this worker's role
    /// (empty string when omitted from serialized input)
    #[serde(default)]
    pub description: String,

    /// System prompt / persona for this worker
    pub persona: String,

    /// Which tools this worker can use (empty = all available)
    #[serde(default)]
    pub allowed_tools: Vec<String>,

    /// Provider override (`None` = use default from config)
    #[serde(default)]
    pub provider: Option<String>,

    /// Model override (`None` = use default from config)
    #[serde(default)]
    pub model: Option<String>,

    /// Maximum iterations for this worker's agent loop
    /// (10 when omitted from serialized input)
    #[serde(default = "default_worker_max_iterations")]
    pub max_iterations: usize,

    /// Maximum memory messages for this worker
    /// (20 when omitted from serialized input)
    #[serde(default = "default_worker_memory")]
    pub max_memory_messages: usize,

    /// Whether this worker can spawn sub-workers (recursive supervision);
    /// `true` when omitted from serialized input
    #[serde(default = "default_true")]
    pub can_delegate: bool,

    /// Resource limits (`ResourceLimits::default()` when omitted)
    #[serde(default)]
    pub resource_limits: ResourceLimits,
}
757
/// Serde fallback for `WorkerProfile::max_iterations`: 10 agent-loop iterations.
fn default_worker_max_iterations() -> usize {
    const FALLBACK_MAX_ITERATIONS: usize = 10;
    FALLBACK_MAX_ITERATIONS
}
761
/// Serde fallback for `WorkerProfile::max_memory_messages`: 20 messages.
fn default_worker_memory() -> usize {
    const FALLBACK_MEMORY_MESSAGES: usize = 20;
    FALLBACK_MEMORY_MESSAGES
}
765
/// Serde fallback for boolean fields that should be enabled when omitted
/// (used by `WorkerProfile::can_delegate`).
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
769
/// Resource limits for a worker agent.
///
/// Both fields fall back to their default functions when omitted from
/// serialized input (50 tool calls, 300 seconds).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceLimits {
    /// Maximum tool calls per tick (default: 50)
    #[serde(default = "default_max_tool_calls")]
    pub max_tool_calls: usize,

    /// Maximum total execution time in seconds (default: 300)
    #[serde(default = "default_max_exec_secs")]
    pub max_exec_secs: u64,
}
781
/// Serde fallback for `ResourceLimits::max_tool_calls`: 50 tool calls.
fn default_max_tool_calls() -> usize {
    const FALLBACK_TOOL_CALLS: usize = 50;
    FALLBACK_TOOL_CALLS
}
785
/// Serde fallback for `ResourceLimits::max_exec_secs`: 300 seconds.
fn default_max_exec_secs() -> u64 {
    const FALLBACK_EXEC_SECS: u64 = 300;
    FALLBACK_EXEC_SECS
}
789
790impl Default for ResourceLimits {
791    fn default() -> Self {
792        Self {
793            max_tool_calls: default_max_tool_calls(),
794            max_exec_secs: default_max_exec_secs(),
795        }
796    }
797}
798
799impl Default for WorkerProfile {
800    fn default() -> Self {
801        Self {
802            name: "default".to_string(),
803            description: String::new(),
804            persona: "You are a helpful assistant.".to_string(),
805            allowed_tools: Vec::new(),
806            provider: None,
807            model: None,
808            max_iterations: default_worker_max_iterations(),
809            max_memory_messages: default_worker_memory(),
810            can_delegate: default_true(),
811            resource_limits: ResourceLimits::default(),
812        }
813    }
814}
815
816// ---------------------------------------------------------------------------
817// Built-in profiles
818// ---------------------------------------------------------------------------
819
820impl WorkerProfile {
821    /// Analytical researcher profile — focused on logic, structure, and precision
822    pub fn researcher() -> Self {
823        Self {
824            name: "researcher".to_string(),
825            description: "Analytical researcher focused on data gathering and analysis".to_string(),
826            persona: "You are an analytical researcher. Focus on gathering data, \
827                      verifying facts, and providing well-structured analysis. \
828                      Be thorough and cite your sources."
829                .to_string(),
830            allowed_tools: vec![
831                "web_fetch".to_string(),
832                "web_search".to_string(),
833                "read_file".to_string(),
834            ],
835            max_iterations: 15,
836            can_delegate: false,
837            ..Default::default()
838        }
839    }
840
841    /// Creative problem-solver profile — focused on innovation and alternatives
842    pub fn creative() -> Self {
843        Self {
844            name: "creative".to_string(),
845            description: "Creative problem-solver focused on innovation".to_string(),
846            persona: "You are a creative problem-solver. Focus on generating \
847                      innovative solutions, exploring alternatives, and thinking \
848                      outside the box. Consider multiple perspectives."
849                .to_string(),
850            allowed_tools: vec!["write_file".to_string(), "web_search".to_string()],
851            max_iterations: 10,
852            can_delegate: false,
853            ..Default::default()
854        }
855    }
856
857    /// Pragmatic executor profile — focused on getting things done efficiently
858    pub fn executor() -> Self {
859        Self {
860            name: "executor".to_string(),
861            description: "Pragmatic executor focused on efficient task completion".to_string(),
862            persona: "You are a pragmatic executor. Focus on completing tasks \
863                      efficiently and correctly. Prioritize simplicity and \
864                      practicality over perfection."
865                .to_string(),
866            allowed_tools: vec![
867                "shell_exec".to_string(),
868                "read_file".to_string(),
869                "write_file".to_string(),
870                "web_fetch".to_string(),
871            ],
872            max_iterations: 8,
873            can_delegate: false,
874            ..Default::default()
875        }
876    }
877
878    /// Reviewer / quality-assurance profile — focused on verification and validation
879    pub fn reviewer() -> Self {
880        Self {
881            name: "reviewer".to_string(),
882            description: "Quality assurance reviewer focused on verification".to_string(),
883            persona: "You are a meticulous reviewer. Focus on verifying correctness, \
884                      identifying issues, and ensuring quality. Be critical and \
885                      constructive. Check for errors, edge cases, and improvements."
886                .to_string(),
887            allowed_tools: vec!["read_file".to_string(), "web_fetch".to_string()],
888            max_iterations: 10,
889            can_delegate: false,
890            ..Default::default()
891        }
892    }
893
894    /// Supervisor profile — can delegate to sub-workers
895    pub fn supervisor() -> Self {
896        Self {
897            name: "supervisor".to_string(),
898            description: "Supervisor that decomposes tasks and coordinates sub-agents".to_string(),
899            persona: "You are a supervisor agent. Your role is to decompose complex \
900                      tasks into subtasks and coordinate sub-agents to complete them. \
901                      Analyze the task, break it down, assign work, and aggregate results. \
902                      \n\nFor each subtask, respond with:\n\
903                      SUBTASK: <description>\n\
904                      ROLE: <researcher|creative|executor|reviewer|supervisor>\n\
905                      \nWhen all subtasks are complete, respond with:\n\
906                      FINAL: <aggregated result>"
907                .to_string(),
908            allowed_tools: Vec::new(),
909            max_iterations: 20,
910            can_delegate: true,
911            ..Default::default()
912        }
913    }
914}
915
916// ---------------------------------------------------------------------------
917// Swarm configuration
918// ---------------------------------------------------------------------------
919
920/// Swarm topology — how workers are organized
921#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
922#[serde(rename_all = "lowercase")]
923pub enum SwarmTopology {
924    /// Single coordinator delegates to workers (default)
925    #[serde(alias = "flat")]
926    Star,
927    /// Peer-to-peer — workers communicate directly
928    Mesh,
929    /// Tree of supervisors — recursive decomposition
930    Hierarchical,
931    /// Combination of topologies based on task
932    Hybrid,
933}
934
935#[allow(clippy::derivable_impls)]
936impl Default for SwarmTopology {
937    fn default() -> Self {
938        Self::Star
939    }
940}
941
942/// Swarm orchestration configuration
943#[derive(Debug, Clone, Serialize, Deserialize)]
944pub struct SwarmConfig {
945    /// Swarm topology
946    #[serde(default)]
947    pub topology: SwarmTopology,
948
949    /// Maximum recursion depth for hierarchical decomposition (default: 3)
950    #[serde(default = "default_max_depth")]
951    pub max_depth: usize,
952
953    /// Maximum number of workers in the swarm (default: 100)
954    #[serde(default = "default_max_workers", alias = "agent_count")]
955    pub max_workers: usize,
956
957    /// Worker profiles available for role assignment
958    ///
959    /// Accepts both `[[swarm.profiles]]` array-of-tables and `[swarm.profiles.name]`
960    /// map syntax in TOML. The map syntax uses the key as the profile name and the
961    /// value as the persona string (shorthand for quick configuration).
962    ///
963    /// # Examples (TOML)
964    ///
965    /// ```toml
966    /// # Array-of-tables (full syntax)
967    /// [[swarm.profiles]]
968    /// name = "researcher"
969    /// persona = "You are a research specialist..."
970    ///
971    /// # Map shorthand (name → persona)
972    /// [swarm.profiles]
973    /// coder = "You are a Rust expert..."
974    /// reviewer = "You are a code reviewer..."
975    /// ```
976    #[serde(default, deserialize_with = "deserialize_profiles")]
977    pub profiles: Vec<WorkerProfile>,
978
979    /// Enable dynamic role assignment based on task analysis
980    #[serde(default = "default_true")]
981    pub dynamic_role_assignment: bool,
982
983    /// Enable inter-agent communication
984    #[serde(default)]
985    pub enable_agent_communication: bool,
986
987    /// Enable swarm health monitoring
988    #[serde(default)]
989    pub enable_health_monitoring: bool,
990}
991
/// Serde default for `SwarmConfig::max_depth`.
fn default_max_depth() -> usize {
    const DEFAULT_MAX_DEPTH: usize = 3;
    DEFAULT_MAX_DEPTH
}
995
/// Serde default for `SwarmConfig::max_workers`.
fn default_max_workers() -> usize {
    const DEFAULT_MAX_WORKERS: usize = 100;
    DEFAULT_MAX_WORKERS
}
999
1000/// Custom deserializer for `profiles` that accepts both array-of-tables and map syntax.
1001///
1002/// In TOML:
1003/// - `[[swarm.profiles]]` → array of tables (standard)
1004/// - `[swarm.profiles.name]` → map with key as profile name, value as persona string
1005///
1006/// The map shorthand creates a `WorkerProfile` with the key as `name` and the
1007/// string value as `persona`. All other fields use defaults.
1008fn deserialize_profiles<'de, D>(
1009    deserializer: D,
1010) -> std::result::Result<Vec<WorkerProfile>, D::Error>
1011where
1012    D: serde::Deserializer<'de>,
1013{
1014    // Try to deserialize as a map first (shorthand syntax)
1015    // If that fails, fall back to array-of-tables
1016    #[derive(Deserialize)]
1017    #[serde(untagged)]
1018    enum ProfilesOrMap {
1019        Array(Vec<WorkerProfile>),
1020        Map(std::collections::HashMap<String, String>),
1021    }
1022
1023    match ProfilesOrMap::deserialize(deserializer) {
1024        Ok(ProfilesOrMap::Array(profiles)) => Ok(profiles),
1025        Ok(ProfilesOrMap::Map(map)) => {
1026            let profiles: Vec<WorkerProfile> = map
1027                .into_iter()
1028                .map(|(name, persona)| WorkerProfile {
1029                    name,
1030                    persona,
1031                    ..WorkerProfile::default()
1032                })
1033                .collect();
1034            Ok(profiles)
1035        }
1036        Err(e) => Err(e),
1037    }
1038}
1039
1040impl Default for SwarmConfig {
1041    fn default() -> Self {
1042        Self {
1043            topology: SwarmTopology::default(),
1044            max_depth: default_max_depth(),
1045            max_workers: default_max_workers(),
1046            profiles: vec![
1047                WorkerProfile::researcher(),
1048                WorkerProfile::creative(),
1049                WorkerProfile::executor(),
1050                WorkerProfile::reviewer(),
1051                WorkerProfile::supervisor(),
1052            ],
1053            dynamic_role_assignment: true,
1054            enable_agent_communication: false,
1055            enable_health_monitoring: false,
1056        }
1057    }
1058}
1059
1060// ---------------------------------------------------------------------------
1061// Swarm orchestrator
1062// ---------------------------------------------------------------------------
1063
1064/// The swarm orchestrator manages recursive task decomposition and worker
1065/// coordination. It is the core of the self-provisioning sub-agents feature.
1066pub struct SwarmOrchestrator {
1067    /// Swarm configuration
1068    config: SwarmConfig,
1069    /// Current recursion depth (for enforcing max_depth)
1070    current_depth: usize,
1071    /// Total workers spawned (for enforcing max_workers)
1072    worker_count: usize,
1073    /// The LLM provider for this orchestrator instance
1074    llm: Option<Arc<dyn LLMProviderTrait>>,
1075    /// Multi-model manager (if using multiple providers)
1076    multi_llm: Option<MultiModelManager>,
1077    /// RavenFabric client for remote execution
1078    ravenfabric: Option<RavenFabricClient>,
1079    /// Policy engine for security (reserved for future tool execution)
1080    #[allow(dead_code)]
1081    policy_engine: PolicyEngine,
1082    /// Sandbox for execution
1083    sandbox: Sandbox,
1084    /// Audit log
1085    audit_log: AuditLog,
1086    /// Tool registry (reserved for future worker tool execution)
1087    #[allow(dead_code)]
1088    registry: ToolRegistry,
1089    /// Inter-agent message bus (shared across sub-orchestrators)
1090    message_bus: Option<Arc<RwLock<AgentMessageBus>>>,
1091    /// Swarm health monitor (active when enable_health_monitoring is true)
1092    /// Uses Arc<RwLock> for interior mutability since execute_with_profile takes &self
1093    health_monitor: Option<Arc<RwLock<SwarmHealthMonitor>>>,
1094}
1095
1096impl SwarmOrchestrator {
1097    /// Create a new swarm orchestrator with the given configuration.
1098    pub fn new(
1099        config: SwarmConfig,
1100        llm: Option<Arc<dyn LLMProviderTrait>>,
1101        multi_llm: Option<MultiModelManager>,
1102        ravenfabric: Option<RavenFabricClient>,
1103    ) -> Self {
1104        let policy_engine = PolicyEngine::default_secure();
1105        let sandbox = Sandbox::default();
1106        let audit_log = AuditLog::new(format!("swarm-{}", std::process::id()));
1107        let registry = ToolRegistry::with_default_tools();
1108
1109        // Create message bus if inter-agent communication is enabled
1110        let message_bus = if config.enable_agent_communication {
1111            Some(Arc::new(RwLock::new(AgentMessageBus::new(1000))))
1112        } else {
1113            None
1114        };
1115
1116        // Create health monitor if health monitoring is enabled
1117        let health_monitor = if config.enable_health_monitoring {
1118            Some(Arc::new(RwLock::new(SwarmHealthMonitor::default())))
1119        } else {
1120            None
1121        };
1122
1123        Self {
1124            config,
1125            current_depth: 0,
1126            worker_count: 0,
1127            llm,
1128            multi_llm,
1129            ravenfabric,
1130            policy_engine,
1131            sandbox,
1132            audit_log,
1133            registry,
1134            message_bus,
1135            health_monitor,
1136        }
1137    }
1138
1139    /// Create a new swarm orchestrator with a shared message bus.
1140    ///
1141    /// This is used when spawning sub-orchestrators that should share
1142    /// the same communication channel as their parent.
1143    #[allow(dead_code)]
1144    pub fn new_with_bus(
1145        config: SwarmConfig,
1146        llm: Option<Arc<dyn LLMProviderTrait>>,
1147        multi_llm: Option<MultiModelManager>,
1148        ravenfabric: Option<RavenFabricClient>,
1149        message_bus: Option<Arc<RwLock<AgentMessageBus>>>,
1150    ) -> Self {
1151        let policy_engine = PolicyEngine::default_secure();
1152        let sandbox = Sandbox::default();
1153        let audit_log = AuditLog::new(format!("swarm-{}", std::process::id()));
1154        let registry = ToolRegistry::with_default_tools();
1155
1156        // Create health monitor if health monitoring is enabled
1157        let health_monitor = if config.enable_health_monitoring {
1158            Some(Arc::new(RwLock::new(SwarmHealthMonitor::default())))
1159        } else {
1160            None
1161        };
1162
1163        Self {
1164            config,
1165            current_depth: 0,
1166            worker_count: 0,
1167            llm,
1168            multi_llm,
1169            ravenfabric,
1170            policy_engine,
1171            sandbox,
1172            audit_log,
1173            registry,
1174            message_bus,
1175            health_monitor,
1176        }
1177    }
1178
1179    /// Initialize the sandbox for this orchestrator.
1180    pub async fn init(&mut self) -> Result<()> {
1181        self.sandbox.init().await.map_err(|e| {
1182            RavenClawsError::CommandExecution(format!("Swarm sandbox init failed: {}", e))
1183        })?;
1184        Ok(())
1185    }
1186
1187    /// Get the current worker count.
1188    #[allow(dead_code)]
1189    pub fn worker_count(&self) -> usize {
1190        self.worker_count
1191    }
1192
1193    /// Get the current recursion depth.
1194    #[allow(dead_code)]
1195    pub fn current_depth(&self) -> usize {
1196        self.current_depth
1197    }
1198
1199    /// Get swarm health metrics, if health monitoring is enabled.
1200    #[allow(dead_code)]
1201    pub fn health_metrics(&self) -> Option<SwarmMetrics> {
1202        self.health_monitor
1203            .as_ref()
1204            .and_then(|hm| hm.try_read().ok())
1205            .map(|hm| hm.metrics())
1206    }
1207
1208    /// Get telemetry for all workers, if health monitoring is enabled.
1209    #[allow(dead_code)]
1210    pub fn worker_telemetry(&self) -> Option<Vec<WorkerTelemetry>> {
1211        self.health_monitor
1212            .as_ref()
1213            .and_then(|hm| hm.try_read().ok())
1214            .map(|hm| hm.all_worker_telemetry())
1215    }
1216
1217    /// Orchestrate a task — decompose, assign, execute, and aggregate.
1218    ///
1219    /// This is the main entry point for swarm execution. It analyzes the task,
1220    /// determines required roles, spawns workers, and aggregates results.
1221    #[instrument(skip(self, task), fields(depth = self.current_depth, workers = self.worker_count))]
1222    pub async fn orchestrate(&mut self, task: &str) -> Result<String> {
1223        // Delegate to the boxed implementation to handle recursion safely.
1224        self.orchestrate_impl(task).await
1225    }
1226
1227    /// Non-recursive entry point — calls the boxed recursive implementation.
1228    async fn orchestrate_impl(&mut self, task: &str) -> Result<String> {
1229        info!(
1230            depth = self.current_depth,
1231            max_depth = self.config.max_depth,
1232            "Orchestrating task"
1233        );
1234
1235        // Log health status at start if monitoring is enabled
1236        if let Some(ref hm) = self.health_monitor {
1237            if let Ok(hm_guard) = hm.try_read() {
1238                info!(health = %hm_guard.format_status(), "Swarm health at start");
1239            }
1240        }
1241
1242        // Check recursion depth
1243        if self.current_depth >= self.config.max_depth {
1244            warn!(
1245                depth = self.current_depth,
1246                max_depth = self.config.max_depth,
1247                "Max recursion depth reached, executing directly"
1248            );
1249            return self.execute_direct(task).await;
1250        }
1251
1252        // Check worker limit
1253        if self.worker_count >= self.config.max_workers {
1254            warn!(
1255                workers = self.worker_count,
1256                max_workers = self.config.max_workers,
1257                "Max workers reached, executing directly"
1258            );
1259            return self.execute_direct(task).await;
1260        }
1261
1262        // Analyze task and determine roles
1263        let roles = if self.config.dynamic_role_assignment {
1264            self.analyze_task_roles(task).await?
1265        } else {
1266            // Default: use supervisor + executor
1267            vec!["supervisor".to_string()]
1268        };
1269
1270        info!(roles = ?roles, "Assigned roles for task");
1271
1272        // If only one role and it's not supervisor, execute directly
1273        if roles.len() == 1 && roles[0] != "supervisor" {
1274            return self.execute_with_profile(task, &roles[0]).await;
1275        }
1276
1277        // If supervisor role is present, do recursive decomposition
1278        if roles.contains(&"supervisor".to_string()) || roles.len() > 1 {
1279            return self.recursive_supervise_impl(task, &roles).await;
1280        }
1281
1282        // Default: execute directly
1283        self.execute_direct(task).await
1284    }
1285
1286    /// Analyze a task and determine which worker roles are needed.
1287    async fn analyze_task_roles(&self, task: &str) -> Result<Vec<String>> {
1288        // Use the LLM to analyze the task if available
1289        if let Some(ref llm) = self.llm {
1290            let analysis_prompt = format!(
1291                "Analyze this task and determine which roles are needed to complete it. \
1292                 Available roles: researcher, creative, executor, reviewer, supervisor. \
1293                 \n\nTask: {}\n\n\
1294                 Respond with a comma-separated list of roles needed, nothing else. \
1295                 Example: researcher, executor, reviewer",
1296                task
1297            );
1298
1299            let messages = vec![
1300                ChatMessage {
1301                    role: "system".to_string(),
1302                    content: "You are a task analysis expert. Respond only with a comma-separated list of roles."
1303                        .to_string(),
1304                },
1305                ChatMessage {
1306                    role: "user".to_string(),
1307                    content: analysis_prompt,
1308                },
1309            ];
1310
1311            match llm.chat(messages).await {
1312                Ok(response) => {
1313                    let content = response
1314                        .choices
1315                        .first()
1316                        .map(|c| c.message.content.clone())
1317                        .unwrap_or_default();
1318
1319                    let roles: Vec<String> = content
1320                        .split(',')
1321                        .map(|r| r.trim().to_lowercase())
1322                        .filter(|r| {
1323                            matches!(
1324                                r.as_str(),
1325                                "researcher" | "creative" | "executor" | "reviewer" | "supervisor"
1326                            )
1327                        })
1328                        .collect();
1329
1330                    if roles.is_empty() {
1331                        Ok(vec!["executor".to_string()])
1332                    } else {
1333                        Ok(roles)
1334                    }
1335                }
1336                Err(e) => {
1337                    warn!(error = %e, "Task analysis failed, using default roles");
1338                    Ok(vec!["executor".to_string()])
1339                }
1340            }
1341        } else {
1342            // No LLM available, use default roles
1343            Ok(vec!["executor".to_string()])
1344        }
1345    }
1346
1347    /// Execute a task directly with a specific worker profile.
1348    async fn execute_with_profile(&self, task: &str, role: &str) -> Result<String> {
1349        let profile = self
1350            .config
1351            .profiles
1352            .iter()
1353            .find(|p| p.name == role)
1354            .cloned()
1355            .unwrap_or_else(|| {
1356                if role == "supervisor" {
1357                    WorkerProfile::supervisor()
1358                } else {
1359                    WorkerProfile::executor()
1360                }
1361            });
1362
1363        info!(role = %role, profile = %profile.name, "Executing task with profile");
1364
1365        // Register worker with health monitor and record task start
1366        if let Some(ref hm) = self.health_monitor {
1367            if let Ok(mut hm_guard) = hm.try_write() {
1368                hm_guard.register_worker(role);
1369                hm_guard.task_started(role);
1370            }
1371        }
1372
1373        let llm = self.llm.as_ref().ok_or_else(|| {
1374            RavenClawsError::CommandExecution("No LLM provider available for worker".to_string())
1375        })?;
1376
1377        let mut memory = ConversationMemory::new(&profile.persona, profile.max_memory_messages);
1378
1379        // Include inter-agent messages in the prompt if communication is enabled
1380        let enriched_task = if let Some(ref bus) = self.message_bus {
1381            if let Ok(bus_guard) = bus.try_read() {
1382                let msg_context = bus_guard.format_for_prompt(role, 20);
1383                format!("{}{}", task, msg_context)
1384            } else {
1385                task.to_string()
1386            }
1387        } else {
1388            task.to_string()
1389        };
1390
1391        memory.add_user_message(&enriched_task);
1392
1393        let messages = memory.history().to_vec();
1394        let response = llm.chat(messages).await.map_err(|e| {
1395            // Record failure in health monitor
1396            if let Some(ref hm) = self.health_monitor {
1397                if let Ok(mut hm_guard) = hm.try_write() {
1398                    hm_guard.task_failed(role);
1399                }
1400            }
1401            RavenClawsError::CommandExecution(format!("Worker {} failed: {}", role, e))
1402        })?;
1403
1404        let content = response
1405            .choices
1406            .first()
1407            .map(|c| c.message.content.clone())
1408            .unwrap_or_default();
1409
1410        // Record task completion in health monitor
1411        if let Some(ref hm) = self.health_monitor {
1412            if let Ok(mut hm_guard) = hm.try_write() {
1413                hm_guard.task_completed(role);
1414                hm_guard.heartbeat(role);
1415            }
1416        }
1417
1418        // Broadcast result to other workers via message bus
1419        if let Some(ref bus) = self.message_bus {
1420            if let Ok(mut bus_guard) = bus.try_write() {
1421                bus_guard.send(
1422                    role,
1423                    "*",
1424                    MessageType::Result,
1425                    &format!(
1426                        "Completed task. Result ({} chars): {}",
1427                        content.len(),
1428                        &content[..content.len().min(500)]
1429                    ),
1430                    HashMap::new(),
1431                );
1432            }
1433            // Track message in health monitor
1434            if let Some(ref hm) = self.health_monitor {
1435                if let Ok(mut hm_guard) = hm.try_write() {
1436                    hm_guard.message_sent(role);
1437                }
1438            }
1439        }
1440
1441        let _ = self.audit_log.append(
1442            AuditEventType::AgentFinish,
1443            &format!("worker-{}", role),
1444            &format!("Worker {} completed task", role),
1445            Some(serde_json::json!({
1446                "role": role,
1447                "task_length": task.len(),
1448                "response_length": content.len(),
1449            })),
1450        );
1451
1452        Ok(content)
1453    }
1454
1455    /// Execute a task directly without decomposition (leaf node).
1456    async fn execute_direct(&self, task: &str) -> Result<String> {
1457        self.execute_with_profile(task, "executor").await
1458    }
1459
1460    /// Recursive supervision — decompose task, spawn sub-supervisors or workers.
1461    ///
1462    /// This is a thin wrapper that delegates to the boxed implementation
1463    /// to avoid Rust's recursive async fn limitation.
1464    #[allow(dead_code)]
1465    async fn recursive_supervise(&self, task: &str, roles: &[String]) -> Result<String> {
1466        let task = task.to_string();
1467        let roles = roles.to_vec();
1468        let this: &SwarmOrchestrator = self;
1469        Box::pin(async move { this.recursive_supervise_impl(&task, &roles).await }).await
1470    }
1471
1472    /// Recursive supervision implementation (boxed to avoid infinite future size).
1473    async fn recursive_supervise_impl(&self, task: &str, roles: &[String]) -> Result<String> {
1474        let llm = self.llm.as_ref().ok_or_else(|| {
1475            RavenClawsError::CommandExecution(
1476                "No LLM provider available for supervisor".to_string(),
1477            )
1478        })?;
1479
1480        // Register supervisor with health monitor
1481        if let Some(ref hm) = self.health_monitor {
1482            if let Ok(mut hm_guard) = hm.try_write() {
1483                hm_guard.register_worker("supervisor");
1484                hm_guard.task_started("supervisor");
1485            }
1486        }
1487
1488        let supervisor_profile = WorkerProfile::supervisor();
1489        let mut memory = ConversationMemory::new(
1490            &supervisor_profile.persona,
1491            supervisor_profile.max_memory_messages,
1492        );
1493
1494        let role_list = roles.join(", ");
1495
1496        // Include inter-agent messages in the supervisor prompt if communication is enabled
1497        let msg_context = if let Some(ref bus) = self.message_bus {
1498            if let Ok(bus_guard) = bus.try_read() {
1499                bus_guard.format_for_prompt("supervisor", 20)
1500            } else {
1501                String::new()
1502            }
1503        } else {
1504            String::new()
1505        };
1506
1507        let supervise_prompt = format!(
1508            "Decompose this task into subtasks and assign each to the most appropriate role.\n\
1509             Available roles: {}\n\n\
1510             Task: {}\n\n\
1511             For each subtask, respond with:\n\
1512             SUBTASK: <description>\n\
1513             ROLE: <role>\n\n\
1514             When all subtasks are complete, respond with:\n\
1515             FINAL: <aggregated result>\n\
1516             {}",
1517            role_list, task, msg_context
1518        );
1519
1520        memory.add_user_message(&supervise_prompt);
1521
1522        let mut subtask_results: Vec<String> = Vec::new();
1523        let mut iteration = 0;
1524        let max_iterations = supervisor_profile.max_iterations;
1525
1526        loop {
1527            iteration += 1;
1528            if iteration > max_iterations {
1529                warn!("Supervisor reached max iterations");
1530                break;
1531            }
1532
1533            let messages = memory.history().to_vec();
1534            let response = match llm.chat(messages).await {
1535                Ok(r) => r,
1536                Err(e) => {
1537                    warn!(error = %e, "Supervisor LLM request failed");
1538                    continue;
1539                }
1540            };
1541
1542            let content = response
1543                .choices
1544                .first()
1545                .map(|c| c.message.content.clone())
1546                .unwrap_or_default();
1547
1548            // Periodically check health status
1549            if iteration % 3 == 0 {
1550                if let Some(ref hm) = self.health_monitor {
1551                    if let Ok(hm_guard) = hm.try_read() {
1552                        let status = hm_guard.format_status();
1553                        info!(health = %status, "Swarm health check");
1554                        // Check for dead workers
1555                        let dead = hm_guard.dead_workers_for_replacement();
1556                        if !dead.is_empty() {
1557                            warn!(dead_workers = ?dead, "Dead workers detected");
1558                        }
1559                    }
1560                }
1561            }
1562
1563            // Check for FINAL: completion
1564            if content.contains("FINAL:") {
1565                let final_response = content
1566                    .split("FINAL:")
1567                    .nth(1)
1568                    .unwrap_or("")
1569                    .trim()
1570                    .to_string();
1571                info!(
1572                    iteration = iteration,
1573                    subtasks = subtask_results.len(),
1574                    "Supervisor completed"
1575                );
1576
1577                // Record supervisor completion in health monitor
1578                if let Some(ref hm) = self.health_monitor {
1579                    if let Ok(mut hm_guard) = hm.try_write() {
1580                        hm_guard.task_completed("supervisor");
1581                        hm_guard.heartbeat("supervisor");
1582                    }
1583                }
1584
1585                let _ = self.audit_log.append(
1586                    AuditEventType::AgentFinish,
1587                    "supervisor",
1588                    "Supervisor completed recursive decomposition",
1589                    Some(serde_json::json!({
1590                        "iterations": iteration,
1591                        "subtasks_completed": subtask_results.len(),
1592                        "depth": self.current_depth,
1593                    })),
1594                );
1595
1596                if !subtask_results.is_empty() {
1597                    let aggregated = subtask_results.join("\n\n");
1598                    return Ok(format!(
1599                        "{}\n\n## Aggregated Results\n\n{}",
1600                        final_response, aggregated
1601                    ));
1602                }
1603                return Ok(final_response);
1604            }
1605
1606            // Check for SUBTASK: decomposition
1607            if content.contains("SUBTASK:") {
1608                let subtask_block = content.split("SUBTASK:").nth(1).unwrap_or("");
1609                let subtask_lines: Vec<&str> = subtask_block.lines().take(4).collect();
1610
1611                let subtask_desc = subtask_lines.first().unwrap_or(&"").trim();
1612                let role = subtask_lines
1613                    .iter()
1614                    .find(|l| l.starts_with("ROLE:"))
1615                    .and_then(|l| l.split(':').nth(1))
1616                    .unwrap_or("executor")
1617                    .trim()
1618                    .to_lowercase();
1619
1620                if !subtask_desc.is_empty() {
1621                    info!(role = %role, subtask = %subtask_desc, "Delegating subtask");
1622
1623                    // Broadcast coordination message before delegating
1624                    if let Some(ref bus) = self.message_bus {
1625                        if let Ok(mut bus_guard) = bus.try_write() {
1626                            bus_guard.send(
1627                                "supervisor",
1628                                &role,
1629                                MessageType::Coordination,
1630                                &format!("Delegating subtask: {}", subtask_desc),
1631                                HashMap::new(),
1632                            );
1633                        }
1634                    }
1635
1636                    let result =
1637                        if role == "supervisor" && self.current_depth < self.config.max_depth {
1638                            // Recursive: spawn a sub-supervisor (boxed to avoid recursive async fn)
1639                            let config = self.config.clone();
1640                            let current_depth = self.current_depth + 1;
1641                            let worker_count = self.worker_count + 1;
1642                            let llm = self.llm.clone();
1643                            let multi_llm = self.multi_llm.clone();
1644                            let ravenfabric = self.ravenfabric.clone();
1645                            let subtask = subtask_desc.to_string();
1646                            let message_bus = self.message_bus.clone();
1647                            let health_monitor = self.health_monitor.clone();
1648
1649                            Box::pin(async move {
1650                                let mut sub_orchestrator = SwarmOrchestrator {
1651                                    config,
1652                                    current_depth,
1653                                    worker_count,
1654                                    llm,
1655                                    multi_llm,
1656                                    ravenfabric,
1657                                    policy_engine: PolicyEngine::default_secure(),
1658                                    sandbox: Sandbox::default(),
1659                                    audit_log: AuditLog::new(format!(
1660                                        "sub-swarm-{}-{}",
1661                                        current_depth,
1662                                        std::process::id()
1663                                    )),
1664                                    registry: ToolRegistry::with_default_tools(),
1665                                    message_bus,
1666                                    health_monitor,
1667                                };
1668
1669                                // Initialize sub-orchestrator sandbox
1670                                let _ = sub_orchestrator.init().await;
1671                                sub_orchestrator.orchestrate(&subtask).await
1672                            })
1673                            .await
1674                        } else {
1675                            // Execute with the assigned profile
1676                            self.execute_with_profile(subtask_desc, &role).await
1677                        };
1678
1679                    match result {
1680                        Ok(result) => {
1681                            info!(
1682                                role = %role,
1683                                chars = result.len(),
1684                                "Subtask completed"
1685                            );
1686                            subtask_results.push(format!("[{}] {}", role, result));
1687
1688                            memory.add_assistant_message(&format!(
1689                                "Delegated subtask to {}: {}",
1690                                role, subtask_desc
1691                            ));
1692                            memory.add_user_message(&format!("Result from {}: {}", role, result));
1693                        }
1694                        Err(e) => {
1695                            warn!(role = %role, error = %e, "Subtask failed");
1696                            // Record failure in health monitor
1697                            if let Some(ref hm) = self.health_monitor {
1698                                if let Ok(mut hm_guard) = hm.try_write() {
1699                                    hm_guard.task_failed(&role);
1700                                    hm_guard.record_error(&role);
1701                                }
1702                            }
1703                            memory.add_assistant_message(&format!(
1704                                "Subtask for {} failed: {}",
1705                                role, e
1706                            ));
1707                        }
1708                    }
1709                }
1710            } else {
1711                memory.add_assistant_message(&content);
1712            }
1713        }
1714
1715        // Fallback: return aggregated results
1716        if !subtask_results.is_empty() {
1717            let aggregated = subtask_results.join("\n\n");
1718            info!(
1719                "Supervisor aggregated {} subtask results",
1720                subtask_results.len()
1721            );
1722            return Ok(aggregated);
1723        }
1724
1725        Err(RavenClawsError::CommandExecution(
1726            "Supervisor completed without results".to_string(),
1727        ))
1728    }
1729}
1730
1731// ---------------------------------------------------------------------------
1732// Tests
1733// ---------------------------------------------------------------------------
1734
#[cfg(test)]
mod tests {
    use super::*;

    // ── Shared fixtures ─────────────────────────────────────────────────

    /// Builds an orchestrator from `config`, passing `None` for each of the
    /// three optional constructor arguments.
    fn orchestrator_with(config: SwarmConfig) -> SwarmOrchestrator {
        SwarmOrchestrator::new(config, None, None, None)
    }

    /// Sends one message on `bus` with an empty map as the final argument and
    /// discards the returned id.
    fn post(bus: &mut AgentMessageBus, from: &str, to: &str, kind: MessageType, body: &str) {
        bus.send(from, to, kind, body, HashMap::new());
    }

    // ── Worker profile tests ────────────────────────────────────────────

    /// `WorkerProfile::default()` is named "default", may delegate, and uses
    /// 10 iterations / 20 memory messages.
    #[test]
    fn test_worker_profile_default() {
        let default_profile = WorkerProfile::default();
        assert_eq!(default_profile.name, "default");
        assert!(default_profile.can_delegate);
        assert_eq!(
            (
                default_profile.max_iterations,
                default_profile.max_memory_messages
            ),
            (10, 20)
        );
    }

    /// The researcher profile cannot delegate and is allowed both web tools.
    #[test]
    fn test_worker_profile_researcher() {
        let researcher = WorkerProfile::researcher();
        assert_eq!(researcher.name, "researcher");
        assert!(!researcher.can_delegate);
        for tool in &["web_fetch", "web_search"] {
            assert!(researcher
                .allowed_tools
                .iter()
                .any(|allowed| allowed.as_str() == *tool));
        }
    }

    /// The creative profile is named "creative" and cannot delegate.
    #[test]
    fn test_worker_profile_creative() {
        let creative = WorkerProfile::creative();
        assert_eq!(creative.name, "creative");
        assert!(!creative.can_delegate);
    }

    /// The executor profile cannot delegate and is allowed `shell_exec`.
    #[test]
    fn test_worker_profile_executor() {
        let executor = WorkerProfile::executor();
        assert_eq!(executor.name, "executor");
        assert!(!executor.can_delegate);
        assert!(executor
            .allowed_tools
            .iter()
            .any(|allowed| allowed.as_str() == "shell_exec"));
    }

    /// The reviewer profile is named "reviewer" and cannot delegate.
    #[test]
    fn test_worker_profile_reviewer() {
        let reviewer = WorkerProfile::reviewer();
        assert_eq!(reviewer.name, "reviewer");
        assert!(!reviewer.can_delegate);
    }

    /// The supervisor profile may delegate and its persona mentions both the
    /// `SUBTASK:` and `FINAL:` markers.
    #[test]
    fn test_worker_profile_supervisor() {
        let supervisor = WorkerProfile::supervisor();
        assert_eq!(supervisor.name, "supervisor");
        assert!(supervisor.can_delegate);
        for marker in &["SUBTASK:", "FINAL:"] {
            assert!(supervisor.persona.contains(*marker));
        }
    }

    /// A hand-built profile keeps the values it was constructed with.
    #[test]
    fn test_worker_profile_custom() {
        let custom = WorkerProfile {
            name: String::from("custom"),
            description: String::from("Custom worker"),
            persona: String::from("You are a custom worker."),
            allowed_tools: vec![String::from("read_file")],
            provider: Some(String::from("openai")),
            model: Some(String::from("gpt-4")),
            max_iterations: 5,
            max_memory_messages: 10,
            can_delegate: false,
            resource_limits: ResourceLimits {
                max_tool_calls: 10,
                max_exec_secs: 60,
            },
        };

        assert_eq!(custom.name, "custom");
        assert_eq!(custom.provider.as_deref(), Some("openai"));
        assert_eq!(custom.model.as_deref(), Some("gpt-4"));
        assert_eq!(custom.max_iterations, 5);
        assert_eq!(custom.resource_limits.max_tool_calls, 10);
    }

    // ── Swarm configuration tests ───────────────────────────────────────

    /// Default config: star topology, depth 3, 100 workers, dynamic role
    /// assignment on, five built-in profiles.
    #[test]
    fn test_swarm_config_default() {
        let defaults = SwarmConfig::default();
        assert_eq!(defaults.topology, SwarmTopology::Star);
        assert_eq!((defaults.max_depth, defaults.max_workers), (3, 100));
        assert!(defaults.dynamic_role_assignment);
        assert_eq!(defaults.profiles.len(), 5);
    }

    /// Every topology variant survives a JSON round trip.
    #[test]
    fn test_swarm_topology_serde() {
        for topology in &[
            SwarmTopology::Star,
            SwarmTopology::Mesh,
            SwarmTopology::Hierarchical,
            SwarmTopology::Hybrid,
        ] {
            let encoded = serde_json::to_string(topology).unwrap();
            let decoded: SwarmTopology = serde_json::from_str(&encoded).unwrap();
            assert_eq!(*topology, decoded);
        }
    }

    /// The default config survives a pretty-printed JSON round trip for the
    /// fields compared below.
    #[test]
    fn test_swarm_config_serde() {
        let original = SwarmConfig::default();
        let encoded = serde_json::to_string_pretty(&original).unwrap();
        let restored: SwarmConfig = serde_json::from_str(&encoded).unwrap();

        assert_eq!(original.topology, restored.topology);
        assert_eq!(original.max_depth, restored.max_depth);
        assert_eq!(original.max_workers, restored.max_workers);
        assert_eq!(original.profiles.len(), restored.profiles.len());
    }

    /// Default resource limits: 50 tool calls, 300 seconds of execution.
    #[test]
    fn test_resource_limits_default() {
        let defaults = ResourceLimits::default();
        assert_eq!(defaults.max_tool_calls, 50);
        assert_eq!(defaults.max_exec_secs, 300);
    }

    /// Overridden fields are kept; the rest come from the default config.
    #[test]
    fn test_swarm_config_custom_profiles() {
        let custom = SwarmConfig {
            profiles: vec![WorkerProfile::researcher(), WorkerProfile::executor()],
            topology: SwarmTopology::Hierarchical,
            max_depth: 5,
            max_workers: 50,
            ..Default::default()
        };

        assert_eq!(custom.profiles.len(), 2);
        assert_eq!(custom.topology, SwarmTopology::Hierarchical);
        assert_eq!((custom.max_depth, custom.max_workers), (5, 50));
    }

    // ── Orchestrator construction tests ─────────────────────────────────

    /// A freshly built orchestrator reports depth 0 and zero workers.
    #[test]
    fn test_swarm_orchestrator_new() {
        let fresh = orchestrator_with(SwarmConfig::default());
        assert_eq!(fresh.current_depth(), 0);
        assert_eq!(fresh.worker_count(), 0);
    }

    /// With `max_depth` 0 the orchestrator is already at its depth limit.
    #[test]
    fn test_swarm_orchestrator_depth_limit() {
        let no_recursion = SwarmConfig {
            max_depth: 0, // No recursion allowed
            ..Default::default()
        };
        let mut orchestrator = orchestrator_with(no_recursion);
        orchestrator.current_depth = 0;

        // At depth 0 with max_depth 0, should hit the limit
        assert!(orchestrator.current_depth >= orchestrator.config.max_depth);
    }

    /// Role analysis succeeds when the orchestrator was built with `None`
    /// collaborators; only `is_ok()` is checked here.
    #[tokio::test]
    async fn test_analyze_task_roles_fallback() {
        let orchestrator = orchestrator_with(SwarmConfig::default());

        // Without an LLM, should return default roles
        let outcome = orchestrator.analyze_task_roles("test task").await;
        assert!(outcome.is_ok());
    }

    // ── Inter-agent communication tests ─────────────────────────────────

    /// A new bus is empty.
    #[test]
    fn test_message_bus_new() {
        let empty_bus = AgentMessageBus::new(100);
        assert!(empty_bus.is_empty());
        assert_eq!(empty_bus.len(), 0);
    }

    /// A directly addressed message gets a non-empty id and shows up in the
    /// recipient's messages with its content and sender intact.
    #[test]
    fn test_message_bus_send_and_receive() {
        let mut bus = AgentMessageBus::new(100);

        let message_id = bus.send(
            "researcher",
            "executor",
            MessageType::Information,
            "Found relevant data",
            HashMap::new(),
        );
        assert!(!message_id.is_empty());
        assert_eq!(bus.len(), 1);

        let inbox = bus.messages_for("executor");
        assert_eq!(inbox.len(), 1);
        let delivered = &inbox[0];
        assert_eq!(delivered.content, "Found relevant data");
        assert_eq!(delivered.sender, "researcher");
    }

    /// A message addressed to `"*"` is returned for every role queried.
    #[test]
    fn test_message_bus_broadcast() {
        let mut bus = AgentMessageBus::new(100);
        post(
            &mut bus,
            "supervisor",
            "*",
            MessageType::Coordination,
            "All workers proceed",
        );

        // All roles should receive broadcast messages
        for role in &["researcher", "executor", "reviewer"] {
            assert_eq!(bus.messages_for(*role).len(), 1);
        }
    }

    /// `messages_of_type` returns only messages of the requested type.
    #[test]
    fn test_message_bus_filter_by_type() {
        let mut bus = AgentMessageBus::new(100);
        post(
            &mut bus,
            "researcher",
            "*",
            MessageType::Information,
            "Data found",
        );
        post(
            &mut bus,
            "executor",
            "supervisor",
            MessageType::Result,
            "Task done",
        );
        post(
            &mut bus,
            "executor",
            "supervisor",
            MessageType::Error,
            "Failed",
        );

        let error_messages = bus.messages_of_type(&MessageType::Error);
        assert_eq!(error_messages.len(), 1);
        assert_eq!(error_messages[0].content, "Failed");

        let result_messages = bus.messages_of_type(&MessageType::Result);
        assert_eq!(result_messages.len(), 1);
    }

    /// A bus created with capacity 5 keeps only the five newest of ten
    /// messages, oldest first.
    #[test]
    fn test_message_bus_max_messages() {
        let mut bus = AgentMessageBus::new(5); // Only keep 5 messages

        for index in 0..10 {
            post(
                &mut bus,
                "worker",
                "*",
                MessageType::Generic,
                &format!("Message {}", index),
            );
        }

        // Should only have the last 5 messages
        assert_eq!(bus.len(), 5);
        let retained = bus.all_messages();
        assert_eq!(retained[0].content, "Message 5");
        assert_eq!(retained[4].content, "Message 9");
    }

    /// The supervisor's prompt rendering includes the header, both senders,
    /// and the broadcast content.
    #[test]
    fn test_message_bus_format_for_prompt() {
        let mut bus = AgentMessageBus::new(100);
        post(
            &mut bus,
            "researcher",
            "*",
            MessageType::Information,
            "Found key insight",
        );
        post(
            &mut bus,
            "executor",
            "supervisor",
            MessageType::Result,
            "Implementation complete",
        );

        let rendered = bus.format_for_prompt("supervisor", 10);
        for needle in &[
            "Inter-Agent Messages",
            "researcher",
            "executor",
            "Found key insight",
        ] {
            assert!(rendered.contains(*needle));
        }
    }

    /// An empty bus renders to an empty prompt string.
    #[test]
    fn test_message_bus_empty_format() {
        let empty_bus = AgentMessageBus::new(100);
        assert!(empty_bus.format_for_prompt("supervisor", 10).is_empty());
    }

    /// Each `MessageType` variant displays as its lowercase name.
    #[test]
    fn test_message_type_display() {
        for (variant, rendered) in &[
            (MessageType::Information, "information"),
            (MessageType::Question, "question"),
            (MessageType::Result, "result"),
            (MessageType::Error, "error"),
            (MessageType::Coordination, "coordination"),
            (MessageType::Generic, "generic"),
        ] {
            assert_eq!(format!("{}", variant), *rendered);
        }
    }

    /// `messages_from` counts messages per sender regardless of recipient.
    #[test]
    fn test_message_bus_messages_from() {
        let mut bus = AgentMessageBus::new(100);
        post(
            &mut bus,
            "researcher",
            "*",
            MessageType::Information,
            "Data A",
        );
        post(
            &mut bus,
            "researcher",
            "executor",
            MessageType::Information,
            "Data B",
        );
        post(
            &mut bus,
            "executor",
            "supervisor",
            MessageType::Result,
            "Done",
        );

        let sent_by_researcher = bus.messages_from("researcher");
        assert_eq!(sent_by_researcher.len(), 2);

        let sent_by_executor = bus.messages_from("executor");
        assert_eq!(sent_by_executor.len(), 1);
    }

    /// Enabling agent communication gives the orchestrator a message bus.
    #[test]
    fn test_orchestrator_new_with_communication() {
        let orchestrator = orchestrator_with(SwarmConfig {
            enable_agent_communication: true,
            ..Default::default()
        });
        assert!(orchestrator.message_bus.is_some());
    }

    /// Disabling agent communication leaves the orchestrator without a bus.
    #[test]
    fn test_orchestrator_new_without_communication() {
        let orchestrator = orchestrator_with(SwarmConfig {
            enable_agent_communication: false,
            ..Default::default()
        });
        assert!(orchestrator.message_bus.is_none());
    }

    /// Both optional subsystems are off in the default config.
    #[test]
    fn test_swarm_config_communication_default() {
        let defaults = SwarmConfig::default();
        assert!(!defaults.enable_agent_communication); // Default: disabled
        assert!(!defaults.enable_health_monitoring); // Default: disabled
    }

    // ── Health monitoring tests ─────────────────────────────────────────

    /// Each `WorkerHealthStatus` variant displays as its lowercase name.
    #[test]
    fn test_health_status_display() {
        for (status, rendered) in &[
            (WorkerHealthStatus::Healthy, "healthy"),
            (WorkerHealthStatus::Degraded, "degraded"),
            (WorkerHealthStatus::Unhealthy, "unhealthy"),
            (WorkerHealthStatus::Dead, "dead"),
        ] {
            assert_eq!(format!("{}", status), *rendered);
        }
    }

    /// Default monitor: 5 s heartbeat, 3 missed beats, 30 s replacement
    /// timeout, no workers.
    #[test]
    fn test_health_monitor_default() {
        let monitor = SwarmHealthMonitor::default();
        assert_eq!(monitor.heartbeat_interval_secs, 5);
        assert_eq!(monitor.max_missed_beats, 3);
        assert_eq!(monitor.replacement_timeout_secs, 30);
        assert_eq!(monitor.worker_count(), 0);
    }

    /// `SwarmHealthMonitor::new` stores its three arguments in order.
    #[test]
    fn test_health_monitor_new_custom() {
        let monitor = SwarmHealthMonitor::new(10, 5, 60);
        assert_eq!(monitor.heartbeat_interval_secs, 10);
        assert_eq!(monitor.max_missed_beats, 5);
        assert_eq!(monitor.replacement_timeout_secs, 60);
    }

    /// Registering a worker makes its telemetry available under its role.
    #[test]
    fn test_health_monitor_register_worker() {
        let mut monitor = SwarmHealthMonitor::default();
        monitor.register_worker("researcher");
        assert_eq!(monitor.worker_count(), 1);

        let snapshot = monitor.worker_telemetry("researcher");
        assert!(snapshot.is_some());
        assert_eq!(snapshot.unwrap().role, "researcher");
    }

    /// A worker that has just sent a heartbeat is reported healthy.
    #[test]
    fn test_health_monitor_heartbeat() {
        let mut monitor = SwarmHealthMonitor::default();
        monitor.register_worker("executor");
        monitor.heartbeat("executor");

        let snapshot = monitor.worker_telemetry("executor").unwrap();
        assert_eq!(snapshot.status, WorkerHealthStatus::Healthy);
    }

    /// Start + complete counts one completed task and no failures.
    #[test]
    fn test_health_monitor_task_lifecycle() {
        let mut monitor = SwarmHealthMonitor::default();
        monitor.register_worker("executor");
        monitor.task_started("executor");
        monitor.task_completed("executor");

        let snapshot = monitor.worker_telemetry("executor").unwrap();
        assert_eq!(snapshot.tasks_completed, 1);
        assert_eq!(snapshot.tasks_failed, 0);
    }

    /// Start + fail counts one failed task and one error, no completions.
    #[test]
    fn test_health_monitor_task_failure() {
        let mut monitor = SwarmHealthMonitor::default();
        monitor.register_worker("executor");
        monitor.task_started("executor");
        monitor.task_failed("executor");

        let snapshot = monitor.worker_telemetry("executor").unwrap();
        assert_eq!(snapshot.tasks_completed, 0);
        assert_eq!(snapshot.tasks_failed, 1);
        assert_eq!(snapshot.error_count, 1);
    }

    /// Metrics of a monitor with no workers are all zero.
    #[test]
    fn test_health_monitor_metrics_empty() {
        let monitor = SwarmHealthMonitor::default();
        let summary = monitor.metrics();
        assert_eq!(summary.total_workers, 0);
        assert_eq!(summary.healthy_workers, 0);
        assert_eq!(summary.total_tasks_completed, 0);
        assert_eq!(summary.task_throughput, 0.0);
    }

    /// Two workers that each complete one task yield two healthy workers and
    /// two completed tasks in the aggregate.
    #[test]
    fn test_health_monitor_metrics_with_workers() {
        let mut monitor = SwarmHealthMonitor::default();
        for role in &["researcher", "executor"] {
            monitor.register_worker(*role);
        }
        for role in &["executor", "researcher"] {
            monitor.task_started(*role);
            monitor.task_completed(*role);
        }

        let summary = monitor.metrics();
        assert_eq!(summary.total_workers, 2);
        assert_eq!(summary.healthy_workers, 2);
        assert_eq!(summary.total_tasks_completed, 2);
    }

    /// A worker whose heartbeat is 10 s old (1 s interval, 1 allowed miss) is
    /// returned by `check_health`.
    #[test]
    fn test_health_monitor_dead_worker_detection() {
        let mut monitor = SwarmHealthMonitor {
            heartbeat_interval_secs: 1,
            max_missed_beats: 1,
            replacement_timeout_secs: 0, // Immediate replacement
            ..Default::default()
        };
        monitor.register_worker("executor");

        // Set last heartbeat far in the past by manipulating directly
        if let Some(beat) = monitor.heartbeats.get_mut("executor") {
            beat.last_heartbeat = chrono::Utc::now() - chrono::Duration::seconds(10);
        }

        let flagged = monitor.check_health();
        assert!(!flagged.is_empty());
        assert_eq!(flagged[0], "executor");
    }

    /// A worker whose heartbeat is 3 s old (1 s interval, 3 allowed misses)
    /// is reported as degraded after a health check.
    #[test]
    fn test_health_monitor_degraded_detection() {
        let mut monitor = SwarmHealthMonitor {
            heartbeat_interval_secs: 1,
            max_missed_beats: 3,
            ..Default::default()
        };
        monitor.register_worker("executor");

        // Set last heartbeat to 3 seconds ago (between 2x interval and max_missed*interval)
        if let Some(beat) = monitor.heartbeats.get_mut("executor") {
            beat.last_heartbeat = chrono::Utc::now() - chrono::Duration::seconds(3);
        }

        let _flagged = monitor.check_health();
        let snapshot = monitor.worker_telemetry("executor").unwrap();
        assert_eq!(snapshot.status, WorkerHealthStatus::Degraded);
    }

    /// Sent and received message counters are tracked independently.
    #[test]
    fn test_health_monitor_message_tracking() {
        let mut monitor = SwarmHealthMonitor::default();
        monitor.register_worker("researcher");
        for _ in 0..2 {
            monitor.message_sent("researcher");
        }
        monitor.message_received("researcher");

        let snapshot = monitor.worker_telemetry("researcher").unwrap();
        assert_eq!(snapshot.messages_sent, 2);
        assert_eq!(snapshot.messages_received, 1);
    }

    /// Three recorded errors give an error count of three.
    #[test]
    fn test_health_monitor_error_tracking() {
        let mut monitor = SwarmHealthMonitor::default();
        monitor.register_worker("executor");
        for _ in 0..3 {
            monitor.record_error("executor");
        }

        let snapshot = monitor.worker_telemetry("executor").unwrap();
        assert_eq!(snapshot.error_count, 3);
    }

    /// The status line contains the "Swarm Health:" label and the word
    /// "healthy".
    #[test]
    fn test_health_monitor_format_status() {
        let monitor = SwarmHealthMonitor::default();
        let status_line = monitor.format_status();
        for needle in &["Swarm Health:", "healthy"] {
            assert!(status_line.contains(*needle));
        }
    }

    /// `all_worker_telemetry` returns one entry per registered worker.
    #[test]
    fn test_health_monitor_all_worker_telemetry() {
        let mut monitor = SwarmHealthMonitor::default();
        for role in &["researcher", "executor", "reviewer"] {
            monitor.register_worker(*role);
        }

        let snapshots = monitor.all_worker_telemetry();
        assert_eq!(snapshots.len(), 3);
    }

    /// Removing a registered worker drops the worker count back to zero.
    #[test]
    fn test_health_monitor_remove_worker() {
        let mut monitor = SwarmHealthMonitor::default();
        monitor.register_worker("executor");
        assert_eq!(monitor.worker_count(), 1);

        monitor.remove_worker("executor");
        assert_eq!(monitor.worker_count(), 0);
    }

    /// Enabling health monitoring gives the orchestrator a health monitor.
    #[test]
    fn test_orchestrator_new_with_health_monitoring() {
        let orchestrator = orchestrator_with(SwarmConfig {
            enable_health_monitoring: true,
            ..Default::default()
        });
        assert!(orchestrator.health_monitor.is_some());
    }

    /// Disabling health monitoring leaves the orchestrator without one.
    #[test]
    fn test_orchestrator_new_without_health_monitoring() {
        let orchestrator = orchestrator_with(SwarmConfig {
            enable_health_monitoring: false,
            ..Default::default()
        });
        assert!(orchestrator.health_monitor.is_none());
    }

    /// With monitoring enabled, `health_metrics` returns metrics showing zero
    /// workers on a fresh orchestrator.
    #[test]
    fn test_health_metrics_accessor() {
        let orchestrator = orchestrator_with(SwarmConfig {
            enable_health_monitoring: true,
            ..Default::default()
        });

        let summary = orchestrator.health_metrics();
        assert!(summary.is_some());
        assert_eq!(summary.unwrap().total_workers, 0);
    }

    /// With monitoring enabled, `worker_telemetry` returns an empty
    /// collection on a fresh orchestrator.
    #[test]
    fn test_worker_telemetry_accessor() {
        let orchestrator = orchestrator_with(SwarmConfig {
            enable_health_monitoring: true,
            ..Default::default()
        });

        let snapshots = orchestrator.worker_telemetry();
        assert!(snapshots.is_some());
        assert!(snapshots.unwrap().is_empty());
    }

    /// A worker marked dead with a 30 s old heartbeat and a zero replacement
    /// timeout is listed as a replacement candidate.
    #[test]
    fn test_health_monitor_dead_workers_for_replacement() {
        let mut monitor = SwarmHealthMonitor {
            heartbeat_interval_secs: 1,
            max_missed_beats: 1,
            replacement_timeout_secs: 0,
            ..Default::default()
        };
        monitor.register_worker("executor");

        // Set last heartbeat far in the past
        if let Some(beat) = monitor.heartbeats.get_mut("executor") {
            beat.last_heartbeat = chrono::Utc::now() - chrono::Duration::seconds(30);
            beat.status = WorkerHealthStatus::Dead;
        }

        let replaceable = monitor.dead_workers_for_replacement();
        assert_eq!(replaceable.len(), 1);
        assert_eq!(replaceable[0], "executor");
    }

    /// One completed and one failed task produce a positive error rate.
    #[test]
    fn test_health_monitor_metrics_error_rate() {
        let mut monitor = SwarmHealthMonitor::default();
        monitor.register_worker("executor");
        monitor.task_started("executor");
        monitor.task_completed("executor");
        monitor.task_started("executor");
        monitor.task_failed("executor");

        let summary = monitor.metrics();
        assert_eq!(summary.total_tasks_completed, 1);
        assert_eq!(summary.total_tasks_failed, 1);
        assert!(summary.error_rate > 0.0);
    }

    /// One busy worker out of two gives a utilization of 0.5.
    #[test]
    fn test_health_monitor_metrics_utilization() {
        let mut monitor = SwarmHealthMonitor::default();
        for role in &["busy_worker", "idle_worker"] {
            monitor.register_worker(*role);
        }

        // Mark one worker as busy
        if let Some(beat) = monitor.heartbeats.get_mut("busy_worker") {
            beat.is_busy = true;
        }

        let summary = monitor.metrics();
        assert_eq!(summary.total_workers, 2);
        assert!((summary.worker_utilization - 0.5).abs() < f64::EPSILON);
    }

    /// Three task starts (two of them completed) give iteration 3 and two
    /// completed tasks.
    #[test]
    fn test_health_monitor_iteration_tracking() {
        let mut monitor = SwarmHealthMonitor::default();
        monitor.register_worker("executor");
        for _ in 0..2 {
            monitor.task_started("executor");
            monitor.task_completed("executor");
        }
        monitor.task_started("executor");

        let snapshot = monitor.worker_telemetry("executor").unwrap();
        assert_eq!(snapshot.iteration, 3);
        assert_eq!(snapshot.tasks_completed, 2);
    }
}