1use crate::config::StepPrehookContext;
2use anyhow::{Context, Result, anyhow};
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
7use super::dag::{DynamicExecutionPlan, PrehookConfig, WorkflowEdge, WorkflowNode};
8
9pub use orchestrator_config::adaptive::{AdaptiveFallbackMode, AdaptivePlannerConfig};
10
11#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
13pub struct ExecutionHistoryRecord {
14 pub task_id: String,
16 pub item_id: String,
18 pub cycle: u32,
20 pub steps: Vec<StepExecutionRecord>,
22 pub final_status: String,
24 pub timestamp: DateTime<Utc>,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
30pub struct StepExecutionRecord {
31 pub step_id: String,
33 pub step_type: String,
35 pub exit_code: i64,
37 pub duration_ms: u64,
39 pub confidence: Option<f32>,
41 pub quality_score: Option<f32>,
43 pub tickets_created: i64,
45 pub tickets_resolved: i64,
47}
48
49#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
51#[serde(rename_all = "snake_case")]
52pub enum AdaptivePlanSource {
53 Planner,
55 DeterministicFallback,
57}
58
59#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
61#[serde(rename_all = "snake_case")]
62pub enum AdaptiveFailureClass {
63 Disabled,
65 Misconfigured,
67 ExecutorFailure,
69 InvalidJson,
71 InvalidPlan,
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
77pub struct AdaptivePlanMetadata {
78 pub source: AdaptivePlanSource,
80 pub used_fallback: bool,
82 #[serde(default, skip_serializing_if = "Option::is_none")]
84 pub error_class: Option<AdaptiveFailureClass>,
85 #[serde(default, skip_serializing_if = "Option::is_none")]
87 pub error_message: Option<String>,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct AdaptivePlanOutcome {
93 pub plan: DynamicExecutionPlan,
95 pub metadata: AdaptivePlanMetadata,
97 #[serde(default, skip_serializing_if = "Option::is_none")]
99 pub raw_output: Option<String>,
100}
101
102#[async_trait]
103pub trait AdaptivePlanExecutor: Send + Sync {
105 async fn execute(&self, prompt: &str, config: &AdaptivePlannerConfig) -> Result<String>;
107}
108
109#[derive(Debug, Clone)]
111pub struct AdaptivePlanner {
112 config: AdaptivePlannerConfig,
113 history: Vec<ExecutionHistoryRecord>,
114}
115
116impl AdaptivePlanner {
117 pub fn new(config: AdaptivePlannerConfig) -> Self {
119 Self {
120 config,
121 history: Vec::new(),
122 }
123 }
124
125 pub fn add_history(&mut self, record: ExecutionHistoryRecord) {
127 if self.history.len() >= self.config.max_history {
128 self.history.remove(0);
129 }
130 self.history.push(record);
131 }
132
133 pub fn history(&self) -> &[ExecutionHistoryRecord] {
135 &self.history
136 }
137
138 pub async fn generate_plan<E>(
140 &self,
141 executor: &E,
142 context: &StepPrehookContext,
143 ) -> Result<AdaptivePlanOutcome>
144 where
145 E: AdaptivePlanExecutor,
146 {
147 if !self.config.enabled {
148 return Err(anyhow!("adaptive planning is not enabled"));
149 }
150
151 if self
152 .config
153 .planner_agent
154 .as_deref()
155 .map(str::trim)
156 .map(str::is_empty)
157 .unwrap_or(true)
158 {
159 return self.handle_failure(
160 AdaptiveFailureClass::Misconfigured,
161 anyhow!("adaptive planner is enabled but planner_agent is not configured"),
162 context,
163 None,
164 );
165 }
166
167 let prompt = self.build_prompt(context)?;
168 let response = match executor.execute(&prompt, &self.config).await {
169 Ok(response) => response,
170 Err(err) => {
171 return self.handle_failure(
172 AdaptiveFailureClass::ExecutorFailure,
173 err,
174 context,
175 None,
176 );
177 }
178 };
179
180 let plan = match serde_json::from_str::<DynamicExecutionPlan>(&response) {
181 Ok(plan) => plan,
182 Err(err) => {
183 return self.handle_failure(
184 AdaptiveFailureClass::InvalidJson,
185 anyhow!("adaptive planner returned invalid JSON: {}", err),
186 context,
187 Some(response),
188 );
189 }
190 };
191
192 if let Err(err) = validate_generated_plan(&plan) {
193 return self.handle_failure(
194 AdaptiveFailureClass::InvalidPlan,
195 err,
196 context,
197 Some(response),
198 );
199 }
200
201 Ok(AdaptivePlanOutcome {
202 plan,
203 metadata: AdaptivePlanMetadata {
204 source: AdaptivePlanSource::Planner,
205 used_fallback: false,
206 error_class: None,
207 error_message: None,
208 },
209 raw_output: Some(response),
210 })
211 }
212
213 fn handle_failure(
214 &self,
215 class: AdaptiveFailureClass,
216 err: anyhow::Error,
217 context: &StepPrehookContext,
218 raw_output: Option<String>,
219 ) -> Result<AdaptivePlanOutcome> {
220 match self.config.fallback_mode {
221 AdaptiveFallbackMode::SoftFallback => {
222 tracing::warn!(
223 error_class = ?class,
224 error = %err,
225 task_id = %context.task_id,
226 item_id = %context.task_item_id,
227 "adaptive planner failed; using deterministic fallback"
228 );
229 Ok(AdaptivePlanOutcome {
230 plan: deterministic_fallback_plan(context),
231 metadata: AdaptivePlanMetadata {
232 source: AdaptivePlanSource::DeterministicFallback,
233 used_fallback: true,
234 error_class: Some(class),
235 error_message: Some(err.to_string()),
236 },
237 raw_output,
238 })
239 }
240 AdaptiveFallbackMode::FailClosed => Err(err.context(format!(
241 "adaptive planning failed ({})",
242 adaptive_failure_class_name(class)
243 ))),
244 }
245 }
246
247 fn build_prompt(&self, context: &StepPrehookContext) -> Result<String> {
248 let history_json =
249 serde_json::to_string(&self.history).context("serialize adaptive planner history")?;
250
251 Ok(format!(
252 r#"You are an adaptive workflow planner for an agent orchestrator.
253Return ONLY valid JSON that deserializes into:
254{{
255 "entry": "optional-node-id",
256 "nodes": {{
257 "node_id": {{
258 "id": "node_id",
259 "step_type": "qa|fix|retest|custom",
260 "agent_id": "optional-agent-id",
261 "template": "optional-command-template",
262 "prehook": {{
263 "engine": "cel",
264 "when": "expression",
265 "reason": "optional",
266 "extended": false
267 }},
268 "is_guard": false,
269 "repeatable": true
270 }}
271 }},
272 "edges": [
273 {{
274 "from": "node_id",
275 "to": "node_id",
276 "condition": "optional expression"
277 }}
278 ]
279}}
280
281Rules:
282- Output JSON only, no markdown or prose.
283- All node ids must be unique.
284- The graph must be acyclic.
285- Keep plans minimal and executable.
286- If fix is unnecessary, omit it instead of adding unreachable nodes.
287- Use the configured agent_id only when you need to pin a specific agent.
288- Temperature hint: {}
289
290Context:
291- Task: {}
292- Item: {}
293- Cycle: {}
294- Active step: {}
295- QA file path: {}
296- Item status: {}
297- Task status: {}
298- QA exit code: {:?}
299- Fix exit code: {:?}
300- Retest exit code: {:?}
301- Active tickets: {}
302- New tickets: {}
303- QA failed: {}
304- Fix required: {}
305- QA confidence: {:?}
306- QA quality score: {:?}
307- Build error count: {}
308- Test failure count: {}
309- Build exit code: {:?}
310- Test exit code: {:?}
311- Self test exit code: {:?}
312- Self test passed: {}
313- Max cycles: {}
314- Is last cycle: {}
315- Self referential safe: {}
316
317Recent execution history:
318{}
319"#,
320 self.config.temperature,
321 context.task_id,
322 context.task_item_id,
323 context.cycle,
324 context.step,
325 context.qa_file_path,
326 context.item_status,
327 context.task_status,
328 context.qa_exit_code,
329 context.fix_exit_code,
330 context.retest_exit_code,
331 context.active_ticket_count,
332 context.new_ticket_count,
333 context.qa_failed,
334 context.fix_required,
335 context.qa_confidence,
336 context.qa_quality_score,
337 context.build_error_count,
338 context.test_failure_count,
339 context.build_exit_code,
340 context.test_exit_code,
341 context.self_test_exit_code,
342 context.self_test_passed,
343 context.max_cycles,
344 context.is_last_cycle,
345 context.self_referential_safe,
346 history_json
347 ))
348 }
349}
350
351pub fn adaptive_failure_class_name(class: AdaptiveFailureClass) -> &'static str {
353 match class {
354 AdaptiveFailureClass::Disabled => "disabled",
355 AdaptiveFailureClass::Misconfigured => "misconfigured",
356 AdaptiveFailureClass::ExecutorFailure => "executor_failure",
357 AdaptiveFailureClass::InvalidJson => "invalid_json",
358 AdaptiveFailureClass::InvalidPlan => "invalid_plan",
359 }
360}
361
362pub fn deterministic_fallback_plan(_context: &StepPrehookContext) -> DynamicExecutionPlan {
364 let mut plan = DynamicExecutionPlan::new();
365
366 let _ = plan.add_node(WorkflowNode {
367 id: "qa".to_string(),
368 step_type: "qa".to_string(),
369 agent_id: None,
370 template: None,
371 prehook: None,
372 is_guard: false,
373 repeatable: false,
374 });
375
376 let _ = plan.add_node(WorkflowNode {
377 id: "fix".to_string(),
378 step_type: "fix".to_string(),
379 agent_id: None,
380 template: None,
381 prehook: Some(PrehookConfig {
382 engine: "cel".to_string(),
383 when: "active_ticket_count > 0".to_string(),
384 reason: Some("Only run fix when there are active tickets".to_string()),
385 extended: false,
386 }),
387 is_guard: false,
388 repeatable: true,
389 });
390
391 let _ = plan.add_edge(WorkflowEdge {
392 from: "qa".to_string(),
393 to: "fix".to_string(),
394 condition: Some("qa_exit_code != 0 || active_ticket_count > 0".to_string()),
395 });
396
397 plan.entry = Some("qa".to_string());
398 plan
399}
400
401pub fn validate_generated_plan(plan: &DynamicExecutionPlan) -> Result<()> {
403 if plan.nodes.is_empty() {
404 anyhow::bail!("adaptive plan must define at least one node");
405 }
406
407 if let Some(entry) = plan.entry.as_deref() {
408 if !plan.nodes.contains_key(entry) {
409 anyhow::bail!("adaptive plan entry node '{}' does not exist", entry);
410 }
411 }
412
413 for (node_id, node) in &plan.nodes {
414 if node.id.trim().is_empty() {
415 anyhow::bail!("adaptive plan contains node with empty id");
416 }
417 if node.id != *node_id {
418 anyhow::bail!(
419 "adaptive plan node key '{}' does not match node.id '{}'",
420 node_id,
421 node.id
422 );
423 }
424 if node.step_type.trim().is_empty() {
425 anyhow::bail!("adaptive plan node '{}' has empty step_type", node.id);
426 }
427 }
428
429 for edge in &plan.edges {
430 if !plan.nodes.contains_key(&edge.from) {
431 anyhow::bail!("adaptive plan edge source '{}' does not exist", edge.from);
432 }
433 if !plan.nodes.contains_key(&edge.to) {
434 anyhow::bail!("adaptive plan edge target '{}' does not exist", edge.to);
435 }
436 }
437
438 if plan.has_cycles() {
439 anyhow::bail!("adaptive plan must be acyclic");
440 }
441
442 Ok(())
443}
444
#[cfg(test)]
mod tests {
    use super::*;

    /// Test double for [`AdaptivePlanExecutor`]: returns `response`, or fails with
    /// `error`; any other combination is reported as a misconfigured mock.
    struct MockExecutor {
        response: Option<String>,
        error: Option<String>,
    }

    #[async_trait]
    impl AdaptivePlanExecutor for MockExecutor {
        async fn execute(&self, _prompt: &str, _config: &AdaptivePlannerConfig) -> Result<String> {
            match (&self.response, &self.error) {
                (Some(response), None) => Ok(response.clone()),
                (None, Some(error)) => Err(anyhow!(error.clone())),
                _ => Err(anyhow!("mock executor misconfigured")),
            }
        }
    }

    /// Default config with planning enabled and a planner agent set.
    fn enabled_config() -> AdaptivePlannerConfig {
        AdaptivePlannerConfig {
            enabled: true,
            planner_agent: Some("adaptive-agent".to_string()),
            ..Default::default()
        }
    }

    /// Minimal valid node with the given id and step type.
    fn node(id: &str, step_type: &str) -> WorkflowNode {
        WorkflowNode {
            id: id.to_string(),
            step_type: step_type.to_string(),
            agent_id: None,
            template: None,
            prehook: None,
            is_guard: false,
            repeatable: false,
        }
    }

    #[tokio::test]
    async fn test_adaptive_planner_disabled() {
        let planner = AdaptivePlanner::new(AdaptivePlannerConfig::default());
        let executor = MockExecutor {
            response: Some("{}".to_string()),
            error: None,
        };

        let result = planner
            .generate_plan(&executor, &StepPrehookContext::default())
            .await;
        assert!(result.is_err());
    }

    #[test]
    fn test_adaptive_planner_config_default() {
        let cfg = AdaptivePlannerConfig::default();
        assert!(!cfg.enabled);
        assert!(cfg.planner_agent.is_none());
        assert_eq!(cfg.max_history, 10);
        assert!((cfg.temperature - 0.7).abs() < f32::EPSILON);
        assert_eq!(cfg.fallback_mode, AdaptiveFallbackMode::SoftFallback);
    }

    #[test]
    fn test_adaptive_planner_add_history_respects_max() {
        let mut planner = AdaptivePlanner::new(AdaptivePlannerConfig {
            max_history: 2,
            ..enabled_config()
        });

        for i in 0..5 {
            planner.add_history(ExecutionHistoryRecord {
                task_id: format!("task_{}", i),
                item_id: "item".to_string(),
                cycle: i,
                steps: vec![],
                final_status: "done".to_string(),
                timestamp: Utc::now(),
            });
        }
        assert_eq!(planner.history().len(), 2);
        assert_eq!(planner.history()[0].task_id, "task_3");
        assert_eq!(planner.history()[1].task_id, "task_4");
    }

    #[tokio::test]
    async fn test_adaptive_planner_generate_plan_enabled() {
        let planner = AdaptivePlanner::new(enabled_config());
        let executor = MockExecutor {
            response: Some(
                r#"{"entry":"qa","nodes":{"qa":{"id":"qa","step_type":"qa","repeatable":false},"fix":{"id":"fix","step_type":"fix","repeatable":true}},"edges":[{"from":"qa","to":"fix","condition":"active_ticket_count > 0"}]}"#
                    .to_string(),
            ),
            error: None,
        };

        let outcome = planner
            .generate_plan(&executor, &StepPrehookContext::default())
            .await
            .expect("adaptive planner should generate a plan when enabled");
        assert_eq!(outcome.metadata.source, AdaptivePlanSource::Planner);
        assert!(outcome.plan.nodes.contains_key("qa"));
        assert!(outcome.plan.nodes.contains_key("fix"));
        assert_eq!(outcome.plan.edges.len(), 1);
    }

    #[tokio::test]
    async fn test_adaptive_planner_soft_fallback_on_invalid_json() {
        let planner = AdaptivePlanner::new(enabled_config());
        let executor = MockExecutor {
            response: Some("not-json".to_string()),
            error: None,
        };

        let outcome = planner
            .generate_plan(&executor, &StepPrehookContext::default())
            .await
            .expect("soft fallback should succeed");
        assert!(outcome.metadata.used_fallback);
        assert_eq!(
            outcome.metadata.error_class,
            Some(AdaptiveFailureClass::InvalidJson)
        );
        assert_eq!(
            outcome.metadata.source,
            AdaptivePlanSource::DeterministicFallback
        );
        assert_eq!(outcome.plan.entry.as_deref(), Some("qa"));
    }

    #[tokio::test]
    async fn test_adaptive_planner_soft_fallback_on_executor_failure() {
        let planner = AdaptivePlanner::new(enabled_config());
        let executor = MockExecutor {
            response: None,
            error: Some("executor exploded".to_string()),
        };

        let outcome = planner
            .generate_plan(&executor, &StepPrehookContext::default())
            .await
            .expect("soft fallback should absorb executor failures");
        assert!(outcome.metadata.used_fallback);
        assert_eq!(
            outcome.metadata.error_class,
            Some(AdaptiveFailureClass::ExecutorFailure)
        );
        assert!(
            outcome
                .metadata
                .error_message
                .as_deref()
                .unwrap_or_default()
                .contains("executor exploded")
        );
        assert!(outcome.raw_output.is_none());
    }

    #[tokio::test]
    async fn test_adaptive_planner_soft_fallback_on_invalid_plan() {
        let planner = AdaptivePlanner::new(enabled_config());
        // Well-formed JSON whose edge points at a node that does not exist.
        let executor = MockExecutor {
            response: Some(
                r#"{"entry":"qa","nodes":{"qa":{"id":"qa","step_type":"qa","repeatable":false}},"edges":[{"from":"qa","to":"ghost","condition":"true"}]}"#
                    .to_string(),
            ),
            error: None,
        };

        let outcome = planner
            .generate_plan(&executor, &StepPrehookContext::default())
            .await
            .expect("soft fallback should absorb invalid plans");
        assert_eq!(
            outcome.metadata.error_class,
            Some(AdaptiveFailureClass::InvalidPlan)
        );
        assert_eq!(
            outcome.metadata.source,
            AdaptivePlanSource::DeterministicFallback
        );
        assert!(outcome.raw_output.is_some());
    }

    #[tokio::test]
    async fn test_adaptive_planner_fail_closed_on_invalid_json() {
        let planner = AdaptivePlanner::new(AdaptivePlannerConfig {
            fallback_mode: AdaptiveFallbackMode::FailClosed,
            ..enabled_config()
        });
        let executor = MockExecutor {
            response: Some("not-json".to_string()),
            error: None,
        };

        let err = planner
            .generate_plan(&executor, &StepPrehookContext::default())
            .await
            .expect_err("fail closed should error");
        assert!(err.to_string().contains("invalid_json"));
    }

    #[tokio::test]
    async fn test_adaptive_planner_rejects_missing_planner_agent() {
        let planner = AdaptivePlanner::new(AdaptivePlannerConfig {
            enabled: true,
            planner_agent: None,
            ..Default::default()
        });
        let executor = MockExecutor {
            response: Some("{}".to_string()),
            error: None,
        };

        let outcome = planner
            .generate_plan(&executor, &StepPrehookContext::default())
            .await
            .expect("soft fallback should handle misconfiguration");
        assert_eq!(
            outcome.metadata.error_class,
            Some(AdaptiveFailureClass::Misconfigured)
        );
    }

    #[tokio::test]
    async fn test_adaptive_planner_rejects_blank_planner_agent() {
        let planner = AdaptivePlanner::new(AdaptivePlannerConfig {
            enabled: true,
            planner_agent: Some("   ".to_string()),
            ..Default::default()
        });
        let executor = MockExecutor {
            response: Some("{}".to_string()),
            error: None,
        };

        let outcome = planner
            .generate_plan(&executor, &StepPrehookContext::default())
            .await
            .expect("soft fallback should handle a whitespace-only agent");
        assert_eq!(
            outcome.metadata.error_class,
            Some(AdaptiveFailureClass::Misconfigured)
        );
    }

    #[test]
    fn test_validate_generated_plan_rejects_empty_plan() {
        let plan = DynamicExecutionPlan {
            nodes: std::collections::HashMap::new(),
            edges: vec![],
            entry: Some("missing".to_string()),
        };
        let err = validate_generated_plan(&plan).expect_err("plan should fail");
        assert!(err.to_string().contains("at least one node"));
    }

    #[test]
    fn test_validate_generated_plan_rejects_unknown_entry() {
        // The plan must be non-empty, otherwise the empty-plan check fires first
        // and the entry check is never exercised.
        let mut nodes = std::collections::HashMap::new();
        nodes.insert("qa".to_string(), node("qa", "qa"));
        let plan = DynamicExecutionPlan {
            nodes,
            edges: vec![],
            entry: Some("missing".to_string()),
        };
        let err = validate_generated_plan(&plan).expect_err("plan should fail");
        let message = err.to_string();
        assert!(message.contains("entry node 'missing'"), "{}", message);
    }

    #[test]
    fn test_deterministic_fallback_plan_passes_validation() {
        let plan = deterministic_fallback_plan(&StepPrehookContext::default());
        validate_generated_plan(&plan).expect("fallback plan should be valid");
        assert_eq!(plan.entry.as_deref(), Some("qa"));
    }
}