Skip to main content

composable_runtime/runtime/
mod.rs

1use anyhow::Result;
2use serde::de::DeserializeOwned;
3use std::collections::HashMap;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use crate::composition::graph::ComponentGraph;
8use crate::composition::registry::{HostCapability, HostCapabilityFactory, build_registries};
9use crate::config::types::{ConfigHandler, DefinitionLoader};
10use crate::service::Service;
11#[cfg(feature = "messaging")]
12use crate::types::MessagePublisher;
13use crate::types::{Component, ComponentInvoker};
14
15mod grpc;
16pub(crate) mod host;
17
18use host::ComponentHost;
19
/// Composable Runtime for invoking Wasm Components
///
/// Bundles the component host with the lifecycle-managed services that are
/// started and shut down alongside it.
pub struct Runtime {
    /// Host used for component lookup, invocation and instantiation; a clone
    /// of it is handed out as the `ComponentInvoker`.
    host: ComponentHost,
    /// Services in registration order: started front-to-back by `start()`,
    /// shut down back-to-front by `shutdown()`.
    services: Vec<Box<dyn Service>>,
    /// Publisher returned by `publisher()` and injected into every service
    /// during `start()` (only present with the `messaging` feature).
    #[cfg(feature = "messaging")]
    publisher: Arc<dyn MessagePublisher>,
}
27
28impl Runtime {
29    /// Create a RuntimeBuilder
30    pub fn builder() -> RuntimeBuilder {
31        RuntimeBuilder::new()
32    }
33
34    /// List all components
35    pub fn list_components(&self) -> Vec<Component> {
36        self.host
37            .component_registry
38            .get_components()
39            .map(|spec| Component {
40                name: spec.name.clone(),
41                functions: spec.functions.clone(),
42            })
43            .collect()
44    }
45
46    /// Get a specific component by name
47    pub fn get_component(&self, name: &str) -> Option<Component> {
48        self.host.get_component(name)
49    }
50
51    /// Invoke a component function
52    pub async fn invoke(
53        &self,
54        component_name: &str,
55        function_name: &str,
56        args: Vec<serde_json::Value>,
57    ) -> Result<serde_json::Value> {
58        ComponentInvoker::invoke(&self.host, component_name, function_name, args).await
59    }
60
61    /// Invoke a component function with environment variables
62    pub async fn invoke_with_env(
63        &self,
64        component_name: &str,
65        function_name: &str,
66        args: Vec<serde_json::Value>,
67        env_vars: &[(&str, &str)],
68    ) -> Result<serde_json::Value> {
69        self.host
70            .invoke(component_name, function_name, args, env_vars)
71            .await
72    }
73
74    /// Instantiate a component
75    pub async fn instantiate(
76        &self,
77        component_name: &str,
78    ) -> Result<(
79        wasmtime::Store<crate::types::ComponentState>,
80        wasmtime::component::Instance,
81    )> {
82        self.instantiate_with_env(component_name, &[]).await
83    }
84
85    /// Instantiate a component with environment variables
86    pub async fn instantiate_with_env(
87        &self,
88        component_name: &str,
89        env_vars: &[(&str, &str)],
90    ) -> Result<(
91        wasmtime::Store<crate::types::ComponentState>,
92        wasmtime::component::Instance,
93    )> {
94        self.host.instantiate(component_name, env_vars).await
95    }
96
97    /// Get a component invoker for this runtime.
98    pub fn invoker(&self) -> Arc<dyn ComponentInvoker> {
99        Arc::new(self.host.clone())
100    }
101
102    /// Get a message publisher for this runtime (messaging feature only).
103    #[cfg(feature = "messaging")]
104    pub fn publisher(&self) -> Arc<dyn MessagePublisher> {
105        Arc::clone(&self.publisher)
106    }
107
108    /// Start the runtime (services, in registration order).
109    ///
110    /// Injects dependencies (`set_invoker`, `set_publisher`) into each
111    /// service before calling `start()`.
112    pub fn start(&self) -> Result<()> {
113        let invoker: Arc<dyn ComponentInvoker> = Arc::new(self.host.clone());
114        for service in &self.services {
115            service.set_invoker(invoker.clone());
116            #[cfg(feature = "messaging")]
117            service.set_publisher(Arc::clone(&self.publisher));
118            service.start()?;
119        }
120        Ok(())
121    }
122
123    /// Shutdown all services in reverse registration order.
124    pub async fn shutdown(&self) {
125        for service in self.services.iter().rev() {
126            service.shutdown().await;
127        }
128    }
129
130    /// Start the runtime and block until a shutdown signal (SIGINT/SIGTERM).
131    ///
132    /// Intended for long-lived processes (`composable run`).
133    /// For one-off invocations, use `start()` / `shutdown().await` directly.
134    pub async fn run(&self) -> Result<()> {
135        self.start()?;
136        wait_for_shutdown().await?;
137        self.shutdown().await;
138        Ok(())
139    }
140}
141
/// Builder for configuring and creating a Runtime
///
/// Collects definition sources, loaders, handlers, services and capability
/// factories; `build()` consumes them to assemble the `Runtime`.
pub struct RuntimeBuilder {
    /// Definition source paths handed to the component graph builder.
    paths: Vec<PathBuf>,
    /// Custom definition loaders added to the component graph builder.
    loaders: Vec<Box<dyn DefinitionLoader>>,
    /// Standalone config handlers added to the component graph builder.
    handlers: Vec<Box<dyn ConfigHandler>>,
    /// Lifecycle-managed services, in registration order.
    services: Vec<Box<dyn Service>>,
    /// Host capability factories keyed by capability type name.
    factories: HashMap<&'static str, HostCapabilityFactory>,
    /// `true` by default; when `false`, `build()` asks the graph builder to
    /// skip its default loaders.
    use_default_loaders: bool,
}
151
152impl RuntimeBuilder {
153    fn new() -> Self {
154        Self {
155            paths: Vec::new(),
156            loaders: Vec::new(),
157            handlers: Vec::new(),
158            services: Vec::new(),
159            factories: HashMap::new(),
160            use_default_loaders: true,
161        }
162    }
163
164    /// Add a definition source path (.toml, .wasm, oci://, etc.)
165    pub fn from_path(mut self, path: impl Into<PathBuf>) -> Self {
166        self.paths.push(path.into());
167        self
168    }
169
170    /// Add multiple definition source paths
171    pub fn from_paths(mut self, paths: &[PathBuf]) -> Self {
172        self.paths.extend_from_slice(paths);
173        self
174    }
175
176    /// Register a custom definition loader
177    pub fn with_definition_loader(mut self, loader: Box<dyn DefinitionLoader>) -> Self {
178        self.loaders.push(loader);
179        self
180    }
181
182    /// Register a standalone config handler
183    pub fn with_config_handler(mut self, handler: Box<dyn ConfigHandler>) -> Self {
184        self.handlers.push(handler);
185        self
186    }
187
188    /// Opt out of the default TomlLoader + WasmLoader
189    pub fn no_default_loaders(mut self) -> Self {
190        self.use_default_loaders = false;
191        self
192    }
193
194    /// Register a lifecycle-managed service.
195    ///
196    /// The service's config handler (if any) participates in config parsing.
197    /// Its capabilities are registered after config parsing. Its `start()`
198    /// and `shutdown()` are called during the runtime lifecycle.
199    pub fn with_service<T: Service + Default + 'static>(mut self) -> Self {
200        self.services.push(Box::new(T::default()));
201        self
202    }
203
204    /// Register a host capability type for the given name.
205    ///
206    /// The name corresponds to the `type` value in `[capability.*]` TOML blocks.
207    ///
208    /// If the config is empty and deserialization fails,
209    /// falls back to `Default::default()`.
210    pub fn with_capability<T>(mut self, name: &'static str) -> Self
211    where
212        T: HostCapability + DeserializeOwned + Default + 'static,
213    {
214        self.factories.insert(
215            name,
216            Box::new(
217                |config: serde_json::Value| -> Result<Box<dyn HostCapability>> {
218                    match serde_json::from_value::<T>(config.clone()) {
219                        Ok(instance) => Ok(Box::new(instance)),
220                        Err(e) => {
221                            if config == serde_json::json!({}) {
222                                Ok(Box::new(T::default()))
223                            } else {
224                                Err(e.into())
225                            }
226                        }
227                    }
228                },
229            ),
230        );
231        self
232    }
233
234    /// Build the Runtime: load config, build graph, build registries, create component host
235    #[allow(unused_mut)]
236    pub async fn build(mut self) -> Result<Runtime> {
237        // Auto-register MessagingService when feature is enabled
238        #[cfg(feature = "messaging")]
239        let messaging_publisher: Arc<dyn MessagePublisher> = {
240            let svc = crate::messaging::MessagingService::new();
241            let publisher = svc.publisher();
242            self.services.push(Box::new(svc));
243            publisher
244        };
245
246        let mut graph_builder = ComponentGraph::builder().from_paths(&self.paths);
247        if !self.use_default_loaders {
248            graph_builder = graph_builder.no_default_loaders();
249        }
250        for loader in self.loaders {
251            graph_builder = graph_builder.add_loader(loader);
252        }
253        for handler in self.handlers {
254            graph_builder = graph_builder.add_handler(handler);
255        }
256        // Add config handlers from registered services
257        for service in &self.services {
258            if let Some(handler) = service.config_handler() {
259                graph_builder = graph_builder.add_handler(handler);
260            }
261        }
262        let graph = graph_builder.build()?;
263
264        // Collect capability factories from both with_capability and service registrations
265        let mut factories = self.factories;
266        for service in &self.services {
267            for (name, factory) in service.capabilities() {
268                factories.insert(name, factory);
269            }
270        }
271
272        // Build registries from graph
273        let (component_registry, capability_registry) = build_registries(&graph, factories).await?;
274
275        // Create component host
276        let host = ComponentHost::new(component_registry, capability_registry)?;
277
278        Ok(Runtime {
279            host,
280            services: self.services,
281            #[cfg(feature = "messaging")]
282            publisher: messaging_publisher,
283        })
284    }
285}
286
287async fn wait_for_shutdown() -> Result<()> {
288    let ctrl_c = tokio::signal::ctrl_c();
289
290    #[cfg(unix)]
291    {
292        let mut sigterm =
293            tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
294        tokio::select! {
295            result = ctrl_c => result?,
296            _ = sigterm.recv() => {}
297        }
298    }
299
300    #[cfg(not(unix))]
301    ctrl_c.await?;
302
303    Ok(())
304}