phlow_runtime/
runtime_api.rs

1//! Phlow runtime API for in-memory pipelines.
2//!
3//! # Example
4//!
5//! ```no_run
6//! use phlow_engine::Context;
7//! use phlow_runtime::PhlowBuilder;
8//! use phlow_sdk::prelude::json;
9//!
10//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
11//! let pipeline = json!({
12//!     "steps": [
13//!         { "payload": "{{ main.name }}" }
14//!     ]
15//! });
16//! let context = Context::from_main(json!({ "name": "Phlow" }));
17//!
18//! let mut builder = PhlowBuilder::new();
19//! builder.settings_mut().download = false;
20//! let mut runtime = builder
21//!     .set_pipeline(pipeline)
22//!     .set_context(context)
23//!     .build()
24//!     .await
25//!     .unwrap();
26//!
27//! let result = runtime.run().await.unwrap();
28//! let _ = result;
29//! runtime.shutdown().await.unwrap();
30//! # });
31//! ```
32//!
33//! # Inline modules
34//!
35//! You can register inline modules with async handlers that run in-process.
36//! The module name must be declared in the pipeline `modules` list.
37//!
38//! ```no_run
39//! use phlow_engine::Context;
40//! use phlow_runtime::{PhlowBuilder, PhlowModule, PhlowModuleSchema};
41//! use phlow_sdk::prelude::json;
42//! use phlow_sdk::structs::ModuleResponse;
43//!
44//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
45//! let pipeline = json!({
46//!     "modules": [
47//!         { "module": "inline_echo", "name": "inline_echo" }
48//!     ],
49//!     "steps": [
50//!         { "use": "inline_echo", "input": { "name": "{{ main.name }}" } },
51//!         { "payload": "{{ payload.message }}" }
52//!     ]
53//! });
54//! let context = Context::from_main(json!({ "name": "Phlow" }));
55//!
56//! let mut module = PhlowModule::new();
57//! module.set_schema(
58//!     PhlowModuleSchema::new()
59//!         .with_input(json!({ "name": "string" }))
60//!         .with_output(json!({ "message": "string" }))
61//!         .with_input_order(vec!["name"]),
62//! );
63//! module.set_handler(|request| async move {
64//!     let name = request
65//!         .input
66//!         .and_then(|value| value.get("name").cloned())
67//!         .unwrap_or_else(|| json!("unknown"));
68//!     let message = format!("Hello, {}", name);
69//!     ModuleResponse::from_success(json!({ "message": message }))
70//! });
71//!
72//! let mut builder = PhlowBuilder::new();
73//! builder.settings_mut().download = false;
74//! let mut runtime = builder
75//!     .set_pipeline(pipeline)
76//!     .set_context(context)
77//!     .set_module("inline_echo", module)
78//!     .build()
79//!     .await
80//!     .unwrap();
81//!
82//! let result = runtime.run().await.unwrap();
83//! let _ = result;
84//! runtime.shutdown().await.unwrap();
85//! # });
86//! ```
87//!
88//! # Preprocess and run strings
89//!
90//! If you have a script string, preprocess it once and reuse the resulting value.
91//!
92//! ```no_run
93//! use phlow_engine::Context;
94//! use phlow_runtime::PhlowRuntime;
95//!
96//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
97//! let script = r#"
98//! steps:
99//!   - return: "ok"
100//! "#;
101//! let runtime = PhlowRuntime::new();
102//! let pipeline = runtime.preprocess_string(script).unwrap();
103//! let result = PhlowRuntime::run_preprocessed(pipeline, Context::new()).await.unwrap();
104//! let _ = result;
105//! # });
106//! ```
107//!
108//! You can also set the preprocessed value on a runtime to avoid preprocessing twice:
109//!
110//! ```no_run
111//! use phlow_engine::Context;
112//! use phlow_runtime::PhlowRuntime;
113//!
114//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
115//! let script = r#"
116//! steps:
117//!   - return: "ok"
118//! "#;
119//! let mut runtime = PhlowRuntime::new();
120//! let pipeline = runtime.preprocess_string(script).unwrap();
121//! runtime.set_preprocessed_pipeline(pipeline);
122//! runtime.set_context(Context::new());
123//! let result = runtime.run().await.unwrap();
124//! let _ = result;
125//! runtime.shutdown().await.unwrap();
126//! # });
127//! ```
128use crate::debug_server;
129use crate::inline_module::{InlineModules, PhlowModule};
130use crate::loader::Loader;
131use crate::loader::error::Error as LoaderError;
132use crate::preprocessor::preprocessor;
133use crate::runtime::Runtime;
134use crate::runtime::RuntimeError;
135use crate::settings::Settings;
136use crossbeam::channel;
137use phlow_engine::Context;
138use phlow_sdk::otel::{OtelGuard, init_tracing_subscriber};
139use phlow_sdk::prelude::{Array, Value};
140use phlow_sdk::structs::Package;
141use phlow_sdk::{tracing, use_log};
142use std::fmt::{Display, Formatter};
143use std::path::Path;
144use std::path::PathBuf;
145use std::sync::Arc;
146
147/// Errors returned by the runtime API.
148#[derive(Debug)]
149pub enum PhlowRuntimeError {
150    /// Pipeline was not provided.
151    MissingPipeline,
152    /// Failed to load the pipeline into a loader.
153    LoaderError(crate::loader::error::Error),
154    /// Failed to send a package to the runtime loop.
155    PackageSendError,
156    /// Response channel closed before a result arrived.
157    ResponseChannelClosed,
158    /// Preprocessor errors while expanding a script string.
159    PreprocessError(Vec<String>),
160    /// Failed to parse the preprocessed script into a value.
161    ScriptParseError(serde_yaml::Error),
162    /// Error reported by runtime execution.
163    RuntimeError(RuntimeError),
164    /// Join error from the runtime task.
165    RuntimeJoinError(tokio::task::JoinError),
166}
167
168impl Display for PhlowRuntimeError {
169    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
170        match self {
171            PhlowRuntimeError::MissingPipeline => write!(f, "Pipeline not set"),
172            PhlowRuntimeError::LoaderError(err) => write!(f, "Loader error: {}", err),
173            PhlowRuntimeError::PackageSendError => write!(f, "Failed to send package"),
174            PhlowRuntimeError::ResponseChannelClosed => write!(f, "Response channel closed"),
175            PhlowRuntimeError::PreprocessError(errs) => {
176                write!(f, "Preprocess error: {}", errs.join(", "))
177            }
178            PhlowRuntimeError::ScriptParseError(err) => write!(f, "Script parse error: {}", err),
179            PhlowRuntimeError::RuntimeError(err) => write!(f, "Runtime error: {}", err),
180            PhlowRuntimeError::RuntimeJoinError(err) => write!(f, "Runtime task error: {}", err),
181        }
182    }
183}
184
185impl std::error::Error for PhlowRuntimeError {}
186
187impl From<crate::loader::error::Error> for PhlowRuntimeError {
188    fn from(err: crate::loader::error::Error) -> Self {
189        PhlowRuntimeError::LoaderError(err)
190    }
191}
192
193impl From<RuntimeError> for PhlowRuntimeError {
194    fn from(err: RuntimeError) -> Self {
195        PhlowRuntimeError::RuntimeError(err)
196    }
197}
198
199fn preprocess_string_inner(
200    script: &str,
201    base_path: &Path,
202    print_yaml: bool,
203    print_output: crate::settings::PrintOutput,
204) -> Result<Value, PhlowRuntimeError> {
205    let processed = preprocessor(script, base_path, print_yaml, print_output)
206        .map_err(PhlowRuntimeError::PreprocessError)?;
207    let mut value: Value =
208        serde_yaml::from_str(&processed).map_err(PhlowRuntimeError::ScriptParseError)?;
209
210    if value.get("steps").is_none() {
211        return Err(PhlowRuntimeError::LoaderError(LoaderError::StepsNotDefined));
212    }
213
214    if let Some(modules) = value.get("modules") {
215        if !modules.is_array() {
216            return Err(PhlowRuntimeError::LoaderError(
217                LoaderError::ModuleLoaderError("Modules not an array".to_string()),
218            ));
219        }
220
221        value.insert("modules", modules.clone());
222    } else {
223        value.insert("modules", Value::Array(Array::new()));
224    }
225
226    Ok(value)
227}
228
229/// Prepared runtime that can execute an in-memory pipeline.
230pub struct PhlowRuntime {
231    pipeline: Option<Value>,
232    context: Option<Context>,
233    settings: Settings,
234    base_path: Option<PathBuf>,
235    dispatch: Option<tracing::Dispatch>,
236    inline_modules: InlineModules,
237    prepared: Option<PreparedRuntime>,
238}
239
240/// Builder for creating a prepared [`PhlowRuntime`].
241///
242/// Use this when you want a fluent API that returns a ready runtime.
243pub struct PhlowBuilder {
244    pipeline: Option<Value>,
245    context: Option<Context>,
246    settings: Settings,
247    base_path: Option<PathBuf>,
248    dispatch: Option<tracing::Dispatch>,
249    inline_modules: InlineModules,
250}
251
252impl Default for PhlowRuntime {
253    fn default() -> Self {
254        Self::new()
255    }
256}
257
258impl PhlowRuntime {
259    /// Create a new runtime with default settings.
260    ///
261    /// This sets `var_main` to a default value so non-main pipelines auto-start.
262    pub fn new() -> Self {
263        let mut settings = Settings::for_runtime();
264        if settings.var_main.is_none() {
265            settings.var_main = Some("__phlow_runtime__".to_string());
266        }
267
268        Self {
269            pipeline: None,
270            context: None,
271            settings,
272            base_path: None,
273            dispatch: None,
274            inline_modules: InlineModules::default(),
275            prepared: None,
276        }
277    }
278
279    /// Create a new runtime using explicit settings.
280    pub fn with_settings(settings: Settings) -> Self {
281        Self {
282            pipeline: None,
283            context: None,
284            settings,
285            base_path: None,
286            dispatch: None,
287            inline_modules: InlineModules::default(),
288            prepared: None,
289        }
290    }
291
292    /// Preprocess a phlow script string and parse it into a [`Value`].
293    ///
294    /// This uses the runtime settings for preprocessor options and the base path
295    /// (or the current directory) to resolve `!include`.
296    pub fn preprocess_string(&self, script: &str) -> Result<Value, PhlowRuntimeError> {
297        let base_path = self.base_path.clone().unwrap_or_else(|| {
298            std::env::current_dir().unwrap_or_else(|_| PathBuf::from("./"))
299        });
300
301        preprocess_string_inner(
302            script,
303            base_path.as_path(),
304            self.settings.print_yaml,
305            self.settings.print_output,
306        )
307    }
308
309    /// Set the pipeline to be executed.
310    ///
311    /// This clears any prepared runtime state.
312    pub fn set_pipeline(&mut self, pipeline: Value) -> &mut Self {
313        self.pipeline = Some(pipeline);
314        self.prepared = None;
315        self
316    }
317
318    /// Set the execution context.
319    ///
320    /// This clears any prepared runtime state.
321    pub fn set_context(&mut self, context: Context) -> &mut Self {
322        self.context = Some(context);
323        self.prepared = None;
324        self
325    }
326
327    /// Set a preprocessed pipeline to be executed.
328    ///
329    /// This does not run the preprocessor again.
330    ///
331    /// Use [`preprocess_string`](Self::preprocess_string) to turn a script string
332    /// into a preprocessed value before calling this.
333    pub fn set_preprocessed_pipeline(&mut self, pipeline: Value) -> &mut Self {
334        self.set_pipeline(pipeline)
335    }
336
337    /// Replace the runtime settings.
338    ///
339    /// This clears any prepared runtime state.
340    pub fn set_settings(&mut self, settings: Settings) -> &mut Self {
341        self.settings = settings;
342        self.prepared = None;
343        self
344    }
345
346    /// Set the base path used for resolving local module paths.
347    ///
348    /// This clears any prepared runtime state.
349    pub fn set_base_path<P: Into<PathBuf>>(&mut self, base_path: P) -> &mut Self {
350        self.base_path = Some(base_path.into());
351        self.prepared = None;
352        self
353    }
354
355    /// Provide a custom tracing dispatch instead of initializing OpenTelemetry.
356    ///
357    /// This clears any prepared runtime state.
358    pub fn set_dispatch(&mut self, dispatch: tracing::Dispatch) -> &mut Self {
359        self.dispatch = Some(dispatch);
360        self.prepared = None;
361        self
362    }
363
364    /// Register an inline module by name.
365    ///
366    /// The module must be declared in the pipeline `modules` list.
367    /// The handler runs asynchronously inside the runtime.
368    ///
369    /// This clears any prepared runtime state.
370    pub fn set_module<S: Into<String>>(&mut self, name: S, module: PhlowModule) -> &mut Self {
371        self.inline_modules.insert(name.into(), module);
372        self.prepared = None;
373        self
374    }
375
376    /// Read-only access to the current settings.
377    pub fn settings(&self) -> &Settings {
378        &self.settings
379    }
380
381    /// Mutable access to settings.
382    ///
383    /// This clears any prepared runtime state.
384    pub fn settings_mut(&mut self) -> &mut Settings {
385        self.prepared = None;
386        &mut self.settings
387    }
388
389    /// Build and prepare the runtime (load modules, tracing, and start loop).
390    ///
391    /// Calling this multiple times is safe; it is a no-op if already prepared.
392    pub async fn build(&mut self) -> Result<(), PhlowRuntimeError> {
393        if self.prepared.is_some() {
394            return Ok(());
395        }
396
397        use_log!();
398
399        let pipeline = self
400            .pipeline
401            .as_ref()
402            .ok_or(PhlowRuntimeError::MissingPipeline)?;
403
404        let base_path = self.base_path.clone().unwrap_or_else(|| {
405            std::env::current_dir().unwrap_or_else(|_| PathBuf::from("./"))
406        });
407
408        let mut loader = Loader::from_value(pipeline, Some(base_path.as_path()))?;
409
410        if self.settings.download {
411            loader
412                .download(&self.settings.default_package_repository_url)
413                .await?;
414        }
415
416        loader.update_info();
417
418        let mut guard: Option<OtelGuard> = None;
419        let dispatch = if let Some(dispatch) = self.dispatch.clone() {
420            dispatch
421        } else {
422            let next_guard = init_tracing_subscriber(loader.app_data.clone());
423            let dispatch = next_guard.dispatch.clone();
424            guard = Some(next_guard);
425            dispatch
426        };
427
428        let debug_enabled = std::env::var("PHLOW_DEBUG")
429            .map(|value| value.eq_ignore_ascii_case("true"))
430            .unwrap_or(false);
431        if debug_enabled {
432            let controller = Arc::new(phlow_engine::debug::DebugController::new());
433            match debug_server::spawn(controller.clone()).await {
434                Ok(()) => {
435                    if phlow_engine::debug::set_debug_controller(controller).is_err() {
436                        log::warn!("Debug controller already set");
437                    }
438                    log::info!("Phlow debug enabled");
439                }
440                Err(err) => {
441                    log::error!("Failed to start debug server: {}", err);
442                }
443            }
444        }
445
446        let context = self.context.clone().unwrap_or_else(Context::new);
447        let request_data = context.get_main();
448        let context_for_runtime = context.clone();
449        let auto_start = self.settings.var_main.is_some()
450            || loader.main == -1
451            || context.get_main().is_some();
452
453        let app_name = loader
454            .app_data
455            .name
456            .clone()
457            .unwrap_or_else(|| "phlow runtime".to_string());
458
459        let settings = self.settings.clone();
460        let (tx_main_package, rx_main_package) = channel::unbounded::<Package>();
461        let tx_for_runtime = tx_main_package.clone();
462        let dispatch_for_runtime = dispatch.clone();
463        let inline_modules = self.inline_modules.clone();
464
465        let runtime_handle = tokio::spawn(async move {
466            tracing::dispatcher::with_default(&dispatch_for_runtime, || {
467                Runtime::run_script_with_modules(
468                    tx_for_runtime,
469                    rx_main_package,
470                    loader,
471                    dispatch_for_runtime.clone(),
472                    settings,
473                    context_for_runtime,
474                    inline_modules,
475                )
476            })
477            .await
478        });
479
480        self.prepared = Some(PreparedRuntime {
481            tx_main_package,
482            dispatch,
483            runtime_handle,
484            guard,
485            app_name,
486            request_data,
487            auto_start,
488        });
489
490        Ok(())
491    }
492
493    /// Execute the pipeline and return its result.
494    ///
495    /// This can be called multiple times after [`build`](Self::build). When the
496    /// pipeline cannot auto-start (for example, a main module is present and
497    /// `var_main` is not set), this returns `Value::Undefined` and shuts down
498    /// the prepared runtime. For normal execution, call [`shutdown`](Self::shutdown)
499    /// when you are done to release resources.
500    pub async fn run(&mut self) -> Result<Value, PhlowRuntimeError> {
501        self.build().await?;
502
503        let auto_start = match self.prepared.as_ref() {
504            Some(prepared) => prepared.auto_start,
505            None => return Err(PhlowRuntimeError::MissingPipeline),
506        };
507
508        if !auto_start {
509            self.shutdown().await?;
510            return Ok(Value::Undefined);
511        }
512
513        let (tx_main_package, dispatch, app_name, request_data) = match self.prepared.as_ref() {
514            Some(prepared) => (
515                prepared.tx_main_package.clone(),
516                prepared.dispatch.clone(),
517                prepared.app_name.clone(),
518                prepared.request_data.clone(),
519            ),
520            None => return Err(PhlowRuntimeError::MissingPipeline),
521        };
522
523        let (response_tx, response_rx) = tokio::sync::oneshot::channel::<Value>();
524        let package = tracing::dispatcher::with_default(&dispatch, || {
525            let span = tracing::span!(
526                tracing::Level::INFO,
527                "phlow_run",
528                otel.name = app_name.as_str()
529            );
530
531            Package {
532                response: Some(response_tx),
533                request_data,
534                origin: 0,
535                span: Some(span),
536                dispatch: Some(dispatch.clone()),
537            }
538        });
539
540        if tx_main_package.send(package).is_err() {
541            return Err(PhlowRuntimeError::PackageSendError);
542        }
543
544        let result = response_rx
545            .await
546            .map_err(|_| PhlowRuntimeError::ResponseChannelClosed)?;
547
548        Ok(result)
549    }
550
551    /// Execute a preprocessed pipeline in one call and return its result.
552    ///
553    /// This creates a new runtime with default settings, runs the pipeline,
554    /// and shuts down the runtime before returning.
555    pub async fn run_preprocessed(
556        pipeline: Value,
557        context: Context,
558    ) -> Result<Value, PhlowRuntimeError> {
559        let mut runtime = PhlowRuntime::new();
560        runtime.set_preprocessed_pipeline(pipeline);
561        runtime.set_context(context);
562        let result = runtime.run().await?;
563        runtime.shutdown().await?;
564        Ok(result)
565    }
566
567    /// Shut down the prepared runtime and release resources.
568    ///
569    /// Call this when you are done reusing the runtime to close channels,
570    /// wait for the runtime task, and flush tracing providers.
571    pub async fn shutdown(&mut self) -> Result<(), PhlowRuntimeError> {
572        let prepared = match self.prepared.take() {
573            Some(prepared) => prepared,
574            None => return Ok(()),
575        };
576
577        drop(prepared.tx_main_package);
578
579        let runtime_result = prepared
580            .runtime_handle
581            .await
582            .map_err(PhlowRuntimeError::RuntimeJoinError)?;
583        runtime_result?;
584
585        drop(prepared.guard);
586
587        Ok(())
588    }
589}
590
591impl PhlowBuilder {
592    /// Create a new builder with default settings.
593    ///
594    /// This sets `var_main` to a default value so non-main pipelines auto-start.
595    pub fn new() -> Self {
596        let mut settings = Settings::for_runtime();
597        if settings.var_main.is_none() {
598            settings.var_main = Some("__phlow_runtime__".to_string());
599        }
600
601        Self {
602            pipeline: None,
603            context: None,
604            settings,
605            base_path: None,
606            dispatch: None,
607            inline_modules: InlineModules::default(),
608        }
609    }
610
611    /// Create a new builder using explicit settings.
612    pub fn with_settings(settings: Settings) -> Self {
613        Self {
614            pipeline: None,
615            context: None,
616            settings,
617            base_path: None,
618            dispatch: None,
619            inline_modules: InlineModules::default(),
620        }
621    }
622
623    /// Preprocess a phlow script string and parse it into a [`Value`].
624    ///
625    /// This uses the builder settings for preprocessor options and the base path
626    /// (or the current directory) to resolve `!include`.
627    pub fn preprocess_string(&self, script: &str) -> Result<Value, PhlowRuntimeError> {
628        let base_path = self.base_path.clone().unwrap_or_else(|| {
629            std::env::current_dir().unwrap_or_else(|_| PathBuf::from("./"))
630        });
631
632        preprocess_string_inner(
633            script,
634            base_path.as_path(),
635            self.settings.print_yaml,
636            self.settings.print_output,
637        )
638    }
639
640    /// Set the pipeline to be executed.
641    ///
642    /// Returns the builder for chaining.
643    pub fn set_pipeline(mut self, pipeline: Value) -> Self {
644        self.pipeline = Some(pipeline);
645        self
646    }
647
648    /// Set a preprocessed pipeline to be executed.
649    ///
650    /// Returns the builder for chaining.
651    pub fn set_preprocessed_pipeline(mut self, pipeline: Value) -> Self {
652        self.pipeline = Some(pipeline);
653        self
654    }
655
656    /// Set the execution context.
657    ///
658    /// Returns the builder for chaining.
659    pub fn set_context(mut self, context: Context) -> Self {
660        self.context = Some(context);
661        self
662    }
663
664    /// Replace the runtime settings.
665    ///
666    /// Returns the builder for chaining.
667    pub fn set_settings(mut self, settings: Settings) -> Self {
668        self.settings = settings;
669        self
670    }
671
672    /// Set the base path used for resolving local module paths.
673    ///
674    /// Returns the builder for chaining.
675    pub fn set_base_path<P: Into<PathBuf>>(mut self, base_path: P) -> Self {
676        self.base_path = Some(base_path.into());
677        self
678    }
679
680    /// Provide a custom tracing dispatch instead of initializing OpenTelemetry.
681    ///
682    /// Returns the builder for chaining.
683    pub fn set_dispatch(mut self, dispatch: tracing::Dispatch) -> Self {
684        self.dispatch = Some(dispatch);
685        self
686    }
687
688    /// Register an inline module by name.
689    ///
690    /// The module must be declared in the pipeline `modules` list.
691    /// The handler runs asynchronously inside the runtime.
692    ///
693    /// Returns the builder for chaining.
694    pub fn set_module<S: Into<String>>(mut self, name: S, module: PhlowModule) -> Self {
695        self.inline_modules.insert(name.into(), module);
696        self
697    }
698
699    /// Read-only access to the current settings.
700    pub fn settings(&self) -> &Settings {
701        &self.settings
702    }
703
704    /// Mutable access to settings.
705    pub fn settings_mut(&mut self) -> &mut Settings {
706        &mut self.settings
707    }
708
709    /// Build and return a prepared [`PhlowRuntime`].
710    ///
711    /// This consumes the builder and prepares the runtime for execution.
712    pub async fn build(mut self) -> Result<PhlowRuntime, PhlowRuntimeError> {
713        let mut runtime = PhlowRuntime::with_settings(self.settings);
714        runtime.inline_modules = self.inline_modules;
715
716        if let Some(pipeline) = self.pipeline.take() {
717            runtime.set_pipeline(pipeline);
718        }
719
720        if let Some(context) = self.context.take() {
721            runtime.set_context(context);
722        }
723
724        if let Some(base_path) = self.base_path.take() {
725            runtime.set_base_path(base_path);
726        }
727
728        if let Some(dispatch) = self.dispatch.take() {
729            runtime.set_dispatch(dispatch);
730        }
731
732        runtime.build().await?;
733        Ok(runtime)
734    }
735}
736
737impl Default for PhlowBuilder {
738    fn default() -> Self {
739        Self::new()
740    }
741}
742
743struct PreparedRuntime {
744    tx_main_package: channel::Sender<Package>,
745    dispatch: tracing::Dispatch,
746    runtime_handle: tokio::task::JoinHandle<Result<(), RuntimeError>>,
747    guard: Option<OtelGuard>,
748    app_name: String,
749    request_data: Option<Value>,
750    auto_start: bool,
751}