use anyhow::{Context, Result, anyhow};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::{Duration, Instant};
14
/// Health signals for one branch, combined by [`CoherenceScore::collapse_score`].
///
/// The fractional signals are intended to lie in `[0.0, 1.0]` (the sampler in this
/// module only produces such values); only the combined score is clamped.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct CoherenceScore {
    /// Compile signal (weight 0.29).
    pub compile_health: f32,
    /// Test signal (weight 0.14).
    pub test_health: f32,
    /// Contract-alignment signal (weight 0.18).
    pub contract_alignment: f32,
    /// Risk of conflicting with sibling branches; counts inversely (weight 0.14).
    pub diff_conflict_risk: f32,
    /// Rate-of-change signal (weight 0.08).
    pub velocity: f32,
    /// Resource-health signal (weight 0.17).
    pub resource_health_score: f32,
    /// Invariant breaks; each one subtracts 0.1 from the score, up to 0.6 in total.
    pub invariant_breaks: u32,
}

impl CoherenceScore {
    /// Folds every signal into one score in `[0.0, 1.0]`.
    ///
    /// The score is a weighted sum of the signals (the weights add up to 1.0, with
    /// diff-conflict risk entering as `1.0 - risk`). A penalty of 0.1 per invariant
    /// break, capped at 0.6, is then subtracted and the result is clamped to `[0.0, 1.0]`.
    pub fn collapse_score(&self) -> f32 {
        // Same order as the left-to-right sum, so floating-point results are unchanged.
        let terms: [(f32, f32); 6] = [
            (self.compile_health, 0.29),
            (self.test_health, 0.14),
            (self.contract_alignment, 0.18),
            (1.0 - self.diff_conflict_risk, 0.14),
            (self.velocity, 0.08),
            (self.resource_health_score, 0.17),
        ];
        let weighted: f32 = terms
            .iter()
            .map(|&(signal, weight)| signal * weight)
            .sum();

        let penalty = (self.invariant_breaks as f32 * 0.1).clamp(0.0, 0.6);
        (weighted - penalty).clamp(0.0, 1.0)
    }
}
42
/// Identifies a live branch and where it can be probed.
#[derive(Debug, Clone)]
pub struct BranchRuntimeState {
    /// Subtask that owns the branch; used as the key for per-branch history.
    pub subtask_id: String,
    /// Git branch name, carried through into evaluations and kill decisions.
    pub branch: String,
    /// Worktree directory in which the sampler runs `cargo check` and `git diff`.
    pub worktree_path: PathBuf,
}
50
/// Scoring result for one branch in a single sampling tick.
#[derive(Debug, Clone)]
pub struct BranchEvaluation {
    /// Subtask that owns the branch.
    pub subtask_id: String,
    /// Git branch name.
    pub branch: String,
    /// The individual health signals.
    pub score: CoherenceScore,
    /// Combined score (the sampler stores `score.collapse_score()` here); used to
    /// rank, prune and promote branches.
    pub aggregate_score: f32,
}
59
/// Raw per-branch measurements for one tick, before scoring.
#[derive(Debug, Clone)]
pub struct BranchObservation {
    /// Subtask that owns the branch; used as the key for per-branch history.
    pub subtask_id: String,
    /// Git branch name.
    pub branch: String,
    /// Whether the branch compiled; every failure also counts as an invariant break.
    pub compile_ok: bool,
    /// Changed paths; compared across branches to estimate conflict risk.
    pub changed_files: HashSet<String>,
    /// Added plus removed lines; drives the velocity signal.
    pub changed_lines: u32,
    /// Resource health; clamped to `[0.0, 1.0]` when scored. Values below the
    /// policy's `min_resource_health_score` count as a resource failure for the tick.
    pub resource_health_score: f32,
    /// Unhealthy infrastructure signals; each one is added to the invariant-break count.
    pub infra_unhealthy_signals: u32,
}
71
/// Thresholds that decide when a branch is killed, pruned or kept.
#[derive(Debug, Clone, Copy)]
pub struct CollapsePolicy {
    /// Kill a branch once its cumulative invariant-break count reaches this value.
    pub max_invariant_breaks: u32,
    /// Kill a branch once it has failed to compile this many ticks in a row.
    pub max_consecutive_compile_failures: u32,
    /// Kill a branch once its resource health has been below
    /// `min_resource_health_score` this many ticks in a row.
    pub max_consecutive_resource_failures: u32,
    /// The lowest-scoring survivor may be pruned only if it scores at or below this.
    pub min_collapse_score: f32,
    /// ...and only if the best survivor beats it by at least this much.
    pub min_score_gap_for_prune: f32,
    /// Resource health below this counts as a resource failure for the tick.
    pub min_resource_health_score: f32,
}

impl Default for CollapsePolicy {
    /// Defaults: kill after 2 invariant breaks, 1 failed compile, or 2 unhealthy
    /// resource ticks in a row. Prune a survivor scoring <= 0.30 that trails the best
    /// by >= 0.15. Treat resource health below 0.35 as unhealthy.
    fn default() -> Self {
        Self {
            // Score thresholds.
            min_collapse_score: 0.30,
            min_score_gap_for_prune: 0.15,
            min_resource_health_score: 0.35,
            // Count thresholds.
            max_invariant_breaks: 2,
            max_consecutive_compile_failures: 1,
            max_consecutive_resource_failures: 2,
        }
    }
}
95
/// A branch the controller decided to kill in this tick.
#[derive(Debug, Clone)]
pub struct KillDecision {
    /// Subtask that owns the killed branch.
    pub subtask_id: String,
    /// Git branch name of the killed branch.
    pub branch: String,
    /// Human-readable explanation (threshold exceeded or low-coherence prune).
    pub reason: String,
}
103
/// Output of one sampling tick; the default value is an empty tick.
#[derive(Debug, Clone, Default)]
pub struct CollapseTick {
    /// One evaluation per observed branch, in input order.
    pub evaluations: Vec<BranchEvaluation>,
    /// Branches to kill this tick.
    pub kills: Vec<KillDecision>,
    /// Highest-scoring branch that was not killed, if any.
    pub promoted_subtask_id: Option<String>,
}
111
/// Stateful scorer that decides, tick by tick, which branches to kill, prune or
/// promote. Per-branch history is keyed by subtask id.
#[derive(Debug)]
pub struct CollapseController {
    /// Thresholds applied by the decision logic.
    policy: CollapsePolicy,
    /// When each subtask was first observed; used to compute velocity.
    first_seen_at: HashMap<String, Instant>,
    /// Ticks in a row in which the subtask failed to compile (reset on success).
    consecutive_compile_failures: HashMap<String, u32>,
    /// Ticks in a row with resource health below the policy minimum (reset otherwise).
    consecutive_resource_failures: HashMap<String, u32>,
    /// Cumulative invariant breaks; only ever incremented while the subtask stays active.
    invariant_breaks: HashMap<String, u32>,
    /// Subtask promoted by the most recent decision round.
    promoted_subtask_id: Option<String>,
}
122
123impl CollapseController {
124 pub fn new(policy: CollapsePolicy) -> Self {
125 Self {
126 policy,
127 first_seen_at: HashMap::new(),
128 consecutive_compile_failures: HashMap::new(),
129 consecutive_resource_failures: HashMap::new(),
130 invariant_breaks: HashMap::new(),
131 promoted_subtask_id: None,
132 }
133 }
134
135 pub fn sample(&mut self, branches: &[BranchRuntimeState]) -> Result<CollapseTick> {
136 if branches.is_empty() {
137 return Ok(CollapseTick::default());
138 }
139
140 let mut probe_handles = Vec::with_capacity(branches.len());
141 for branch in branches {
142 let subtask_id = branch.subtask_id.clone();
143 let branch_name = branch.branch.clone();
144 let worktree_path = branch.worktree_path.clone();
145 let handle = std::thread::spawn(move || -> Result<BranchObservation> {
146 Ok(BranchObservation {
147 subtask_id,
148 branch: branch_name,
149 compile_ok: run_cargo_check(&worktree_path)?,
150 changed_files: collect_changed_files(&worktree_path)?,
151 changed_lines: collect_changed_lines(&worktree_path)?,
152 resource_health_score: 1.0,
153 infra_unhealthy_signals: 0,
154 })
155 });
156 probe_handles.push(handle);
157 }
158
159 let mut observations = Vec::with_capacity(branches.len());
160 for handle in probe_handles {
161 let observation = handle
162 .join()
163 .map_err(|_| anyhow!("Branch sampling thread panicked"))??;
164 observations.push(observation);
165 }
166 Ok(self.sample_observations(&observations))
167 }
168
169 pub fn sample_observations(&mut self, observations: &[BranchObservation]) -> CollapseTick {
170 if observations.is_empty() {
171 return CollapseTick::default();
172 }
173
174 let active: HashSet<&str> = observations.iter().map(|b| b.subtask_id.as_str()).collect();
175 self.first_seen_at
176 .retain(|id, _| active.contains(id.as_str()));
177 self.consecutive_compile_failures
178 .retain(|id, _| active.contains(id.as_str()));
179 self.consecutive_resource_failures
180 .retain(|id, _| active.contains(id.as_str()));
181 self.invariant_breaks
182 .retain(|id, _| active.contains(id.as_str()));
183
184 let mut changed_files: HashMap<String, HashSet<String>> = HashMap::new();
185 let mut changed_lines: HashMap<String, u32> = HashMap::new();
186 let mut compile_ok: HashMap<String, bool> = HashMap::new();
187
188 for observation in observations {
189 self.first_seen_at
190 .entry(observation.subtask_id.clone())
191 .or_insert_with(Instant::now);
192 changed_files.insert(
193 observation.subtask_id.clone(),
194 observation.changed_files.clone(),
195 );
196 changed_lines.insert(observation.subtask_id.clone(), observation.changed_lines);
197 compile_ok.insert(observation.subtask_id.clone(), observation.compile_ok);
198
199 let fail_counter = self
200 .consecutive_compile_failures
201 .entry(observation.subtask_id.clone())
202 .or_insert(0);
203 if observation.compile_ok {
204 *fail_counter = 0;
205 } else {
206 *fail_counter += 1;
207 *self
208 .invariant_breaks
209 .entry(observation.subtask_id.clone())
210 .or_insert(0) += 1;
211 }
212
213 let infra_fail_counter = self
214 .consecutive_resource_failures
215 .entry(observation.subtask_id.clone())
216 .or_insert(0);
217 if observation.resource_health_score < self.policy.min_resource_health_score {
218 *infra_fail_counter += 1;
219 } else {
220 *infra_fail_counter = 0;
221 }
222 if observation.infra_unhealthy_signals > 0 {
223 *self
224 .invariant_breaks
225 .entry(observation.subtask_id.clone())
226 .or_insert(0) += observation.infra_unhealthy_signals;
227 }
228 }
229
230 let mut evaluations = Vec::with_capacity(observations.len());
231 for observation in observations {
232 let my_files = changed_files
233 .get(&observation.subtask_id)
234 .cloned()
235 .unwrap_or_default();
236
237 let overlap_files = changed_files
238 .iter()
239 .filter(|(other_id, _)| *other_id != &observation.subtask_id)
240 .map(|(_, files)| my_files.intersection(files).count())
241 .sum::<usize>();
242
243 let conflict_risk = if my_files.is_empty() {
244 0.0
245 } else {
246 (overlap_files as f32 / my_files.len() as f32).clamp(0.0, 1.0)
247 };
248
249 let compile_health = if *compile_ok.get(&observation.subtask_id).unwrap_or(&false) {
250 1.0
251 } else {
252 0.0
253 };
254
255 let test_health = if compile_health > 0.0 { 0.6 } else { 0.0 };
257 let contract_alignment = if compile_health > 0.0 {
258 (1.0 - conflict_risk).clamp(0.0, 1.0)
259 } else {
260 0.0
261 };
262
263 let elapsed_secs = self
264 .first_seen_at
265 .get(&observation.subtask_id)
266 .map(|t| t.elapsed().as_secs_f32().max(1.0))
267 .unwrap_or(1.0);
268 let lines = changed_lines
269 .get(&observation.subtask_id)
270 .copied()
271 .unwrap_or(0) as f32;
272 let velocity = (lines / elapsed_secs / 20.0).clamp(0.0, 1.0);
273
274 let invariant_breaks = *self
275 .invariant_breaks
276 .get(&observation.subtask_id)
277 .unwrap_or(&0);
278 let score = CoherenceScore {
279 compile_health,
280 test_health,
281 contract_alignment,
282 diff_conflict_risk: conflict_risk,
283 velocity,
284 resource_health_score: observation.resource_health_score.clamp(0.0, 1.0),
285 invariant_breaks,
286 };
287
288 evaluations.push(BranchEvaluation {
289 subtask_id: observation.subtask_id.clone(),
290 branch: observation.branch.clone(),
291 aggregate_score: score.collapse_score(),
292 score,
293 });
294 }
295
296 self.derive_decisions(evaluations)
297 }
298
299 fn derive_decisions(&mut self, evaluations: Vec<BranchEvaluation>) -> CollapseTick {
300 if evaluations.is_empty() {
301 self.promoted_subtask_id = None;
302 return CollapseTick::default();
303 }
304
305 let mut kills = Vec::new();
306 let mut killed_ids = HashSet::new();
307
308 for eval in &evaluations {
309 let consecutive_failures = *self
310 .consecutive_compile_failures
311 .get(&eval.subtask_id)
312 .unwrap_or(&0);
313 let consecutive_resource_failures = *self
314 .consecutive_resource_failures
315 .get(&eval.subtask_id)
316 .unwrap_or(&0);
317 if eval.score.invariant_breaks >= self.policy.max_invariant_breaks
318 || consecutive_failures >= self.policy.max_consecutive_compile_failures
319 || consecutive_resource_failures >= self.policy.max_consecutive_resource_failures
320 {
321 killed_ids.insert(eval.subtask_id.clone());
322 kills.push(KillDecision {
323 subtask_id: eval.subtask_id.clone(),
324 branch: eval.branch.clone(),
325 reason: format!(
326 "policy_threshold exceeded (invariant_breaks={}, consecutive_compile_failures={}, consecutive_resource_failures={}, resource_health_score={:.3})",
327 eval.score.invariant_breaks, consecutive_failures, consecutive_resource_failures, eval.score.resource_health_score
328 ),
329 });
330 }
331 }
332
333 if killed_ids.len() == evaluations.len()
335 && let Some(best) = evaluations
336 .iter()
337 .max_by(|a, b| a.aggregate_score.total_cmp(&b.aggregate_score))
338 {
339 killed_ids.remove(&best.subtask_id);
340 kills.retain(|k| k.subtask_id != best.subtask_id);
341 }
342
343 let alive: Vec<&BranchEvaluation> = evaluations
344 .iter()
345 .filter(|e| !killed_ids.contains(&e.subtask_id))
346 .collect();
347
348 if alive.len() > 1 {
350 let best = alive
351 .iter()
352 .max_by(|a, b| a.aggregate_score.total_cmp(&b.aggregate_score))
353 .copied();
354 let worst = alive
355 .iter()
356 .min_by(|a, b| a.aggregate_score.total_cmp(&b.aggregate_score))
357 .copied();
358
359 if let (Some(best), Some(worst)) = (best, worst) {
360 let gap = best.aggregate_score - worst.aggregate_score;
361 if worst.aggregate_score <= self.policy.min_collapse_score
362 && gap >= self.policy.min_score_gap_for_prune
363 {
364 let newly_killed = killed_ids.insert(worst.subtask_id.clone());
365 if newly_killed {
366 kills.push(KillDecision {
367 subtask_id: worst.subtask_id.clone(),
368 branch: worst.branch.clone(),
369 reason: format!(
370 "low coherence pruned (score={:.3}, best={:.3}, gap={:.3})",
371 worst.aggregate_score, best.aggregate_score, gap
372 ),
373 });
374 }
375 }
376 }
377 }
378
379 let promoted_subtask_id = evaluations
380 .iter()
381 .filter(|e| !killed_ids.contains(&e.subtask_id))
382 .max_by(|a, b| a.aggregate_score.total_cmp(&b.aggregate_score))
383 .map(|e| e.subtask_id.clone());
384
385 self.promoted_subtask_id = promoted_subtask_id.clone();
386
387 CollapseTick {
388 evaluations,
389 kills,
390 promoted_subtask_id,
391 }
392 }
393}
394
395fn run_cargo_check(worktree_path: &PathBuf) -> Result<bool> {
396 let mut child = Command::new("cargo")
397 .args(["check", "--quiet"])
398 .current_dir(worktree_path)
399 .stdout(Stdio::null())
400 .stderr(Stdio::null())
401 .spawn()
402 .with_context(|| {
403 format!(
404 "Failed to execute cargo check in {}",
405 worktree_path.display()
406 )
407 })?;
408
409 let deadline = Instant::now() + Duration::from_secs(COLLAPSE_CHECK_TIMEOUT_SECS);
410 loop {
411 if let Some(status) = child.try_wait().with_context(|| {
412 format!(
413 "Failed waiting on cargo check in {}",
414 worktree_path.display()
415 )
416 })? {
417 return Ok(status.success());
418 }
419
420 if Instant::now() >= deadline {
421 let _ = child.kill();
422 let _ = child.wait();
423 tracing::warn!(
424 worktree_path = %worktree_path.display(),
425 timeout_secs = COLLAPSE_CHECK_TIMEOUT_SECS,
426 "Collapse sampling cargo check timed out"
427 );
428 return Ok(false);
429 }
430
431 std::thread::sleep(Duration::from_millis(100));
432 }
433}
434
435const COLLAPSE_CHECK_TIMEOUT_SECS: u64 = 45;
436
437fn collect_changed_files(worktree_path: &PathBuf) -> Result<HashSet<String>> {
438 let output = Command::new("git")
439 .args(["diff", "--name-only"])
440 .current_dir(worktree_path)
441 .output()
442 .with_context(|| {
443 format!(
444 "Failed to collect changed files in {}",
445 worktree_path.display()
446 )
447 })?;
448 if !output.status.success() {
449 return Ok(HashSet::new());
450 }
451 Ok(String::from_utf8_lossy(&output.stdout)
452 .lines()
453 .filter(|line| !line.trim().is_empty())
454 .map(|line| line.to_string())
455 .collect())
456}
457
458fn collect_changed_lines(worktree_path: &PathBuf) -> Result<u32> {
459 let output = Command::new("git")
460 .args(["diff", "--numstat"])
461 .current_dir(worktree_path)
462 .output()
463 .with_context(|| {
464 format!(
465 "Failed to collect changed lines in {}",
466 worktree_path.display()
467 )
468 })?;
469 if !output.status.success() {
470 return Ok(0);
471 }
472 let mut total = 0u32;
473 for line in String::from_utf8_lossy(&output.stdout).lines() {
474 let parts: Vec<&str> = line.split('\t').collect();
475 if parts.len() < 2 {
476 continue;
477 }
478 total += parts[0].parse::<u32>().unwrap_or(0);
479 total += parts[1].parse::<u32>().unwrap_or(0);
480 }
481 Ok(total)
482}
483
#[cfg(test)]
mod tests {
    use super::*;

    /// Evaluation for subtask `id` on branch `codetether/subagent-{id}`.
    fn evaluation(id: &str, score: CoherenceScore, aggregate_score: f32) -> BranchEvaluation {
        BranchEvaluation {
            subtask_id: id.to_string(),
            branch: format!("codetether/subagent-{id}"),
            score,
            aggregate_score,
        }
    }

    /// Compiling observation with no changed files and a single changed line.
    fn observation(
        id: &str,
        resource_health_score: f32,
        infra_unhealthy_signals: u32,
    ) -> BranchObservation {
        BranchObservation {
            subtask_id: id.to_string(),
            branch: format!("codetether/subagent-{id}"),
            compile_ok: true,
            changed_files: HashSet::new(),
            changed_lines: 1,
            resource_health_score,
            infra_unhealthy_signals,
        }
    }

    /// Aggregate score of subtask `id` in `tick`; panics if it was not evaluated.
    fn aggregate_of(tick: &CollapseTick, id: &str) -> f32 {
        tick.evaluations
            .iter()
            .find(|e| e.subtask_id == id)
            .unwrap_or_else(|| panic!("{id} evaluation"))
            .aggregate_score
    }

    #[test]
    fn collapse_score_penalizes_invariants() {
        let healthy = CoherenceScore {
            compile_health: 1.0,
            test_health: 0.8,
            contract_alignment: 0.9,
            diff_conflict_risk: 0.1,
            velocity: 0.5,
            resource_health_score: 1.0,
            invariant_breaks: 0,
        };
        let broken = CoherenceScore {
            invariant_breaks: 3,
            ..healthy
        };

        assert!(broken.collapse_score() < healthy.collapse_score());
    }

    #[test]
    fn derive_decisions_kills_repeated_failures_and_promotes_best() {
        let mut controller = CollapseController::new(CollapsePolicy::default());
        controller
            .consecutive_compile_failures
            .insert("bad".to_string(), 3);
        controller.invariant_breaks.insert("bad".to_string(), 2);

        let bad_score = CoherenceScore {
            compile_health: 0.0,
            test_health: 0.0,
            contract_alignment: 0.0,
            diff_conflict_risk: 0.9,
            velocity: 0.1,
            resource_health_score: 0.2,
            invariant_breaks: 2,
        };
        let good_score = CoherenceScore {
            compile_health: 1.0,
            test_health: 0.6,
            contract_alignment: 0.9,
            diff_conflict_risk: 0.1,
            velocity: 0.4,
            resource_health_score: 1.0,
            invariant_breaks: 0,
        };

        let tick = controller.derive_decisions(vec![
            evaluation("bad", bad_score, 0.0),
            evaluation("good", good_score, 0.82),
        ]);

        let killed: Vec<&str> = tick.kills.iter().map(|k| k.subtask_id.as_str()).collect();
        assert_eq!(killed, ["bad"]);
        assert_eq!(tick.promoted_subtask_id.as_deref(), Some("good"));
    }

    #[test]
    fn sample_observations_penalizes_resource_unhealth() {
        let mut controller = CollapseController::new(CollapsePolicy::default());
        let observations = [
            observation("infra-bad", 0.0, 2),
            observation("infra-good", 1.0, 0),
        ];

        let tick = controller.sample_observations(&observations);

        assert!(aggregate_of(&tick, "infra-bad") < aggregate_of(&tick, "infra-good"));
    }
}