Skip to main content

codetether_agent/swarm/
mod.rs

//! Swarm orchestration for parallel sub-agent execution.
//!
//! Implements the SubAgent/SubTask paradigm for parallel task execution,
//! similar to Kimi K2.5's Agent Swarm but generalized for the CodeTether ecosystem.
//!
//! Key concepts:
//! - **Orchestrator**: Decomposes complex tasks into parallelizable subtasks
//! - **SubAgent**: A dynamically instantiated agent for executing a subtask
//! - **SubTask**: A unit of work that can be executed in parallel
//! - **Critical Path**: Latency-oriented metric for parallel execution

12pub mod cache;
13pub mod collapse_controller;
14pub mod executor;
15pub mod kubernetes_executor;
16pub mod orchestrator;
17pub mod rate_limiter;
18pub mod remote_subtask;
19pub mod result_store;
20pub mod subtask;
21
22pub use cache::{CacheConfig, CacheStats, SwarmCache};
23pub use collapse_controller::{
24    BranchEvaluation, BranchObservation, BranchRuntimeState, CoherenceScore, CollapseController,
25    CollapsePolicy, CollapseTick, KillDecision,
26};
27pub use executor::{SwarmExecutor, run_agent_loop};
28pub use orchestrator::Orchestrator;
29pub use rate_limiter::{AdaptiveRateLimiter, RateLimitInfo, RateLimitStats};
30pub use result_store::{ResultStore, ResultStoreContext, SharedResult, SubTaskStoreHandle};
31pub use subtask::{SubAgent, SubTask, SubTaskContext, SubTaskResult, SubTaskStatus};
32
33use anyhow::Result;
34use async_trait::async_trait;
35
36/// Actor trait for swarm participants
37///
38/// An Actor is an entity that can participate in the swarm by receiving
39/// and processing messages. This is the base trait for all swarm participants.
40#[async_trait]
41pub trait Actor: Send + Sync {
42    /// Get the unique identifier for this actor
43    fn actor_id(&self) -> &str;
44
45    /// Get the actor's current status
46    fn actor_status(&self) -> ActorStatus;
47
48    /// Initialize the actor for swarm participation
49    async fn initialize(&mut self) -> Result<()>;
50
51    /// Shutdown the actor gracefully
52    async fn shutdown(&mut self) -> Result<()>;
53}
54
55/// Handler trait for processing messages in the swarm
56///
57/// A Handler can receive and process messages of a specific type.
58/// This enables actors to respond to different message types.
59#[async_trait]
60pub trait Handler<M>: Actor {
61    /// The response type for this handler
62    type Response: Send + Sync;
63
64    /// Handle a message and return a response
65    async fn handle(&mut self, message: M) -> Result<Self::Response>;
66}
67
/// Lifecycle state reported by a swarm actor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActorStatus {
    /// The actor is still initializing.
    Initializing,
    /// The actor is ready to process messages.
    Ready,
    /// The actor is currently processing.
    Busy,
    /// The actor has been paused.
    Paused,
    /// The actor is in the middle of shutting down.
    ShuttingDown,
    /// The actor has stopped.
    Stopped,
}

impl std::fmt::Display for ActorStatus {
    /// Writes the status as a lowercase snake_case label such as `shutting_down`.
    ///
    /// Width/alignment flags on the formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Initializing => "initializing",
            Self::Ready => "ready",
            Self::Busy => "busy",
            Self::Paused => "paused",
            Self::ShuttingDown => "shutting_down",
            Self::Stopped => "stopped",
        };
        f.write_str(label)
    }
}
97
98/// Message types for swarm coordination
99#[derive(Debug, Clone)]
100pub enum SwarmMessage {
101    /// Execute a task
102    ExecuteTask {
103        task_id: String,
104        instruction: String,
105    },
106    /// Report progress
107    Progress {
108        task_id: String,
109        progress: f32,
110        message: String,
111    },
112    /// Task completed
113    TaskCompleted { task_id: String, result: String },
114    /// Task failed
115    TaskFailed { task_id: String, error: String },
116    /// Request tool execution
117    ToolRequest {
118        tool_id: String,
119        arguments: serde_json::Value,
120    },
121    /// Tool response
122    ToolResponse {
123        tool_id: String,
124        result: crate::tool::ToolResult,
125    },
126}
127
128use serde::{Deserialize, Serialize};
129
/// Upper bound on sub-agents running at the same time (default for `SwarmConfig::max_subagents`).
pub const MAX_SUBAGENTS: usize = 100;

/// Upper bound on tool calls summed over every sub-agent (default for `SwarmConfig::max_total_steps`).
pub const MAX_TOOL_CALLS: usize = 1_500;
135
136/// Swarm execution configuration
137#[derive(Debug, Clone, Serialize, Deserialize)]
138pub struct SwarmConfig {
139    /// Maximum number of concurrent sub-agents
140    pub max_subagents: usize,
141
142    /// Maximum tool calls per sub-agent
143    pub max_steps_per_subagent: usize,
144
145    /// Maximum total tool calls across all sub-agents
146    pub max_total_steps: usize,
147
148    /// Timeout for individual sub-agent execution (seconds)
149    pub subagent_timeout_secs: u64,
150
151    /// Whether to enable parallel execution
152    pub parallel_enabled: bool,
153
154    /// Critical path optimization threshold
155    pub critical_path_threshold: usize,
156
157    /// Model to use for sub-agents (provider/model format)
158    pub model: Option<String>,
159
160    /// Max concurrent API requests (rate limiting)
161    pub max_concurrent_requests: usize,
162
163    /// Delay between API calls in ms (rate limiting)
164    pub request_delay_ms: u64,
165
166    /// Enable worktree isolation for sub-agents
167    pub worktree_enabled: bool,
168
169    /// Automatically merge worktree changes on success
170    pub worktree_auto_merge: bool,
171
172    /// Working directory for worktree creation
173    pub working_dir: Option<String>,
174
175    /// Execution mode for sub-agent runtime
176    #[serde(default)]
177    pub execution_mode: ExecutionMode,
178
179    /// Maximum number of Kubernetes sub-agent pods active at once.
180    #[serde(default = "default_k8s_pod_budget")]
181    pub k8s_pod_budget: usize,
182
183    /// Optional container image override for Kubernetes sub-agent pods.
184    #[serde(default)]
185    pub k8s_subagent_image: Option<String>,
186
187    /// Maximum number of retry attempts for transient failures (0 = no retries)
188    #[serde(default = "default_max_retries")]
189    pub max_retries: u32,
190
191    /// Base delay in milliseconds for the first retry (exponential backoff)
192    #[serde(default = "default_base_delay_ms")]
193    pub base_delay_ms: u64,
194
195    /// Maximum delay in milliseconds between retries (caps exponential growth)
196    #[serde(default = "default_max_delay_ms")]
197    pub max_delay_ms: u64,
198}
199
200/// Sub-agent execution mode.
201#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
202#[serde(rename_all = "snake_case")]
203#[derive(Default)]
204pub enum ExecutionMode {
205    /// Run sub-agents as local async tasks in the current process.
206    #[default]
207    LocalThread,
208    /// Run sub-agents as isolated Kubernetes pods.
209    KubernetesPod,
210}
211
212impl ExecutionMode {
213    pub fn from_cli_value(value: &str) -> Self {
214        match value {
215            "k8s" | "kubernetes" | "kubernetes-pod" | "pod" => Self::KubernetesPod,
216            _ => Self::LocalThread,
217        }
218    }
219}
220
/// Serde default for `k8s_pod_budget`: at most 8 sub-agent pods active at once.
fn default_k8s_pod_budget() -> usize {
    8
}

/// Serde default for `max_retries`: up to 3 retry attempts.
fn default_max_retries() -> u32 {
    3
}

/// Serde default for `base_delay_ms`: 500 ms before the first retry.
fn default_base_delay_ms() -> u64 {
    500
}

/// Serde default for `max_delay_ms`: retry delays are capped at 30 seconds.
fn default_max_delay_ms() -> u64 {
    30 * 1_000
}
236
237impl Default for SwarmConfig {
238    fn default() -> Self {
239        Self {
240            max_subagents: MAX_SUBAGENTS,
241            max_steps_per_subagent: 100,
242            max_total_steps: MAX_TOOL_CALLS,
243            subagent_timeout_secs: 600, // 10 minutes for complex tasks
244            parallel_enabled: true,
245            critical_path_threshold: 10,
246            model: None,
247            max_concurrent_requests: 3, // V1 tier allows 3 concurrent
248            request_delay_ms: 1000,     // V1 tier: 60 RPM, 3 concurrent = fast
249            worktree_enabled: true,     // Enable worktree isolation by default
250            worktree_auto_merge: true,  // Auto-merge on success
251            working_dir: None,
252            execution_mode: ExecutionMode::LocalThread,
253            k8s_pod_budget: 8,
254            k8s_subagent_image: None,
255            max_retries: 3,
256            base_delay_ms: 500,
257            max_delay_ms: 30_000,
258        }
259    }
260}
261
262/// Swarm execution statistics
263#[derive(Debug, Clone, Default, Serialize, Deserialize)]
264pub struct SwarmStats {
265    /// Total number of sub-agents spawned
266    pub subagents_spawned: usize,
267
268    /// Number of sub-agents that completed successfully
269    pub subagents_completed: usize,
270
271    /// Number of sub-agents that failed
272    pub subagents_failed: usize,
273
274    /// Total tool calls across all sub-agents
275    pub total_tool_calls: usize,
276
277    /// Critical path length (longest chain of dependent steps)
278    pub critical_path_length: usize,
279
280    /// Wall-clock execution time (milliseconds)
281    pub execution_time_ms: u64,
282
283    /// Estimated sequential time (milliseconds)
284    pub sequential_time_estimate_ms: u64,
285
286    /// Parallelization speedup factor
287    pub speedup_factor: f64,
288
289    /// Per-stage statistics
290    pub stages: Vec<StageStats>,
291
292    /// Rate limiting statistics
293    pub rate_limit_stats: RateLimitStats,
294
295    /// Number of cache hits (subtasks served from cache)
296    pub cache_hits: u64,
297
298    /// Number of cache misses (subtasks that required execution)
299    pub cache_misses: u64,
300}
301
302/// Statistics for a single execution stage
303#[derive(Debug, Clone, Default, Serialize, Deserialize)]
304pub struct StageStats {
305    /// Stage index
306    pub stage: usize,
307
308    /// Number of sub-agents in this stage
309    pub subagent_count: usize,
310
311    /// Maximum steps in this stage (critical path contribution)
312    pub max_steps: usize,
313
314    /// Total steps across all sub-agents in this stage
315    pub total_steps: usize,
316
317    /// Execution time for this stage (milliseconds)
318    pub execution_time_ms: u64,
319}
320
321impl SwarmStats {
322    /// Calculate the speedup factor
323    pub fn calculate_speedup(&mut self) {
324        if self.execution_time_ms > 0 {
325            self.speedup_factor =
326                self.sequential_time_estimate_ms as f64 / self.execution_time_ms as f64;
327        }
328    }
329
330    /// Calculate critical path from stages
331    pub fn calculate_critical_path(&mut self) {
332        self.critical_path_length = self.stages.iter().map(|s| s.max_steps).sum();
333    }
334
335    /// Merge cache statistics from a [`CacheStats`] snapshot into these stats
336    pub fn merge_cache_stats(&mut self, cache_stats: &cache::CacheStats) {
337        self.cache_hits = cache_stats.hits;
338        self.cache_misses = cache_stats.misses;
339    }
340
341    /// Cache hit rate (0.0 to 1.0); returns 0.0 when no cache lookups occurred
342    pub fn cache_hit_rate(&self) -> f64 {
343        let total = self.cache_hits + self.cache_misses;
344        if total == 0 {
345            0.0
346        } else {
347            self.cache_hits as f64 / total as f64
348        }
349    }
350}
351
352/// Decomposition strategy for breaking down tasks
353#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
354#[serde(rename_all = "snake_case")]
355#[derive(Default)]
356pub enum DecompositionStrategy {
357    /// Let the AI decide how to decompose
358    #[default]
359    Automatic,
360
361    /// Decompose by domain/specialty
362    ByDomain,
363
364    /// Decompose by data partition
365    ByData,
366
367    /// Decompose by workflow stage
368    ByStage,
369
370    /// Single agent (no decomposition)
371    None,
372}
373
374/// Result of swarm execution
375#[derive(Debug, Clone, Serialize, Deserialize)]
376pub struct SwarmResult {
377    /// Overall success status
378    pub success: bool,
379
380    /// Aggregated result from all sub-agents
381    pub result: String,
382
383    /// Individual sub-task results
384    pub subtask_results: Vec<SubTaskResult>,
385
386    /// Execution statistics
387    pub stats: SwarmStats,
388
389    /// Any artifacts produced
390    pub artifacts: Vec<SwarmArtifact>,
391
392    /// Error message if failed
393    pub error: Option<String>,
394}
395
396/// An artifact produced by the swarm
397#[derive(Debug, Clone, Serialize, Deserialize)]
398pub struct SwarmArtifact {
399    /// Artifact type
400    pub artifact_type: String,
401
402    /// Name/identifier
403    pub name: String,
404
405    /// Content or path
406    pub content: String,
407
408    /// Which sub-agent produced it
409    pub source_subagent: Option<String>,
410
411    /// MIME type
412    pub mime_type: Option<String>,
413}