Skip to main content

orchestral_runtime/
bootstrap.rs

1//! Bootstrap helpers for starting Orchestral from a single YAML config.
2
3mod blob_store;
4mod components;
5mod observability;
6mod runtime_builder;
7
8use std::path::PathBuf;
9use std::sync::{Arc, OnceLock};
10
11use tokio::sync::RwLock;
12
13use async_trait::async_trait;
14use thiserror::Error;
15
16use crate::action::{
17    ActionConfigError, ActionFactory, ActionRegistryManager, DefaultActionFactory,
18};
19use crate::agent::default_action_preflight_hook;
20use crate::context::{BasicContextBuilder, TokenBudget};
21use crate::planner::LlmBuildError;
22use crate::skill::discovery::discover_skills;
23use crate::skill::SkillCatalog;
24use orchestral_core::action::extract_meta;
25use orchestral_core::config::{load_config, ConfigError};
26use orchestral_core::executor::Executor;
27use orchestral_core::io::{BlobIoError, BlobStore};
28use orchestral_core::normalizer::PlanNormalizer;
29use orchestral_core::spi::{
30    ComponentRegistry, HookRegistry, RuntimeBuildRequest, RuntimeComponentFactory, SpiError,
31    SpiMeta, StoreBundle,
32};
33use orchestral_core::store::{InMemoryEventStore, InMemoryTaskStore, StoreError};
34
35use crate::orchestrator::OrchestratorConfig;
36use crate::{Orchestrator, Thread, ThreadRuntime, ThreadRuntimeConfig};
37
38pub use self::blob_store::InMemoryBlobStore;
39use self::observability::init_tracing_if_needed;
40use self::runtime_builder::{
41    build_agent_step_executor, build_planner, build_runtime_component_options,
42    concurrency_policy_from_name,
43};
44
45/// Runtime bootstrap errors.
46#[derive(Debug, Error)]
47pub enum BootstrapError {
48    #[error("config error: {0}")]
49    Config(#[from] ConfigError),
50    #[error("action config error: {0}")]
51    ActionConfig(#[from] ActionConfigError),
52    #[error("planner build error: {0}")]
53    PlannerBuild(#[from] LlmBuildError),
54    #[error("store error: {0}")]
55    Store(#[from] StoreError),
56    #[error("unsupported planner mode: {0}")]
57    UnsupportedPlannerMode(String),
58    #[error("unsupported interpreter mode: {0}")]
59    UnsupportedInterpreterMode(String),
60    #[error("unsupported concurrency policy: {0}")]
61    UnsupportedConcurrencyPolicy(String),
62    #[error("missing provider config for planner mode llm")]
63    MissingProviderConfig,
64    #[error("backend '{0}' not found")]
65    BackendNotFound(String),
66    #[error("model profile '{0}' not found")]
67    ModelProfileNotFound(String),
68    #[error("blob io error: {0}")]
69    Blob(#[from] BlobIoError),
70    #[error("spi error: {0}")]
71    Spi(#[from] SpiError),
72}
73
74/// Running app bundle created from unified config.
75pub struct RuntimeApp {
76    pub orchestrator: Orchestrator,
77    pub blob_store: Arc<dyn BlobStore>,
78    pub hook_registry: Arc<HookRegistry>,
79    pub action_registry_manager: Arc<ActionRegistryManager>,
80}
81
/// Process-wide cell handed to `init_tracing_if_needed` on every bootstrap.
///
/// NOTE(review): presumably this lets tracing setup run at most once per process — confirm
/// against the `observability` module.
static TRACING_INIT: OnceLock<()> = OnceLock::new();

/// Default component factory providing in-memory stores and blob storage.
///
/// A stateless unit struct, so it derives the common traits: `Debug` for diagnostics, plus
/// `Clone`/`Copy`/`Default` so callers can construct and pass it around freely.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultRuntimeComponentFactory;
86
87#[async_trait]
88impl RuntimeComponentFactory for DefaultRuntimeComponentFactory {
89    async fn build(&self, _request: &RuntimeBuildRequest) -> Result<ComponentRegistry, SpiError> {
90        Ok(ComponentRegistry::new()
91            .with_stores(StoreBundle {
92                event_store: Arc::new(InMemoryEventStore::new()),
93                task_store: Arc::new(InMemoryTaskStore::new()),
94            })
95            .with_blob_store(Arc::new(InMemoryBlobStore::default())))
96    }
97}
98
99impl RuntimeApp {
100    /// Create a runnable app from a single `orchestral.yaml`.
101    pub async fn from_config_path(path: impl Into<PathBuf>) -> Result<Self, BootstrapError> {
102        let action_factory: Arc<dyn ActionFactory> = Arc::new(DefaultActionFactory::new());
103        Self::from_config_path_with_spi(
104            path,
105            Arc::new(DefaultRuntimeComponentFactory),
106            Arc::new(HookRegistry::new()),
107            action_factory,
108        )
109        .await
110    }
111
112    /// Create a runnable app and inject custom runtime component factory.
113    pub async fn from_config_path_with_component_factory(
114        path: impl Into<PathBuf>,
115        component_factory: Arc<dyn RuntimeComponentFactory>,
116    ) -> Result<Self, BootstrapError> {
117        let action_factory: Arc<dyn ActionFactory> = Arc::new(DefaultActionFactory::new());
118        Self::from_config_path_with_spi(
119            path,
120            component_factory,
121            Arc::new(HookRegistry::new()),
122            action_factory,
123        )
124        .await
125    }
126
127    /// Create a runnable app and inject component factory + hook registry.
128    pub async fn from_config_path_with_spi(
129        path: impl Into<PathBuf>,
130        component_factory: Arc<dyn RuntimeComponentFactory>,
131        hook_registry: Arc<HookRegistry>,
132        action_factory: Arc<dyn ActionFactory>,
133    ) -> Result<Self, BootstrapError> {
134        let path = path.into();
135        let config = load_config(&path)?;
136        init_tracing_if_needed(&TRACING_INIT, &config.observability);
137        let build_request = RuntimeBuildRequest {
138            meta: SpiMeta::runtime_defaults(env!("CARGO_PKG_VERSION")),
139            config_path: path.to_string_lossy().to_string(),
140            profile: None,
141            options: build_runtime_component_options(&config),
142        };
143        let components = component_factory.build(&build_request).await?;
144        let stores = components.stores.unwrap_or_else(|| {
145            tracing::warn!("component factory missing stores; fallback to in-memory stores");
146            StoreBundle {
147                event_store: Arc::new(InMemoryEventStore::new()),
148                task_store: Arc::new(InMemoryTaskStore::new()),
149            }
150        });
151        let blob_store: Arc<dyn BlobStore> = components.blob_store.unwrap_or_else(|| {
152            tracing::warn!(
153                "component factory missing blob_store; fallback to in-memory blob store"
154            );
155            Arc::new(InMemoryBlobStore::default())
156        });
157
158        let policy = concurrency_policy_from_name(&config.runtime.concurrency_policy)?;
159        let runtime_cfg = ThreadRuntimeConfig {
160            max_interactions_per_thread: config.runtime.max_interactions_per_thread,
161            auto_cleanup: config.runtime.auto_cleanup,
162        };
163
164        let thread_runtime = ThreadRuntime::with_policy_and_config(
165            Thread::new(),
166            stores.event_store.clone(),
167            policy,
168            runtime_cfg,
169        );
170
171        let skill_entries = discover_skills(&config, &path)?;
172        let skill_catalog = Arc::new(RwLock::new(SkillCatalog::new(
173            skill_entries,
174            config.extensions.skill.max_active_skills,
175        )));
176
177        let action_registry_manager = Arc::new(
178            ActionRegistryManager::new(path.clone(), action_factory)
179                .with_skill_catalog(skill_catalog.clone()),
180        );
181        action_registry_manager.load().await?;
182
183        let executor = Executor::with_registry(action_registry_manager.registry())
184            .with_action_preflight_hook(default_action_preflight_hook())
185            .with_export_contract(config.runtime.strict_exports);
186        let executor = if let Some(agent_executor) = build_agent_step_executor(&config)? {
187            executor.with_agent_step_executor(agent_executor)
188        } else {
189            executor
190        };
191        let planner = build_planner(&config)?;
192
193        let mut normalizer = PlanNormalizer::new();
194        {
195            let registry = executor.action_registry.read().await;
196            for name in registry.names() {
197                if let Some(action) = registry.get(&name) {
198                    normalizer.register_action_meta(&extract_meta(action.as_ref()));
199                } else {
200                    normalizer.register_action(name);
201                }
202            }
203        }
204        let context_builder = Arc::new(BasicContextBuilder::new(stores.event_store.clone()));
205
206        let orchestrator_cfg = OrchestratorConfig {
207            history_limit: config.context.history_limit,
208            context_budget: TokenBudget::new(config.context.max_tokens),
209            include_history: config.context.include_history,
210            auto_replan_once: true,
211            auto_repair_plan_once: true,
212            max_planner_iterations: config.runtime.max_planner_iterations,
213        };
214
215        let orchestrator = Orchestrator::with_config(
216            thread_runtime,
217            planner,
218            normalizer,
219            executor,
220            stores.task_store,
221            orchestrator_cfg,
222        )
223        .with_context_builder(context_builder)
224        .with_hook_registry(hook_registry.clone())
225        .with_skill_catalog(skill_catalog)
226        .with_skill_config_path(path.clone());
227
228        Ok(Self {
229            orchestrator,
230            blob_store,
231            hook_registry,
232            action_registry_manager,
233        })
234    }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use orchestral_core::config::OrchestralConfig;
    use orchestral_core::io::BlobWriteRequest;

    /// The default factory must supply both a store bundle and a blob store.
    #[tokio::test]
    async fn test_default_component_factory_builds_min_runtime_components() {
        let request = RuntimeBuildRequest {
            meta: SpiMeta::runtime_defaults("0.1.0"),
            config_path: "/tmp/orchestral.yaml".to_string(),
            profile: None,
            options: serde_json::Map::new(),
        };

        let registry = DefaultRuntimeComponentFactory
            .build(&request)
            .await
            .expect("components");

        assert!(registry.stores.is_some());
        assert!(registry.blob_store.is_some());
    }

    /// A blob written through the trait object can be read back by its id.
    #[tokio::test]
    async fn test_default_blob_store_writes_and_reads() {
        let store: Arc<dyn BlobStore> = Arc::new(InMemoryBlobStore::default());
        let bytes = Bytes::from(vec![1_u8, 2, 3]);
        let request =
            BlobWriteRequest::new(Box::pin(futures_util::stream::once(
                async move { Ok(bytes) },
            )));

        let written = store.write(request).await.expect("write");
        assert_eq!(written.byte_size, 3);

        let fetched = store.read(&written.id).await.expect("read");
        assert_eq!(fetched.meta.id, written.id);
    }

    /// Options built from a default config carry both the `stores` and `blobs` entries.
    #[test]
    fn test_runtime_component_options_include_store_and_blob_hints() {
        let options = build_runtime_component_options(&OrchestralConfig::default());

        assert!(options.contains_key("stores"));
        assert!(options.contains_key("blobs"));
    }

    /// The `queue` policy name must be rejected with `UnsupportedConcurrencyPolicy`.
    #[test]
    fn test_queue_concurrency_policy_is_rejected_in_bootstrap() {
        let Err(err) = concurrency_policy_from_name("queue") else {
            panic!("queue should be unsupported");
        };

        if let BootstrapError::UnsupportedConcurrencyPolicy(message) = &err {
            assert!(message.contains("not implemented"));
        } else {
            panic!("expected UnsupportedConcurrencyPolicy, got {}", err);
        }
    }
}