1mod blob_store;
4mod components;
5mod observability;
6mod runtime_builder;
7
8use std::path::PathBuf;
9use std::sync::{Arc, OnceLock};
10
11use tokio::sync::RwLock;
12
13use async_trait::async_trait;
14use thiserror::Error;
15
16use crate::action::{
17 ActionConfigError, ActionFactory, ActionRegistryManager, DefaultActionFactory,
18};
19use crate::agent::default_action_preflight_hook;
20use crate::context::{BasicContextBuilder, TokenBudget};
21use crate::planner::LlmBuildError;
22use crate::skill::discovery::discover_skills;
23use crate::skill::SkillCatalog;
24use orchestral_core::action::extract_meta;
25use orchestral_core::config::{load_config, ConfigError};
26use orchestral_core::executor::Executor;
27use orchestral_core::io::{BlobIoError, BlobStore};
28use orchestral_core::normalizer::PlanNormalizer;
29use orchestral_core::spi::{
30 ComponentRegistry, HookRegistry, RuntimeBuildRequest, RuntimeComponentFactory, SpiError,
31 SpiMeta, StoreBundle,
32};
33use orchestral_core::store::{InMemoryEventStore, InMemoryTaskStore, StoreError};
34
35use crate::orchestrator::OrchestratorConfig;
36use crate::{Orchestrator, Thread, ThreadRuntime, ThreadRuntimeConfig};
37
38pub use self::blob_store::InMemoryBlobStore;
39use self::observability::init_tracing_if_needed;
40use self::runtime_builder::{
41 build_agent_step_executor, build_planner, build_runtime_component_options,
42 concurrency_policy_from_name,
43};
44
45#[derive(Debug, Error)]
47pub enum BootstrapError {
48 #[error("config error: {0}")]
49 Config(#[from] ConfigError),
50 #[error("action config error: {0}")]
51 ActionConfig(#[from] ActionConfigError),
52 #[error("planner build error: {0}")]
53 PlannerBuild(#[from] LlmBuildError),
54 #[error("store error: {0}")]
55 Store(#[from] StoreError),
56 #[error("unsupported planner mode: {0}")]
57 UnsupportedPlannerMode(String),
58 #[error("unsupported interpreter mode: {0}")]
59 UnsupportedInterpreterMode(String),
60 #[error("unsupported concurrency policy: {0}")]
61 UnsupportedConcurrencyPolicy(String),
62 #[error("missing provider config for planner mode llm")]
63 MissingProviderConfig,
64 #[error("backend '{0}' not found")]
65 BackendNotFound(String),
66 #[error("model profile '{0}' not found")]
67 ModelProfileNotFound(String),
68 #[error("blob io error: {0}")]
69 Blob(#[from] BlobIoError),
70 #[error("spi error: {0}")]
71 Spi(#[from] SpiError),
72}
73
74pub struct RuntimeApp {
76 pub orchestrator: Orchestrator,
77 pub blob_store: Arc<dyn BlobStore>,
78 pub hook_registry: Arc<HookRegistry>,
79 pub action_registry_manager: Arc<ActionRegistryManager>,
80}
81
82static TRACING_INIT: OnceLock<()> = OnceLock::new();
83
84pub struct DefaultRuntimeComponentFactory;
86
87#[async_trait]
88impl RuntimeComponentFactory for DefaultRuntimeComponentFactory {
89 async fn build(&self, _request: &RuntimeBuildRequest) -> Result<ComponentRegistry, SpiError> {
90 Ok(ComponentRegistry::new()
91 .with_stores(StoreBundle {
92 event_store: Arc::new(InMemoryEventStore::new()),
93 task_store: Arc::new(InMemoryTaskStore::new()),
94 })
95 .with_blob_store(Arc::new(InMemoryBlobStore::default())))
96 }
97}
98
99impl RuntimeApp {
100 pub async fn from_config_path(path: impl Into<PathBuf>) -> Result<Self, BootstrapError> {
102 let action_factory: Arc<dyn ActionFactory> = Arc::new(DefaultActionFactory::new());
103 Self::from_config_path_with_spi(
104 path,
105 Arc::new(DefaultRuntimeComponentFactory),
106 Arc::new(HookRegistry::new()),
107 action_factory,
108 )
109 .await
110 }
111
112 pub async fn from_config_path_with_component_factory(
114 path: impl Into<PathBuf>,
115 component_factory: Arc<dyn RuntimeComponentFactory>,
116 ) -> Result<Self, BootstrapError> {
117 let action_factory: Arc<dyn ActionFactory> = Arc::new(DefaultActionFactory::new());
118 Self::from_config_path_with_spi(
119 path,
120 component_factory,
121 Arc::new(HookRegistry::new()),
122 action_factory,
123 )
124 .await
125 }
126
127 pub async fn from_config_path_with_spi(
129 path: impl Into<PathBuf>,
130 component_factory: Arc<dyn RuntimeComponentFactory>,
131 hook_registry: Arc<HookRegistry>,
132 action_factory: Arc<dyn ActionFactory>,
133 ) -> Result<Self, BootstrapError> {
134 let path = path.into();
135 let config = load_config(&path)?;
136 init_tracing_if_needed(&TRACING_INIT, &config.observability);
137 let build_request = RuntimeBuildRequest {
138 meta: SpiMeta::runtime_defaults(env!("CARGO_PKG_VERSION")),
139 config_path: path.to_string_lossy().to_string(),
140 profile: None,
141 options: build_runtime_component_options(&config),
142 };
143 let components = component_factory.build(&build_request).await?;
144 let stores = components.stores.unwrap_or_else(|| {
145 tracing::warn!("component factory missing stores; fallback to in-memory stores");
146 StoreBundle {
147 event_store: Arc::new(InMemoryEventStore::new()),
148 task_store: Arc::new(InMemoryTaskStore::new()),
149 }
150 });
151 let blob_store: Arc<dyn BlobStore> = components.blob_store.unwrap_or_else(|| {
152 tracing::warn!(
153 "component factory missing blob_store; fallback to in-memory blob store"
154 );
155 Arc::new(InMemoryBlobStore::default())
156 });
157
158 let policy = concurrency_policy_from_name(&config.runtime.concurrency_policy)?;
159 let runtime_cfg = ThreadRuntimeConfig {
160 max_interactions_per_thread: config.runtime.max_interactions_per_thread,
161 auto_cleanup: config.runtime.auto_cleanup,
162 };
163
164 let thread_runtime = ThreadRuntime::with_policy_and_config(
165 Thread::new(),
166 stores.event_store.clone(),
167 policy,
168 runtime_cfg,
169 );
170
171 let skill_entries = discover_skills(&config, &path)?;
172 let skill_catalog = Arc::new(RwLock::new(SkillCatalog::new(
173 skill_entries,
174 config.extensions.skill.max_active_skills,
175 )));
176
177 let action_registry_manager = Arc::new(
178 ActionRegistryManager::new(path.clone(), action_factory)
179 .with_skill_catalog(skill_catalog.clone()),
180 );
181 action_registry_manager.load().await?;
182
183 let executor = Executor::with_registry(action_registry_manager.registry())
184 .with_action_preflight_hook(default_action_preflight_hook())
185 .with_export_contract(config.runtime.strict_exports);
186 let executor = if let Some(agent_executor) = build_agent_step_executor(&config)? {
187 executor.with_agent_step_executor(agent_executor)
188 } else {
189 executor
190 };
191 let planner = build_planner(&config)?;
192
193 let mut normalizer = PlanNormalizer::new();
194 {
195 let registry = executor.action_registry.read().await;
196 for name in registry.names() {
197 if let Some(action) = registry.get(&name) {
198 normalizer.register_action_meta(&extract_meta(action.as_ref()));
199 } else {
200 normalizer.register_action(name);
201 }
202 }
203 }
204 let context_builder = Arc::new(BasicContextBuilder::new(stores.event_store.clone()));
205
206 let orchestrator_cfg = OrchestratorConfig {
207 history_limit: config.context.history_limit,
208 context_budget: TokenBudget::new(config.context.max_tokens),
209 include_history: config.context.include_history,
210 auto_replan_once: true,
211 auto_repair_plan_once: true,
212 max_planner_iterations: config.runtime.max_planner_iterations,
213 };
214
215 let orchestrator = Orchestrator::with_config(
216 thread_runtime,
217 planner,
218 normalizer,
219 executor,
220 stores.task_store,
221 orchestrator_cfg,
222 )
223 .with_context_builder(context_builder)
224 .with_hook_registry(hook_registry.clone())
225 .with_skill_catalog(skill_catalog)
226 .with_skill_config_path(path.clone());
227
228 Ok(Self {
229 orchestrator,
230 blob_store,
231 hook_registry,
232 action_registry_manager,
233 })
234 }
235}
236
237#[cfg(test)]
238mod tests {
239 use super::*;
240 use bytes::Bytes;
241 use orchestral_core::config::OrchestralConfig;
242 use orchestral_core::io::BlobWriteRequest;
243
244 #[tokio::test]
245 async fn test_default_component_factory_builds_min_runtime_components() {
246 let factory = DefaultRuntimeComponentFactory;
247 let request = RuntimeBuildRequest {
248 meta: SpiMeta::runtime_defaults("0.1.0"),
249 config_path: "/tmp/orchestral.yaml".to_string(),
250 profile: None,
251 options: serde_json::Map::new(),
252 };
253 let components = factory.build(&request).await.expect("components");
254
255 assert!(components.stores.is_some());
256 assert!(components.blob_store.is_some());
257 }
258
259 #[tokio::test]
260 async fn test_default_blob_store_writes_and_reads() {
261 let store: Arc<dyn BlobStore> = Arc::new(InMemoryBlobStore::default());
262 let payload = vec![1_u8, 2, 3];
263 let request = BlobWriteRequest::new(Box::pin(futures_util::stream::once(async move {
264 Ok(Bytes::from(payload))
265 })));
266 let written = store.write(request).await.expect("write");
267 assert_eq!(written.byte_size, 3);
268 let read = store.read(&written.id).await.expect("read");
269 assert_eq!(read.meta.id, written.id);
270 }
271
272 #[test]
273 fn test_runtime_component_options_include_store_and_blob_hints() {
274 let config = OrchestralConfig::default();
275 let options = build_runtime_component_options(&config);
276
277 assert!(options.get("stores").is_some());
278 assert!(options.get("blobs").is_some());
279 }
280
281 #[test]
282 fn test_queue_concurrency_policy_is_rejected_in_bootstrap() {
283 let err = match concurrency_policy_from_name("queue") {
284 Ok(_) => panic!("queue should be unsupported"),
285 Err(err) => err,
286 };
287 match err {
288 BootstrapError::UnsupportedConcurrencyPolicy(message) => {
289 assert!(message.contains("not implemented"));
290 }
291 other => panic!("expected UnsupportedConcurrencyPolicy, got {}", other),
292 }
293 }
294}