Skip to main content

ravenclaws/
swarm.rs

//! Swarm orchestration — self-provisioning sub-agents with recursive supervision
//!
//! RavenClaws's swarm orchestrator enables truly autonomous multi-agent coordination:
//! supervisors can spawn sub-supervisors, creating recursive task decomposition
//! trees whose depth is bounded by `max_depth`. Each worker has a declarative profile
//! specifying its persona, tools, provider, model, and resource limits.
//!
//! # Architecture
//!
//! ```text
//! SwarmOrchestrator
//!   ├── topology: Star | Mesh | Hierarchical | Hybrid
//!   ├── max_depth: recursion limit (default: 3)
//!   ├── max_workers: total worker cap (default: 100)
//!   ├── profiles: Vec<WorkerProfile> — available worker types
//!   ├── message_bus: AgentMessageBus — inter-agent communication
//!   └── orchestrate(task):
//!       1. Analyze task → determine required roles
//!       2. Assign roles → select profiles
//!       3. Spawn workers → local or remote via RavenFabric
//!       4. Coordinate → recursive supervisor if task is complex
//!       5. Aggregate → collect and merge results
//! ```
//!
//! # Inter-Agent Communication
//!
//! When `enable_agent_communication` is set, swarm workers can exchange messages
//! through a shared `AgentMessageBus`. Messages are routed by role and include
//! typed payloads (information, question, result, error, coordination).
//! All messages are audited and timestamped.
//!
//! # Integration
//!
//! - CLI: `--swarm-topology <star|mesh|hierarchical|hybrid> --max-workers <N>`
//! - Config: `[swarm]` section in `ravenclaws.toml`
//! - Supervisor mode: automatically uses orchestrator when `max_depth > 0`

38use crate::agent::ConversationMemory;
39use crate::audit::{AuditEventType, AuditLog};
40use crate::error::{RavenClawsError, Result};
41use crate::llm::{ChatMessage, LLMProviderTrait, MultiModelManager};
42use crate::policy::PolicyEngine;
43use crate::ravenfabric::RavenFabricClient;
44use crate::sandbox::Sandbox;
45use crate::tools::ToolRegistry;
46use serde::{Deserialize, Serialize};
47use std::collections::HashMap;
48use std::sync::Arc;
49use tokio::sync::RwLock;
50use tracing::{info, instrument, warn};
51
52// ---------------------------------------------------------------------------
53// Inter-agent communication
54// ---------------------------------------------------------------------------
55
56/// A message exchanged between swarm workers.
57///
58/// Messages are typed so workers can filter and respond appropriately.
59/// All messages are timestamped and include the sender's role for auditability.
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct AgentMessage {
62    /// Unique message ID
63    pub id: String,
64
65    /// Sender's worker role (e.g., "researcher", "executor")
66    pub sender: String,
67
68    /// Recipient's worker role ("*" = broadcast to all)
69    pub recipient: String,
70
71    /// Message type
72    pub msg_type: MessageType,
73
74    /// Message content
75    pub content: String,
76
77    /// ISO 8601 timestamp
78    pub timestamp: String,
79
80    /// Optional metadata (e.g., task ID, iteration number)
81    #[serde(default)]
82    pub metadata: HashMap<String, String>,
83}
84
85/// The type of an inter-agent message.
86#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
87pub enum MessageType {
88    /// Sharing information or findings
89    Information,
90    /// Asking a question or requesting input
91    Question,
92    /// Reporting a result or completion
93    Result,
94    /// Reporting an error or issue
95    Error,
96    /// Coordination message (e.g., task re-assignment)
97    Coordination,
98    /// General purpose message
99    Generic,
100}
101
102impl std::fmt::Display for MessageType {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        match self {
105            MessageType::Information => write!(f, "information"),
106            MessageType::Question => write!(f, "question"),
107            MessageType::Result => write!(f, "result"),
108            MessageType::Error => write!(f, "error"),
109            MessageType::Coordination => write!(f, "coordination"),
110            MessageType::Generic => write!(f, "generic"),
111        }
112    }
113}
114
115/// A shared message bus for inter-agent communication within a swarm.
116///
117/// The message bus is shared across all workers in a swarm via `Arc<RwLock<>>`.
118/// Workers can send messages to specific roles or broadcast to all.
119/// Messages are retained for the lifetime of the swarm and can be queried.
120#[derive(Debug, Clone)]
121pub struct AgentMessageBus {
122    /// All messages sent in this swarm session
123    messages: Vec<AgentMessage>,
124    /// Maximum messages to retain (0 = unlimited)
125    max_messages: usize,
126}
127
128#[allow(dead_code)]
129impl AgentMessageBus {
130    /// Create a new message bus with the given capacity.
131    pub fn new(max_messages: usize) -> Self {
132        Self {
133            messages: Vec::new(),
134            max_messages,
135        }
136    }
137
138    /// Send a message to a specific recipient (or "*" for broadcast).
139    pub fn send(
140        &mut self,
141        sender: &str,
142        recipient: &str,
143        msg_type: MessageType,
144        content: &str,
145        metadata: HashMap<String, String>,
146    ) -> String {
147        let id = uuid::Uuid::new_v4().to_string();
148        let timestamp = chrono::Utc::now().to_rfc3339();
149
150        let msg = AgentMessage {
151            id: id.clone(),
152            sender: sender.to_string(),
153            recipient: recipient.to_string(),
154            msg_type,
155            content: content.to_string(),
156            timestamp,
157            metadata,
158        };
159
160        self.messages.push(msg);
161
162        // Trim oldest messages if over capacity
163        if self.max_messages > 0 && self.messages.len() > self.max_messages {
164            self.messages.remove(0);
165        }
166
167        id
168    }
169
170    /// Get all messages addressed to a specific role (or "*" for broadcast).
171    pub fn messages_for(&self, role: &str) -> Vec<&AgentMessage> {
172        self.messages
173            .iter()
174            .filter(|m| m.recipient == role || m.recipient == "*")
175            .collect()
176    }
177
178    /// Get all messages from a specific sender.
179    pub fn messages_from(&self, sender: &str) -> Vec<&AgentMessage> {
180        self.messages
181            .iter()
182            .filter(|m| m.sender == sender)
183            .collect()
184    }
185
186    /// Get all messages of a specific type.
187    pub fn messages_of_type(&self, msg_type: &MessageType) -> Vec<&AgentMessage> {
188        self.messages
189            .iter()
190            .filter(|m| m.msg_type == *msg_type)
191            .collect()
192    }
193
194    /// Get all messages in the bus.
195    pub fn all_messages(&self) -> &[AgentMessage] {
196        &self.messages
197    }
198
199    /// Get the number of messages in the bus.
200    pub fn len(&self) -> usize {
201        self.messages.len()
202    }
203
204    /// Check if the bus is empty.
205    pub fn is_empty(&self) -> bool {
206        self.messages.is_empty()
207    }
208
209    /// Format recent messages for inclusion in an LLM prompt.
210    ///
211    /// Returns a string with the last N messages formatted for the agent's context.
212    pub fn format_for_prompt(&self, role: &str, max_messages: usize) -> String {
213        let relevant: Vec<&AgentMessage> = self
214            .messages
215            .iter()
216            .filter(|m| m.recipient == role || m.recipient == "*" || m.sender == role)
217            .rev()
218            .take(max_messages)
219            .collect();
220
221        if relevant.is_empty() {
222            return String::new();
223        }
224
225        let mut output = String::from("\n\n--- Inter-Agent Messages ---\n");
226        for msg in relevant.iter().rev() {
227            output.push_str(&format!(
228                "[{}] {} → {} ({}): {}\n",
229                msg.msg_type, msg.sender, msg.recipient, msg.timestamp, msg.content
230            ));
231        }
232        output.push_str("--- End Inter-Agent Messages ---\n");
233        output
234    }
235}
236
237// ---------------------------------------------------------------------------
238// Swarm health monitoring
239// ---------------------------------------------------------------------------
240
241/// Health status of a single swarm worker.
242#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
243pub enum WorkerHealthStatus {
244    /// Worker is active and responding to heartbeats
245    Healthy,
246    /// Worker is running but has not responded to recent heartbeats
247    Degraded,
248    /// Worker has not responded within the timeout window
249    Unhealthy,
250    /// Worker has been terminated and needs replacement
251    Dead,
252}
253
254impl std::fmt::Display for WorkerHealthStatus {
255    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256        match self {
257            WorkerHealthStatus::Healthy => write!(f, "healthy"),
258            WorkerHealthStatus::Degraded => write!(f, "degraded"),
259            WorkerHealthStatus::Unhealthy => write!(f, "unhealthy"),
260            WorkerHealthStatus::Dead => write!(f, "dead"),
261        }
262    }
263}
264
265/// Telemetry snapshot for a single worker.
266#[derive(Debug, Clone, Serialize, Deserialize)]
267pub struct WorkerTelemetry {
268    /// Worker role name
269    pub role: String,
270    /// Current health status
271    pub status: WorkerHealthStatus,
272    /// Tasks completed by this worker
273    pub tasks_completed: u64,
274    /// Tasks that failed
275    pub tasks_failed: u64,
276    /// Total errors encountered
277    pub error_count: u64,
278    /// Average task duration in milliseconds
279    pub avg_duration_ms: f64,
280    /// Last heartbeat timestamp (ISO 8601)
281    pub last_heartbeat: String,
282    /// When the worker was spawned (ISO 8601)
283    pub spawned_at: String,
284    /// Number of messages sent via the bus
285    pub messages_sent: u64,
286    /// Number of messages received via the bus
287    pub messages_received: u64,
288    /// Current iteration count
289    pub iteration: u64,
290}
291
292/// Aggregate swarm health metrics.
293#[derive(Debug, Clone, Serialize, Deserialize)]
294pub struct SwarmMetrics {
295    /// Total workers in the swarm
296    pub total_workers: usize,
297    /// Healthy workers
298    pub healthy_workers: usize,
299    /// Degraded workers
300    pub degraded_workers: usize,
301    /// Unhealthy workers
302    pub unhealthy_workers: usize,
303    /// Dead workers
304    pub dead_workers: usize,
305    /// Total tasks completed across all workers
306    pub total_tasks_completed: u64,
307    /// Total tasks failed across all workers
308    pub total_tasks_failed: u64,
309    /// Total errors across all workers
310    pub total_errors: u64,
311    /// Overall average task duration in milliseconds
312    pub overall_avg_duration_ms: f64,
313    /// Task throughput (tasks per second over the last window)
314    pub task_throughput: f64,
315    /// Communication latency (average ms between send and receive)
316    pub communication_latency_ms: f64,
317    /// Worker utilization (0.0–1.0, ratio of busy time to total time)
318    pub worker_utilization: f64,
319    /// Error rate (errors per task)
320    pub error_rate: f64,
321    /// Timestamp of this metrics snapshot
322    pub timestamp: String,
323}
324
325/// Per-worker heartbeat tracker.
326#[derive(Debug, Clone)]
327struct WorkerHeartbeat {
328    /// Worker role
329    role: String,
330    /// When the worker was spawned
331    spawned_at: chrono::DateTime<chrono::Utc>,
332    /// Last heartbeat received
333    last_heartbeat: chrono::DateTime<chrono::Utc>,
334    /// Number of missed heartbeats
335    missed_beats: u32,
336    /// Current health status
337    status: WorkerHealthStatus,
338    /// Tasks completed
339    tasks_completed: u64,
340    /// Tasks failed
341    tasks_failed: u64,
342    /// Error count
343    error_count: u64,
344    /// Total duration of all completed tasks in milliseconds
345    total_duration_ms: f64,
346    /// Number of tasks with recorded duration
347    duration_samples: u64,
348    /// Messages sent
349    messages_sent: u64,
350    /// Messages received
351    messages_received: u64,
352    /// Current iteration
353    iteration: u64,
354    /// Whether this worker is currently busy
355    is_busy: bool,
356    /// When the current task started
357    task_started_at: Option<chrono::DateTime<chrono::Utc>>,
358}
359
360/// Swarm health monitor — tracks worker health, detects failures, and
361/// provides aggregate telemetry.
362///
363/// # Heartbeat Protocol
364///
365/// Workers send heartbeats at regular intervals. If a worker misses
366/// `max_missed_beats` consecutive heartbeats, it is marked as `Unhealthy`.
367/// After `max_missed_beats * 2` missed beats, it is marked as `Dead`.
368///
369/// # Dead-Agent Detection
370///
371/// The health monitor periodically scans all tracked workers. Workers that
372/// have been `Dead` for longer than `replacement_timeout_secs` are candidates
373/// for automatic replacement.
374///
375/// # Metrics
376///
377/// The monitor tracks task throughput, worker utilization, error rates, and
378/// communication latency. These are exposed via `metrics()`.
379#[derive(Debug, Clone)]
380pub struct SwarmHealthMonitor {
381    /// Per-worker heartbeat trackers
382    heartbeats: HashMap<String, WorkerHeartbeat>,
383    /// Heartbeat interval in seconds (default: 5)
384    heartbeat_interval_secs: u64,
385    /// Max missed heartbeats before marking unhealthy (default: 3)
386    max_missed_beats: u32,
387    /// Time in seconds before a dead worker is replaced (default: 30)
388    replacement_timeout_secs: u64,
389    /// When monitoring started
390    started_at: chrono::DateTime<chrono::Utc>,
391    /// Total messages sent across all workers (for latency calculation)
392    total_messages_sent: u64,
393    /// Total messages received across all workers
394    total_messages_received: u64,
395    /// Sum of all task durations for throughput calculation
396    total_duration_ms: f64,
397    /// Total tasks completed across all workers
398    total_tasks_completed: u64,
399}
400
401impl Default for SwarmHealthMonitor {
402    fn default() -> Self {
403        Self {
404            heartbeats: HashMap::new(),
405            heartbeat_interval_secs: 5,
406            max_missed_beats: 3,
407            replacement_timeout_secs: 30,
408            started_at: chrono::Utc::now(),
409            total_messages_sent: 0,
410            total_messages_received: 0,
411            total_duration_ms: 0.0,
412            total_tasks_completed: 0,
413        }
414    }
415}
416
417#[allow(dead_code)]
418impl SwarmHealthMonitor {
419    /// Create a new health monitor with custom parameters.
420    pub fn new(
421        heartbeat_interval_secs: u64,
422        max_missed_beats: u32,
423        replacement_timeout_secs: u64,
424    ) -> Self {
425        Self {
426            heartbeats: HashMap::new(),
427            heartbeat_interval_secs,
428            max_missed_beats,
429            replacement_timeout_secs,
430            started_at: chrono::Utc::now(),
431            total_messages_sent: 0,
432            total_messages_received: 0,
433            total_duration_ms: 0.0,
434            total_tasks_completed: 0,
435        }
436    }
437
438    /// Register a new worker for health tracking.
439    pub fn register_worker(&mut self, role: &str) {
440        let now = chrono::Utc::now();
441        self.heartbeats
442            .entry(role.to_string())
443            .or_insert(WorkerHeartbeat {
444                role: role.to_string(),
445                spawned_at: now,
446                last_heartbeat: now,
447                missed_beats: 0,
448                status: WorkerHealthStatus::Healthy,
449                tasks_completed: 0,
450                tasks_failed: 0,
451                error_count: 0,
452                total_duration_ms: 0.0,
453                duration_samples: 0,
454                messages_sent: 0,
455                messages_received: 0,
456                iteration: 0,
457                is_busy: false,
458                task_started_at: None,
459            });
460    }
461
462    /// Record a heartbeat from a worker.
463    pub fn heartbeat(&mut self, role: &str) {
464        if let Some(hb) = self.heartbeats.get_mut(role) {
465            hb.last_heartbeat = chrono::Utc::now();
466            hb.missed_beats = 0;
467            hb.status = WorkerHealthStatus::Healthy;
468        }
469    }
470
471    /// Record that a worker started a task.
472    pub fn task_started(&mut self, role: &str) {
473        if let Some(hb) = self.heartbeats.get_mut(role) {
474            hb.is_busy = true;
475            hb.task_started_at = Some(chrono::Utc::now());
476            hb.iteration += 1;
477        }
478    }
479
480    /// Record that a worker completed a task successfully.
481    pub fn task_completed(&mut self, role: &str) {
482        if let Some(hb) = self.heartbeats.get_mut(role) {
483            hb.tasks_completed += 1;
484            hb.is_busy = false;
485
486            if let Some(started) = hb.task_started_at {
487                let duration = (chrono::Utc::now() - started).num_milliseconds() as f64;
488                hb.total_duration_ms += duration;
489                hb.duration_samples += 1;
490                self.total_duration_ms += duration;
491            }
492            self.total_tasks_completed += 1;
493            hb.task_started_at = None;
494        }
495    }
496
497    /// Record that a worker's task failed.
498    pub fn task_failed(&mut self, role: &str) {
499        if let Some(hb) = self.heartbeats.get_mut(role) {
500            hb.tasks_failed += 1;
501            hb.error_count += 1;
502            hb.is_busy = false;
503            hb.task_started_at = None;
504        }
505    }
506
507    /// Record an error from a worker.
508    pub fn record_error(&mut self, role: &str) {
509        if let Some(hb) = self.heartbeats.get_mut(role) {
510            hb.error_count += 1;
511        }
512    }
513
514    /// Record a message sent by a worker.
515    pub fn message_sent(&mut self, role: &str) {
516        if let Some(hb) = self.heartbeats.get_mut(role) {
517            hb.messages_sent += 1;
518        }
519        self.total_messages_sent += 1;
520    }
521
522    /// Record a message received by a worker.
523    pub fn message_received(&mut self, role: &str) {
524        if let Some(hb) = self.heartbeats.get_mut(role) {
525            hb.messages_received += 1;
526        }
527        self.total_messages_received += 1;
528    }
529
530    /// Scan all workers and update their health status based on heartbeat timing.
531    /// Returns a list of workers that have been detected as dead.
532    pub fn check_health(&mut self) -> Vec<String> {
533        let now = chrono::Utc::now();
534        let mut dead_workers = Vec::new();
535
536        for hb in self.heartbeats.values_mut() {
537            let elapsed = (now - hb.last_heartbeat).num_seconds();
538
539            if elapsed > (self.heartbeat_interval_secs * self.max_missed_beats as u64 * 2) as i64 {
540                if hb.status != WorkerHealthStatus::Dead {
541                    hb.status = WorkerHealthStatus::Dead;
542                    dead_workers.push(hb.role.clone());
543                }
544            } else if elapsed > (self.heartbeat_interval_secs * self.max_missed_beats as u64) as i64
545            {
546                hb.status = WorkerHealthStatus::Unhealthy;
547            } else if elapsed > (self.heartbeat_interval_secs * 2) as i64 {
548                hb.status = WorkerHealthStatus::Degraded;
549            }
550        }
551
552        dead_workers
553    }
554
555    /// Get telemetry for a specific worker.
556    pub fn worker_telemetry(&self, role: &str) -> Option<WorkerTelemetry> {
557        self.heartbeats.get(role).map(|hb| {
558            let avg_dur = if hb.duration_samples > 0 {
559                hb.total_duration_ms / hb.duration_samples as f64
560            } else {
561                0.0
562            };
563
564            WorkerTelemetry {
565                role: hb.role.clone(),
566                status: hb.status.clone(),
567                tasks_completed: hb.tasks_completed,
568                tasks_failed: hb.tasks_failed,
569                error_count: hb.error_count,
570                avg_duration_ms: avg_dur,
571                last_heartbeat: hb.last_heartbeat.to_rfc3339(),
572                spawned_at: hb.spawned_at.to_rfc3339(),
573                messages_sent: hb.messages_sent,
574                messages_received: hb.messages_received,
575                iteration: hb.iteration,
576            }
577        })
578    }
579
580    /// Get telemetry for all workers.
581    pub fn all_worker_telemetry(&self) -> Vec<WorkerTelemetry> {
582        let mut roles: Vec<String> = self.heartbeats.keys().cloned().collect();
583        roles.sort();
584        roles
585            .iter()
586            .filter_map(|r| self.worker_telemetry(r))
587            .collect()
588    }
589
590    /// Get aggregate swarm metrics.
591    pub fn metrics(&self) -> SwarmMetrics {
592        let mut healthy = 0;
593        let mut degraded = 0;
594        let mut unhealthy = 0;
595        let mut dead = 0;
596        let mut total_completed = 0u64;
597        let mut total_failed = 0u64;
598        let mut total_errors = 0u64;
599        let mut busy_workers = 0u64;
600        let total_workers = self.heartbeats.len();
601
602        for hb in self.heartbeats.values() {
603            match hb.status {
604                WorkerHealthStatus::Healthy => healthy += 1,
605                WorkerHealthStatus::Degraded => degraded += 1,
606                WorkerHealthStatus::Unhealthy => unhealthy += 1,
607                WorkerHealthStatus::Dead => dead += 1,
608            }
609            total_completed += hb.tasks_completed;
610            total_failed += hb.tasks_failed;
611            total_errors += hb.error_count;
612            if hb.is_busy {
613                busy_workers += 1;
614            }
615        }
616
617        let elapsed_secs = (chrono::Utc::now() - self.started_at).num_seconds().max(1) as f64;
618        let task_throughput = self.total_tasks_completed as f64 / elapsed_secs;
619
620        let worker_utilization = if total_workers > 0 {
621            busy_workers as f64 / total_workers as f64
622        } else {
623            0.0
624        };
625
626        let error_rate = if total_completed + total_failed > 0 {
627            total_errors as f64 / (total_completed + total_failed) as f64
628        } else {
629            0.0
630        };
631
632        let overall_avg_duration = if self.total_tasks_completed > 0 {
633            self.total_duration_ms / self.total_tasks_completed as f64
634        } else {
635            0.0
636        };
637
638        let comm_latency = if self.total_messages_sent > 0 {
639            // Approximate: average time between send and receive is estimated
640            // from the ratio of received to sent messages × average interval
641            let ratio = self.total_messages_received as f64 / self.total_messages_sent as f64;
642            ratio * (self.heartbeat_interval_secs as f64 * 1000.0) / 2.0
643        } else {
644            0.0
645        };
646
647        SwarmMetrics {
648            total_workers,
649            healthy_workers: healthy,
650            degraded_workers: degraded,
651            unhealthy_workers: unhealthy,
652            dead_workers: dead,
653            total_tasks_completed: total_completed,
654            total_tasks_failed: total_failed,
655            total_errors,
656            overall_avg_duration_ms: overall_avg_duration,
657            task_throughput,
658            communication_latency_ms: comm_latency,
659            worker_utilization,
660            error_rate,
661            timestamp: chrono::Utc::now().to_rfc3339(),
662        }
663    }
664
665    /// Get workers that are candidates for replacement (dead for > replacement_timeout).
666    pub fn dead_workers_for_replacement(&self) -> Vec<String> {
667        let now = chrono::Utc::now();
668        self.heartbeats
669            .iter()
670            .filter(|(_, hb)| hb.status == WorkerHealthStatus::Dead)
671            .filter(|(_, hb)| {
672                let elapsed = (now - hb.last_heartbeat).num_seconds();
673                elapsed >= self.replacement_timeout_secs as i64
674            })
675            .map(|(role, _)| role.clone())
676            .collect()
677    }
678
679    /// Remove a worker from tracking (after replacement).
680    pub fn remove_worker(&mut self, role: &str) {
681        self.heartbeats.remove(role);
682    }
683
684    /// Get the number of tracked workers.
685    pub fn worker_count(&self) -> usize {
686        self.heartbeats.len()
687    }
688
689    /// Format health status for logging.
690    pub fn format_status(&self) -> String {
691        let m = self.metrics();
692        format!(
693            "Swarm Health: {}/{} healthy, {} degraded, {} unhealthy, {} dead | \
694             {} tasks ({:.1}/s) | {:.1}% utilization | {:.2}% error rate",
695            m.healthy_workers,
696            m.total_workers,
697            m.degraded_workers,
698            m.unhealthy_workers,
699            m.dead_workers,
700            m.total_tasks_completed,
701            m.task_throughput,
702            m.worker_utilization * 100.0,
703            m.error_rate * 100.0,
704        )
705    }
706}
707
708// ---------------------------------------------------------------------------
709// Worker profile
710// ---------------------------------------------------------------------------
711
712/// Declarative profile for a swarm worker agent.
713///
714/// Each worker has a unique combination of persona, capabilities, and resource
715/// limits. Profiles are composable (can inherit from a base profile) and
716/// inheritable (sub-profiles can override specific fields).
717#[derive(Debug, Clone, Serialize, Deserialize)]
718pub struct WorkerProfile {
719    /// Unique name for this profile (e.g., "researcher", "coder", "reviewer")
720    pub name: String,
721
722    /// Human-readable description of this worker's role
723    #[serde(default)]
724    pub description: String,
725
726    /// System prompt / persona for this worker
727    pub persona: String,
728
729    /// Which tools this worker can use (empty = all available)
730    #[serde(default)]
731    pub allowed_tools: Vec<String>,
732
733    /// Provider override (empty = use default from config)
734    #[serde(default)]
735    pub provider: Option<String>,
736
737    /// Model override (empty = use default from config)
738    #[serde(default)]
739    pub model: Option<String>,
740
741    /// Maximum iterations for this worker's agent loop
742    #[serde(default = "default_worker_max_iterations")]
743    pub max_iterations: usize,
744
745    /// Maximum memory messages for this worker
746    #[serde(default = "default_worker_memory")]
747    pub max_memory_messages: usize,
748
749    /// Whether this worker can spawn sub-workers (recursive supervision)
750    #[serde(default = "default_true")]
751    pub can_delegate: bool,
752
753    /// Resource limits
754    #[serde(default)]
755    pub resource_limits: ResourceLimits,
756}
757
/// Serde default for `WorkerProfile::max_iterations`.
fn default_worker_max_iterations() -> usize {
    const MAX_ITERATIONS: usize = 10;
    MAX_ITERATIONS
}

/// Serde default for `WorkerProfile::max_memory_messages`.
fn default_worker_memory() -> usize {
    const MEMORY_MESSAGES: usize = 20;
    MEMORY_MESSAGES
}

/// Serde default for boolean fields that are enabled unless configured otherwise.
fn default_true() -> bool {
    !false
}
769
770/// Resource limits for a worker agent
771#[derive(Debug, Clone, Serialize, Deserialize)]
772pub struct ResourceLimits {
773    /// Maximum tool calls per tick
774    #[serde(default = "default_max_tool_calls")]
775    pub max_tool_calls: usize,
776
777    /// Maximum total execution time in seconds
778    #[serde(default = "default_max_exec_secs")]
779    pub max_exec_secs: u64,
780}
781
/// Serde default for `ResourceLimits::max_tool_calls`.
fn default_max_tool_calls() -> usize {
    const TOOL_CALLS_PER_TICK: usize = 50;
    TOOL_CALLS_PER_TICK
}

/// Serde default for `ResourceLimits::max_exec_secs` (five minutes).
fn default_max_exec_secs() -> u64 {
    const FIVE_MINUTES_SECS: u64 = 5 * 60;
    FIVE_MINUTES_SECS
}
789
790impl Default for ResourceLimits {
791    fn default() -> Self {
792        Self {
793            max_tool_calls: default_max_tool_calls(),
794            max_exec_secs: default_max_exec_secs(),
795        }
796    }
797}
798
799impl Default for WorkerProfile {
800    fn default() -> Self {
801        Self {
802            name: "default".to_string(),
803            description: String::new(),
804            persona: "You are a helpful assistant.".to_string(),
805            allowed_tools: Vec::new(),
806            provider: None,
807            model: None,
808            max_iterations: default_worker_max_iterations(),
809            max_memory_messages: default_worker_memory(),
810            can_delegate: default_true(),
811            resource_limits: ResourceLimits::default(),
812        }
813    }
814}
815
816// ---------------------------------------------------------------------------
817// Built-in profiles
818// ---------------------------------------------------------------------------
819
820impl WorkerProfile {
821    /// Analytical researcher profile — focused on logic, structure, and precision
822    pub fn researcher() -> Self {
823        Self {
824            name: "researcher".to_string(),
825            description: "Analytical researcher focused on data gathering and analysis".to_string(),
826            persona: "You are an analytical researcher. Focus on gathering data, \
827                      verifying facts, and providing well-structured analysis. \
828                      Be thorough and cite your sources."
829                .to_string(),
830            allowed_tools: vec![
831                "web_fetch".to_string(),
832                "web_search".to_string(),
833                "read_file".to_string(),
834            ],
835            max_iterations: 15,
836            can_delegate: false,
837            ..Default::default()
838        }
839    }
840
841    /// Creative problem-solver profile — focused on innovation and alternatives
842    pub fn creative() -> Self {
843        Self {
844            name: "creative".to_string(),
845            description: "Creative problem-solver focused on innovation".to_string(),
846            persona: "You are a creative problem-solver. Focus on generating \
847                      innovative solutions, exploring alternatives, and thinking \
848                      outside the box. Consider multiple perspectives."
849                .to_string(),
850            allowed_tools: vec!["write_file".to_string(), "web_search".to_string()],
851            max_iterations: 10,
852            can_delegate: false,
853            ..Default::default()
854        }
855    }
856
857    /// Pragmatic executor profile — focused on getting things done efficiently
858    pub fn executor() -> Self {
859        Self {
860            name: "executor".to_string(),
861            description: "Pragmatic executor focused on efficient task completion".to_string(),
862            persona: "You are a pragmatic executor. Focus on completing tasks \
863                      efficiently and correctly. Prioritize simplicity and \
864                      practicality over perfection."
865                .to_string(),
866            allowed_tools: vec![
867                "shell_exec".to_string(),
868                "read_file".to_string(),
869                "write_file".to_string(),
870                "web_fetch".to_string(),
871            ],
872            max_iterations: 8,
873            can_delegate: false,
874            ..Default::default()
875        }
876    }
877
878    /// Reviewer / quality-assurance profile — focused on verification and validation
879    pub fn reviewer() -> Self {
880        Self {
881            name: "reviewer".to_string(),
882            description: "Quality assurance reviewer focused on verification".to_string(),
883            persona: "You are a meticulous reviewer. Focus on verifying correctness, \
884                      identifying issues, and ensuring quality. Be critical and \
885                      constructive. Check for errors, edge cases, and improvements."
886                .to_string(),
887            allowed_tools: vec!["read_file".to_string(), "web_fetch".to_string()],
888            max_iterations: 10,
889            can_delegate: false,
890            ..Default::default()
891        }
892    }
893
894    /// Supervisor profile — can delegate to sub-workers
895    pub fn supervisor() -> Self {
896        Self {
897            name: "supervisor".to_string(),
898            description: "Supervisor that decomposes tasks and coordinates sub-agents".to_string(),
899            persona: "You are a supervisor agent. Your role is to decompose complex \
900                      tasks into subtasks and coordinate sub-agents to complete them. \
901                      Analyze the task, break it down, assign work, and aggregate results. \
902                      \n\nFor each subtask, respond with:\n\
903                      SUBTASK: <description>\n\
904                      ROLE: <researcher|creative|executor|reviewer|supervisor>\n\
905                      \nWhen all subtasks are complete, respond with:\n\
906                      FINAL: <aggregated result>"
907                .to_string(),
908            allowed_tools: Vec::new(),
909            max_iterations: 20,
910            can_delegate: true,
911            ..Default::default()
912        }
913    }
914}
915
916// ---------------------------------------------------------------------------
917// Swarm configuration
918// ---------------------------------------------------------------------------
919
920/// Swarm topology — how workers are organized
921#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
922#[serde(rename_all = "lowercase")]
923pub enum SwarmTopology {
924    /// Single coordinator delegates to workers (default)
925    #[serde(alias = "flat")]
926    Star,
927    /// Peer-to-peer — workers communicate directly
928    Mesh,
929    /// Tree of supervisors — recursive decomposition
930    Hierarchical,
931    /// Combination of topologies based on task
932    Hybrid,
933}
934
935#[allow(clippy::derivable_impls)]
936impl Default for SwarmTopology {
937    fn default() -> Self {
938        Self::Star
939    }
940}
941
942/// Swarm orchestration configuration
943#[derive(Debug, Clone, Serialize, Deserialize)]
944pub struct SwarmConfig {
945    /// Swarm topology
946    #[serde(default)]
947    pub topology: SwarmTopology,
948
949    /// Maximum recursion depth for hierarchical decomposition (default: 3)
950    #[serde(default = "default_max_depth")]
951    pub max_depth: usize,
952
953    /// Maximum number of workers in the swarm (default: 100)
954    #[serde(default = "default_max_workers", alias = "agent_count")]
955    pub max_workers: usize,
956
957    /// Worker profiles available for role assignment
958    ///
959    /// Accepts both `[[swarm.profiles]]` array-of-tables and `[swarm.profiles.name]`
960    /// map syntax in TOML. The map syntax uses the key as the profile name and the
961    /// value as the persona string (shorthand for quick configuration).
962    ///
963    /// # Examples (TOML)
964    ///
965    /// ```toml
966    /// # Array-of-tables (full syntax)
967    /// [[swarm.profiles]]
968    /// name = "researcher"
969    /// persona = "You are a research specialist..."
970    ///
971    /// # Map shorthand (name → persona)
972    /// [swarm.profiles]
973    /// coder = "You are a Rust expert..."
974    /// reviewer = "You are a code reviewer..."
975    /// ```
976    #[serde(default, deserialize_with = "deserialize_profiles")]
977    pub profiles: Vec<WorkerProfile>,
978
979    /// Enable dynamic role assignment based on task analysis
980    #[serde(default = "default_true")]
981    pub dynamic_role_assignment: bool,
982
983    /// Enable inter-agent communication
984    #[serde(default)]
985    pub enable_agent_communication: bool,
986
987    /// Enable swarm health monitoring
988    #[serde(default)]
989    pub enable_health_monitoring: bool,
990}
991
/// Serde default for `SwarmConfig::max_depth`: three levels of recursion.
fn default_max_depth() -> usize {
    const DEFAULT_MAX_DEPTH: usize = 3;
    DEFAULT_MAX_DEPTH
}

/// Serde default for `SwarmConfig::max_workers`: one hundred workers.
fn default_max_workers() -> usize {
    const DEFAULT_MAX_WORKERS: usize = 100;
    DEFAULT_MAX_WORKERS
}
999
1000/// Custom deserializer for `profiles` that accepts both array-of-tables and map syntax.
1001///
1002/// In TOML:
1003/// - `[[swarm.profiles]]` → array of tables (standard)
1004/// - `[swarm.profiles.name]` → map with key as profile name, value as persona string
1005///
1006/// The map shorthand creates a `WorkerProfile` with the key as `name` and the
1007/// string value as `persona`. All other fields use defaults.
1008fn deserialize_profiles<'de, D>(
1009    deserializer: D,
1010) -> std::result::Result<Vec<WorkerProfile>, D::Error>
1011where
1012    D: serde::Deserializer<'de>,
1013{
1014    // Try to deserialize as a map first (shorthand syntax)
1015    // If that fails, fall back to array-of-tables
1016    #[derive(Deserialize)]
1017    #[serde(untagged)]
1018    enum ProfilesOrMap {
1019        Array(Vec<WorkerProfile>),
1020        Map(std::collections::HashMap<String, String>),
1021    }
1022
1023    match ProfilesOrMap::deserialize(deserializer) {
1024        Ok(ProfilesOrMap::Array(profiles)) => Ok(profiles),
1025        Ok(ProfilesOrMap::Map(map)) => {
1026            let profiles: Vec<WorkerProfile> = map
1027                .into_iter()
1028                .map(|(name, persona)| WorkerProfile {
1029                    name,
1030                    persona,
1031                    ..WorkerProfile::default()
1032                })
1033                .collect();
1034            Ok(profiles)
1035        }
1036        Err(e) => Err(e),
1037    }
1038}
1039
1040impl Default for SwarmConfig {
1041    fn default() -> Self {
1042        Self {
1043            topology: SwarmTopology::default(),
1044            max_depth: default_max_depth(),
1045            max_workers: default_max_workers(),
1046            profiles: vec![
1047                WorkerProfile::researcher(),
1048                WorkerProfile::creative(),
1049                WorkerProfile::executor(),
1050                WorkerProfile::reviewer(),
1051                WorkerProfile::supervisor(),
1052            ],
1053            dynamic_role_assignment: true,
1054            enable_agent_communication: false,
1055            enable_health_monitoring: false,
1056        }
1057    }
1058}
1059
1060// ---------------------------------------------------------------------------
1061// Swarm orchestrator
1062// ---------------------------------------------------------------------------
1063
/// The swarm orchestrator manages recursive task decomposition and worker
/// coordination. It is the core of the self-provisioning sub-agents feature.
///
/// Each orchestrator handles one level of the supervision tree. When a
/// supervisor delegates a subtask to another supervisor, a child orchestrator
/// is built with `current_depth + 1` and clones of this orchestrator's config,
/// `llm`, `multi_llm`, `ravenfabric`, `message_bus` and `health_monitor`. The
/// bus and monitor are `Arc`s, so the whole tree shares one of each.
pub struct SwarmOrchestrator {
    /// Swarm configuration
    config: SwarmConfig,
    /// Current recursion depth: 0 for orchestrators built by the constructors,
    /// parent depth + 1 for sub-orchestrators. Once it reaches
    /// `config.max_depth`, `orchestrate` executes directly instead of decomposing.
    current_depth: usize,
    /// Worker count checked against `config.max_workers`.
    ///
    /// NOTE(review): in the code visible here this is only ever 0 (constructors)
    /// or the parent's count + 1 (sub-orchestrators); `execute_with_profile` does
    /// not increment it, so the limit bounds nesting rather than total workers.
    /// Confirm against the rest of the file.
    worker_count: usize,
    /// The LLM provider for this orchestrator instance
    llm: Option<Arc<dyn LLMProviderTrait>>,
    /// Multi-model manager (if using multiple providers)
    multi_llm: Option<MultiModelManager>,
    /// RavenFabric client for remote execution
    ravenfabric: Option<RavenFabricClient>,
    /// Policy engine for security (reserved for future tool execution)
    // Not read yet, hence the dead_code allowance.
    #[allow(dead_code)]
    policy_engine: PolicyEngine,
    /// Sandbox for execution; set up by `init()`. Sub-orchestrators call
    /// `init()` before orchestrating and ignore its failure.
    sandbox: Sandbox,
    /// Audit log; receives an `AgentFinish` entry when a worker or supervisor finishes.
    audit_log: AuditLog,
    /// Tool registry (reserved for future worker tool execution)
    // Not read yet, hence the dead_code allowance.
    #[allow(dead_code)]
    registry: ToolRegistry,
    /// Inter-agent message bus (shared across sub-orchestrators)
    message_bus: Option<Arc<RwLock<AgentMessageBus>>>,
    /// Swarm health monitor (active when enable_health_monitoring is true)
    /// Uses Arc<RwLock> for interior mutability since execute_with_profile takes &self
    health_monitor: Option<Arc<RwLock<SwarmHealthMonitor>>>,
}
1095
1096impl SwarmOrchestrator {
1097    /// Create a new swarm orchestrator with the given configuration.
1098    pub fn new(
1099        config: SwarmConfig,
1100        llm: Option<Arc<dyn LLMProviderTrait>>,
1101        multi_llm: Option<MultiModelManager>,
1102        ravenfabric: Option<RavenFabricClient>,
1103    ) -> Self {
1104        let policy_engine = PolicyEngine::default_secure();
1105        let sandbox = Sandbox::default();
1106        let audit_log = AuditLog::new(format!("swarm-{}", std::process::id()));
1107        let registry = ToolRegistry::with_default_tools();
1108
1109        // Create message bus if inter-agent communication is enabled
1110        let message_bus = if config.enable_agent_communication {
1111            Some(Arc::new(RwLock::new(AgentMessageBus::new(1000))))
1112        } else {
1113            None
1114        };
1115
1116        // Create health monitor if health monitoring is enabled
1117        let health_monitor = if config.enable_health_monitoring {
1118            Some(Arc::new(RwLock::new(SwarmHealthMonitor::default())))
1119        } else {
1120            None
1121        };
1122
1123        Self {
1124            config,
1125            current_depth: 0,
1126            worker_count: 0,
1127            llm,
1128            multi_llm,
1129            ravenfabric,
1130            policy_engine,
1131            sandbox,
1132            audit_log,
1133            registry,
1134            message_bus,
1135            health_monitor,
1136        }
1137    }
1138
1139    /// Create a new swarm orchestrator with a shared message bus.
1140    ///
1141    /// This is used when spawning sub-orchestrators that should share
1142    /// the same communication channel as their parent.
1143    #[allow(dead_code)]
1144    pub fn new_with_bus(
1145        config: SwarmConfig,
1146        llm: Option<Arc<dyn LLMProviderTrait>>,
1147        multi_llm: Option<MultiModelManager>,
1148        ravenfabric: Option<RavenFabricClient>,
1149        message_bus: Option<Arc<RwLock<AgentMessageBus>>>,
1150    ) -> Self {
1151        let policy_engine = PolicyEngine::default_secure();
1152        let sandbox = Sandbox::default();
1153        let audit_log = AuditLog::new(format!("swarm-{}", std::process::id()));
1154        let registry = ToolRegistry::with_default_tools();
1155
1156        // Create health monitor if health monitoring is enabled
1157        let health_monitor = if config.enable_health_monitoring {
1158            Some(Arc::new(RwLock::new(SwarmHealthMonitor::default())))
1159        } else {
1160            None
1161        };
1162
1163        Self {
1164            config,
1165            current_depth: 0,
1166            worker_count: 0,
1167            llm,
1168            multi_llm,
1169            ravenfabric,
1170            policy_engine,
1171            sandbox,
1172            audit_log,
1173            registry,
1174            message_bus,
1175            health_monitor,
1176        }
1177    }
1178
1179    /// Initialize the sandbox for this orchestrator.
1180    pub async fn init(&mut self) -> Result<()> {
1181        self.sandbox.init().await.map_err(|e| {
1182            RavenClawsError::CommandExecution(format!("Swarm sandbox init failed: {}", e))
1183        })?;
1184        Ok(())
1185    }
1186
1187    /// Get the current worker count.
1188    #[allow(dead_code)]
1189    pub fn worker_count(&self) -> usize {
1190        self.worker_count
1191    }
1192
1193    /// Get the current recursion depth.
1194    #[allow(dead_code)]
1195    pub fn current_depth(&self) -> usize {
1196        self.current_depth
1197    }
1198
1199    /// Get swarm health metrics, if health monitoring is enabled.
1200    #[allow(dead_code)]
1201    pub fn health_metrics(&self) -> Option<SwarmMetrics> {
1202        self.health_monitor
1203            .as_ref()
1204            .and_then(|hm| hm.try_read().ok())
1205            .map(|hm| hm.metrics())
1206    }
1207
1208    /// Get telemetry for all workers, if health monitoring is enabled.
1209    #[allow(dead_code)]
1210    pub fn worker_telemetry(&self) -> Option<Vec<WorkerTelemetry>> {
1211        self.health_monitor
1212            .as_ref()
1213            .and_then(|hm| hm.try_read().ok())
1214            .map(|hm| hm.all_worker_telemetry())
1215    }
1216
1217    /// Orchestrate a task — decompose, assign, execute, and aggregate.
1218    ///
1219    /// This is the main entry point for swarm execution. It analyzes the task,
1220    /// determines required roles, spawns workers, and aggregates results.
1221    #[instrument(skip(self, task), fields(depth = self.current_depth, workers = self.worker_count))]
1222    pub async fn orchestrate(&mut self, task: &str) -> Result<String> {
1223        // Delegate to the boxed implementation to handle recursion safely.
1224        self.orchestrate_impl(task).await
1225    }
1226
1227    /// Non-recursive entry point — calls the boxed recursive implementation.
1228    async fn orchestrate_impl(&mut self, task: &str) -> Result<String> {
1229        info!(
1230            depth = self.current_depth,
1231            max_depth = self.config.max_depth,
1232            "Orchestrating task"
1233        );
1234
1235        // Log health status at start if monitoring is enabled
1236        if let Some(ref hm) = self.health_monitor {
1237            if let Ok(hm_guard) = hm.try_read() {
1238                info!(health = %hm_guard.format_status(), "Swarm health at start");
1239            }
1240        }
1241
1242        // Check recursion depth
1243        if self.current_depth >= self.config.max_depth {
1244            warn!(
1245                depth = self.current_depth,
1246                max_depth = self.config.max_depth,
1247                "Max recursion depth reached, executing directly"
1248            );
1249            return self.execute_direct(task).await;
1250        }
1251
1252        // Check worker limit
1253        if self.worker_count >= self.config.max_workers {
1254            warn!(
1255                workers = self.worker_count,
1256                max_workers = self.config.max_workers,
1257                "Max workers reached, executing directly"
1258            );
1259            return self.execute_direct(task).await;
1260        }
1261
1262        // Analyze task and determine roles
1263        let roles = if self.config.dynamic_role_assignment {
1264            self.analyze_task_roles(task).await?
1265        } else {
1266            // Default: use supervisor + executor
1267            vec!["supervisor".to_string()]
1268        };
1269
1270        info!(roles = ?roles, "Assigned roles for task");
1271
1272        // If only one role and it's not supervisor, execute directly
1273        if roles.len() == 1 && roles[0] != "supervisor" {
1274            return self.execute_with_profile(task, &roles[0]).await;
1275        }
1276
1277        // If supervisor role is present, do recursive decomposition
1278        if roles.contains(&"supervisor".to_string()) || roles.len() > 1 {
1279            return self.recursive_supervise_impl(task, &roles).await;
1280        }
1281
1282        // Default: execute directly
1283        self.execute_direct(task).await
1284    }
1285
1286    /// Analyze a task and determine which worker roles are needed.
1287    async fn analyze_task_roles(&self, task: &str) -> Result<Vec<String>> {
1288        // Use the LLM to analyze the task if available
1289        if let Some(ref llm) = self.llm {
1290            let analysis_prompt = format!(
1291                "Analyze this task and determine which roles are needed to complete it. \
1292                 Available roles: researcher, creative, executor, reviewer, supervisor. \
1293                 \n\nTask: {}\n\n\
1294                 Respond with a comma-separated list of roles needed, nothing else. \
1295                 Example: researcher, executor, reviewer",
1296                task
1297            );
1298
1299            let messages = vec![
1300                ChatMessage::new(
1301                    "system",
1302                    "You are a task analysis expert. Respond only with a comma-separated list of roles.",
1303                ),
1304                ChatMessage::new("user", analysis_prompt),
1305            ];
1306
1307            match llm.chat(messages).await {
1308                Ok(response) => {
1309                    let content = response
1310                        .choices
1311                        .first()
1312                        .map(|c| c.message.content.clone())
1313                        .unwrap_or_default();
1314
1315                    let roles: Vec<String> = content
1316                        .split(',')
1317                        .map(|r| r.trim().to_lowercase())
1318                        .filter(|r| {
1319                            matches!(
1320                                r.as_str(),
1321                                "researcher" | "creative" | "executor" | "reviewer" | "supervisor"
1322                            )
1323                        })
1324                        .collect();
1325
1326                    if roles.is_empty() {
1327                        Ok(vec!["executor".to_string()])
1328                    } else {
1329                        Ok(roles)
1330                    }
1331                }
1332                Err(e) => {
1333                    warn!(error = %e, "Task analysis failed, using default roles");
1334                    Ok(vec!["executor".to_string()])
1335                }
1336            }
1337        } else {
1338            // No LLM available, use default roles
1339            Ok(vec!["executor".to_string()])
1340        }
1341    }
1342
1343    /// Execute a task directly with a specific worker profile.
1344    async fn execute_with_profile(&self, task: &str, role: &str) -> Result<String> {
1345        let profile = self
1346            .config
1347            .profiles
1348            .iter()
1349            .find(|p| p.name == role)
1350            .cloned()
1351            .unwrap_or_else(|| {
1352                if role == "supervisor" {
1353                    WorkerProfile::supervisor()
1354                } else {
1355                    WorkerProfile::executor()
1356                }
1357            });
1358
1359        info!(role = %role, profile = %profile.name, "Executing task with profile");
1360
1361        // Register worker with health monitor and record task start
1362        if let Some(ref hm) = self.health_monitor {
1363            if let Ok(mut hm_guard) = hm.try_write() {
1364                hm_guard.register_worker(role);
1365                hm_guard.task_started(role);
1366            }
1367        }
1368
1369        let llm = self.llm.as_ref().ok_or_else(|| {
1370            RavenClawsError::CommandExecution("No LLM provider available for worker".to_string())
1371        })?;
1372
1373        let mut memory = ConversationMemory::new(&profile.persona, profile.max_memory_messages);
1374
1375        // Include inter-agent messages in the prompt if communication is enabled
1376        let enriched_task = if let Some(ref bus) = self.message_bus {
1377            if let Ok(bus_guard) = bus.try_read() {
1378                let msg_context = bus_guard.format_for_prompt(role, 20);
1379                format!("{}{}", task, msg_context)
1380            } else {
1381                task.to_string()
1382            }
1383        } else {
1384            task.to_string()
1385        };
1386
1387        memory.add_user_message(&enriched_task);
1388
1389        let messages = memory.history().to_vec();
1390        let response = llm.chat(messages).await.map_err(|e| {
1391            // Record failure in health monitor
1392            if let Some(ref hm) = self.health_monitor {
1393                if let Ok(mut hm_guard) = hm.try_write() {
1394                    hm_guard.task_failed(role);
1395                }
1396            }
1397            RavenClawsError::CommandExecution(format!("Worker {} failed: {}", role, e))
1398        })?;
1399
1400        let content = response
1401            .choices
1402            .first()
1403            .map(|c| c.message.content.clone())
1404            .unwrap_or_default();
1405
1406        // Record task completion in health monitor
1407        if let Some(ref hm) = self.health_monitor {
1408            if let Ok(mut hm_guard) = hm.try_write() {
1409                hm_guard.task_completed(role);
1410                hm_guard.heartbeat(role);
1411            }
1412        }
1413
1414        // Broadcast result to other workers via message bus
1415        if let Some(ref bus) = self.message_bus {
1416            if let Ok(mut bus_guard) = bus.try_write() {
1417                bus_guard.send(
1418                    role,
1419                    "*",
1420                    MessageType::Result,
1421                    &format!(
1422                        "Completed task. Result ({} chars): {}",
1423                        content.len(),
1424                        &content[..content.len().min(500)]
1425                    ),
1426                    HashMap::new(),
1427                );
1428            }
1429            // Track message in health monitor
1430            if let Some(ref hm) = self.health_monitor {
1431                if let Ok(mut hm_guard) = hm.try_write() {
1432                    hm_guard.message_sent(role);
1433                }
1434            }
1435        }
1436
1437        let _ = self.audit_log.append(
1438            AuditEventType::AgentFinish,
1439            &format!("worker-{}", role),
1440            &format!("Worker {} completed task", role),
1441            Some(serde_json::json!({
1442                "role": role,
1443                "task_length": task.len(),
1444                "response_length": content.len(),
1445            })),
1446        );
1447
1448        Ok(content)
1449    }
1450
1451    /// Execute a task directly without decomposition (leaf node).
1452    async fn execute_direct(&self, task: &str) -> Result<String> {
1453        self.execute_with_profile(task, "executor").await
1454    }
1455
1456    /// Recursive supervision — decompose task, spawn sub-supervisors or workers.
1457    ///
1458    /// This is a thin wrapper that delegates to the boxed implementation
1459    /// to avoid Rust's recursive async fn limitation.
1460    #[allow(dead_code)]
1461    async fn recursive_supervise(&self, task: &str, roles: &[String]) -> Result<String> {
1462        let task = task.to_string();
1463        let roles = roles.to_vec();
1464        let this: &SwarmOrchestrator = self;
1465        Box::pin(async move { this.recursive_supervise_impl(&task, &roles).await }).await
1466    }
1467
1468    /// Recursive supervision implementation (boxed to avoid infinite future size).
1469    async fn recursive_supervise_impl(&self, task: &str, roles: &[String]) -> Result<String> {
1470        let llm = self.llm.as_ref().ok_or_else(|| {
1471            RavenClawsError::CommandExecution(
1472                "No LLM provider available for supervisor".to_string(),
1473            )
1474        })?;
1475
1476        // Register supervisor with health monitor
1477        if let Some(ref hm) = self.health_monitor {
1478            if let Ok(mut hm_guard) = hm.try_write() {
1479                hm_guard.register_worker("supervisor");
1480                hm_guard.task_started("supervisor");
1481            }
1482        }
1483
1484        let supervisor_profile = WorkerProfile::supervisor();
1485        let mut memory = ConversationMemory::new(
1486            &supervisor_profile.persona,
1487            supervisor_profile.max_memory_messages,
1488        );
1489
1490        let role_list = roles.join(", ");
1491
1492        // Include inter-agent messages in the supervisor prompt if communication is enabled
1493        let msg_context = if let Some(ref bus) = self.message_bus {
1494            if let Ok(bus_guard) = bus.try_read() {
1495                bus_guard.format_for_prompt("supervisor", 20)
1496            } else {
1497                String::new()
1498            }
1499        } else {
1500            String::new()
1501        };
1502
1503        let supervise_prompt = format!(
1504            "Decompose this task into subtasks and assign each to the most appropriate role.\n\
1505             Available roles: {}\n\n\
1506             Task: {}\n\n\
1507             For each subtask, respond with:\n\
1508             SUBTASK: <description>\n\
1509             ROLE: <role>\n\n\
1510             When all subtasks are complete, respond with:\n\
1511             FINAL: <aggregated result>\n\
1512             {}",
1513            role_list, task, msg_context
1514        );
1515
1516        memory.add_user_message(&supervise_prompt);
1517
1518        let mut subtask_results: Vec<String> = Vec::new();
1519        let mut iteration = 0;
1520        let max_iterations = supervisor_profile.max_iterations;
1521
1522        loop {
1523            iteration += 1;
1524            if iteration > max_iterations {
1525                warn!("Supervisor reached max iterations");
1526                break;
1527            }
1528
1529            let messages = memory.history().to_vec();
1530            let response = match llm.chat(messages).await {
1531                Ok(r) => r,
1532                Err(e) => {
1533                    warn!(error = %e, "Supervisor LLM request failed");
1534                    continue;
1535                }
1536            };
1537
1538            let content = response
1539                .choices
1540                .first()
1541                .map(|c| c.message.content.clone())
1542                .unwrap_or_default();
1543
1544            // Periodically check health status
1545            if iteration % 3 == 0 {
1546                if let Some(ref hm) = self.health_monitor {
1547                    if let Ok(hm_guard) = hm.try_read() {
1548                        let status = hm_guard.format_status();
1549                        info!(health = %status, "Swarm health check");
1550                        // Check for dead workers
1551                        let dead = hm_guard.dead_workers_for_replacement();
1552                        if !dead.is_empty() {
1553                            warn!(dead_workers = ?dead, "Dead workers detected");
1554                        }
1555                    }
1556                }
1557            }
1558
1559            // Check for FINAL: completion
1560            if content.contains("FINAL:") {
1561                let final_response = content
1562                    .split("FINAL:")
1563                    .nth(1)
1564                    .unwrap_or("")
1565                    .trim()
1566                    .to_string();
1567                info!(
1568                    iteration = iteration,
1569                    subtasks = subtask_results.len(),
1570                    "Supervisor completed"
1571                );
1572
1573                // Record supervisor completion in health monitor
1574                if let Some(ref hm) = self.health_monitor {
1575                    if let Ok(mut hm_guard) = hm.try_write() {
1576                        hm_guard.task_completed("supervisor");
1577                        hm_guard.heartbeat("supervisor");
1578                    }
1579                }
1580
1581                let _ = self.audit_log.append(
1582                    AuditEventType::AgentFinish,
1583                    "supervisor",
1584                    "Supervisor completed recursive decomposition",
1585                    Some(serde_json::json!({
1586                        "iterations": iteration,
1587                        "subtasks_completed": subtask_results.len(),
1588                        "depth": self.current_depth,
1589                    })),
1590                );
1591
1592                if !subtask_results.is_empty() {
1593                    let aggregated = subtask_results.join("\n\n");
1594                    return Ok(format!(
1595                        "{}\n\n## Aggregated Results\n\n{}",
1596                        final_response, aggregated
1597                    ));
1598                }
1599                return Ok(final_response);
1600            }
1601
1602            // Check for SUBTASK: decomposition
1603            if content.contains("SUBTASK:") {
1604                let subtask_block = content.split("SUBTASK:").nth(1).unwrap_or("");
1605                let subtask_lines: Vec<&str> = subtask_block.lines().take(4).collect();
1606
1607                let subtask_desc = subtask_lines.first().unwrap_or(&"").trim();
1608                let role = subtask_lines
1609                    .iter()
1610                    .find(|l| l.starts_with("ROLE:"))
1611                    .and_then(|l| l.split(':').nth(1))
1612                    .unwrap_or("executor")
1613                    .trim()
1614                    .to_lowercase();
1615
1616                if !subtask_desc.is_empty() {
1617                    info!(role = %role, subtask = %subtask_desc, "Delegating subtask");
1618
1619                    // Broadcast coordination message before delegating
1620                    if let Some(ref bus) = self.message_bus {
1621                        if let Ok(mut bus_guard) = bus.try_write() {
1622                            bus_guard.send(
1623                                "supervisor",
1624                                &role,
1625                                MessageType::Coordination,
1626                                &format!("Delegating subtask: {}", subtask_desc),
1627                                HashMap::new(),
1628                            );
1629                        }
1630                    }
1631
1632                    let result =
1633                        if role == "supervisor" && self.current_depth < self.config.max_depth {
1634                            // Recursive: spawn a sub-supervisor (boxed to avoid recursive async fn)
1635                            let config = self.config.clone();
1636                            let current_depth = self.current_depth + 1;
1637                            let worker_count = self.worker_count + 1;
1638                            let llm = self.llm.clone();
1639                            let multi_llm = self.multi_llm.clone();
1640                            let ravenfabric = self.ravenfabric.clone();
1641                            let subtask = subtask_desc.to_string();
1642                            let message_bus = self.message_bus.clone();
1643                            let health_monitor = self.health_monitor.clone();
1644
1645                            Box::pin(async move {
1646                                let mut sub_orchestrator = SwarmOrchestrator {
1647                                    config,
1648                                    current_depth,
1649                                    worker_count,
1650                                    llm,
1651                                    multi_llm,
1652                                    ravenfabric,
1653                                    policy_engine: PolicyEngine::default_secure(),
1654                                    sandbox: Sandbox::default(),
1655                                    audit_log: AuditLog::new(format!(
1656                                        "sub-swarm-{}-{}",
1657                                        current_depth,
1658                                        std::process::id()
1659                                    )),
1660                                    registry: ToolRegistry::with_default_tools(),
1661                                    message_bus,
1662                                    health_monitor,
1663                                };
1664
1665                                // Initialize sub-orchestrator sandbox
1666                                let _ = sub_orchestrator.init().await;
1667                                sub_orchestrator.orchestrate(&subtask).await
1668                            })
1669                            .await
1670                        } else {
1671                            // Execute with the assigned profile
1672                            self.execute_with_profile(subtask_desc, &role).await
1673                        };
1674
1675                    match result {
1676                        Ok(result) => {
1677                            info!(
1678                                role = %role,
1679                                chars = result.len(),
1680                                "Subtask completed"
1681                            );
1682                            subtask_results.push(format!("[{}] {}", role, result));
1683
1684                            memory.add_assistant_message(&format!(
1685                                "Delegated subtask to {}: {}",
1686                                role, subtask_desc
1687                            ));
1688                            memory.add_user_message(&format!("Result from {}: {}", role, result));
1689                        }
1690                        Err(e) => {
1691                            warn!(role = %role, error = %e, "Subtask failed");
1692                            // Record failure in health monitor
1693                            if let Some(ref hm) = self.health_monitor {
1694                                if let Ok(mut hm_guard) = hm.try_write() {
1695                                    hm_guard.task_failed(&role);
1696                                    hm_guard.record_error(&role);
1697                                }
1698                            }
1699                            memory.add_assistant_message(&format!(
1700                                "Subtask for {} failed: {}",
1701                                role, e
1702                            ));
1703                        }
1704                    }
1705                }
1706            } else {
1707                memory.add_assistant_message(&content);
1708            }
1709        }
1710
1711        // Fallback: return aggregated results
1712        if !subtask_results.is_empty() {
1713            let aggregated = subtask_results.join("\n\n");
1714            info!(
1715                "Supervisor aggregated {} subtask results",
1716                subtask_results.len()
1717            );
1718            return Ok(aggregated);
1719        }
1720
1721        Err(RavenClawsError::CommandExecution(
1722            "Supervisor completed without results".to_string(),
1723        ))
1724    }
1725}
1726
1727// ---------------------------------------------------------------------------
1728// Tests
1729// ---------------------------------------------------------------------------
1730
#[cfg(test)]
mod tests {
    //! Unit tests for worker profiles, swarm configuration, the orchestrator
    //! constructor/guards, the inter-agent message bus, and health monitoring.

    use super::*;

    /// Returns true when `profile` lists `tool` among its allowed tools.
    ///
    /// Compares borrowed strings so no `String` is allocated per check.
    fn allows_tool(profile: &WorkerProfile, tool: &str) -> bool {
        profile.allowed_tools.iter().any(|t| t == tool)
    }

    #[test]
    fn test_worker_profile_default() {
        let profile = WorkerProfile::default();
        assert_eq!(profile.name, "default");
        assert!(profile.can_delegate);
        assert_eq!(profile.max_iterations, 10);
        assert_eq!(profile.max_memory_messages, 20);
    }

    #[test]
    fn test_worker_profile_researcher() {
        let profile = WorkerProfile::researcher();
        assert_eq!(profile.name, "researcher");
        assert!(!profile.can_delegate);
        assert!(allows_tool(&profile, "web_fetch"));
        assert!(allows_tool(&profile, "web_search"));
    }

    #[test]
    fn test_worker_profile_creative() {
        let profile = WorkerProfile::creative();
        assert_eq!(profile.name, "creative");
        assert!(!profile.can_delegate);
    }

    #[test]
    fn test_worker_profile_executor() {
        let profile = WorkerProfile::executor();
        assert_eq!(profile.name, "executor");
        assert!(!profile.can_delegate);
        assert!(allows_tool(&profile, "shell_exec"));
    }

    #[test]
    fn test_worker_profile_reviewer() {
        let profile = WorkerProfile::reviewer();
        assert_eq!(profile.name, "reviewer");
        assert!(!profile.can_delegate);
    }

    #[test]
    fn test_worker_profile_supervisor() {
        let profile = WorkerProfile::supervisor();
        assert_eq!(profile.name, "supervisor");
        assert!(profile.can_delegate);
        assert!(profile.persona.contains("SUBTASK:"));
        assert!(profile.persona.contains("FINAL:"));
    }

    #[test]
    fn test_swarm_config_default() {
        let config = SwarmConfig::default();
        assert_eq!(config.topology, SwarmTopology::Star);
        assert_eq!(config.max_depth, 3);
        assert_eq!(config.max_workers, 100);
        assert!(config.dynamic_role_assignment);
        assert_eq!(config.profiles.len(), 5);
    }

    #[test]
    fn test_swarm_topology_serde() {
        // A fixed-size array is enough here; a `vec!` that is only iterated by
        // reference trips `clippy::useless_vec`.
        let topologies = [
            SwarmTopology::Star,
            SwarmTopology::Mesh,
            SwarmTopology::Hierarchical,
            SwarmTopology::Hybrid,
        ];

        for t in &topologies {
            let json = serde_json::to_string(t).unwrap();
            let deserialized: SwarmTopology = serde_json::from_str(&json).unwrap();
            assert_eq!(*t, deserialized);
        }
    }

    #[test]
    fn test_swarm_config_serde() {
        let config = SwarmConfig::default();
        let json = serde_json::to_string_pretty(&config).unwrap();
        let deserialized: SwarmConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(config.topology, deserialized.topology);
        assert_eq!(config.max_depth, deserialized.max_depth);
        assert_eq!(config.max_workers, deserialized.max_workers);
        assert_eq!(config.profiles.len(), deserialized.profiles.len());
    }

    #[test]
    fn test_resource_limits_default() {
        let limits = ResourceLimits::default();
        assert_eq!(limits.max_tool_calls, 50);
        assert_eq!(limits.max_exec_secs, 300);
    }

    #[test]
    fn test_swarm_orchestrator_new() {
        let config = SwarmConfig::default();
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        assert_eq!(orchestrator.current_depth(), 0);
        assert_eq!(orchestrator.worker_count(), 0);
    }

    #[test]
    fn test_swarm_orchestrator_depth_limit() {
        // The supervisor delegation path only spawns a sub-supervisor while
        // `current_depth < config.max_depth`; exercise both sides of that guard.

        // max_depth = 0: no recursion allowed, not even from the root.
        let config = SwarmConfig {
            max_depth: 0,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        assert_eq!(orchestrator.current_depth(), 0);
        assert!(orchestrator.current_depth >= orchestrator.config.max_depth);

        // max_depth = 1: the root may recurse once, but a depth-1 child may not.
        let config = SwarmConfig {
            max_depth: 1,
            ..SwarmConfig::default()
        };
        let mut orchestrator = SwarmOrchestrator::new(config, None, None, None);
        assert!(orchestrator.current_depth < orchestrator.config.max_depth);
        orchestrator.current_depth = 1;
        assert!(orchestrator.current_depth >= orchestrator.config.max_depth);
    }

    #[tokio::test]
    async fn test_analyze_task_roles_fallback() {
        let config = SwarmConfig::default();
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);

        // Without an LLM the call is expected to succeed via its fallback path;
        // surface the error text if it does not, instead of a bare `is_ok()`.
        if let Err(e) = orchestrator.analyze_task_roles("test task").await {
            panic!("analyze_task_roles failed without an LLM: {}", e);
        }
    }

    #[test]
    fn test_worker_profile_custom() {
        let profile = WorkerProfile {
            name: "custom".to_string(),
            description: "Custom worker".to_string(),
            persona: "You are a custom worker.".to_string(),
            allowed_tools: vec!["read_file".to_string()],
            provider: Some("openai".to_string()),
            model: Some("gpt-4".to_string()),
            max_iterations: 5,
            max_memory_messages: 10,
            can_delegate: false,
            resource_limits: ResourceLimits {
                max_tool_calls: 10,
                max_exec_secs: 60,
            },
        };

        assert_eq!(profile.name, "custom");
        assert_eq!(profile.provider, Some("openai".to_string()));
        assert_eq!(profile.model, Some("gpt-4".to_string()));
        assert_eq!(profile.max_iterations, 5);
        assert_eq!(profile.resource_limits.max_tool_calls, 10);
    }

    #[test]
    fn test_swarm_config_custom_profiles() {
        let config = SwarmConfig {
            profiles: vec![WorkerProfile::researcher(), WorkerProfile::executor()],
            topology: SwarmTopology::Hierarchical,
            max_depth: 5,
            max_workers: 50,
            ..SwarmConfig::default()
        };

        assert_eq!(config.profiles.len(), 2);
        assert_eq!(config.topology, SwarmTopology::Hierarchical);
        assert_eq!(config.max_depth, 5);
        assert_eq!(config.max_workers, 50);
    }

    // ── Inter-agent communication tests ─────────────────────────────────

    #[test]
    fn test_message_bus_new() {
        let bus = AgentMessageBus::new(100);
        assert!(bus.is_empty());
        assert_eq!(bus.len(), 0);
    }

    #[test]
    fn test_message_bus_send_and_receive() {
        let mut bus = AgentMessageBus::new(100);

        let id = bus.send(
            "researcher",
            "executor",
            MessageType::Information,
            "Found relevant data",
            HashMap::new(),
        );

        assert!(!id.is_empty());
        assert_eq!(bus.len(), 1);

        let msgs = bus.messages_for("executor");
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, "Found relevant data");
        assert_eq!(msgs[0].sender, "researcher");
    }

    #[test]
    fn test_message_bus_broadcast() {
        let mut bus = AgentMessageBus::new(100);

        bus.send(
            "supervisor",
            "*",
            MessageType::Coordination,
            "All workers proceed",
            HashMap::new(),
        );

        // All roles should receive broadcast messages
        assert_eq!(bus.messages_for("researcher").len(), 1);
        assert_eq!(bus.messages_for("executor").len(), 1);
        assert_eq!(bus.messages_for("reviewer").len(), 1);
    }

    #[test]
    fn test_message_bus_filter_by_type() {
        let mut bus = AgentMessageBus::new(100);

        bus.send(
            "researcher",
            "*",
            MessageType::Information,
            "Data found",
            HashMap::new(),
        );
        bus.send(
            "executor",
            "supervisor",
            MessageType::Result,
            "Task done",
            HashMap::new(),
        );
        bus.send(
            "executor",
            "supervisor",
            MessageType::Error,
            "Failed",
            HashMap::new(),
        );

        let errors = bus.messages_of_type(&MessageType::Error);
        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].content, "Failed");

        let results = bus.messages_of_type(&MessageType::Result);
        assert_eq!(results.len(), 1);
    }

    #[test]
    fn test_message_bus_max_messages() {
        let mut bus = AgentMessageBus::new(5); // Only keep 5 messages

        for i in 0..10 {
            bus.send(
                "worker",
                "*",
                MessageType::Generic,
                &format!("Message {}", i),
                HashMap::new(),
            );
        }

        // Should only have the last 5 messages
        assert_eq!(bus.len(), 5);
        let all = bus.all_messages();
        assert_eq!(all[0].content, "Message 5");
        assert_eq!(all[4].content, "Message 9");
    }

    #[test]
    fn test_message_bus_format_for_prompt() {
        let mut bus = AgentMessageBus::new(100);

        bus.send(
            "researcher",
            "*",
            MessageType::Information,
            "Found key insight",
            HashMap::new(),
        );
        bus.send(
            "executor",
            "supervisor",
            MessageType::Result,
            "Implementation complete",
            HashMap::new(),
        );

        let prompt = bus.format_for_prompt("supervisor", 10);
        assert!(prompt.contains("Inter-Agent Messages"));
        assert!(prompt.contains("researcher"));
        assert!(prompt.contains("executor"));
        assert!(prompt.contains("Found key insight"));
    }

    #[test]
    fn test_message_bus_empty_format() {
        let bus = AgentMessageBus::new(100);
        let prompt = bus.format_for_prompt("supervisor", 10);
        assert!(prompt.is_empty());
    }

    #[test]
    fn test_message_type_display() {
        assert_eq!(format!("{}", MessageType::Information), "information");
        assert_eq!(format!("{}", MessageType::Question), "question");
        assert_eq!(format!("{}", MessageType::Result), "result");
        assert_eq!(format!("{}", MessageType::Error), "error");
        assert_eq!(format!("{}", MessageType::Coordination), "coordination");
        assert_eq!(format!("{}", MessageType::Generic), "generic");
    }

    #[test]
    fn test_message_bus_messages_from() {
        let mut bus = AgentMessageBus::new(100);

        bus.send(
            "researcher",
            "*",
            MessageType::Information,
            "Data A",
            HashMap::new(),
        );
        bus.send(
            "researcher",
            "executor",
            MessageType::Information,
            "Data B",
            HashMap::new(),
        );
        bus.send(
            "executor",
            "supervisor",
            MessageType::Result,
            "Done",
            HashMap::new(),
        );

        let from_researcher = bus.messages_from("researcher");
        assert_eq!(from_researcher.len(), 2);

        let from_executor = bus.messages_from("executor");
        assert_eq!(from_executor.len(), 1);
    }

    #[test]
    fn test_orchestrator_new_with_communication() {
        let config = SwarmConfig {
            enable_agent_communication: true,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        assert!(orchestrator.message_bus.is_some());
    }

    #[test]
    fn test_orchestrator_new_without_communication() {
        let config = SwarmConfig {
            enable_agent_communication: false,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        assert!(orchestrator.message_bus.is_none());
    }

    #[test]
    fn test_swarm_config_communication_default() {
        let config = SwarmConfig::default();
        assert!(!config.enable_agent_communication); // Default: disabled
        assert!(!config.enable_health_monitoring); // Default: disabled
    }

    // ── Health monitoring tests ─────────────────────────────────────────

    #[test]
    fn test_health_status_display() {
        assert_eq!(format!("{}", WorkerHealthStatus::Healthy), "healthy");
        assert_eq!(format!("{}", WorkerHealthStatus::Degraded), "degraded");
        assert_eq!(format!("{}", WorkerHealthStatus::Unhealthy), "unhealthy");
        assert_eq!(format!("{}", WorkerHealthStatus::Dead), "dead");
    }

    #[test]
    fn test_health_monitor_default() {
        let hm = SwarmHealthMonitor::default();
        assert_eq!(hm.heartbeat_interval_secs, 5);
        assert_eq!(hm.max_missed_beats, 3);
        assert_eq!(hm.replacement_timeout_secs, 30);
        assert_eq!(hm.worker_count(), 0);
    }

    #[test]
    fn test_health_monitor_register_worker() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("researcher");
        assert_eq!(hm.worker_count(), 1);

        let telemetry = hm.worker_telemetry("researcher");
        assert!(telemetry.is_some());
        assert_eq!(telemetry.unwrap().role, "researcher");
    }

    #[test]
    fn test_health_monitor_heartbeat() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.heartbeat("executor");

        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.status, WorkerHealthStatus::Healthy);
    }

    #[test]
    fn test_health_monitor_task_lifecycle() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.task_started("executor");
        hm.task_completed("executor");

        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.tasks_completed, 1);
        assert_eq!(telemetry.tasks_failed, 0);
    }

    #[test]
    fn test_health_monitor_task_failure() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.task_started("executor");
        hm.task_failed("executor");

        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.tasks_completed, 0);
        assert_eq!(telemetry.tasks_failed, 1);
        assert_eq!(telemetry.error_count, 1);
    }

    #[test]
    fn test_health_monitor_metrics_empty() {
        let hm = SwarmHealthMonitor::default();
        let metrics = hm.metrics();
        assert_eq!(metrics.total_workers, 0);
        assert_eq!(metrics.healthy_workers, 0);
        assert_eq!(metrics.total_tasks_completed, 0);
        assert_eq!(metrics.task_throughput, 0.0);
    }

    #[test]
    fn test_health_monitor_metrics_with_workers() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("researcher");
        hm.register_worker("executor");
        hm.task_started("executor");
        hm.task_completed("executor");
        hm.task_started("researcher");
        hm.task_completed("researcher");

        let metrics = hm.metrics();
        assert_eq!(metrics.total_workers, 2);
        assert_eq!(metrics.healthy_workers, 2);
        assert_eq!(metrics.total_tasks_completed, 2);
    }

    #[test]
    fn test_health_monitor_dead_worker_detection() {
        let mut hm = SwarmHealthMonitor {
            heartbeat_interval_secs: 1,
            max_missed_beats: 1,
            replacement_timeout_secs: 0, // Immediate replacement
            ..SwarmHealthMonitor::default()
        };

        hm.register_worker("executor");
        // Set last heartbeat far in the past by manipulating directly
        if let Some(hb) = hm.heartbeats.get_mut("executor") {
            hb.last_heartbeat = chrono::Utc::now() - chrono::Duration::seconds(10);
        }

        let dead = hm.check_health();
        assert!(!dead.is_empty());
        assert_eq!(dead[0], "executor");
    }

    #[test]
    fn test_health_monitor_degraded_detection() {
        let mut hm = SwarmHealthMonitor {
            heartbeat_interval_secs: 1,
            max_missed_beats: 3,
            ..SwarmHealthMonitor::default()
        };

        hm.register_worker("executor");
        // Last heartbeat 3s ago: past 2x the interval (2s) and exactly at
        // max_missed_beats * interval (3s). NOTE(review): this relies on
        // check_health only declaring a worker dead strictly beyond that bound.
        if let Some(hb) = hm.heartbeats.get_mut("executor") {
            hb.last_heartbeat = chrono::Utc::now() - chrono::Duration::seconds(3);
        }

        let _dead = hm.check_health();
        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.status, WorkerHealthStatus::Degraded);
    }

    #[test]
    fn test_health_monitor_message_tracking() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("researcher");
        hm.message_sent("researcher");
        hm.message_sent("researcher");
        hm.message_received("researcher");

        let telemetry = hm.worker_telemetry("researcher").unwrap();
        assert_eq!(telemetry.messages_sent, 2);
        assert_eq!(telemetry.messages_received, 1);
    }

    #[test]
    fn test_health_monitor_error_tracking() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.record_error("executor");
        hm.record_error("executor");
        hm.record_error("executor");

        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.error_count, 3);
    }

    #[test]
    fn test_health_monitor_format_status() {
        let hm = SwarmHealthMonitor::default();
        let status = hm.format_status();
        assert!(status.contains("Swarm Health:"));
        assert!(status.contains("healthy"));
    }

    #[test]
    fn test_health_monitor_all_worker_telemetry() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("researcher");
        hm.register_worker("executor");
        hm.register_worker("reviewer");

        let all = hm.all_worker_telemetry();
        assert_eq!(all.len(), 3);
    }

    #[test]
    fn test_health_monitor_remove_worker() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        assert_eq!(hm.worker_count(), 1);
        hm.remove_worker("executor");
        assert_eq!(hm.worker_count(), 0);
    }

    #[test]
    fn test_orchestrator_new_with_health_monitoring() {
        let config = SwarmConfig {
            enable_health_monitoring: true,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        assert!(orchestrator.health_monitor.is_some());
    }

    #[test]
    fn test_orchestrator_new_without_health_monitoring() {
        let config = SwarmConfig {
            enable_health_monitoring: false,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        assert!(orchestrator.health_monitor.is_none());
    }

    #[test]
    fn test_health_metrics_accessor() {
        let config = SwarmConfig {
            enable_health_monitoring: true,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        let metrics = orchestrator.health_metrics();
        assert!(metrics.is_some());
        assert_eq!(metrics.unwrap().total_workers, 0);
    }

    #[test]
    fn test_worker_telemetry_accessor() {
        let config = SwarmConfig {
            enable_health_monitoring: true,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        let telemetry = orchestrator.worker_telemetry();
        assert!(telemetry.is_some());
        assert!(telemetry.unwrap().is_empty());
    }

    #[test]
    fn test_health_monitor_new_custom() {
        let hm = SwarmHealthMonitor::new(10, 5, 60);
        assert_eq!(hm.heartbeat_interval_secs, 10);
        assert_eq!(hm.max_missed_beats, 5);
        assert_eq!(hm.replacement_timeout_secs, 60);
    }

    #[test]
    fn test_health_monitor_dead_workers_for_replacement() {
        let mut hm = SwarmHealthMonitor {
            heartbeat_interval_secs: 1,
            max_missed_beats: 1,
            replacement_timeout_secs: 0,
            ..SwarmHealthMonitor::default()
        };

        hm.register_worker("executor");
        // Set last heartbeat far in the past
        if let Some(hb) = hm.heartbeats.get_mut("executor") {
            hb.last_heartbeat = chrono::Utc::now() - chrono::Duration::seconds(30);
            hb.status = WorkerHealthStatus::Dead;
        }

        let candidates = hm.dead_workers_for_replacement();
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0], "executor");
    }

    #[test]
    fn test_health_monitor_metrics_error_rate() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.task_started("executor");
        hm.task_completed("executor");
        hm.task_started("executor");
        hm.task_failed("executor");

        let metrics = hm.metrics();
        assert_eq!(metrics.total_tasks_completed, 1);
        assert_eq!(metrics.total_tasks_failed, 1);
        assert!(metrics.error_rate > 0.0);
    }

    #[test]
    fn test_health_monitor_metrics_utilization() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("busy_worker");
        hm.register_worker("idle_worker");

        // Mark one worker as busy
        if let Some(hb) = hm.heartbeats.get_mut("busy_worker") {
            hb.is_busy = true;
        }

        let metrics = hm.metrics();
        assert_eq!(metrics.total_workers, 2);
        assert!((metrics.worker_utilization - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_health_monitor_iteration_tracking() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.task_started("executor");
        hm.task_completed("executor");
        hm.task_started("executor");
        hm.task_completed("executor");
        hm.task_started("executor");

        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.iteration, 3);
        assert_eq!(telemetry.tasks_completed, 2);
    }
}