Skip to main content

phlow_runtime/
runtime_api.rs

1//! Phlow runtime API for in-memory pipelines.
2//!
3//! # Example
4//!
5//! ```no_run
6//! use phlow_engine::Context;
7//! use phlow_runtime::PhlowBuilder;
8//! use phlow_sdk::prelude::json;
9//!
10//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
11//! let pipeline = json!({
12//!     "steps": [
13//!         { "payload": "{{ main.name }}" }
14//!     ]
15//! });
16//! let context = Context::from_main(json!({ "name": "Phlow" }));
17//!
18//! let mut builder = PhlowBuilder::new();
19//! builder.settings_mut().download = false;
20//! let mut runtime = builder
21//!     .set_pipeline(pipeline)
22//!     .set_context(context)
23//!     .build()
24//!     .await
25//!     .unwrap();
26//!
27//! let result = runtime.run().await.unwrap();
28//! let _ = result;
29//! runtime.shutdown().await.unwrap();
30//! # });
31//! ```
32use crate::debug_server;
33use crate::loader::Loader;
34use crate::runtime::Runtime;
35use crate::runtime::RuntimeError;
36use crate::settings::Settings;
37use crossbeam::channel;
38use phlow_engine::Context;
39use phlow_sdk::otel::{OtelGuard, init_tracing_subscriber};
40use phlow_sdk::prelude::Value;
41use phlow_sdk::structs::Package;
42use phlow_sdk::{tracing, use_log};
43use std::fmt::{Display, Formatter};
44use std::path::PathBuf;
45use std::sync::Arc;
46
47/// Errors returned by the runtime API.
48#[derive(Debug)]
49pub enum PhlowRuntimeError {
50    /// Pipeline was not provided.
51    MissingPipeline,
52    /// Failed to load the pipeline into a loader.
53    LoaderError(crate::loader::error::Error),
54    /// Failed to send a package to the runtime loop.
55    PackageSendError,
56    /// Response channel closed before a result arrived.
57    ResponseChannelClosed,
58    /// Error reported by runtime execution.
59    RuntimeError(RuntimeError),
60    /// Join error from the runtime task.
61    RuntimeJoinError(tokio::task::JoinError),
62}
63
64impl Display for PhlowRuntimeError {
65    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
66        match self {
67            PhlowRuntimeError::MissingPipeline => write!(f, "Pipeline not set"),
68            PhlowRuntimeError::LoaderError(err) => write!(f, "Loader error: {}", err),
69            PhlowRuntimeError::PackageSendError => write!(f, "Failed to send package"),
70            PhlowRuntimeError::ResponseChannelClosed => write!(f, "Response channel closed"),
71            PhlowRuntimeError::RuntimeError(err) => write!(f, "Runtime error: {}", err),
72            PhlowRuntimeError::RuntimeJoinError(err) => write!(f, "Runtime task error: {}", err),
73        }
74    }
75}
76
77impl std::error::Error for PhlowRuntimeError {}
78
79impl From<crate::loader::error::Error> for PhlowRuntimeError {
80    fn from(err: crate::loader::error::Error) -> Self {
81        PhlowRuntimeError::LoaderError(err)
82    }
83}
84
85impl From<RuntimeError> for PhlowRuntimeError {
86    fn from(err: RuntimeError) -> Self {
87        PhlowRuntimeError::RuntimeError(err)
88    }
89}
90
91/// Prepared runtime that can execute an in-memory pipeline.
92pub struct PhlowRuntime {
93    pipeline: Option<Value>,
94    context: Option<Context>,
95    settings: Settings,
96    base_path: Option<PathBuf>,
97    dispatch: Option<tracing::Dispatch>,
98    prepared: Option<PreparedRuntime>,
99}
100
101/// Builder for creating a prepared [`PhlowRuntime`].
102///
103/// Use this when you want a fluent API that returns a ready runtime.
104pub struct PhlowBuilder {
105    pipeline: Option<Value>,
106    context: Option<Context>,
107    settings: Settings,
108    base_path: Option<PathBuf>,
109    dispatch: Option<tracing::Dispatch>,
110}
111
112impl Default for PhlowRuntime {
113    fn default() -> Self {
114        Self::new()
115    }
116}
117
118impl PhlowRuntime {
119    /// Create a new runtime with default settings.
120    ///
121    /// This sets `var_main` to a default value so non-main pipelines auto-start.
122    pub fn new() -> Self {
123        let mut settings = Settings::for_runtime();
124        if settings.var_main.is_none() {
125            settings.var_main = Some("__phlow_runtime__".to_string());
126        }
127
128        Self {
129            pipeline: None,
130            context: None,
131            settings,
132            base_path: None,
133            dispatch: None,
134            prepared: None,
135        }
136    }
137
138    /// Create a new runtime using explicit settings.
139    pub fn with_settings(settings: Settings) -> Self {
140        Self {
141            pipeline: None,
142            context: None,
143            settings,
144            base_path: None,
145            dispatch: None,
146            prepared: None,
147        }
148    }
149
150    /// Set the pipeline to be executed.
151    ///
152    /// This clears any prepared runtime state.
153    pub fn set_pipeline(&mut self, pipeline: Value) -> &mut Self {
154        self.pipeline = Some(pipeline);
155        self.prepared = None;
156        self
157    }
158
159    /// Set the execution context.
160    ///
161    /// This clears any prepared runtime state.
162    pub fn set_context(&mut self, context: Context) -> &mut Self {
163        self.context = Some(context);
164        self.prepared = None;
165        self
166    }
167
168    /// Replace the runtime settings.
169    ///
170    /// This clears any prepared runtime state.
171    pub fn set_settings(&mut self, settings: Settings) -> &mut Self {
172        self.settings = settings;
173        self.prepared = None;
174        self
175    }
176
177    /// Set the base path used for resolving local module paths.
178    ///
179    /// This clears any prepared runtime state.
180    pub fn set_base_path<P: Into<PathBuf>>(&mut self, base_path: P) -> &mut Self {
181        self.base_path = Some(base_path.into());
182        self.prepared = None;
183        self
184    }
185
186    /// Provide a custom tracing dispatch instead of initializing OpenTelemetry.
187    ///
188    /// This clears any prepared runtime state.
189    pub fn set_dispatch(&mut self, dispatch: tracing::Dispatch) -> &mut Self {
190        self.dispatch = Some(dispatch);
191        self.prepared = None;
192        self
193    }
194
195    /// Read-only access to the current settings.
196    pub fn settings(&self) -> &Settings {
197        &self.settings
198    }
199
200    /// Mutable access to settings.
201    ///
202    /// This clears any prepared runtime state.
203    pub fn settings_mut(&mut self) -> &mut Settings {
204        self.prepared = None;
205        &mut self.settings
206    }
207
208    /// Build and prepare the runtime (load modules, tracing, and start loop).
209    ///
210    /// Calling this multiple times is safe; it is a no-op if already prepared.
211    pub async fn build(&mut self) -> Result<(), PhlowRuntimeError> {
212        if self.prepared.is_some() {
213            return Ok(());
214        }
215
216        use_log!();
217
218        let pipeline = self
219            .pipeline
220            .as_ref()
221            .ok_or(PhlowRuntimeError::MissingPipeline)?;
222
223        let base_path = self.base_path.clone().unwrap_or_else(|| {
224            std::env::current_dir().unwrap_or_else(|_| PathBuf::from("./"))
225        });
226
227        let mut loader = Loader::from_value(pipeline, Some(base_path.as_path()))?;
228
229        if self.settings.download {
230            loader
231                .download(&self.settings.default_package_repository_url)
232                .await?;
233        }
234
235        loader.update_info();
236
237        let mut guard: Option<OtelGuard> = None;
238        let dispatch = if let Some(dispatch) = self.dispatch.clone() {
239            dispatch
240        } else {
241            let next_guard = init_tracing_subscriber(loader.app_data.clone());
242            let dispatch = next_guard.dispatch.clone();
243            guard = Some(next_guard);
244            dispatch
245        };
246
247        let debug_enabled = std::env::var("PHLOW_DEBUG")
248            .map(|value| value.eq_ignore_ascii_case("true"))
249            .unwrap_or(false);
250        if debug_enabled {
251            let controller = Arc::new(phlow_engine::debug::DebugController::new());
252            match debug_server::spawn(controller.clone()).await {
253                Ok(()) => {
254                    if phlow_engine::debug::set_debug_controller(controller).is_err() {
255                        log::warn!("Debug controller already set");
256                    }
257                    log::info!("Phlow debug enabled");
258                }
259                Err(err) => {
260                    log::error!("Failed to start debug server: {}", err);
261                }
262            }
263        }
264
265        let context = self.context.clone().unwrap_or_else(Context::new);
266        let request_data = context.get_main();
267        let context_for_runtime = context.clone();
268        let auto_start = self.settings.var_main.is_some()
269            || loader.main == -1
270            || context.get_main().is_some();
271
272        let app_name = loader
273            .app_data
274            .name
275            .clone()
276            .unwrap_or_else(|| "phlow runtime".to_string());
277
278        let settings = self.settings.clone();
279        let (tx_main_package, rx_main_package) = channel::unbounded::<Package>();
280        let tx_for_runtime = tx_main_package.clone();
281        let dispatch_for_runtime = dispatch.clone();
282
283        let runtime_handle = tokio::spawn(async move {
284            tracing::dispatcher::with_default(&dispatch_for_runtime, || {
285                Runtime::run_script(
286                    tx_for_runtime,
287                    rx_main_package,
288                    loader,
289                    dispatch_for_runtime.clone(),
290                    settings,
291                    context_for_runtime,
292                )
293            })
294            .await
295        });
296
297        self.prepared = Some(PreparedRuntime {
298            tx_main_package,
299            dispatch,
300            runtime_handle,
301            guard,
302            app_name,
303            request_data,
304            auto_start,
305        });
306
307        Ok(())
308    }
309
310    /// Execute the pipeline and return its result.
311    ///
312    /// This can be called multiple times after [`build`](Self::build). When the
313    /// pipeline cannot auto-start (for example, a main module is present and
314    /// `var_main` is not set), this returns `Value::Undefined` and shuts down
315    /// the prepared runtime. For normal execution, call [`shutdown`](Self::shutdown)
316    /// when you are done to release resources.
317    pub async fn run(&mut self) -> Result<Value, PhlowRuntimeError> {
318        self.build().await?;
319
320        let auto_start = match self.prepared.as_ref() {
321            Some(prepared) => prepared.auto_start,
322            None => return Err(PhlowRuntimeError::MissingPipeline),
323        };
324
325        if !auto_start {
326            self.shutdown().await?;
327            return Ok(Value::Undefined);
328        }
329
330        let (tx_main_package, dispatch, app_name, request_data) = match self.prepared.as_ref() {
331            Some(prepared) => (
332                prepared.tx_main_package.clone(),
333                prepared.dispatch.clone(),
334                prepared.app_name.clone(),
335                prepared.request_data.clone(),
336            ),
337            None => return Err(PhlowRuntimeError::MissingPipeline),
338        };
339
340        let (response_tx, response_rx) = tokio::sync::oneshot::channel::<Value>();
341        let package = tracing::dispatcher::with_default(&dispatch, || {
342            let span = tracing::span!(
343                tracing::Level::INFO,
344                "phlow_run",
345                otel.name = app_name.as_str()
346            );
347
348            Package {
349                response: Some(response_tx),
350                request_data,
351                origin: 0,
352                span: Some(span),
353                dispatch: Some(dispatch.clone()),
354            }
355        });
356
357        if tx_main_package.send(package).is_err() {
358            return Err(PhlowRuntimeError::PackageSendError);
359        }
360
361        let result = response_rx
362            .await
363            .map_err(|_| PhlowRuntimeError::ResponseChannelClosed)?;
364
365        Ok(result)
366    }
367
368    /// Shut down the prepared runtime and release resources.
369    ///
370    /// Call this when you are done reusing the runtime to close channels,
371    /// wait for the runtime task, and flush tracing providers.
372    pub async fn shutdown(&mut self) -> Result<(), PhlowRuntimeError> {
373        let prepared = match self.prepared.take() {
374            Some(prepared) => prepared,
375            None => return Ok(()),
376        };
377
378        drop(prepared.tx_main_package);
379
380        let runtime_result = prepared
381            .runtime_handle
382            .await
383            .map_err(PhlowRuntimeError::RuntimeJoinError)?;
384        runtime_result?;
385
386        drop(prepared.guard);
387
388        Ok(())
389    }
390}
391
392impl PhlowBuilder {
393    /// Create a new builder with default settings.
394    ///
395    /// This sets `var_main` to a default value so non-main pipelines auto-start.
396    pub fn new() -> Self {
397        let mut settings = Settings::for_runtime();
398        if settings.var_main.is_none() {
399            settings.var_main = Some("__phlow_runtime__".to_string());
400        }
401
402        Self {
403            pipeline: None,
404            context: None,
405            settings,
406            base_path: None,
407            dispatch: None,
408        }
409    }
410
411    /// Create a new builder using explicit settings.
412    pub fn with_settings(settings: Settings) -> Self {
413        Self {
414            pipeline: None,
415            context: None,
416            settings,
417            base_path: None,
418            dispatch: None,
419        }
420    }
421
422    /// Set the pipeline to be executed.
423    ///
424    /// Returns the builder for chaining.
425    pub fn set_pipeline(mut self, pipeline: Value) -> Self {
426        self.pipeline = Some(pipeline);
427        self
428    }
429
430    /// Set the execution context.
431    ///
432    /// Returns the builder for chaining.
433    pub fn set_context(mut self, context: Context) -> Self {
434        self.context = Some(context);
435        self
436    }
437
438    /// Replace the runtime settings.
439    ///
440    /// Returns the builder for chaining.
441    pub fn set_settings(mut self, settings: Settings) -> Self {
442        self.settings = settings;
443        self
444    }
445
446    /// Set the base path used for resolving local module paths.
447    ///
448    /// Returns the builder for chaining.
449    pub fn set_base_path<P: Into<PathBuf>>(mut self, base_path: P) -> Self {
450        self.base_path = Some(base_path.into());
451        self
452    }
453
454    /// Provide a custom tracing dispatch instead of initializing OpenTelemetry.
455    ///
456    /// Returns the builder for chaining.
457    pub fn set_dispatch(mut self, dispatch: tracing::Dispatch) -> Self {
458        self.dispatch = Some(dispatch);
459        self
460    }
461
462    /// Read-only access to the current settings.
463    pub fn settings(&self) -> &Settings {
464        &self.settings
465    }
466
467    /// Mutable access to settings.
468    pub fn settings_mut(&mut self) -> &mut Settings {
469        &mut self.settings
470    }
471
472    /// Build and return a prepared [`PhlowRuntime`].
473    ///
474    /// This consumes the builder and prepares the runtime for execution.
475    pub async fn build(mut self) -> Result<PhlowRuntime, PhlowRuntimeError> {
476        let mut runtime = PhlowRuntime::with_settings(self.settings);
477
478        if let Some(pipeline) = self.pipeline.take() {
479            runtime.set_pipeline(pipeline);
480        }
481
482        if let Some(context) = self.context.take() {
483            runtime.set_context(context);
484        }
485
486        if let Some(base_path) = self.base_path.take() {
487            runtime.set_base_path(base_path);
488        }
489
490        if let Some(dispatch) = self.dispatch.take() {
491            runtime.set_dispatch(dispatch);
492        }
493
494        runtime.build().await?;
495        Ok(runtime)
496    }
497}
498
499impl Default for PhlowBuilder {
500    fn default() -> Self {
501        Self::new()
502    }
503}
504
505struct PreparedRuntime {
506    tx_main_package: channel::Sender<Package>,
507    dispatch: tracing::Dispatch,
508    runtime_handle: tokio::task::JoinHandle<Result<(), RuntimeError>>,
509    guard: Option<OtelGuard>,
510    app_name: String,
511    request_data: Option<Value>,
512    auto_start: bool,
513}