1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// Workflow Learning v1 run finalization (GCL-04 / TAN-44). Split out of part02.rs
// to keep that file under the line-count gate.
impl AppState {
    /// Performs the workflow-learning bookkeeping for `run` once it has reached a terminal
    /// state.
    ///
    /// The work happens in this order:
    ///
    /// 1. Resolve the automation definition, preferring the snapshot stored on `run` and
    ///    falling back to `get_automation_v2`. When neither is available the function
    ///    returns `Ok(())` without touching anything else.
    /// 2. Fetch the runs returned by `list_automation_v2_runs(.., 50)` for this automation
    ///    and derive a metrics snapshot from them.
    /// 3. Generate learning candidates for the run, upsert each one, and mark as `Applied`
    ///    those still `Proposed` for which the promotion policy reports auto-apply. Each
    ///    auto-apply publishes a `workflow_learning.candidate.auto_applied` event.
    /// 4. For every `Approved`/`Applied` candidate that carries a baseline, store the fresh
    ///    metrics as its latest observation; `Applied` candidates are moved to `Regressed`
    ///    when the policy's regression check says so.
    /// 5. Record the generated candidate ids and the metrics on the stored run's learning
    ///    summary, then persist the runs and the run status JSON. If the run is no longer
    ///    present in `automation_v2_runs`, returns `Ok(())` before persisting.
    ///
    /// # Errors
    ///
    /// Propagates errors from `upsert_workflow_learning_candidate`,
    /// `persist_automation_v2_runs` and `persist_automation_v2_run_status_json`. The results
    /// of `update_workflow_learning_candidate` are intentionally discarded.
    async fn finalize_terminal_automation_v2_run_learning(
        &self,
        run: &AutomationV2RunRecord,
    ) -> anyhow::Result<()> {
        let promotion_policy =
            crate::workflow_learning_policy::WorkflowLearningPromotionPolicy::from_env();

        // Prefer the definition captured on the run; otherwise use the current one.
        let automation = match run.automation_snapshot.clone() {
            Some(snapshot) => snapshot,
            None => {
                let Some(live) = self.get_automation_v2(&run.automation_id).await else {
                    return Ok(());
                };
                live
            }
        };

        let run_window = self
            .list_automation_v2_runs(Some(&run.automation_id), 50)
            .await;
        let metrics =
            crate::app::state::automation::workflow_learning_metrics_snapshot(&run_window);
        let known_candidates = self
            .list_workflow_learning_candidates(Some(&run.automation_id), None, None)
            .await;
        let proposals =
            crate::app::state::automation::workflow_learning_candidates_for_terminal_run(
                &automation,
                run,
                &run_window,
                &known_candidates,
            );

        let mut new_candidate_ids = Vec::new();
        for proposal in proposals {
            let saved = self.upsert_workflow_learning_candidate(proposal).await?;

            // Ask the promotion policy. It fails closed: with the default configuration
            // (auto-apply disabled) the answer is always RequireHumanReview/Block, so the
            // candidate stays `Proposed` for the human review endpoint, exactly as before.
            // Only an explicit opt-in can auto-apply a low-risk candidate.
            let eligible_for_auto_apply = saved.status
                == WorkflowLearningCandidateStatus::Proposed
                && promotion_policy
                    .evaluate_promotion(&saved, &metrics)
                    .is_auto_apply();

            if eligible_for_auto_apply {
                let baseline_snapshot = metrics.clone();
                // NOTE(review): the update result is dropped, so the event below is
                // published whether or not the candidate was actually updated.
                let _ = self
                    .update_workflow_learning_candidate(&saved.candidate_id, |entry| {
                        entry.status = WorkflowLearningCandidateStatus::Applied;
                        // Record the baseline the same way the review endpoint does, so
                        // the before/after regression gate has something to compare to.
                        entry.baseline_before = Some(baseline_snapshot.clone());
                    })
                    .await;

                let payload = serde_json::json!({
                    "candidate_id": saved.candidate_id,
                    "workflow_id": saved.workflow_id,
                    "kind": format!("{:?}", saved.kind),
                    "confidence": saved.confidence,
                });
                self.event_bus.publish(EngineEvent::new(
                    "workflow_learning.candidate.auto_applied",
                    payload,
                ));
            }

            new_candidate_ids.push(saved.candidate_id);
        }

        // Re-read the candidates (the loop above may have changed them) and keep the ids
        // of those that are approved or applied and already have a baseline.
        let mut tracked_ids = Vec::new();
        for existing in self
            .list_workflow_learning_candidates(Some(&run.automation_id), None, None)
            .await
        {
            let is_live = matches!(
                existing.status,
                WorkflowLearningCandidateStatus::Approved
                    | WorkflowLearningCandidateStatus::Applied
            );
            if is_live && existing.baseline_before.is_some() {
                tracked_ids.push(existing.candidate_id);
            }
        }

        for tracked_id in tracked_ids {
            let _ = self
                .update_workflow_learning_candidate(&tracked_id, |entry| {
                    entry.latest_observed_metrics = Some(metrics.clone());

                    // Only applied candidates with a baseline go through the gate.
                    if entry.status != WorkflowLearningCandidateStatus::Applied {
                        return;
                    }
                    let Some(baseline) = entry.baseline_before.as_ref() else {
                        return;
                    };

                    // Number of runs that finished *after* the baseline was captured.
                    // Counting them directly avoids subtracting capped snapshot sample
                    // sizes, which would pin the post-apply count at 0 on mature
                    // workflows and make a regression undetectable.
                    let runs_since_baseline = run_window
                        .iter()
                        .filter(|observed| {
                            observed
                                .finished_at_ms
                                .is_some_and(|finished_ms| finished_ms > baseline.computed_at_ms)
                        })
                        .count();

                    // The before/after gate lives in the policy so thresholds stay
                    // centralized and testable; default thresholds reproduce the prior
                    // inline behavior.
                    let verdict = promotion_policy.evaluate_regression(
                        baseline,
                        &metrics,
                        runs_since_baseline,
                    );
                    if verdict.is_regressed() {
                        entry.status = WorkflowLearningCandidateStatus::Regressed;
                    }
                })
                .await;
        }

        // Update the stored run under the write lock, then release it before persisting.
        let run_for_status = {
            let mut runs = self.automation_v2_runs.write().await;
            let Some(record) = runs.get_mut(&run.run_id) else {
                return Ok(());
            };
            let summary = record
                .learning_summary
                .get_or_insert_with(WorkflowLearningRunSummary::default);
            for new_id in new_candidate_ids {
                // Keep the id list free of duplicates.
                if !summary.generated_candidate_ids.contains(&new_id) {
                    summary.generated_candidate_ids.push(new_id);
                }
            }
            summary.post_run_metrics = Some(metrics);
            record.clone()
        };

        self.persist_automation_v2_runs().await?;
        self.persist_automation_v2_run_status_json(&run_for_status)
            .await?;
        Ok(())
    }
}