1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3
4use crate::task::Task;
5use anyhow::{Context, Result};
6use chrono::{DateTime, Duration, Utc};
7use rusqlite::Connection;
8use serde::{Deserialize, Serialize};
9
10use super::config::{AllocationPolicy, RoleType, TeamConfig};
11use super::hierarchy::resolve_hierarchy;
12use super::standup::MemberState;
13use super::{daemon_state_path, team_config_dir};
14
15#[derive(Debug, Clone, Default, PartialEq)]
16pub struct EngineerProfile {
17 pub name: String,
18 pub completed_task_ids: Vec<u32>,
19 pub active_file_paths: HashSet<String>,
20 pub domain_tags: HashSet<String>,
21 pub active_task_count: u32,
22 pub total_completions: u32,
23 pub recent_merge_conflicts: u32,
24 pub performance: Option<EngineerPerformanceProfile>,
25 pub telemetry_completed_tasks: u32,
26 pub completion_rate: f64,
27 pub avg_task_duration_secs: Option<f64>,
28 pub first_pass_test_rate: Option<f64>,
29}
30
31#[derive(Debug, Clone, Default, PartialEq)]
32pub struct EngineerPerformanceProfile {
33 pub avg_task_completion_secs: Option<f64>,
34 pub lines_per_hour: Option<f64>,
35 pub first_pass_test_rate: Option<f64>,
36 pub context_exhaustion_frequency: Option<f64>,
37}
38
39#[derive(Debug, Clone, Default, PartialEq)]
40pub struct EngineerRoutingBreakdown {
41 pub engineer: String,
42 pub total_score: i32,
43 pub tag_matches: usize,
44 pub file_matches: usize,
45 pub completion_rate: f64,
46 pub avg_task_duration_secs: Option<f64>,
47 pub first_pass_test_rate: Option<f64>,
48 pub telemetry_completed_tasks: u32,
49}
50
51#[derive(Debug, Clone, Default, PartialEq)]
52pub struct RoutingDecisionExplanation {
53 pub chosen_engineer: Option<String>,
54 pub fallback_to_round_robin: bool,
55 pub fallback_reason: Option<String>,
56 pub breakdowns: Vec<EngineerRoutingBreakdown>,
57}
58
59#[derive(Debug, Default, Deserialize)]
60struct AllocationTaskFrontmatter {
61 #[serde(default)]
62 changed_paths: Vec<String>,
63}
64
65const PROFILE_RETENTION_DAYS: i64 = 7;
66const ENGINEER_PROFILES_FILE: &str = "engineer_profiles.json";
67
68#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
69struct PersistedEngineerProfiles {
70 #[serde(default = "engineer_profiles_format_version")]
71 version: u32,
72 #[serde(default)]
73 updated_at: Option<String>,
74 #[serde(default)]
75 completions: Vec<PersistedTaskProfile>,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
79struct PersistedTaskProfile {
80 engineer: String,
81 task_id: u32,
82 completed_at: String,
83 #[serde(default)]
84 changed_paths: Vec<String>,
85 #[serde(default)]
86 tags: Vec<String>,
87 #[serde(default)]
88 recent_merge_conflict: bool,
89}
90
91#[derive(Debug, Clone, Default, PartialEq)]
92struct EngineerTelemetryStats {
93 completed_tasks: u32,
94 completion_rate: f64,
95 avg_task_duration_secs: Option<f64>,
96 first_pass_test_rate: Option<f64>,
97}
98
99#[derive(Debug, Deserialize)]
100struct PersistedDaemonStateView {
101 #[serde(default)]
102 states: HashMap<String, MemberState>,
103}
104
105fn engineer_profiles_format_version() -> u32 {
106 1
107}
108
109pub fn build_engineer_profiles(
110 engineers: &[String],
111 tasks: &[Task],
112) -> Result<HashMap<String, EngineerProfile>> {
113 build_engineer_profiles_with_history(engineers, tasks, &[], &HashMap::new())
114}
115
116pub fn load_engineer_profiles(
117 project_root: &Path,
118 engineers: &[String],
119 tasks: &[Task],
120) -> Result<HashMap<String, EngineerProfile>> {
121 let persisted = load_persisted_task_profiles(project_root)?;
122 let telemetry = load_engineer_telemetry_stats(project_root)?;
123 let mut profiles =
124 build_engineer_profiles_with_history(engineers, tasks, &persisted, &telemetry)?;
125 if let Ok(conn) = crate::team::telemetry_db::open(project_root)
126 && let Ok(rows) = crate::team::telemetry_db::query_engineer_performance_profiles(&conn)
127 {
128 for row in rows {
129 if let Some(profile) = profiles.get_mut(&row.role) {
130 profile.performance = Some(EngineerPerformanceProfile {
131 avg_task_completion_secs: row.avg_task_completion_secs,
132 lines_per_hour: row.lines_per_hour,
133 first_pass_test_rate: row.first_pass_test_rate,
134 context_exhaustion_frequency: row.context_exhaustion_frequency,
135 });
136 }
137 }
138 }
139 Ok(profiles)
140}
141
142pub(crate) fn predict_task_file_paths(project_root: &Path, task: &Task) -> Result<HashSet<String>> {
143 let mut paths = task_profile_paths(task)?;
144 for record in load_persisted_task_profiles(project_root)? {
145 if persisted_profile_matches_task(task, &record) {
146 paths.extend(record.changed_paths);
147 }
148 }
149 Ok(paths)
150}
151
152pub fn persist_completed_task_profile(project_root: &Path, task: &Task) -> Result<()> {
153 let Some(engineer) = task.claimed_by.as_deref() else {
154 return Ok(());
155 };
156
157 let mut persisted = read_persisted_engineer_profiles(project_root)?;
158 prune_persisted_profiles(&mut persisted);
159 persisted
160 .completions
161 .retain(|entry| !(entry.engineer == engineer && entry.task_id == task.id));
162 persisted.completions.push(PersistedTaskProfile {
163 engineer: engineer.to_string(),
164 task_id: task.id,
165 completed_at: task
166 .completed
167 .clone()
168 .unwrap_or_else(|| Utc::now().to_rfc3339()),
169 changed_paths: load_changed_paths(task.source_path.as_path())?,
170 tags: task.tags.clone(),
171 recent_merge_conflict: task_has_conflict_signal(task),
172 });
173 persisted.updated_at = Some(Utc::now().to_rfc3339());
174 write_persisted_engineer_profiles(project_root, &persisted)
175}
176
177fn build_engineer_profiles_with_history(
178 engineers: &[String],
179 tasks: &[Task],
180 persisted: &[PersistedTaskProfile],
181 telemetry: &HashMap<String, EngineerTelemetryStats>,
182) -> Result<HashMap<String, EngineerProfile>> {
183 let mut profiles: HashMap<String, EngineerProfile> = engineers
184 .iter()
185 .cloned()
186 .map(|name| {
187 let profile = EngineerProfile {
188 name: name.clone(),
189 ..EngineerProfile::default()
190 };
191 (name, profile)
192 })
193 .collect();
194
195 let mut completed_task_keys: HashSet<(String, u32)> = HashSet::new();
196
197 for record in persisted {
198 let Some(profile) = profiles.get_mut(&record.engineer) else {
199 continue;
200 };
201 apply_completed_profile(
202 profile,
203 record.task_id,
204 record.tags.iter().cloned(),
205 record.changed_paths.iter().cloned(),
206 record.recent_merge_conflict,
207 );
208 completed_task_keys.insert((record.engineer.clone(), record.task_id));
209 }
210
211 for task in tasks {
212 let Some(owner) = task.claimed_by.as_deref() else {
213 continue;
214 };
215 let Some(profile) = profiles.get_mut(owner) else {
216 continue;
217 };
218
219 if task.status == "done" {
220 if completed_task_keys.insert((owner.to_string(), task.id)) {
221 apply_completed_profile(
222 profile,
223 task.id,
224 task.tags.iter().cloned(),
225 task_profile_paths(task)?.into_iter(),
226 task_has_conflict_signal(task),
227 );
228 }
229 } else if task_is_active_for_load(task) {
230 profile.active_task_count += 1;
231 profile.active_file_paths.extend(task_profile_paths(task)?);
232 if task_has_conflict_signal(task) {
233 profile.recent_merge_conflicts += 1;
234 }
235 }
236 }
237
238 for (engineer, stats) in telemetry {
239 let Some(profile) = profiles.get_mut(engineer) else {
240 continue;
241 };
242 profile.telemetry_completed_tasks = stats.completed_tasks;
243 profile.completion_rate = stats.completion_rate;
244 profile.avg_task_duration_secs = stats.avg_task_duration_secs;
245 profile.first_pass_test_rate = stats.first_pass_test_rate;
246 }
247
248 Ok(profiles)
249}
250
251fn apply_completed_profile<I, J>(
252 profile: &mut EngineerProfile,
253 task_id: u32,
254 tags: I,
255 changed_paths: J,
256 recent_merge_conflict: bool,
257) where
258 I: IntoIterator<Item = String>,
259 J: IntoIterator<Item = String>,
260{
261 profile.completed_task_ids.push(task_id);
262 profile.total_completions += 1;
263 profile.domain_tags.extend(tags);
264 profile.active_file_paths.extend(changed_paths);
265 if recent_merge_conflict {
266 profile.recent_merge_conflicts += 1;
267 }
268}
269
270fn persisted_profile_matches_task(task: &Task, record: &PersistedTaskProfile) -> bool {
271 if task
272 .tags
273 .iter()
274 .any(|tag| record.tags.iter().any(|candidate| candidate == tag))
275 {
276 return true;
277 }
278
279 let hinted_dirs: HashSet<String> = task_hint_paths(task)
280 .into_iter()
281 .filter_map(|path| parent_dir(&path))
282 .collect();
283 let record_dirs: HashSet<String> = record
284 .changed_paths
285 .iter()
286 .filter_map(|path| parent_dir(path))
287 .collect();
288 !hinted_dirs.is_empty() && !hinted_dirs.is_disjoint(&record_dirs)
289}
290
291pub fn score_engineer_for_task(
292 engineer: &EngineerProfile,
293 task: &Task,
294 policy: &AllocationPolicy,
295) -> i32 {
296 let mut score = 0;
297
298 let tag_overlap = task
299 .tags
300 .iter()
301 .filter(|tag| engineer.domain_tags.contains(*tag))
302 .count() as i32;
303 score += tag_overlap * policy.tag_weight;
304
305 let task_dirs = task_hint_directories(task);
306 let engineer_dirs: HashSet<String> = engineer
307 .active_file_paths
308 .iter()
309 .filter_map(|path| parent_dir(path))
310 .collect();
311 let dir_overlap = engineer_dirs.intersection(&task_dirs).count() as i32;
312 score += dir_overlap * policy.file_overlap_weight;
313 score += (engineer.completion_rate * 100.0).round() as i32;
314
315 score -= (engineer.active_task_count as i32) * policy.load_penalty;
316 score -= (engineer.recent_merge_conflicts as i32) * policy.conflict_penalty;
317 if engineer.total_completions > 3 {
318 score += policy.experience_bonus;
319 }
320 score += performance_score(engineer.performance.as_ref());
321
322 score
323}
324
325fn performance_score(performance: Option<&EngineerPerformanceProfile>) -> i32 {
326 let Some(performance) = performance else {
327 return 0;
328 };
329
330 let mut score = 0;
331
332 if let Some(first_pass_rate) = performance.first_pass_test_rate {
333 if first_pass_rate >= 0.75 {
334 score += 1;
335 } else if first_pass_rate < 0.5 {
336 score -= 1;
337 }
338 }
339
340 if let Some(context_freq) = performance.context_exhaustion_frequency {
341 if context_freq >= 0.5 {
342 score -= 2;
343 } else if context_freq > 0.0 {
344 score -= 1;
345 }
346 }
347
348 if let Some(avg_task_completion_secs) = performance.avg_task_completion_secs {
349 if avg_task_completion_secs > 0.0 && avg_task_completion_secs <= 3_600.0 {
350 score += 1;
351 } else if avg_task_completion_secs >= 14_400.0 {
352 score -= 1;
353 }
354 }
355
356 if let Some(lines_per_hour) = performance.lines_per_hour
357 && lines_per_hour >= 200.0
358 {
359 score += 1;
360 }
361
362 score
363}
364
365pub fn rank_engineers_for_task(
366 engineers: &[String],
367 profiles: &HashMap<String, EngineerProfile>,
368 task: &Task,
369 policy: &AllocationPolicy,
370) -> Vec<String> {
371 explain_routing_for_task(engineers, profiles, task, policy)
372 .breakdowns
373 .into_iter()
374 .map(|breakdown| breakdown.engineer)
375 .collect()
376}
377
378pub fn explain_routing_for_task(
379 engineers: &[String],
380 profiles: &HashMap<String, EngineerProfile>,
381 task: &Task,
382 policy: &AllocationPolicy,
383) -> RoutingDecisionExplanation {
384 let mut breakdowns: Vec<EngineerRoutingBreakdown> = engineers
385 .iter()
386 .map(|engineer| engineer_breakdown(engineer, profiles.get(engineer), task, policy))
387 .collect();
388
389 let has_any_telemetry = breakdowns
390 .iter()
391 .any(|breakdown| breakdown.telemetry_completed_tasks > 0);
392 let telemetry_ready = has_any_telemetry
393 && breakdowns
394 .iter()
395 .all(|breakdown| breakdown.telemetry_completed_tasks >= 5);
396 if telemetry_ready || !has_any_telemetry {
397 breakdowns.sort_by(compare_breakdowns);
398 let chosen_engineer = breakdowns
399 .first()
400 .map(|breakdown| breakdown.engineer.clone());
401 return RoutingDecisionExplanation {
402 chosen_engineer,
403 fallback_to_round_robin: false,
404 fallback_reason: None,
405 breakdowns,
406 };
407 }
408
409 breakdowns.sort_by(|left, right| left.engineer.cmp(&right.engineer));
410 let chosen_engineer = breakdowns
411 .first()
412 .map(|breakdown| breakdown.engineer.clone());
413 RoutingDecisionExplanation {
414 chosen_engineer,
415 fallback_to_round_robin: true,
416 fallback_reason: Some(
417 "telemetry fallback: each eligible engineer needs at least 5 completed tasks"
418 .to_string(),
419 ),
420 breakdowns,
421 }
422}
423
424pub fn print_dispatch_explanation(project_root: &Path, task_id: Option<u32>) -> Result<()> {
425 let board_dir = project_root
426 .join(".batty")
427 .join("team_config")
428 .join("board");
429 let tasks = crate::task::load_tasks_from_dir(&board_dir.join("tasks"))?;
430 let task = select_dispatch_task(&tasks, task_id)
431 .with_context(|| format!("no dispatchable task found for {:?}", task_id))?;
432
433 let team_config = TeamConfig::load(&team_config_dir(project_root).join("team.yaml"))?;
434 let members = resolve_hierarchy(&team_config)?;
435 let mut engineers = load_idle_engineers(project_root, &members)?;
436 if engineers.is_empty() {
437 engineers = members
438 .into_iter()
439 .filter(|member| member.role_type == RoleType::Engineer)
440 .map(|member| member.name)
441 .collect();
442 }
443 let bench_state = crate::team::bench::load_bench_state(project_root)?;
444 engineers.retain(|engineer| !bench_state.benched.contains_key(engineer));
445 engineers.sort();
446 let profiles = load_engineer_profiles(project_root, &engineers, &tasks)?;
447 let explanation = explain_routing_for_task(
448 &engineers,
449 &profiles,
450 task,
451 &team_config.workflow_policy.allocation,
452 );
453
454 println!("Task #{}: {}", task.id, task.title);
455 if let Some(chosen) = &explanation.chosen_engineer {
456 println!("Chosen engineer: {chosen}");
457 } else {
458 println!("Chosen engineer: none");
459 }
460 if let Some(reason) = &explanation.fallback_reason {
461 println!("Routing mode: {reason}");
462 } else {
463 println!("Routing mode: telemetry-scored");
464 }
465 println!();
466 println!(
467 "{:<20} {:>6} {:>5} {:>5} {:>10} {:>10} {:>11} {:>8}",
468 "ENGINEER", "SCORE", "TAGS", "FILES", "COMPLETE%", "AVG SECS", "FIRST PASS%", "SAMPLES"
469 );
470 println!("{}", "-".repeat(88));
471 for breakdown in explanation.breakdowns {
472 println!(
473 "{:<20} {:>6} {:>5} {:>5} {:>10.1} {:>10} {:>11.1} {:>8}",
474 breakdown.engineer,
475 breakdown.total_score,
476 breakdown.tag_matches,
477 breakdown.file_matches,
478 breakdown.completion_rate * 100.0,
479 breakdown
480 .avg_task_duration_secs
481 .map(|secs| format!("{secs:.0}"))
482 .unwrap_or_else(|| "-".to_string()),
483 breakdown.first_pass_test_rate.unwrap_or(0.0) * 100.0,
484 breakdown.telemetry_completed_tasks,
485 );
486 }
487
488 Ok(())
489}
490
491fn engineer_breakdown(
492 engineer: &str,
493 profile: Option<&EngineerProfile>,
494 task: &Task,
495 policy: &AllocationPolicy,
496) -> EngineerRoutingBreakdown {
497 let profile = profile.cloned().unwrap_or_else(|| EngineerProfile {
498 name: engineer.to_string(),
499 ..EngineerProfile::default()
500 });
501 let task_dirs = task_hint_directories(task);
502 let engineer_dirs: HashSet<String> = profile
503 .active_file_paths
504 .iter()
505 .filter_map(|path| parent_dir(path))
506 .collect();
507 let tag_matches = task
508 .tags
509 .iter()
510 .filter(|tag| profile.domain_tags.contains(*tag))
511 .count();
512 let file_matches = engineer_dirs.intersection(&task_dirs).count();
513 EngineerRoutingBreakdown {
514 engineer: engineer.to_string(),
515 total_score: score_engineer_for_task(&profile, task, policy),
516 tag_matches,
517 file_matches,
518 completion_rate: profile.completion_rate,
519 avg_task_duration_secs: profile.avg_task_duration_secs,
520 first_pass_test_rate: profile.first_pass_test_rate,
521 telemetry_completed_tasks: profile.telemetry_completed_tasks,
522 }
523}
524
525fn compare_breakdowns(
526 left: &EngineerRoutingBreakdown,
527 right: &EngineerRoutingBreakdown,
528) -> std::cmp::Ordering {
529 right
530 .total_score
531 .cmp(&left.total_score)
532 .then_with(|| {
533 right
534 .first_pass_test_rate
535 .partial_cmp(&left.first_pass_test_rate)
536 .unwrap_or(std::cmp::Ordering::Equal)
537 })
538 .then_with(|| {
539 left.avg_task_duration_secs
540 .partial_cmp(&right.avg_task_duration_secs)
541 .unwrap_or(std::cmp::Ordering::Equal)
542 })
543 .then_with(|| left.engineer.cmp(&right.engineer))
544}
545
546fn select_dispatch_task(tasks: &[Task], task_id: Option<u32>) -> Option<&Task> {
547 if let Some(task_id) = task_id {
548 return tasks.iter().find(|task| task.id == task_id);
549 }
550
551 let task_status_by_id: HashMap<u32, String> = tasks
552 .iter()
553 .map(|task| (task.id, task.status.clone()))
554 .collect();
555 let mut dispatchable: Vec<&Task> = tasks
556 .iter()
557 .filter(|task| matches!(task.status.as_str(), "backlog" | "todo"))
558 .filter(|task| task.claimed_by.is_none())
559 .filter(|task| task.blocked.is_none())
560 .filter(|task| task.blocked_on.is_none())
561 .filter(|task| !task.is_schedule_blocked())
562 .filter(|task| {
563 task.depends_on.iter().all(|dep_id| {
564 task_status_by_id
565 .get(dep_id)
566 .is_none_or(|status| status == "done")
567 })
568 })
569 .collect();
570 dispatchable.sort_by_key(|task| {
571 (
572 match task.priority.as_str() {
573 "critical" => 0,
574 "high" => 1,
575 "medium" => 2,
576 "low" => 3,
577 _ => 4,
578 },
579 task.id,
580 )
581 });
582 dispatchable.into_iter().next()
583}
584
585fn load_idle_engineers(
586 project_root: &Path,
587 members: &[super::hierarchy::MemberInstance],
588) -> Result<Vec<String>> {
589 let path = daemon_state_path(project_root);
590 if !path.exists() {
591 return Ok(Vec::new());
592 }
593 let content = std::fs::read_to_string(&path)
594 .with_context(|| format!("failed to read {}", path.display()))?;
595 let state: PersistedDaemonStateView = serde_json::from_str(&content)
596 .with_context(|| format!("failed to parse {}", path.display()))?;
597 Ok(members
598 .iter()
599 .filter(|member| member.role_type == RoleType::Engineer)
600 .filter(|member| state.states.get(&member.name) == Some(&MemberState::Idle))
601 .map(|member| member.name.clone())
602 .collect())
603}
604
605fn load_engineer_telemetry_stats(
606 project_root: &Path,
607) -> Result<HashMap<String, EngineerTelemetryStats>> {
608 let conn = match super::telemetry_db::open(project_root) {
609 Ok(conn) => conn,
610 Err(_) => return Ok(HashMap::new()),
611 };
612
613 let mut stats = load_quality_metric_stats(&conn)?;
614 for (engineer, completion_rate) in load_completion_rates(&conn)? {
615 stats.entry(engineer).or_default().completion_rate = completion_rate;
616 }
617 Ok(stats)
618}
619
620fn load_completion_rates(conn: &Connection) -> Result<HashMap<String, f64>> {
621 let mut stmt = conn.prepare(
622 "SELECT role,
623 SUM(CASE WHEN event_type = 'task_assigned' THEN 1 ELSE 0 END) AS assigned,
624 SUM(CASE WHEN event_type = 'task_completed' THEN 1 ELSE 0 END) AS completed
625 FROM events
626 WHERE role IS NOT NULL AND event_type IN ('task_assigned', 'task_completed')
627 GROUP BY role",
628 )?;
629 let rows = stmt.query_map([], |row| {
630 let role: String = row.get(0)?;
631 let assigned: i64 = row.get(1)?;
632 let completed: i64 = row.get(2)?;
633 let rate = if assigned > 0 {
634 completed as f64 / assigned as f64
635 } else {
636 0.0
637 };
638 Ok((role, rate))
639 })?;
640
641 let mut rates = HashMap::new();
642 for row in rows {
643 let (role, rate) = row?;
644 rates.insert(role, rate);
645 }
646 Ok(rates)
647}
648
649fn load_quality_metric_stats(conn: &Connection) -> Result<HashMap<String, EngineerTelemetryStats>> {
650 let mut stmt = conn.prepare(
651 "SELECT role,
652 COUNT(*) AS samples,
653 AVG(CAST(json_extract(payload, '$.time_to_completion_secs') AS REAL)) AS avg_secs,
654 AVG(CAST(json_extract(payload, '$.first_pass_test_rate') AS REAL)) AS first_pass
655 FROM events
656 WHERE role IS NOT NULL AND event_type = 'quality_metrics_recorded'
657 GROUP BY role",
658 )?;
659 let rows = stmt.query_map([], |row| {
660 Ok((
661 row.get::<_, String>(0)?,
662 EngineerTelemetryStats {
663 completed_tasks: row.get::<_, i64>(1)? as u32,
664 completion_rate: 0.0,
665 avg_task_duration_secs: row.get(2)?,
666 first_pass_test_rate: row.get(3)?,
667 },
668 ))
669 })?;
670
671 let mut stats = HashMap::new();
672 for row in rows {
673 let (engineer, profile) = row?;
674 stats.insert(engineer, profile);
675 }
676 Ok(stats)
677}
678
679fn task_is_active_for_load(task: &Task) -> bool {
680 matches!(
681 task.status.as_str(),
682 "todo" | "backlog" | "in-progress" | "review" | "blocked"
683 )
684}
685
686fn task_has_conflict_signal(task: &Task) -> bool {
687 let blocked = task.blocked.as_deref().unwrap_or_default();
688 let blocked_on = task.blocked_on.as_deref().unwrap_or_default();
689 let description = task.description.to_ascii_lowercase();
690 blocked.to_ascii_lowercase().contains("conflict")
691 || blocked_on.to_ascii_lowercase().contains("conflict")
692 || description.contains("merge conflict")
693 || description.contains("rebase conflict")
694}
695
696fn task_profile_paths(task: &Task) -> Result<HashSet<String>> {
697 let mut paths = task_hint_paths(task);
698 for path in load_changed_paths(task.source_path.as_path())? {
699 paths.insert(path);
700 }
701 Ok(paths)
702}
703
704fn task_hint_directories(task: &Task) -> HashSet<String> {
705 task_hint_paths(task)
706 .into_iter()
707 .filter_map(|path| parent_dir(&path))
708 .collect()
709}
710
711fn task_hint_paths(task: &Task) -> HashSet<String> {
712 task.description
713 .split_whitespace()
714 .filter_map(clean_task_path_token)
715 .collect()
716}
717
718fn clean_task_path_token(token: &str) -> Option<String> {
719 let cleaned = token.trim_matches(|ch: char| {
720 matches!(
721 ch,
722 '"' | '\'' | ',' | ':' | ';' | '(' | ')' | '[' | ']' | '`'
723 )
724 });
725 parent_dir(cleaned).map(|_| cleaned.to_string())
726}
727
728fn load_changed_paths(path: &Path) -> Result<Vec<String>> {
729 if path.as_os_str().is_empty() || !path.exists() {
730 return Ok(Vec::new());
731 }
732
733 let content = std::fs::read_to_string(path)?;
734 let Some(frontmatter) = extract_frontmatter(&content) else {
735 return Ok(Vec::new());
736 };
737 let parsed: AllocationTaskFrontmatter = serde_yaml::from_str(frontmatter).unwrap_or_default();
738 Ok(parsed.changed_paths)
739}
740
741fn load_persisted_task_profiles(project_root: &Path) -> Result<Vec<PersistedTaskProfile>> {
742 let mut persisted = read_persisted_engineer_profiles(project_root)?;
743 prune_persisted_profiles(&mut persisted);
744 if persisted.updated_at.is_some() {
745 write_persisted_engineer_profiles(project_root, &persisted)?;
746 }
747 Ok(persisted.completions)
748}
749
750fn read_persisted_engineer_profiles(project_root: &Path) -> Result<PersistedEngineerProfiles> {
751 let path = engineer_profiles_path(project_root);
752 if !path.exists() {
753 return Ok(PersistedEngineerProfiles {
754 version: engineer_profiles_format_version(),
755 updated_at: None,
756 completions: Vec::new(),
757 });
758 }
759
760 let content = std::fs::read_to_string(&path)?;
761 let persisted = serde_json::from_str(&content)?;
762 Ok(persisted)
763}
764
765fn write_persisted_engineer_profiles(
766 project_root: &Path,
767 persisted: &PersistedEngineerProfiles,
768) -> Result<()> {
769 let path = engineer_profiles_path(project_root);
770 if let Some(parent) = path.parent() {
771 std::fs::create_dir_all(parent)?;
772 }
773 std::fs::write(path, serde_json::to_vec_pretty(persisted)?)?;
774 Ok(())
775}
776
777fn prune_persisted_profiles(persisted: &mut PersistedEngineerProfiles) {
778 let cutoff = Utc::now() - Duration::days(PROFILE_RETENTION_DAYS);
779 persisted.completions.retain(|entry| {
780 DateTime::parse_from_rfc3339(&entry.completed_at)
781 .map(|completed| completed.with_timezone(&Utc) >= cutoff)
782 .unwrap_or(false)
783 });
784}
785
786fn engineer_profiles_path(project_root: &Path) -> PathBuf {
787 team_config_dir(project_root).join(ENGINEER_PROFILES_FILE)
788}
789
790fn extract_frontmatter(content: &str) -> Option<&str> {
791 let trimmed = content.trim_start();
792 if !trimmed.starts_with("---") {
793 return None;
794 }
795 let after_open = trimmed[3..].strip_prefix('\n').unwrap_or(&trimmed[3..]);
796 let close_pos = after_open.find("\n---")?;
797 Some(&after_open[..close_pos])
798}
799
800fn parent_dir(path: &str) -> Option<String> {
801 PathBuf::from(path)
802 .parent()
803 .filter(|parent| !parent.as_os_str().is_empty())
804 .map(|parent| parent.to_string_lossy().into_owned())
805}
806
807#[cfg(test)]
808mod tests {
809 use super::*;
810 use crate::team::config::AllocationStrategy;
811 use crate::team::events::{QualityMetricsInfo, TeamEvent};
812 use crate::team::telemetry_db;
813 use std::fs;
814
815 fn task(tags: &[&str], description: &str) -> Task {
816 Task {
817 id: 1,
818 title: "task".to_string(),
819 status: "todo".to_string(),
820 priority: "high".to_string(),
821 claimed_by: None,
822 claimed_at: None,
823 claim_ttl_secs: None,
824 claim_expires_at: None,
825 last_progress_at: None,
826 claim_warning_sent_at: None,
827 claim_extensions: None,
828 last_output_bytes: None,
829 blocked: None,
830 tags: tags.iter().map(|tag| (*tag).to_string()).collect(),
831 depends_on: Vec::new(),
832 review_owner: None,
833 blocked_on: None,
834 worktree_path: None,
835 branch: None,
836 commit: None,
837 artifacts: Vec::new(),
838 next_action: None,
839 scheduled_for: None,
840 cron_schedule: None,
841 cron_last_run: None,
842 completed: None,
843 description: description.to_string(),
844 batty_config: None,
845 source_path: PathBuf::new(),
846 }
847 }
848
849 fn policy() -> AllocationPolicy {
850 AllocationPolicy {
851 strategy: AllocationStrategy::Scored,
852 ..AllocationPolicy::default()
853 }
854 }
855
856 #[test]
857 fn score_prefers_tag_overlap() {
858 let profile = EngineerProfile {
859 domain_tags: HashSet::from(["dispatch".to_string()]),
860 ..EngineerProfile::default()
861 };
862 assert!(score_engineer_for_task(&profile, &task(&["dispatch"], ""), &policy()) > 0);
863 }
864
865 #[test]
866 fn score_prefers_matching_directory_hints() {
867 let profile = EngineerProfile {
868 active_file_paths: HashSet::from(["src/team/dispatch/queue.rs".to_string()]),
869 ..EngineerProfile::default()
870 };
871 assert!(
872 score_engineer_for_task(
873 &profile,
874 &task(&[], "Touch src/team/dispatch/mod.rs next."),
875 &policy(),
876 ) > 0
877 );
878 }
879
880 #[test]
881 fn score_penalizes_active_load() {
882 let light = EngineerProfile {
883 active_task_count: 0,
884 ..EngineerProfile::default()
885 };
886 let busy = EngineerProfile {
887 active_task_count: 2,
888 ..EngineerProfile::default()
889 };
890 assert!(
891 score_engineer_for_task(&light, &task(&[], ""), &policy())
892 > score_engineer_for_task(&busy, &task(&[], ""), &policy())
893 );
894 }
895
896 #[test]
897 fn rank_engineers_falls_back_to_name_order_on_tie() {
898 let engineers = vec!["eng-2".to_string(), "eng-1".to_string()];
899 let profiles = HashMap::from([
900 ("eng-1".to_string(), EngineerProfile::default()),
901 ("eng-2".to_string(), EngineerProfile::default()),
902 ]);
903 let ranked = rank_engineers_for_task(&engineers, &profiles, &task(&[], ""), &policy());
904 assert_eq!(ranked, vec!["eng-1".to_string(), "eng-2".to_string()]);
905 }
906
907 #[test]
908 fn score_prefers_more_reliable_performance_profile() {
909 let reliable = EngineerProfile {
910 performance: Some(EngineerPerformanceProfile {
911 avg_task_completion_secs: Some(1800.0),
912 lines_per_hour: Some(250.0),
913 first_pass_test_rate: Some(1.0),
914 context_exhaustion_frequency: Some(0.0),
915 }),
916 ..EngineerProfile::default()
917 };
918 let unstable = EngineerProfile {
919 performance: Some(EngineerPerformanceProfile {
920 avg_task_completion_secs: Some(18_000.0),
921 lines_per_hour: Some(10.0),
922 first_pass_test_rate: Some(0.0),
923 context_exhaustion_frequency: Some(1.0),
924 }),
925 ..EngineerProfile::default()
926 };
927
928 assert!(
929 score_engineer_for_task(&reliable, &task(&[], ""), &policy())
930 > score_engineer_for_task(&unstable, &task(&[], ""), &policy())
931 );
932 }
933
934 #[test]
935 fn build_profiles_reads_changed_paths_and_tags_from_board_tasks() {
936 let tmp = tempfile::tempdir().unwrap();
937 let task_path = tmp.path().join("042-profile.md");
938 fs::write(
939 &task_path,
940 "---\nid: 42\ntitle: profile\nstatus: done\npriority: high\nclaimed_by: eng-2\ntags:\n - dispatch\nchanged_paths:\n - src/team/dispatch/queue.rs\nclass: standard\n---\n\nTouch src/team/dispatch/mod.rs too.\n",
941 )
942 .unwrap();
943
944 let task = Task::from_file(&task_path).unwrap();
945 let profiles =
946 build_engineer_profiles(&["eng-1".to_string(), "eng-2".to_string()], &[task]).unwrap();
947 let profile = profiles.get("eng-2").unwrap();
948
949 assert_eq!(profile.total_completions, 1);
950 assert!(profile.domain_tags.contains("dispatch"));
951 assert!(
952 profile
953 .active_file_paths
954 .contains("src/team/dispatch/queue.rs")
955 );
956 }
957
958 #[test]
959 fn build_profiles_counts_active_load_and_conflict_signals() {
960 let task = Task {
961 id: 7,
962 title: "conflicted".to_string(),
963 status: "review".to_string(),
964 priority: "high".to_string(),
965 claimed_by: Some("eng-1".to_string()),
966 claimed_at: None,
967 claim_ttl_secs: None,
968 claim_expires_at: None,
969 last_progress_at: None,
970 claim_warning_sent_at: None,
971 claim_extensions: None,
972 last_output_bytes: None,
973 blocked: Some("merge conflict".to_string()),
974 tags: vec!["daemon".to_string()],
975 depends_on: Vec::new(),
976 review_owner: None,
977 blocked_on: None,
978 worktree_path: None,
979 branch: None,
980 commit: None,
981 artifacts: Vec::new(),
982 next_action: None,
983 scheduled_for: None,
984 cron_schedule: None,
985 cron_last_run: None,
986 completed: None,
987 description: "Resolve rebase conflict in src/team/daemon/mod.rs".to_string(),
988 batty_config: None,
989 source_path: PathBuf::new(),
990 };
991
992 let profiles = build_engineer_profiles(&["eng-1".to_string()], &[task]).unwrap();
993 let profile = profiles.get("eng-1").unwrap();
994 assert_eq!(profile.active_task_count, 1);
995 assert_eq!(profile.recent_merge_conflicts, 1);
996 }
997
998 #[test]
999 fn load_engineer_profiles_merges_recent_persisted_history() {
1000 let tmp = tempfile::tempdir().unwrap();
1001 let persisted = PersistedEngineerProfiles {
1002 version: engineer_profiles_format_version(),
1003 updated_at: Some(Utc::now().to_rfc3339()),
1004 completions: vec![PersistedTaskProfile {
1005 engineer: "eng-2".to_string(),
1006 task_id: 9,
1007 completed_at: Utc::now().to_rfc3339(),
1008 changed_paths: vec!["src/team/dispatch/queue.rs".to_string()],
1009 tags: vec!["dispatch".to_string()],
1010 recent_merge_conflict: false,
1011 }],
1012 };
1013 write_persisted_engineer_profiles(tmp.path(), &persisted).unwrap();
1014
1015 let profiles = load_engineer_profiles(tmp.path(), &["eng-2".to_string()], &[]).unwrap();
1016 let profile = profiles.get("eng-2").unwrap();
1017
1018 assert_eq!(profile.total_completions, 1);
1019 assert!(profile.domain_tags.contains("dispatch"));
1020 assert!(
1021 profile
1022 .active_file_paths
1023 .contains("src/team/dispatch/queue.rs")
1024 );
1025 }
1026
1027 #[test]
1028 fn persist_completed_task_profile_prunes_old_history() {
1029 let tmp = tempfile::tempdir().unwrap();
1030 let stale_completed_at =
1031 (Utc::now() - Duration::days(PROFILE_RETENTION_DAYS + 1)).to_rfc3339();
1032 let persisted = PersistedEngineerProfiles {
1033 version: engineer_profiles_format_version(),
1034 updated_at: Some(Utc::now().to_rfc3339()),
1035 completions: vec![PersistedTaskProfile {
1036 engineer: "eng-1".to_string(),
1037 task_id: 1,
1038 completed_at: stale_completed_at,
1039 changed_paths: vec!["src/old.rs".to_string()],
1040 tags: vec!["old".to_string()],
1041 recent_merge_conflict: false,
1042 }],
1043 };
1044 write_persisted_engineer_profiles(tmp.path(), &persisted).unwrap();
1045
1046 let task_path = tmp.path().join("042-profile.md");
1047 fs::write(
1048 &task_path,
1049 "---\nid: 42\ntitle: profile\nstatus: done\npriority: high\nclaimed_by: eng-2\ntags:\n - dispatch\nchanged_paths:\n - src/team/dispatch/queue.rs\nclass: standard\ncompleted: 2026-04-06T03:00:00-04:00\n---\n\nTouch src/team/dispatch/mod.rs too.\n",
1050 )
1051 .unwrap();
1052
1053 let task = Task::from_file(&task_path).unwrap();
1054 persist_completed_task_profile(tmp.path(), &task).unwrap();
1055
1056 let loaded =
1057 load_engineer_profiles(tmp.path(), &["eng-1".to_string(), "eng-2".to_string()], &[])
1058 .unwrap();
1059 assert_eq!(loaded.get("eng-1").unwrap().total_completions, 0);
1060 assert_eq!(loaded.get("eng-2").unwrap().total_completions, 1);
1061 }
1062
1063 #[test]
1064 fn load_engineer_profiles_reads_telemetry_reliability_metrics() {
1065 let tmp = tempfile::tempdir().unwrap();
1066 fs::create_dir_all(tmp.path().join(".batty")).unwrap();
1067 let conn = telemetry_db::open(tmp.path()).unwrap();
1068 for task_id in 1..=5 {
1069 telemetry_db::insert_event(
1070 &conn,
1071 &TeamEvent::task_assigned("eng-2", &task_id.to_string()),
1072 )
1073 .unwrap();
1074 telemetry_db::insert_event(
1075 &conn,
1076 &TeamEvent::quality_metrics_recorded(&QualityMetricsInfo {
1077 backend: "codex",
1078 role: "eng-2",
1079 task: &task_id.to_string(),
1080 narration_ratio: 0.1,
1081 commit_frequency: 1.0,
1082 first_pass_test_rate: 1.0,
1083 retry_rate: 0.0,
1084 time_to_completion_secs: 120,
1085 }),
1086 )
1087 .unwrap();
1088 telemetry_db::insert_event(
1089 &conn,
1090 &TeamEvent::task_completed("eng-2", Some(&task_id.to_string())),
1091 )
1092 .unwrap();
1093 }
1094
1095 let profiles = load_engineer_profiles(tmp.path(), &["eng-2".to_string()], &[]).unwrap();
1096 let profile = profiles.get("eng-2").unwrap();
1097 assert_eq!(profile.telemetry_completed_tasks, 5);
1098 assert!((profile.completion_rate - 1.0).abs() < f64::EPSILON);
1099 assert_eq!(profile.avg_task_duration_secs, Some(120.0));
1100 assert_eq!(profile.first_pass_test_rate, Some(1.0));
1101 }
1102
1103 #[test]
1104 fn rank_engineers_prefers_higher_completion_rate_when_telemetry_is_sufficient() {
1105 let engineers = vec!["eng-1".to_string(), "eng-2".to_string()];
1106 let profiles = HashMap::from([
1107 (
1108 "eng-1".to_string(),
1109 EngineerProfile {
1110 telemetry_completed_tasks: 5,
1111 completion_rate: 0.4,
1112 ..EngineerProfile::default()
1113 },
1114 ),
1115 (
1116 "eng-2".to_string(),
1117 EngineerProfile {
1118 telemetry_completed_tasks: 5,
1119 completion_rate: 0.9,
1120 ..EngineerProfile::default()
1121 },
1122 ),
1123 ]);
1124
1125 let ranked = rank_engineers_for_task(&engineers, &profiles, &task(&[], ""), &policy());
1126 assert_eq!(ranked[0], "eng-2");
1127 }
1128
1129 #[test]
1130 fn explain_routing_falls_back_when_any_engineer_lacks_enough_samples() {
1131 let engineers = vec!["eng-2".to_string(), "eng-1".to_string()];
1132 let profiles = HashMap::from([
1133 (
1134 "eng-1".to_string(),
1135 EngineerProfile {
1136 telemetry_completed_tasks: 4,
1137 completion_rate: 1.0,
1138 ..EngineerProfile::default()
1139 },
1140 ),
1141 (
1142 "eng-2".to_string(),
1143 EngineerProfile {
1144 telemetry_completed_tasks: 7,
1145 completion_rate: 0.2,
1146 ..EngineerProfile::default()
1147 },
1148 ),
1149 ]);
1150
1151 let explanation =
1152 explain_routing_for_task(&engineers, &profiles, &task(&[], ""), &policy());
1153 assert!(explanation.fallback_to_round_robin);
1154 assert_eq!(explanation.chosen_engineer.as_deref(), Some("eng-1"));
1155 }
1156
1157 #[test]
1158 fn explain_routing_keeps_scored_mode_when_no_telemetry_exists() {
1159 let engineers = vec!["eng-1".to_string(), "eng-2".to_string()];
1160 let profiles = HashMap::from([
1161 ("eng-1".to_string(), EngineerProfile::default()),
1162 (
1163 "eng-2".to_string(),
1164 EngineerProfile {
1165 domain_tags: HashSet::from(["dispatch".to_string()]),
1166 ..EngineerProfile::default()
1167 },
1168 ),
1169 ]);
1170
1171 let explanation =
1172 explain_routing_for_task(&engineers, &profiles, &task(&["dispatch"], ""), &policy());
1173 assert!(!explanation.fallback_to_round_robin);
1174 assert_eq!(explanation.chosen_engineer.as_deref(), Some("eng-2"));
1175 }
1176}