1use std::collections::HashMap;
4use std::sync::Arc;
5
6use actionqueue_core::ids::TaskId;
7use actionqueue_runtime::engine::ActionQueueEngine;
8use serde_json::Value;
9use tokio::sync::watch;
10use tokio::task::JoinHandle;
11use worldinterface_connector::ConnectorRegistry;
12use worldinterface_contextstore::ContextStore;
13use worldinterface_contextstore::SqliteContextStore;
14use worldinterface_coordinator::FlowHandler;
15use worldinterface_core::descriptor::Descriptor;
16use worldinterface_core::flowspec::FlowSpec;
17use worldinterface_core::id::{trigger_input_node_id, FlowRunId};
18
19use crate::config::HostConfig;
20use crate::error::HostError;
21use crate::helpers::{
22 build_single_node_flowspec, persist_coordinator_map, restore_coordinator_map,
23};
24use crate::status::{
25 derive_flow_run_status, extract_flow_outputs, FlowPhase, FlowRunStatus, FlowRunSummary,
26};
27use crate::tick::{tick_loop, Engine};
28
/// Shared slot holding the engine while the host is running.
///
/// The slot is shared between the host and the background tick loop; it is
/// `Some(engine)` while running and becomes `None` once shutdown takes the
/// engine out (so later operations can detect "already shut down").
pub(crate) type EngineSlot = Arc<tokio::sync::Mutex<Option<Engine>>>;
32
/// State shared between the public `EmbeddedHost` handle and the background
/// tick loop.
pub(crate) struct HostInner {
    /// Engine slot; `None` after shutdown has taken the engine.
    pub engine: EngineSlot,
    /// SQLite-backed context store (node outputs, persisted host state).
    pub store: Arc<SqliteContextStore>,
    /// Registry of connectors available to flows.
    pub registry: Arc<ConnectorRegistry>,
    /// Maps each flow run to the engine task id of its coordinator.
    /// Guarded by a std (not tokio) mutex — it is never held across `.await`.
    pub coordinator_map: Arc<std::sync::Mutex<HashMap<FlowRunId, TaskId>>>,
    /// Configuration applied when compiling submitted flow specs.
    pub compiler_config: worldinterface_flowspec::CompilerConfig,
    /// Flips to `true` to tell the tick loop to exit.
    pub shutdown_tx: watch::Sender<bool>,
}
42
/// Public handle to an in-process ActionQueue host: owns the shared inner
/// state and the background tick-loop task.
pub struct EmbeddedHost {
    /// Shared state; also cloned into the tick loop at startup.
    pub(crate) inner: Arc<HostInner>,
    /// Handle to the spawned tick loop; awaited during shutdown.
    tick_handle: JoinHandle<()>,
}
52
53impl EmbeddedHost {
54 pub async fn start(config: HostConfig, registry: ConnectorRegistry) -> Result<Self, HostError> {
57 config.validate()?;
58
59 std::fs::create_dir_all(&config.aq_data_dir)?;
61 if let Some(parent) = config.context_store_path.parent() {
62 if !parent.as_os_str().is_empty() {
63 std::fs::create_dir_all(parent)?;
64 }
65 }
66
67 let store = Arc::new(SqliteContextStore::open(&config.context_store_path)?);
69
70 let coordinator_map = restore_coordinator_map(&store)?;
72 let coordinator_map = Arc::new(std::sync::Mutex::new(coordinator_map));
73
74 let registry = Arc::new(registry);
76 let handler = FlowHandler::new(
77 Arc::clone(®istry),
78 Arc::clone(&store),
79 Arc::clone(&config.metrics),
80 );
81 let runtime_config = config.to_runtime_config();
82 let aq_engine = ActionQueueEngine::new(runtime_config, handler);
83 let bootstrapped = aq_engine.bootstrap()?;
84 let engine: EngineSlot = Arc::new(tokio::sync::Mutex::new(Some(bootstrapped)));
85
86 let (shutdown_tx, shutdown_rx) = watch::channel(false);
88
89 let tick_engine = Arc::clone(&engine);
91 let tick_map = Arc::clone(&coordinator_map);
92 let tick_store = Arc::clone(&store);
93 let tick_interval = config.tick_interval;
94 let tick_metrics = Arc::clone(&config.metrics);
95 let tick_handle = tokio::spawn(async move {
96 tick_loop(tick_engine, tick_map, tick_store, tick_interval, shutdown_rx, tick_metrics)
97 .await;
98 });
99
100 let inner = Arc::new(HostInner {
101 engine,
102 store,
103 registry,
104 coordinator_map,
105 compiler_config: config.compiler_config,
106 shutdown_tx,
107 });
108
109 Ok(Self { inner, tick_handle })
110 }
111
112 pub async fn submit_flow(&self, spec: FlowSpec) -> Result<FlowRunId, HostError> {
117 let flow_run_id = FlowRunId::new();
118 tracing::info!(%flow_run_id, "submitting flow");
119 let result = worldinterface_flowspec::compile_with_config(
120 &spec,
121 &self.inner.compiler_config,
122 flow_run_id,
123 )?;
124
125 {
127 let mut map = self.inner.coordinator_map.lock().unwrap();
128 map.insert(flow_run_id, result.coordinator.id());
129 }
130
131 persist_coordinator_map(&self.inner.store, &self.inner.coordinator_map)?;
133
134 {
136 let mut guard = self.inner.engine.lock().await;
137 let engine = guard
138 .as_mut()
139 .ok_or_else(|| HostError::InternalError("engine already shut down".into()))?;
140 engine.submit_task(result.coordinator)?;
141 }
142
143 Ok(flow_run_id)
144 }
145
146 pub async fn submit_flow_with_trigger_input(
153 &self,
154 spec: FlowSpec,
155 trigger_input: Value,
156 ) -> Result<FlowRunId, HostError> {
157 let flow_run_id = FlowRunId::new();
158 tracing::info!(%flow_run_id, trigger = true, "submitting flow with trigger input");
159 let result = worldinterface_flowspec::compile_with_config(
160 &spec,
161 &self.inner.compiler_config,
162 flow_run_id,
163 )?;
164
165 let trigger_node_id = trigger_input_node_id();
167 self.inner.store.put(flow_run_id, trigger_node_id, &trigger_input)?;
168
169 {
171 let mut map = self.inner.coordinator_map.lock().unwrap();
172 map.insert(flow_run_id, result.coordinator.id());
173 }
174
175 persist_coordinator_map(&self.inner.store, &self.inner.coordinator_map)?;
177
178 {
180 let mut guard = self.inner.engine.lock().await;
181 let engine = guard
182 .as_mut()
183 .ok_or_else(|| HostError::InternalError("engine already shut down".into()))?;
184 engine.submit_task(result.coordinator)?;
185 }
186
187 Ok(flow_run_id)
188 }
189
190 pub fn context_store(&self) -> &dyn ContextStore {
195 self.inner.store.as_ref()
196 }
197
198 pub fn store_trigger_receipt(
202 &self,
203 flow_run_id: FlowRunId,
204 receipt: &Value,
205 ) -> Result<(), HostError> {
206 let key = format!("receipt:trigger:{}", flow_run_id);
207 self.inner.store.upsert_global(&key, receipt)?;
208 Ok(())
209 }
210
211 pub async fn run_status(&self, flow_run_id: FlowRunId) -> Result<FlowRunStatus, HostError> {
213 let coordinator_task_id = {
214 let map = self.inner.coordinator_map.lock().unwrap();
215 match map.get(&flow_run_id) {
216 Some(&tid) => tid,
217 None => return Err(HostError::FlowRunNotFound(flow_run_id)),
218 }
219 };
220
221 let guard = self.inner.engine.lock().await;
222 let engine = guard
223 .as_ref()
224 .ok_or_else(|| HostError::InternalError("engine already shut down".into()))?;
225 let status = derive_flow_run_status(
226 engine.projection(),
227 self.inner.store.as_ref(),
228 flow_run_id,
229 coordinator_task_id,
230 );
231 Ok(status)
232 }
233
234 pub fn list_capabilities(&self) -> Vec<Descriptor> {
236 self.inner.registry.list_capabilities()
237 }
238
239 pub fn describe(&self, connector_name: &str) -> Option<Descriptor> {
241 self.inner.registry.describe(connector_name)
242 }
243
244 pub async fn list_runs(&self) -> Result<Vec<FlowRunSummary>, HostError> {
249 let flow_run_ids: Vec<FlowRunId> = {
250 let map = self.inner.coordinator_map.lock().unwrap();
251 map.keys().copied().collect()
252 };
253
254 let mut summaries = Vec::with_capacity(flow_run_ids.len());
255 for flow_run_id in flow_run_ids {
256 match self.run_status(flow_run_id).await {
257 Ok(status) => summaries.push(FlowRunSummary {
258 flow_run_id: status.flow_run_id,
259 phase: status.phase,
260 submitted_at: status.submitted_at,
261 last_updated_at: status.last_updated_at,
262 }),
263 Err(HostError::FlowRunNotFound(_)) => continue,
264 Err(e) => return Err(e),
265 }
266 }
267
268 summaries.sort_by(|a, b| b.submitted_at.cmp(&a.submitted_at));
269 Ok(summaries)
270 }
271
272 pub async fn invoke_single(
277 &self,
278 connector_name: &str,
279 params: Value,
280 ) -> Result<Value, HostError> {
281 if self.inner.registry.get(connector_name).is_none() {
283 return Err(HostError::ConnectorNotFound(connector_name.to_string()));
284 }
285
286 let spec = build_single_node_flowspec(connector_name, params);
287 let node_id = spec.nodes[0].id;
288 let flow_run_id = self.submit_flow(spec).await?;
289
290 let status = self.poll_until_terminal(flow_run_id).await?;
292
293 match status.phase {
294 FlowPhase::Completed => {
295 let outputs = extract_flow_outputs(self.inner.store.as_ref(), flow_run_id)?;
297 outputs.get(&node_id).cloned().ok_or_else(|| {
298 HostError::InternalError(format!(
299 "flow completed but no output for node {node_id}"
300 ))
301 })
302 }
303 FlowPhase::Failed => Err(HostError::FlowFailed {
304 flow_run_id,
305 error: status.error.unwrap_or_else(|| "unknown error".into()),
306 }),
307 FlowPhase::Canceled => Err(HostError::FlowCanceled(flow_run_id)),
308 _ => Err(HostError::InternalError(format!(
309 "unexpected terminal phase: {:?}",
310 status.phase
311 ))),
312 }
313 }
314
315 async fn poll_until_terminal(
317 &self,
318 flow_run_id: FlowRunId,
319 ) -> Result<FlowRunStatus, HostError> {
320 loop {
321 let status = self.run_status(flow_run_id).await?;
322 match status.phase {
323 FlowPhase::Completed | FlowPhase::Failed | FlowPhase::Canceled => {
324 return Ok(status);
325 }
326 _ => {
327 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
328 }
329 }
330 }
331 }
332
333 async fn stop_and_take_engine(self) -> Result<Option<Engine>, HostError> {
338 let _ = self.inner.shutdown_tx.send(true);
340
341 if let Err(e) = self.tick_handle.await {
343 tracing::warn!(error = %e, "tick loop task panicked during shutdown");
344 }
345
346 let mut guard = self.inner.engine.lock().await;
348 Ok(guard.take())
349 }
350
351 pub async fn shutdown(self) -> Result<(), HostError> {
356 tracing::info!("shutting down host");
357
358 let engine = self.stop_and_take_engine().await?;
359
360 if let Some(eng) = engine {
363 tokio::task::spawn_blocking(move || {
364 drop(eng);
365 })
366 .await
367 .map_err(|e| HostError::InternalError(format!("shutdown join error: {e}")))?;
368 }
369
370 tracing::info!("host shut down");
371 Ok(())
372 }
373
374 pub fn active_flow_count(&self) -> usize {
379 let map = self.inner.coordinator_map.lock().unwrap();
380 map.len()
381 }
382
383 #[doc(hidden)]
388 pub async fn crash_drop(self) {
389 let engine = self.stop_and_take_engine().await.ok().flatten();
390 if let Some(eng) = engine {
391 tokio::task::spawn_blocking(move || {
392 drop(eng);
393 })
394 .await
395 .ok();
396 }
397 }
398}