use anyhow::{Context, Result, anyhow};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::{Duration, Instant};
14
/// Health signals for one branch, combined by [`CoherenceScore::collapse_score`].
///
/// All `f32` signals count "higher is healthier" except `diff_conflict_risk`,
/// which counts against the branch. The scorer in this module produces values
/// in `[0.0, 1.0]` for each of them.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct CoherenceScore {
    /// Compile signal (weight 0.29).
    pub compile_health: f32,
    /// Test signal (weight 0.14).
    pub test_health: f32,
    /// Contract-alignment signal (weight 0.18).
    pub contract_alignment: f32,
    /// Risk of conflicting with sibling branches; contributes as `1 - risk` (weight 0.14).
    pub diff_conflict_risk: f32,
    /// Rate-of-change signal (weight 0.08).
    pub velocity: f32,
    /// Resource/infrastructure health signal (weight 0.17).
    pub resource_health_score: f32,
    /// Invariant violations; each one subtracts 0.1, up to 0.6 in total.
    pub invariant_breaks: u32,
}

impl CoherenceScore {
    /// Folds the signals into a single score clamped to `[0.0, 1.0]`.
    ///
    /// The weighted sum of the signals (weights total 1.0) is reduced by an
    /// invariant penalty of `0.1` per break, capped at `0.6`.
    pub fn collapse_score(&self) -> f32 {
        const MAX_INVARIANT_PENALTY: f32 = 0.6;

        // Summed strictly left to right so the float result is stable.
        let weighted_terms = [
            (self.compile_health, 0.29_f32),
            (self.test_health, 0.14),
            (self.contract_alignment, 0.18),
            (1.0 - self.diff_conflict_risk, 0.14),
            (self.velocity, 0.08),
            (self.resource_health_score, 0.17),
        ];
        let weighted = weighted_terms
            .iter()
            .fold(0.0_f32, |acc, &(signal, weight)| acc + signal * weight);

        // `invariant_breaks` is unsigned, so only the upper bound can bind.
        let invariant_penalty = (0.1 * self.invariant_breaks as f32).min(MAX_INVARIANT_PENALTY);

        (weighted - invariant_penalty).clamp(0.0, 1.0)
    }
}
42
/// A live branch to probe: which subtask owns it and where its worktree lives.
#[derive(Clone, Debug)]
pub struct BranchRuntimeState {
    /// Identifier of the subtask working on this branch.
    pub subtask_id: String,
    /// Branch name, copied into observations and kill decisions.
    pub branch: String,
    /// Directory in which `cargo check` and `git diff` are run for this branch.
    pub worktree_path: PathBuf,
}
50
51#[derive(Debug, Clone)]
53pub struct BranchEvaluation {
54 pub subtask_id: String,
55 pub branch: String,
56 pub score: CoherenceScore,
57 pub aggregate_score: f32,
58}
59
/// Raw facts gathered about one branch, before scoring.
#[derive(Clone, Debug)]
pub struct BranchObservation {
    /// Identifier of the observed subtask.
    pub subtask_id: String,
    /// Branch name of the observed subtask.
    pub branch: String,
    /// Whether `cargo check` succeeded for the branch.
    pub compile_ok: bool,
    /// Paths reported as changed by `git diff`.
    pub changed_files: HashSet<String>,
    /// Lines added plus removed according to `git diff --numstat`.
    pub changed_lines: u32,
    /// Resource health; compared against the policy's minimum each tick.
    pub resource_health_score: f32,
    /// Count of unhealthy infrastructure signals; each one adds an invariant break.
    pub infra_unhealthy_signals: u32,
}
71
/// Thresholds that decide when a branch is killed, pruned or considered unhealthy.
#[derive(Clone, Copy, Debug)]
pub struct CollapsePolicy {
    /// Kill once a branch's accumulated invariant breaks reach this count.
    pub max_invariant_breaks: u32,
    /// Kill once consecutive compile failures reach this count.
    pub max_consecutive_compile_failures: u32,
    /// Kill once consecutive low-resource-health ticks reach this count.
    pub max_consecutive_resource_failures: u32,
    /// A surviving branch scoring at or below this is a pruning candidate.
    pub min_collapse_score: f32,
    /// A candidate is pruned only if it trails the best survivor by at least this much.
    pub min_score_gap_for_prune: f32,
    /// Resource health below this counts as a resource failure for the tick.
    pub min_resource_health_score: f32,
}

impl Default for CollapsePolicy {
    /// Default policy: a single compile failure is fatal; resource trouble must persist two ticks.
    fn default() -> Self {
        CollapsePolicy {
            max_consecutive_compile_failures: 1,
            max_consecutive_resource_failures: 2,
            max_invariant_breaks: 2,
            min_collapse_score: 0.30,
            min_resource_health_score: 0.35,
            min_score_gap_for_prune: 0.15,
        }
    }
}
95
/// A decision to terminate one branch, with a human-readable explanation.
#[derive(Clone, Debug)]
pub struct KillDecision {
    /// Identifier of the subtask to kill.
    pub subtask_id: String,
    /// Branch name of the subtask to kill.
    pub branch: String,
    /// Why the branch is being killed (which threshold or pruning rule fired).
    pub reason: String,
}
103
104#[derive(Debug, Clone, Default)]
106pub struct CollapseTick {
107 pub evaluations: Vec<BranchEvaluation>,
108 pub kills: Vec<KillDecision>,
109 pub promoted_subtask_id: Option<String>,
110}
111
/// Stateful scorer that tracks branches across ticks and decides kills/promotion.
///
/// All per-branch maps are keyed by subtask id and are trimmed to the subtasks
/// present in each new batch of observations.
#[derive(Debug)]
pub struct CollapseController {
    /// Thresholds applied when deriving decisions.
    policy: CollapsePolicy,
    /// When each subtask was first observed; used as the velocity time base.
    first_seen_at: HashMap<String, Instant>,
    /// Compile failures in a row; reset to 0 on a successful check.
    consecutive_compile_failures: HashMap<String, u32>,
    /// Low-resource-health ticks in a row; reset to 0 on a healthy tick.
    consecutive_resource_failures: HashMap<String, u32>,
    /// Accumulated invariant breaks (compile failures plus infra signals); never reset
    /// while the subtask stays active.
    invariant_breaks: HashMap<String, u32>,
    /// Subtask promoted by the most recent decision round.
    promoted_subtask_id: Option<String>,
}
122
123impl CollapseController {
124 pub fn new(policy: CollapsePolicy) -> Self {
125 Self {
126 policy,
127 first_seen_at: HashMap::new(),
128 consecutive_compile_failures: HashMap::new(),
129 consecutive_resource_failures: HashMap::new(),
130 invariant_breaks: HashMap::new(),
131 promoted_subtask_id: None,
132 }
133 }
134
135 pub fn sample(&mut self, branches: &[BranchRuntimeState]) -> Result<CollapseTick> {
136 if branches.is_empty() {
137 return Ok(CollapseTick::default());
138 }
139
140 let mut probe_handles = Vec::with_capacity(branches.len());
141 for branch in branches {
142 let subtask_id = branch.subtask_id.clone();
143 let branch_name = branch.branch.clone();
144 let worktree_path = branch.worktree_path.clone();
145 let handle = std::thread::spawn(move || -> Result<BranchObservation> {
146 Ok(BranchObservation {
147 subtask_id,
148 branch: branch_name,
149 compile_ok: run_cargo_check(&worktree_path)?,
150 changed_files: collect_changed_files(&worktree_path)?,
151 changed_lines: collect_changed_lines(&worktree_path)?,
152 resource_health_score: 1.0,
153 infra_unhealthy_signals: 0,
154 })
155 });
156 probe_handles.push(handle);
157 }
158
159 let mut observations = Vec::with_capacity(branches.len());
160 for handle in probe_handles {
161 let observation = handle
162 .join()
163 .map_err(|_| anyhow!("Branch sampling thread panicked"))??;
164 observations.push(observation);
165 }
166 Ok(self.sample_observations(&observations))
167 }
168
169 pub fn sample_observations(&mut self, observations: &[BranchObservation]) -> CollapseTick {
170 if observations.is_empty() {
171 return CollapseTick::default();
172 }
173
174 let active: HashSet<&str> = observations.iter().map(|b| b.subtask_id.as_str()).collect();
175 self.first_seen_at
176 .retain(|id, _| active.contains(id.as_str()));
177 self.consecutive_compile_failures
178 .retain(|id, _| active.contains(id.as_str()));
179 self.consecutive_resource_failures
180 .retain(|id, _| active.contains(id.as_str()));
181 self.invariant_breaks
182 .retain(|id, _| active.contains(id.as_str()));
183
184 let mut changed_files: HashMap<String, HashSet<String>> = HashMap::new();
185 let mut changed_lines: HashMap<String, u32> = HashMap::new();
186 let mut compile_ok: HashMap<String, bool> = HashMap::new();
187
188 for observation in observations {
189 self.first_seen_at
190 .entry(observation.subtask_id.clone())
191 .or_insert_with(Instant::now);
192 changed_files.insert(
193 observation.subtask_id.clone(),
194 observation.changed_files.clone(),
195 );
196 changed_lines.insert(observation.subtask_id.clone(), observation.changed_lines);
197 compile_ok.insert(observation.subtask_id.clone(), observation.compile_ok);
198
199 let fail_counter = self
200 .consecutive_compile_failures
201 .entry(observation.subtask_id.clone())
202 .or_insert(0);
203 if observation.compile_ok {
204 *fail_counter = 0;
205 } else {
206 *fail_counter += 1;
207 *self
208 .invariant_breaks
209 .entry(observation.subtask_id.clone())
210 .or_insert(0) += 1;
211 }
212
213 let infra_fail_counter = self
214 .consecutive_resource_failures
215 .entry(observation.subtask_id.clone())
216 .or_insert(0);
217 if observation.resource_health_score < self.policy.min_resource_health_score {
218 *infra_fail_counter += 1;
219 } else {
220 *infra_fail_counter = 0;
221 }
222 if observation.infra_unhealthy_signals > 0 {
223 *self
224 .invariant_breaks
225 .entry(observation.subtask_id.clone())
226 .or_insert(0) += observation.infra_unhealthy_signals;
227 }
228 }
229
230 let mut evaluations = Vec::with_capacity(observations.len());
231 for observation in observations {
232 let my_files = changed_files
233 .get(&observation.subtask_id)
234 .cloned()
235 .unwrap_or_default();
236
237 let overlap_files = changed_files
238 .iter()
239 .filter(|(other_id, _)| *other_id != &observation.subtask_id)
240 .map(|(_, files)| my_files.intersection(files).count())
241 .sum::<usize>();
242
243 let conflict_risk = if my_files.is_empty() {
244 0.0
245 } else {
246 (overlap_files as f32 / my_files.len() as f32).clamp(0.0, 1.0)
247 };
248
249 let compile_health = if *compile_ok.get(&observation.subtask_id).unwrap_or(&false) {
250 1.0
251 } else {
252 0.0
253 };
254
255 let test_health = if compile_health > 0.0 { 0.6 } else { 0.0 };
257 let contract_alignment = if compile_health > 0.0 {
258 (1.0 - conflict_risk).clamp(0.0, 1.0)
259 } else {
260 0.0
261 };
262
263 let elapsed_secs = self
264 .first_seen_at
265 .get(&observation.subtask_id)
266 .map(|t| t.elapsed().as_secs_f32().max(1.0))
267 .unwrap_or(1.0);
268 let lines = changed_lines
269 .get(&observation.subtask_id)
270 .copied()
271 .unwrap_or(0) as f32;
272 let velocity = (lines / elapsed_secs / 20.0).clamp(0.0, 1.0);
273
274 let invariant_breaks = *self
275 .invariant_breaks
276 .get(&observation.subtask_id)
277 .unwrap_or(&0);
278 let score = CoherenceScore {
279 compile_health,
280 test_health,
281 contract_alignment,
282 diff_conflict_risk: conflict_risk,
283 velocity,
284 resource_health_score: observation.resource_health_score.clamp(0.0, 1.0),
285 invariant_breaks,
286 };
287
288 evaluations.push(BranchEvaluation {
289 subtask_id: observation.subtask_id.clone(),
290 branch: observation.branch.clone(),
291 aggregate_score: score.collapse_score(),
292 score,
293 });
294 }
295
296 self.derive_decisions(evaluations)
297 }
298
299 fn derive_decisions(&mut self, evaluations: Vec<BranchEvaluation>) -> CollapseTick {
300 if evaluations.is_empty() {
301 self.promoted_subtask_id = None;
302 return CollapseTick::default();
303 }
304
305 let mut kills = Vec::new();
306 let mut killed_ids = HashSet::new();
307
308 for eval in &evaluations {
309 let consecutive_failures = *self
310 .consecutive_compile_failures
311 .get(&eval.subtask_id)
312 .unwrap_or(&0);
313 let consecutive_resource_failures = *self
314 .consecutive_resource_failures
315 .get(&eval.subtask_id)
316 .unwrap_or(&0);
317 if eval.score.invariant_breaks >= self.policy.max_invariant_breaks
318 || consecutive_failures >= self.policy.max_consecutive_compile_failures
319 || consecutive_resource_failures >= self.policy.max_consecutive_resource_failures
320 {
321 killed_ids.insert(eval.subtask_id.clone());
322 kills.push(KillDecision {
323 subtask_id: eval.subtask_id.clone(),
324 branch: eval.branch.clone(),
325 reason: format!(
326 "policy_threshold exceeded (invariant_breaks={}, consecutive_compile_failures={}, consecutive_resource_failures={}, resource_health_score={:.3})",
327 eval.score.invariant_breaks, consecutive_failures, consecutive_resource_failures, eval.score.resource_health_score
328 ),
329 });
330 }
331 }
332
333 if killed_ids.len() == evaluations.len() {
335 if let Some(best) = evaluations
336 .iter()
337 .max_by(|a, b| a.aggregate_score.total_cmp(&b.aggregate_score))
338 {
339 killed_ids.remove(&best.subtask_id);
340 kills.retain(|k| k.subtask_id != best.subtask_id);
341 }
342 }
343
344 let alive: Vec<&BranchEvaluation> = evaluations
345 .iter()
346 .filter(|e| !killed_ids.contains(&e.subtask_id))
347 .collect();
348
349 if alive.len() > 1 {
351 let best = alive
352 .iter()
353 .max_by(|a, b| a.aggregate_score.total_cmp(&b.aggregate_score))
354 .copied();
355 let worst = alive
356 .iter()
357 .min_by(|a, b| a.aggregate_score.total_cmp(&b.aggregate_score))
358 .copied();
359
360 if let (Some(best), Some(worst)) = (best, worst) {
361 let gap = best.aggregate_score - worst.aggregate_score;
362 if worst.aggregate_score <= self.policy.min_collapse_score
363 && gap >= self.policy.min_score_gap_for_prune
364 {
365 let newly_killed = killed_ids.insert(worst.subtask_id.clone());
366 if newly_killed {
367 kills.push(KillDecision {
368 subtask_id: worst.subtask_id.clone(),
369 branch: worst.branch.clone(),
370 reason: format!(
371 "low coherence pruned (score={:.3}, best={:.3}, gap={:.3})",
372 worst.aggregate_score, best.aggregate_score, gap
373 ),
374 });
375 }
376 }
377 }
378 }
379
380 let promoted_subtask_id = evaluations
381 .iter()
382 .filter(|e| !killed_ids.contains(&e.subtask_id))
383 .max_by(|a, b| a.aggregate_score.total_cmp(&b.aggregate_score))
384 .map(|e| e.subtask_id.clone());
385
386 self.promoted_subtask_id = promoted_subtask_id.clone();
387
388 CollapseTick {
389 evaluations,
390 kills,
391 promoted_subtask_id,
392 }
393 }
394}
395
396fn run_cargo_check(worktree_path: &PathBuf) -> Result<bool> {
397 let mut child = Command::new("cargo")
398 .args(["check", "--quiet"])
399 .current_dir(worktree_path)
400 .stdout(Stdio::null())
401 .stderr(Stdio::null())
402 .spawn()
403 .with_context(|| {
404 format!(
405 "Failed to execute cargo check in {}",
406 worktree_path.display()
407 )
408 })?;
409
410 let deadline = Instant::now() + Duration::from_secs(COLLAPSE_CHECK_TIMEOUT_SECS);
411 loop {
412 if let Some(status) = child.try_wait().with_context(|| {
413 format!(
414 "Failed waiting on cargo check in {}",
415 worktree_path.display()
416 )
417 })? {
418 return Ok(status.success());
419 }
420
421 if Instant::now() >= deadline {
422 let _ = child.kill();
423 let _ = child.wait();
424 tracing::warn!(
425 worktree_path = %worktree_path.display(),
426 timeout_secs = COLLAPSE_CHECK_TIMEOUT_SECS,
427 "Collapse sampling cargo check timed out"
428 );
429 return Ok(false);
430 }
431
432 std::thread::sleep(Duration::from_millis(100));
433 }
434}
435
/// Seconds a collapse-sampling `cargo check` may run before it is killed and counted as failed.
const COLLAPSE_CHECK_TIMEOUT_SECS: u64 = 45;
437
438fn collect_changed_files(worktree_path: &PathBuf) -> Result<HashSet<String>> {
439 let output = Command::new("git")
440 .args(["diff", "--name-only"])
441 .current_dir(worktree_path)
442 .output()
443 .with_context(|| {
444 format!(
445 "Failed to collect changed files in {}",
446 worktree_path.display()
447 )
448 })?;
449 if !output.status.success() {
450 return Ok(HashSet::new());
451 }
452 Ok(String::from_utf8_lossy(&output.stdout)
453 .lines()
454 .filter(|line| !line.trim().is_empty())
455 .map(|line| line.to_string())
456 .collect())
457}
458
459fn collect_changed_lines(worktree_path: &PathBuf) -> Result<u32> {
460 let output = Command::new("git")
461 .args(["diff", "--numstat"])
462 .current_dir(worktree_path)
463 .output()
464 .with_context(|| {
465 format!(
466 "Failed to collect changed lines in {}",
467 worktree_path.display()
468 )
469 })?;
470 if !output.status.success() {
471 return Ok(0);
472 }
473 let mut total = 0u32;
474 for line in String::from_utf8_lossy(&output.stdout).lines() {
475 let parts: Vec<&str> = line.split('\t').collect();
476 if parts.len() < 2 {
477 continue;
478 }
479 total += parts[0].parse::<u32>().unwrap_or(0);
480 total += parts[1].parse::<u32>().unwrap_or(0);
481 }
482 Ok(total)
483}
484
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an evaluation with healthy signals and the given aggregate score.
    fn evaluation(subtask_id: &str, aggregate_score: f32) -> BranchEvaluation {
        BranchEvaluation {
            subtask_id: subtask_id.to_string(),
            branch: format!("codetether/subagent-{subtask_id}"),
            score: CoherenceScore {
                compile_health: 1.0,
                test_health: 0.6,
                contract_alignment: 1.0,
                diff_conflict_risk: 0.0,
                velocity: 0.0,
                resource_health_score: 1.0,
                invariant_breaks: 0,
            },
            aggregate_score,
        }
    }

    /// Builds a compiling, resource-healthy observation that touched `files`.
    fn observation(subtask_id: &str, files: &[&str]) -> BranchObservation {
        BranchObservation {
            subtask_id: subtask_id.to_string(),
            branch: format!("codetether/subagent-{subtask_id}"),
            compile_ok: true,
            changed_files: files.iter().map(|f| f.to_string()).collect(),
            changed_lines: 1,
            resource_health_score: 1.0,
            infra_unhealthy_signals: 0,
        }
    }

    #[test]
    fn collapse_score_penalizes_invariants() {
        let healthy = CoherenceScore {
            compile_health: 1.0,
            test_health: 0.8,
            contract_alignment: 0.9,
            diff_conflict_risk: 0.1,
            velocity: 0.5,
            resource_health_score: 1.0,
            invariant_breaks: 0,
        };
        let broken = CoherenceScore {
            invariant_breaks: 3,
            ..healthy
        };

        assert!(healthy.collapse_score() > broken.collapse_score());
    }

    #[test]
    fn derive_decisions_kills_repeated_failures_and_promotes_best() {
        let mut controller = CollapseController::new(CollapsePolicy::default());
        controller
            .consecutive_compile_failures
            .insert("bad".to_string(), 3);
        controller.invariant_breaks.insert("bad".to_string(), 2);

        let evals = vec![
            BranchEvaluation {
                subtask_id: "bad".to_string(),
                branch: "codetether/subagent-bad".to_string(),
                score: CoherenceScore {
                    compile_health: 0.0,
                    test_health: 0.0,
                    contract_alignment: 0.0,
                    diff_conflict_risk: 0.9,
                    velocity: 0.1,
                    resource_health_score: 0.2,
                    invariant_breaks: 2,
                },
                aggregate_score: 0.0,
            },
            BranchEvaluation {
                subtask_id: "good".to_string(),
                branch: "codetether/subagent-good".to_string(),
                score: CoherenceScore {
                    compile_health: 1.0,
                    test_health: 0.6,
                    contract_alignment: 0.9,
                    diff_conflict_risk: 0.1,
                    velocity: 0.4,
                    resource_health_score: 1.0,
                    invariant_breaks: 0,
                },
                aggregate_score: 0.82,
            },
        ];

        let tick = controller.derive_decisions(evals);
        assert_eq!(tick.kills.len(), 1);
        assert_eq!(tick.kills[0].subtask_id, "bad");
        assert_eq!(tick.promoted_subtask_id.as_deref(), Some("good"));
    }

    #[test]
    fn derive_decisions_spares_best_branch_when_all_breach_policy() {
        let mut controller = CollapseController::new(CollapsePolicy::default());
        controller
            .consecutive_compile_failures
            .insert("a".to_string(), 1);
        controller
            .consecutive_compile_failures
            .insert("b".to_string(), 1);

        let tick = controller.derive_decisions(vec![evaluation("a", 0.1), evaluation("b", 0.2)]);
        assert_eq!(tick.kills.len(), 1);
        assert_eq!(tick.kills[0].subtask_id, "a");
        assert_eq!(tick.promoted_subtask_id.as_deref(), Some("b"));
    }

    #[test]
    fn derive_decisions_prunes_low_coherence_laggard() {
        let mut controller = CollapseController::new(CollapsePolicy::default());

        let tick =
            controller.derive_decisions(vec![evaluation("strong", 0.8), evaluation("weak", 0.2)]);
        assert_eq!(tick.kills.len(), 1);
        assert_eq!(tick.kills[0].subtask_id, "weak");
        assert!(tick.kills[0].reason.starts_with("low coherence pruned"));
        assert_eq!(tick.promoted_subtask_id.as_deref(), Some("strong"));
    }

    #[test]
    fn derive_decisions_keeps_laggard_within_score_gap() {
        let mut controller = CollapseController::new(CollapsePolicy::default());

        // Both are below min_collapse_score, but the gap (0.1) is under 0.15.
        let tick = controller.derive_decisions(vec![evaluation("x", 0.25), evaluation("y", 0.15)]);
        assert!(tick.kills.is_empty());
        assert_eq!(tick.promoted_subtask_id.as_deref(), Some("x"));
    }

    #[test]
    fn sample_observations_penalizes_resource_unhealth() {
        let mut controller = CollapseController::new(CollapsePolicy::default());
        let observations = vec![
            BranchObservation {
                subtask_id: "infra-bad".to_string(),
                branch: "codetether/subagent-infra-bad".to_string(),
                compile_ok: true,
                changed_files: HashSet::new(),
                changed_lines: 1,
                resource_health_score: 0.0,
                infra_unhealthy_signals: 2,
            },
            BranchObservation {
                subtask_id: "infra-good".to_string(),
                branch: "codetether/subagent-infra-good".to_string(),
                compile_ok: true,
                changed_files: HashSet::new(),
                changed_lines: 1,
                resource_health_score: 1.0,
                infra_unhealthy_signals: 0,
            },
        ];

        let tick = controller.sample_observations(&observations);
        let bad = tick
            .evaluations
            .iter()
            .find(|e| e.subtask_id == "infra-bad")
            .expect("infra-bad evaluation");
        let good = tick
            .evaluations
            .iter()
            .find(|e| e.subtask_id == "infra-good")
            .expect("infra-good evaluation");
        assert!(bad.aggregate_score < good.aggregate_score);
    }

    #[test]
    fn sample_observations_scores_file_overlap_as_conflict_risk() {
        let mut controller = CollapseController::new(CollapsePolicy::default());
        let tick = controller.sample_observations(&[
            observation("wide", &["src/a.rs", "src/b.rs"]),
            observation("narrow", &["src/a.rs"]),
        ]);

        let risk = |id: &str| {
            tick.evaluations
                .iter()
                .find(|e| e.subtask_id == id)
                .expect("evaluation")
                .score
                .diff_conflict_risk
        };
        // "wide" shares 1 of its 2 files; "narrow" shares its only file.
        assert_eq!(risk("wide"), 0.5);
        assert_eq!(risk("narrow"), 1.0);
    }
}