Skip to main content

codetether_agent/swarm/
collapse_controller.rs

//! Deterministic collapse controller for speculative swarm branches.
//!
//! Phase 0 focuses on low-latency, mechanical regulation:
//! - sample active worktrees on an interval
//! - compute a coherence score vector per branch
//! - kill low-coherence branches deterministically
//! - promote one branch as the current integration candidate
8
9use anyhow::{Context, Result, anyhow};
10use std::collections::{HashMap, HashSet};
11use std::path::PathBuf;
12use std::process::{Command, Stdio};
13use std::time::{Duration, Instant};
14
/// Multi-dimensional coherence score for a branch.
///
/// The individual signals are combined into a single number by
/// [`CoherenceScore::collapse_score`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct CoherenceScore {
    /// Compile signal; contributes with weight 0.29.
    pub compile_health: f32,
    /// Test signal; contributes with weight 0.14.
    pub test_health: f32,
    /// Contract-alignment signal; contributes with weight 0.18.
    pub contract_alignment: f32,
    /// Conflict risk; its complement (`1.0 - risk`) contributes with weight 0.14.
    pub diff_conflict_risk: f32,
    /// Velocity signal; contributes with weight 0.08.
    pub velocity: f32,
    /// Resource-health signal; contributes with weight 0.17.
    pub resource_health_score: f32,
    /// Number of invariant breaks; each one subtracts 0.1, capped at 0.6 in total.
    pub invariant_breaks: u32,
}

impl CoherenceScore {
    /// Collapses the score vector into a single value in `[0.0, 1.0]`.
    ///
    /// The result is the weighted sum of the health signals (weights sum to 1.0) minus the
    /// invariant-break penalty, clamped to `[0.0, 1.0]`.
    ///
    /// If any signal is NaN the sum is NaN; that case is reported as `0.0`. Without this
    /// guard a NaN aggregate would sort above every finite score under `f32::total_cmp`,
    /// which is the ordering used to pick the best branch.
    pub fn collapse_score(&self) -> f32 {
        // Penalize conflicts/invariant breaks and reward deterministic health signals.
        let weighted = (self.compile_health * 0.29)
            + (self.test_health * 0.14)
            + (self.contract_alignment * 0.18)
            + ((1.0 - self.diff_conflict_risk) * 0.14)
            + (self.velocity * 0.08)
            + (self.resource_health_score * 0.17);

        // Hard penalty once invariant breaks begin to accumulate.
        let invariant_penalty = (self.invariant_breaks as f32 * 0.1).clamp(0.0, 0.6);

        let score = weighted - invariant_penalty;
        if score.is_nan() {
            // `clamp` would pass NaN through unchanged; treat unusable telemetry as worst case.
            0.0
        } else {
            score.clamp(0.0, 1.0)
        }
    }
}
42
/// Runtime state for a currently active branch/worktree.
///
/// This is the input to [`CollapseController::sample`], which probes `worktree_path` and
/// copies `subtask_id` and `branch` into the resulting observation.
#[derive(Clone, Debug)]
pub struct BranchRuntimeState {
    /// Identifier of the subtask working on this branch.
    pub subtask_id: String,
    /// Name of the branch.
    pub branch: String,
    /// Directory in which the `cargo check` and `git diff` probes are run.
    pub worktree_path: PathBuf,
}
50
51/// Full evaluation output for one branch.
52#[derive(Debug, Clone)]
53pub struct BranchEvaluation {
54    pub subtask_id: String,
55    pub branch: String,
56    pub score: CoherenceScore,
57    pub aggregate_score: f32,
58}
59
/// Branch observation produced by any execution backend.
///
/// Observations are consumed by [`CollapseController::sample_observations`].
#[derive(Clone, Debug)]
pub struct BranchObservation {
    /// Identifier of the observed subtask; keys all per-branch controller state.
    pub subtask_id: String,
    /// Name of the observed branch.
    pub branch: String,
    /// Whether the compile probe succeeded.
    pub compile_ok: bool,
    /// Files changed on this branch; overlap with other branches drives conflict risk.
    pub changed_files: HashSet<String>,
    /// Number of changed lines; feeds the velocity signal.
    pub changed_lines: u32,
    /// Resource health; compared against the policy minimum and clamped to `[0.0, 1.0]`
    /// when scored.
    pub resource_health_score: f32,
    /// Count of unhealthy infrastructure signals; added to the branch's invariant breaks.
    pub infra_unhealthy_signals: u32,
}
71
/// Deterministic policy knobs for collapse behavior.
#[derive(Clone, Copy, Debug)]
pub struct CollapsePolicy {
    /// A branch is killed once its invariant-break count reaches this value.
    pub max_invariant_breaks: u32,
    /// A branch is killed once its consecutive compile failures reach this value.
    pub max_consecutive_compile_failures: u32,
    /// A branch is killed once its consecutive resource failures reach this value.
    pub max_consecutive_resource_failures: u32,
    /// The worst surviving branch is only pruned when its score is at or below this value.
    pub min_collapse_score: f32,
    /// The worst surviving branch is only pruned when it trails the best by at least this gap.
    pub min_score_gap_for_prune: f32,
    /// Observations with a resource health score below this value count as a resource failure.
    pub min_resource_health_score: f32,
}

impl Default for CollapsePolicy {
    /// Returns the built-in thresholds.
    fn default() -> Self {
        CollapsePolicy {
            // Kill thresholds (counts).
            max_invariant_breaks: 2,
            max_consecutive_compile_failures: 1,
            max_consecutive_resource_failures: 2,
            // Score thresholds.
            min_collapse_score: 0.30,
            min_score_gap_for_prune: 0.15,
            min_resource_health_score: 0.35,
        }
    }
}
95
/// Deterministic kill decision.
#[derive(Clone, Debug)]
pub struct KillDecision {
    /// Identifier of the subtask whose branch is to be killed.
    pub subtask_id: String,
    /// Name of the branch to be killed.
    pub branch: String,
    /// Human-readable explanation, including the values that triggered the decision.
    pub reason: String,
}
103
104/// One collapse sampling tick output.
105#[derive(Debug, Clone, Default)]
106pub struct CollapseTick {
107    pub evaluations: Vec<BranchEvaluation>,
108    pub kills: Vec<KillDecision>,
109    pub promoted_subtask_id: Option<String>,
110}
111
112/// First-class regulator that samples branch health and applies deterministic collapse policy.
113#[derive(Debug)]
114pub struct CollapseController {
115    policy: CollapsePolicy,
116    first_seen_at: HashMap<String, Instant>,
117    consecutive_compile_failures: HashMap<String, u32>,
118    consecutive_resource_failures: HashMap<String, u32>,
119    invariant_breaks: HashMap<String, u32>,
120    promoted_subtask_id: Option<String>,
121}
122
123impl CollapseController {
124    pub fn new(policy: CollapsePolicy) -> Self {
125        Self {
126            policy,
127            first_seen_at: HashMap::new(),
128            consecutive_compile_failures: HashMap::new(),
129            consecutive_resource_failures: HashMap::new(),
130            invariant_breaks: HashMap::new(),
131            promoted_subtask_id: None,
132        }
133    }
134
135    pub fn sample(&mut self, branches: &[BranchRuntimeState]) -> Result<CollapseTick> {
136        if branches.is_empty() {
137            return Ok(CollapseTick::default());
138        }
139
140        let mut probe_handles = Vec::with_capacity(branches.len());
141        for branch in branches {
142            let subtask_id = branch.subtask_id.clone();
143            let branch_name = branch.branch.clone();
144            let worktree_path = branch.worktree_path.clone();
145            let handle = std::thread::spawn(move || -> Result<BranchObservation> {
146                Ok(BranchObservation {
147                    subtask_id,
148                    branch: branch_name,
149                    compile_ok: run_cargo_check(&worktree_path)?,
150                    changed_files: collect_changed_files(&worktree_path)?,
151                    changed_lines: collect_changed_lines(&worktree_path)?,
152                    resource_health_score: 1.0,
153                    infra_unhealthy_signals: 0,
154                })
155            });
156            probe_handles.push(handle);
157        }
158
159        let mut observations = Vec::with_capacity(branches.len());
160        for handle in probe_handles {
161            let observation = handle
162                .join()
163                .map_err(|_| anyhow!("Branch sampling thread panicked"))??;
164            observations.push(observation);
165        }
166        Ok(self.sample_observations(&observations))
167    }
168
169    pub fn sample_observations(&mut self, observations: &[BranchObservation]) -> CollapseTick {
170        if observations.is_empty() {
171            return CollapseTick::default();
172        }
173
174        let active: HashSet<&str> = observations.iter().map(|b| b.subtask_id.as_str()).collect();
175        self.first_seen_at
176            .retain(|id, _| active.contains(id.as_str()));
177        self.consecutive_compile_failures
178            .retain(|id, _| active.contains(id.as_str()));
179        self.consecutive_resource_failures
180            .retain(|id, _| active.contains(id.as_str()));
181        self.invariant_breaks
182            .retain(|id, _| active.contains(id.as_str()));
183
184        let mut changed_files: HashMap<String, HashSet<String>> = HashMap::new();
185        let mut changed_lines: HashMap<String, u32> = HashMap::new();
186        let mut compile_ok: HashMap<String, bool> = HashMap::new();
187
188        for observation in observations {
189            self.first_seen_at
190                .entry(observation.subtask_id.clone())
191                .or_insert_with(Instant::now);
192            changed_files.insert(
193                observation.subtask_id.clone(),
194                observation.changed_files.clone(),
195            );
196            changed_lines.insert(observation.subtask_id.clone(), observation.changed_lines);
197            compile_ok.insert(observation.subtask_id.clone(), observation.compile_ok);
198
199            let fail_counter = self
200                .consecutive_compile_failures
201                .entry(observation.subtask_id.clone())
202                .or_insert(0);
203            if observation.compile_ok {
204                *fail_counter = 0;
205            } else {
206                *fail_counter += 1;
207                *self
208                    .invariant_breaks
209                    .entry(observation.subtask_id.clone())
210                    .or_insert(0) += 1;
211            }
212
213            let infra_fail_counter = self
214                .consecutive_resource_failures
215                .entry(observation.subtask_id.clone())
216                .or_insert(0);
217            if observation.resource_health_score < self.policy.min_resource_health_score {
218                *infra_fail_counter += 1;
219            } else {
220                *infra_fail_counter = 0;
221            }
222            if observation.infra_unhealthy_signals > 0 {
223                *self
224                    .invariant_breaks
225                    .entry(observation.subtask_id.clone())
226                    .or_insert(0) += observation.infra_unhealthy_signals;
227            }
228        }
229
230        let mut evaluations = Vec::with_capacity(observations.len());
231        for observation in observations {
232            let my_files = changed_files
233                .get(&observation.subtask_id)
234                .cloned()
235                .unwrap_or_default();
236
237            let overlap_files = changed_files
238                .iter()
239                .filter(|(other_id, _)| *other_id != &observation.subtask_id)
240                .map(|(_, files)| my_files.intersection(files).count())
241                .sum::<usize>();
242
243            let conflict_risk = if my_files.is_empty() {
244                0.0
245            } else {
246                (overlap_files as f32 / my_files.len() as f32).clamp(0.0, 1.0)
247            };
248
249            let compile_health = if *compile_ok.get(&observation.subtask_id).unwrap_or(&false) {
250                1.0
251            } else {
252                0.0
253            };
254
255            // Phase 0 keeps test health lightweight to avoid expensive frequent test loops.
256            let test_health = if compile_health > 0.0 { 0.6 } else { 0.0 };
257            let contract_alignment = if compile_health > 0.0 {
258                (1.0 - conflict_risk).clamp(0.0, 1.0)
259            } else {
260                0.0
261            };
262
263            let elapsed_secs = self
264                .first_seen_at
265                .get(&observation.subtask_id)
266                .map(|t| t.elapsed().as_secs_f32().max(1.0))
267                .unwrap_or(1.0);
268            let lines = changed_lines
269                .get(&observation.subtask_id)
270                .copied()
271                .unwrap_or(0) as f32;
272            let velocity = (lines / elapsed_secs / 20.0).clamp(0.0, 1.0);
273
274            let invariant_breaks = *self
275                .invariant_breaks
276                .get(&observation.subtask_id)
277                .unwrap_or(&0);
278            let score = CoherenceScore {
279                compile_health,
280                test_health,
281                contract_alignment,
282                diff_conflict_risk: conflict_risk,
283                velocity,
284                resource_health_score: observation.resource_health_score.clamp(0.0, 1.0),
285                invariant_breaks,
286            };
287
288            evaluations.push(BranchEvaluation {
289                subtask_id: observation.subtask_id.clone(),
290                branch: observation.branch.clone(),
291                aggregate_score: score.collapse_score(),
292                score,
293            });
294        }
295
296        self.derive_decisions(evaluations)
297    }
298
299    fn derive_decisions(&mut self, evaluations: Vec<BranchEvaluation>) -> CollapseTick {
300        if evaluations.is_empty() {
301            self.promoted_subtask_id = None;
302            return CollapseTick::default();
303        }
304
305        let mut kills = Vec::new();
306        let mut killed_ids = HashSet::new();
307
308        for eval in &evaluations {
309            let consecutive_failures = *self
310                .consecutive_compile_failures
311                .get(&eval.subtask_id)
312                .unwrap_or(&0);
313            let consecutive_resource_failures = *self
314                .consecutive_resource_failures
315                .get(&eval.subtask_id)
316                .unwrap_or(&0);
317            if eval.score.invariant_breaks >= self.policy.max_invariant_breaks
318                || consecutive_failures >= self.policy.max_consecutive_compile_failures
319                || consecutive_resource_failures >= self.policy.max_consecutive_resource_failures
320            {
321                killed_ids.insert(eval.subtask_id.clone());
322                kills.push(KillDecision {
323                    subtask_id: eval.subtask_id.clone(),
324                    branch: eval.branch.clone(),
325                    reason: format!(
326                        "policy_threshold exceeded (invariant_breaks={}, consecutive_compile_failures={}, consecutive_resource_failures={}, resource_health_score={:.3})",
327                        eval.score.invariant_breaks, consecutive_failures, consecutive_resource_failures, eval.score.resource_health_score
328                    ),
329                });
330            }
331        }
332
333        // Never collapse every branch in a single sampling tick.
334        if killed_ids.len() == evaluations.len()
335            && let Some(best) = evaluations
336                .iter()
337                .max_by(|a, b| a.aggregate_score.total_cmp(&b.aggregate_score))
338        {
339            killed_ids.remove(&best.subtask_id);
340            kills.retain(|k| k.subtask_id != best.subtask_id);
341        }
342
343        let alive: Vec<&BranchEvaluation> = evaluations
344            .iter()
345            .filter(|e| !killed_ids.contains(&e.subtask_id))
346            .collect();
347
348        // If enough candidates remain, prune one low-coherence branch.
349        if alive.len() > 1 {
350            let best = alive
351                .iter()
352                .max_by(|a, b| a.aggregate_score.total_cmp(&b.aggregate_score))
353                .copied();
354            let worst = alive
355                .iter()
356                .min_by(|a, b| a.aggregate_score.total_cmp(&b.aggregate_score))
357                .copied();
358
359            if let (Some(best), Some(worst)) = (best, worst) {
360                let gap = best.aggregate_score - worst.aggregate_score;
361                if worst.aggregate_score <= self.policy.min_collapse_score
362                    && gap >= self.policy.min_score_gap_for_prune
363                {
364                    let newly_killed = killed_ids.insert(worst.subtask_id.clone());
365                    if newly_killed {
366                        kills.push(KillDecision {
367                            subtask_id: worst.subtask_id.clone(),
368                            branch: worst.branch.clone(),
369                            reason: format!(
370                                "low coherence pruned (score={:.3}, best={:.3}, gap={:.3})",
371                                worst.aggregate_score, best.aggregate_score, gap
372                            ),
373                        });
374                    }
375                }
376            }
377        }
378
379        let promoted_subtask_id = evaluations
380            .iter()
381            .filter(|e| !killed_ids.contains(&e.subtask_id))
382            .max_by(|a, b| a.aggregate_score.total_cmp(&b.aggregate_score))
383            .map(|e| e.subtask_id.clone());
384
385        self.promoted_subtask_id = promoted_subtask_id.clone();
386
387        CollapseTick {
388            evaluations,
389            kills,
390            promoted_subtask_id,
391        }
392    }
393}
394
395fn run_cargo_check(worktree_path: &PathBuf) -> Result<bool> {
396    let mut child = Command::new("cargo")
397        .args(["check", "--quiet"])
398        .current_dir(worktree_path)
399        .stdout(Stdio::null())
400        .stderr(Stdio::null())
401        .spawn()
402        .with_context(|| {
403            format!(
404                "Failed to execute cargo check in {}",
405                worktree_path.display()
406            )
407        })?;
408
409    let deadline = Instant::now() + Duration::from_secs(COLLAPSE_CHECK_TIMEOUT_SECS);
410    loop {
411        if let Some(status) = child.try_wait().with_context(|| {
412            format!(
413                "Failed waiting on cargo check in {}",
414                worktree_path.display()
415            )
416        })? {
417            return Ok(status.success());
418        }
419
420        if Instant::now() >= deadline {
421            let _ = child.kill();
422            let _ = child.wait();
423            tracing::warn!(
424                worktree_path = %worktree_path.display(),
425                timeout_secs = COLLAPSE_CHECK_TIMEOUT_SECS,
426                "Collapse sampling cargo check timed out"
427            );
428            return Ok(false);
429        }
430
431        std::thread::sleep(Duration::from_millis(100));
432    }
433}
434
435const COLLAPSE_CHECK_TIMEOUT_SECS: u64 = 45;
436
437fn collect_changed_files(worktree_path: &PathBuf) -> Result<HashSet<String>> {
438    let output = Command::new("git")
439        .args(["diff", "--name-only"])
440        .current_dir(worktree_path)
441        .output()
442        .with_context(|| {
443            format!(
444                "Failed to collect changed files in {}",
445                worktree_path.display()
446            )
447        })?;
448    if !output.status.success() {
449        return Ok(HashSet::new());
450    }
451    Ok(String::from_utf8_lossy(&output.stdout)
452        .lines()
453        .filter(|line| !line.trim().is_empty())
454        .map(|line| line.to_string())
455        .collect())
456}
457
458fn collect_changed_lines(worktree_path: &PathBuf) -> Result<u32> {
459    let output = Command::new("git")
460        .args(["diff", "--numstat"])
461        .current_dir(worktree_path)
462        .output()
463        .with_context(|| {
464            format!(
465                "Failed to collect changed lines in {}",
466                worktree_path.display()
467            )
468        })?;
469    if !output.status.success() {
470        return Ok(0);
471    }
472    let mut total = 0u32;
473    for line in String::from_utf8_lossy(&output.stdout).lines() {
474        let parts: Vec<&str> = line.split('\t').collect();
475        if parts.len() < 2 {
476            continue;
477        }
478        total += parts[0].parse::<u32>().unwrap_or(0);
479        total += parts[1].parse::<u32>().unwrap_or(0);
480    }
481    Ok(total)
482}
483
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a compiling observation with no changed files and one changed line.
    fn infra_observation(id: &str, resource_health_score: f32, signals: u32) -> BranchObservation {
        BranchObservation {
            subtask_id: id.to_string(),
            branch: format!("codetether/subagent-{id}"),
            compile_ok: true,
            changed_files: HashSet::new(),
            changed_lines: 1,
            resource_health_score,
            infra_unhealthy_signals: signals,
        }
    }

    /// Wraps a score vector and a hand-picked aggregate into an evaluation.
    fn evaluation(id: &str, score: CoherenceScore, aggregate_score: f32) -> BranchEvaluation {
        BranchEvaluation {
            subtask_id: id.to_string(),
            branch: format!("codetether/subagent-{id}"),
            score,
            aggregate_score,
        }
    }

    #[test]
    fn collapse_score_penalizes_invariants() {
        let healthy = CoherenceScore {
            compile_health: 1.0,
            test_health: 0.8,
            contract_alignment: 0.9,
            diff_conflict_risk: 0.1,
            velocity: 0.5,
            resource_health_score: 1.0,
            invariant_breaks: 0,
        };
        // Same signals, but with accumulated invariant breaks.
        let broken = CoherenceScore {
            invariant_breaks: 3,
            ..healthy
        };

        assert!(healthy.collapse_score() > broken.collapse_score());
    }

    #[test]
    fn derive_decisions_kills_repeated_failures_and_promotes_best() {
        let mut controller = CollapseController::new(CollapsePolicy::default());
        // Seed the controller so "bad" is already over the compile-failure threshold.
        controller
            .consecutive_compile_failures
            .insert("bad".to_string(), 3);
        controller.invariant_breaks.insert("bad".to_string(), 2);

        let bad_score = CoherenceScore {
            compile_health: 0.0,
            test_health: 0.0,
            contract_alignment: 0.0,
            diff_conflict_risk: 0.9,
            velocity: 0.1,
            resource_health_score: 0.2,
            invariant_breaks: 2,
        };
        let good_score = CoherenceScore {
            compile_health: 1.0,
            test_health: 0.6,
            contract_alignment: 0.9,
            diff_conflict_risk: 0.1,
            velocity: 0.4,
            resource_health_score: 1.0,
            invariant_breaks: 0,
        };
        let evals = vec![
            evaluation("bad", bad_score, 0.0),
            evaluation("good", good_score, 0.82),
        ];

        let tick = controller.derive_decisions(evals);
        assert_eq!(tick.kills.len(), 1);
        assert_eq!(tick.kills[0].subtask_id, "bad");
        assert_eq!(tick.promoted_subtask_id.as_deref(), Some("good"));
    }

    #[test]
    fn sample_observations_penalizes_resource_unhealth() {
        let mut controller = CollapseController::new(CollapsePolicy::default());
        let observations = vec![
            infra_observation("infra-bad", 0.0, 2),
            infra_observation("infra-good", 1.0, 0),
        ];

        let tick = controller.sample_observations(&observations);
        let aggregate_of = |id: &str| {
            tick.evaluations
                .iter()
                .find(|e| e.subtask_id == id)
                .map(|e| e.aggregate_score)
        };
        let bad = aggregate_of("infra-bad").expect("infra-bad evaluation");
        let good = aggregate_of("infra-good").expect("infra-good evaluation");
        assert!(bad < good);
    }
}
589}