composable_runtime/runtime/
mod.rs1use anyhow::Result;
2use serde::de::DeserializeOwned;
3use std::collections::HashMap;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use crate::composition::graph::ComponentGraph;
8use crate::composition::registry::{HostCapability, HostCapabilityFactory, build_registries};
9use crate::config::types::{ConfigHandler, DefinitionLoader};
10use crate::service::Service;
11#[cfg(feature = "messaging")]
12use crate::types::MessagePublisher;
13use crate::types::{Component, ComponentInvoker};
14
15mod grpc;
16pub(crate) mod host;
17
18use host::ComponentHost;
19
20pub struct Runtime {
22 host: ComponentHost,
23 services: Vec<Box<dyn Service>>,
24 #[cfg(feature = "messaging")]
25 publisher: Arc<dyn MessagePublisher>,
26}
27
28impl Runtime {
29 pub fn builder() -> RuntimeBuilder {
31 RuntimeBuilder::new()
32 }
33
34 pub fn list_components(&self) -> Vec<Component> {
36 self.host
37 .component_registry
38 .get_components()
39 .map(|spec| Component {
40 name: spec.name.clone(),
41 functions: spec.functions.clone(),
42 })
43 .collect()
44 }
45
46 pub fn get_component(&self, name: &str) -> Option<Component> {
48 self.host.get_component(name)
49 }
50
51 pub async fn invoke(
53 &self,
54 component_name: &str,
55 function_name: &str,
56 args: Vec<serde_json::Value>,
57 ) -> Result<serde_json::Value> {
58 ComponentInvoker::invoke(&self.host, component_name, function_name, args).await
59 }
60
61 pub async fn invoke_with_env(
63 &self,
64 component_name: &str,
65 function_name: &str,
66 args: Vec<serde_json::Value>,
67 env_vars: &[(&str, &str)],
68 ) -> Result<serde_json::Value> {
69 self.host
70 .invoke(component_name, function_name, args, env_vars)
71 .await
72 }
73
74 pub async fn instantiate(
76 &self,
77 component_name: &str,
78 ) -> Result<(
79 wasmtime::Store<crate::types::ComponentState>,
80 wasmtime::component::Instance,
81 )> {
82 self.instantiate_with_env(component_name, &[]).await
83 }
84
85 pub async fn instantiate_with_env(
87 &self,
88 component_name: &str,
89 env_vars: &[(&str, &str)],
90 ) -> Result<(
91 wasmtime::Store<crate::types::ComponentState>,
92 wasmtime::component::Instance,
93 )> {
94 self.host.instantiate(component_name, env_vars).await
95 }
96
97 pub fn invoker(&self) -> Arc<dyn ComponentInvoker> {
99 Arc::new(self.host.clone())
100 }
101
102 #[cfg(feature = "messaging")]
104 pub fn publisher(&self) -> Arc<dyn MessagePublisher> {
105 Arc::clone(&self.publisher)
106 }
107
108 pub fn start(&self) -> Result<()> {
113 let invoker: Arc<dyn ComponentInvoker> = Arc::new(self.host.clone());
114 for service in &self.services {
115 service.set_invoker(invoker.clone());
116 #[cfg(feature = "messaging")]
117 service.set_publisher(Arc::clone(&self.publisher));
118 service.start()?;
119 }
120 Ok(())
121 }
122
123 pub async fn shutdown(&self) {
125 for service in self.services.iter().rev() {
126 service.shutdown().await;
127 }
128 }
129
130 pub async fn run(&self) -> Result<()> {
135 self.start()?;
136 wait_for_shutdown().await?;
137 self.shutdown().await;
138 Ok(())
139 }
140}
141
142pub struct RuntimeBuilder {
144 paths: Vec<PathBuf>,
145 loaders: Vec<Box<dyn DefinitionLoader>>,
146 handlers: Vec<Box<dyn ConfigHandler>>,
147 services: Vec<Box<dyn Service>>,
148 factories: HashMap<&'static str, HostCapabilityFactory>,
149 use_default_loaders: bool,
150}
151
152impl RuntimeBuilder {
153 fn new() -> Self {
154 Self {
155 paths: Vec::new(),
156 loaders: Vec::new(),
157 handlers: Vec::new(),
158 services: Vec::new(),
159 factories: HashMap::new(),
160 use_default_loaders: true,
161 }
162 }
163
164 pub fn from_path(mut self, path: impl Into<PathBuf>) -> Self {
166 self.paths.push(path.into());
167 self
168 }
169
170 pub fn from_paths(mut self, paths: &[PathBuf]) -> Self {
172 self.paths.extend_from_slice(paths);
173 self
174 }
175
176 pub fn with_definition_loader(mut self, loader: Box<dyn DefinitionLoader>) -> Self {
178 self.loaders.push(loader);
179 self
180 }
181
182 pub fn with_config_handler(mut self, handler: Box<dyn ConfigHandler>) -> Self {
184 self.handlers.push(handler);
185 self
186 }
187
188 pub fn no_default_loaders(mut self) -> Self {
190 self.use_default_loaders = false;
191 self
192 }
193
194 pub fn with_service<T: Service + Default + 'static>(mut self) -> Self {
200 self.services.push(Box::new(T::default()));
201 self
202 }
203
204 pub fn with_capability<T>(mut self, name: &'static str) -> Self
211 where
212 T: HostCapability + DeserializeOwned + Default + 'static,
213 {
214 self.factories.insert(
215 name,
216 Box::new(
217 |config: serde_json::Value| -> Result<Box<dyn HostCapability>> {
218 match serde_json::from_value::<T>(config.clone()) {
219 Ok(instance) => Ok(Box::new(instance)),
220 Err(e) => {
221 if config == serde_json::json!({}) {
222 Ok(Box::new(T::default()))
223 } else {
224 Err(e.into())
225 }
226 }
227 }
228 },
229 ),
230 );
231 self
232 }
233
234 #[allow(unused_mut)]
236 pub async fn build(mut self) -> Result<Runtime> {
237 #[cfg(feature = "messaging")]
239 let messaging_publisher: Arc<dyn MessagePublisher> = {
240 let svc = crate::messaging::MessagingService::new();
241 let publisher = svc.publisher();
242 self.services.push(Box::new(svc));
243 publisher
244 };
245
246 let mut graph_builder = ComponentGraph::builder().from_paths(&self.paths);
247 if !self.use_default_loaders {
248 graph_builder = graph_builder.no_default_loaders();
249 }
250 for loader in self.loaders {
251 graph_builder = graph_builder.add_loader(loader);
252 }
253 for handler in self.handlers {
254 graph_builder = graph_builder.add_handler(handler);
255 }
256 for service in &self.services {
258 if let Some(handler) = service.config_handler() {
259 graph_builder = graph_builder.add_handler(handler);
260 }
261 }
262 let graph = graph_builder.build()?;
263
264 let mut factories = self.factories;
266 for service in &self.services {
267 for (name, factory) in service.capabilities() {
268 factories.insert(name, factory);
269 }
270 }
271
272 let (component_registry, capability_registry) = build_registries(&graph, factories).await?;
274
275 let host = ComponentHost::new(component_registry, capability_registry)?;
277
278 Ok(Runtime {
279 host,
280 services: self.services,
281 #[cfg(feature = "messaging")]
282 publisher: messaging_publisher,
283 })
284 }
285}
286
287async fn wait_for_shutdown() -> Result<()> {
288 let ctrl_c = tokio::signal::ctrl_c();
289
290 #[cfg(unix)]
291 {
292 let mut sigterm =
293 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
294 tokio::select! {
295 result = ctrl_c => result?,
296 _ = sigterm.recv() => {}
297 }
298 }
299
300 #[cfg(not(unix))]
301 ctrl_c.await?;
302
303 Ok(())
304}