Skip to main content

codetether_agent/swarm/
mod.rs

//! Swarm orchestration for parallel sub-agent execution
//!
//! Implements the SubAgent/SubTask paradigm for parallel task execution,
//! similar to Kimi K2.5's Agent Swarm but generalized for the CodeTether ecosystem.
//!
//! Key concepts:
//! - **Orchestrator**: Decomposes complex tasks into parallelizable subtasks
//! - **SubAgent**: A dynamically instantiated agent for executing a subtask
//! - **SubTask**: A unit of work that can be executed in parallel
//! - **Critical Path**: Latency-oriented metric for parallel execution
11
12pub mod cache;
13pub mod collapse_controller;
14pub mod delegation;
15pub mod delegation_outcome;
16pub mod executor;
17pub mod k8s_result;
18pub mod kubernetes_executor;
19mod live_bus; pub mod orchestrator;
20pub mod rate_limiter;
21pub mod remote_subtask;
22pub mod result_store;
23pub mod speculative;
24pub mod subtask;
25pub mod token_exhaustion;
26pub mod token_truncate;
27mod tool_policy;
28pub mod validation;
29
30pub use cache::{CacheConfig, CacheStats, SwarmCache};
31pub use collapse_controller::{
32    BranchEvaluation, BranchObservation, BranchRuntimeState, CoherenceScore, CollapseController,
33    CollapsePolicy, CollapseTick, KillDecision,
34};
35pub use executor::{SwarmExecutor, run_agent_loop};
36pub use orchestrator::Orchestrator;
37pub use rate_limiter::{AdaptiveRateLimiter, RateLimitInfo, RateLimitStats};
38pub use result_store::{ResultStore, ResultStoreContext, SharedResult, SubTaskStoreHandle};
39pub use subtask::{SubAgent, SubTask, SubTaskContext, SubTaskResult, SubTaskStatus};
40
41use anyhow::Result;
42use async_trait::async_trait;
43
44/// Actor trait for swarm participants
45///
46/// An Actor is an entity that can participate in the swarm by receiving
47/// and processing messages. This is the base trait for all swarm participants.
48#[async_trait]
49pub trait Actor: Send + Sync {
50    /// Get the unique identifier for this actor
51    fn actor_id(&self) -> &str;
52
53    /// Get the actor's current status
54    fn actor_status(&self) -> ActorStatus;
55
56    /// Initialize the actor for swarm participation
57    async fn initialize(&mut self) -> Result<()>;
58
59    /// Shutdown the actor gracefully
60    async fn shutdown(&mut self) -> Result<()>;
61}
62
63/// Handler trait for processing messages in the swarm
64///
65/// A Handler can receive and process messages of a specific type.
66/// This enables actors to respond to different message types.
67#[async_trait]
68pub trait Handler<M>: Actor {
69    /// The response type for this handler
70    type Response: Send + Sync;
71
72    /// Handle a message and return a response
73    async fn handle(&mut self, message: M) -> Result<Self::Response>;
74}
75
/// Status of an actor in the swarm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActorStatus {
    /// Actor is initializing
    Initializing,
    /// Actor is ready to process messages
    Ready,
    /// Actor is currently processing
    Busy,
    /// Actor is paused
    Paused,
    /// Actor is shutting down
    ShuttingDown,
    /// Actor has stopped
    Stopped,
}
92
93impl std::fmt::Display for ActorStatus {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        match self {
96            ActorStatus::Initializing => write!(f, "initializing"),
97            ActorStatus::Ready => write!(f, "ready"),
98            ActorStatus::Busy => write!(f, "busy"),
99            ActorStatus::Paused => write!(f, "paused"),
100            ActorStatus::ShuttingDown => write!(f, "shutting_down"),
101            ActorStatus::Stopped => write!(f, "stopped"),
102        }
103    }
104}
105
106/// Message types for swarm coordination
107#[derive(Debug, Clone)]
108pub enum SwarmMessage {
109    /// Execute a task
110    ExecuteTask {
111        task_id: String,
112        instruction: String,
113    },
114    /// Report progress
115    Progress {
116        task_id: String,
117        progress: f32,
118        message: String,
119    },
120    /// Task completed
121    TaskCompleted { task_id: String, result: String },
122    /// Task failed
123    TaskFailed { task_id: String, error: String },
124    /// Request tool execution
125    ToolRequest {
126        tool_id: String,
127        arguments: serde_json::Value,
128    },
129    /// Tool response
130    ToolResponse {
131        tool_id: String,
132        result: crate::tool::ToolResult,
133    },
134}
135
136use serde::{Deserialize, Serialize};
137
/// Maximum number of concurrent sub-agents
pub const MAX_SUBAGENTS: usize = 100;

/// Maximum total tool calls across all sub-agents
pub const MAX_TOOL_CALLS: usize = 1500;
143
144/// Swarm execution configuration
145#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct SwarmConfig {
147    /// Maximum number of concurrent sub-agents
148    pub max_subagents: usize,
149
150    /// Maximum tool calls per sub-agent
151    pub max_steps_per_subagent: usize,
152
153    /// Maximum total tool calls across all sub-agents
154    pub max_total_steps: usize,
155
156    /// Timeout for individual sub-agent execution (seconds)
157    pub subagent_timeout_secs: u64,
158
159    /// Whether to enable parallel execution
160    pub parallel_enabled: bool,
161
162    /// Critical path optimization threshold
163    pub critical_path_threshold: usize,
164
165    /// Model to use for sub-agents (provider/model format)
166    pub model: Option<String>,
167
168    /// Max concurrent API requests (rate limiting)
169    pub max_concurrent_requests: usize,
170
171    /// Delay between API calls in ms (rate limiting)
172    pub request_delay_ms: u64,
173
174    /// Enable worktree isolation for sub-agents
175    pub worktree_enabled: bool,
176
177    /// Automatically merge worktree changes on success
178    pub worktree_auto_merge: bool,
179
180    /// Working directory for worktree creation
181    pub working_dir: Option<String>,
182
183    /// Execution mode for sub-agent runtime
184    #[serde(default)]
185    pub execution_mode: ExecutionMode,
186
187    /// Maximum number of Kubernetes sub-agent pods active at once.
188    #[serde(default = "default_k8s_pod_budget")]
189    pub k8s_pod_budget: usize,
190
191    /// Optional container image override for Kubernetes sub-agent pods.
192    #[serde(default)]
193    pub k8s_subagent_image: Option<String>,
194
195    /// Maximum number of retry attempts for transient failures (0 = no retries)
196    #[serde(default = "default_max_retries")]
197    pub max_retries: u32,
198
199    /// Base delay in milliseconds for the first retry (exponential backoff)
200    #[serde(default = "default_base_delay_ms")]
201    pub base_delay_ms: u64,
202
203    /// Maximum delay in milliseconds between retries (caps exponential growth)
204    #[serde(default = "default_max_delay_ms")]
205    pub max_delay_ms: u64,
206}
207
208/// Sub-agent execution mode.
209#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
210#[serde(rename_all = "snake_case")]
211#[derive(Default)]
212pub enum ExecutionMode {
213    /// Run sub-agents as local async tasks in the current process.
214    #[default]
215    LocalThread,
216    /// Run sub-agents as isolated Kubernetes pods.
217    KubernetesPod,
218}
219
220impl ExecutionMode {
221    pub fn from_cli_value(value: &str) -> Self {
222        match value {
223            "k8s" | "kubernetes" | "kubernetes-pod" | "pod" => Self::KubernetesPod,
224            _ => Self::LocalThread,
225        }
226    }
227}
228
/// Serde default for `SwarmConfig::k8s_pod_budget`.
fn default_k8s_pod_budget() -> usize {
    8
}
232
/// Serde default for `SwarmConfig::max_retries`.
fn default_max_retries() -> u32 {
    3
}
236
/// Serde default for `SwarmConfig::base_delay_ms` (milliseconds).
fn default_base_delay_ms() -> u64 {
    500
}
240
/// Serde default for `SwarmConfig::max_delay_ms` (milliseconds).
fn default_max_delay_ms() -> u64 {
    30_000
}
244
245impl Default for SwarmConfig {
246    fn default() -> Self {
247        Self {
248            max_subagents: MAX_SUBAGENTS,
249            max_steps_per_subagent: 100,
250            max_total_steps: MAX_TOOL_CALLS,
251            subagent_timeout_secs: 600, // 10 minutes for complex tasks
252            parallel_enabled: true,
253            critical_path_threshold: 10,
254            model: None,
255            // Concurrency tuned for modern paid tiers and multi-agent swarms.
256            // Override via SwarmConfigBuilder for conservative free-tier
257            // accounts (e.g. 3 concurrent, 1000ms delay).
258            max_concurrent_requests: 8,
259            request_delay_ms: 250,
260            worktree_enabled: true,    // Enable worktree isolation by default
261            worktree_auto_merge: true, // Auto-merge on success
262            working_dir: None,
263            execution_mode: ExecutionMode::LocalThread,
264            k8s_pod_budget: 8,
265            k8s_subagent_image: None,
266            max_retries: 3,
267            base_delay_ms: 500,
268            max_delay_ms: 30_000,
269        }
270    }
271}
272
273/// Swarm execution statistics
274#[derive(Debug, Clone, Default, Serialize, Deserialize)]
275pub struct SwarmStats {
276    /// Total number of sub-agents spawned
277    pub subagents_spawned: usize,
278
279    /// Number of sub-agents that completed successfully
280    pub subagents_completed: usize,
281
282    /// Number of sub-agents that failed
283    pub subagents_failed: usize,
284
285    /// Total tool calls across all sub-agents
286    pub total_tool_calls: usize,
287
288    /// Critical path length (longest chain of dependent steps)
289    pub critical_path_length: usize,
290
291    /// Wall-clock execution time (milliseconds)
292    pub execution_time_ms: u64,
293
294    /// Estimated sequential time (milliseconds)
295    pub sequential_time_estimate_ms: u64,
296
297    /// Parallelization speedup factor
298    pub speedup_factor: f64,
299
300    /// Per-stage statistics
301    pub stages: Vec<StageStats>,
302
303    /// Rate limiting statistics
304    pub rate_limit_stats: RateLimitStats,
305
306    /// Number of cache hits (subtasks served from cache)
307    pub cache_hits: u64,
308
309    /// Number of cache misses (subtasks that required execution)
310    pub cache_misses: u64,
311}
312
313/// Statistics for a single execution stage
314#[derive(Debug, Clone, Default, Serialize, Deserialize)]
315pub struct StageStats {
316    /// Stage index
317    pub stage: usize,
318
319    /// Number of sub-agents in this stage
320    pub subagent_count: usize,
321
322    /// Maximum steps in this stage (critical path contribution)
323    pub max_steps: usize,
324
325    /// Total steps across all sub-agents in this stage
326    pub total_steps: usize,
327
328    /// Execution time for this stage (milliseconds)
329    pub execution_time_ms: u64,
330}
331
332impl SwarmStats {
333    /// Calculate the speedup factor
334    pub fn calculate_speedup(&mut self) {
335        if self.execution_time_ms > 0 {
336            self.speedup_factor =
337                self.sequential_time_estimate_ms as f64 / self.execution_time_ms as f64;
338        }
339    }
340
341    /// Calculate critical path from stages
342    pub fn calculate_critical_path(&mut self) {
343        self.critical_path_length = self.stages.iter().map(|s| s.max_steps).sum();
344    }
345
346    /// Merge cache statistics from a [`CacheStats`] snapshot into these stats
347    pub fn merge_cache_stats(&mut self, cache_stats: &cache::CacheStats) {
348        self.cache_hits = cache_stats.hits;
349        self.cache_misses = cache_stats.misses;
350    }
351
352    /// Cache hit rate (0.0 to 1.0); returns 0.0 when no cache lookups occurred
353    pub fn cache_hit_rate(&self) -> f64 {
354        let total = self.cache_hits + self.cache_misses;
355        if total == 0 {
356            0.0
357        } else {
358            self.cache_hits as f64 / total as f64
359        }
360    }
361}
362
363/// Decomposition strategy for breaking down tasks
364#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
365#[serde(rename_all = "snake_case")]
366#[derive(Default)]
367pub enum DecompositionStrategy {
368    /// Let the AI decide how to decompose
369    #[default]
370    Automatic,
371
372    /// Decompose by domain/specialty
373    ByDomain,
374
375    /// Decompose by data partition
376    ByData,
377
378    /// Decompose by workflow stage
379    ByStage,
380
381    /// Single agent (no decomposition)
382    None,
383}
384
385/// Result of swarm execution
386#[derive(Debug, Clone, Serialize, Deserialize)]
387pub struct SwarmResult {
388    /// Overall success status
389    pub success: bool,
390
391    /// Aggregated result from all sub-agents
392    pub result: String,
393
394    /// Individual sub-task results
395    pub subtask_results: Vec<SubTaskResult>,
396
397    /// Execution statistics
398    pub stats: SwarmStats,
399
400    /// Any artifacts produced
401    pub artifacts: Vec<SwarmArtifact>,
402
403    /// Error message if failed
404    pub error: Option<String>,
405}
406
407/// An artifact produced by the swarm
408#[derive(Debug, Clone, Serialize, Deserialize)]
409pub struct SwarmArtifact {
410    /// Artifact type
411    pub artifact_type: String,
412
413    /// Name/identifier
414    pub name: String,
415
416    /// Content or path
417    pub content: String,
418
419    /// Which sub-agent produced it
420    pub source_subagent: Option<String>,
421
422    /// MIME type
423    pub mime_type: Option<String>,
424}