1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3
4use crate::task::Task;
5use anyhow::{Context, Result};
6use chrono::{DateTime, Duration, Utc};
7use rusqlite::Connection;
8use serde::{Deserialize, Serialize};
9
10use super::config::{AllocationPolicy, RoleType, TeamConfig};
11use super::hierarchy::resolve_hierarchy;
12use super::standup::MemberState;
13use super::{daemon_state_path, team_config_dir};
14
/// Routing profile for one engineer, aggregated from board tasks, persisted
/// completion history, and telemetry.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct EngineerProfile {
    /// Engineer name; profiles are keyed by this name when they are built.
    pub name: String,
    /// Ids of the tasks counted as completed by this engineer.
    pub completed_task_ids: Vec<u32>,
    /// File paths gathered from both completed and currently active tasks.
    pub active_file_paths: HashSet<String>,
    /// Tags gathered from the engineer's completed tasks.
    pub domain_tags: HashSet<String>,
    /// Number of claimed, not-yet-done tasks that count toward load.
    pub active_task_count: u32,
    /// Number of completed tasks applied to this profile.
    pub total_completions: u32,
    /// Number of completed or active tasks that carried a conflict signal.
    pub recent_merge_conflicts: u32,
    /// Performance row from the telemetry database, when one exists for this engineer.
    pub performance: Option<EngineerPerformanceProfile>,
    /// Number of `quality_metrics_recorded` telemetry events seen for this engineer.
    pub telemetry_completed_tasks: u32,
    /// `task_completed` events divided by `task_assigned` events (0.0 when none were assigned).
    pub completion_rate: f64,
    /// Average `time_to_completion_secs` across the engineer's quality-metric events.
    pub avg_task_duration_secs: Option<f64>,
    /// Average `first_pass_test_rate` across the engineer's quality-metric events.
    pub first_pass_test_rate: Option<f64>,
}
30
/// Per-engineer performance metrics copied from the telemetry database's
/// engineer performance query; each metric is optional.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct EngineerPerformanceProfile {
    /// Average task completion time; scoring compares it against 3 600 and 14 400
    /// (presumably seconds, per the field name).
    pub avg_task_completion_secs: Option<f64>,
    /// Throughput figure; scoring rewards values of at least 200.
    pub lines_per_hour: Option<f64>,
    /// First-pass test rate; scoring rewards >= 0.75 and penalizes < 0.5.
    pub first_pass_test_rate: Option<f64>,
    /// Context exhaustion frequency; scoring penalizes any value above 0.0,
    /// and more heavily from 0.5 upward.
    pub context_exhaustion_frequency: Option<f64>,
}
38
/// Per-engineer scoring details produced while explaining a routing decision.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct EngineerRoutingBreakdown {
    /// Engineer name.
    pub engineer: String,
    /// Result of `score_engineer_for_task` for this engineer and task.
    pub total_score: i32,
    /// Number of task tags that also appear in the engineer's domain tags.
    pub tag_matches: usize,
    /// Number of parent directories shared between the engineer's known file
    /// paths and the task's file hints.
    pub file_matches: usize,
    /// Copied from the engineer profile.
    pub completion_rate: f64,
    /// Copied from the engineer profile.
    pub avg_task_duration_secs: Option<f64>,
    /// Copied from the engineer profile.
    pub first_pass_test_rate: Option<f64>,
    /// Copied from the engineer profile; used to decide whether telemetry is
    /// sufficient for scored routing.
    pub telemetry_completed_tasks: u32,
}
50
/// Outcome of routing one task: the chosen engineer plus the ranked breakdowns.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct RoutingDecisionExplanation {
    /// First engineer of the sorted `breakdowns`, or `None` when there are no candidates.
    pub chosen_engineer: Option<String>,
    /// `true` when candidates were ordered by name instead of by score.
    pub fallback_to_round_robin: bool,
    /// Human-readable reason for the fallback; `None` when scoring was used.
    pub fallback_reason: Option<String>,
    /// One entry per candidate engineer, in final ranking order.
    pub breakdowns: Vec<EngineerRoutingBreakdown>,
}
58
/// Subset of a task file's YAML frontmatter that allocation cares about.
#[derive(Debug, Default, Deserialize)]
struct AllocationTaskFrontmatter {
    /// Paths listed under `changed_paths`; empty when the key is absent.
    #[serde(default)]
    changed_paths: Vec<String>,
}
64
/// Number of days a persisted completion record is kept before pruning drops it.
const PROFILE_RETENTION_DAYS: i64 = 7;
/// File name of the persisted completion history inside the team config directory.
const ENGINEER_PROFILES_FILE: &str = "engineer_profiles.json";
67
/// On-disk (JSON) container for recent task completions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct PersistedEngineerProfiles {
    /// Format version; defaults to `engineer_profiles_format_version()` when missing.
    #[serde(default = "engineer_profiles_format_version")]
    version: u32,
    /// RFC 3339 timestamp set each time a completion is persisted; `None` when
    /// the file has never been written or the key is missing.
    #[serde(default)]
    updated_at: Option<String>,
    /// Completion records; pruned by age on load and on persist.
    #[serde(default)]
    completions: Vec<PersistedTaskProfile>,
}
77
/// One persisted completion record: who finished which task, and what it touched.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct PersistedTaskProfile {
    /// Name of the engineer that had claimed the task.
    engineer: String,
    /// Id of the completed task.
    task_id: u32,
    /// Completion timestamp; records whose value does not parse as RFC 3339
    /// are dropped during pruning.
    completed_at: String,
    /// `changed_paths` read from the task file's frontmatter at persist time.
    #[serde(default)]
    changed_paths: Vec<String>,
    /// Tags of the completed task.
    #[serde(default)]
    tags: Vec<String>,
    /// Whether the task carried a conflict signal when it was persisted.
    #[serde(default)]
    recent_merge_conflict: bool,
}
90
/// Telemetry-derived statistics for one engineer, loaded from the events table.
#[derive(Debug, Clone, Default, PartialEq)]
struct EngineerTelemetryStats {
    /// Number of `quality_metrics_recorded` events for the engineer.
    completed_tasks: u32,
    /// `task_completed` events divided by `task_assigned` events (0.0 when none assigned).
    completion_rate: f64,
    /// Average of the `time_to_completion_secs` payload field.
    avg_task_duration_secs: Option<f64>,
    /// Average of the `first_pass_test_rate` payload field.
    first_pass_test_rate: Option<f64>,
}
98
/// Minimal view of the daemon state JSON file: only the member states are read.
#[derive(Debug, Deserialize)]
struct PersistedDaemonStateView {
    /// Member state keyed by member name; empty when the key is absent.
    #[serde(default)]
    states: HashMap<String, MemberState>,
}
104
/// Returns the format version used for persisted engineer profiles.
///
/// Also serves as the serde default for the `version` field.
fn engineer_profiles_format_version() -> u32 {
    const CURRENT_FORMAT_VERSION: u32 = 1;
    CURRENT_FORMAT_VERSION
}
108
109pub fn build_engineer_profiles(
110 engineers: &[String],
111 tasks: &[Task],
112) -> Result<HashMap<String, EngineerProfile>> {
113 build_engineer_profiles_with_history(engineers, tasks, &[], &HashMap::new())
114}
115
116pub fn load_engineer_profiles(
117 project_root: &Path,
118 engineers: &[String],
119 tasks: &[Task],
120) -> Result<HashMap<String, EngineerProfile>> {
121 let persisted = load_persisted_task_profiles(project_root)?;
122 let telemetry = load_engineer_telemetry_stats(project_root)?;
123 let mut profiles =
124 build_engineer_profiles_with_history(engineers, tasks, &persisted, &telemetry)?;
125 if let Ok(conn) = crate::team::telemetry_db::open(project_root)
126 && let Ok(rows) = crate::team::telemetry_db::query_engineer_performance_profiles(&conn)
127 {
128 for row in rows {
129 if let Some(profile) = profiles.get_mut(&row.role) {
130 profile.performance = Some(EngineerPerformanceProfile {
131 avg_task_completion_secs: row.avg_task_completion_secs,
132 lines_per_hour: row.lines_per_hour,
133 first_pass_test_rate: row.first_pass_test_rate,
134 context_exhaustion_frequency: row.context_exhaustion_frequency,
135 });
136 }
137 }
138 }
139 Ok(profiles)
140}
141
142pub(crate) fn predict_task_file_paths(project_root: &Path, task: &Task) -> Result<HashSet<String>> {
143 let mut paths = task_profile_paths(task)?;
144 for record in load_persisted_task_profiles(project_root)? {
145 if persisted_profile_matches_task(task, &record) {
146 paths.extend(record.changed_paths);
147 }
148 }
149 Ok(paths)
150}
151
152pub fn persist_completed_task_profile(project_root: &Path, task: &Task) -> Result<()> {
153 let Some(engineer) = task.claimed_by.as_deref() else {
154 return Ok(());
155 };
156
157 let mut persisted = read_persisted_engineer_profiles(project_root)?;
158 prune_persisted_profiles(&mut persisted);
159 persisted
160 .completions
161 .retain(|entry| !(entry.engineer == engineer && entry.task_id == task.id));
162 persisted.completions.push(PersistedTaskProfile {
163 engineer: engineer.to_string(),
164 task_id: task.id,
165 completed_at: task
166 .completed
167 .clone()
168 .unwrap_or_else(|| Utc::now().to_rfc3339()),
169 changed_paths: load_changed_paths(task.source_path.as_path())?,
170 tags: task.tags.clone(),
171 recent_merge_conflict: task_has_conflict_signal(task),
172 });
173 persisted.updated_at = Some(Utc::now().to_rfc3339());
174 write_persisted_engineer_profiles(project_root, &persisted)
175}
176
177fn build_engineer_profiles_with_history(
178 engineers: &[String],
179 tasks: &[Task],
180 persisted: &[PersistedTaskProfile],
181 telemetry: &HashMap<String, EngineerTelemetryStats>,
182) -> Result<HashMap<String, EngineerProfile>> {
183 let mut profiles: HashMap<String, EngineerProfile> = engineers
184 .iter()
185 .cloned()
186 .map(|name| {
187 let profile = EngineerProfile {
188 name: name.clone(),
189 ..EngineerProfile::default()
190 };
191 (name, profile)
192 })
193 .collect();
194
195 let mut completed_task_keys: HashSet<(String, u32)> = HashSet::new();
196
197 for record in persisted {
198 let Some(profile) = profiles.get_mut(&record.engineer) else {
199 continue;
200 };
201 apply_completed_profile(
202 profile,
203 record.task_id,
204 record.tags.iter().cloned(),
205 record.changed_paths.iter().cloned(),
206 record.recent_merge_conflict,
207 );
208 completed_task_keys.insert((record.engineer.clone(), record.task_id));
209 }
210
211 for task in tasks {
212 let Some(owner) = task.claimed_by.as_deref() else {
213 continue;
214 };
215 let Some(profile) = profiles.get_mut(owner) else {
216 continue;
217 };
218
219 if task.status == "done" {
220 if completed_task_keys.insert((owner.to_string(), task.id)) {
221 apply_completed_profile(
222 profile,
223 task.id,
224 task.tags.iter().cloned(),
225 task_profile_paths(task)?.into_iter(),
226 task_has_conflict_signal(task),
227 );
228 }
229 } else if task_is_active_for_load(task) {
230 profile.active_task_count += 1;
231 profile.active_file_paths.extend(task_profile_paths(task)?);
232 if task_has_conflict_signal(task) {
233 profile.recent_merge_conflicts += 1;
234 }
235 }
236 }
237
238 for (engineer, stats) in telemetry {
239 let Some(profile) = profiles.get_mut(engineer) else {
240 continue;
241 };
242 profile.telemetry_completed_tasks = stats.completed_tasks;
243 profile.completion_rate = stats.completion_rate;
244 profile.avg_task_duration_secs = stats.avg_task_duration_secs;
245 profile.first_pass_test_rate = stats.first_pass_test_rate;
246 }
247
248 Ok(profiles)
249}
250
251fn apply_completed_profile<I, J>(
252 profile: &mut EngineerProfile,
253 task_id: u32,
254 tags: I,
255 changed_paths: J,
256 recent_merge_conflict: bool,
257) where
258 I: IntoIterator<Item = String>,
259 J: IntoIterator<Item = String>,
260{
261 profile.completed_task_ids.push(task_id);
262 profile.total_completions += 1;
263 profile.domain_tags.extend(tags);
264 profile.active_file_paths.extend(changed_paths);
265 if recent_merge_conflict {
266 profile.recent_merge_conflicts += 1;
267 }
268}
269
270fn persisted_profile_matches_task(task: &Task, record: &PersistedTaskProfile) -> bool {
271 if task
272 .tags
273 .iter()
274 .any(|tag| record.tags.iter().any(|candidate| candidate == tag))
275 {
276 return true;
277 }
278
279 let hinted_dirs: HashSet<String> = crate::task::task_file_hints(task)
280 .unwrap_or_default()
281 .into_iter()
282 .filter_map(|path| parent_dir(&path))
283 .collect();
284 let record_dirs: HashSet<String> = record
285 .changed_paths
286 .iter()
287 .filter_map(|path| parent_dir(path))
288 .collect();
289 !hinted_dirs.is_empty() && !hinted_dirs.is_disjoint(&record_dirs)
290}
291
292pub fn score_engineer_for_task(
293 engineer: &EngineerProfile,
294 task: &Task,
295 policy: &AllocationPolicy,
296) -> i32 {
297 let mut score = 0;
298
299 let tag_overlap = task
300 .tags
301 .iter()
302 .filter(|tag| engineer.domain_tags.contains(*tag))
303 .count() as i32;
304 score += tag_overlap * policy.tag_weight;
305
306 let task_dirs = task_hint_directories(task);
307 let engineer_dirs: HashSet<String> = engineer
308 .active_file_paths
309 .iter()
310 .filter_map(|path| parent_dir(path))
311 .collect();
312 let dir_overlap = engineer_dirs.intersection(&task_dirs).count() as i32;
313 score += dir_overlap * policy.file_overlap_weight;
314 score += (engineer.completion_rate * 100.0).round() as i32;
315
316 score -= (engineer.active_task_count as i32) * policy.load_penalty;
317 score -= (engineer.recent_merge_conflicts as i32) * policy.conflict_penalty;
318 if engineer.total_completions > 3 {
319 score += policy.experience_bonus;
320 }
321 score += performance_score(engineer.performance.as_ref());
322
323 score
324}
325
326fn performance_score(performance: Option<&EngineerPerformanceProfile>) -> i32 {
327 let Some(performance) = performance else {
328 return 0;
329 };
330
331 let mut score = 0;
332
333 if let Some(first_pass_rate) = performance.first_pass_test_rate {
334 if first_pass_rate >= 0.75 {
335 score += 1;
336 } else if first_pass_rate < 0.5 {
337 score -= 1;
338 }
339 }
340
341 if let Some(context_freq) = performance.context_exhaustion_frequency {
342 if context_freq >= 0.5 {
343 score -= 2;
344 } else if context_freq > 0.0 {
345 score -= 1;
346 }
347 }
348
349 if let Some(avg_task_completion_secs) = performance.avg_task_completion_secs {
350 if avg_task_completion_secs > 0.0 && avg_task_completion_secs <= 3_600.0 {
351 score += 1;
352 } else if avg_task_completion_secs >= 14_400.0 {
353 score -= 1;
354 }
355 }
356
357 if let Some(lines_per_hour) = performance.lines_per_hour
358 && lines_per_hour >= 200.0
359 {
360 score += 1;
361 }
362
363 score
364}
365
366pub fn rank_engineers_for_task(
367 engineers: &[String],
368 profiles: &HashMap<String, EngineerProfile>,
369 task: &Task,
370 policy: &AllocationPolicy,
371) -> Vec<String> {
372 explain_routing_for_task(engineers, profiles, task, policy)
373 .breakdowns
374 .into_iter()
375 .map(|breakdown| breakdown.engineer)
376 .collect()
377}
378
379pub fn explain_routing_for_task(
380 engineers: &[String],
381 profiles: &HashMap<String, EngineerProfile>,
382 task: &Task,
383 policy: &AllocationPolicy,
384) -> RoutingDecisionExplanation {
385 let mut breakdowns: Vec<EngineerRoutingBreakdown> = engineers
386 .iter()
387 .map(|engineer| engineer_breakdown(engineer, profiles.get(engineer), task, policy))
388 .collect();
389
390 let has_any_telemetry = breakdowns
391 .iter()
392 .any(|breakdown| breakdown.telemetry_completed_tasks > 0);
393 let telemetry_ready = has_any_telemetry
394 && breakdowns
395 .iter()
396 .all(|breakdown| breakdown.telemetry_completed_tasks >= 5);
397 if telemetry_ready || !has_any_telemetry {
398 breakdowns.sort_by(compare_breakdowns);
399 let chosen_engineer = breakdowns
400 .first()
401 .map(|breakdown| breakdown.engineer.clone());
402 return RoutingDecisionExplanation {
403 chosen_engineer,
404 fallback_to_round_robin: false,
405 fallback_reason: None,
406 breakdowns,
407 };
408 }
409
410 breakdowns.sort_by(|left, right| left.engineer.cmp(&right.engineer));
411 let chosen_engineer = breakdowns
412 .first()
413 .map(|breakdown| breakdown.engineer.clone());
414 RoutingDecisionExplanation {
415 chosen_engineer,
416 fallback_to_round_robin: true,
417 fallback_reason: Some(
418 "telemetry fallback: each eligible engineer needs at least 5 completed tasks"
419 .to_string(),
420 ),
421 breakdowns,
422 }
423}
424
425pub fn print_dispatch_explanation(project_root: &Path, task_id: Option<u32>) -> Result<()> {
426 let board_dir = project_root
427 .join(".batty")
428 .join("team_config")
429 .join("board");
430 let tasks = crate::task::load_tasks_from_dir(&board_dir.join("tasks"))?;
431 let task = select_dispatch_task(&tasks, task_id)
432 .with_context(|| format!("no dispatchable task found for {:?}", task_id))?;
433
434 let team_config = TeamConfig::load(&team_config_dir(project_root).join("team.yaml"))?;
435 let members = resolve_hierarchy(&team_config)?;
436 let mut engineers = load_idle_engineers(project_root, &members)?;
437 if engineers.is_empty() {
438 engineers = members
439 .into_iter()
440 .filter(|member| member.role_type == RoleType::Engineer)
441 .map(|member| member.name)
442 .collect();
443 }
444 let bench_state = crate::team::bench::load_bench_state(project_root)?;
445 engineers.retain(|engineer| !bench_state.benched.contains_key(engineer));
446 engineers.sort();
447 let profiles = load_engineer_profiles(project_root, &engineers, &tasks)?;
448 let explanation = explain_routing_for_task(
449 &engineers,
450 &profiles,
451 task,
452 &team_config.workflow_policy.allocation,
453 );
454
455 println!("Task #{}: {}", task.id, task.title);
456 if let Some(chosen) = &explanation.chosen_engineer {
457 println!("Chosen engineer: {chosen}");
458 } else {
459 println!("Chosen engineer: none");
460 }
461 if let Some(reason) = &explanation.fallback_reason {
462 println!("Routing mode: {reason}");
463 } else {
464 println!("Routing mode: telemetry-scored");
465 }
466 println!();
467 println!(
468 "{:<20} {:>6} {:>5} {:>5} {:>10} {:>10} {:>11} {:>8}",
469 "ENGINEER", "SCORE", "TAGS", "FILES", "COMPLETE%", "AVG SECS", "FIRST PASS%", "SAMPLES"
470 );
471 println!("{}", "-".repeat(88));
472 for breakdown in explanation.breakdowns {
473 println!(
474 "{:<20} {:>6} {:>5} {:>5} {:>10.1} {:>10} {:>11.1} {:>8}",
475 breakdown.engineer,
476 breakdown.total_score,
477 breakdown.tag_matches,
478 breakdown.file_matches,
479 breakdown.completion_rate * 100.0,
480 breakdown
481 .avg_task_duration_secs
482 .map(|secs| format!("{secs:.0}"))
483 .unwrap_or_else(|| "-".to_string()),
484 breakdown.first_pass_test_rate.unwrap_or(0.0) * 100.0,
485 breakdown.telemetry_completed_tasks,
486 );
487 }
488
489 Ok(())
490}
491
492fn engineer_breakdown(
493 engineer: &str,
494 profile: Option<&EngineerProfile>,
495 task: &Task,
496 policy: &AllocationPolicy,
497) -> EngineerRoutingBreakdown {
498 let profile = profile.cloned().unwrap_or_else(|| EngineerProfile {
499 name: engineer.to_string(),
500 ..EngineerProfile::default()
501 });
502 let task_dirs = task_hint_directories(task);
503 let engineer_dirs: HashSet<String> = profile
504 .active_file_paths
505 .iter()
506 .filter_map(|path| parent_dir(path))
507 .collect();
508 let tag_matches = task
509 .tags
510 .iter()
511 .filter(|tag| profile.domain_tags.contains(*tag))
512 .count();
513 let file_matches = engineer_dirs.intersection(&task_dirs).count();
514 EngineerRoutingBreakdown {
515 engineer: engineer.to_string(),
516 total_score: score_engineer_for_task(&profile, task, policy),
517 tag_matches,
518 file_matches,
519 completion_rate: profile.completion_rate,
520 avg_task_duration_secs: profile.avg_task_duration_secs,
521 first_pass_test_rate: profile.first_pass_test_rate,
522 telemetry_completed_tasks: profile.telemetry_completed_tasks,
523 }
524}
525
526fn compare_breakdowns(
527 left: &EngineerRoutingBreakdown,
528 right: &EngineerRoutingBreakdown,
529) -> std::cmp::Ordering {
530 right
531 .total_score
532 .cmp(&left.total_score)
533 .then_with(|| {
534 right
535 .first_pass_test_rate
536 .partial_cmp(&left.first_pass_test_rate)
537 .unwrap_or(std::cmp::Ordering::Equal)
538 })
539 .then_with(|| {
540 left.avg_task_duration_secs
541 .partial_cmp(&right.avg_task_duration_secs)
542 .unwrap_or(std::cmp::Ordering::Equal)
543 })
544 .then_with(|| left.engineer.cmp(&right.engineer))
545}
546
547fn select_dispatch_task(tasks: &[Task], task_id: Option<u32>) -> Option<&Task> {
548 if let Some(task_id) = task_id {
549 return tasks.iter().find(|task| task.id == task_id);
550 }
551
552 let task_status_by_id: HashMap<u32, String> = tasks
553 .iter()
554 .map(|task| (task.id, task.status.clone()))
555 .collect();
556 let mut dispatchable: Vec<&Task> = tasks
557 .iter()
558 .filter(|task| matches!(task.status.as_str(), "backlog" | "todo"))
559 .filter(|task| task.claimed_by.is_none())
560 .filter(|task| task.blocked.is_none())
561 .filter(|task| task.blocked_on.is_none())
562 .filter(|task| !task.is_schedule_blocked())
563 .filter(|task| {
564 task.depends_on.iter().all(|dep_id| {
565 task_status_by_id
566 .get(dep_id)
567 .is_none_or(|status| status == "done")
568 })
569 })
570 .collect();
571 dispatchable.sort_by_key(|task| {
572 (
573 match task.priority.as_str() {
574 "critical" => 0,
575 "high" => 1,
576 "medium" => 2,
577 "low" => 3,
578 _ => 4,
579 },
580 task.id,
581 )
582 });
583 dispatchable.into_iter().next()
584}
585
586fn load_idle_engineers(
587 project_root: &Path,
588 members: &[super::hierarchy::MemberInstance],
589) -> Result<Vec<String>> {
590 let path = daemon_state_path(project_root);
591 if !path.exists() {
592 return Ok(Vec::new());
593 }
594 let content = std::fs::read_to_string(&path)
595 .with_context(|| format!("failed to read {}", path.display()))?;
596 let state: PersistedDaemonStateView = serde_json::from_str(&content)
597 .with_context(|| format!("failed to parse {}", path.display()))?;
598 Ok(members
599 .iter()
600 .filter(|member| member.role_type == RoleType::Engineer)
601 .filter(|member| state.states.get(&member.name) == Some(&MemberState::Idle))
602 .map(|member| member.name.clone())
603 .collect())
604}
605
606fn load_engineer_telemetry_stats(
607 project_root: &Path,
608) -> Result<HashMap<String, EngineerTelemetryStats>> {
609 let conn = match super::telemetry_db::open(project_root) {
610 Ok(conn) => conn,
611 Err(_) => return Ok(HashMap::new()),
612 };
613
614 let mut stats = load_quality_metric_stats(&conn)?;
615 for (engineer, completion_rate) in load_completion_rates(&conn)? {
616 stats.entry(engineer).or_default().completion_rate = completion_rate;
617 }
618 Ok(stats)
619}
620
621fn load_completion_rates(conn: &Connection) -> Result<HashMap<String, f64>> {
622 let mut stmt = conn.prepare(
623 "SELECT role,
624 SUM(CASE WHEN event_type = 'task_assigned' THEN 1 ELSE 0 END) AS assigned,
625 SUM(CASE WHEN event_type = 'task_completed' THEN 1 ELSE 0 END) AS completed
626 FROM events
627 WHERE role IS NOT NULL AND event_type IN ('task_assigned', 'task_completed')
628 GROUP BY role",
629 )?;
630 let rows = stmt.query_map([], |row| {
631 let role: String = row.get(0)?;
632 let assigned: i64 = row.get(1)?;
633 let completed: i64 = row.get(2)?;
634 let rate = if assigned > 0 {
635 completed as f64 / assigned as f64
636 } else {
637 0.0
638 };
639 Ok((role, rate))
640 })?;
641
642 let mut rates = HashMap::new();
643 for row in rows {
644 let (role, rate) = row?;
645 rates.insert(role, rate);
646 }
647 Ok(rates)
648}
649
650fn load_quality_metric_stats(conn: &Connection) -> Result<HashMap<String, EngineerTelemetryStats>> {
651 let mut stmt = conn.prepare(
652 "SELECT role,
653 COUNT(*) AS samples,
654 AVG(CAST(json_extract(payload, '$.time_to_completion_secs') AS REAL)) AS avg_secs,
655 AVG(CAST(json_extract(payload, '$.first_pass_test_rate') AS REAL)) AS first_pass
656 FROM events
657 WHERE role IS NOT NULL AND event_type = 'quality_metrics_recorded'
658 GROUP BY role",
659 )?;
660 let rows = stmt.query_map([], |row| {
661 Ok((
662 row.get::<_, String>(0)?,
663 EngineerTelemetryStats {
664 completed_tasks: row.get::<_, i64>(1)? as u32,
665 completion_rate: 0.0,
666 avg_task_duration_secs: row.get(2)?,
667 first_pass_test_rate: row.get(3)?,
668 },
669 ))
670 })?;
671
672 let mut stats = HashMap::new();
673 for row in rows {
674 let (engineer, profile) = row?;
675 stats.insert(engineer, profile);
676 }
677 Ok(stats)
678}
679
680fn task_is_active_for_load(task: &Task) -> bool {
681 matches!(
682 task.status.as_str(),
683 "todo" | "backlog" | "in-progress" | "review" | "blocked"
684 )
685}
686
687fn task_has_conflict_signal(task: &Task) -> bool {
688 let blocked = task.blocked.as_deref().unwrap_or_default();
689 let blocked_on = task.blocked_on.as_deref().unwrap_or_default();
690 let description = task.description.to_ascii_lowercase();
691 blocked.to_ascii_lowercase().contains("conflict")
692 || blocked_on.to_ascii_lowercase().contains("conflict")
693 || description.contains("merge conflict")
694 || description.contains("rebase conflict")
695}
696
697fn task_profile_paths(task: &Task) -> Result<HashSet<String>> {
698 let mut paths: HashSet<String> = crate::task::task_file_hints(task)?.into_iter().collect();
699 for path in load_changed_paths(task.source_path.as_path())? {
700 paths.insert(path);
701 }
702 Ok(paths)
703}
704
705fn task_hint_directories(task: &Task) -> HashSet<String> {
706 crate::task::task_file_hints(task)
707 .unwrap_or_default()
708 .into_iter()
709 .filter_map(|path| parent_dir(&path))
710 .collect()
711}
712
713fn load_changed_paths(path: &Path) -> Result<Vec<String>> {
714 if path.as_os_str().is_empty() || !path.exists() {
715 return Ok(Vec::new());
716 }
717
718 let content = std::fs::read_to_string(path)?;
719 let Some(frontmatter) = extract_frontmatter(&content) else {
720 return Ok(Vec::new());
721 };
722 let parsed: AllocationTaskFrontmatter = serde_yaml::from_str(frontmatter).unwrap_or_default();
723 Ok(parsed.changed_paths)
724}
725
726fn load_persisted_task_profiles(project_root: &Path) -> Result<Vec<PersistedTaskProfile>> {
727 let mut persisted = read_persisted_engineer_profiles(project_root)?;
728 prune_persisted_profiles(&mut persisted);
729 if persisted.updated_at.is_some() {
730 write_persisted_engineer_profiles(project_root, &persisted)?;
731 }
732 Ok(persisted.completions)
733}
734
735fn read_persisted_engineer_profiles(project_root: &Path) -> Result<PersistedEngineerProfiles> {
736 let path = engineer_profiles_path(project_root);
737 if !path.exists() {
738 return Ok(PersistedEngineerProfiles {
739 version: engineer_profiles_format_version(),
740 updated_at: None,
741 completions: Vec::new(),
742 });
743 }
744
745 let content = std::fs::read_to_string(&path)?;
746 let persisted = serde_json::from_str(&content)?;
747 Ok(persisted)
748}
749
750fn write_persisted_engineer_profiles(
751 project_root: &Path,
752 persisted: &PersistedEngineerProfiles,
753) -> Result<()> {
754 let path = engineer_profiles_path(project_root);
755 if let Some(parent) = path.parent() {
756 std::fs::create_dir_all(parent)?;
757 }
758 std::fs::write(path, serde_json::to_vec_pretty(persisted)?)?;
759 Ok(())
760}
761
762fn prune_persisted_profiles(persisted: &mut PersistedEngineerProfiles) {
763 let cutoff = Utc::now() - Duration::days(PROFILE_RETENTION_DAYS);
764 persisted.completions.retain(|entry| {
765 DateTime::parse_from_rfc3339(&entry.completed_at)
766 .map(|completed| completed.with_timezone(&Utc) >= cutoff)
767 .unwrap_or(false)
768 });
769}
770
771fn engineer_profiles_path(project_root: &Path) -> PathBuf {
772 team_config_dir(project_root).join(ENGINEER_PROFILES_FILE)
773}
774
/// Extracts the text between the opening and closing `---` frontmatter
/// delimiters of a document.
///
/// Leading whitespace before the opening delimiter is ignored. Both `\n` and
/// `\r\n` line endings are accepted around the delimiters, and an empty
/// frontmatter block yields `Some("")`.
///
/// Returns `None` when the content does not start with `---` or has no
/// closing delimiter.
fn extract_frontmatter(content: &str) -> Option<&str> {
    let after_marker = content.trim_start().strip_prefix("---")?;

    let line_break_stripped = after_marker
        .strip_prefix("\r\n")
        .or_else(|| after_marker.strip_prefix('\n'));
    let body = match line_break_stripped {
        Some(body) => {
            // The closing delimiter directly follows the opening line: the
            // frontmatter exists but is empty.
            if body.starts_with("---") {
                return Some("");
            }
            body
        }
        None => after_marker,
    };

    let close_pos = body.find("\n---")?;
    let frontmatter = &body[..close_pos];
    // With CRLF line endings the `\r` of the last frontmatter line sits just
    // before the matched `\n`; it is not part of the frontmatter.
    Some(frontmatter.strip_suffix('\r').unwrap_or(frontmatter))
}
784
/// Returns the parent directory of `path` as a string.
///
/// Yields `None` when the path has no parent or when the parent is empty
/// (for example a bare file name).
fn parent_dir(path: &str) -> Option<String> {
    let parent = Path::new(path).parent()?;
    if parent.as_os_str().is_empty() {
        return None;
    }
    Some(parent.to_string_lossy().into_owned())
}
791
792#[cfg(test)]
793mod tests {
794 use super::*;
795 use crate::team::config::AllocationStrategy;
796 use crate::team::events::{QualityMetricsInfo, TeamEvent};
797 use crate::team::telemetry_db;
798 use std::fs;
799
800 fn task(tags: &[&str], description: &str) -> Task {
801 Task {
802 id: 1,
803 title: "task".to_string(),
804 status: "todo".to_string(),
805 priority: "high".to_string(),
806 claimed_by: None,
807 claimed_at: None,
808 claim_ttl_secs: None,
809 claim_expires_at: None,
810 last_progress_at: None,
811 claim_warning_sent_at: None,
812 claim_extensions: None,
813 last_output_bytes: None,
814 blocked: None,
815 tags: tags.iter().map(|tag| (*tag).to_string()).collect(),
816 depends_on: Vec::new(),
817 review_owner: None,
818 blocked_on: None,
819 worktree_path: None,
820 branch: None,
821 commit: None,
822 artifacts: Vec::new(),
823 next_action: None,
824 scheduled_for: None,
825 cron_schedule: None,
826 cron_last_run: None,
827 completed: None,
828 description: description.to_string(),
829 batty_config: None,
830 source_path: PathBuf::new(),
831 }
832 }
833
834 fn policy() -> AllocationPolicy {
835 AllocationPolicy {
836 strategy: AllocationStrategy::Scored,
837 ..AllocationPolicy::default()
838 }
839 }
840
841 #[test]
842 fn score_prefers_tag_overlap() {
843 let profile = EngineerProfile {
844 domain_tags: HashSet::from(["dispatch".to_string()]),
845 ..EngineerProfile::default()
846 };
847 assert!(score_engineer_for_task(&profile, &task(&["dispatch"], ""), &policy()) > 0);
848 }
849
850 #[test]
851 fn score_prefers_matching_directory_hints() {
852 let profile = EngineerProfile {
853 active_file_paths: HashSet::from(["src/team/dispatch/queue.rs".to_string()]),
854 ..EngineerProfile::default()
855 };
856 assert!(
857 score_engineer_for_task(
858 &profile,
859 &task(&[], "Touch src/team/dispatch/mod.rs next."),
860 &policy(),
861 ) > 0
862 );
863 }
864
865 #[test]
866 fn score_penalizes_active_load() {
867 let light = EngineerProfile {
868 active_task_count: 0,
869 ..EngineerProfile::default()
870 };
871 let busy = EngineerProfile {
872 active_task_count: 2,
873 ..EngineerProfile::default()
874 };
875 assert!(
876 score_engineer_for_task(&light, &task(&[], ""), &policy())
877 > score_engineer_for_task(&busy, &task(&[], ""), &policy())
878 );
879 }
880
881 #[test]
882 fn rank_engineers_falls_back_to_name_order_on_tie() {
883 let engineers = vec!["eng-2".to_string(), "eng-1".to_string()];
884 let profiles = HashMap::from([
885 ("eng-1".to_string(), EngineerProfile::default()),
886 ("eng-2".to_string(), EngineerProfile::default()),
887 ]);
888 let ranked = rank_engineers_for_task(&engineers, &profiles, &task(&[], ""), &policy());
889 assert_eq!(ranked, vec!["eng-1".to_string(), "eng-2".to_string()]);
890 }
891
892 #[test]
893 fn score_prefers_more_reliable_performance_profile() {
894 let reliable = EngineerProfile {
895 performance: Some(EngineerPerformanceProfile {
896 avg_task_completion_secs: Some(1800.0),
897 lines_per_hour: Some(250.0),
898 first_pass_test_rate: Some(1.0),
899 context_exhaustion_frequency: Some(0.0),
900 }),
901 ..EngineerProfile::default()
902 };
903 let unstable = EngineerProfile {
904 performance: Some(EngineerPerformanceProfile {
905 avg_task_completion_secs: Some(18_000.0),
906 lines_per_hour: Some(10.0),
907 first_pass_test_rate: Some(0.0),
908 context_exhaustion_frequency: Some(1.0),
909 }),
910 ..EngineerProfile::default()
911 };
912
913 assert!(
914 score_engineer_for_task(&reliable, &task(&[], ""), &policy())
915 > score_engineer_for_task(&unstable, &task(&[], ""), &policy())
916 );
917 }
918
919 #[test]
920 fn build_profiles_reads_changed_paths_and_tags_from_board_tasks() {
921 let tmp = tempfile::tempdir().unwrap();
922 let task_path = tmp.path().join("042-profile.md");
923 fs::write(
924 &task_path,
925 "---\nid: 42\ntitle: profile\nstatus: done\npriority: high\nclaimed_by: eng-2\ntags:\n - dispatch\nchanged_paths:\n - src/team/dispatch/queue.rs\nclass: standard\n---\n\nTouch src/team/dispatch/mod.rs too.\n",
926 )
927 .unwrap();
928
929 let task = Task::from_file(&task_path).unwrap();
930 let profiles =
931 build_engineer_profiles(&["eng-1".to_string(), "eng-2".to_string()], &[task]).unwrap();
932 let profile = profiles.get("eng-2").unwrap();
933
934 assert_eq!(profile.total_completions, 1);
935 assert!(profile.domain_tags.contains("dispatch"));
936 assert!(
937 profile
938 .active_file_paths
939 .contains("src/team/dispatch/queue.rs")
940 );
941 }
942
943 #[test]
944 fn build_profiles_counts_active_load_and_conflict_signals() {
945 let task = Task {
946 id: 7,
947 title: "conflicted".to_string(),
948 status: "review".to_string(),
949 priority: "high".to_string(),
950 claimed_by: Some("eng-1".to_string()),
951 claimed_at: None,
952 claim_ttl_secs: None,
953 claim_expires_at: None,
954 last_progress_at: None,
955 claim_warning_sent_at: None,
956 claim_extensions: None,
957 last_output_bytes: None,
958 blocked: Some("merge conflict".to_string()),
959 tags: vec!["daemon".to_string()],
960 depends_on: Vec::new(),
961 review_owner: None,
962 blocked_on: None,
963 worktree_path: None,
964 branch: None,
965 commit: None,
966 artifacts: Vec::new(),
967 next_action: None,
968 scheduled_for: None,
969 cron_schedule: None,
970 cron_last_run: None,
971 completed: None,
972 description: "Resolve rebase conflict in src/team/daemon/mod.rs".to_string(),
973 batty_config: None,
974 source_path: PathBuf::new(),
975 };
976
977 let profiles = build_engineer_profiles(&["eng-1".to_string()], &[task]).unwrap();
978 let profile = profiles.get("eng-1").unwrap();
979 assert_eq!(profile.active_task_count, 1);
980 assert_eq!(profile.recent_merge_conflicts, 1);
981 }
982
983 #[test]
984 fn load_engineer_profiles_merges_recent_persisted_history() {
985 let tmp = tempfile::tempdir().unwrap();
986 let persisted = PersistedEngineerProfiles {
987 version: engineer_profiles_format_version(),
988 updated_at: Some(Utc::now().to_rfc3339()),
989 completions: vec![PersistedTaskProfile {
990 engineer: "eng-2".to_string(),
991 task_id: 9,
992 completed_at: Utc::now().to_rfc3339(),
993 changed_paths: vec!["src/team/dispatch/queue.rs".to_string()],
994 tags: vec!["dispatch".to_string()],
995 recent_merge_conflict: false,
996 }],
997 };
998 write_persisted_engineer_profiles(tmp.path(), &persisted).unwrap();
999
1000 let profiles = load_engineer_profiles(tmp.path(), &["eng-2".to_string()], &[]).unwrap();
1001 let profile = profiles.get("eng-2").unwrap();
1002
1003 assert_eq!(profile.total_completions, 1);
1004 assert!(profile.domain_tags.contains("dispatch"));
1005 assert!(
1006 profile
1007 .active_file_paths
1008 .contains("src/team/dispatch/queue.rs")
1009 );
1010 }
1011
/// Seeds the profile store with one completion dated a day beyond
/// `PROFILE_RETENTION_DAYS`, persists a recently completed task for a second
/// engineer, and checks that only the recent completion is still counted
/// when the profiles are loaded back.
#[test]
fn persist_completed_task_profile_prunes_old_history() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path();

    // History entry for eng-1 that falls just outside the retention window.
    let expired_at = Utc::now() - Duration::days(PROFILE_RETENTION_DAYS + 1);
    let stale_entry = PersistedTaskProfile {
        engineer: "eng-1".to_string(),
        task_id: 1,
        completed_at: expired_at.to_rfc3339(),
        changed_paths: vec!["src/old.rs".to_string()],
        tags: vec!["old".to_string()],
        recent_merge_conflict: false,
    };
    let seeded = PersistedEngineerProfiles {
        version: engineer_profiles_format_version(),
        updated_at: Some(Utc::now().to_rfc3339()),
        completions: vec![stale_entry],
    };
    write_persisted_engineer_profiles(root, &seeded).unwrap();

    // Task file claimed by eng-2 and marked done one day ago.
    let recent_completed = (Utc::now() - Duration::days(1)).to_rfc3339();
    let task_markdown = format!(
        "---\nid: 42\ntitle: profile\nstatus: done\npriority: high\nclaimed_by: eng-2\ntags:\n  - dispatch\nchanged_paths:\n  - src/team/dispatch/queue.rs\nclass: standard\ncompleted: {recent_completed}\n---\n\nTouch src/team/dispatch/mod.rs too.\n"
    );
    let task_path = root.join("042-profile.md");
    fs::write(&task_path, task_markdown).unwrap();

    let completed_task = Task::from_file(&task_path).unwrap();
    persist_completed_task_profile(root, &completed_task).unwrap();

    let loaded =
        load_engineer_profiles(root, &["eng-1".to_string(), "eng-2".to_string()], &[]).unwrap();
    let completions_for = |engineer: &str| loaded.get(engineer).unwrap().total_completions;

    // The stale eng-1 entry no longer counts; the fresh eng-2 entry does.
    assert_eq!(completions_for("eng-1"), 0);
    assert_eq!(completions_for("eng-2"), 1);
}
1053
/// Records assigned / quality-metrics / completed telemetry events for five
/// tasks owned by eng-2, then checks that the loaded profile exposes the
/// completed-task count, completion rate, average duration and first-pass
/// test rate derived from those events.
#[test]
fn load_engineer_profiles_reads_telemetry_reliability_metrics() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path();
    fs::create_dir_all(root.join(".batty")).unwrap();
    let conn = telemetry_db::open(root).unwrap();

    for task_number in 1..=5 {
        // One string per iteration, shared by all three events for this task.
        let task_label = task_number.to_string();

        let assigned = TeamEvent::task_assigned("eng-2", &task_label);
        telemetry_db::insert_event(&conn, &assigned).unwrap();

        let metrics = QualityMetricsInfo {
            backend: "codex",
            role: "eng-2",
            task: &task_label,
            narration_ratio: 0.1,
            commit_frequency: 1.0,
            first_pass_test_rate: 1.0,
            retry_rate: 0.0,
            time_to_completion_secs: 120,
        };
        let recorded = TeamEvent::quality_metrics_recorded(&metrics);
        telemetry_db::insert_event(&conn, &recorded).unwrap();

        let completed = TeamEvent::task_completed("eng-2", Some(&task_label));
        telemetry_db::insert_event(&conn, &completed).unwrap();
    }

    let profiles = load_engineer_profiles(root, &["eng-2".to_string()], &[]).unwrap();
    let profile = profiles.get("eng-2").unwrap();

    assert_eq!(profile.telemetry_completed_tasks, 5);
    assert!((profile.completion_rate - 1.0).abs() < f64::EPSILON);
    assert_eq!(profile.avg_task_duration_secs, Some(120.0));
    assert_eq!(profile.first_pass_test_rate, Some(1.0));
}
1093
/// Two engineers with the same telemetry sample count (5) but different
/// completion rates: the one with the higher rate must be ranked first.
#[test]
fn rank_engineers_prefers_higher_completion_rate_when_telemetry_is_sufficient() {
    let engineers = vec!["eng-1".to_string(), "eng-2".to_string()];

    // Profiles differ only in completion rate.
    let mut profiles = HashMap::new();
    for (name, rate) in [("eng-1", 0.4), ("eng-2", 0.9)] {
        let profile = EngineerProfile {
            telemetry_completed_tasks: 5,
            completion_rate: rate,
            ..EngineerProfile::default()
        };
        profiles.insert(name.to_string(), profile);
    }

    let untagged_task = task(&[], "");
    let routing_policy = policy();
    let ranked = rank_engineers_for_task(&engineers, &profiles, &untagged_task, &routing_policy);

    assert_eq!(ranked[0], "eng-2");
}
1119
/// eng-1 has 4 telemetry samples and eng-2 has 7; the explanation is expected
/// to report a round-robin fallback and pick eng-1 even though eng-2 is listed
/// first. NOTE(review): the minimum sample threshold itself is defined outside
/// this test — confirm 4 is below it.
#[test]
fn explain_routing_falls_back_when_any_engineer_lacks_enough_samples() {
    let engineers = vec!["eng-2".to_string(), "eng-1".to_string()];

    let mut profiles = HashMap::new();
    profiles.insert(
        "eng-1".to_string(),
        EngineerProfile {
            telemetry_completed_tasks: 4,
            completion_rate: 1.0,
            ..EngineerProfile::default()
        },
    );
    profiles.insert(
        "eng-2".to_string(),
        EngineerProfile {
            telemetry_completed_tasks: 7,
            completion_rate: 0.2,
            ..EngineerProfile::default()
        },
    );

    let untagged_task = task(&[], "");
    let routing_policy = policy();
    let explanation =
        explain_routing_for_task(&engineers, &profiles, &untagged_task, &routing_policy);

    assert!(explanation.fallback_to_round_robin);
    assert_eq!(explanation.chosen_engineer.as_deref(), Some("eng-1"));
}
1147
/// With no telemetry on either profile, routing must not fall back to
/// round-robin: the engineer whose domain tags match the task's `dispatch`
/// tag is chosen.
#[test]
fn explain_routing_keeps_scored_mode_when_no_telemetry_exists() {
    let engineers = vec!["eng-1".to_string(), "eng-2".to_string()];

    // Only eng-2 carries the tag the task asks for.
    let mut dispatch_tags = HashSet::new();
    dispatch_tags.insert("dispatch".to_string());

    let mut profiles = HashMap::new();
    profiles.insert("eng-1".to_string(), EngineerProfile::default());
    profiles.insert(
        "eng-2".to_string(),
        EngineerProfile {
            domain_tags: dispatch_tags,
            ..EngineerProfile::default()
        },
    );

    let dispatch_task = task(&["dispatch"], "");
    let routing_policy = policy();
    let explanation =
        explain_routing_for_task(&engineers, &profiles, &dispatch_task, &routing_policy);

    assert!(!explanation.fallback_to_round_robin);
    assert_eq!(explanation.chosen_engineer.as_deref(), Some("eng-2"));
}
1167}