//! EmbeddedHost — the primary programmatic API surface for WorldInterface.
use std::collections::HashMap;
use std::sync::Arc;

use actionqueue_core::ids::TaskId;
use actionqueue_runtime::engine::ActionQueueEngine;
use serde_json::Value;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use worldinterface_connector::ConnectorRegistry;
use worldinterface_contextstore::ContextStore;
use worldinterface_contextstore::SqliteContextStore;
use worldinterface_coordinator::FlowHandler;
use worldinterface_core::descriptor::Descriptor;
use worldinterface_core::flowspec::FlowSpec;
use worldinterface_core::id::{trigger_input_node_id, FlowRunId};

use crate::config::HostConfig;
use crate::error::HostError;
use crate::helpers::{
    build_single_node_flowspec, persist_coordinator_map, restore_coordinator_map,
};
use crate::status::{
    derive_flow_run_status, extract_flow_outputs, FlowPhase, FlowRunStatus, FlowRunSummary,
};
use crate::tick::{tick_loop, Engine};
/// Shared slot holding the AQ engine, used by both the public API and the tick loop.
///
/// The engine is wrapped in `Option` so it can be taken out and dropped on a
/// blocking thread (AQ's internal tokio runtime panics if dropped from async context).
/// A `tokio::sync::Mutex` is used because the slot is locked from async code.
pub(crate) type EngineSlot = Arc<tokio::sync::Mutex<Option<Engine>>>;
/// Shared inner state accessible from both the public API and the tick loop.
pub(crate) struct HostInner {
    // AQ engine slot; becomes `None` once shutdown has taken the engine out.
    pub engine: EngineSlot,
    // Durable store for flow outputs, trigger inputs, receipts, and globals.
    pub store: Arc<SqliteContextStore>,
    // Connector registry used for capability discovery and invocation.
    pub registry: Arc<ConnectorRegistry>,
    // Maps each known flow run to its AQ Coordinator task. Guarded by a
    // std mutex; locks are kept brief and never held across an `.await`.
    pub coordinator_map: Arc<std::sync::Mutex<HashMap<FlowRunId, TaskId>>>,
    // Compiler configuration applied to every submitted FlowSpec.
    pub compiler_config: worldinterface_flowspec::CompilerConfig,
    // Sender side of the shutdown signal watched by the tick loop.
    pub shutdown_tx: watch::Sender<bool>,
}
/// The embedded WorldInterface host.
///
/// Owns the AQ engine, ContextStore, and ConnectorRegistry. Provides the full
/// lifecycle API: flow submission, status queries, capability discovery, and
/// single-op invocation.
pub struct EmbeddedHost {
    // Shared state, also referenced by the background tick loop.
    pub(crate) inner: Arc<HostInner>,
    // Handle to the spawned tick-loop task; awaited during shutdown.
    tick_handle: JoinHandle<()>,
}
53impl EmbeddedHost {
54    /// Start the host: validate config, bootstrap AQ engine, restore coordinator map,
55    /// and launch the background tick loop.
56    pub async fn start(config: HostConfig, registry: ConnectorRegistry) -> Result<Self, HostError> {
57        config.validate()?;
58
59        // Ensure data directories exist
60        std::fs::create_dir_all(&config.aq_data_dir)?;
61        if let Some(parent) = config.context_store_path.parent() {
62            if !parent.as_os_str().is_empty() {
63                std::fs::create_dir_all(parent)?;
64            }
65        }
66
67        // Open ContextStore
68        let store = Arc::new(SqliteContextStore::open(&config.context_store_path)?);
69
70        // Restore coordinator map from ContextStore globals
71        let coordinator_map = restore_coordinator_map(&store)?;
72        let coordinator_map = Arc::new(std::sync::Mutex::new(coordinator_map));
73
74        // Build the FlowHandler and bootstrap AQ engine
75        let registry = Arc::new(registry);
76        let handler = FlowHandler::new(
77            Arc::clone(&registry),
78            Arc::clone(&store),
79            Arc::clone(&config.metrics),
80        );
81        let runtime_config = config.to_runtime_config();
82        let aq_engine = ActionQueueEngine::new(runtime_config, handler);
83        let bootstrapped = aq_engine.bootstrap()?;
84        let engine: EngineSlot = Arc::new(tokio::sync::Mutex::new(Some(bootstrapped)));
85
86        // Set up shutdown signaling
87        let (shutdown_tx, shutdown_rx) = watch::channel(false);
88
89        // Launch background tick loop
90        let tick_engine = Arc::clone(&engine);
91        let tick_map = Arc::clone(&coordinator_map);
92        let tick_store = Arc::clone(&store);
93        let tick_interval = config.tick_interval;
94        let tick_metrics = Arc::clone(&config.metrics);
95        let tick_handle = tokio::spawn(async move {
96            tick_loop(tick_engine, tick_map, tick_store, tick_interval, shutdown_rx, tick_metrics)
97                .await;
98        });
99
100        let inner = Arc::new(HostInner {
101            engine,
102            store,
103            registry,
104            coordinator_map,
105            compiler_config: config.compiler_config,
106            shutdown_tx,
107        });
108
109        Ok(Self { inner, tick_handle })
110    }
111
112    /// Submit a FlowSpec for execution.
113    ///
114    /// Validates, compiles, and submits the Coordinator task to AQ. Returns
115    /// the FlowRunId for subsequent status queries.
116    pub async fn submit_flow(&self, spec: FlowSpec) -> Result<FlowRunId, HostError> {
117        let flow_run_id = FlowRunId::new();
118        tracing::info!(%flow_run_id, "submitting flow");
119        let result = worldinterface_flowspec::compile_with_config(
120            &spec,
121            &self.inner.compiler_config,
122            flow_run_id,
123        )?;
124
125        // Step 1: Insert into coordinator_map (brief std::sync::Mutex lock, no await)
126        {
127            let mut map = self.inner.coordinator_map.lock().unwrap();
128            map.insert(flow_run_id, result.coordinator.id());
129        }
130
131        // Step 2: Persist to ContextStore (no mutex held)
132        persist_coordinator_map(&self.inner.store, &self.inner.coordinator_map)?;
133
134        // Step 3: Submit to AQ engine (tokio::sync::Mutex)
135        {
136            let mut guard = self.inner.engine.lock().await;
137            let engine = guard
138                .as_mut()
139                .ok_or_else(|| HostError::InternalError("engine already shut down".into()))?;
140            engine.submit_task(result.coordinator)?;
141        }
142
143        Ok(flow_run_id)
144    }
145
146    /// Submit a FlowSpec with trigger input data.
147    ///
148    /// Like `submit_flow`, but additionally writes `trigger_input` to ContextStore
149    /// at `(flow_run_id, TRIGGER_INPUT_NODE_ID)` before submitting the Coordinator
150    /// task to AQ. This ensures trigger data is durably available before any step
151    /// can attempt to read it.
152    pub async fn submit_flow_with_trigger_input(
153        &self,
154        spec: FlowSpec,
155        trigger_input: Value,
156    ) -> Result<FlowRunId, HostError> {
157        let flow_run_id = FlowRunId::new();
158        tracing::info!(%flow_run_id, trigger = true, "submitting flow with trigger input");
159        let result = worldinterface_flowspec::compile_with_config(
160            &spec,
161            &self.inner.compiler_config,
162            flow_run_id,
163        )?;
164
165        // Write trigger input to ContextStore BEFORE AQ submission.
166        let trigger_node_id = trigger_input_node_id();
167        self.inner.store.put(flow_run_id, trigger_node_id, &trigger_input)?;
168
169        // Insert into coordinator_map (brief std::sync::Mutex lock, no await)
170        {
171            let mut map = self.inner.coordinator_map.lock().unwrap();
172            map.insert(flow_run_id, result.coordinator.id());
173        }
174
175        // Persist to ContextStore (no mutex held)
176        persist_coordinator_map(&self.inner.store, &self.inner.coordinator_map)?;
177
178        // Submit to AQ engine (tokio::sync::Mutex)
179        {
180            let mut guard = self.inner.engine.lock().await;
181            let engine = guard
182                .as_mut()
183                .ok_or_else(|| HostError::InternalError("engine already shut down".into()))?;
184            engine.submit_task(result.coordinator)?;
185        }
186
187        Ok(flow_run_id)
188    }
189
190    /// Access the shared ContextStore.
191    ///
192    /// Used by the daemon for webhook registry persistence and trigger
193    /// receipt storage. ContextStore operations are thread-safe.
194    pub fn context_store(&self) -> &dyn ContextStore {
195        self.inner.store.as_ref()
196    }
197
198    /// Store a trigger receipt in ContextStore globals.
199    ///
200    /// Receipts are stored under `receipt:trigger:<flow_run_id>` for later retrieval.
201    pub fn store_trigger_receipt(
202        &self,
203        flow_run_id: FlowRunId,
204        receipt: &Value,
205    ) -> Result<(), HostError> {
206        let key = format!("receipt:trigger:{}", flow_run_id);
207        self.inner.store.upsert_global(&key, receipt)?;
208        Ok(())
209    }
210
211    /// Query the status of a flow run.
212    pub async fn run_status(&self, flow_run_id: FlowRunId) -> Result<FlowRunStatus, HostError> {
213        let coordinator_task_id = {
214            let map = self.inner.coordinator_map.lock().unwrap();
215            match map.get(&flow_run_id) {
216                Some(&tid) => tid,
217                None => return Err(HostError::FlowRunNotFound(flow_run_id)),
218            }
219        };
220
221        let guard = self.inner.engine.lock().await;
222        let engine = guard
223            .as_ref()
224            .ok_or_else(|| HostError::InternalError("engine already shut down".into()))?;
225        let status = derive_flow_run_status(
226            engine.projection(),
227            self.inner.store.as_ref(),
228            flow_run_id,
229            coordinator_task_id,
230        );
231        Ok(status)
232    }
233
234    /// List all connector capabilities available in the registry.
235    pub fn list_capabilities(&self) -> Vec<Descriptor> {
236        self.inner.registry.list_capabilities()
237    }
238
239    /// Describe a specific connector by name.
240    pub fn describe(&self, connector_name: &str) -> Option<Descriptor> {
241        self.inner.registry.describe(connector_name)
242    }
243
244    /// List all known flow runs with summary status.
245    ///
246    /// Returns a summary for each flow in the coordinator map. Terminal flows
247    /// that have been pruned are not included.
248    pub async fn list_runs(&self) -> Result<Vec<FlowRunSummary>, HostError> {
249        let flow_run_ids: Vec<FlowRunId> = {
250            let map = self.inner.coordinator_map.lock().unwrap();
251            map.keys().copied().collect()
252        };
253
254        let mut summaries = Vec::with_capacity(flow_run_ids.len());
255        for flow_run_id in flow_run_ids {
256            match self.run_status(flow_run_id).await {
257                Ok(status) => summaries.push(FlowRunSummary {
258                    flow_run_id: status.flow_run_id,
259                    phase: status.phase,
260                    submitted_at: status.submitted_at,
261                    last_updated_at: status.last_updated_at,
262                }),
263                Err(HostError::FlowRunNotFound(_)) => continue,
264                Err(e) => return Err(e),
265            }
266        }
267
268        summaries.sort_by(|a, b| b.submitted_at.cmp(&a.submitted_at));
269        Ok(summaries)
270    }
271
272    /// Invoke a single connector operation.
273    ///
274    /// Creates an ephemeral 1-node FlowSpec and executes it through the full
275    /// AQ path (never bypasses AQ — Invariant 1).
276    pub async fn invoke_single(
277        &self,
278        connector_name: &str,
279        params: Value,
280    ) -> Result<Value, HostError> {
281        // Verify connector exists before building the flow
282        if self.inner.registry.get(connector_name).is_none() {
283            return Err(HostError::ConnectorNotFound(connector_name.to_string()));
284        }
285
286        let spec = build_single_node_flowspec(connector_name, params);
287        let node_id = spec.nodes[0].id;
288        let flow_run_id = self.submit_flow(spec).await?;
289
290        // Poll until terminal
291        let status = self.poll_until_terminal(flow_run_id).await?;
292
293        match status.phase {
294            FlowPhase::Completed => {
295                // Get the single node's output
296                let outputs = extract_flow_outputs(self.inner.store.as_ref(), flow_run_id)?;
297                outputs.get(&node_id).cloned().ok_or_else(|| {
298                    HostError::InternalError(format!(
299                        "flow completed but no output for node {node_id}"
300                    ))
301                })
302            }
303            FlowPhase::Failed => Err(HostError::FlowFailed {
304                flow_run_id,
305                error: status.error.unwrap_or_else(|| "unknown error".into()),
306            }),
307            FlowPhase::Canceled => Err(HostError::FlowCanceled(flow_run_id)),
308            _ => Err(HostError::InternalError(format!(
309                "unexpected terminal phase: {:?}",
310                status.phase
311            ))),
312        }
313    }
314
315    /// Poll a flow run until it reaches a terminal state.
316    async fn poll_until_terminal(
317        &self,
318        flow_run_id: FlowRunId,
319    ) -> Result<FlowRunStatus, HostError> {
320        loop {
321            let status = self.run_status(flow_run_id).await?;
322            match status.phase {
323                FlowPhase::Completed | FlowPhase::Failed | FlowPhase::Canceled => {
324                    return Ok(status);
325                }
326                _ => {
327                    tokio::time::sleep(std::time::Duration::from_millis(20)).await;
328                }
329            }
330        }
331    }
332
333    /// Stop the tick loop and take the engine out for safe disposal.
334    ///
335    /// Takes the engine out of the `Option`, so when `HostInner` is later
336    /// dropped in async context, the `Option` is `None` and no runtime panic occurs.
337    async fn stop_and_take_engine(self) -> Result<Option<Engine>, HostError> {
338        // Signal tick loop to stop
339        let _ = self.inner.shutdown_tx.send(true);
340
341        // Wait for tick loop task to exit
342        if let Err(e) = self.tick_handle.await {
343            tracing::warn!(error = %e, "tick loop task panicked during shutdown");
344        }
345
346        // Take the engine out of the slot
347        let mut guard = self.inner.engine.lock().await;
348        Ok(guard.take())
349    }
350
351    /// Shut down the host gracefully.
352    ///
353    /// Stops the tick loop. In-flight handlers complete in background.
354    /// On next restart, incomplete tasks are recovered from the WAL.
355    pub async fn shutdown(self) -> Result<(), HostError> {
356        tracing::info!("shutting down host");
357
358        let engine = self.stop_and_take_engine().await?;
359
360        // Drop engine on a blocking thread — AQ's internal runtime cannot
361        // be dropped from within an async context.
362        if let Some(eng) = engine {
363            tokio::task::spawn_blocking(move || {
364                drop(eng);
365            })
366            .await
367            .map_err(|e| HostError::InternalError(format!("shutdown join error: {e}")))?;
368        }
369
370        tracing::info!("host shut down");
371        Ok(())
372    }
373
374    /// Count currently active (non-terminal) flow runs.
375    ///
376    /// The coordinator map only contains non-terminal flows (pruned on completion),
377    /// so its length is a good approximation of active flows.
378    pub fn active_flow_count(&self) -> usize {
379        let map = self.inner.coordinator_map.lock().unwrap();
380        map.len()
381    }
382
383    /// Drop the host without graceful shutdown — simulates a crash.
384    ///
385    /// Stops the tick loop, then drops the engine on a blocking thread.
386    /// Data files are preserved for crash-resume testing.
387    #[doc(hidden)]
388    pub async fn crash_drop(self) {
389        let engine = self.stop_and_take_engine().await.ok().flatten();
390        if let Some(eng) = engine {
391            tokio::task::spawn_blocking(move || {
392                drop(eng);
393            })
394            .await
395            .ok();
396        }
397    }
398}