//! Adaptive planning for dynamic orchestration
//! (`agent_orchestrator/dynamic_orchestration/adaptive.rs`).

1use crate::config::StepPrehookContext;
2use anyhow::{Context, Result, anyhow};
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
7use super::dag::{DynamicExecutionPlan, PrehookConfig, WorkflowEdge, WorkflowNode};
8
9pub use orchestrator_config::adaptive::{AdaptiveFallbackMode, AdaptivePlannerConfig};
10
11/// Historical execution record for planning context.
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
13pub struct ExecutionHistoryRecord {
14    /// Task that produced this historical record.
15    pub task_id: String,
16    /// Task item associated with the recorded execution.
17    pub item_id: String,
18    /// Workflow cycle number for the record.
19    pub cycle: u32,
20    /// Step-level execution summaries captured for the cycle.
21    pub steps: Vec<StepExecutionRecord>,
22    /// Final task-item status after the cycle.
23    pub final_status: String,
24    /// Timestamp when the record was captured.
25    pub timestamp: DateTime<Utc>,
26}
27
28/// Step-level execution data included in adaptive planning history.
29#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
30pub struct StepExecutionRecord {
31    /// Workflow step identifier.
32    pub step_id: String,
33    /// Semantic step type used by the planner.
34    pub step_type: String,
35    /// Exit code returned by the step command.
36    pub exit_code: i64,
37    /// Measured wall-clock duration in milliseconds.
38    pub duration_ms: u64,
39    /// Confidence score reported by the agent, when available.
40    pub confidence: Option<f32>,
41    /// Quality score reported by the agent, when available.
42    pub quality_score: Option<f32>,
43    /// Number of tickets created by the step.
44    pub tickets_created: i64,
45    /// Number of tickets resolved by the step.
46    pub tickets_resolved: i64,
47}
48
49/// Source used to materialize an adaptive execution plan.
50#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
51#[serde(rename_all = "snake_case")]
52pub enum AdaptivePlanSource {
53    /// The adaptive planner returned a valid plan.
54    Planner,
55    /// The planner failed and deterministic fallback logic was used.
56    DeterministicFallback,
57}
58
59/// Failure classes used to explain adaptive planner degradation.
60#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
61#[serde(rename_all = "snake_case")]
62pub enum AdaptiveFailureClass {
63    /// Adaptive planning is disabled for the workflow.
64    Disabled,
65    /// Required planner configuration is missing or invalid.
66    Misconfigured,
67    /// The injected executor failed before returning a plan.
68    ExecutorFailure,
69    /// The executor returned output that was not valid JSON.
70    InvalidJson,
71    /// The returned graph failed structural validation.
72    InvalidPlan,
73}
74
75/// Metadata describing how an adaptive plan outcome was produced.
76#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
77pub struct AdaptivePlanMetadata {
78    /// Source that produced the final plan.
79    pub source: AdaptivePlanSource,
80    /// Whether fallback logic was used.
81    pub used_fallback: bool,
82    /// Failure class recorded when fallback logic was used.
83    #[serde(default, skip_serializing_if = "Option::is_none")]
84    pub error_class: Option<AdaptiveFailureClass>,
85    /// Human-readable error message captured during fallback.
86    #[serde(default, skip_serializing_if = "Option::is_none")]
87    pub error_message: Option<String>,
88}
89
90/// Final adaptive planning result returned to the scheduler.
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct AdaptivePlanOutcome {
93    /// Executable graph selected for the workflow item.
94    pub plan: DynamicExecutionPlan,
95    /// Metadata describing planner source and degradation state.
96    pub metadata: AdaptivePlanMetadata,
97    /// Raw planner output before JSON deserialization, when retained.
98    #[serde(default, skip_serializing_if = "Option::is_none")]
99    pub raw_output: Option<String>,
100}
101
102#[async_trait]
103/// Injected executor used by `AdaptivePlanner` to obtain raw planner output.
104pub trait AdaptivePlanExecutor: Send + Sync {
105    /// Executes the planner prompt and returns raw JSON output.
106    async fn execute(&self, prompt: &str, config: &AdaptivePlannerConfig) -> Result<String>;
107}
108
109/// Adaptive planner that delegates plan generation to an injected executor.
110#[derive(Debug, Clone)]
111pub struct AdaptivePlanner {
112    config: AdaptivePlannerConfig,
113    history: Vec<ExecutionHistoryRecord>,
114}
115
116impl AdaptivePlanner {
117    /// Creates an adaptive planner with empty execution history.
118    pub fn new(config: AdaptivePlannerConfig) -> Self {
119        Self {
120            config,
121            history: Vec::new(),
122        }
123    }
124
125    /// Adds one historical execution record, trimming to the configured history size.
126    pub fn add_history(&mut self, record: ExecutionHistoryRecord) {
127        if self.history.len() >= self.config.max_history {
128            self.history.remove(0);
129        }
130        self.history.push(record);
131    }
132
133    /// Returns the in-memory execution history currently used for prompt generation.
134    pub fn history(&self) -> &[ExecutionHistoryRecord] {
135        &self.history
136    }
137
138    /// Generates a dynamic execution plan using the configured adaptive planner executor.
139    pub async fn generate_plan<E>(
140        &self,
141        executor: &E,
142        context: &StepPrehookContext,
143    ) -> Result<AdaptivePlanOutcome>
144    where
145        E: AdaptivePlanExecutor,
146    {
147        if !self.config.enabled {
148            return Err(anyhow!("adaptive planning is not enabled"));
149        }
150
151        if self
152            .config
153            .planner_agent
154            .as_deref()
155            .map(str::trim)
156            .map(str::is_empty)
157            .unwrap_or(true)
158        {
159            return self.handle_failure(
160                AdaptiveFailureClass::Misconfigured,
161                anyhow!("adaptive planner is enabled but planner_agent is not configured"),
162                context,
163                None,
164            );
165        }
166
167        let prompt = self.build_prompt(context)?;
168        let response = match executor.execute(&prompt, &self.config).await {
169            Ok(response) => response,
170            Err(err) => {
171                return self.handle_failure(
172                    AdaptiveFailureClass::ExecutorFailure,
173                    err,
174                    context,
175                    None,
176                );
177            }
178        };
179
180        let plan = match serde_json::from_str::<DynamicExecutionPlan>(&response) {
181            Ok(plan) => plan,
182            Err(err) => {
183                return self.handle_failure(
184                    AdaptiveFailureClass::InvalidJson,
185                    anyhow!("adaptive planner returned invalid JSON: {}", err),
186                    context,
187                    Some(response),
188                );
189            }
190        };
191
192        if let Err(err) = validate_generated_plan(&plan) {
193            return self.handle_failure(
194                AdaptiveFailureClass::InvalidPlan,
195                err,
196                context,
197                Some(response),
198            );
199        }
200
201        Ok(AdaptivePlanOutcome {
202            plan,
203            metadata: AdaptivePlanMetadata {
204                source: AdaptivePlanSource::Planner,
205                used_fallback: false,
206                error_class: None,
207                error_message: None,
208            },
209            raw_output: Some(response),
210        })
211    }
212
213    fn handle_failure(
214        &self,
215        class: AdaptiveFailureClass,
216        err: anyhow::Error,
217        context: &StepPrehookContext,
218        raw_output: Option<String>,
219    ) -> Result<AdaptivePlanOutcome> {
220        match self.config.fallback_mode {
221            AdaptiveFallbackMode::SoftFallback => {
222                tracing::warn!(
223                    error_class = ?class,
224                    error = %err,
225                    task_id = %context.task_id,
226                    item_id = %context.task_item_id,
227                    "adaptive planner failed; using deterministic fallback"
228                );
229                Ok(AdaptivePlanOutcome {
230                    plan: deterministic_fallback_plan(context),
231                    metadata: AdaptivePlanMetadata {
232                        source: AdaptivePlanSource::DeterministicFallback,
233                        used_fallback: true,
234                        error_class: Some(class),
235                        error_message: Some(err.to_string()),
236                    },
237                    raw_output,
238                })
239            }
240            AdaptiveFallbackMode::FailClosed => Err(err.context(format!(
241                "adaptive planning failed ({})",
242                adaptive_failure_class_name(class)
243            ))),
244        }
245    }
246
247    fn build_prompt(&self, context: &StepPrehookContext) -> Result<String> {
248        let history_json =
249            serde_json::to_string(&self.history).context("serialize adaptive planner history")?;
250
251        Ok(format!(
252            r#"You are an adaptive workflow planner for an agent orchestrator.
253Return ONLY valid JSON that deserializes into:
254{{
255  "entry": "optional-node-id",
256  "nodes": {{
257    "node_id": {{
258      "id": "node_id",
259      "step_type": "qa|fix|retest|custom",
260      "agent_id": "optional-agent-id",
261      "template": "optional-command-template",
262      "prehook": {{
263        "engine": "cel",
264        "when": "expression",
265        "reason": "optional",
266        "extended": false
267      }},
268      "is_guard": false,
269      "repeatable": true
270    }}
271  }},
272  "edges": [
273    {{
274      "from": "node_id",
275      "to": "node_id",
276      "condition": "optional expression"
277    }}
278  ]
279}}
280
281Rules:
282- Output JSON only, no markdown or prose.
283- All node ids must be unique.
284- The graph must be acyclic.
285- Keep plans minimal and executable.
286- If fix is unnecessary, omit it instead of adding unreachable nodes.
287- Use the configured agent_id only when you need to pin a specific agent.
288- Temperature hint: {}
289
290Context:
291- Task: {}
292- Item: {}
293- Cycle: {}
294- Active step: {}
295- QA file path: {}
296- Item status: {}
297- Task status: {}
298- QA exit code: {:?}
299- Fix exit code: {:?}
300- Retest exit code: {:?}
301- Active tickets: {}
302- New tickets: {}
303- QA failed: {}
304- Fix required: {}
305- QA confidence: {:?}
306- QA quality score: {:?}
307- Build error count: {}
308- Test failure count: {}
309- Build exit code: {:?}
310- Test exit code: {:?}
311- Self test exit code: {:?}
312- Self test passed: {}
313- Max cycles: {}
314- Is last cycle: {}
315- Self referential safe: {}
316
317Recent execution history:
318{}
319"#,
320            self.config.temperature,
321            context.task_id,
322            context.task_item_id,
323            context.cycle,
324            context.step,
325            context.qa_file_path,
326            context.item_status,
327            context.task_status,
328            context.qa_exit_code,
329            context.fix_exit_code,
330            context.retest_exit_code,
331            context.active_ticket_count,
332            context.new_ticket_count,
333            context.qa_failed,
334            context.fix_required,
335            context.qa_confidence,
336            context.qa_quality_score,
337            context.build_error_count,
338            context.test_failure_count,
339            context.build_exit_code,
340            context.test_exit_code,
341            context.self_test_exit_code,
342            context.self_test_passed,
343            context.max_cycles,
344            context.is_last_cycle,
345            context.self_referential_safe,
346            history_json
347        ))
348    }
349}
350
351/// Returns the stable label for an adaptive failure class.
352pub fn adaptive_failure_class_name(class: AdaptiveFailureClass) -> &'static str {
353    match class {
354        AdaptiveFailureClass::Disabled => "disabled",
355        AdaptiveFailureClass::Misconfigured => "misconfigured",
356        AdaptiveFailureClass::ExecutorFailure => "executor_failure",
357        AdaptiveFailureClass::InvalidJson => "invalid_json",
358        AdaptiveFailureClass::InvalidPlan => "invalid_plan",
359    }
360}
361
362/// Returns the deterministic fallback plan used when adaptive planning degrades.
363pub fn deterministic_fallback_plan(_context: &StepPrehookContext) -> DynamicExecutionPlan {
364    let mut plan = DynamicExecutionPlan::new();
365
366    let _ = plan.add_node(WorkflowNode {
367        id: "qa".to_string(),
368        step_type: "qa".to_string(),
369        agent_id: None,
370        template: None,
371        prehook: None,
372        is_guard: false,
373        repeatable: false,
374    });
375
376    let _ = plan.add_node(WorkflowNode {
377        id: "fix".to_string(),
378        step_type: "fix".to_string(),
379        agent_id: None,
380        template: None,
381        prehook: Some(PrehookConfig {
382            engine: "cel".to_string(),
383            when: "active_ticket_count > 0".to_string(),
384            reason: Some("Only run fix when there are active tickets".to_string()),
385            extended: false,
386        }),
387        is_guard: false,
388        repeatable: true,
389    });
390
391    let _ = plan.add_edge(WorkflowEdge {
392        from: "qa".to_string(),
393        to: "fix".to_string(),
394        condition: Some("qa_exit_code != 0 || active_ticket_count > 0".to_string()),
395    });
396
397    plan.entry = Some("qa".to_string());
398    plan
399}
400
401/// Validates the structure of a generated adaptive execution plan.
402pub fn validate_generated_plan(plan: &DynamicExecutionPlan) -> Result<()> {
403    if plan.nodes.is_empty() {
404        anyhow::bail!("adaptive plan must define at least one node");
405    }
406
407    if let Some(entry) = plan.entry.as_deref() {
408        if !plan.nodes.contains_key(entry) {
409            anyhow::bail!("adaptive plan entry node '{}' does not exist", entry);
410        }
411    }
412
413    for (node_id, node) in &plan.nodes {
414        if node.id.trim().is_empty() {
415            anyhow::bail!("adaptive plan contains node with empty id");
416        }
417        if node.id != *node_id {
418            anyhow::bail!(
419                "adaptive plan node key '{}' does not match node.id '{}'",
420                node_id,
421                node.id
422            );
423        }
424        if node.step_type.trim().is_empty() {
425            anyhow::bail!("adaptive plan node '{}' has empty step_type", node.id);
426        }
427    }
428
429    for edge in &plan.edges {
430        if !plan.nodes.contains_key(&edge.from) {
431            anyhow::bail!("adaptive plan edge source '{}' does not exist", edge.from);
432        }
433        if !plan.nodes.contains_key(&edge.to) {
434            anyhow::bail!("adaptive plan edge target '{}' does not exist", edge.to);
435        }
436    }
437
438    if plan.has_cycles() {
439        anyhow::bail!("adaptive plan must be acyclic");
440    }
441
442    Ok(())
443}
444
#[cfg(test)]
mod tests {
    use super::*;

    /// Test double that returns either a canned response or a canned error.
    struct MockExecutor {
        response: Option<String>,
        error: Option<String>,
    }

    #[async_trait]
    impl AdaptivePlanExecutor for MockExecutor {
        async fn execute(&self, _prompt: &str, _config: &AdaptivePlannerConfig) -> Result<String> {
            match (&self.response, &self.error) {
                (Some(response), None) => Ok(response.clone()),
                (None, Some(error)) => Err(anyhow!(error.clone())),
                _ => Err(anyhow!("mock executor misconfigured")),
            }
        }
    }

    fn enabled_config() -> AdaptivePlannerConfig {
        AdaptivePlannerConfig {
            enabled: true,
            planner_agent: Some("adaptive-agent".to_string()),
            ..Default::default()
        }
    }

    /// Builds a minimal structurally valid node for validation tests.
    fn simple_node(id: &str) -> WorkflowNode {
        WorkflowNode {
            id: id.to_string(),
            step_type: "qa".to_string(),
            agent_id: None,
            template: None,
            prehook: None,
            is_guard: false,
            repeatable: false,
        }
    }

    #[tokio::test]
    async fn test_adaptive_planner_disabled() {
        let planner = AdaptivePlanner::new(AdaptivePlannerConfig::default());
        let executor = MockExecutor {
            response: Some("{}".to_string()),
            error: None,
        };

        let result = planner
            .generate_plan(&executor, &StepPrehookContext::default())
            .await;
        assert!(result.is_err());
    }

    #[test]
    fn test_adaptive_planner_config_default() {
        let cfg = AdaptivePlannerConfig::default();
        assert!(!cfg.enabled);
        assert!(cfg.planner_agent.is_none());
        assert_eq!(cfg.max_history, 10);
        assert!((cfg.temperature - 0.7).abs() < f32::EPSILON);
        assert_eq!(cfg.fallback_mode, AdaptiveFallbackMode::SoftFallback);
    }

    #[test]
    fn test_adaptive_planner_add_history_respects_max() {
        let mut planner = AdaptivePlanner::new(AdaptivePlannerConfig {
            max_history: 2,
            ..enabled_config()
        });

        for i in 0..5 {
            planner.add_history(ExecutionHistoryRecord {
                task_id: format!("task_{}", i),
                item_id: "item".to_string(),
                cycle: i,
                steps: vec![],
                final_status: "done".to_string(),
                timestamp: Utc::now(),
            });
        }
        assert_eq!(planner.history().len(), 2);
        assert_eq!(planner.history()[0].task_id, "task_3");
        assert_eq!(planner.history()[1].task_id, "task_4");
    }

    #[tokio::test]
    async fn test_adaptive_planner_generate_plan_enabled() {
        let planner = AdaptivePlanner::new(enabled_config());
        let executor = MockExecutor {
            response: Some(
                r#"{"entry":"qa","nodes":{"qa":{"id":"qa","step_type":"qa","repeatable":false},"fix":{"id":"fix","step_type":"fix","repeatable":true}},"edges":[{"from":"qa","to":"fix","condition":"active_ticket_count > 0"}]}"#
                    .to_string(),
            ),
            error: None,
        };

        let outcome = planner
            .generate_plan(&executor, &StepPrehookContext::default())
            .await
            .expect("adaptive planner should generate a plan when enabled");
        assert_eq!(outcome.metadata.source, AdaptivePlanSource::Planner);
        assert!(outcome.plan.nodes.contains_key("qa"));
        assert!(outcome.plan.nodes.contains_key("fix"));
        assert_eq!(outcome.plan.edges.len(), 1);
    }

    #[tokio::test]
    async fn test_adaptive_planner_soft_fallback_on_invalid_json() {
        let planner = AdaptivePlanner::new(enabled_config());
        let executor = MockExecutor {
            response: Some("not-json".to_string()),
            error: None,
        };

        let outcome = planner
            .generate_plan(&executor, &StepPrehookContext::default())
            .await
            .expect("soft fallback should succeed");
        assert!(outcome.metadata.used_fallback);
        assert_eq!(
            outcome.metadata.error_class,
            Some(AdaptiveFailureClass::InvalidJson)
        );
        assert_eq!(
            outcome.metadata.source,
            AdaptivePlanSource::DeterministicFallback
        );
        assert_eq!(outcome.plan.entry.as_deref(), Some("qa"));
    }

    #[tokio::test]
    async fn test_adaptive_planner_soft_fallback_on_executor_failure() {
        let planner = AdaptivePlanner::new(enabled_config());
        let executor = MockExecutor {
            response: None,
            error: Some("planner process crashed".to_string()),
        };

        let outcome = planner
            .generate_plan(&executor, &StepPrehookContext::default())
            .await
            .expect("soft fallback should handle executor failure");
        assert!(outcome.metadata.used_fallback);
        assert_eq!(
            outcome.metadata.error_class,
            Some(AdaptiveFailureClass::ExecutorFailure)
        );
        assert!(outcome
            .metadata
            .error_message
            .as_deref()
            .is_some_and(|message| message.contains("planner process crashed")));
        assert!(outcome.raw_output.is_none());
    }

    #[tokio::test]
    async fn test_adaptive_planner_soft_fallback_on_invalid_plan() {
        let raw = r#"{"nodes":{},"edges":[]}"#;
        let planner = AdaptivePlanner::new(enabled_config());
        let executor = MockExecutor {
            response: Some(raw.to_string()),
            error: None,
        };

        let outcome = planner
            .generate_plan(&executor, &StepPrehookContext::default())
            .await
            .expect("soft fallback should handle a structurally invalid plan");
        assert_eq!(
            outcome.metadata.error_class,
            Some(AdaptiveFailureClass::InvalidPlan)
        );
        assert_eq!(
            outcome.metadata.source,
            AdaptivePlanSource::DeterministicFallback
        );
        assert_eq!(outcome.raw_output.as_deref(), Some(raw));
    }

    #[tokio::test]
    async fn test_adaptive_planner_fail_closed_on_invalid_json() {
        let planner = AdaptivePlanner::new(AdaptivePlannerConfig {
            fallback_mode: AdaptiveFallbackMode::FailClosed,
            ..enabled_config()
        });
        let executor = MockExecutor {
            response: Some("not-json".to_string()),
            error: None,
        };

        let err = planner
            .generate_plan(&executor, &StepPrehookContext::default())
            .await
            .expect_err("fail closed should error");
        assert!(err.to_string().contains("invalid_json"));
    }

    #[tokio::test]
    async fn test_adaptive_planner_rejects_missing_planner_agent() {
        let planner = AdaptivePlanner::new(AdaptivePlannerConfig {
            enabled: true,
            planner_agent: None,
            ..Default::default()
        });
        let executor = MockExecutor {
            response: Some("{}".to_string()),
            error: None,
        };

        let outcome = planner
            .generate_plan(&executor, &StepPrehookContext::default())
            .await
            .expect("soft fallback should handle misconfiguration");
        assert_eq!(
            outcome.metadata.error_class,
            Some(AdaptiveFailureClass::Misconfigured)
        );
    }

    #[test]
    fn test_deterministic_fallback_plan_passes_validation() {
        let plan = deterministic_fallback_plan(&StepPrehookContext::default());
        assert_eq!(plan.entry.as_deref(), Some("qa"));
        assert!(plan.nodes.contains_key("qa"));
        assert!(plan.nodes.contains_key("fix"));
        assert_eq!(plan.edges.len(), 1);
        validate_generated_plan(&plan).expect("fallback plan should be structurally valid");
    }

    #[test]
    fn test_validate_generated_plan_rejects_empty_nodes() {
        let plan = DynamicExecutionPlan {
            nodes: std::collections::HashMap::new(),
            edges: vec![],
            entry: Some("missing".to_string()),
        };
        let err = validate_generated_plan(&plan).expect_err("plan should fail");
        assert!(err.to_string().contains("at least one node"));
    }

    #[test]
    fn test_validate_generated_plan_rejects_unknown_entry() {
        // A non-empty node set is required so validation actually reaches the entry check.
        let mut nodes = std::collections::HashMap::new();
        nodes.insert("qa".to_string(), simple_node("qa"));
        let plan = DynamicExecutionPlan {
            nodes,
            edges: vec![],
            entry: Some("missing".to_string()),
        };
        let err = validate_generated_plan(&plan).expect_err("plan should fail");
        assert!(err
            .to_string()
            .contains("entry node 'missing' does not exist"));
    }
}