Skip to main content

codetether_agent/swarm/
mod.rs

1//! Swarm orchestration for parallel sub-agent execution
2//!
3//! Implements the SubAgent/SubTask paradigm for parallel task execution,
4//! similar to Kimi K2.5's Agent Swarm but generalized for the CodeTether ecosystem.
5//!
6//! Key concepts:
7//! - **Orchestrator**: Decomposes complex tasks into parallelizable subtasks
8//! - **SubAgent**: A dynamically instantiated agent for executing a subtask
9//! - **SubTask**: A unit of work that can be executed in parallel
10//! - **Critical Path**: Latency-oriented metric for parallel execution
11
12pub mod cache;
13pub mod collapse_controller;
14pub mod delegation;
15pub mod delegation_outcome;
16pub mod executor;
17pub mod k8s_result;
18pub mod kubernetes_executor;
19mod live_bus;
20pub mod orchestrator;
21pub mod rate_limiter;
22pub mod remote_subtask;
23pub mod result_store;
24pub mod speculative;
25pub mod subtask;
26pub mod token_exhaustion;
27pub mod token_truncate;
28mod tool_policy;
29pub mod validation;
30
31pub use cache::{CacheConfig, CacheStats, SwarmCache};
32pub use collapse_controller::{
33    BranchEvaluation, BranchObservation, BranchRuntimeState, CoherenceScore, CollapseController,
34    CollapsePolicy, CollapseTick, KillDecision,
35};
36pub use executor::{SwarmExecutor, run_agent_loop};
37pub use orchestrator::Orchestrator;
38pub use rate_limiter::{AdaptiveRateLimiter, RateLimitInfo, RateLimitStats};
39pub use result_store::{ResultStore, ResultStoreContext, SharedResult, SubTaskStoreHandle};
40pub use subtask::{SubAgent, SubTask, SubTaskContext, SubTaskResult, SubTaskStatus};
41
42use anyhow::Result;
43use async_trait::async_trait;
44
45/// Actor trait for swarm participants
46///
47/// An Actor is an entity that can participate in the swarm by receiving
48/// and processing messages. This is the base trait for all swarm participants.
49#[async_trait]
50pub trait Actor: Send + Sync {
51    /// Get the unique identifier for this actor
52    fn actor_id(&self) -> &str;
53
54    /// Get the actor's current status
55    fn actor_status(&self) -> ActorStatus;
56
57    /// Initialize the actor for swarm participation
58    async fn initialize(&mut self) -> Result<()>;
59
60    /// Shutdown the actor gracefully
61    async fn shutdown(&mut self) -> Result<()>;
62}
63
64/// Handler trait for processing messages in the swarm
65///
66/// A Handler can receive and process messages of a specific type.
67/// This enables actors to respond to different message types.
68#[async_trait]
69pub trait Handler<M>: Actor {
70    /// The response type for this handler
71    type Response: Send + Sync;
72
73    /// Handle a message and return a response
74    async fn handle(&mut self, message: M) -> Result<Self::Response>;
75}
76
/// Lifecycle state reported by an [`Actor`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ActorStatus {
    /// Still starting up.
    Initializing,
    /// Idle and able to accept messages.
    Ready,
    /// Currently processing work.
    Busy,
    /// Temporarily paused.
    Paused,
    /// In the middle of a graceful shutdown.
    ShuttingDown,
    /// Fully stopped.
    Stopped,
}

impl std::fmt::Display for ActorStatus {
    /// Writes the status as a lowercase snake_case label, e.g. `shutting_down`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Initializing => "initializing",
            Self::Ready => "ready",
            Self::Busy => "busy",
            Self::Paused => "paused",
            Self::ShuttingDown => "shutting_down",
            Self::Stopped => "stopped",
        };
        f.write_str(label)
    }
}
106
107/// Message types for swarm coordination
108#[derive(Debug, Clone)]
109pub enum SwarmMessage {
110    /// Execute a task
111    ExecuteTask {
112        task_id: String,
113        instruction: String,
114    },
115    /// Report progress
116    Progress {
117        task_id: String,
118        progress: f32,
119        message: String,
120    },
121    /// Task completed
122    TaskCompleted { task_id: String, result: String },
123    /// Task failed
124    TaskFailed { task_id: String, error: String },
125    /// Request tool execution
126    ToolRequest {
127        tool_id: String,
128        arguments: serde_json::Value,
129    },
130    /// Tool response
131    ToolResponse {
132        tool_id: String,
133        result: crate::tool::ToolResult,
134    },
135}
136
137use serde::{Deserialize, Serialize};
138
/// Upper bound on concurrently running sub-agents; default for
/// [`SwarmConfig::max_subagents`].
pub const MAX_SUBAGENTS: usize = 100;

/// Upper bound on tool calls summed over every sub-agent in a run; default for
/// [`SwarmConfig::max_total_steps`].
pub const MAX_TOOL_CALLS: usize = 1_500;
144
145/// Swarm execution configuration
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct SwarmConfig {
148    /// Maximum number of concurrent sub-agents
149    pub max_subagents: usize,
150
151    /// Maximum tool calls per sub-agent
152    pub max_steps_per_subagent: usize,
153
154    /// Maximum total tool calls across all sub-agents
155    pub max_total_steps: usize,
156
157    /// Timeout for individual sub-agent execution (seconds)
158    pub subagent_timeout_secs: u64,
159
160    /// Whether to enable parallel execution
161    pub parallel_enabled: bool,
162
163    /// Critical path optimization threshold
164    pub critical_path_threshold: usize,
165
166    /// Model to use for sub-agents (provider/model format)
167    pub model: Option<String>,
168
169    /// Max concurrent API requests (rate limiting)
170    pub max_concurrent_requests: usize,
171
172    /// Delay between API calls in ms (rate limiting)
173    pub request_delay_ms: u64,
174
175    /// Enable worktree isolation for sub-agents
176    pub worktree_enabled: bool,
177
178    /// Automatically merge worktree changes on success
179    pub worktree_auto_merge: bool,
180
181    /// Working directory for worktree creation
182    pub working_dir: Option<String>,
183
184    /// Execution mode for sub-agent runtime
185    #[serde(default)]
186    pub execution_mode: ExecutionMode,
187
188    /// Maximum number of Kubernetes sub-agent pods active at once.
189    #[serde(default = "default_k8s_pod_budget")]
190    pub k8s_pod_budget: usize,
191
192    /// Optional container image override for Kubernetes sub-agent pods.
193    #[serde(default)]
194    pub k8s_subagent_image: Option<String>,
195
196    /// Maximum number of retry attempts for transient failures (0 = no retries)
197    #[serde(default = "default_max_retries")]
198    pub max_retries: u32,
199
200    /// Base delay in milliseconds for the first retry (exponential backoff)
201    #[serde(default = "default_base_delay_ms")]
202    pub base_delay_ms: u64,
203
204    /// Maximum delay in milliseconds between retries (caps exponential growth)
205    #[serde(default = "default_max_delay_ms")]
206    pub max_delay_ms: u64,
207}
208
209/// Sub-agent execution mode.
210#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
211#[serde(rename_all = "snake_case")]
212#[derive(Default)]
213pub enum ExecutionMode {
214    /// Run sub-agents as local async tasks in the current process.
215    #[default]
216    LocalThread,
217    /// Run sub-agents as isolated Kubernetes pods.
218    KubernetesPod,
219}
220
221impl ExecutionMode {
222    pub fn from_cli_value(value: &str) -> Self {
223        match value {
224            "k8s" | "kubernetes" | "kubernetes-pod" | "pod" => Self::KubernetesPod,
225            _ => Self::LocalThread,
226        }
227    }
228}
229
/// Serde fallback for [`SwarmConfig::k8s_pod_budget`]: at most eight
/// Kubernetes sub-agent pods active at once.
fn default_k8s_pod_budget() -> usize {
    const POD_BUDGET: usize = 8;
    POD_BUDGET
}
233
/// Serde fallback for [`SwarmConfig::max_retries`]: three retry attempts.
fn default_max_retries() -> u32 {
    const RETRY_ATTEMPTS: u32 = 3;
    RETRY_ATTEMPTS
}
237
/// Serde fallback for [`SwarmConfig::base_delay_ms`]: wait half a second
/// before the first retry.
fn default_base_delay_ms() -> u64 {
    const HALF_SECOND_MS: u64 = 1_000 / 2;
    HALF_SECOND_MS
}
241
/// Serde fallback for [`SwarmConfig::max_delay_ms`]: retry delays are capped
/// at thirty seconds.
fn default_max_delay_ms() -> u64 {
    const THIRTY_SECONDS_MS: u64 = 30 * 1_000;
    THIRTY_SECONDS_MS
}
245
246impl Default for SwarmConfig {
247    fn default() -> Self {
248        Self {
249            max_subagents: MAX_SUBAGENTS,
250            max_steps_per_subagent: 100,
251            max_total_steps: MAX_TOOL_CALLS,
252            subagent_timeout_secs: 600, // 10 minutes for complex tasks
253            parallel_enabled: true,
254            critical_path_threshold: 10,
255            model: None,
256            // Concurrency tuned for modern paid tiers and multi-agent swarms.
257            // Override via SwarmConfigBuilder for conservative free-tier
258            // accounts (e.g. 3 concurrent, 1000ms delay).
259            max_concurrent_requests: 8,
260            request_delay_ms: 250,
261            worktree_enabled: true,    // Enable worktree isolation by default
262            worktree_auto_merge: true, // Auto-merge on success
263            working_dir: None,
264            execution_mode: ExecutionMode::LocalThread,
265            k8s_pod_budget: 8,
266            k8s_subagent_image: None,
267            max_retries: 3,
268            base_delay_ms: 500,
269            max_delay_ms: 30_000,
270        }
271    }
272}
273
274/// Swarm execution statistics
275#[derive(Debug, Clone, Default, Serialize, Deserialize)]
276pub struct SwarmStats {
277    /// Total number of sub-agents spawned
278    pub subagents_spawned: usize,
279
280    /// Number of sub-agents that completed successfully
281    pub subagents_completed: usize,
282
283    /// Number of sub-agents that failed
284    pub subagents_failed: usize,
285
286    /// Total tool calls across all sub-agents
287    pub total_tool_calls: usize,
288
289    /// Critical path length (longest chain of dependent steps)
290    pub critical_path_length: usize,
291
292    /// Wall-clock execution time (milliseconds)
293    pub execution_time_ms: u64,
294
295    /// Estimated sequential time (milliseconds)
296    pub sequential_time_estimate_ms: u64,
297
298    /// Parallelization speedup factor
299    pub speedup_factor: f64,
300
301    /// Per-stage statistics
302    pub stages: Vec<StageStats>,
303
304    /// Rate limiting statistics
305    pub rate_limit_stats: RateLimitStats,
306
307    /// Number of cache hits (subtasks served from cache)
308    pub cache_hits: u64,
309
310    /// Number of cache misses (subtasks that required execution)
311    pub cache_misses: u64,
312}
313
314/// Statistics for a single execution stage
315#[derive(Debug, Clone, Default, Serialize, Deserialize)]
316pub struct StageStats {
317    /// Stage index
318    pub stage: usize,
319
320    /// Number of sub-agents in this stage
321    pub subagent_count: usize,
322
323    /// Maximum steps in this stage (critical path contribution)
324    pub max_steps: usize,
325
326    /// Total steps across all sub-agents in this stage
327    pub total_steps: usize,
328
329    /// Execution time for this stage (milliseconds)
330    pub execution_time_ms: u64,
331}
332
333impl SwarmStats {
334    /// Calculate the speedup factor
335    pub fn calculate_speedup(&mut self) {
336        if self.execution_time_ms > 0 {
337            self.speedup_factor =
338                self.sequential_time_estimate_ms as f64 / self.execution_time_ms as f64;
339        }
340    }
341
342    /// Calculate critical path from stages
343    pub fn calculate_critical_path(&mut self) {
344        self.critical_path_length = self.stages.iter().map(|s| s.max_steps).sum();
345    }
346
347    /// Merge cache statistics from a [`CacheStats`] snapshot into these stats
348    pub fn merge_cache_stats(&mut self, cache_stats: &cache::CacheStats) {
349        self.cache_hits = cache_stats.hits;
350        self.cache_misses = cache_stats.misses;
351    }
352
353    /// Cache hit rate (0.0 to 1.0); returns 0.0 when no cache lookups occurred
354    pub fn cache_hit_rate(&self) -> f64 {
355        let total = self.cache_hits + self.cache_misses;
356        if total == 0 {
357            0.0
358        } else {
359            self.cache_hits as f64 / total as f64
360        }
361    }
362}
363
364/// Decomposition strategy for breaking down tasks
365#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
366#[serde(rename_all = "snake_case")]
367#[derive(Default)]
368pub enum DecompositionStrategy {
369    /// Let the AI decide how to decompose
370    #[default]
371    Automatic,
372
373    /// Decompose by domain/specialty
374    ByDomain,
375
376    /// Decompose by data partition
377    ByData,
378
379    /// Decompose by workflow stage
380    ByStage,
381
382    /// Single agent (no decomposition)
383    None,
384}
385
386/// Result of swarm execution
387#[derive(Debug, Clone, Serialize, Deserialize)]
388pub struct SwarmResult {
389    /// Overall success status
390    pub success: bool,
391
392    /// Aggregated result from all sub-agents
393    pub result: String,
394
395    /// Individual sub-task results
396    pub subtask_results: Vec<SubTaskResult>,
397
398    /// Execution statistics
399    pub stats: SwarmStats,
400
401    /// Any artifacts produced
402    pub artifacts: Vec<SwarmArtifact>,
403
404    /// Error message if failed
405    pub error: Option<String>,
406}
407
408/// An artifact produced by the swarm
409#[derive(Debug, Clone, Serialize, Deserialize)]
410pub struct SwarmArtifact {
411    /// Artifact type
412    pub artifact_type: String,
413
414    /// Name/identifier
415    pub name: String,
416
417    /// Content or path
418    pub content: String,
419
420    /// Which sub-agent produced it
421    pub source_subagent: Option<String>,
422
423    /// MIME type
424    pub mime_type: Option<String>,
425}