Skip to main content

codetether_agent/swarm/
mod.rs

1//! Swarm orchestration for parallel sub-agent execution
2//!
3//! Implements the SubAgent/SubTask paradigm for parallel task execution,
4//! similar to Kimi K2.5's Agent Swarm but generalized for the CodeTether ecosystem.
5//!
6//! Key concepts:
7//! - **Orchestrator**: Decomposes complex tasks into parallelizable subtasks
8//! - **SubAgent**: A dynamically instantiated agent for executing a subtask
9//! - **SubTask**: A unit of work that can be executed in parallel
10//! - **Critical Path**: Latency-oriented metric for parallel execution
11
12pub mod cache;
13pub mod collapse_controller;
14pub mod executor;
15pub mod kubernetes_executor;
16pub mod orchestrator;
17pub mod rate_limiter;
18pub mod remote_subtask;
19pub mod result_store;
20pub mod subtask;
21
22pub use cache::{CacheConfig, CacheStats, SwarmCache};
23pub use collapse_controller::{
24    BranchEvaluation, BranchObservation, BranchRuntimeState, CoherenceScore, CollapseController,
25    CollapsePolicy, CollapseTick, KillDecision,
26};
27pub use executor::{SwarmExecutor, run_agent_loop};
28pub use orchestrator::Orchestrator;
29pub use rate_limiter::{AdaptiveRateLimiter, RateLimitInfo, RateLimitStats};
30pub use result_store::{ResultStore, ResultStoreContext, SharedResult, SubTaskStoreHandle};
31pub use subtask::{SubAgent, SubTask, SubTaskContext, SubTaskResult, SubTaskStatus};
32
33use anyhow::Result;
34use async_trait::async_trait;
35
/// Actor trait for swarm participants
///
/// An Actor is an entity that can participate in the swarm by receiving
/// and processing messages. This is the base trait for all swarm participants.
///
/// Implementors must be `Send + Sync`. This trait only covers identity,
/// status, and lifecycle; message processing is expressed separately through
/// [`Handler`], which has `Actor` as a supertrait.
#[async_trait]
pub trait Actor: Send + Sync {
    /// Get the unique identifier for this actor
    ///
    /// The returned string slice is borrowed from the actor itself.
    fn actor_id(&self) -> &str;

    /// Get the actor's current status
    ///
    /// Returns an [`ActorStatus`] by value (the enum is `Copy`).
    fn actor_status(&self) -> ActorStatus;

    /// Initialize the actor for swarm participation
    ///
    /// Takes `&mut self`, so the implementor may update its own state.
    /// Returns an `anyhow::Result`; what counts as a failure is defined by
    /// the implementor.
    async fn initialize(&mut self) -> Result<()>;

    /// Shutdown the actor gracefully
    ///
    /// Takes `&mut self`, so the implementor may update its own state.
    /// Returns an `anyhow::Result`; what counts as a failure is defined by
    /// the implementor.
    async fn shutdown(&mut self) -> Result<()>;
}
54
/// Handler trait for processing messages in the swarm
///
/// A Handler can receive and process messages of a specific type.
/// This enables actors to respond to different message types.
///
/// The trait is generic over the message type `M`, so a single actor can
/// implement `Handler` once per message type it accepts. Every `Handler` is
/// also an [`Actor`].
#[async_trait]
pub trait Handler<M>: Actor {
    /// The response type for this handler
    ///
    /// Must be `Send + Sync`.
    type Response: Send + Sync;

    /// Handle a message and return a response
    ///
    /// Takes ownership of `message` and has mutable access to the handler.
    /// Returns `Ok(Self::Response)` on success or an `anyhow` error; the
    /// failure conditions are defined by the implementor.
    async fn handle(&mut self, message: M) -> Result<Self::Response>;
}
67
/// Status of an actor in the swarm
///
/// A small `Copy` enum, so it can be passed and compared by value.
/// This type only names the states; it does not enforce any particular
/// transition order between them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActorStatus {
    /// Actor is initializing
    Initializing,
    /// Actor is ready to process messages
    Ready,
    /// Actor is currently processing
    Busy,
    /// Actor is paused
    Paused,
    /// Actor is shutting down
    ShuttingDown,
    /// Actor has stopped
    Stopped,
}
84
85impl std::fmt::Display for ActorStatus {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        match self {
88            ActorStatus::Initializing => write!(f, "initializing"),
89            ActorStatus::Ready => write!(f, "ready"),
90            ActorStatus::Busy => write!(f, "busy"),
91            ActorStatus::Paused => write!(f, "paused"),
92            ActorStatus::ShuttingDown => write!(f, "shutting_down"),
93            ActorStatus::Stopped => write!(f, "stopped"),
94        }
95    }
96}
97
/// Message types for swarm coordination
///
/// Each variant is a self-contained, cloneable payload. Task-related
/// variants are keyed by `task_id`; tool-related variants are keyed by
/// `tool_id`.
#[derive(Debug, Clone)]
pub enum SwarmMessage {
    /// Execute a task
    ExecuteTask {
        /// Identifier of the task to execute
        task_id: String,
        /// Instruction text for the task
        instruction: String,
    },
    /// Report progress
    Progress {
        /// Identifier of the task being reported on
        task_id: String,
        /// Progress value.
        /// NOTE(review): the scale (0.0-1.0 fraction vs. 0-100 percent) is
        /// not specified in this module — confirm against producers.
        progress: f32,
        /// Free-form progress message
        message: String,
    },
    /// Task completed
    TaskCompleted { task_id: String, result: String },
    /// Task failed
    TaskFailed { task_id: String, error: String },
    /// Request tool execution
    ToolRequest {
        /// Identifier of the tool to run
        tool_id: String,
        /// Tool arguments as an arbitrary JSON value
        arguments: serde_json::Value,
    },
    /// Tool response
    ToolResponse {
        /// Identifier of the tool that produced the result
        tool_id: String,
        /// Result payload, using the crate's `ToolResult` type
        result: crate::tool::ToolResult,
    },
}
127
128use serde::{Deserialize, Serialize};
129
/// Maximum number of concurrent sub-agents
///
/// Used as the `max_subagents` value in `SwarmConfig::default()`.
pub const MAX_SUBAGENTS: usize = 100;

/// Maximum total tool calls across all sub-agents
///
/// Used as the `max_total_steps` value in `SwarmConfig::default()`.
pub const MAX_TOOL_CALLS: usize = 1500;
135
/// Swarm execution configuration
///
/// Serializable via serde. The last three fields (`execution_mode`,
/// `k8s_pod_budget`, `k8s_subagent_image`) carry `#[serde(default)]`
/// attributes, so they may be omitted from serialized input.
/// `SwarmConfig::default()` provides a complete set of default values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmConfig {
    /// Maximum number of concurrent sub-agents
    pub max_subagents: usize,

    /// Maximum tool calls per sub-agent
    pub max_steps_per_subagent: usize,

    /// Maximum total tool calls across all sub-agents
    pub max_total_steps: usize,

    /// Timeout for individual sub-agent execution (seconds)
    pub subagent_timeout_secs: u64,

    /// Whether to enable parallel execution
    pub parallel_enabled: bool,

    /// Critical path optimization threshold
    ///
    /// NOTE(review): the unit and the comparison this threshold feeds are not
    /// visible in this module — confirm against the orchestrator/executor.
    pub critical_path_threshold: usize,

    /// Model to use for sub-agents (provider/model format)
    pub model: Option<String>,

    /// Max concurrent API requests (rate limiting)
    pub max_concurrent_requests: usize,

    /// Delay between API calls in ms (rate limiting)
    pub request_delay_ms: u64,

    /// Enable worktree isolation for sub-agents
    pub worktree_enabled: bool,

    /// Automatically merge worktree changes on success
    pub worktree_auto_merge: bool,

    /// Working directory for worktree creation
    pub working_dir: Option<String>,

    /// Execution mode for sub-agent runtime
    ///
    /// When absent from serialized input, falls back to
    /// `ExecutionMode::default()`.
    #[serde(default)]
    pub execution_mode: ExecutionMode,

    /// Maximum number of Kubernetes sub-agent pods active at once.
    ///
    /// When absent from serialized input, falls back to
    /// `default_k8s_pod_budget()`.
    #[serde(default = "default_k8s_pod_budget")]
    pub k8s_pod_budget: usize,

    /// Optional container image override for Kubernetes sub-agent pods.
    ///
    /// When absent from serialized input, this is `None`.
    #[serde(default)]
    pub k8s_subagent_image: Option<String>,
}
187
/// Sub-agent execution mode.
///
/// Serialized with `rename_all = "snake_case"`, i.e. as `local_thread` and
/// `kubernetes_pod`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ExecutionMode {
    /// Run sub-agents as local async tasks in the current process.
    LocalThread,
    /// Run sub-agents as isolated Kubernetes pods.
    KubernetesPod,
}
197
198impl ExecutionMode {
199    pub fn from_cli_value(value: &str) -> Self {
200        match value {
201            "k8s" | "kubernetes" | "kubernetes-pod" | "pod" => Self::KubernetesPod,
202            _ => Self::LocalThread,
203        }
204    }
205}
206
207impl Default for ExecutionMode {
208    fn default() -> Self {
209        Self::LocalThread
210    }
211}
212
/// Serde fallback for `SwarmConfig::k8s_pod_budget` when the field is
/// missing from serialized input.
fn default_k8s_pod_budget() -> usize {
    const DEFAULT_POD_BUDGET: usize = 8;
    DEFAULT_POD_BUDGET
}
216
217impl Default for SwarmConfig {
218    fn default() -> Self {
219        Self {
220            max_subagents: MAX_SUBAGENTS,
221            max_steps_per_subagent: 100,
222            max_total_steps: MAX_TOOL_CALLS,
223            subagent_timeout_secs: 600, // 10 minutes for complex tasks
224            parallel_enabled: true,
225            critical_path_threshold: 10,
226            model: Some("zai/glm-5".to_string()),
227            max_concurrent_requests: 3, // V1 tier allows 3 concurrent
228            request_delay_ms: 1000,     // V1 tier: 60 RPM, 3 concurrent = fast
229            worktree_enabled: true,     // Enable worktree isolation by default
230            worktree_auto_merge: true,  // Auto-merge on success
231            working_dir: None,
232            execution_mode: ExecutionMode::LocalThread,
233            k8s_pod_budget: 8,
234            k8s_subagent_image: None,
235        }
236    }
237}
238
/// Swarm execution statistics
///
/// Derives `Default`, so a fresh value starts with zeroed counters and an
/// empty `stages` list. `speedup_factor` and `critical_path_length` are
/// derived values that are written by `calculate_speedup` and
/// `calculate_critical_path` respectively.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SwarmStats {
    /// Total number of sub-agents spawned
    pub subagents_spawned: usize,

    /// Number of sub-agents that completed successfully
    pub subagents_completed: usize,

    /// Number of sub-agents that failed
    pub subagents_failed: usize,

    /// Total tool calls across all sub-agents
    pub total_tool_calls: usize,

    /// Critical path length (longest chain of dependent steps)
    ///
    /// Computed by `calculate_critical_path` as the sum of `max_steps` over
    /// `stages`.
    pub critical_path_length: usize,

    /// Wall-clock execution time (milliseconds)
    pub execution_time_ms: u64,

    /// Estimated sequential time (milliseconds)
    pub sequential_time_estimate_ms: u64,

    /// Parallelization speedup factor
    ///
    /// Computed by `calculate_speedup` as
    /// `sequential_time_estimate_ms / execution_time_ms`.
    pub speedup_factor: f64,

    /// Per-stage statistics
    pub stages: Vec<StageStats>,

    /// Rate limiting statistics
    pub rate_limit_stats: RateLimitStats,
}
272
/// Statistics for a single execution stage
///
/// Collected per stage in `SwarmStats::stages`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct StageStats {
    /// Stage index
    pub stage: usize,

    /// Number of sub-agents in this stage
    pub subagent_count: usize,

    /// Maximum steps in this stage (critical path contribution)
    ///
    /// This is the value `SwarmStats::calculate_critical_path` sums across
    /// stages.
    pub max_steps: usize,

    /// Total steps across all sub-agents in this stage
    pub total_steps: usize,

    /// Execution time for this stage (milliseconds)
    pub execution_time_ms: u64,
}
291
292impl SwarmStats {
293    /// Calculate the speedup factor
294    pub fn calculate_speedup(&mut self) {
295        if self.execution_time_ms > 0 {
296            self.speedup_factor =
297                self.sequential_time_estimate_ms as f64 / self.execution_time_ms as f64;
298        }
299    }
300
301    /// Calculate critical path from stages
302    pub fn calculate_critical_path(&mut self) {
303        self.critical_path_length = self.stages.iter().map(|s| s.max_steps).sum();
304    }
305}
306
/// Decomposition strategy for breaking down tasks
///
/// Serialized with `rename_all = "snake_case"` (e.g. `by_domain`).
/// Note that the `None` variant here is a strategy value, not
/// `Option::None`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum DecompositionStrategy {
    /// Let the AI decide how to decompose
    Automatic,

    /// Decompose by domain/specialty
    ByDomain,

    /// Decompose by data partition
    ByData,

    /// Decompose by workflow stage
    ByStage,

    /// Single agent (no decomposition)
    None,
}
326
327impl Default for DecompositionStrategy {
328    fn default() -> Self {
329        Self::Automatic
330    }
331}
332
/// Result of swarm execution
///
/// Bundles the aggregated output, the per-subtask results, execution
/// statistics, and any produced artifacts into one serializable value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmResult {
    /// Overall success status
    pub success: bool,

    /// Aggregated result from all sub-agents
    pub result: String,

    /// Individual sub-task results
    pub subtask_results: Vec<SubTaskResult>,

    /// Execution statistics
    pub stats: SwarmStats,

    /// Any artifacts produced
    pub artifacts: Vec<SwarmArtifact>,

    /// Error message if failed
    ///
    /// NOTE(review): this type does not enforce any relationship between
    /// `success` and `error` — confirm the convention with producers.
    pub error: Option<String>,
}
354
/// An artifact produced by the swarm
///
/// All fields are plain strings; this type performs no validation of them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmArtifact {
    /// Artifact type
    pub artifact_type: String,

    /// Name/identifier
    pub name: String,

    /// Content or path
    ///
    /// NOTE(review): nothing in this struct distinguishes inline content from
    /// a path — presumably `artifact_type` does; confirm with producers.
    pub content: String,

    /// Which sub-agent produced it
    pub source_subagent: Option<String>,

    /// MIME type
    pub mime_type: Option<String>,
}