use crate::agent::ConversationMemory;
use crate::audit::{AuditEventType, AuditLog};
use crate::error::{RavenClawsError, Result};
use crate::llm::{ChatMessage, LLMProviderTrait, MultiModelManager};
use crate::policy::PolicyEngine;
use crate::ravenfabric::RavenFabricClient;
use crate::sandbox::Sandbox;
use crate::tools::ToolRegistry;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{info, instrument, warn};
/// A single message exchanged between agents over an [`AgentMessageBus`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentMessage {
/// Unique message id; `AgentMessageBus::send` assigns a random UUID v4 string.
pub id: String,
/// Role name of the sending agent.
pub sender: String,
/// Role name of the intended recipient, or `"*"` to address every role.
pub recipient: String,
/// Category of the message.
pub msg_type: MessageType,
/// Free-form message body.
pub content: String,
/// Creation time; `AgentMessageBus::send` stores `chrono::Utc::now()` in RFC 3339 form.
pub timestamp: String,
/// Arbitrary key/value annotations; empty when the field is absent on deserialization.
#[serde(default)]
pub metadata: HashMap<String, String>,
}
/// Category of an [`AgentMessage`]. The `Display` impl renders each variant in lowercase.
///
/// Within this file, workers post `Result` messages when they finish a task and the
/// supervisor posts `Coordination` messages when it delegates a subtask.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MessageType {
Information,
Question,
Result,
Error,
Coordination,
Generic,
}
impl std::fmt::Display for MessageType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MessageType::Information => write!(f, "information"),
MessageType::Question => write!(f, "question"),
MessageType::Result => write!(f, "result"),
MessageType::Error => write!(f, "error"),
MessageType::Coordination => write!(f, "coordination"),
MessageType::Generic => write!(f, "generic"),
}
}
}
/// In-memory, append-only log of [`AgentMessage`]s with an optional size cap.
/// Once the cap is exceeded, the oldest message is dropped.
#[derive(Debug, Clone)]
pub struct AgentMessageBus {
/// Stored messages, oldest first.
messages: Vec<AgentMessage>,
/// Maximum number of retained messages; `0` means unbounded.
max_messages: usize,
}
#[allow(dead_code)]
impl AgentMessageBus {
pub fn new(max_messages: usize) -> Self {
Self {
messages: Vec::new(),
max_messages,
}
}
pub fn send(
&mut self,
sender: &str,
recipient: &str,
msg_type: MessageType,
content: &str,
metadata: HashMap<String, String>,
) -> String {
let id = uuid::Uuid::new_v4().to_string();
let timestamp = chrono::Utc::now().to_rfc3339();
let msg = AgentMessage {
id: id.clone(),
sender: sender.to_string(),
recipient: recipient.to_string(),
msg_type,
content: content.to_string(),
timestamp,
metadata,
};
self.messages.push(msg);
if self.max_messages > 0 && self.messages.len() > self.max_messages {
self.messages.remove(0);
}
id
}
pub fn messages_for(&self, role: &str) -> Vec<&AgentMessage> {
self.messages
.iter()
.filter(|m| m.recipient == role || m.recipient == "*")
.collect()
}
pub fn messages_from(&self, sender: &str) -> Vec<&AgentMessage> {
self.messages
.iter()
.filter(|m| m.sender == sender)
.collect()
}
pub fn messages_of_type(&self, msg_type: &MessageType) -> Vec<&AgentMessage> {
self.messages
.iter()
.filter(|m| m.msg_type == *msg_type)
.collect()
}
pub fn all_messages(&self) -> &[AgentMessage] {
&self.messages
}
pub fn len(&self) -> usize {
self.messages.len()
}
pub fn is_empty(&self) -> bool {
self.messages.is_empty()
}
pub fn format_for_prompt(&self, role: &str, max_messages: usize) -> String {
let relevant: Vec<&AgentMessage> = self
.messages
.iter()
.filter(|m| m.recipient == role || m.recipient == "*" || m.sender == role)
.rev()
.take(max_messages)
.collect();
if relevant.is_empty() {
return String::new();
}
let mut output = String::from("\n\n--- Inter-Agent Messages ---\n");
for msg in relevant.iter().rev() {
output.push_str(&format!(
"[{}] {} → {} ({}): {}\n",
msg.msg_type, msg.sender, msg.recipient, msg.timestamp, msg.content
));
}
output.push_str("--- End Inter-Agent Messages ---\n");
output
}
}
/// Liveness classification of a worker, listed from best to worst.
///
/// `SwarmHealthMonitor::check_health` assigns it from the time elapsed since the last
/// heartbeat. `SwarmHealthMonitor::heartbeat` resets it to `Healthy`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum WorkerHealthStatus {
Healthy,
Degraded,
Unhealthy,
Dead,
}
impl std::fmt::Display for WorkerHealthStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WorkerHealthStatus::Healthy => write!(f, "healthy"),
WorkerHealthStatus::Degraded => write!(f, "degraded"),
WorkerHealthStatus::Unhealthy => write!(f, "unhealthy"),
WorkerHealthStatus::Dead => write!(f, "dead"),
}
}
}
/// Serializable per-worker snapshot produced by `SwarmHealthMonitor::worker_telemetry`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerTelemetry {
/// Role name the worker is tracked under.
pub role: String,
/// Health classification at snapshot time.
pub status: WorkerHealthStatus,
pub tasks_completed: u64,
pub tasks_failed: u64,
/// Errors attributed to this worker (failed tasks plus explicitly recorded errors).
pub error_count: u64,
/// Mean duration in milliseconds over completed tasks that had a recorded start time.
pub avg_duration_ms: f64,
/// RFC 3339 timestamp of the most recent heartbeat.
pub last_heartbeat: String,
/// RFC 3339 timestamp of registration.
pub spawned_at: String,
pub messages_sent: u64,
pub messages_received: u64,
/// Number of tasks started by this worker.
pub iteration: u64,
}
/// Swarm-wide aggregate produced by `SwarmHealthMonitor::metrics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmMetrics {
pub total_workers: usize,
pub healthy_workers: usize,
pub degraded_workers: usize,
pub unhealthy_workers: usize,
pub dead_workers: usize,
/// Sum over currently tracked workers.
pub total_tasks_completed: u64,
/// Sum over currently tracked workers.
pub total_tasks_failed: u64,
/// Sum over currently tracked workers.
pub total_errors: u64,
/// Monitor-wide accumulated task duration divided by monitor-wide completed tasks.
pub overall_avg_duration_ms: f64,
/// Completed tasks per second since the monitor was created (elapsed time floored at 1 s).
pub task_throughput: f64,
/// Heuristic estimate derived from the received/sent message ratio and the heartbeat
/// interval; it is not a measured latency.
pub communication_latency_ms: f64,
/// Fraction of tracked workers currently marked busy.
pub worker_utilization: f64,
/// `total_errors / (completed + failed)`. This can exceed 1.0 because errors are also
/// recorded independently of task outcomes.
pub error_rate: f64,
/// RFC 3339 time at which the snapshot was taken.
pub timestamp: String,
}
/// Internal mutable record the health monitor keeps for each role.
#[derive(Debug, Clone)]
struct WorkerHeartbeat {
role: String,
spawned_at: chrono::DateTime<chrono::Utc>,
last_heartbeat: chrono::DateTime<chrono::Utc>,
/// Count of missed heartbeat intervals; reset to 0 by `heartbeat`.
missed_beats: u32,
status: WorkerHealthStatus,
tasks_completed: u64,
tasks_failed: u64,
error_count: u64,
/// Sum of measured task durations in milliseconds.
total_duration_ms: f64,
/// Number of durations summed into `total_duration_ms`.
duration_samples: u64,
messages_sent: u64,
messages_received: u64,
/// Number of tasks started.
iteration: u64,
/// `true` between `task_started` and the matching completion/failure.
is_busy: bool,
/// Start time of the in-flight task, if any.
task_started_at: Option<chrono::DateTime<chrono::Utc>>,
}
/// Tracks heartbeats, task outcomes and message counts for each worker role and derives
/// health classifications and swarm-wide metrics from them.
#[derive(Debug, Clone)]
pub struct SwarmHealthMonitor {
/// Per-role records, keyed by role name.
heartbeats: HashMap<String, WorkerHeartbeat>,
/// Expected seconds between heartbeats; health thresholds are multiples of it.
heartbeat_interval_secs: u64,
/// Missed intervals after which a worker is classed unhealthy (twice as many -> dead).
max_missed_beats: u32,
/// Minimum seconds of silence before a dead worker is offered for replacement.
replacement_timeout_secs: u64,
/// Creation time of the monitor, used as the throughput baseline.
started_at: chrono::DateTime<chrono::Utc>,
// Monitor-wide counters. Unlike the per-role ones, these are not lost by `remove_worker`.
total_messages_sent: u64,
total_messages_received: u64,
total_duration_ms: f64,
total_tasks_completed: u64,
}
impl Default for SwarmHealthMonitor {
fn default() -> Self {
Self {
heartbeats: HashMap::new(),
heartbeat_interval_secs: 5,
max_missed_beats: 3,
replacement_timeout_secs: 30,
started_at: chrono::Utc::now(),
total_messages_sent: 0,
total_messages_received: 0,
total_duration_ms: 0.0,
total_tasks_completed: 0,
}
}
}
#[allow(dead_code)]
impl SwarmHealthMonitor {
    /// Creates a monitor with explicit timing parameters.
    ///
    /// * `heartbeat_interval_secs` – expected seconds between heartbeats; the
    ///   degraded / unhealthy / dead thresholds in [`Self::check_health`] are
    ///   multiples of it.
    /// * `max_missed_beats` – intervals of silence after which a worker is
    ///   classed unhealthy (twice as many -> dead).
    /// * `replacement_timeout_secs` – minimum seconds since the last heartbeat
    ///   before [`Self::dead_workers_for_replacement`] reports a dead worker.
    pub fn new(
        heartbeat_interval_secs: u64,
        max_missed_beats: u32,
        replacement_timeout_secs: u64,
    ) -> Self {
        Self {
            heartbeats: HashMap::new(),
            heartbeat_interval_secs,
            max_missed_beats,
            replacement_timeout_secs,
            started_at: chrono::Utc::now(),
            total_messages_sent: 0,
            total_messages_received: 0,
            total_duration_ms: 0.0,
            total_tasks_completed: 0,
        }
    }

    /// Starts tracking `role` as a fresh, healthy, idle worker.
    ///
    /// If the role is already tracked, its existing record (counters and
    /// status) is left untouched.
    pub fn register_worker(&mut self, role: &str) {
        self.heartbeats
            .entry(role.to_string())
            // Build the record lazily: nothing is allocated when the role already exists.
            .or_insert_with(|| {
                let now = chrono::Utc::now();
                WorkerHeartbeat {
                    role: role.to_string(),
                    spawned_at: now,
                    last_heartbeat: now,
                    missed_beats: 0,
                    status: WorkerHealthStatus::Healthy,
                    tasks_completed: 0,
                    tasks_failed: 0,
                    error_count: 0,
                    total_duration_ms: 0.0,
                    duration_samples: 0,
                    messages_sent: 0,
                    messages_received: 0,
                    iteration: 0,
                    is_busy: false,
                    task_started_at: None,
                }
            });
    }

    /// Records a heartbeat for `role`, marking it healthy again. Unknown roles are ignored.
    pub fn heartbeat(&mut self, role: &str) {
        if let Some(hb) = self.heartbeats.get_mut(role) {
            hb.last_heartbeat = chrono::Utc::now();
            hb.missed_beats = 0;
            hb.status = WorkerHealthStatus::Healthy;
        }
    }

    /// Marks `role` busy, stamps the task start time and bumps its iteration counter.
    pub fn task_started(&mut self, role: &str) {
        if let Some(hb) = self.heartbeats.get_mut(role) {
            hb.is_busy = true;
            hb.task_started_at = Some(chrono::Utc::now());
            hb.iteration += 1;
        }
    }

    /// Records a successful task for `role`, marks it idle, and adds the elapsed
    /// time since [`Self::task_started`] (if any) to the duration statistics.
    pub fn task_completed(&mut self, role: &str) {
        if let Some(hb) = self.heartbeats.get_mut(role) {
            hb.tasks_completed += 1;
            hb.is_busy = false;
            if let Some(started) = hb.task_started_at.take() {
                // Clamp at zero: a wall-clock step backwards must not record a negative duration.
                let duration = (chrono::Utc::now() - started).num_milliseconds().max(0) as f64;
                hb.total_duration_ms += duration;
                hb.duration_samples += 1;
                self.total_duration_ms += duration;
            }
            self.total_tasks_completed += 1;
        }
    }

    /// Records a failed task (which also counts as an error) for `role` and marks it idle.
    pub fn task_failed(&mut self, role: &str) {
        if let Some(hb) = self.heartbeats.get_mut(role) {
            hb.tasks_failed += 1;
            hb.error_count += 1;
            hb.is_busy = false;
            hb.task_started_at = None;
        }
    }

    /// Records an error for `role` that is not tied to a task outcome.
    pub fn record_error(&mut self, role: &str) {
        if let Some(hb) = self.heartbeats.get_mut(role) {
            hb.error_count += 1;
        }
    }

    /// Counts a message sent by `role`. The swarm-wide counter is bumped even for unknown roles.
    pub fn message_sent(&mut self, role: &str) {
        if let Some(hb) = self.heartbeats.get_mut(role) {
            hb.messages_sent += 1;
        }
        self.total_messages_sent += 1;
    }

    /// Counts a message received by `role`. The swarm-wide counter is bumped even for unknown roles.
    pub fn message_received(&mut self, role: &str) {
        if let Some(hb) = self.heartbeats.get_mut(role) {
            hb.messages_received += 1;
        }
        self.total_messages_received += 1;
    }

    /// Reclassifies every worker from the seconds elapsed since its last heartbeat.
    ///
    /// With `i` = heartbeat interval and `m` = max missed beats:
    /// * `> 2·i·m` -> `Dead`
    /// * `> i·m` -> `Unhealthy`
    /// * `> 2·i` -> `Degraded`
    ///
    /// Healthier states are restored only by [`Self::heartbeat`].
    ///
    /// Returns the roles that became `Dead` during this call, sorted by name.
    /// Workers that were already dead are not reported again.
    pub fn check_health(&mut self) -> Vec<String> {
        let now = chrono::Utc::now();
        let interval = self.heartbeat_interval_secs;
        // Thresholds are loop-invariant. Saturating arithmetic keeps very large
        // configured intervals from panicking on overflow; they simply never trip.
        let to_secs = |secs: u64| i64::try_from(secs).unwrap_or(i64::MAX);
        let unhealthy_secs = interval.saturating_mul(u64::from(self.max_missed_beats));
        let dead_after = to_secs(unhealthy_secs.saturating_mul(2));
        let unhealthy_after = to_secs(unhealthy_secs);
        let degraded_after = to_secs(interval.saturating_mul(2));

        let mut dead_workers = Vec::new();
        for hb in self.heartbeats.values_mut() {
            let elapsed = (now - hb.last_heartbeat).num_seconds();
            if interval > 0 {
                // Keep `missed_beats` in sync with the silence actually observed.
                let missed = elapsed.max(0) as u64 / interval;
                hb.missed_beats = u32::try_from(missed).unwrap_or(u32::MAX);
            }
            if elapsed > dead_after {
                if hb.status != WorkerHealthStatus::Dead {
                    hb.status = WorkerHealthStatus::Dead;
                    dead_workers.push(hb.role.clone());
                }
            } else if elapsed > unhealthy_after {
                hb.status = WorkerHealthStatus::Unhealthy;
            } else if elapsed > degraded_after {
                hb.status = WorkerHealthStatus::Degraded;
            }
        }
        // HashMap iteration order is arbitrary; sort for deterministic output.
        dead_workers.sort();
        dead_workers
    }

    /// Snapshot of a single worker, or `None` if `role` is not tracked.
    pub fn worker_telemetry(&self, role: &str) -> Option<WorkerTelemetry> {
        self.heartbeats.get(role).map(|hb| {
            let avg_dur = if hb.duration_samples > 0 {
                hb.total_duration_ms / hb.duration_samples as f64
            } else {
                0.0
            };
            WorkerTelemetry {
                role: hb.role.clone(),
                status: hb.status.clone(),
                tasks_completed: hb.tasks_completed,
                tasks_failed: hb.tasks_failed,
                error_count: hb.error_count,
                avg_duration_ms: avg_dur,
                last_heartbeat: hb.last_heartbeat.to_rfc3339(),
                spawned_at: hb.spawned_at.to_rfc3339(),
                messages_sent: hb.messages_sent,
                messages_received: hb.messages_received,
                iteration: hb.iteration,
            }
        })
    }

    /// Snapshots of all tracked workers, sorted by role name.
    pub fn all_worker_telemetry(&self) -> Vec<WorkerTelemetry> {
        let mut roles: Vec<String> = self.heartbeats.keys().cloned().collect();
        roles.sort();
        roles
            .iter()
            .filter_map(|r| self.worker_telemetry(r))
            .collect()
    }

    /// Aggregates per-worker state and monitor-wide counters into [`SwarmMetrics`].
    ///
    /// Status counts and task/error totals cover currently tracked workers.
    /// Throughput and average duration use monitor-wide counters that survive
    /// [`Self::remove_worker`]. `communication_latency_ms` is a heuristic built
    /// from the received/sent ratio and the heartbeat interval, not a measurement.
    pub fn metrics(&self) -> SwarmMetrics {
        let mut healthy = 0;
        let mut degraded = 0;
        let mut unhealthy = 0;
        let mut dead = 0;
        let mut total_completed = 0u64;
        let mut total_failed = 0u64;
        let mut total_errors = 0u64;
        let mut busy_workers = 0u64;
        let total_workers = self.heartbeats.len();
        for hb in self.heartbeats.values() {
            match hb.status {
                WorkerHealthStatus::Healthy => healthy += 1,
                WorkerHealthStatus::Degraded => degraded += 1,
                WorkerHealthStatus::Unhealthy => unhealthy += 1,
                WorkerHealthStatus::Dead => dead += 1,
            }
            total_completed += hb.tasks_completed;
            total_failed += hb.tasks_failed;
            total_errors += hb.error_count;
            if hb.is_busy {
                busy_workers += 1;
            }
        }
        // Floor at one second so a freshly created monitor does not divide by zero.
        let elapsed_secs = (chrono::Utc::now() - self.started_at).num_seconds().max(1) as f64;
        let task_throughput = self.total_tasks_completed as f64 / elapsed_secs;
        let worker_utilization = if total_workers > 0 {
            busy_workers as f64 / total_workers as f64
        } else {
            0.0
        };
        let error_rate = if total_completed + total_failed > 0 {
            total_errors as f64 / (total_completed + total_failed) as f64
        } else {
            0.0
        };
        let overall_avg_duration = if self.total_tasks_completed > 0 {
            self.total_duration_ms / self.total_tasks_completed as f64
        } else {
            0.0
        };
        let comm_latency = if self.total_messages_sent > 0 {
            let ratio = self.total_messages_received as f64 / self.total_messages_sent as f64;
            ratio * (self.heartbeat_interval_secs as f64 * 1000.0) / 2.0
        } else {
            0.0
        };
        SwarmMetrics {
            total_workers,
            healthy_workers: healthy,
            degraded_workers: degraded,
            unhealthy_workers: unhealthy,
            dead_workers: dead,
            total_tasks_completed: total_completed,
            total_tasks_failed: total_failed,
            total_errors,
            overall_avg_duration_ms: overall_avg_duration,
            task_throughput,
            communication_latency_ms: comm_latency,
            worker_utilization,
            error_rate,
            timestamp: chrono::Utc::now().to_rfc3339(),
        }
    }

    /// Roles that are `Dead` and have been silent for at least the replacement
    /// timeout, sorted by name.
    pub fn dead_workers_for_replacement(&self) -> Vec<String> {
        let now = chrono::Utc::now();
        let mut roles: Vec<String> = self
            .heartbeats
            .iter()
            .filter(|(_, hb)| hb.status == WorkerHealthStatus::Dead)
            .filter(|(_, hb)| {
                let elapsed = (now - hb.last_heartbeat).num_seconds();
                elapsed >= self.replacement_timeout_secs as i64
            })
            .map(|(role, _)| role.clone())
            .collect();
        // HashMap iteration order is arbitrary; sort for deterministic output.
        roles.sort();
        roles
    }

    /// Stops tracking `role`. Monitor-wide counters are not rolled back.
    pub fn remove_worker(&mut self, role: &str) {
        self.heartbeats.remove(role);
    }

    /// Number of tracked roles.
    pub fn worker_count(&self) -> usize {
        self.heartbeats.len()
    }

    /// One-line human-readable summary of [`Self::metrics`].
    pub fn format_status(&self) -> String {
        let m = self.metrics();
        format!(
            "Swarm Health: {}/{} healthy, {} degraded, {} unhealthy, {} dead | \
             {} tasks ({:.1}/s) | {:.1}% utilization | {:.2}% error rate",
            m.healthy_workers,
            m.total_workers,
            m.degraded_workers,
            m.unhealthy_workers,
            m.dead_workers,
            m.total_tasks_completed,
            m.task_throughput,
            m.worker_utilization * 100.0,
            m.error_rate * 100.0,
        )
    }
}
/// Configuration for one kind of worker (role): its prompt persona and execution limits.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerProfile {
/// Role name; the orchestrator looks profiles up by this value.
pub name: String,
#[serde(default)]
pub description: String,
/// System prompt used to seed the worker's conversation memory.
pub persona: String,
/// NOTE(review): not consulted anywhere in this file — confirm where tool
/// restrictions are enforced.
#[serde(default)]
pub allowed_tools: Vec<String>,
/// NOTE(review): `provider`/`model` are not read in this file — confirm their consumer.
#[serde(default)]
pub provider: Option<String>,
#[serde(default)]
pub model: Option<String>,
/// Defaults to 10 when absent.
#[serde(default = "default_worker_max_iterations")]
pub max_iterations: usize,
/// Conversation-memory capacity passed to `ConversationMemory::new`; defaults to 20.
#[serde(default = "default_worker_memory")]
pub max_memory_messages: usize,
/// Defaults to `true` when absent.
#[serde(default = "default_true")]
pub can_delegate: bool,
#[serde(default)]
pub resource_limits: ResourceLimits,
}
/// Serde default for `WorkerProfile::max_iterations`.
fn default_worker_max_iterations() -> usize {
    const MAX_ITERATIONS: usize = 10;
    MAX_ITERATIONS
}
/// Serde default for `WorkerProfile::max_memory_messages`.
fn default_worker_memory() -> usize {
    const MEMORY_MESSAGES: usize = 20;
    MEMORY_MESSAGES
}
/// Serde default for boolean flags that should be enabled unless configured otherwise.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
/// Per-worker resource limits.
///
/// NOTE(review): neither limit is enforced anywhere in this file — confirm where they apply.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceLimits {
/// Maximum number of tool invocations; defaults to 50.
#[serde(default = "default_max_tool_calls")]
pub max_tool_calls: usize,
/// Maximum execution time in seconds (per the field name); defaults to 300.
#[serde(default = "default_max_exec_secs")]
pub max_exec_secs: u64,
}
/// Serde default for `ResourceLimits::max_tool_calls`.
fn default_max_tool_calls() -> usize {
    const MAX_TOOL_CALLS: usize = 50;
    MAX_TOOL_CALLS
}
/// Serde default for `ResourceLimits::max_exec_secs` (five minutes).
fn default_max_exec_secs() -> u64 {
    const FIVE_MINUTES_SECS: u64 = 5 * 60;
    FIVE_MINUTES_SECS
}
impl Default for ResourceLimits {
fn default() -> Self {
Self {
max_tool_calls: default_max_tool_calls(),
max_exec_secs: default_max_exec_secs(),
}
}
}
impl Default for WorkerProfile {
fn default() -> Self {
Self {
name: "default".to_string(),
description: String::new(),
persona: "You are a helpful assistant.".to_string(),
allowed_tools: Vec::new(),
provider: None,
model: None,
max_iterations: default_worker_max_iterations(),
max_memory_messages: default_worker_memory(),
can_delegate: default_true(),
resource_limits: ResourceLimits::default(),
}
}
}
impl WorkerProfile {
    /// Converts borrowed tool names into the owned list stored in `allowed_tools`.
    fn tool_names(names: &[&str]) -> Vec<String> {
        names.iter().map(|&name| name.to_owned()).collect()
    }

    /// Research profile: fetch/search/read tools, 15 iterations, no delegation.
    pub fn researcher() -> Self {
        Self {
            name: "researcher".to_string(),
            description: "Analytical researcher focused on data gathering and analysis".to_string(),
            persona: "You are an analytical researcher. Focus on gathering data, \
                      verifying facts, and providing well-structured analysis. \
                      Be thorough and cite your sources."
                .to_string(),
            allowed_tools: Self::tool_names(&["web_fetch", "web_search", "read_file"]),
            max_iterations: 15,
            can_delegate: false,
            ..Self::default()
        }
    }

    /// Creative profile: write/search tools, 10 iterations, no delegation.
    pub fn creative() -> Self {
        Self {
            name: "creative".to_string(),
            description: "Creative problem-solver focused on innovation".to_string(),
            persona: "You are a creative problem-solver. Focus on generating \
                      innovative solutions, exploring alternatives, and thinking \
                      outside the box. Consider multiple perspectives."
                .to_string(),
            allowed_tools: Self::tool_names(&["write_file", "web_search"]),
            max_iterations: 10,
            can_delegate: false,
            ..Self::default()
        }
    }

    /// Executor profile: shell/read/write/fetch tools, 8 iterations, no delegation.
    pub fn executor() -> Self {
        Self {
            name: "executor".to_string(),
            description: "Pragmatic executor focused on efficient task completion".to_string(),
            persona: "You are a pragmatic executor. Focus on completing tasks \
                      efficiently and correctly. Prioritize simplicity and \
                      practicality over perfection."
                .to_string(),
            allowed_tools: Self::tool_names(&["shell_exec", "read_file", "write_file", "web_fetch"]),
            max_iterations: 8,
            can_delegate: false,
            ..Self::default()
        }
    }

    /// Reviewer profile: read/fetch tools, 10 iterations, no delegation.
    pub fn reviewer() -> Self {
        Self {
            name: "reviewer".to_string(),
            description: "Quality assurance reviewer focused on verification".to_string(),
            persona: "You are a meticulous reviewer. Focus on verifying correctness, \
                      identifying issues, and ensuring quality. Be critical and \
                      constructive. Check for errors, edge cases, and improvements."
                .to_string(),
            allowed_tools: Self::tool_names(&["read_file", "web_fetch"]),
            max_iterations: 10,
            can_delegate: false,
            ..Self::default()
        }
    }

    /// Supervisor profile: no tools, 20 iterations, delegation enabled. Its persona
    /// describes the `SUBTASK:` / `ROLE:` / `FINAL:` reply protocol.
    pub fn supervisor() -> Self {
        Self {
            name: "supervisor".to_string(),
            description: "Supervisor that decomposes tasks and coordinates sub-agents".to_string(),
            persona: "You are a supervisor agent. Your role is to decompose complex \
                      tasks into subtasks and coordinate sub-agents to complete them. \
                      Analyze the task, break it down, assign work, and aggregate results. \
                      \n\nFor each subtask, respond with:\n\
                      SUBTASK: <description>\n\
                      ROLE: <researcher|creative|executor|reviewer|supervisor>\n\
                      \nWhen all subtasks are complete, respond with:\n\
                      FINAL: <aggregated result>"
                .to_string(),
            allowed_tools: Self::tool_names(&[]),
            max_iterations: 20,
            can_delegate: true,
            ..Self::default()
        }
    }
}
/// Shape of the agent swarm. Serialized in lowercase; `"flat"` is accepted as an alias
/// for `Star` on input.
///
/// NOTE(review): the orchestrator in this file does not branch on the topology —
/// confirm where it is consumed.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum SwarmTopology {
#[serde(alias = "flat")]
Star,
Mesh,
Hierarchical,
Hybrid,
}
#[allow(clippy::derivable_impls)]
impl Default for SwarmTopology {
fn default() -> Self {
Self::Star
}
}
/// Top-level swarm configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmConfig {
#[serde(default)]
pub topology: SwarmTopology,
/// Maximum supervisor recursion depth; defaults to 3.
#[serde(default = "default_max_depth")]
pub max_depth: usize,
/// Worker cap; also accepted as `agent_count` on input. Defaults to 100.
#[serde(default = "default_max_workers", alias = "agent_count")]
pub max_workers: usize,
/// Worker profiles. Accepts either a list of full profiles or a `name -> persona` map
/// (see `deserialize_profiles`). Empty when absent on deserialization.
#[serde(default, deserialize_with = "deserialize_profiles")]
pub profiles: Vec<WorkerProfile>,
/// When `true`, an LLM picks the roles for a task; otherwise a supervisor is always used.
#[serde(default = "default_true")]
pub dynamic_role_assignment: bool,
/// Enables the shared inter-agent message bus.
#[serde(default)]
pub enable_agent_communication: bool,
/// Enables the shared health monitor.
#[serde(default)]
pub enable_health_monitoring: bool,
}
/// Serde default for `SwarmConfig::max_depth`.
fn default_max_depth() -> usize {
    const MAX_DEPTH: usize = 3;
    MAX_DEPTH
}
/// Serde default for `SwarmConfig::max_workers`.
fn default_max_workers() -> usize {
    const MAX_WORKERS: usize = 100;
    MAX_WORKERS
}
/// Deserializes `SwarmConfig::profiles` from either of two shapes:
///
/// * a list of full [`WorkerProfile`] objects, kept in their given order; or
/// * a map of `name -> persona` strings. Each entry becomes a profile with all other
///   fields defaulted, and the profiles come out sorted by name so the result is
///   deterministic across runs.
///
/// # Errors
/// Returns the deserializer's error when the input matches neither shape.
fn deserialize_profiles<'de, D>(
    deserializer: D,
) -> std::result::Result<Vec<WorkerProfile>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    #[derive(Deserialize)]
    #[serde(untagged)]
    enum ProfilesOrMap {
        Array(Vec<WorkerProfile>),
        // BTreeMap rather than HashMap: HashMap iteration order is randomized per
        // process, which made the resulting profile order nondeterministic.
        Map(std::collections::BTreeMap<String, String>),
    }
    let profiles = match ProfilesOrMap::deserialize(deserializer)? {
        ProfilesOrMap::Array(profiles) => profiles,
        ProfilesOrMap::Map(personas) => personas
            .into_iter()
            .map(|(name, persona)| WorkerProfile {
                name,
                persona,
                ..WorkerProfile::default()
            })
            .collect(),
    };
    Ok(profiles)
}
impl Default for SwarmConfig {
fn default() -> Self {
Self {
topology: SwarmTopology::default(),
max_depth: default_max_depth(),
max_workers: default_max_workers(),
profiles: vec![
WorkerProfile::researcher(),
WorkerProfile::creative(),
WorkerProfile::executor(),
WorkerProfile::reviewer(),
WorkerProfile::supervisor(),
],
dynamic_role_assignment: true,
enable_agent_communication: false,
enable_health_monitoring: false,
}
}
}
/// Routes a task to one worker profile or to a (possibly recursive) supervisor that
/// decomposes it into subtasks.
pub struct SwarmOrchestrator {
config: SwarmConfig,
/// Supervisor nesting level of this orchestrator; the root is 0.
current_depth: usize,
/// Worker count inherited from the parent orchestrator (parent + 1 for sub-orchestrators).
worker_count: usize,
llm: Option<Arc<dyn LLMProviderTrait>>,
multi_llm: Option<MultiModelManager>,
ravenfabric: Option<RavenFabricClient>,
#[allow(dead_code)]
policy_engine: PolicyEngine,
sandbox: Sandbox,
audit_log: AuditLog,
#[allow(dead_code)]
registry: ToolRegistry,
/// Message bus shared with sub-orchestrators; `None` when communication is disabled.
message_bus: Option<Arc<RwLock<AgentMessageBus>>>,
/// Health monitor shared with sub-orchestrators; `None` when monitoring is disabled.
health_monitor: Option<Arc<RwLock<SwarmHealthMonitor>>>,
}
impl SwarmOrchestrator {
pub fn new(
config: SwarmConfig,
llm: Option<Arc<dyn LLMProviderTrait>>,
multi_llm: Option<MultiModelManager>,
ravenfabric: Option<RavenFabricClient>,
) -> Self {
let policy_engine = PolicyEngine::default_secure();
let sandbox = Sandbox::default();
let audit_log = AuditLog::new(format!("swarm-{}", std::process::id()));
let registry = ToolRegistry::with_default_tools();
let message_bus = if config.enable_agent_communication {
Some(Arc::new(RwLock::new(AgentMessageBus::new(1000))))
} else {
None
};
let health_monitor = if config.enable_health_monitoring {
Some(Arc::new(RwLock::new(SwarmHealthMonitor::default())))
} else {
None
};
Self {
config,
current_depth: 0,
worker_count: 0,
llm,
multi_llm,
ravenfabric,
policy_engine,
sandbox,
audit_log,
registry,
message_bus,
health_monitor,
}
}
#[allow(dead_code)]
pub fn new_with_bus(
config: SwarmConfig,
llm: Option<Arc<dyn LLMProviderTrait>>,
multi_llm: Option<MultiModelManager>,
ravenfabric: Option<RavenFabricClient>,
message_bus: Option<Arc<RwLock<AgentMessageBus>>>,
) -> Self {
let policy_engine = PolicyEngine::default_secure();
let sandbox = Sandbox::default();
let audit_log = AuditLog::new(format!("swarm-{}", std::process::id()));
let registry = ToolRegistry::with_default_tools();
let health_monitor = if config.enable_health_monitoring {
Some(Arc::new(RwLock::new(SwarmHealthMonitor::default())))
} else {
None
};
Self {
config,
current_depth: 0,
worker_count: 0,
llm,
multi_llm,
ravenfabric,
policy_engine,
sandbox,
audit_log,
registry,
message_bus,
health_monitor,
}
}
pub async fn init(&mut self) -> Result<()> {
self.sandbox.init().await.map_err(|e| {
RavenClawsError::CommandExecution(format!("Swarm sandbox init failed: {}", e))
})?;
Ok(())
}
#[allow(dead_code)]
pub fn worker_count(&self) -> usize {
self.worker_count
}
#[allow(dead_code)]
pub fn current_depth(&self) -> usize {
self.current_depth
}
#[allow(dead_code)]
pub fn health_metrics(&self) -> Option<SwarmMetrics> {
self.health_monitor
.as_ref()
.and_then(|hm| hm.try_read().ok())
.map(|hm| hm.metrics())
}
#[allow(dead_code)]
pub fn worker_telemetry(&self) -> Option<Vec<WorkerTelemetry>> {
self.health_monitor
.as_ref()
.and_then(|hm| hm.try_read().ok())
.map(|hm| hm.all_worker_telemetry())
}
#[instrument(skip(self, task), fields(depth = self.current_depth, workers = self.worker_count))]
pub async fn orchestrate(&mut self, task: &str) -> Result<String> {
self.orchestrate_impl(task).await
}
async fn orchestrate_impl(&mut self, task: &str) -> Result<String> {
info!(
depth = self.current_depth,
max_depth = self.config.max_depth,
"Orchestrating task"
);
if let Some(ref hm) = self.health_monitor {
if let Ok(hm_guard) = hm.try_read() {
info!(health = %hm_guard.format_status(), "Swarm health at start");
}
}
if self.current_depth >= self.config.max_depth {
warn!(
depth = self.current_depth,
max_depth = self.config.max_depth,
"Max recursion depth reached, executing directly"
);
return self.execute_direct(task).await;
}
if self.worker_count >= self.config.max_workers {
warn!(
workers = self.worker_count,
max_workers = self.config.max_workers,
"Max workers reached, executing directly"
);
return self.execute_direct(task).await;
}
let roles = if self.config.dynamic_role_assignment {
self.analyze_task_roles(task).await?
} else {
vec!["supervisor".to_string()]
};
info!(roles = ?roles, "Assigned roles for task");
if roles.len() == 1 && roles[0] != "supervisor" {
return self.execute_with_profile(task, &roles[0]).await;
}
if roles.contains(&"supervisor".to_string()) || roles.len() > 1 {
return self.recursive_supervise_impl(task, &roles).await;
}
self.execute_direct(task).await
}
async fn analyze_task_roles(&self, task: &str) -> Result<Vec<String>> {
if let Some(ref llm) = self.llm {
let analysis_prompt = format!(
"Analyze this task and determine which roles are needed to complete it. \
Available roles: researcher, creative, executor, reviewer, supervisor. \
\n\nTask: {}\n\n\
Respond with a comma-separated list of roles needed, nothing else. \
Example: researcher, executor, reviewer",
task
);
let messages = vec![
ChatMessage::new(
"system",
"You are a task analysis expert. Respond only with a comma-separated list of roles.",
),
ChatMessage::new("user", analysis_prompt),
];
match llm.chat(messages).await {
Ok(response) => {
let content = response
.choices
.first()
.map(|c| c.message.content.clone())
.unwrap_or_default();
let roles: Vec<String> = content
.split(',')
.map(|r| r.trim().to_lowercase())
.filter(|r| {
matches!(
r.as_str(),
"researcher" | "creative" | "executor" | "reviewer" | "supervisor"
)
})
.collect();
if roles.is_empty() {
Ok(vec!["executor".to_string()])
} else {
Ok(roles)
}
}
Err(e) => {
warn!(error = %e, "Task analysis failed, using default roles");
Ok(vec!["executor".to_string()])
}
}
} else {
Ok(vec!["executor".to_string()])
}
}
async fn execute_with_profile(&self, task: &str, role: &str) -> Result<String> {
let profile = self
.config
.profiles
.iter()
.find(|p| p.name == role)
.cloned()
.unwrap_or_else(|| {
if role == "supervisor" {
WorkerProfile::supervisor()
} else {
WorkerProfile::executor()
}
});
info!(role = %role, profile = %profile.name, "Executing task with profile");
if let Some(ref hm) = self.health_monitor {
if let Ok(mut hm_guard) = hm.try_write() {
hm_guard.register_worker(role);
hm_guard.task_started(role);
}
}
let llm = self.llm.as_ref().ok_or_else(|| {
RavenClawsError::CommandExecution("No LLM provider available for worker".to_string())
})?;
let mut memory = ConversationMemory::new(&profile.persona, profile.max_memory_messages);
let enriched_task = if let Some(ref bus) = self.message_bus {
if let Ok(bus_guard) = bus.try_read() {
let msg_context = bus_guard.format_for_prompt(role, 20);
format!("{}{}", task, msg_context)
} else {
task.to_string()
}
} else {
task.to_string()
};
memory.add_user_message(&enriched_task);
let messages = memory.history().to_vec();
let response = llm.chat(messages).await.map_err(|e| {
if let Some(ref hm) = self.health_monitor {
if let Ok(mut hm_guard) = hm.try_write() {
hm_guard.task_failed(role);
}
}
RavenClawsError::CommandExecution(format!("Worker {} failed: {}", role, e))
})?;
let content = response
.choices
.first()
.map(|c| c.message.content.clone())
.unwrap_or_default();
if let Some(ref hm) = self.health_monitor {
if let Ok(mut hm_guard) = hm.try_write() {
hm_guard.task_completed(role);
hm_guard.heartbeat(role);
}
}
if let Some(ref bus) = self.message_bus {
if let Ok(mut bus_guard) = bus.try_write() {
bus_guard.send(
role,
"*",
MessageType::Result,
&format!(
"Completed task. Result ({} chars): {}",
content.len(),
&content[..content.len().min(500)]
),
HashMap::new(),
);
}
if let Some(ref hm) = self.health_monitor {
if let Ok(mut hm_guard) = hm.try_write() {
hm_guard.message_sent(role);
}
}
}
let _ = self.audit_log.append(
AuditEventType::AgentFinish,
&format!("worker-{}", role),
&format!("Worker {} completed task", role),
Some(serde_json::json!({
"role": role,
"task_length": task.len(),
"response_length": content.len(),
})),
);
Ok(content)
}
async fn execute_direct(&self, task: &str) -> Result<String> {
self.execute_with_profile(task, "executor").await
}
#[allow(dead_code)]
async fn recursive_supervise(&self, task: &str, roles: &[String]) -> Result<String> {
let task = task.to_string();
let roles = roles.to_vec();
let this: &SwarmOrchestrator = self;
Box::pin(async move { this.recursive_supervise_impl(&task, &roles).await }).await
}
async fn recursive_supervise_impl(&self, task: &str, roles: &[String]) -> Result<String> {
let llm = self.llm.as_ref().ok_or_else(|| {
RavenClawsError::CommandExecution(
"No LLM provider available for supervisor".to_string(),
)
})?;
if let Some(ref hm) = self.health_monitor {
if let Ok(mut hm_guard) = hm.try_write() {
hm_guard.register_worker("supervisor");
hm_guard.task_started("supervisor");
}
}
let supervisor_profile = WorkerProfile::supervisor();
let mut memory = ConversationMemory::new(
&supervisor_profile.persona,
supervisor_profile.max_memory_messages,
);
let role_list = roles.join(", ");
let msg_context = if let Some(ref bus) = self.message_bus {
if let Ok(bus_guard) = bus.try_read() {
bus_guard.format_for_prompt("supervisor", 20)
} else {
String::new()
}
} else {
String::new()
};
let supervise_prompt = format!(
"Decompose this task into subtasks and assign each to the most appropriate role.\n\
Available roles: {}\n\n\
Task: {}\n\n\
For each subtask, respond with:\n\
SUBTASK: <description>\n\
ROLE: <role>\n\n\
When all subtasks are complete, respond with:\n\
FINAL: <aggregated result>\n\
{}",
role_list, task, msg_context
);
memory.add_user_message(&supervise_prompt);
let mut subtask_results: Vec<String> = Vec::new();
let mut iteration = 0;
let max_iterations = supervisor_profile.max_iterations;
loop {
iteration += 1;
if iteration > max_iterations {
warn!("Supervisor reached max iterations");
break;
}
let messages = memory.history().to_vec();
let response = match llm.chat(messages).await {
Ok(r) => r,
Err(e) => {
warn!(error = %e, "Supervisor LLM request failed");
continue;
}
};
let content = response
.choices
.first()
.map(|c| c.message.content.clone())
.unwrap_or_default();
if iteration % 3 == 0 {
if let Some(ref hm) = self.health_monitor {
if let Ok(hm_guard) = hm.try_read() {
let status = hm_guard.format_status();
info!(health = %status, "Swarm health check");
let dead = hm_guard.dead_workers_for_replacement();
if !dead.is_empty() {
warn!(dead_workers = ?dead, "Dead workers detected");
}
}
}
}
if content.contains("FINAL:") {
let final_response = content
.split("FINAL:")
.nth(1)
.unwrap_or("")
.trim()
.to_string();
info!(
iteration = iteration,
subtasks = subtask_results.len(),
"Supervisor completed"
);
if let Some(ref hm) = self.health_monitor {
if let Ok(mut hm_guard) = hm.try_write() {
hm_guard.task_completed("supervisor");
hm_guard.heartbeat("supervisor");
}
}
let _ = self.audit_log.append(
AuditEventType::AgentFinish,
"supervisor",
"Supervisor completed recursive decomposition",
Some(serde_json::json!({
"iterations": iteration,
"subtasks_completed": subtask_results.len(),
"depth": self.current_depth,
})),
);
if !subtask_results.is_empty() {
let aggregated = subtask_results.join("\n\n");
return Ok(format!(
"{}\n\n## Aggregated Results\n\n{}",
final_response, aggregated
));
}
return Ok(final_response);
}
if content.contains("SUBTASK:") {
let subtask_block = content.split("SUBTASK:").nth(1).unwrap_or("");
let subtask_lines: Vec<&str> = subtask_block.lines().take(4).collect();
let subtask_desc = subtask_lines.first().unwrap_or(&"").trim();
let role = subtask_lines
.iter()
.find(|l| l.starts_with("ROLE:"))
.and_then(|l| l.split(':').nth(1))
.unwrap_or("executor")
.trim()
.to_lowercase();
if !subtask_desc.is_empty() {
info!(role = %role, subtask = %subtask_desc, "Delegating subtask");
if let Some(ref bus) = self.message_bus {
if let Ok(mut bus_guard) = bus.try_write() {
bus_guard.send(
"supervisor",
&role,
MessageType::Coordination,
&format!("Delegating subtask: {}", subtask_desc),
HashMap::new(),
);
}
}
let result =
if role == "supervisor" && self.current_depth < self.config.max_depth {
let config = self.config.clone();
let current_depth = self.current_depth + 1;
let worker_count = self.worker_count + 1;
let llm = self.llm.clone();
let multi_llm = self.multi_llm.clone();
let ravenfabric = self.ravenfabric.clone();
let subtask = subtask_desc.to_string();
let message_bus = self.message_bus.clone();
let health_monitor = self.health_monitor.clone();
Box::pin(async move {
let mut sub_orchestrator = SwarmOrchestrator {
config,
current_depth,
worker_count,
llm,
multi_llm,
ravenfabric,
policy_engine: PolicyEngine::default_secure(),
sandbox: Sandbox::default(),
audit_log: AuditLog::new(format!(
"sub-swarm-{}-{}",
current_depth,
std::process::id()
)),
registry: ToolRegistry::with_default_tools(),
message_bus,
health_monitor,
};
let _ = sub_orchestrator.init().await;
sub_orchestrator.orchestrate(&subtask).await
})
.await
} else {
self.execute_with_profile(subtask_desc, &role).await
};
match result {
Ok(result) => {
info!(
role = %role,
chars = result.len(),
"Subtask completed"
);
subtask_results.push(format!("[{}] {}", role, result));
memory.add_assistant_message(&format!(
"Delegated subtask to {}: {}",
role, subtask_desc
));
memory.add_user_message(&format!("Result from {}: {}", role, result));
}
Err(e) => {
warn!(role = %role, error = %e, "Subtask failed");
if let Some(ref hm) = self.health_monitor {
if let Ok(mut hm_guard) = hm.try_write() {
hm_guard.task_failed(&role);
hm_guard.record_error(&role);
}
}
memory.add_assistant_message(&format!(
"Subtask for {} failed: {}",
role, e
));
}
}
}
} else {
memory.add_assistant_message(&content);
}
}
if !subtask_results.is_empty() {
let aggregated = subtask_results.join("\n\n");
info!(
"Supervisor aggregated {} subtask results",
subtask_results.len()
);
return Ok(aggregated);
}
Err(RavenClawsError::CommandExecution(
"Supervisor completed without results".to_string(),
))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // ---------------------------------------------------------------------
    // Worker profiles
    // ---------------------------------------------------------------------

    /// The default profile is named `default`, may delegate, and allows 10 iterations / 20
    /// memory messages.
    #[test]
    fn test_worker_profile_default() {
        let profile = WorkerProfile::default();
        assert_eq!(profile.name, "default");
        assert!(profile.can_delegate);
        assert_eq!(profile.max_iterations, 10);
        assert_eq!(profile.max_memory_messages, 20);
    }

    /// The researcher profile cannot delegate and exposes the web fetch/search tools.
    #[test]
    fn test_worker_profile_researcher() {
        let profile = WorkerProfile::researcher();
        assert_eq!(profile.name, "researcher");
        assert!(!profile.can_delegate);
        assert!(profile.allowed_tools.contains(&"web_fetch".to_string()));
        assert!(profile.allowed_tools.contains(&"web_search".to_string()));
    }

    /// The creative profile cannot delegate.
    #[test]
    fn test_worker_profile_creative() {
        let profile = WorkerProfile::creative();
        assert_eq!(profile.name, "creative");
        assert!(!profile.can_delegate);
    }

    /// The executor profile cannot delegate and exposes `shell_exec`.
    #[test]
    fn test_worker_profile_executor() {
        let profile = WorkerProfile::executor();
        assert_eq!(profile.name, "executor");
        assert!(!profile.can_delegate);
        assert!(profile.allowed_tools.contains(&"shell_exec".to_string()));
    }

    /// The reviewer profile cannot delegate.
    #[test]
    fn test_worker_profile_reviewer() {
        let profile = WorkerProfile::reviewer();
        assert_eq!(profile.name, "reviewer");
        assert!(!profile.can_delegate);
    }

    /// The supervisor profile may delegate, and its persona mentions the `SUBTASK:` and
    /// `FINAL:` markers.
    #[test]
    fn test_worker_profile_supervisor() {
        let profile = WorkerProfile::supervisor();
        assert_eq!(profile.name, "supervisor");
        assert!(profile.can_delegate);
        assert!(profile.persona.contains("SUBTASK:"));
        assert!(profile.persona.contains("FINAL:"));
    }

    // ---------------------------------------------------------------------
    // Swarm configuration
    // ---------------------------------------------------------------------

    /// The default config uses a star topology, depth 3, 100 workers, dynamic role assignment
    /// and five built-in profiles.
    #[test]
    fn test_swarm_config_default() {
        let config = SwarmConfig::default();
        assert_eq!(config.topology, SwarmTopology::Star);
        assert_eq!(config.max_depth, 3);
        assert_eq!(config.max_workers, 100);
        assert!(config.dynamic_role_assignment);
        assert_eq!(config.profiles.len(), 5);
    }

    /// Every topology variant round-trips through JSON unchanged.
    #[test]
    fn test_swarm_topology_serde() {
        // A fixed-size array is enough here; the values are only iterated.
        let topologies = [
            SwarmTopology::Star,
            SwarmTopology::Mesh,
            SwarmTopology::Hierarchical,
            SwarmTopology::Hybrid,
        ];
        for t in &topologies {
            let json = serde_json::to_string(t).unwrap();
            let deserialized: SwarmTopology = serde_json::from_str(&json).unwrap();
            assert_eq!(*t, deserialized);
        }
    }

    /// The default config round-trips through pretty JSON, keeping topology, limits and
    /// profile count.
    #[test]
    fn test_swarm_config_serde() {
        let config = SwarmConfig::default();
        let json = serde_json::to_string_pretty(&config).unwrap();
        let deserialized: SwarmConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(config.topology, deserialized.topology);
        assert_eq!(config.max_depth, deserialized.max_depth);
        assert_eq!(config.max_workers, deserialized.max_workers);
        assert_eq!(config.profiles.len(), deserialized.profiles.len());
    }

    /// Default resource limits are 50 tool calls and `max_exec_secs` of 300.
    #[test]
    fn test_resource_limits_default() {
        let limits = ResourceLimits::default();
        assert_eq!(limits.max_tool_calls, 50);
        assert_eq!(limits.max_exec_secs, 300);
    }

    // ---------------------------------------------------------------------
    // Orchestrator construction
    // ---------------------------------------------------------------------

    /// A freshly constructed orchestrator starts at depth 0 with no workers.
    #[test]
    fn test_swarm_orchestrator_new() {
        let config = SwarmConfig::default();
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        assert_eq!(orchestrator.current_depth(), 0);
        assert_eq!(orchestrator.worker_count(), 0);
    }

    /// A configured `max_depth` of 0 is carried into the orchestrator. Because the orchestrator
    /// starts at depth 0, it is already at its configured depth limit.
    #[test]
    fn test_swarm_orchestrator_depth_limit() {
        let config = SwarmConfig {
            max_depth: 0,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        // Inspect the state `new` produced instead of overwriting it. This way the test fails
        // if `new` drops the configured limit or starts at a non-zero depth.
        assert_eq!(orchestrator.config.max_depth, 0);
        assert_eq!(orchestrator.current_depth(), 0);
        assert!(orchestrator.current_depth >= orchestrator.config.max_depth);
    }

    /// With every optional backend passed as `None`, role analysis still returns `Ok`.
    #[tokio::test]
    async fn test_analyze_task_roles_fallback() {
        let config = SwarmConfig::default();
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        let result = orchestrator.analyze_task_roles("test task").await;
        assert!(result.is_ok());
    }

    /// A hand-built profile keeps the provider, model and limits it was given.
    #[test]
    fn test_worker_profile_custom() {
        let profile = WorkerProfile {
            name: "custom".to_string(),
            description: "Custom worker".to_string(),
            persona: "You are a custom worker.".to_string(),
            allowed_tools: vec!["read_file".to_string()],
            provider: Some("openai".to_string()),
            model: Some("gpt-4".to_string()),
            max_iterations: 5,
            max_memory_messages: 10,
            can_delegate: false,
            resource_limits: ResourceLimits {
                max_tool_calls: 10,
                max_exec_secs: 60,
            },
        };
        assert_eq!(profile.name, "custom");
        assert_eq!(profile.provider, Some("openai".to_string()));
        assert_eq!(profile.model, Some("gpt-4".to_string()));
        assert_eq!(profile.max_iterations, 5);
        assert_eq!(profile.resource_limits.max_tool_calls, 10);
    }

    /// Struct-update syntax over the default config keeps every overridden field.
    #[test]
    fn test_swarm_config_custom_profiles() {
        let config = SwarmConfig {
            profiles: vec![WorkerProfile::researcher(), WorkerProfile::executor()],
            topology: SwarmTopology::Hierarchical,
            max_depth: 5,
            max_workers: 50,
            ..SwarmConfig::default()
        };
        assert_eq!(config.profiles.len(), 2);
        assert_eq!(config.topology, SwarmTopology::Hierarchical);
        assert_eq!(config.max_depth, 5);
        assert_eq!(config.max_workers, 50);
    }

    // ---------------------------------------------------------------------
    // Agent message bus
    // ---------------------------------------------------------------------

    /// A new bus is empty.
    #[test]
    fn test_message_bus_new() {
        let bus = AgentMessageBus::new(100);
        assert!(bus.is_empty());
        assert_eq!(bus.len(), 0);
    }

    /// `send` returns a non-empty id, and the directed message is visible to its recipient.
    #[test]
    fn test_message_bus_send_and_receive() {
        let mut bus = AgentMessageBus::new(100);
        let id = bus.send(
            "researcher",
            "executor",
            MessageType::Information,
            "Found relevant data",
            HashMap::new(),
        );
        assert!(!id.is_empty());
        assert_eq!(bus.len(), 1);
        let msgs = bus.messages_for("executor");
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, "Found relevant data");
        assert_eq!(msgs[0].sender, "researcher");
    }

    /// A message addressed to `*` is visible to every recipient queried.
    #[test]
    fn test_message_bus_broadcast() {
        let mut bus = AgentMessageBus::new(100);
        bus.send(
            "supervisor",
            "*",
            MessageType::Coordination,
            "All workers proceed",
            HashMap::new(),
        );
        assert_eq!(bus.messages_for("researcher").len(), 1);
        assert_eq!(bus.messages_for("executor").len(), 1);
        assert_eq!(bus.messages_for("reviewer").len(), 1);
    }

    /// `messages_of_type` returns only messages of the requested type.
    #[test]
    fn test_message_bus_filter_by_type() {
        let mut bus = AgentMessageBus::new(100);
        bus.send(
            "researcher",
            "*",
            MessageType::Information,
            "Data found",
            HashMap::new(),
        );
        bus.send(
            "executor",
            "supervisor",
            MessageType::Result,
            "Task done",
            HashMap::new(),
        );
        bus.send(
            "executor",
            "supervisor",
            MessageType::Error,
            "Failed",
            HashMap::new(),
        );
        let errors = bus.messages_of_type(&MessageType::Error);
        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].content, "Failed");
        let results = bus.messages_of_type(&MessageType::Result);
        assert_eq!(results.len(), 1);
    }

    /// Past `max_messages`, the oldest messages are evicted and the newest five stay in order.
    #[test]
    fn test_message_bus_max_messages() {
        let mut bus = AgentMessageBus::new(5);
        for i in 0..10 {
            bus.send(
                "worker",
                "*",
                MessageType::Generic,
                &format!("Message {}", i),
                HashMap::new(),
            );
        }
        assert_eq!(bus.len(), 5);
        let all = bus.all_messages();
        assert_eq!(all[0].content, "Message 5");
        assert_eq!(all[4].content, "Message 9");
    }

    /// The prompt rendered for the supervisor contains `Inter-Agent Messages`, both senders,
    /// and the broadcast content.
    #[test]
    fn test_message_bus_format_for_prompt() {
        let mut bus = AgentMessageBus::new(100);
        bus.send(
            "researcher",
            "*",
            MessageType::Information,
            "Found key insight",
            HashMap::new(),
        );
        bus.send(
            "executor",
            "supervisor",
            MessageType::Result,
            "Implementation complete",
            HashMap::new(),
        );
        let prompt = bus.format_for_prompt("supervisor", 10);
        assert!(prompt.contains("Inter-Agent Messages"));
        assert!(prompt.contains("researcher"));
        assert!(prompt.contains("executor"));
        assert!(prompt.contains("Found key insight"));
    }

    /// An empty bus renders to an empty prompt string.
    #[test]
    fn test_message_bus_empty_format() {
        let bus = AgentMessageBus::new(100);
        let prompt = bus.format_for_prompt("supervisor", 10);
        assert!(prompt.is_empty());
    }

    /// `Display` renders each `MessageType` as its lowercase name.
    #[test]
    fn test_message_type_display() {
        assert_eq!(format!("{}", MessageType::Information), "information");
        assert_eq!(format!("{}", MessageType::Question), "question");
        assert_eq!(format!("{}", MessageType::Result), "result");
        assert_eq!(format!("{}", MessageType::Error), "error");
        assert_eq!(format!("{}", MessageType::Coordination), "coordination");
        assert_eq!(format!("{}", MessageType::Generic), "generic");
    }

    /// `messages_from` counts both broadcast and directed messages from a sender.
    #[test]
    fn test_message_bus_messages_from() {
        let mut bus = AgentMessageBus::new(100);
        bus.send(
            "researcher",
            "*",
            MessageType::Information,
            "Data A",
            HashMap::new(),
        );
        bus.send(
            "researcher",
            "executor",
            MessageType::Information,
            "Data B",
            HashMap::new(),
        );
        bus.send(
            "executor",
            "supervisor",
            MessageType::Result,
            "Done",
            HashMap::new(),
        );
        let from_researcher = bus.messages_from("researcher");
        assert_eq!(from_researcher.len(), 2);
        let from_executor = bus.messages_from("executor");
        assert_eq!(from_executor.len(), 1);
    }

    /// Enabling agent communication gives the orchestrator a message bus.
    #[test]
    fn test_orchestrator_new_with_communication() {
        let config = SwarmConfig {
            enable_agent_communication: true,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        assert!(orchestrator.message_bus.is_some());
    }

    /// Disabling agent communication leaves the orchestrator without a message bus.
    #[test]
    fn test_orchestrator_new_without_communication() {
        let config = SwarmConfig {
            enable_agent_communication: false,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        assert!(orchestrator.message_bus.is_none());
    }

    /// Agent communication and health monitoring are both off in the default config.
    #[test]
    fn test_swarm_config_communication_default() {
        let config = SwarmConfig::default();
        assert!(!config.enable_agent_communication);
        assert!(!config.enable_health_monitoring);
    }

    // ---------------------------------------------------------------------
    // Health monitoring
    // ---------------------------------------------------------------------

    /// `Display` renders each health status as its lowercase name.
    #[test]
    fn test_health_status_display() {
        assert_eq!(format!("{}", WorkerHealthStatus::Healthy), "healthy");
        assert_eq!(format!("{}", WorkerHealthStatus::Degraded), "degraded");
        assert_eq!(format!("{}", WorkerHealthStatus::Unhealthy), "unhealthy");
        assert_eq!(format!("{}", WorkerHealthStatus::Dead), "dead");
    }

    /// Default monitor settings are a 5 s interval, 3 missed beats and a 30 s replacement
    /// timeout, with no workers registered.
    #[test]
    fn test_health_monitor_default() {
        let hm = SwarmHealthMonitor::default();
        assert_eq!(hm.heartbeat_interval_secs, 5);
        assert_eq!(hm.max_missed_beats, 3);
        assert_eq!(hm.replacement_timeout_secs, 30);
        assert_eq!(hm.worker_count(), 0);
    }

    /// Registering a worker makes its telemetry available under its role.
    #[test]
    fn test_health_monitor_register_worker() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("researcher");
        assert_eq!(hm.worker_count(), 1);
        let telemetry = hm.worker_telemetry("researcher");
        assert!(telemetry.is_some());
        assert_eq!(telemetry.unwrap().role, "researcher");
    }

    /// A worker that has just sent a heartbeat reports as healthy.
    #[test]
    fn test_health_monitor_heartbeat() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.heartbeat("executor");
        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.status, WorkerHealthStatus::Healthy);
    }

    /// A started-then-completed task counts as one completion and no failures.
    #[test]
    fn test_health_monitor_task_lifecycle() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.task_started("executor");
        hm.task_completed("executor");
        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.tasks_completed, 1);
        assert_eq!(telemetry.tasks_failed, 0);
    }

    /// A failed task counts both as a failed task and as an error.
    #[test]
    fn test_health_monitor_task_failure() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.task_started("executor");
        hm.task_failed("executor");
        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.tasks_completed, 0);
        assert_eq!(telemetry.tasks_failed, 1);
        assert_eq!(telemetry.error_count, 1);
    }

    /// Metrics for an empty monitor are all zero.
    #[test]
    fn test_health_monitor_metrics_empty() {
        let hm = SwarmHealthMonitor::default();
        let metrics = hm.metrics();
        assert_eq!(metrics.total_workers, 0);
        assert_eq!(metrics.healthy_workers, 0);
        assert_eq!(metrics.total_tasks_completed, 0);
        assert_eq!(metrics.task_throughput, 0.0);
    }

    /// Metrics aggregate worker counts and completed tasks across all workers.
    #[test]
    fn test_health_monitor_metrics_with_workers() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("researcher");
        hm.register_worker("executor");
        hm.task_started("executor");
        hm.task_completed("executor");
        hm.task_started("researcher");
        hm.task_completed("researcher");
        let metrics = hm.metrics();
        assert_eq!(metrics.total_workers, 2);
        assert_eq!(metrics.healthy_workers, 2);
        assert_eq!(metrics.total_tasks_completed, 2);
    }

    /// With a 1 s interval and 1 allowed miss, a heartbeat 10 s stale makes `check_health`
    /// report the worker.
    #[test]
    fn test_health_monitor_dead_worker_detection() {
        let mut hm = SwarmHealthMonitor {
            heartbeat_interval_secs: 1,
            max_missed_beats: 1,
            replacement_timeout_secs: 0,
            ..SwarmHealthMonitor::default()
        };
        hm.register_worker("executor");
        if let Some(hb) = hm.heartbeats.get_mut("executor") {
            hb.last_heartbeat = chrono::Utc::now() - chrono::Duration::seconds(10);
        }
        let dead = hm.check_health();
        assert!(!dead.is_empty());
        assert_eq!(dead[0], "executor");
    }

    /// With a 1 s interval and 3 allowed misses, a heartbeat 3 s stale marks the worker
    /// `Degraded`.
    #[test]
    fn test_health_monitor_degraded_detection() {
        let mut hm = SwarmHealthMonitor {
            heartbeat_interval_secs: 1,
            max_missed_beats: 3,
            ..SwarmHealthMonitor::default()
        };
        hm.register_worker("executor");
        if let Some(hb) = hm.heartbeats.get_mut("executor") {
            hb.last_heartbeat = chrono::Utc::now() - chrono::Duration::seconds(3);
        }
        let _dead = hm.check_health();
        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.status, WorkerHealthStatus::Degraded);
    }

    /// Sent and received message counts are tracked separately per worker.
    #[test]
    fn test_health_monitor_message_tracking() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("researcher");
        hm.message_sent("researcher");
        hm.message_sent("researcher");
        hm.message_received("researcher");
        let telemetry = hm.worker_telemetry("researcher").unwrap();
        assert_eq!(telemetry.messages_sent, 2);
        assert_eq!(telemetry.messages_received, 1);
    }

    /// Each `record_error` call increments the worker's error count.
    #[test]
    fn test_health_monitor_error_tracking() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.record_error("executor");
        hm.record_error("executor");
        hm.record_error("executor");
        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.error_count, 3);
    }

    /// Status text for an empty monitor contains `Swarm Health:` and `healthy`.
    #[test]
    fn test_health_monitor_format_status() {
        let hm = SwarmHealthMonitor::default();
        let status = hm.format_status();
        assert!(status.contains("Swarm Health:"));
        assert!(status.contains("healthy"));
    }

    /// `all_worker_telemetry` returns one entry per registered worker.
    #[test]
    fn test_health_monitor_all_worker_telemetry() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("researcher");
        hm.register_worker("executor");
        hm.register_worker("reviewer");
        let all = hm.all_worker_telemetry();
        assert_eq!(all.len(), 3);
    }

    /// Removing a registered worker drops it from the worker count.
    #[test]
    fn test_health_monitor_remove_worker() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        assert_eq!(hm.worker_count(), 1);
        hm.remove_worker("executor");
        assert_eq!(hm.worker_count(), 0);
    }

    /// Enabling health monitoring gives the orchestrator a health monitor.
    #[test]
    fn test_orchestrator_new_with_health_monitoring() {
        let config = SwarmConfig {
            enable_health_monitoring: true,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        assert!(orchestrator.health_monitor.is_some());
    }

    /// Disabling health monitoring leaves the orchestrator without a health monitor.
    #[test]
    fn test_orchestrator_new_without_health_monitoring() {
        let config = SwarmConfig {
            enable_health_monitoring: false,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        assert!(orchestrator.health_monitor.is_none());
    }

    /// With monitoring on, `health_metrics` returns metrics that report no workers yet.
    #[test]
    fn test_health_metrics_accessor() {
        let config = SwarmConfig {
            enable_health_monitoring: true,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        let metrics = orchestrator.health_metrics();
        assert!(metrics.is_some());
        assert_eq!(metrics.unwrap().total_workers, 0);
    }

    /// With monitoring on, `worker_telemetry` returns an empty collection.
    #[test]
    fn test_worker_telemetry_accessor() {
        let config = SwarmConfig {
            enable_health_monitoring: true,
            ..SwarmConfig::default()
        };
        let orchestrator = SwarmOrchestrator::new(config, None, None, None);
        let telemetry = orchestrator.worker_telemetry();
        assert!(telemetry.is_some());
        assert!(telemetry.unwrap().is_empty());
    }

    /// `SwarmHealthMonitor::new` stores the interval, miss limit and replacement timeout as
    /// given.
    #[test]
    fn test_health_monitor_new_custom() {
        let hm = SwarmHealthMonitor::new(10, 5, 60);
        assert_eq!(hm.heartbeat_interval_secs, 10);
        assert_eq!(hm.max_missed_beats, 5);
        assert_eq!(hm.replacement_timeout_secs, 60);
    }

    /// A worker already marked `Dead`, with a zero replacement timeout, is offered for
    /// replacement.
    #[test]
    fn test_health_monitor_dead_workers_for_replacement() {
        let mut hm = SwarmHealthMonitor {
            heartbeat_interval_secs: 1,
            max_missed_beats: 1,
            replacement_timeout_secs: 0,
            ..SwarmHealthMonitor::default()
        };
        hm.register_worker("executor");
        if let Some(hb) = hm.heartbeats.get_mut("executor") {
            hb.last_heartbeat = chrono::Utc::now() - chrono::Duration::seconds(30);
            hb.status = WorkerHealthStatus::Dead;
        }
        let candidates = hm.dead_workers_for_replacement();
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0], "executor");
    }

    /// One completed and one failed task produce a positive error rate.
    #[test]
    fn test_health_monitor_metrics_error_rate() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.task_started("executor");
        hm.task_completed("executor");
        hm.task_started("executor");
        hm.task_failed("executor");
        let metrics = hm.metrics();
        assert_eq!(metrics.total_tasks_completed, 1);
        assert_eq!(metrics.total_tasks_failed, 1);
        assert!(metrics.error_rate > 0.0);
    }

    /// One busy worker out of two yields a utilization of 0.5.
    #[test]
    fn test_health_monitor_metrics_utilization() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("busy_worker");
        hm.register_worker("idle_worker");
        if let Some(hb) = hm.heartbeats.get_mut("busy_worker") {
            hb.is_busy = true;
        }
        let metrics = hm.metrics();
        assert_eq!(metrics.total_workers, 2);
        assert!((metrics.worker_utilization - 0.5).abs() < f64::EPSILON);
    }

    /// Every `task_started` advances the iteration counter, including a task that has not
    /// completed yet.
    #[test]
    fn test_health_monitor_iteration_tracking() {
        let mut hm = SwarmHealthMonitor::default();
        hm.register_worker("executor");
        hm.task_started("executor");
        hm.task_completed("executor");
        hm.task_started("executor");
        hm.task_completed("executor");
        hm.task_started("executor");
        let telemetry = hm.worker_telemetry("executor").unwrap();
        assert_eq!(telemetry.iteration, 3);
        assert_eq!(telemetry.tasks_completed, 2);
    }
}