Skip to main content

orchestral_runtime/
orchestrator.rs

1//! Orchestrator - minimal intent → plan → normalize → execute pipeline
2//!
3//! This bridges the ThreadRuntime (events + concurrency) with core planning/execution.
4
5mod agent_loop;
6mod entry;
7mod execution;
8mod output;
9mod planning;
10mod progress;
11#[cfg(test)]
12mod recovery;
13mod state;
14mod support;
15#[cfg(test)]
16mod tests;
17
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::time::Instant;
21
22use serde_json::Value;
23use tokio::sync::RwLock;
24
25use crate::context::{ContextBuilder, ContextError, ContextRequest, ContextWindow, TokenBudget};
26use crate::skill::SkillCatalog;
27use orchestral_core::action::{extract_meta, ActionMeta};
28use orchestral_core::executor::{ExecutionResult, Executor, ExecutorContext};
29use orchestral_core::interpreter::InterpretRequest;
30use orchestral_core::normalizer::{NormalizeError, PlanNormalizer};
31use orchestral_core::planner::{
32    HistoryItem, PlanError, Planner, PlannerContext, PlannerLoopContext, PlannerOutput,
33    PlannerRuntimeInfo,
34};
35use orchestral_core::spi::lifecycle::LifecycleHookRegistry;
36use orchestral_core::spi::HookRegistry;
37use orchestral_core::store::{Event, InteractionId, StoreError, TaskStore, WorkingSet};
38use orchestral_core::types::{
39    Intent, IntentContext, StepId, StepKind, Task, TaskId, TaskState, WaitUserReason,
40};
41use output::{execution_result_metadata, summarize_plan_steps};
42use progress::RuntimeProgressReporter;
43use state::{
44    apply_resume_event_to_working_set, complete_wait_step_for_resume, interaction_state_from_task,
45    restore_checkpoint, task_state_from_execution,
46};
47use support::{
48    context_window_to_history, drop_current_turn_user_input, event_to_history_item,
49    event_type_label, intent_from_event, summarize_working_set,
50};
51
52use crate::{HandleEventResult, InteractionState, RuntimeError, ThreadRuntime};
53
/// Character cap for log output: 8,000 characters.
///
/// NOTE(review): no use of this constant is visible in this part of the file; it is
/// presumably passed as `max_chars` to `truncate_for_log` / `truncate_debug_for_log`
/// — confirm at the call sites.
const MAX_LOG_CHARS: usize = 8_000;
55
/// Caps `input` at `max_chars` characters for logging.
///
/// When `input` holds at most `max_chars` characters it is returned unchanged.
/// Otherwise the result is its first `max_chars` characters followed by the marker
/// `... [truncated, total_chars=N]`, where `N` is the character count of the full input.
///
/// Lengths are measured in `char`s rather than bytes, so the cut always lands on a
/// UTF-8 character boundary.
fn truncate_for_log(input: &str, max_chars: usize) -> String {
    // Look up the byte offset of the first character beyond the limit. If there is no
    // such character, the whole input fits and is returned as-is.
    match input.char_indices().nth(max_chars) {
        None => input.to_string(),
        Some((cut, _)) => {
            let total_chars = input.chars().count();
            format!(
                "{}... [truncated, total_chars={}]",
                &input[..cut],
                total_chars
            )
        }
    }
}
65
66fn truncate_debug_for_log(value: &impl std::fmt::Debug, max_chars: usize) -> String {
67    truncate_for_log(&format!("{:?}", value), max_chars)
68}
69
70/// Orchestrator result for a handled event
71#[derive(Debug)]
72pub enum OrchestratorResult {
73    /// A new interaction was started and executed
74    Started {
75        interaction_id: InteractionId,
76        task_id: TaskId,
77        result: ExecutionResult,
78    },
79    /// The event was merged into an existing interaction and executed
80    Merged {
81        interaction_id: InteractionId,
82        task_id: TaskId,
83        result: ExecutionResult,
84    },
85    /// The event was rejected
86    Rejected { reason: String },
87    /// The event was queued
88    Queued,
89}
90
91/// Orchestrator errors
92#[derive(Debug, thiserror::Error)]
93pub enum OrchestratorError {
94    #[error("runtime error: {0}")]
95    Runtime(#[from] RuntimeError),
96    #[error("planner error: {0}")]
97    Planner(#[from] PlanError),
98    #[error("normalize error: {0}")]
99    Normalize(#[from] NormalizeError),
100    #[error("store error: {0}")]
101    Store(#[from] StoreError),
102    #[error("context error: {0}")]
103    Context(#[from] ContextError),
104    #[error("task not found: {0}")]
105    TaskNotFound(String),
106    #[error("task has no plan: {0}")]
107    MissingPlan(String),
108    #[error("resume error: {0}")]
109    ResumeError(String),
110    #[error("unsupported event: {0}")]
111    UnsupportedEvent(String),
112}
113
114/// Orchestrator - wires runtime + planner + executor for a minimal pipeline
115pub struct Orchestrator {
116    pub thread_runtime: ThreadRuntime,
117    pub planner: Arc<dyn Planner>,
118    pub normalizer: PlanNormalizer,
119    pub executor: Executor,
120    pub task_store: Arc<dyn TaskStore>,
121    pub context_builder: Option<Arc<dyn ContextBuilder>>,
122    pub config: OrchestratorConfig,
123    pub hook_registry: Arc<HookRegistry>,
124    pub lifecycle_hooks: Arc<LifecycleHookRegistry>,
125    pub skill_catalog: Arc<RwLock<SkillCatalog>>,
126    /// Config path used for skill re-discovery (hot reload).
127    pub skill_config_path: Option<std::path::PathBuf>,
128}
129
130/// Orchestrator configuration
131#[derive(Debug, Clone)]
132pub struct OrchestratorConfig {
133    /// Max events to include in planner history (0 = all)
134    pub history_limit: usize,
135    /// Token budget for context assembly
136    pub context_budget: TokenBudget,
137    /// Whether to include history when building context
138    pub include_history: bool,
139    /// Retry once by replanning only the failed subgraph.
140    pub auto_replan_once: bool,
141    /// Retry once by asking the planner to repair a plan rejected by normalization.
142    pub auto_repair_plan_once: bool,
143    /// Max planner iterations inside one outer agent loop turn.
144    pub max_planner_iterations: usize,
145}
146
147impl Default for OrchestratorConfig {
148    fn default() -> Self {
149        Self {
150            history_limit: 50,
151            context_budget: TokenBudget::default(),
152            include_history: true,
153            auto_replan_once: true,
154            auto_repair_plan_once: true,
155            max_planner_iterations: 6,
156        }
157    }
158}
159
160impl Orchestrator {
161    /// Create a new orchestrator
162    pub fn new(
163        thread_runtime: ThreadRuntime,
164        planner: Arc<dyn Planner>,
165        normalizer: PlanNormalizer,
166        executor: Executor,
167        task_store: Arc<dyn TaskStore>,
168    ) -> Self {
169        Self::with_config(
170            thread_runtime,
171            planner,
172            normalizer,
173            executor,
174            task_store,
175            OrchestratorConfig::default(),
176        )
177    }
178
179    /// Create a new orchestrator with config
180    pub fn with_config(
181        thread_runtime: ThreadRuntime,
182        planner: Arc<dyn Planner>,
183        normalizer: PlanNormalizer,
184        executor: Executor,
185        task_store: Arc<dyn TaskStore>,
186        config: OrchestratorConfig,
187    ) -> Self {
188        Self {
189            thread_runtime,
190            planner,
191            normalizer,
192            executor,
193            task_store,
194            context_builder: None,
195            config,
196            hook_registry: Arc::new(HookRegistry::new()),
197            lifecycle_hooks: Arc::new(LifecycleHookRegistry::new()),
198            skill_catalog: Arc::new(RwLock::new(SkillCatalog::new(Vec::new(), 0))),
199            skill_config_path: None,
200        }
201    }
202
203    /// Attach a context builder (optional)
204    pub fn with_context_builder(mut self, builder: Arc<dyn ContextBuilder>) -> Self {
205        self.context_builder = Some(builder);
206        self
207    }
208
209    /// Attach runtime hook registry.
210    pub fn with_hook_registry(mut self, hook_registry: Arc<HookRegistry>) -> Self {
211        self.hook_registry = hook_registry;
212        self
213    }
214
215    /// Attach lifecycle hook registry for SDK pipeline hooks.
216    pub fn with_lifecycle_hooks(mut self, hooks: Arc<LifecycleHookRegistry>) -> Self {
217        self.lifecycle_hooks = hooks;
218        self
219    }
220
221    /// Attach skill catalog for planner-time instruction injection.
222    pub fn with_skill_catalog(mut self, skill_catalog: Arc<RwLock<SkillCatalog>>) -> Self {
223        self.skill_catalog = skill_catalog;
224        self
225    }
226
227    /// Set config path for skill hot-reload discovery.
228    pub fn with_skill_config_path(mut self, path: std::path::PathBuf) -> Self {
229        self.skill_config_path = Some(path);
230        self
231    }
232}