Skip to main content

batty_cli/team/
allocation.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3
4use crate::task::Task;
5use anyhow::{Context, Result};
6use chrono::{DateTime, Duration, Utc};
7use rusqlite::Connection;
8use serde::{Deserialize, Serialize};
9
10use super::config::{AllocationPolicy, RoleType, TeamConfig};
11use super::hierarchy::resolve_hierarchy;
12use super::standup::MemberState;
13use super::{daemon_state_path, team_config_dir};
14
15#[derive(Debug, Clone, Default, PartialEq)]
16pub struct EngineerProfile {
17    pub name: String,
18    pub completed_task_ids: Vec<u32>,
19    pub active_file_paths: HashSet<String>,
20    pub domain_tags: HashSet<String>,
21    pub active_task_count: u32,
22    pub total_completions: u32,
23    pub recent_merge_conflicts: u32,
24    pub performance: Option<EngineerPerformanceProfile>,
25    pub telemetry_completed_tasks: u32,
26    pub completion_rate: f64,
27    pub avg_task_duration_secs: Option<f64>,
28    pub first_pass_test_rate: Option<f64>,
29}
30
/// Per-engineer performance metrics consumed by `performance_score`.
///
/// Every metric is optional; a missing metric contributes nothing to the score.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct EngineerPerformanceProfile {
    /// Average task completion time in seconds.
    pub avg_task_completion_secs: Option<f64>,
    /// Lines produced per hour.
    pub lines_per_hour: Option<f64>,
    /// First-pass test rate (scored against the 0.5 and 0.75 thresholds).
    pub first_pass_test_rate: Option<f64>,
    /// Context exhaustion frequency (any value above 0.0 is penalized).
    pub context_exhaustion_frequency: Option<f64>,
}
38
/// Score breakdown for one engineer/task pairing, as shown by
/// `print_dispatch_explanation`.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct EngineerRoutingBreakdown {
    /// Engineer the breakdown describes.
    pub engineer: String,
    /// Final score returned by `score_engineer_for_task`.
    pub total_score: i32,
    /// Number of task tags that are also in the engineer's domain tags.
    pub tag_matches: usize,
    /// Number of directories shared by the task hints and the engineer's paths.
    pub file_matches: usize,
    /// Completion rate copied from the engineer profile.
    pub completion_rate: f64,
    /// Average task duration in seconds copied from the engineer profile.
    pub avg_task_duration_secs: Option<f64>,
    /// First-pass test rate copied from the engineer profile.
    pub first_pass_test_rate: Option<f64>,
    /// Telemetry sample count copied from the engineer profile.
    pub telemetry_completed_tasks: u32,
}
50
51#[derive(Debug, Clone, Default, PartialEq)]
52pub struct RoutingDecisionExplanation {
53    pub chosen_engineer: Option<String>,
54    pub fallback_to_round_robin: bool,
55    pub fallback_reason: Option<String>,
56    pub breakdowns: Vec<EngineerRoutingBreakdown>,
57}
58
59#[derive(Debug, Default, Deserialize)]
60struct AllocationTaskFrontmatter {
61    #[serde(default)]
62    changed_paths: Vec<String>,
63}
64
/// Persisted completion records older than this many days are pruned.
const PROFILE_RETENTION_DAYS: i64 = 7;
/// File name of the persisted engineer profiles inside the team config directory.
const ENGINEER_PROFILES_FILE: &str = "engineer_profiles.json";
67
68#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
69struct PersistedEngineerProfiles {
70    #[serde(default = "engineer_profiles_format_version")]
71    version: u32,
72    #[serde(default)]
73    updated_at: Option<String>,
74    #[serde(default)]
75    completions: Vec<PersistedTaskProfile>,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
79struct PersistedTaskProfile {
80    engineer: String,
81    task_id: u32,
82    completed_at: String,
83    #[serde(default)]
84    changed_paths: Vec<String>,
85    #[serde(default)]
86    tags: Vec<String>,
87    #[serde(default)]
88    recent_merge_conflict: bool,
89}
90
/// Telemetry-derived statistics for one engineer, keyed by role name in
/// `load_engineer_telemetry_stats`.
#[derive(Debug, Clone, Default, PartialEq)]
struct EngineerTelemetryStats {
    /// Number of `quality_metrics_recorded` events seen for the engineer.
    completed_tasks: u32,
    /// Completed-to-assigned event ratio; `0.0` when nothing was assigned.
    completion_rate: f64,
    /// Average `time_to_completion_secs` over the recorded samples.
    avg_task_duration_secs: Option<f64>,
    /// Average `first_pass_test_rate` over the recorded samples.
    first_pass_test_rate: Option<f64>,
}
98
99#[derive(Debug, Deserialize)]
100struct PersistedDaemonStateView {
101    #[serde(default)]
102    states: HashMap<String, MemberState>,
103}
104
/// Returns the current format version of the persisted engineer profiles file.
///
/// Also used as the serde default for `PersistedEngineerProfiles::version`.
fn engineer_profiles_format_version() -> u32 {
    const CURRENT_FORMAT_VERSION: u32 = 1;
    CURRENT_FORMAT_VERSION
}
108
109pub fn build_engineer_profiles(
110    engineers: &[String],
111    tasks: &[Task],
112) -> Result<HashMap<String, EngineerProfile>> {
113    build_engineer_profiles_with_history(engineers, tasks, &[], &HashMap::new())
114}
115
116pub fn load_engineer_profiles(
117    project_root: &Path,
118    engineers: &[String],
119    tasks: &[Task],
120) -> Result<HashMap<String, EngineerProfile>> {
121    let persisted = load_persisted_task_profiles(project_root)?;
122    let telemetry = load_engineer_telemetry_stats(project_root)?;
123    let mut profiles =
124        build_engineer_profiles_with_history(engineers, tasks, &persisted, &telemetry)?;
125    if let Ok(conn) = crate::team::telemetry_db::open(project_root)
126        && let Ok(rows) = crate::team::telemetry_db::query_engineer_performance_profiles(&conn)
127    {
128        for row in rows {
129            if let Some(profile) = profiles.get_mut(&row.role) {
130                profile.performance = Some(EngineerPerformanceProfile {
131                    avg_task_completion_secs: row.avg_task_completion_secs,
132                    lines_per_hour: row.lines_per_hour,
133                    first_pass_test_rate: row.first_pass_test_rate,
134                    context_exhaustion_frequency: row.context_exhaustion_frequency,
135                });
136            }
137        }
138    }
139    Ok(profiles)
140}
141
142pub(crate) fn predict_task_file_paths(project_root: &Path, task: &Task) -> Result<HashSet<String>> {
143    let mut paths = task_profile_paths(task)?;
144    for record in load_persisted_task_profiles(project_root)? {
145        if persisted_profile_matches_task(task, &record) {
146            paths.extend(record.changed_paths);
147        }
148    }
149    Ok(paths)
150}
151
152pub fn persist_completed_task_profile(project_root: &Path, task: &Task) -> Result<()> {
153    let Some(engineer) = task.claimed_by.as_deref() else {
154        return Ok(());
155    };
156
157    let mut persisted = read_persisted_engineer_profiles(project_root)?;
158    prune_persisted_profiles(&mut persisted);
159    persisted
160        .completions
161        .retain(|entry| !(entry.engineer == engineer && entry.task_id == task.id));
162    persisted.completions.push(PersistedTaskProfile {
163        engineer: engineer.to_string(),
164        task_id: task.id,
165        completed_at: task
166            .completed
167            .clone()
168            .unwrap_or_else(|| Utc::now().to_rfc3339()),
169        changed_paths: load_changed_paths(task.source_path.as_path())?,
170        tags: task.tags.clone(),
171        recent_merge_conflict: task_has_conflict_signal(task),
172    });
173    persisted.updated_at = Some(Utc::now().to_rfc3339());
174    write_persisted_engineer_profiles(project_root, &persisted)
175}
176
177fn build_engineer_profiles_with_history(
178    engineers: &[String],
179    tasks: &[Task],
180    persisted: &[PersistedTaskProfile],
181    telemetry: &HashMap<String, EngineerTelemetryStats>,
182) -> Result<HashMap<String, EngineerProfile>> {
183    let mut profiles: HashMap<String, EngineerProfile> = engineers
184        .iter()
185        .cloned()
186        .map(|name| {
187            let profile = EngineerProfile {
188                name: name.clone(),
189                ..EngineerProfile::default()
190            };
191            (name, profile)
192        })
193        .collect();
194
195    let mut completed_task_keys: HashSet<(String, u32)> = HashSet::new();
196
197    for record in persisted {
198        let Some(profile) = profiles.get_mut(&record.engineer) else {
199            continue;
200        };
201        apply_completed_profile(
202            profile,
203            record.task_id,
204            record.tags.iter().cloned(),
205            record.changed_paths.iter().cloned(),
206            record.recent_merge_conflict,
207        );
208        completed_task_keys.insert((record.engineer.clone(), record.task_id));
209    }
210
211    for task in tasks {
212        let Some(owner) = task.claimed_by.as_deref() else {
213            continue;
214        };
215        let Some(profile) = profiles.get_mut(owner) else {
216            continue;
217        };
218
219        if task.status == "done" {
220            if completed_task_keys.insert((owner.to_string(), task.id)) {
221                apply_completed_profile(
222                    profile,
223                    task.id,
224                    task.tags.iter().cloned(),
225                    task_profile_paths(task)?.into_iter(),
226                    task_has_conflict_signal(task),
227                );
228            }
229        } else if task_is_active_for_load(task) {
230            profile.active_task_count += 1;
231            profile.active_file_paths.extend(task_profile_paths(task)?);
232            if task_has_conflict_signal(task) {
233                profile.recent_merge_conflicts += 1;
234            }
235        }
236    }
237
238    for (engineer, stats) in telemetry {
239        let Some(profile) = profiles.get_mut(engineer) else {
240            continue;
241        };
242        profile.telemetry_completed_tasks = stats.completed_tasks;
243        profile.completion_rate = stats.completion_rate;
244        profile.avg_task_duration_secs = stats.avg_task_duration_secs;
245        profile.first_pass_test_rate = stats.first_pass_test_rate;
246    }
247
248    Ok(profiles)
249}
250
251fn apply_completed_profile<I, J>(
252    profile: &mut EngineerProfile,
253    task_id: u32,
254    tags: I,
255    changed_paths: J,
256    recent_merge_conflict: bool,
257) where
258    I: IntoIterator<Item = String>,
259    J: IntoIterator<Item = String>,
260{
261    profile.completed_task_ids.push(task_id);
262    profile.total_completions += 1;
263    profile.domain_tags.extend(tags);
264    profile.active_file_paths.extend(changed_paths);
265    if recent_merge_conflict {
266        profile.recent_merge_conflicts += 1;
267    }
268}
269
270fn persisted_profile_matches_task(task: &Task, record: &PersistedTaskProfile) -> bool {
271    if task
272        .tags
273        .iter()
274        .any(|tag| record.tags.iter().any(|candidate| candidate == tag))
275    {
276        return true;
277    }
278
279    let hinted_dirs: HashSet<String> = crate::task::task_file_hints(task)
280        .unwrap_or_default()
281        .into_iter()
282        .filter_map(|path| parent_dir(&path))
283        .collect();
284    let record_dirs: HashSet<String> = record
285        .changed_paths
286        .iter()
287        .filter_map(|path| parent_dir(path))
288        .collect();
289    !hinted_dirs.is_empty() && !hinted_dirs.is_disjoint(&record_dirs)
290}
291
292pub fn score_engineer_for_task(
293    engineer: &EngineerProfile,
294    task: &Task,
295    policy: &AllocationPolicy,
296) -> i32 {
297    let mut score = 0;
298
299    let tag_overlap = task
300        .tags
301        .iter()
302        .filter(|tag| engineer.domain_tags.contains(*tag))
303        .count() as i32;
304    score += tag_overlap * policy.tag_weight;
305
306    let task_dirs = task_hint_directories(task);
307    let engineer_dirs: HashSet<String> = engineer
308        .active_file_paths
309        .iter()
310        .filter_map(|path| parent_dir(path))
311        .collect();
312    let dir_overlap = engineer_dirs.intersection(&task_dirs).count() as i32;
313    score += dir_overlap * policy.file_overlap_weight;
314    score += (engineer.completion_rate * 100.0).round() as i32;
315
316    score -= (engineer.active_task_count as i32) * policy.load_penalty;
317    score -= (engineer.recent_merge_conflicts as i32) * policy.conflict_penalty;
318    if engineer.total_completions > 3 {
319        score += policy.experience_bonus;
320    }
321    score += performance_score(engineer.performance.as_ref());
322
323    score
324}
325
326fn performance_score(performance: Option<&EngineerPerformanceProfile>) -> i32 {
327    let Some(performance) = performance else {
328        return 0;
329    };
330
331    let mut score = 0;
332
333    if let Some(first_pass_rate) = performance.first_pass_test_rate {
334        if first_pass_rate >= 0.75 {
335            score += 1;
336        } else if first_pass_rate < 0.5 {
337            score -= 1;
338        }
339    }
340
341    if let Some(context_freq) = performance.context_exhaustion_frequency {
342        if context_freq >= 0.5 {
343            score -= 2;
344        } else if context_freq > 0.0 {
345            score -= 1;
346        }
347    }
348
349    if let Some(avg_task_completion_secs) = performance.avg_task_completion_secs {
350        if avg_task_completion_secs > 0.0 && avg_task_completion_secs <= 3_600.0 {
351            score += 1;
352        } else if avg_task_completion_secs >= 14_400.0 {
353            score -= 1;
354        }
355    }
356
357    if let Some(lines_per_hour) = performance.lines_per_hour
358        && lines_per_hour >= 200.0
359    {
360        score += 1;
361    }
362
363    score
364}
365
366pub fn rank_engineers_for_task(
367    engineers: &[String],
368    profiles: &HashMap<String, EngineerProfile>,
369    task: &Task,
370    policy: &AllocationPolicy,
371) -> Vec<String> {
372    explain_routing_for_task(engineers, profiles, task, policy)
373        .breakdowns
374        .into_iter()
375        .map(|breakdown| breakdown.engineer)
376        .collect()
377}
378
379pub fn explain_routing_for_task(
380    engineers: &[String],
381    profiles: &HashMap<String, EngineerProfile>,
382    task: &Task,
383    policy: &AllocationPolicy,
384) -> RoutingDecisionExplanation {
385    let mut breakdowns: Vec<EngineerRoutingBreakdown> = engineers
386        .iter()
387        .map(|engineer| engineer_breakdown(engineer, profiles.get(engineer), task, policy))
388        .collect();
389
390    let has_any_telemetry = breakdowns
391        .iter()
392        .any(|breakdown| breakdown.telemetry_completed_tasks > 0);
393    let telemetry_ready = has_any_telemetry
394        && breakdowns
395            .iter()
396            .all(|breakdown| breakdown.telemetry_completed_tasks >= 5);
397    if telemetry_ready || !has_any_telemetry {
398        breakdowns.sort_by(compare_breakdowns);
399        let chosen_engineer = breakdowns
400            .first()
401            .map(|breakdown| breakdown.engineer.clone());
402        return RoutingDecisionExplanation {
403            chosen_engineer,
404            fallback_to_round_robin: false,
405            fallback_reason: None,
406            breakdowns,
407        };
408    }
409
410    breakdowns.sort_by(|left, right| left.engineer.cmp(&right.engineer));
411    let chosen_engineer = breakdowns
412        .first()
413        .map(|breakdown| breakdown.engineer.clone());
414    RoutingDecisionExplanation {
415        chosen_engineer,
416        fallback_to_round_robin: true,
417        fallback_reason: Some(
418            "telemetry fallback: each eligible engineer needs at least 5 completed tasks"
419                .to_string(),
420        ),
421        breakdowns,
422    }
423}
424
425pub fn print_dispatch_explanation(project_root: &Path, task_id: Option<u32>) -> Result<()> {
426    let board_dir = project_root
427        .join(".batty")
428        .join("team_config")
429        .join("board");
430    let tasks = crate::task::load_tasks_from_dir(&board_dir.join("tasks"))?;
431    let task = select_dispatch_task(&tasks, task_id)
432        .with_context(|| format!("no dispatchable task found for {:?}", task_id))?;
433
434    let team_config = TeamConfig::load(&team_config_dir(project_root).join("team.yaml"))?;
435    let members = resolve_hierarchy(&team_config)?;
436    let mut engineers = load_idle_engineers(project_root, &members)?;
437    if engineers.is_empty() {
438        engineers = members
439            .into_iter()
440            .filter(|member| member.role_type == RoleType::Engineer)
441            .map(|member| member.name)
442            .collect();
443    }
444    let bench_state = crate::team::bench::load_bench_state(project_root)?;
445    engineers.retain(|engineer| !bench_state.benched.contains_key(engineer));
446    engineers.sort();
447    let profiles = load_engineer_profiles(project_root, &engineers, &tasks)?;
448    let explanation = explain_routing_for_task(
449        &engineers,
450        &profiles,
451        task,
452        &team_config.workflow_policy.allocation,
453    );
454
455    println!("Task #{}: {}", task.id, task.title);
456    if let Some(chosen) = &explanation.chosen_engineer {
457        println!("Chosen engineer: {chosen}");
458    } else {
459        println!("Chosen engineer: none");
460    }
461    if let Some(reason) = &explanation.fallback_reason {
462        println!("Routing mode: {reason}");
463    } else {
464        println!("Routing mode: telemetry-scored");
465    }
466    println!();
467    println!(
468        "{:<20} {:>6} {:>5} {:>5} {:>10} {:>10} {:>11} {:>8}",
469        "ENGINEER", "SCORE", "TAGS", "FILES", "COMPLETE%", "AVG SECS", "FIRST PASS%", "SAMPLES"
470    );
471    println!("{}", "-".repeat(88));
472    for breakdown in explanation.breakdowns {
473        println!(
474            "{:<20} {:>6} {:>5} {:>5} {:>10.1} {:>10} {:>11.1} {:>8}",
475            breakdown.engineer,
476            breakdown.total_score,
477            breakdown.tag_matches,
478            breakdown.file_matches,
479            breakdown.completion_rate * 100.0,
480            breakdown
481                .avg_task_duration_secs
482                .map(|secs| format!("{secs:.0}"))
483                .unwrap_or_else(|| "-".to_string()),
484            breakdown.first_pass_test_rate.unwrap_or(0.0) * 100.0,
485            breakdown.telemetry_completed_tasks,
486        );
487    }
488
489    Ok(())
490}
491
492fn engineer_breakdown(
493    engineer: &str,
494    profile: Option<&EngineerProfile>,
495    task: &Task,
496    policy: &AllocationPolicy,
497) -> EngineerRoutingBreakdown {
498    let profile = profile.cloned().unwrap_or_else(|| EngineerProfile {
499        name: engineer.to_string(),
500        ..EngineerProfile::default()
501    });
502    let task_dirs = task_hint_directories(task);
503    let engineer_dirs: HashSet<String> = profile
504        .active_file_paths
505        .iter()
506        .filter_map(|path| parent_dir(path))
507        .collect();
508    let tag_matches = task
509        .tags
510        .iter()
511        .filter(|tag| profile.domain_tags.contains(*tag))
512        .count();
513    let file_matches = engineer_dirs.intersection(&task_dirs).count();
514    EngineerRoutingBreakdown {
515        engineer: engineer.to_string(),
516        total_score: score_engineer_for_task(&profile, task, policy),
517        tag_matches,
518        file_matches,
519        completion_rate: profile.completion_rate,
520        avg_task_duration_secs: profile.avg_task_duration_secs,
521        first_pass_test_rate: profile.first_pass_test_rate,
522        telemetry_completed_tasks: profile.telemetry_completed_tasks,
523    }
524}
525
526fn compare_breakdowns(
527    left: &EngineerRoutingBreakdown,
528    right: &EngineerRoutingBreakdown,
529) -> std::cmp::Ordering {
530    right
531        .total_score
532        .cmp(&left.total_score)
533        .then_with(|| {
534            right
535                .first_pass_test_rate
536                .partial_cmp(&left.first_pass_test_rate)
537                .unwrap_or(std::cmp::Ordering::Equal)
538        })
539        .then_with(|| {
540            left.avg_task_duration_secs
541                .partial_cmp(&right.avg_task_duration_secs)
542                .unwrap_or(std::cmp::Ordering::Equal)
543        })
544        .then_with(|| left.engineer.cmp(&right.engineer))
545}
546
547fn select_dispatch_task(tasks: &[Task], task_id: Option<u32>) -> Option<&Task> {
548    if let Some(task_id) = task_id {
549        return tasks.iter().find(|task| task.id == task_id);
550    }
551
552    let task_status_by_id: HashMap<u32, String> = tasks
553        .iter()
554        .map(|task| (task.id, task.status.clone()))
555        .collect();
556    let mut dispatchable: Vec<&Task> = tasks
557        .iter()
558        .filter(|task| matches!(task.status.as_str(), "backlog" | "todo"))
559        .filter(|task| task.claimed_by.is_none())
560        .filter(|task| task.blocked.is_none())
561        .filter(|task| task.blocked_on.is_none())
562        .filter(|task| !task.is_schedule_blocked())
563        .filter(|task| {
564            task.depends_on.iter().all(|dep_id| {
565                task_status_by_id
566                    .get(dep_id)
567                    .is_none_or(|status| status == "done")
568            })
569        })
570        .collect();
571    dispatchable.sort_by_key(|task| {
572        (
573            match task.priority.as_str() {
574                "critical" => 0,
575                "high" => 1,
576                "medium" => 2,
577                "low" => 3,
578                _ => 4,
579            },
580            task.id,
581        )
582    });
583    dispatchable.into_iter().next()
584}
585
586fn load_idle_engineers(
587    project_root: &Path,
588    members: &[super::hierarchy::MemberInstance],
589) -> Result<Vec<String>> {
590    let path = daemon_state_path(project_root);
591    if !path.exists() {
592        return Ok(Vec::new());
593    }
594    let content = std::fs::read_to_string(&path)
595        .with_context(|| format!("failed to read {}", path.display()))?;
596    let state: PersistedDaemonStateView = serde_json::from_str(&content)
597        .with_context(|| format!("failed to parse {}", path.display()))?;
598    Ok(members
599        .iter()
600        .filter(|member| member.role_type == RoleType::Engineer)
601        .filter(|member| state.states.get(&member.name) == Some(&MemberState::Idle))
602        .map(|member| member.name.clone())
603        .collect())
604}
605
606fn load_engineer_telemetry_stats(
607    project_root: &Path,
608) -> Result<HashMap<String, EngineerTelemetryStats>> {
609    let conn = match super::telemetry_db::open(project_root) {
610        Ok(conn) => conn,
611        Err(_) => return Ok(HashMap::new()),
612    };
613
614    let mut stats = load_quality_metric_stats(&conn)?;
615    for (engineer, completion_rate) in load_completion_rates(&conn)? {
616        stats.entry(engineer).or_default().completion_rate = completion_rate;
617    }
618    Ok(stats)
619}
620
621fn load_completion_rates(conn: &Connection) -> Result<HashMap<String, f64>> {
622    let mut stmt = conn.prepare(
623        "SELECT role,
624                SUM(CASE WHEN event_type = 'task_assigned' THEN 1 ELSE 0 END) AS assigned,
625                SUM(CASE WHEN event_type = 'task_completed' THEN 1 ELSE 0 END) AS completed
626         FROM events
627         WHERE role IS NOT NULL AND event_type IN ('task_assigned', 'task_completed')
628         GROUP BY role",
629    )?;
630    let rows = stmt.query_map([], |row| {
631        let role: String = row.get(0)?;
632        let assigned: i64 = row.get(1)?;
633        let completed: i64 = row.get(2)?;
634        let rate = if assigned > 0 {
635            completed as f64 / assigned as f64
636        } else {
637            0.0
638        };
639        Ok((role, rate))
640    })?;
641
642    let mut rates = HashMap::new();
643    for row in rows {
644        let (role, rate) = row?;
645        rates.insert(role, rate);
646    }
647    Ok(rates)
648}
649
650fn load_quality_metric_stats(conn: &Connection) -> Result<HashMap<String, EngineerTelemetryStats>> {
651    let mut stmt = conn.prepare(
652        "SELECT role,
653                COUNT(*) AS samples,
654                AVG(CAST(json_extract(payload, '$.time_to_completion_secs') AS REAL)) AS avg_secs,
655                AVG(CAST(json_extract(payload, '$.first_pass_test_rate') AS REAL)) AS first_pass
656         FROM events
657         WHERE role IS NOT NULL AND event_type = 'quality_metrics_recorded'
658         GROUP BY role",
659    )?;
660    let rows = stmt.query_map([], |row| {
661        Ok((
662            row.get::<_, String>(0)?,
663            EngineerTelemetryStats {
664                completed_tasks: row.get::<_, i64>(1)? as u32,
665                completion_rate: 0.0,
666                avg_task_duration_secs: row.get(2)?,
667                first_pass_test_rate: row.get(3)?,
668            },
669        ))
670    })?;
671
672    let mut stats = HashMap::new();
673    for row in rows {
674        let (engineer, profile) = row?;
675        stats.insert(engineer, profile);
676    }
677    Ok(stats)
678}
679
680fn task_is_active_for_load(task: &Task) -> bool {
681    matches!(
682        task.status.as_str(),
683        "todo" | "backlog" | "in-progress" | "review" | "blocked"
684    )
685}
686
687fn task_has_conflict_signal(task: &Task) -> bool {
688    let blocked = task.blocked.as_deref().unwrap_or_default();
689    let blocked_on = task.blocked_on.as_deref().unwrap_or_default();
690    let description = task.description.to_ascii_lowercase();
691    blocked.to_ascii_lowercase().contains("conflict")
692        || blocked_on.to_ascii_lowercase().contains("conflict")
693        || description.contains("merge conflict")
694        || description.contains("rebase conflict")
695}
696
697fn task_profile_paths(task: &Task) -> Result<HashSet<String>> {
698    let mut paths: HashSet<String> = crate::task::task_file_hints(task)?.into_iter().collect();
699    for path in load_changed_paths(task.source_path.as_path())? {
700        paths.insert(path);
701    }
702    Ok(paths)
703}
704
705fn task_hint_directories(task: &Task) -> HashSet<String> {
706    crate::task::task_file_hints(task)
707        .unwrap_or_default()
708        .into_iter()
709        .filter_map(|path| parent_dir(&path))
710        .collect()
711}
712
713fn load_changed_paths(path: &Path) -> Result<Vec<String>> {
714    if path.as_os_str().is_empty() || !path.exists() {
715        return Ok(Vec::new());
716    }
717
718    let content = std::fs::read_to_string(path)?;
719    let Some(frontmatter) = extract_frontmatter(&content) else {
720        return Ok(Vec::new());
721    };
722    let parsed: AllocationTaskFrontmatter = serde_yaml::from_str(frontmatter).unwrap_or_default();
723    Ok(parsed.changed_paths)
724}
725
726fn load_persisted_task_profiles(project_root: &Path) -> Result<Vec<PersistedTaskProfile>> {
727    let mut persisted = read_persisted_engineer_profiles(project_root)?;
728    prune_persisted_profiles(&mut persisted);
729    if persisted.updated_at.is_some() {
730        write_persisted_engineer_profiles(project_root, &persisted)?;
731    }
732    Ok(persisted.completions)
733}
734
735fn read_persisted_engineer_profiles(project_root: &Path) -> Result<PersistedEngineerProfiles> {
736    let path = engineer_profiles_path(project_root);
737    if !path.exists() {
738        return Ok(PersistedEngineerProfiles {
739            version: engineer_profiles_format_version(),
740            updated_at: None,
741            completions: Vec::new(),
742        });
743    }
744
745    let content = std::fs::read_to_string(&path)?;
746    let persisted = serde_json::from_str(&content)?;
747    Ok(persisted)
748}
749
750fn write_persisted_engineer_profiles(
751    project_root: &Path,
752    persisted: &PersistedEngineerProfiles,
753) -> Result<()> {
754    let path = engineer_profiles_path(project_root);
755    if let Some(parent) = path.parent() {
756        std::fs::create_dir_all(parent)?;
757    }
758    std::fs::write(path, serde_json::to_vec_pretty(persisted)?)?;
759    Ok(())
760}
761
762fn prune_persisted_profiles(persisted: &mut PersistedEngineerProfiles) {
763    let cutoff = Utc::now() - Duration::days(PROFILE_RETENTION_DAYS);
764    persisted.completions.retain(|entry| {
765        DateTime::parse_from_rfc3339(&entry.completed_at)
766            .map(|completed| completed.with_timezone(&Utc) >= cutoff)
767            .unwrap_or(false)
768    });
769}
770
771fn engineer_profiles_path(project_root: &Path) -> PathBuf {
772    team_config_dir(project_root).join(ENGINEER_PROFILES_FILE)
773}
774
/// Extracts the front matter block from a document.
///
/// After skipping leading whitespace the document must start with `---`. The
/// returned slice begins after that marker (and a directly following newline,
/// if any) and ends before the first subsequent line that starts with `---`.
/// Returns `None` when there is no opening marker or no closing marker.
fn extract_frontmatter(content: &str) -> Option<&str> {
    let after_marker = content.trim_start().strip_prefix("---")?;
    let body = after_marker.strip_prefix('\n').unwrap_or(after_marker);
    body.split_once("\n---").map(|(frontmatter, _rest)| frontmatter)
}
784
/// Returns the parent directory of `path` as a string.
///
/// Yields `None` when the path has no parent or the parent is empty (for
/// example a bare file name).
fn parent_dir(path: &str) -> Option<String> {
    let parent = Path::new(path).parent()?;
    if parent.as_os_str().is_empty() {
        return None;
    }
    Some(parent.to_string_lossy().into_owned())
}
791
792#[cfg(test)]
793mod tests {
794    use super::*;
795    use crate::team::config::AllocationStrategy;
796    use crate::team::events::{QualityMetricsInfo, TeamEvent};
797    use crate::team::telemetry_db;
798    use std::fs;
799
800    fn task(tags: &[&str], description: &str) -> Task {
801        Task {
802            id: 1,
803            title: "task".to_string(),
804            status: "todo".to_string(),
805            priority: "high".to_string(),
806            claimed_by: None,
807            claimed_at: None,
808            claim_ttl_secs: None,
809            claim_expires_at: None,
810            last_progress_at: None,
811            claim_warning_sent_at: None,
812            claim_extensions: None,
813            last_output_bytes: None,
814            blocked: None,
815            tags: tags.iter().map(|tag| (*tag).to_string()).collect(),
816            depends_on: Vec::new(),
817            review_owner: None,
818            blocked_on: None,
819            worktree_path: None,
820            branch: None,
821            commit: None,
822            artifacts: Vec::new(),
823            next_action: None,
824            scheduled_for: None,
825            cron_schedule: None,
826            cron_last_run: None,
827            completed: None,
828            description: description.to_string(),
829            batty_config: None,
830            source_path: PathBuf::new(),
831        }
832    }
833
834    fn policy() -> AllocationPolicy {
835        AllocationPolicy {
836            strategy: AllocationStrategy::Scored,
837            ..AllocationPolicy::default()
838        }
839    }
840
841    #[test]
842    fn score_prefers_tag_overlap() {
843        let profile = EngineerProfile {
844            domain_tags: HashSet::from(["dispatch".to_string()]),
845            ..EngineerProfile::default()
846        };
847        assert!(score_engineer_for_task(&profile, &task(&["dispatch"], ""), &policy()) > 0);
848    }
849
850    #[test]
851    fn score_prefers_matching_directory_hints() {
852        let profile = EngineerProfile {
853            active_file_paths: HashSet::from(["src/team/dispatch/queue.rs".to_string()]),
854            ..EngineerProfile::default()
855        };
856        assert!(
857            score_engineer_for_task(
858                &profile,
859                &task(&[], "Touch src/team/dispatch/mod.rs next."),
860                &policy(),
861            ) > 0
862        );
863    }
864
865    #[test]
866    fn score_penalizes_active_load() {
867        let light = EngineerProfile {
868            active_task_count: 0,
869            ..EngineerProfile::default()
870        };
871        let busy = EngineerProfile {
872            active_task_count: 2,
873            ..EngineerProfile::default()
874        };
875        assert!(
876            score_engineer_for_task(&light, &task(&[], ""), &policy())
877                > score_engineer_for_task(&busy, &task(&[], ""), &policy())
878        );
879    }
880
881    #[test]
882    fn rank_engineers_falls_back_to_name_order_on_tie() {
883        let engineers = vec!["eng-2".to_string(), "eng-1".to_string()];
884        let profiles = HashMap::from([
885            ("eng-1".to_string(), EngineerProfile::default()),
886            ("eng-2".to_string(), EngineerProfile::default()),
887        ]);
888        let ranked = rank_engineers_for_task(&engineers, &profiles, &task(&[], ""), &policy());
889        assert_eq!(ranked, vec!["eng-1".to_string(), "eng-2".to_string()]);
890    }
891
892    #[test]
893    fn score_prefers_more_reliable_performance_profile() {
894        let reliable = EngineerProfile {
895            performance: Some(EngineerPerformanceProfile {
896                avg_task_completion_secs: Some(1800.0),
897                lines_per_hour: Some(250.0),
898                first_pass_test_rate: Some(1.0),
899                context_exhaustion_frequency: Some(0.0),
900            }),
901            ..EngineerProfile::default()
902        };
903        let unstable = EngineerProfile {
904            performance: Some(EngineerPerformanceProfile {
905                avg_task_completion_secs: Some(18_000.0),
906                lines_per_hour: Some(10.0),
907                first_pass_test_rate: Some(0.0),
908                context_exhaustion_frequency: Some(1.0),
909            }),
910            ..EngineerProfile::default()
911        };
912
913        assert!(
914            score_engineer_for_task(&reliable, &task(&[], ""), &policy())
915                > score_engineer_for_task(&unstable, &task(&[], ""), &policy())
916        );
917    }
918
919    #[test]
920    fn build_profiles_reads_changed_paths_and_tags_from_board_tasks() {
921        let tmp = tempfile::tempdir().unwrap();
922        let task_path = tmp.path().join("042-profile.md");
923        fs::write(
924            &task_path,
925            "---\nid: 42\ntitle: profile\nstatus: done\npriority: high\nclaimed_by: eng-2\ntags:\n  - dispatch\nchanged_paths:\n  - src/team/dispatch/queue.rs\nclass: standard\n---\n\nTouch src/team/dispatch/mod.rs too.\n",
926        )
927        .unwrap();
928
929        let task = Task::from_file(&task_path).unwrap();
930        let profiles =
931            build_engineer_profiles(&["eng-1".to_string(), "eng-2".to_string()], &[task]).unwrap();
932        let profile = profiles.get("eng-2").unwrap();
933
934        assert_eq!(profile.total_completions, 1);
935        assert!(profile.domain_tags.contains("dispatch"));
936        assert!(
937            profile
938                .active_file_paths
939                .contains("src/team/dispatch/queue.rs")
940        );
941    }
942
943    #[test]
944    fn build_profiles_counts_active_load_and_conflict_signals() {
945        let task = Task {
946            id: 7,
947            title: "conflicted".to_string(),
948            status: "review".to_string(),
949            priority: "high".to_string(),
950            claimed_by: Some("eng-1".to_string()),
951            claimed_at: None,
952            claim_ttl_secs: None,
953            claim_expires_at: None,
954            last_progress_at: None,
955            claim_warning_sent_at: None,
956            claim_extensions: None,
957            last_output_bytes: None,
958            blocked: Some("merge conflict".to_string()),
959            tags: vec!["daemon".to_string()],
960            depends_on: Vec::new(),
961            review_owner: None,
962            blocked_on: None,
963            worktree_path: None,
964            branch: None,
965            commit: None,
966            artifacts: Vec::new(),
967            next_action: None,
968            scheduled_for: None,
969            cron_schedule: None,
970            cron_last_run: None,
971            completed: None,
972            description: "Resolve rebase conflict in src/team/daemon/mod.rs".to_string(),
973            batty_config: None,
974            source_path: PathBuf::new(),
975        };
976
977        let profiles = build_engineer_profiles(&["eng-1".to_string()], &[task]).unwrap();
978        let profile = profiles.get("eng-1").unwrap();
979        assert_eq!(profile.active_task_count, 1);
980        assert_eq!(profile.recent_merge_conflicts, 1);
981    }
982
983    #[test]
984    fn load_engineer_profiles_merges_recent_persisted_history() {
985        let tmp = tempfile::tempdir().unwrap();
986        let persisted = PersistedEngineerProfiles {
987            version: engineer_profiles_format_version(),
988            updated_at: Some(Utc::now().to_rfc3339()),
989            completions: vec![PersistedTaskProfile {
990                engineer: "eng-2".to_string(),
991                task_id: 9,
992                completed_at: Utc::now().to_rfc3339(),
993                changed_paths: vec!["src/team/dispatch/queue.rs".to_string()],
994                tags: vec!["dispatch".to_string()],
995                recent_merge_conflict: false,
996            }],
997        };
998        write_persisted_engineer_profiles(tmp.path(), &persisted).unwrap();
999
1000        let profiles = load_engineer_profiles(tmp.path(), &["eng-2".to_string()], &[]).unwrap();
1001        let profile = profiles.get("eng-2").unwrap();
1002
1003        assert_eq!(profile.total_completions, 1);
1004        assert!(profile.domain_tags.contains("dispatch"));
1005        assert!(
1006            profile
1007                .active_file_paths
1008                .contains("src/team/dispatch/queue.rs")
1009        );
1010    }
1011
1012    #[test]
1013    fn persist_completed_task_profile_prunes_old_history() {
1014        let tmp = tempfile::tempdir().unwrap();
1015        let stale_completed_at =
1016            (Utc::now() - Duration::days(PROFILE_RETENTION_DAYS + 1)).to_rfc3339();
1017        let persisted = PersistedEngineerProfiles {
1018            version: engineer_profiles_format_version(),
1019            updated_at: Some(Utc::now().to_rfc3339()),
1020            completions: vec![PersistedTaskProfile {
1021                engineer: "eng-1".to_string(),
1022                task_id: 1,
1023                completed_at: stale_completed_at,
1024                changed_paths: vec!["src/old.rs".to_string()],
1025                tags: vec!["old".to_string()],
1026                recent_merge_conflict: false,
1027            }],
1028        };
1029        write_persisted_engineer_profiles(tmp.path(), &persisted).unwrap();
1030
1031        // Use a recent completion date (relative to "now") so the test is not
1032        // date-sensitive. Previously this was hardcoded to 2026-04-06 and
1033        // started failing on any date more than 7 days later.
1034        let recent_completed = (Utc::now() - Duration::days(1)).to_rfc3339();
1035        let task_path = tmp.path().join("042-profile.md");
1036        fs::write(
1037            &task_path,
1038            format!(
1039                "---\nid: 42\ntitle: profile\nstatus: done\npriority: high\nclaimed_by: eng-2\ntags:\n  - dispatch\nchanged_paths:\n  - src/team/dispatch/queue.rs\nclass: standard\ncompleted: {recent_completed}\n---\n\nTouch src/team/dispatch/mod.rs too.\n"
1040            ),
1041        )
1042        .unwrap();
1043
1044        let task = Task::from_file(&task_path).unwrap();
1045        persist_completed_task_profile(tmp.path(), &task).unwrap();
1046
1047        let loaded =
1048            load_engineer_profiles(tmp.path(), &["eng-1".to_string(), "eng-2".to_string()], &[])
1049                .unwrap();
1050        assert_eq!(loaded.get("eng-1").unwrap().total_completions, 0);
1051        assert_eq!(loaded.get("eng-2").unwrap().total_completions, 1);
1052    }
1053
1054    #[test]
1055    fn load_engineer_profiles_reads_telemetry_reliability_metrics() {
1056        let tmp = tempfile::tempdir().unwrap();
1057        fs::create_dir_all(tmp.path().join(".batty")).unwrap();
1058        let conn = telemetry_db::open(tmp.path()).unwrap();
1059        for task_id in 1..=5 {
1060            telemetry_db::insert_event(
1061                &conn,
1062                &TeamEvent::task_assigned("eng-2", &task_id.to_string()),
1063            )
1064            .unwrap();
1065            telemetry_db::insert_event(
1066                &conn,
1067                &TeamEvent::quality_metrics_recorded(&QualityMetricsInfo {
1068                    backend: "codex",
1069                    role: "eng-2",
1070                    task: &task_id.to_string(),
1071                    narration_ratio: 0.1,
1072                    commit_frequency: 1.0,
1073                    first_pass_test_rate: 1.0,
1074                    retry_rate: 0.0,
1075                    time_to_completion_secs: 120,
1076                }),
1077            )
1078            .unwrap();
1079            telemetry_db::insert_event(
1080                &conn,
1081                &TeamEvent::task_completed("eng-2", Some(&task_id.to_string())),
1082            )
1083            .unwrap();
1084        }
1085
1086        let profiles = load_engineer_profiles(tmp.path(), &["eng-2".to_string()], &[]).unwrap();
1087        let profile = profiles.get("eng-2").unwrap();
1088        assert_eq!(profile.telemetry_completed_tasks, 5);
1089        assert!((profile.completion_rate - 1.0).abs() < f64::EPSILON);
1090        assert_eq!(profile.avg_task_duration_secs, Some(120.0));
1091        assert_eq!(profile.first_pass_test_rate, Some(1.0));
1092    }
1093
1094    #[test]
1095    fn rank_engineers_prefers_higher_completion_rate_when_telemetry_is_sufficient() {
1096        let engineers = vec!["eng-1".to_string(), "eng-2".to_string()];
1097        let profiles = HashMap::from([
1098            (
1099                "eng-1".to_string(),
1100                EngineerProfile {
1101                    telemetry_completed_tasks: 5,
1102                    completion_rate: 0.4,
1103                    ..EngineerProfile::default()
1104                },
1105            ),
1106            (
1107                "eng-2".to_string(),
1108                EngineerProfile {
1109                    telemetry_completed_tasks: 5,
1110                    completion_rate: 0.9,
1111                    ..EngineerProfile::default()
1112                },
1113            ),
1114        ]);
1115
1116        let ranked = rank_engineers_for_task(&engineers, &profiles, &task(&[], ""), &policy());
1117        assert_eq!(ranked[0], "eng-2");
1118    }
1119
1120    #[test]
1121    fn explain_routing_falls_back_when_any_engineer_lacks_enough_samples() {
1122        let engineers = vec!["eng-2".to_string(), "eng-1".to_string()];
1123        let profiles = HashMap::from([
1124            (
1125                "eng-1".to_string(),
1126                EngineerProfile {
1127                    telemetry_completed_tasks: 4,
1128                    completion_rate: 1.0,
1129                    ..EngineerProfile::default()
1130                },
1131            ),
1132            (
1133                "eng-2".to_string(),
1134                EngineerProfile {
1135                    telemetry_completed_tasks: 7,
1136                    completion_rate: 0.2,
1137                    ..EngineerProfile::default()
1138                },
1139            ),
1140        ]);
1141
1142        let explanation =
1143            explain_routing_for_task(&engineers, &profiles, &task(&[], ""), &policy());
1144        assert!(explanation.fallback_to_round_robin);
1145        assert_eq!(explanation.chosen_engineer.as_deref(), Some("eng-1"));
1146    }
1147
1148    #[test]
1149    fn explain_routing_keeps_scored_mode_when_no_telemetry_exists() {
1150        let engineers = vec!["eng-1".to_string(), "eng-2".to_string()];
1151        let profiles = HashMap::from([
1152            ("eng-1".to_string(), EngineerProfile::default()),
1153            (
1154                "eng-2".to_string(),
1155                EngineerProfile {
1156                    domain_tags: HashSet::from(["dispatch".to_string()]),
1157                    ..EngineerProfile::default()
1158                },
1159            ),
1160        ]);
1161
1162        let explanation =
1163            explain_routing_for_task(&engineers, &profiles, &task(&["dispatch"], ""), &policy());
1164        assert!(!explanation.fallback_to_round_robin);
1165        assert_eq!(explanation.chosen_engineer.as_deref(), Some("eng-2"));
1166    }
1167}