// codetether_agent/swarm/collapse_controller.rs
//! Deterministic collapse controller for speculative swarm branches.
//!
//! Phase 0 focuses on low-latency, mechanical regulation:
//! - sample active worktrees on an interval
//! - compute a coherence score vector per branch
//! - kill low-coherence branches deterministically
//! - promote one branch as the current integration candidate
use anyhow::{Context, Result, anyhow};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::{Duration, Instant};
/// Multi-dimensional coherence score for a branch.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct CoherenceScore {
    /// 1.0 when the latest compile probe succeeded, 0.0 otherwise.
    pub compile_health: f32,
    /// Lightweight test-health signal in [0, 1].
    pub test_health: f32,
    /// How well the branch aligns with its contract, in [0, 1].
    pub contract_alignment: f32,
    /// Estimated risk of diff conflicts with sibling branches, in [0, 1].
    pub diff_conflict_risk: f32,
    /// Normalized change velocity, in [0, 1].
    pub velocity: f32,
    /// Infrastructure/resource health reported by the backend, in [0, 1].
    pub resource_health_score: f32,
    /// Accumulated count of invariant violations observed so far.
    pub invariant_breaks: u32,
}

impl CoherenceScore {
    /// Fold the score vector into one scalar in [0.0, 1.0]; higher is healthier.
    ///
    /// Deterministic health signals are blended with fixed weights (summing to
    /// 1.0), conflict risk contributes inverted, and accumulated invariant
    /// breaks apply a capped subtractive penalty.
    pub fn collapse_score(&self) -> f32 {
        // Weighted blend; addition order matches the weight listing.
        let mut blended = self.compile_health * 0.29;
        blended += self.test_health * 0.14;
        blended += self.contract_alignment * 0.18;
        blended += (1.0 - self.diff_conflict_risk) * 0.14;
        blended += self.velocity * 0.08;
        blended += self.resource_health_score * 0.17;

        // Each invariant break costs 0.1, capped at 0.6 so the penalty cannot
        // completely erase every other signal on its own.
        let penalty = (self.invariant_breaks as f32 * 0.1).min(0.6).max(0.0);
        (blended - penalty).min(1.0).max(0.0)
    }
}

/// Runtime state for a currently active branch/worktree.
#[derive(Debug, Clone)]
pub struct BranchRuntimeState {
    /// Identifier of the subtask this branch is executing.
    pub subtask_id: String,
    /// Git branch name for the speculative work.
    pub branch: String,
    /// Filesystem path of the worktree probed by `cargo check` / `git diff`.
    pub worktree_path: PathBuf,
}

/// Full evaluation output for one branch.
#[derive(Debug, Clone)]
pub struct BranchEvaluation {
    /// Subtask this evaluation belongs to.
    pub subtask_id: String,
    /// Branch name, carried through for kill/promote reporting.
    pub branch: String,
    /// Per-dimension coherence signals computed for this tick.
    pub score: CoherenceScore,
    /// Cached `score.collapse_score()` used for ranking branches.
    pub aggregate_score: f32,
}

/// Branch observation produced by any execution backend.
#[derive(Debug, Clone)]
pub struct BranchObservation {
    /// Subtask this observation belongs to.
    pub subtask_id: String,
    /// Branch name the observation was taken from.
    pub branch: String,
    /// Whether the compile probe (`cargo check` in the in-process sampler) passed.
    pub compile_ok: bool,
    /// Paths with uncommitted modifications; used for cross-branch overlap.
    pub changed_files: HashSet<String>,
    /// Total added + deleted lines; feeds the velocity signal.
    pub changed_lines: u32,
    /// Backend-reported resource health in [0, 1] (the in-process sampler
    /// reports a constant 1.0).
    pub resource_health_score: f32,
    /// Infrastructure alarm count; each signal is counted as an invariant break.
    pub infra_unhealthy_signals: u32,
}

/// Deterministic policy knobs for collapse behavior.
#[derive(Debug, Clone, Copy)]
pub struct CollapsePolicy {
    /// Kill a branch once its accumulated invariant breaks reach this count.
    pub max_invariant_breaks: u32,
    /// Kill a branch after this many compile failures in a row.
    pub max_consecutive_compile_failures: u32,
    /// Kill a branch after this many resource-health failures in a row.
    pub max_consecutive_resource_failures: u32,
    /// Prune candidates must score at or below this collapse score.
    pub min_collapse_score: f32,
    /// Prune only when the best-vs-worst score gap is at least this large.
    pub min_score_gap_for_prune: f32,
    /// Resource health below this value counts as a resource failure.
    pub min_resource_health_score: f32,
}

impl Default for CollapsePolicy {
    /// Phase 0 defaults: aggressive on compile failures (a single one trips
    /// the threshold), slightly more lenient on resource blips.
    fn default() -> Self {
        CollapsePolicy {
            max_invariant_breaks: 2,
            max_consecutive_compile_failures: 1,
            max_consecutive_resource_failures: 2,
            min_collapse_score: 0.30,
            min_score_gap_for_prune: 0.15,
            min_resource_health_score: 0.35,
        }
    }
}

/// Deterministic kill decision.
#[derive(Debug, Clone)]
pub struct KillDecision {
    /// Subtask whose branch is being terminated.
    pub subtask_id: String,
    /// Branch name being terminated.
    pub branch: String,
    /// Human-readable explanation of which policy condition was tripped.
    pub reason: String,
}

/// One collapse sampling tick output.
#[derive(Debug, Clone, Default)]
pub struct CollapseTick {
    /// Scores for every observed branch, including branches selected for killing.
    pub evaluations: Vec<BranchEvaluation>,
    /// Branches the policy decided to terminate this tick.
    pub kills: Vec<KillDecision>,
    /// Best-scoring surviving branch (if any), promoted as integration candidate.
    pub promoted_subtask_id: Option<String>,
}

/// First-class regulator that samples branch health and applies deterministic collapse policy.
#[derive(Debug)]
pub struct CollapseController {
    /// Thresholds governing kill/prune/promote decisions.
    policy: CollapsePolicy,
    /// When each subtask was first observed; denominator of the velocity signal.
    first_seen_at: HashMap<String, Instant>,
    /// Per-subtask run of compile failures; reset to 0 on a passing check.
    consecutive_compile_failures: HashMap<String, u32>,
    /// Per-subtask run of resource-health failures; reset when healthy again.
    consecutive_resource_failures: HashMap<String, u32>,
    /// Accumulated invariant breaks (compile failures + infra alarm signals).
    invariant_breaks: HashMap<String, u32>,
    /// Subtask promoted on the most recent tick, if any.
    promoted_subtask_id: Option<String>,
}

impl CollapseController {
    /// Create a controller with empty per-branch state and the given policy.
    pub fn new(policy: CollapsePolicy) -> Self {
        Self {
            policy,
            first_seen_at: HashMap::new(),
            consecutive_compile_failures: HashMap::new(),
            consecutive_resource_failures: HashMap::new(),
            invariant_breaks: HashMap::new(),
            promoted_subtask_id: None,
        }
    }

    /// Probe every active worktree in parallel (compile check + git diff stats),
    /// then feed the observations through [`Self::sample_observations`].
    ///
    /// The in-process probe reports a constant `resource_health_score` of 1.0
    /// and no infra signals; external backends supply those via
    /// `sample_observations` directly.
    ///
    /// # Errors
    /// Fails if any probe thread panics or its cargo/git invocation cannot run.
    pub fn sample(&mut self, branches: &[BranchRuntimeState]) -> Result<CollapseTick> {
        if branches.is_empty() {
            return Ok(CollapseTick::default());
        }

        // One probe thread per branch so slow `cargo check` runs overlap.
        let mut probe_handles = Vec::with_capacity(branches.len());
        for branch in branches {
            let subtask_id = branch.subtask_id.clone();
            let branch_name = branch.branch.clone();
            let worktree_path = branch.worktree_path.clone();
            let handle = std::thread::spawn(move || -> Result<BranchObservation> {
                Ok(BranchObservation {
                    subtask_id,
                    branch: branch_name,
                    compile_ok: run_cargo_check(&worktree_path)?,
                    changed_files: collect_changed_files(&worktree_path)?,
                    changed_lines: collect_changed_lines(&worktree_path)?,
                    resource_health_score: 1.0,
                    infra_unhealthy_signals: 0,
                })
            });
            probe_handles.push(handle);
        }

        let mut observations = Vec::with_capacity(branches.len());
        for handle in probe_handles {
            // NOTE(review): the first probe error aborts the tick; later
            // handles are dropped without joining (their threads detach).
            let observation = handle
                .join()
                .map_err(|_| anyhow!("Branch sampling thread panicked"))??;
            observations.push(observation);
        }
        Ok(self.sample_observations(&observations))
    }

    /// Core tick: update per-branch counters from raw observations, score every
    /// branch, and derive kill/promote decisions.
    ///
    /// State for subtasks absent from `observations` is garbage-collected, so
    /// callers should pass the complete set of active branches each tick.
    pub fn sample_observations(&mut self, observations: &[BranchObservation]) -> CollapseTick {
        if observations.is_empty() {
            return CollapseTick::default();
        }

        // Drop tracking state for branches that are no longer active.
        let active: HashSet<&str> = observations.iter().map(|b| b.subtask_id.as_str()).collect();
        self.first_seen_at
            .retain(|id, _| active.contains(id.as_str()));
        self.consecutive_compile_failures
            .retain(|id, _| active.contains(id.as_str()));
        self.consecutive_resource_failures
            .retain(|id, _| active.contains(id.as_str()));
        self.invariant_breaks
            .retain(|id, _| active.contains(id.as_str()));

        // Per-tick lookup tables, keyed by subtask id.
        let mut changed_files: HashMap<String, HashSet<String>> = HashMap::new();
        let mut changed_lines: HashMap<String, u32> = HashMap::new();
        let mut compile_ok: HashMap<String, bool> = HashMap::new();

        // Pass 1: fold each observation into the cross-tick counters.
        for observation in observations {
            self.first_seen_at
                .entry(observation.subtask_id.clone())
                .or_insert_with(Instant::now);
            changed_files.insert(
                observation.subtask_id.clone(),
                observation.changed_files.clone(),
            );
            changed_lines.insert(observation.subtask_id.clone(), observation.changed_lines);
            compile_ok.insert(observation.subtask_id.clone(), observation.compile_ok);

            // A passing compile resets the streak; a failure extends it AND
            // counts as one invariant break.
            let fail_counter = self
                .consecutive_compile_failures
                .entry(observation.subtask_id.clone())
                .or_insert(0);
            if observation.compile_ok {
                *fail_counter = 0;
            } else {
                *fail_counter += 1;
                *self
                    .invariant_breaks
                    .entry(observation.subtask_id.clone())
                    .or_insert(0) += 1;
            }

            // Resource health below the policy floor extends the resource
            // failure streak; recovering resets it.
            let infra_fail_counter = self
                .consecutive_resource_failures
                .entry(observation.subtask_id.clone())
                .or_insert(0);
            if observation.resource_health_score < self.policy.min_resource_health_score {
                *infra_fail_counter += 1;
            } else {
                *infra_fail_counter = 0;
            }
            // Every explicit infra alarm accumulates as an invariant break.
            if observation.infra_unhealthy_signals > 0 {
                *self
                    .invariant_breaks
                    .entry(observation.subtask_id.clone())
                    .or_insert(0) += observation.infra_unhealthy_signals;
            }
        }

        // Pass 2: compute a coherence score for each branch.
        let mut evaluations = Vec::with_capacity(observations.len());
        for observation in observations {
            let my_files = changed_files
                .get(&observation.subtask_id)
                .cloned()
                .unwrap_or_default();

            // Count file overlaps against every *other* branch's change set.
            let overlap_files = changed_files
                .iter()
                .filter(|(other_id, _)| *other_id != &observation.subtask_id)
                .map(|(_, files)| my_files.intersection(files).count())
                .sum::<usize>();

            // Conflict risk = fraction of this branch's files also touched
            // elsewhere (0 when the branch changed nothing).
            let conflict_risk = if my_files.is_empty() {
                0.0
            } else {
                (overlap_files as f32 / my_files.len() as f32).clamp(0.0, 1.0)
            };

            let compile_health = if *compile_ok.get(&observation.subtask_id).unwrap_or(&false) {
                1.0
            } else {
                0.0
            };

            // Phase 0 keeps test health lightweight to avoid expensive frequent test loops.
            let test_health = if compile_health > 0.0 { 0.6 } else { 0.0 };
            // Contract alignment is proxied by the inverse of conflict risk,
            // gated on compiling at all.
            let contract_alignment = if compile_health > 0.0 {
                (1.0 - conflict_risk).clamp(0.0, 1.0)
            } else {
                0.0
            };

            // Velocity: changed lines per second since first sighting,
            // normalized by 20 lines/sec; floor elapsed at 1s to avoid spikes.
            let elapsed_secs = self
                .first_seen_at
                .get(&observation.subtask_id)
                .map(|t| t.elapsed().as_secs_f32().max(1.0))
                .unwrap_or(1.0);
            let lines = changed_lines
                .get(&observation.subtask_id)
                .copied()
                .unwrap_or(0) as f32;
            let velocity = (lines / elapsed_secs / 20.0).clamp(0.0, 1.0);

            let invariant_breaks = *self
                .invariant_breaks
                .get(&observation.subtask_id)
                .unwrap_or(&0);
            let score = CoherenceScore {
                compile_health,
                test_health,
                contract_alignment,
                diff_conflict_risk: conflict_risk,
                velocity,
                resource_health_score: observation.resource_health_score.clamp(0.0, 1.0),
                invariant_breaks,
            };

            evaluations.push(BranchEvaluation {
                subtask_id: observation.subtask_id.clone(),
                branch: observation.branch.clone(),
                aggregate_score: score.collapse_score(),
                score,
            });
        }

        self.derive_decisions(evaluations)
    }

    /// Apply the deterministic collapse policy to a set of evaluations:
    /// kill threshold violators, optionally prune one low-coherence straggler,
    /// and promote the best survivor.
    fn derive_decisions(&mut self, evaluations: Vec<BranchEvaluation>) -> CollapseTick {
        if evaluations.is_empty() {
            self.promoted_subtask_id = None;
            return CollapseTick::default();
        }

        let mut kills = Vec::new();
        let mut killed_ids = HashSet::new();

        // Hard kills: any branch past an absolute policy threshold.
        for eval in &evaluations {
            let consecutive_failures = *self
                .consecutive_compile_failures
                .get(&eval.subtask_id)
                .unwrap_or(&0);
            let consecutive_resource_failures = *self
                .consecutive_resource_failures
                .get(&eval.subtask_id)
                .unwrap_or(&0);
            if eval.score.invariant_breaks >= self.policy.max_invariant_breaks
                || consecutive_failures >= self.policy.max_consecutive_compile_failures
                || consecutive_resource_failures >= self.policy.max_consecutive_resource_failures
            {
                killed_ids.insert(eval.subtask_id.clone());
                kills.push(KillDecision {
                    subtask_id: eval.subtask_id.clone(),
                    branch: eval.branch.clone(),
                    reason: format!(
                        "policy_threshold exceeded (invariant_breaks={}, consecutive_compile_failures={}, consecutive_resource_failures={}, resource_health_score={:.3})",
                        eval.score.invariant_breaks, consecutive_failures, consecutive_resource_failures, eval.score.resource_health_score
                    ),
                });
            }
        }

        // Never collapse every branch in a single sampling tick.
        if killed_ids.len() == evaluations.len() {
            if let Some(best) = evaluations
                .iter()
                .max_by(|a, b| a.aggregate_score.total_cmp(&b.aggregate_score))
            {
                // Spare the highest-scoring branch and retract its kill record.
                killed_ids.remove(&best.subtask_id);
                kills.retain(|k| k.subtask_id != best.subtask_id);
            }
        }

        let alive: Vec<&BranchEvaluation> = evaluations
            .iter()
            .filter(|e| !killed_ids.contains(&e.subtask_id))
            .collect();

        // If enough candidates remain, prune one low-coherence branch.
        if alive.len() > 1 {
            let best = alive
                .iter()
                .max_by(|a, b| a.aggregate_score.total_cmp(&b.aggregate_score))
                .copied();
            let worst = alive
                .iter()
                .min_by(|a, b| a.aggregate_score.total_cmp(&b.aggregate_score))
                .copied();

            if let (Some(best), Some(worst)) = (best, worst) {
                // Prune only when the worst is both absolutely weak and clearly
                // behind the leader — prevents pruning in tight races.
                let gap = best.aggregate_score - worst.aggregate_score;
                if worst.aggregate_score <= self.policy.min_collapse_score
                    && gap >= self.policy.min_score_gap_for_prune
                {
                    let newly_killed = killed_ids.insert(worst.subtask_id.clone());
                    if newly_killed {
                        kills.push(KillDecision {
                            subtask_id: worst.subtask_id.clone(),
                            branch: worst.branch.clone(),
                            reason: format!(
                                "low coherence pruned (score={:.3}, best={:.3}, gap={:.3})",
                                worst.aggregate_score, best.aggregate_score, gap
                            ),
                        });
                    }
                }
            }
        }

        // Promote the best-scoring survivor (None when everything was killed).
        let promoted_subtask_id = evaluations
            .iter()
            .filter(|e| !killed_ids.contains(&e.subtask_id))
            .max_by(|a, b| a.aggregate_score.total_cmp(&b.aggregate_score))
            .map(|e| e.subtask_id.clone());

        self.promoted_subtask_id = promoted_subtask_id.clone();

        CollapseTick {
            evaluations,
            kills,
            promoted_subtask_id,
        }
    }
}

396fn run_cargo_check(worktree_path: &PathBuf) -> Result<bool> {
397    let mut child = Command::new("cargo")
398        .args(["check", "--quiet"])
399        .current_dir(worktree_path)
400        .stdout(Stdio::null())
401        .stderr(Stdio::null())
402        .spawn()
403        .with_context(|| {
404            format!(
405                "Failed to execute cargo check in {}",
406                worktree_path.display()
407            )
408        })?;
409
410    let deadline = Instant::now() + Duration::from_secs(COLLAPSE_CHECK_TIMEOUT_SECS);
411    loop {
412        if let Some(status) = child.try_wait().with_context(|| {
413            format!(
414                "Failed waiting on cargo check in {}",
415                worktree_path.display()
416            )
417        })? {
418            return Ok(status.success());
419        }
420
421        if Instant::now() >= deadline {
422            let _ = child.kill();
423            let _ = child.wait();
424            tracing::warn!(
425                worktree_path = %worktree_path.display(),
426                timeout_secs = COLLAPSE_CHECK_TIMEOUT_SECS,
427                "Collapse sampling cargo check timed out"
428            );
429            return Ok(false);
430        }
431
432        std::thread::sleep(Duration::from_millis(100));
433    }
434}
435
436const COLLAPSE_CHECK_TIMEOUT_SECS: u64 = 45;
437
438fn collect_changed_files(worktree_path: &PathBuf) -> Result<HashSet<String>> {
439    let output = Command::new("git")
440        .args(["diff", "--name-only"])
441        .current_dir(worktree_path)
442        .output()
443        .with_context(|| {
444            format!(
445                "Failed to collect changed files in {}",
446                worktree_path.display()
447            )
448        })?;
449    if !output.status.success() {
450        return Ok(HashSet::new());
451    }
452    Ok(String::from_utf8_lossy(&output.stdout)
453        .lines()
454        .filter(|line| !line.trim().is_empty())
455        .map(|line| line.to_string())
456        .collect())
457}
458
459fn collect_changed_lines(worktree_path: &PathBuf) -> Result<u32> {
460    let output = Command::new("git")
461        .args(["diff", "--numstat"])
462        .current_dir(worktree_path)
463        .output()
464        .with_context(|| {
465            format!(
466                "Failed to collect changed lines in {}",
467                worktree_path.display()
468            )
469        })?;
470    if !output.status.success() {
471        return Ok(0);
472    }
473    let mut total = 0u32;
474    for line in String::from_utf8_lossy(&output.stdout).lines() {
475        let parts: Vec<&str> = line.split('\t').collect();
476        if parts.len() < 2 {
477            continue;
478        }
479        total += parts[0].parse::<u32>().unwrap_or(0);
480        total += parts[1].parse::<u32>().unwrap_or(0);
481    }
482    Ok(total)
483}
484
#[cfg(test)]
mod tests {
    use super::*;

    /// Identical scores except for invariant breaks must rank the clean one higher.
    #[test]
    fn collapse_score_penalizes_invariants() {
        let healthy = CoherenceScore {
            compile_health: 1.0,
            test_health: 0.8,
            contract_alignment: 0.9,
            diff_conflict_risk: 0.1,
            velocity: 0.5,
            resource_health_score: 1.0,
            invariant_breaks: 0,
        };
        let broken = CoherenceScore {
            invariant_breaks: 3,
            ..healthy
        };

        assert!(healthy.collapse_score() > broken.collapse_score());
    }

    /// A branch seeded past the compile-failure and invariant thresholds is
    /// killed, while the healthy sibling is promoted.
    #[test]
    fn derive_decisions_kills_repeated_failures_and_promotes_best() {
        let mut controller = CollapseController::new(CollapsePolicy::default());
        // Pre-load "bad" past both kill thresholds (default policy: 1 compile
        // failure, 2 invariant breaks).
        controller
            .consecutive_compile_failures
            .insert("bad".to_string(), 3);
        controller.invariant_breaks.insert("bad".to_string(), 2);

        let evals = vec![
            BranchEvaluation {
                subtask_id: "bad".to_string(),
                branch: "codetether/subagent-bad".to_string(),
                score: CoherenceScore {
                    compile_health: 0.0,
                    test_health: 0.0,
                    contract_alignment: 0.0,
                    diff_conflict_risk: 0.9,
                    velocity: 0.1,
                    resource_health_score: 0.2,
                    invariant_breaks: 2,
                },
                aggregate_score: 0.0,
            },
            BranchEvaluation {
                subtask_id: "good".to_string(),
                branch: "codetether/subagent-good".to_string(),
                score: CoherenceScore {
                    compile_health: 1.0,
                    test_health: 0.6,
                    contract_alignment: 0.9,
                    diff_conflict_risk: 0.1,
                    velocity: 0.4,
                    resource_health_score: 1.0,
                    invariant_breaks: 0,
                },
                aggregate_score: 0.82,
            },
        ];

        let tick = controller.derive_decisions(evals);
        assert_eq!(tick.kills.len(), 1);
        assert_eq!(tick.kills[0].subtask_id, "bad");
        assert_eq!(tick.promoted_subtask_id.as_deref(), Some("good"));
    }

    /// Zero resource health plus infra alarms must score strictly below an
    /// otherwise-identical healthy observation.
    #[test]
    fn sample_observations_penalizes_resource_unhealth() {
        let mut controller = CollapseController::new(CollapsePolicy::default());
        let observations = vec![
            BranchObservation {
                subtask_id: "infra-bad".to_string(),
                branch: "codetether/subagent-infra-bad".to_string(),
                compile_ok: true,
                changed_files: HashSet::new(),
                changed_lines: 1,
                resource_health_score: 0.0,
                infra_unhealthy_signals: 2,
            },
            BranchObservation {
                subtask_id: "infra-good".to_string(),
                branch: "codetether/subagent-infra-good".to_string(),
                compile_ok: true,
                changed_files: HashSet::new(),
                changed_lines: 1,
                resource_health_score: 1.0,
                infra_unhealthy_signals: 0,
            },
        ];

        let tick = controller.sample_observations(&observations);
        let bad = tick
            .evaluations
            .iter()
            .find(|e| e.subtask_id == "infra-bad")
            .expect("infra-bad evaluation");
        let good = tick
            .evaluations
            .iter()
            .find(|e| e.subtask_id == "infra-good")
            .expect("infra-good evaluation");
        assert!(bad.aggregate_score < good.aggregate_score);
    }
}