Skip to main content

batty_cli/team/
allocation.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3
4use crate::task::Task;
5use anyhow::{Context, Result};
6use chrono::{DateTime, Duration, Utc};
7use rusqlite::Connection;
8use serde::{Deserialize, Serialize};
9
10use super::config::{AllocationPolicy, RoleType, TeamConfig};
11use super::hierarchy::resolve_hierarchy;
12use super::standup::MemberState;
13use super::{daemon_state_path, team_config_dir};
14
15#[derive(Debug, Clone, Default, PartialEq)]
16pub struct EngineerProfile {
17    pub name: String,
18    pub completed_task_ids: Vec<u32>,
19    pub active_file_paths: HashSet<String>,
20    pub domain_tags: HashSet<String>,
21    pub active_task_count: u32,
22    pub total_completions: u32,
23    pub recent_merge_conflicts: u32,
24    pub performance: Option<EngineerPerformanceProfile>,
25    pub telemetry_completed_tasks: u32,
26    pub completion_rate: f64,
27    pub avg_task_duration_secs: Option<f64>,
28    pub first_pass_test_rate: Option<f64>,
29}
30
31#[derive(Debug, Clone, Default, PartialEq)]
32pub struct EngineerPerformanceProfile {
33    pub avg_task_completion_secs: Option<f64>,
34    pub lines_per_hour: Option<f64>,
35    pub first_pass_test_rate: Option<f64>,
36    pub context_exhaustion_frequency: Option<f64>,
37}
38
39#[derive(Debug, Clone, Default, PartialEq)]
40pub struct EngineerRoutingBreakdown {
41    pub engineer: String,
42    pub total_score: i32,
43    pub tag_matches: usize,
44    pub file_matches: usize,
45    pub completion_rate: f64,
46    pub avg_task_duration_secs: Option<f64>,
47    pub first_pass_test_rate: Option<f64>,
48    pub telemetry_completed_tasks: u32,
49}
50
51#[derive(Debug, Clone, Default, PartialEq)]
52pub struct RoutingDecisionExplanation {
53    pub chosen_engineer: Option<String>,
54    pub fallback_to_round_robin: bool,
55    pub fallback_reason: Option<String>,
56    pub breakdowns: Vec<EngineerRoutingBreakdown>,
57}
58
59#[derive(Debug, Default, Deserialize)]
60struct AllocationTaskFrontmatter {
61    #[serde(default)]
62    changed_paths: Vec<String>,
63}
64
65const PROFILE_RETENTION_DAYS: i64 = 7;
66const ENGINEER_PROFILES_FILE: &str = "engineer_profiles.json";
67
68#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
69struct PersistedEngineerProfiles {
70    #[serde(default = "engineer_profiles_format_version")]
71    version: u32,
72    #[serde(default)]
73    updated_at: Option<String>,
74    #[serde(default)]
75    completions: Vec<PersistedTaskProfile>,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
79struct PersistedTaskProfile {
80    engineer: String,
81    task_id: u32,
82    completed_at: String,
83    #[serde(default)]
84    changed_paths: Vec<String>,
85    #[serde(default)]
86    tags: Vec<String>,
87    #[serde(default)]
88    recent_merge_conflict: bool,
89}
90
91#[derive(Debug, Clone, Default, PartialEq)]
92struct EngineerTelemetryStats {
93    completed_tasks: u32,
94    completion_rate: f64,
95    avg_task_duration_secs: Option<f64>,
96    first_pass_test_rate: Option<f64>,
97}
98
99#[derive(Debug, Deserialize)]
100struct PersistedDaemonStateView {
101    #[serde(default)]
102    states: HashMap<String, MemberState>,
103}
104
105fn engineer_profiles_format_version() -> u32 {
106    1
107}
108
109pub fn build_engineer_profiles(
110    engineers: &[String],
111    tasks: &[Task],
112) -> Result<HashMap<String, EngineerProfile>> {
113    build_engineer_profiles_with_history(engineers, tasks, &[], &HashMap::new())
114}
115
116pub fn load_engineer_profiles(
117    project_root: &Path,
118    engineers: &[String],
119    tasks: &[Task],
120) -> Result<HashMap<String, EngineerProfile>> {
121    let persisted = load_persisted_task_profiles(project_root)?;
122    let telemetry = load_engineer_telemetry_stats(project_root)?;
123    let mut profiles =
124        build_engineer_profiles_with_history(engineers, tasks, &persisted, &telemetry)?;
125    if let Ok(conn) = crate::team::telemetry_db::open(project_root)
126        && let Ok(rows) = crate::team::telemetry_db::query_engineer_performance_profiles(&conn)
127    {
128        for row in rows {
129            if let Some(profile) = profiles.get_mut(&row.role) {
130                profile.performance = Some(EngineerPerformanceProfile {
131                    avg_task_completion_secs: row.avg_task_completion_secs,
132                    lines_per_hour: row.lines_per_hour,
133                    first_pass_test_rate: row.first_pass_test_rate,
134                    context_exhaustion_frequency: row.context_exhaustion_frequency,
135                });
136            }
137        }
138    }
139    Ok(profiles)
140}
141
142pub(crate) fn predict_task_file_paths(project_root: &Path, task: &Task) -> Result<HashSet<String>> {
143    let mut paths = task_profile_paths(task)?;
144    for record in load_persisted_task_profiles(project_root)? {
145        if persisted_profile_matches_task(task, &record) {
146            paths.extend(record.changed_paths);
147        }
148    }
149    Ok(paths)
150}
151
152pub fn persist_completed_task_profile(project_root: &Path, task: &Task) -> Result<()> {
153    let Some(engineer) = task.claimed_by.as_deref() else {
154        return Ok(());
155    };
156
157    let mut persisted = read_persisted_engineer_profiles(project_root)?;
158    prune_persisted_profiles(&mut persisted);
159    persisted
160        .completions
161        .retain(|entry| !(entry.engineer == engineer && entry.task_id == task.id));
162    persisted.completions.push(PersistedTaskProfile {
163        engineer: engineer.to_string(),
164        task_id: task.id,
165        completed_at: task
166            .completed
167            .clone()
168            .unwrap_or_else(|| Utc::now().to_rfc3339()),
169        changed_paths: load_changed_paths(task.source_path.as_path())?,
170        tags: task.tags.clone(),
171        recent_merge_conflict: task_has_conflict_signal(task),
172    });
173    persisted.updated_at = Some(Utc::now().to_rfc3339());
174    write_persisted_engineer_profiles(project_root, &persisted)
175}
176
177fn build_engineer_profiles_with_history(
178    engineers: &[String],
179    tasks: &[Task],
180    persisted: &[PersistedTaskProfile],
181    telemetry: &HashMap<String, EngineerTelemetryStats>,
182) -> Result<HashMap<String, EngineerProfile>> {
183    let mut profiles: HashMap<String, EngineerProfile> = engineers
184        .iter()
185        .cloned()
186        .map(|name| {
187            let profile = EngineerProfile {
188                name: name.clone(),
189                ..EngineerProfile::default()
190            };
191            (name, profile)
192        })
193        .collect();
194
195    let mut completed_task_keys: HashSet<(String, u32)> = HashSet::new();
196
197    for record in persisted {
198        let Some(profile) = profiles.get_mut(&record.engineer) else {
199            continue;
200        };
201        apply_completed_profile(
202            profile,
203            record.task_id,
204            record.tags.iter().cloned(),
205            record.changed_paths.iter().cloned(),
206            record.recent_merge_conflict,
207        );
208        completed_task_keys.insert((record.engineer.clone(), record.task_id));
209    }
210
211    for task in tasks {
212        let Some(owner) = task.claimed_by.as_deref() else {
213            continue;
214        };
215        let Some(profile) = profiles.get_mut(owner) else {
216            continue;
217        };
218
219        if task.status == "done" {
220            if completed_task_keys.insert((owner.to_string(), task.id)) {
221                apply_completed_profile(
222                    profile,
223                    task.id,
224                    task.tags.iter().cloned(),
225                    task_profile_paths(task)?.into_iter(),
226                    task_has_conflict_signal(task),
227                );
228            }
229        } else if task_is_active_for_load(task) {
230            profile.active_task_count += 1;
231            profile.active_file_paths.extend(task_profile_paths(task)?);
232            if task_has_conflict_signal(task) {
233                profile.recent_merge_conflicts += 1;
234            }
235        }
236    }
237
238    for (engineer, stats) in telemetry {
239        let Some(profile) = profiles.get_mut(engineer) else {
240            continue;
241        };
242        profile.telemetry_completed_tasks = stats.completed_tasks;
243        profile.completion_rate = stats.completion_rate;
244        profile.avg_task_duration_secs = stats.avg_task_duration_secs;
245        profile.first_pass_test_rate = stats.first_pass_test_rate;
246    }
247
248    Ok(profiles)
249}
250
251fn apply_completed_profile<I, J>(
252    profile: &mut EngineerProfile,
253    task_id: u32,
254    tags: I,
255    changed_paths: J,
256    recent_merge_conflict: bool,
257) where
258    I: IntoIterator<Item = String>,
259    J: IntoIterator<Item = String>,
260{
261    profile.completed_task_ids.push(task_id);
262    profile.total_completions += 1;
263    profile.domain_tags.extend(tags);
264    profile.active_file_paths.extend(changed_paths);
265    if recent_merge_conflict {
266        profile.recent_merge_conflicts += 1;
267    }
268}
269
270fn persisted_profile_matches_task(task: &Task, record: &PersistedTaskProfile) -> bool {
271    if task
272        .tags
273        .iter()
274        .any(|tag| record.tags.iter().any(|candidate| candidate == tag))
275    {
276        return true;
277    }
278
279    let hinted_dirs: HashSet<String> = task_hint_paths(task)
280        .into_iter()
281        .filter_map(|path| parent_dir(&path))
282        .collect();
283    let record_dirs: HashSet<String> = record
284        .changed_paths
285        .iter()
286        .filter_map(|path| parent_dir(path))
287        .collect();
288    !hinted_dirs.is_empty() && !hinted_dirs.is_disjoint(&record_dirs)
289}
290
291pub fn score_engineer_for_task(
292    engineer: &EngineerProfile,
293    task: &Task,
294    policy: &AllocationPolicy,
295) -> i32 {
296    let mut score = 0;
297
298    let tag_overlap = task
299        .tags
300        .iter()
301        .filter(|tag| engineer.domain_tags.contains(*tag))
302        .count() as i32;
303    score += tag_overlap * policy.tag_weight;
304
305    let task_dirs = task_hint_directories(task);
306    let engineer_dirs: HashSet<String> = engineer
307        .active_file_paths
308        .iter()
309        .filter_map(|path| parent_dir(path))
310        .collect();
311    let dir_overlap = engineer_dirs.intersection(&task_dirs).count() as i32;
312    score += dir_overlap * policy.file_overlap_weight;
313    score += (engineer.completion_rate * 100.0).round() as i32;
314
315    score -= (engineer.active_task_count as i32) * policy.load_penalty;
316    score -= (engineer.recent_merge_conflicts as i32) * policy.conflict_penalty;
317    if engineer.total_completions > 3 {
318        score += policy.experience_bonus;
319    }
320    score += performance_score(engineer.performance.as_ref());
321
322    score
323}
324
325fn performance_score(performance: Option<&EngineerPerformanceProfile>) -> i32 {
326    let Some(performance) = performance else {
327        return 0;
328    };
329
330    let mut score = 0;
331
332    if let Some(first_pass_rate) = performance.first_pass_test_rate {
333        if first_pass_rate >= 0.75 {
334            score += 1;
335        } else if first_pass_rate < 0.5 {
336            score -= 1;
337        }
338    }
339
340    if let Some(context_freq) = performance.context_exhaustion_frequency {
341        if context_freq >= 0.5 {
342            score -= 2;
343        } else if context_freq > 0.0 {
344            score -= 1;
345        }
346    }
347
348    if let Some(avg_task_completion_secs) = performance.avg_task_completion_secs {
349        if avg_task_completion_secs > 0.0 && avg_task_completion_secs <= 3_600.0 {
350            score += 1;
351        } else if avg_task_completion_secs >= 14_400.0 {
352            score -= 1;
353        }
354    }
355
356    if let Some(lines_per_hour) = performance.lines_per_hour
357        && lines_per_hour >= 200.0
358    {
359        score += 1;
360    }
361
362    score
363}
364
365pub fn rank_engineers_for_task(
366    engineers: &[String],
367    profiles: &HashMap<String, EngineerProfile>,
368    task: &Task,
369    policy: &AllocationPolicy,
370) -> Vec<String> {
371    explain_routing_for_task(engineers, profiles, task, policy)
372        .breakdowns
373        .into_iter()
374        .map(|breakdown| breakdown.engineer)
375        .collect()
376}
377
378pub fn explain_routing_for_task(
379    engineers: &[String],
380    profiles: &HashMap<String, EngineerProfile>,
381    task: &Task,
382    policy: &AllocationPolicy,
383) -> RoutingDecisionExplanation {
384    let mut breakdowns: Vec<EngineerRoutingBreakdown> = engineers
385        .iter()
386        .map(|engineer| engineer_breakdown(engineer, profiles.get(engineer), task, policy))
387        .collect();
388
389    let has_any_telemetry = breakdowns
390        .iter()
391        .any(|breakdown| breakdown.telemetry_completed_tasks > 0);
392    let telemetry_ready = has_any_telemetry
393        && breakdowns
394            .iter()
395            .all(|breakdown| breakdown.telemetry_completed_tasks >= 5);
396    if telemetry_ready || !has_any_telemetry {
397        breakdowns.sort_by(compare_breakdowns);
398        let chosen_engineer = breakdowns
399            .first()
400            .map(|breakdown| breakdown.engineer.clone());
401        return RoutingDecisionExplanation {
402            chosen_engineer,
403            fallback_to_round_robin: false,
404            fallback_reason: None,
405            breakdowns,
406        };
407    }
408
409    breakdowns.sort_by(|left, right| left.engineer.cmp(&right.engineer));
410    let chosen_engineer = breakdowns
411        .first()
412        .map(|breakdown| breakdown.engineer.clone());
413    RoutingDecisionExplanation {
414        chosen_engineer,
415        fallback_to_round_robin: true,
416        fallback_reason: Some(
417            "telemetry fallback: each eligible engineer needs at least 5 completed tasks"
418                .to_string(),
419        ),
420        breakdowns,
421    }
422}
423
424pub fn print_dispatch_explanation(project_root: &Path, task_id: Option<u32>) -> Result<()> {
425    let board_dir = project_root
426        .join(".batty")
427        .join("team_config")
428        .join("board");
429    let tasks = crate::task::load_tasks_from_dir(&board_dir.join("tasks"))?;
430    let task = select_dispatch_task(&tasks, task_id)
431        .with_context(|| format!("no dispatchable task found for {:?}", task_id))?;
432
433    let team_config = TeamConfig::load(&team_config_dir(project_root).join("team.yaml"))?;
434    let members = resolve_hierarchy(&team_config)?;
435    let mut engineers = load_idle_engineers(project_root, &members)?;
436    if engineers.is_empty() {
437        engineers = members
438            .into_iter()
439            .filter(|member| member.role_type == RoleType::Engineer)
440            .map(|member| member.name)
441            .collect();
442    }
443    let bench_state = crate::team::bench::load_bench_state(project_root)?;
444    engineers.retain(|engineer| !bench_state.benched.contains_key(engineer));
445    engineers.sort();
446    let profiles = load_engineer_profiles(project_root, &engineers, &tasks)?;
447    let explanation = explain_routing_for_task(
448        &engineers,
449        &profiles,
450        task,
451        &team_config.workflow_policy.allocation,
452    );
453
454    println!("Task #{}: {}", task.id, task.title);
455    if let Some(chosen) = &explanation.chosen_engineer {
456        println!("Chosen engineer: {chosen}");
457    } else {
458        println!("Chosen engineer: none");
459    }
460    if let Some(reason) = &explanation.fallback_reason {
461        println!("Routing mode: {reason}");
462    } else {
463        println!("Routing mode: telemetry-scored");
464    }
465    println!();
466    println!(
467        "{:<20} {:>6} {:>5} {:>5} {:>10} {:>10} {:>11} {:>8}",
468        "ENGINEER", "SCORE", "TAGS", "FILES", "COMPLETE%", "AVG SECS", "FIRST PASS%", "SAMPLES"
469    );
470    println!("{}", "-".repeat(88));
471    for breakdown in explanation.breakdowns {
472        println!(
473            "{:<20} {:>6} {:>5} {:>5} {:>10.1} {:>10} {:>11.1} {:>8}",
474            breakdown.engineer,
475            breakdown.total_score,
476            breakdown.tag_matches,
477            breakdown.file_matches,
478            breakdown.completion_rate * 100.0,
479            breakdown
480                .avg_task_duration_secs
481                .map(|secs| format!("{secs:.0}"))
482                .unwrap_or_else(|| "-".to_string()),
483            breakdown.first_pass_test_rate.unwrap_or(0.0) * 100.0,
484            breakdown.telemetry_completed_tasks,
485        );
486    }
487
488    Ok(())
489}
490
491fn engineer_breakdown(
492    engineer: &str,
493    profile: Option<&EngineerProfile>,
494    task: &Task,
495    policy: &AllocationPolicy,
496) -> EngineerRoutingBreakdown {
497    let profile = profile.cloned().unwrap_or_else(|| EngineerProfile {
498        name: engineer.to_string(),
499        ..EngineerProfile::default()
500    });
501    let task_dirs = task_hint_directories(task);
502    let engineer_dirs: HashSet<String> = profile
503        .active_file_paths
504        .iter()
505        .filter_map(|path| parent_dir(path))
506        .collect();
507    let tag_matches = task
508        .tags
509        .iter()
510        .filter(|tag| profile.domain_tags.contains(*tag))
511        .count();
512    let file_matches = engineer_dirs.intersection(&task_dirs).count();
513    EngineerRoutingBreakdown {
514        engineer: engineer.to_string(),
515        total_score: score_engineer_for_task(&profile, task, policy),
516        tag_matches,
517        file_matches,
518        completion_rate: profile.completion_rate,
519        avg_task_duration_secs: profile.avg_task_duration_secs,
520        first_pass_test_rate: profile.first_pass_test_rate,
521        telemetry_completed_tasks: profile.telemetry_completed_tasks,
522    }
523}
524
525fn compare_breakdowns(
526    left: &EngineerRoutingBreakdown,
527    right: &EngineerRoutingBreakdown,
528) -> std::cmp::Ordering {
529    right
530        .total_score
531        .cmp(&left.total_score)
532        .then_with(|| {
533            right
534                .first_pass_test_rate
535                .partial_cmp(&left.first_pass_test_rate)
536                .unwrap_or(std::cmp::Ordering::Equal)
537        })
538        .then_with(|| {
539            left.avg_task_duration_secs
540                .partial_cmp(&right.avg_task_duration_secs)
541                .unwrap_or(std::cmp::Ordering::Equal)
542        })
543        .then_with(|| left.engineer.cmp(&right.engineer))
544}
545
546fn select_dispatch_task(tasks: &[Task], task_id: Option<u32>) -> Option<&Task> {
547    if let Some(task_id) = task_id {
548        return tasks.iter().find(|task| task.id == task_id);
549    }
550
551    let task_status_by_id: HashMap<u32, String> = tasks
552        .iter()
553        .map(|task| (task.id, task.status.clone()))
554        .collect();
555    let mut dispatchable: Vec<&Task> = tasks
556        .iter()
557        .filter(|task| matches!(task.status.as_str(), "backlog" | "todo"))
558        .filter(|task| task.claimed_by.is_none())
559        .filter(|task| task.blocked.is_none())
560        .filter(|task| task.blocked_on.is_none())
561        .filter(|task| !task.is_schedule_blocked())
562        .filter(|task| {
563            task.depends_on.iter().all(|dep_id| {
564                task_status_by_id
565                    .get(dep_id)
566                    .is_none_or(|status| status == "done")
567            })
568        })
569        .collect();
570    dispatchable.sort_by_key(|task| {
571        (
572            match task.priority.as_str() {
573                "critical" => 0,
574                "high" => 1,
575                "medium" => 2,
576                "low" => 3,
577                _ => 4,
578            },
579            task.id,
580        )
581    });
582    dispatchable.into_iter().next()
583}
584
585fn load_idle_engineers(
586    project_root: &Path,
587    members: &[super::hierarchy::MemberInstance],
588) -> Result<Vec<String>> {
589    let path = daemon_state_path(project_root);
590    if !path.exists() {
591        return Ok(Vec::new());
592    }
593    let content = std::fs::read_to_string(&path)
594        .with_context(|| format!("failed to read {}", path.display()))?;
595    let state: PersistedDaemonStateView = serde_json::from_str(&content)
596        .with_context(|| format!("failed to parse {}", path.display()))?;
597    Ok(members
598        .iter()
599        .filter(|member| member.role_type == RoleType::Engineer)
600        .filter(|member| state.states.get(&member.name) == Some(&MemberState::Idle))
601        .map(|member| member.name.clone())
602        .collect())
603}
604
605fn load_engineer_telemetry_stats(
606    project_root: &Path,
607) -> Result<HashMap<String, EngineerTelemetryStats>> {
608    let conn = match super::telemetry_db::open(project_root) {
609        Ok(conn) => conn,
610        Err(_) => return Ok(HashMap::new()),
611    };
612
613    let mut stats = load_quality_metric_stats(&conn)?;
614    for (engineer, completion_rate) in load_completion_rates(&conn)? {
615        stats.entry(engineer).or_default().completion_rate = completion_rate;
616    }
617    Ok(stats)
618}
619
620fn load_completion_rates(conn: &Connection) -> Result<HashMap<String, f64>> {
621    let mut stmt = conn.prepare(
622        "SELECT role,
623                SUM(CASE WHEN event_type = 'task_assigned' THEN 1 ELSE 0 END) AS assigned,
624                SUM(CASE WHEN event_type = 'task_completed' THEN 1 ELSE 0 END) AS completed
625         FROM events
626         WHERE role IS NOT NULL AND event_type IN ('task_assigned', 'task_completed')
627         GROUP BY role",
628    )?;
629    let rows = stmt.query_map([], |row| {
630        let role: String = row.get(0)?;
631        let assigned: i64 = row.get(1)?;
632        let completed: i64 = row.get(2)?;
633        let rate = if assigned > 0 {
634            completed as f64 / assigned as f64
635        } else {
636            0.0
637        };
638        Ok((role, rate))
639    })?;
640
641    let mut rates = HashMap::new();
642    for row in rows {
643        let (role, rate) = row?;
644        rates.insert(role, rate);
645    }
646    Ok(rates)
647}
648
649fn load_quality_metric_stats(conn: &Connection) -> Result<HashMap<String, EngineerTelemetryStats>> {
650    let mut stmt = conn.prepare(
651        "SELECT role,
652                COUNT(*) AS samples,
653                AVG(CAST(json_extract(payload, '$.time_to_completion_secs') AS REAL)) AS avg_secs,
654                AVG(CAST(json_extract(payload, '$.first_pass_test_rate') AS REAL)) AS first_pass
655         FROM events
656         WHERE role IS NOT NULL AND event_type = 'quality_metrics_recorded'
657         GROUP BY role",
658    )?;
659    let rows = stmt.query_map([], |row| {
660        Ok((
661            row.get::<_, String>(0)?,
662            EngineerTelemetryStats {
663                completed_tasks: row.get::<_, i64>(1)? as u32,
664                completion_rate: 0.0,
665                avg_task_duration_secs: row.get(2)?,
666                first_pass_test_rate: row.get(3)?,
667            },
668        ))
669    })?;
670
671    let mut stats = HashMap::new();
672    for row in rows {
673        let (engineer, profile) = row?;
674        stats.insert(engineer, profile);
675    }
676    Ok(stats)
677}
678
679fn task_is_active_for_load(task: &Task) -> bool {
680    matches!(
681        task.status.as_str(),
682        "todo" | "backlog" | "in-progress" | "review" | "blocked"
683    )
684}
685
686fn task_has_conflict_signal(task: &Task) -> bool {
687    let blocked = task.blocked.as_deref().unwrap_or_default();
688    let blocked_on = task.blocked_on.as_deref().unwrap_or_default();
689    let description = task.description.to_ascii_lowercase();
690    blocked.to_ascii_lowercase().contains("conflict")
691        || blocked_on.to_ascii_lowercase().contains("conflict")
692        || description.contains("merge conflict")
693        || description.contains("rebase conflict")
694}
695
696fn task_profile_paths(task: &Task) -> Result<HashSet<String>> {
697    let mut paths = task_hint_paths(task);
698    for path in load_changed_paths(task.source_path.as_path())? {
699        paths.insert(path);
700    }
701    Ok(paths)
702}
703
704fn task_hint_directories(task: &Task) -> HashSet<String> {
705    task_hint_paths(task)
706        .into_iter()
707        .filter_map(|path| parent_dir(&path))
708        .collect()
709}
710
711fn task_hint_paths(task: &Task) -> HashSet<String> {
712    task.description
713        .split_whitespace()
714        .filter_map(clean_task_path_token)
715        .collect()
716}
717
718fn clean_task_path_token(token: &str) -> Option<String> {
719    let cleaned = token.trim_matches(|ch: char| {
720        matches!(
721            ch,
722            '"' | '\'' | ',' | ':' | ';' | '(' | ')' | '[' | ']' | '`'
723        )
724    });
725    parent_dir(cleaned).map(|_| cleaned.to_string())
726}
727
728fn load_changed_paths(path: &Path) -> Result<Vec<String>> {
729    if path.as_os_str().is_empty() || !path.exists() {
730        return Ok(Vec::new());
731    }
732
733    let content = std::fs::read_to_string(path)?;
734    let Some(frontmatter) = extract_frontmatter(&content) else {
735        return Ok(Vec::new());
736    };
737    let parsed: AllocationTaskFrontmatter = serde_yaml::from_str(frontmatter).unwrap_or_default();
738    Ok(parsed.changed_paths)
739}
740
741fn load_persisted_task_profiles(project_root: &Path) -> Result<Vec<PersistedTaskProfile>> {
742    let mut persisted = read_persisted_engineer_profiles(project_root)?;
743    prune_persisted_profiles(&mut persisted);
744    if persisted.updated_at.is_some() {
745        write_persisted_engineer_profiles(project_root, &persisted)?;
746    }
747    Ok(persisted.completions)
748}
749
750fn read_persisted_engineer_profiles(project_root: &Path) -> Result<PersistedEngineerProfiles> {
751    let path = engineer_profiles_path(project_root);
752    if !path.exists() {
753        return Ok(PersistedEngineerProfiles {
754            version: engineer_profiles_format_version(),
755            updated_at: None,
756            completions: Vec::new(),
757        });
758    }
759
760    let content = std::fs::read_to_string(&path)?;
761    let persisted = serde_json::from_str(&content)?;
762    Ok(persisted)
763}
764
765fn write_persisted_engineer_profiles(
766    project_root: &Path,
767    persisted: &PersistedEngineerProfiles,
768) -> Result<()> {
769    let path = engineer_profiles_path(project_root);
770    if let Some(parent) = path.parent() {
771        std::fs::create_dir_all(parent)?;
772    }
773    std::fs::write(path, serde_json::to_vec_pretty(persisted)?)?;
774    Ok(())
775}
776
777fn prune_persisted_profiles(persisted: &mut PersistedEngineerProfiles) {
778    let cutoff = Utc::now() - Duration::days(PROFILE_RETENTION_DAYS);
779    persisted.completions.retain(|entry| {
780        DateTime::parse_from_rfc3339(&entry.completed_at)
781            .map(|completed| completed.with_timezone(&Utc) >= cutoff)
782            .unwrap_or(false)
783    });
784}
785
786fn engineer_profiles_path(project_root: &Path) -> PathBuf {
787    team_config_dir(project_root).join(ENGINEER_PROFILES_FILE)
788}
789
790fn extract_frontmatter(content: &str) -> Option<&str> {
791    let trimmed = content.trim_start();
792    if !trimmed.starts_with("---") {
793        return None;
794    }
795    let after_open = trimmed[3..].strip_prefix('\n').unwrap_or(&trimmed[3..]);
796    let close_pos = after_open.find("\n---")?;
797    Some(&after_open[..close_pos])
798}
799
800fn parent_dir(path: &str) -> Option<String> {
801    PathBuf::from(path)
802        .parent()
803        .filter(|parent| !parent.as_os_str().is_empty())
804        .map(|parent| parent.to_string_lossy().into_owned())
805}
806
807#[cfg(test)]
808mod tests {
809    use super::*;
810    use crate::team::config::AllocationStrategy;
811    use crate::team::events::{QualityMetricsInfo, TeamEvent};
812    use crate::team::telemetry_db;
813    use std::fs;
814
815    fn task(tags: &[&str], description: &str) -> Task {
816        Task {
817            id: 1,
818            title: "task".to_string(),
819            status: "todo".to_string(),
820            priority: "high".to_string(),
821            claimed_by: None,
822            claimed_at: None,
823            claim_ttl_secs: None,
824            claim_expires_at: None,
825            last_progress_at: None,
826            claim_warning_sent_at: None,
827            claim_extensions: None,
828            last_output_bytes: None,
829            blocked: None,
830            tags: tags.iter().map(|tag| (*tag).to_string()).collect(),
831            depends_on: Vec::new(),
832            review_owner: None,
833            blocked_on: None,
834            worktree_path: None,
835            branch: None,
836            commit: None,
837            artifacts: Vec::new(),
838            next_action: None,
839            scheduled_for: None,
840            cron_schedule: None,
841            cron_last_run: None,
842            completed: None,
843            description: description.to_string(),
844            batty_config: None,
845            source_path: PathBuf::new(),
846        }
847    }
848
849    fn policy() -> AllocationPolicy {
850        AllocationPolicy {
851            strategy: AllocationStrategy::Scored,
852            ..AllocationPolicy::default()
853        }
854    }
855
856    #[test]
857    fn score_prefers_tag_overlap() {
858        let profile = EngineerProfile {
859            domain_tags: HashSet::from(["dispatch".to_string()]),
860            ..EngineerProfile::default()
861        };
862        assert!(score_engineer_for_task(&profile, &task(&["dispatch"], ""), &policy()) > 0);
863    }
864
865    #[test]
866    fn score_prefers_matching_directory_hints() {
867        let profile = EngineerProfile {
868            active_file_paths: HashSet::from(["src/team/dispatch/queue.rs".to_string()]),
869            ..EngineerProfile::default()
870        };
871        assert!(
872            score_engineer_for_task(
873                &profile,
874                &task(&[], "Touch src/team/dispatch/mod.rs next."),
875                &policy(),
876            ) > 0
877        );
878    }
879
880    #[test]
881    fn score_penalizes_active_load() {
882        let light = EngineerProfile {
883            active_task_count: 0,
884            ..EngineerProfile::default()
885        };
886        let busy = EngineerProfile {
887            active_task_count: 2,
888            ..EngineerProfile::default()
889        };
890        assert!(
891            score_engineer_for_task(&light, &task(&[], ""), &policy())
892                > score_engineer_for_task(&busy, &task(&[], ""), &policy())
893        );
894    }
895
896    #[test]
897    fn rank_engineers_falls_back_to_name_order_on_tie() {
898        let engineers = vec!["eng-2".to_string(), "eng-1".to_string()];
899        let profiles = HashMap::from([
900            ("eng-1".to_string(), EngineerProfile::default()),
901            ("eng-2".to_string(), EngineerProfile::default()),
902        ]);
903        let ranked = rank_engineers_for_task(&engineers, &profiles, &task(&[], ""), &policy());
904        assert_eq!(ranked, vec!["eng-1".to_string(), "eng-2".to_string()]);
905    }
906
907    #[test]
908    fn score_prefers_more_reliable_performance_profile() {
909        let reliable = EngineerProfile {
910            performance: Some(EngineerPerformanceProfile {
911                avg_task_completion_secs: Some(1800.0),
912                lines_per_hour: Some(250.0),
913                first_pass_test_rate: Some(1.0),
914                context_exhaustion_frequency: Some(0.0),
915            }),
916            ..EngineerProfile::default()
917        };
918        let unstable = EngineerProfile {
919            performance: Some(EngineerPerformanceProfile {
920                avg_task_completion_secs: Some(18_000.0),
921                lines_per_hour: Some(10.0),
922                first_pass_test_rate: Some(0.0),
923                context_exhaustion_frequency: Some(1.0),
924            }),
925            ..EngineerProfile::default()
926        };
927
928        assert!(
929            score_engineer_for_task(&reliable, &task(&[], ""), &policy())
930                > score_engineer_for_task(&unstable, &task(&[], ""), &policy())
931        );
932    }
933
934    #[test]
935    fn build_profiles_reads_changed_paths_and_tags_from_board_tasks() {
936        let tmp = tempfile::tempdir().unwrap();
937        let task_path = tmp.path().join("042-profile.md");
938        fs::write(
939            &task_path,
940            "---\nid: 42\ntitle: profile\nstatus: done\npriority: high\nclaimed_by: eng-2\ntags:\n  - dispatch\nchanged_paths:\n  - src/team/dispatch/queue.rs\nclass: standard\n---\n\nTouch src/team/dispatch/mod.rs too.\n",
941        )
942        .unwrap();
943
944        let task = Task::from_file(&task_path).unwrap();
945        let profiles =
946            build_engineer_profiles(&["eng-1".to_string(), "eng-2".to_string()], &[task]).unwrap();
947        let profile = profiles.get("eng-2").unwrap();
948
949        assert_eq!(profile.total_completions, 1);
950        assert!(profile.domain_tags.contains("dispatch"));
951        assert!(
952            profile
953                .active_file_paths
954                .contains("src/team/dispatch/queue.rs")
955        );
956    }
957
958    #[test]
959    fn build_profiles_counts_active_load_and_conflict_signals() {
960        let task = Task {
961            id: 7,
962            title: "conflicted".to_string(),
963            status: "review".to_string(),
964            priority: "high".to_string(),
965            claimed_by: Some("eng-1".to_string()),
966            claimed_at: None,
967            claim_ttl_secs: None,
968            claim_expires_at: None,
969            last_progress_at: None,
970            claim_warning_sent_at: None,
971            claim_extensions: None,
972            last_output_bytes: None,
973            blocked: Some("merge conflict".to_string()),
974            tags: vec!["daemon".to_string()],
975            depends_on: Vec::new(),
976            review_owner: None,
977            blocked_on: None,
978            worktree_path: None,
979            branch: None,
980            commit: None,
981            artifacts: Vec::new(),
982            next_action: None,
983            scheduled_for: None,
984            cron_schedule: None,
985            cron_last_run: None,
986            completed: None,
987            description: "Resolve rebase conflict in src/team/daemon/mod.rs".to_string(),
988            batty_config: None,
989            source_path: PathBuf::new(),
990        };
991
992        let profiles = build_engineer_profiles(&["eng-1".to_string()], &[task]).unwrap();
993        let profile = profiles.get("eng-1").unwrap();
994        assert_eq!(profile.active_task_count, 1);
995        assert_eq!(profile.recent_merge_conflicts, 1);
996    }
997
998    #[test]
999    fn load_engineer_profiles_merges_recent_persisted_history() {
1000        let tmp = tempfile::tempdir().unwrap();
1001        let persisted = PersistedEngineerProfiles {
1002            version: engineer_profiles_format_version(),
1003            updated_at: Some(Utc::now().to_rfc3339()),
1004            completions: vec![PersistedTaskProfile {
1005                engineer: "eng-2".to_string(),
1006                task_id: 9,
1007                completed_at: Utc::now().to_rfc3339(),
1008                changed_paths: vec!["src/team/dispatch/queue.rs".to_string()],
1009                tags: vec!["dispatch".to_string()],
1010                recent_merge_conflict: false,
1011            }],
1012        };
1013        write_persisted_engineer_profiles(tmp.path(), &persisted).unwrap();
1014
1015        let profiles = load_engineer_profiles(tmp.path(), &["eng-2".to_string()], &[]).unwrap();
1016        let profile = profiles.get("eng-2").unwrap();
1017
1018        assert_eq!(profile.total_completions, 1);
1019        assert!(profile.domain_tags.contains("dispatch"));
1020        assert!(
1021            profile
1022                .active_file_paths
1023                .contains("src/team/dispatch/queue.rs")
1024        );
1025    }
1026
1027    #[test]
1028    fn persist_completed_task_profile_prunes_old_history() {
1029        let tmp = tempfile::tempdir().unwrap();
1030        let stale_completed_at =
1031            (Utc::now() - Duration::days(PROFILE_RETENTION_DAYS + 1)).to_rfc3339();
1032        let persisted = PersistedEngineerProfiles {
1033            version: engineer_profiles_format_version(),
1034            updated_at: Some(Utc::now().to_rfc3339()),
1035            completions: vec![PersistedTaskProfile {
1036                engineer: "eng-1".to_string(),
1037                task_id: 1,
1038                completed_at: stale_completed_at,
1039                changed_paths: vec!["src/old.rs".to_string()],
1040                tags: vec!["old".to_string()],
1041                recent_merge_conflict: false,
1042            }],
1043        };
1044        write_persisted_engineer_profiles(tmp.path(), &persisted).unwrap();
1045
1046        let task_path = tmp.path().join("042-profile.md");
1047        fs::write(
1048            &task_path,
1049            "---\nid: 42\ntitle: profile\nstatus: done\npriority: high\nclaimed_by: eng-2\ntags:\n  - dispatch\nchanged_paths:\n  - src/team/dispatch/queue.rs\nclass: standard\ncompleted: 2026-04-06T03:00:00-04:00\n---\n\nTouch src/team/dispatch/mod.rs too.\n",
1050        )
1051        .unwrap();
1052
1053        let task = Task::from_file(&task_path).unwrap();
1054        persist_completed_task_profile(tmp.path(), &task).unwrap();
1055
1056        let loaded =
1057            load_engineer_profiles(tmp.path(), &["eng-1".to_string(), "eng-2".to_string()], &[])
1058                .unwrap();
1059        assert_eq!(loaded.get("eng-1").unwrap().total_completions, 0);
1060        assert_eq!(loaded.get("eng-2").unwrap().total_completions, 1);
1061    }
1062
1063    #[test]
1064    fn load_engineer_profiles_reads_telemetry_reliability_metrics() {
1065        let tmp = tempfile::tempdir().unwrap();
1066        fs::create_dir_all(tmp.path().join(".batty")).unwrap();
1067        let conn = telemetry_db::open(tmp.path()).unwrap();
1068        for task_id in 1..=5 {
1069            telemetry_db::insert_event(
1070                &conn,
1071                &TeamEvent::task_assigned("eng-2", &task_id.to_string()),
1072            )
1073            .unwrap();
1074            telemetry_db::insert_event(
1075                &conn,
1076                &TeamEvent::quality_metrics_recorded(&QualityMetricsInfo {
1077                    backend: "codex",
1078                    role: "eng-2",
1079                    task: &task_id.to_string(),
1080                    narration_ratio: 0.1,
1081                    commit_frequency: 1.0,
1082                    first_pass_test_rate: 1.0,
1083                    retry_rate: 0.0,
1084                    time_to_completion_secs: 120,
1085                }),
1086            )
1087            .unwrap();
1088            telemetry_db::insert_event(
1089                &conn,
1090                &TeamEvent::task_completed("eng-2", Some(&task_id.to_string())),
1091            )
1092            .unwrap();
1093        }
1094
1095        let profiles = load_engineer_profiles(tmp.path(), &["eng-2".to_string()], &[]).unwrap();
1096        let profile = profiles.get("eng-2").unwrap();
1097        assert_eq!(profile.telemetry_completed_tasks, 5);
1098        assert!((profile.completion_rate - 1.0).abs() < f64::EPSILON);
1099        assert_eq!(profile.avg_task_duration_secs, Some(120.0));
1100        assert_eq!(profile.first_pass_test_rate, Some(1.0));
1101    }
1102
1103    #[test]
1104    fn rank_engineers_prefers_higher_completion_rate_when_telemetry_is_sufficient() {
1105        let engineers = vec!["eng-1".to_string(), "eng-2".to_string()];
1106        let profiles = HashMap::from([
1107            (
1108                "eng-1".to_string(),
1109                EngineerProfile {
1110                    telemetry_completed_tasks: 5,
1111                    completion_rate: 0.4,
1112                    ..EngineerProfile::default()
1113                },
1114            ),
1115            (
1116                "eng-2".to_string(),
1117                EngineerProfile {
1118                    telemetry_completed_tasks: 5,
1119                    completion_rate: 0.9,
1120                    ..EngineerProfile::default()
1121                },
1122            ),
1123        ]);
1124
1125        let ranked = rank_engineers_for_task(&engineers, &profiles, &task(&[], ""), &policy());
1126        assert_eq!(ranked[0], "eng-2");
1127    }
1128
1129    #[test]
1130    fn explain_routing_falls_back_when_any_engineer_lacks_enough_samples() {
1131        let engineers = vec!["eng-2".to_string(), "eng-1".to_string()];
1132        let profiles = HashMap::from([
1133            (
1134                "eng-1".to_string(),
1135                EngineerProfile {
1136                    telemetry_completed_tasks: 4,
1137                    completion_rate: 1.0,
1138                    ..EngineerProfile::default()
1139                },
1140            ),
1141            (
1142                "eng-2".to_string(),
1143                EngineerProfile {
1144                    telemetry_completed_tasks: 7,
1145                    completion_rate: 0.2,
1146                    ..EngineerProfile::default()
1147                },
1148            ),
1149        ]);
1150
1151        let explanation =
1152            explain_routing_for_task(&engineers, &profiles, &task(&[], ""), &policy());
1153        assert!(explanation.fallback_to_round_robin);
1154        assert_eq!(explanation.chosen_engineer.as_deref(), Some("eng-1"));
1155    }
1156
1157    #[test]
1158    fn explain_routing_keeps_scored_mode_when_no_telemetry_exists() {
1159        let engineers = vec!["eng-1".to_string(), "eng-2".to_string()];
1160        let profiles = HashMap::from([
1161            ("eng-1".to_string(), EngineerProfile::default()),
1162            (
1163                "eng-2".to_string(),
1164                EngineerProfile {
1165                    domain_tags: HashSet::from(["dispatch".to_string()]),
1166                    ..EngineerProfile::default()
1167                },
1168            ),
1169        ]);
1170
1171        let explanation =
1172            explain_routing_for_task(&engineers, &profiles, &task(&["dispatch"], ""), &policy());
1173        assert!(!explanation.fallback_to_round_robin);
1174        assert_eq!(explanation.chosen_engineer.as_deref(), Some("eng-2"));
1175    }
1176}