1use std::collections::{BTreeMap, BTreeSet, VecDeque};
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7
8#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
10pub struct GoalPlan {
11 pub id: String,
12 pub objective: String,
13 pub epics: Vec<EpicPlan>,
14}
15
16#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
18pub struct EpicPlan {
19 pub id: String,
20 pub title: String,
21 pub tasks: Vec<TaskPlan>,
22}
23
24#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
26pub struct TaskPlan {
27 pub id: String,
28 pub title: String,
29 pub depends_on: Vec<String>,
30 pub estimate_hours: u32,
31}
32
33#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
35#[serde(tag = "state", rename_all = "snake_case")]
36pub enum PlanTaskStatus {
37 Pending,
38 InProgress,
39 Done,
40 Blocked { reason: String },
41 Failed { reason: String },
42}
43
44#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
46pub struct PlanTask {
47 pub id: String,
48 pub title: String,
49 pub depends_on: Vec<String>,
50 pub estimate_hours: u32,
51 pub status: PlanTaskStatus,
52 pub confidence: f32,
53 pub updated_at: DateTime<Utc>,
54}
55
56impl PlanTask {
57 pub fn pending(id: &str, depends_on: Vec<String>, updated_at: DateTime<Utc>) -> Self {
58 Self {
59 id: id.to_string(),
60 title: id.to_string(),
61 depends_on,
62 estimate_hours: 1,
63 status: PlanTaskStatus::Pending,
64 confidence: 1.0,
65 updated_at,
66 }
67 }
68}
69
70#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
72pub struct ExecutionDag {
73 pub goal_id: String,
74 pub objective: String,
75 pub tasks: BTreeMap<String, PlanTask>,
76}
77
78impl ExecutionDag {
79 pub fn validate(&self) -> Result<(), PlanningError> {
80 for (task_id, task) in &self.tasks {
81 for dep in &task.depends_on {
82 if !self.tasks.contains_key(dep) {
83 return Err(PlanningError::MissingDependency {
84 task_id: task_id.clone(),
85 missing_dependency: dep.clone(),
86 });
87 }
88 }
89 }
90
91 let mut indegree: BTreeMap<String, usize> =
92 self.tasks.keys().map(|k| (k.clone(), 0usize)).collect();
93 let mut edges: BTreeMap<String, Vec<String>> = BTreeMap::new();
94 for (task_id, task) in &self.tasks {
95 for dep in &task.depends_on {
96 edges.entry(dep.clone()).or_default().push(task_id.clone());
97 *indegree.get_mut(task_id).expect("task in indegree") += 1;
98 }
99 }
100
101 let mut queue: VecDeque<String> = indegree
102 .iter()
103 .filter(|(_, d)| **d == 0)
104 .map(|(k, _)| k.clone())
105 .collect();
106
107 let mut visited = 0usize;
108 while let Some(node) = queue.pop_front() {
109 visited += 1;
110 if let Some(neighbors) = edges.get(&node) {
111 for n in neighbors {
112 let entry = indegree.get_mut(n).expect("neighbor in indegree");
113 *entry -= 1;
114 if *entry == 0 {
115 queue.push_back(n.clone());
116 }
117 }
118 }
119 }
120
121 if visited != self.tasks.len() {
122 return Err(PlanningError::CycleDetected);
123 }
124 Ok(())
125 }
126}
127
128#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
130pub struct SchedulerConstraints {
131 pub max_parallel: usize,
132 pub blocked_tasks: BTreeSet<String>,
133}
134
135#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
137pub struct ProgressReport {
138 pub total_tasks: usize,
139 pub done_tasks: usize,
140 pub in_progress_tasks: usize,
141 pub blocked_tasks: usize,
142 pub failed_tasks: usize,
143 pub pending_tasks: usize,
144 pub completion_ratio: f32,
145 pub confidence: f32,
146 pub blockers: Vec<String>,
147}
148
149#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
151pub struct ReplanPolicy {
152 pub min_confidence: f32,
153 pub max_blocked_ratio: f32,
154 pub trigger_on_failure: bool,
155 pub max_stale_hours: i64,
156}
157
158#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
160#[serde(tag = "reason", rename_all = "snake_case")]
161pub enum ReplanReason {
162 LowConfidence {
163 observed: f32,
164 threshold: f32,
165 },
166 BlockedRatio {
167 observed: f32,
168 threshold: f32,
169 },
170 FailedTasks {
171 count: usize,
172 },
173 StaleProgress {
174 stale_hours: i64,
175 threshold_hours: i64,
176 },
177}
178
179#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
181pub struct ReplanDecision {
182 pub should_replan: bool,
183 pub reasons: Vec<ReplanReason>,
184}
185
186#[derive(Debug, thiserror::Error, PartialEq, Eq)]
187pub enum PlanningError {
188 #[error("task '{task_id}' has missing dependency '{missing_dependency}'")]
189 MissingDependency {
190 task_id: String,
191 missing_dependency: String,
192 },
193 #[error("dependency cycle detected in execution DAG")]
194 CycleDetected,
195}
196
197pub fn decompose_goal_to_dag(goal: &GoalPlan) -> Result<ExecutionDag, PlanningError> {
199 let mut tasks = BTreeMap::new();
200 let now = Utc::now();
201 for epic in &goal.epics {
202 for t in &epic.tasks {
203 tasks.insert(
204 t.id.clone(),
205 PlanTask {
206 id: t.id.clone(),
207 title: t.title.clone(),
208 depends_on: t.depends_on.clone(),
209 estimate_hours: t.estimate_hours,
210 status: PlanTaskStatus::Pending,
211 confidence: 1.0,
212 updated_at: now,
213 },
214 );
215 }
216 }
217
218 let dag = ExecutionDag {
219 goal_id: goal.id.clone(),
220 objective: goal.objective.clone(),
221 tasks,
222 };
223 dag.validate()?;
224 Ok(dag)
225}
226
227pub fn schedule_next_ready_tasks(
229 dag: &ExecutionDag,
230 constraints: &SchedulerConstraints,
231) -> Result<Vec<String>, PlanningError> {
232 dag.validate()?;
233 if constraints.max_parallel == 0 {
234 return Ok(Vec::new());
235 }
236
237 let mut ready: Vec<String> = dag
238 .tasks
239 .iter()
240 .filter_map(|(id, task)| match task.status {
241 PlanTaskStatus::Pending => Some((id, task)),
242 _ => None,
243 })
244 .filter(|(id, _)| !constraints.blocked_tasks.contains(*id))
245 .filter(|(_, task)| {
246 task.depends_on.iter().all(|dep| {
247 matches!(
248 dag.tasks.get(dep).map(|t| &t.status),
249 Some(PlanTaskStatus::Done)
250 )
251 })
252 })
253 .map(|(id, _)| id.clone())
254 .collect();
255
256 ready.sort();
257 ready.truncate(constraints.max_parallel);
258 Ok(ready)
259}
260
261pub fn compute_progress(dag: &ExecutionDag) -> ProgressReport {
263 let total = dag.tasks.len();
264 let mut done = 0usize;
265 let mut in_progress = 0usize;
266 let mut blocked = 0usize;
267 let mut failed = 0usize;
268 let mut pending = 0usize;
269 let mut blockers = Vec::new();
270 let mut confidence_sum = 0.0f32;
271
272 for task in dag.tasks.values() {
273 confidence_sum += task.confidence;
274 match &task.status {
275 PlanTaskStatus::Done => done += 1,
276 PlanTaskStatus::InProgress => in_progress += 1,
277 PlanTaskStatus::Blocked { reason } => {
278 blocked += 1;
279 blockers.push(reason.clone());
280 }
281 PlanTaskStatus::Failed { .. } => failed += 1,
282 PlanTaskStatus::Pending => pending += 1,
283 }
284 }
285
286 let completion_ratio = if total == 0 {
287 0.0
288 } else {
289 done as f32 / total as f32
290 };
291 let confidence = if total == 0 {
292 0.0
293 } else {
294 confidence_sum / total as f32
295 };
296
297 ProgressReport {
298 total_tasks: total,
299 done_tasks: done,
300 in_progress_tasks: in_progress,
301 blocked_tasks: blocked,
302 failed_tasks: failed,
303 pending_tasks: pending,
304 completion_ratio,
305 confidence,
306 blockers,
307 }
308}
309
310pub fn evaluate_replan(
312 dag: &ExecutionDag,
313 policy: &ReplanPolicy,
314 now: DateTime<Utc>,
315) -> ReplanDecision {
316 let report = compute_progress(dag);
317 let mut reasons = Vec::new();
318
319 if report.confidence < policy.min_confidence {
320 reasons.push(ReplanReason::LowConfidence {
321 observed: report.confidence,
322 threshold: policy.min_confidence,
323 });
324 }
325
326 let blocked_ratio = if report.total_tasks == 0 {
327 0.0
328 } else {
329 report.blocked_tasks as f32 / report.total_tasks as f32
330 };
331 if blocked_ratio > policy.max_blocked_ratio {
332 reasons.push(ReplanReason::BlockedRatio {
333 observed: blocked_ratio,
334 threshold: policy.max_blocked_ratio,
335 });
336 }
337
338 if policy.trigger_on_failure && report.failed_tasks > 0 {
339 reasons.push(ReplanReason::FailedTasks {
340 count: report.failed_tasks,
341 });
342 }
343
344 if let Some(oldest) = dag.tasks.values().map(|t| t.updated_at).min() {
345 let stale_hours = (now - oldest).num_hours();
346 if stale_hours > policy.max_stale_hours {
347 reasons.push(ReplanReason::StaleProgress {
348 stale_hours,
349 threshold_hours: policy.max_stale_hours,
350 });
351 }
352 }
353
354 ReplanDecision {
355 should_replan: !reasons.is_empty(),
356 reasons,
357 }
358}