Skip to main content

phlow_runtime/
runtime_api.rs

1//! Phlow runtime API for in-memory pipelines.
2//!
3//! # Example
4//!
5//! ```no_run
6//! use phlow_engine::Context;
7//! use phlow_runtime::PhlowBuilder;
8//! use phlow_sdk::prelude::json;
9//!
10//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
11//! let pipeline = json!({
12//!     "steps": [
13//!         { "payload": "{{ main.name }}" }
14//!     ]
15//! });
16//! let context = Context::from_main(json!({ "name": "Phlow" }));
17//!
18//! let mut builder = PhlowBuilder::new();
19//! builder.settings_mut().download = false;
20//! let mut runtime = builder
21//!     .set_pipeline(pipeline)
22//!     .set_context(context)
23//!     .build()
24//!     .await
25//!     .unwrap();
26//!
27//! let result = runtime.run().await.unwrap();
28//! let _ = result;
29//! runtime.shutdown().await.unwrap();
30//! # });
31//! ```
32//!
33//! # Inline modules
34//!
35//! You can register inline modules with async handlers that run in-process.
36//! The module name must be declared in the pipeline `modules` list.
37//!
38//! ```no_run
39//! use phlow_engine::Context;
40//! use phlow_runtime::{PhlowBuilder, PhlowModule, PhlowModuleSchema};
41//! use phlow_sdk::prelude::json;
42//! use phlow_sdk::structs::ModuleResponse;
43//!
44//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
45//! let pipeline = json!({
46//!     "modules": [
47//!         { "module": "inline_echo", "name": "inline_echo" }
48//!     ],
49//!     "steps": [
50//!         { "use": "inline_echo", "input": { "name": "{{ main.name }}" } },
51//!         { "payload": "{{ payload.message }}" }
52//!     ]
53//! });
54//! let context = Context::from_main(json!({ "name": "Phlow" }));
55//!
56//! let mut module = PhlowModule::new();
57//! module.set_schema(
58//!     PhlowModuleSchema::new()
59//!         .with_input(json!({ "name": "string" }))
60//!         .with_output(json!({ "message": "string" }))
61//!         .with_input_order(vec!["name"]),
62//! );
63//! module.set_handler(|request| async move {
64//!     let name = request
65//!         .input
66//!         .and_then(|value| value.get("name").cloned())
67//!         .unwrap_or_else(|| json!("unknown"));
68//!     let message = format!("Hello, {}", name);
69//!     ModuleResponse::from_success(json!({ "message": message }))
70//! });
71//!
72//! let mut builder = PhlowBuilder::new();
73//! builder.settings_mut().download = false;
74//! let mut runtime = builder
75//!     .set_pipeline(pipeline)
76//!     .set_context(context)
77//!     .set_module("inline_echo", module)
78//!     .build()
79//!     .await
80//!     .unwrap();
81//!
82//! let result = runtime.run().await.unwrap();
83//! let _ = result;
84//! runtime.shutdown().await.unwrap();
85//! # });
86//! ```
87use crate::debug_server;
88use crate::inline_module::{InlineModules, PhlowModule};
89use crate::loader::Loader;
90use crate::runtime::Runtime;
91use crate::runtime::RuntimeError;
92use crate::settings::Settings;
93use crossbeam::channel;
94use phlow_engine::Context;
95use phlow_sdk::otel::{OtelGuard, init_tracing_subscriber};
96use phlow_sdk::prelude::Value;
97use phlow_sdk::structs::Package;
98use phlow_sdk::{tracing, use_log};
99use std::fmt::{Display, Formatter};
100use std::path::PathBuf;
101use std::sync::Arc;
102
103/// Errors returned by the runtime API.
104#[derive(Debug)]
105pub enum PhlowRuntimeError {
106    /// Pipeline was not provided.
107    MissingPipeline,
108    /// Failed to load the pipeline into a loader.
109    LoaderError(crate::loader::error::Error),
110    /// Failed to send a package to the runtime loop.
111    PackageSendError,
112    /// Response channel closed before a result arrived.
113    ResponseChannelClosed,
114    /// Error reported by runtime execution.
115    RuntimeError(RuntimeError),
116    /// Join error from the runtime task.
117    RuntimeJoinError(tokio::task::JoinError),
118}
119
120impl Display for PhlowRuntimeError {
121    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
122        match self {
123            PhlowRuntimeError::MissingPipeline => write!(f, "Pipeline not set"),
124            PhlowRuntimeError::LoaderError(err) => write!(f, "Loader error: {}", err),
125            PhlowRuntimeError::PackageSendError => write!(f, "Failed to send package"),
126            PhlowRuntimeError::ResponseChannelClosed => write!(f, "Response channel closed"),
127            PhlowRuntimeError::RuntimeError(err) => write!(f, "Runtime error: {}", err),
128            PhlowRuntimeError::RuntimeJoinError(err) => write!(f, "Runtime task error: {}", err),
129        }
130    }
131}
132
133impl std::error::Error for PhlowRuntimeError {}
134
135impl From<crate::loader::error::Error> for PhlowRuntimeError {
136    fn from(err: crate::loader::error::Error) -> Self {
137        PhlowRuntimeError::LoaderError(err)
138    }
139}
140
141impl From<RuntimeError> for PhlowRuntimeError {
142    fn from(err: RuntimeError) -> Self {
143        PhlowRuntimeError::RuntimeError(err)
144    }
145}
146
147/// Prepared runtime that can execute an in-memory pipeline.
148pub struct PhlowRuntime {
149    pipeline: Option<Value>,
150    context: Option<Context>,
151    settings: Settings,
152    base_path: Option<PathBuf>,
153    dispatch: Option<tracing::Dispatch>,
154    inline_modules: InlineModules,
155    prepared: Option<PreparedRuntime>,
156}
157
158/// Builder for creating a prepared [`PhlowRuntime`].
159///
160/// Use this when you want a fluent API that returns a ready runtime.
161pub struct PhlowBuilder {
162    pipeline: Option<Value>,
163    context: Option<Context>,
164    settings: Settings,
165    base_path: Option<PathBuf>,
166    dispatch: Option<tracing::Dispatch>,
167    inline_modules: InlineModules,
168}
169
170impl Default for PhlowRuntime {
171    fn default() -> Self {
172        Self::new()
173    }
174}
175
176impl PhlowRuntime {
177    /// Create a new runtime with default settings.
178    ///
179    /// This sets `var_main` to a default value so non-main pipelines auto-start.
180    pub fn new() -> Self {
181        let mut settings = Settings::for_runtime();
182        if settings.var_main.is_none() {
183            settings.var_main = Some("__phlow_runtime__".to_string());
184        }
185
186        Self {
187            pipeline: None,
188            context: None,
189            settings,
190            base_path: None,
191            dispatch: None,
192            inline_modules: InlineModules::default(),
193            prepared: None,
194        }
195    }
196
197    /// Create a new runtime using explicit settings.
198    pub fn with_settings(settings: Settings) -> Self {
199        Self {
200            pipeline: None,
201            context: None,
202            settings,
203            base_path: None,
204            dispatch: None,
205            inline_modules: InlineModules::default(),
206            prepared: None,
207        }
208    }
209
210    /// Set the pipeline to be executed.
211    ///
212    /// This clears any prepared runtime state.
213    pub fn set_pipeline(&mut self, pipeline: Value) -> &mut Self {
214        self.pipeline = Some(pipeline);
215        self.prepared = None;
216        self
217    }
218
219    /// Set the execution context.
220    ///
221    /// This clears any prepared runtime state.
222    pub fn set_context(&mut self, context: Context) -> &mut Self {
223        self.context = Some(context);
224        self.prepared = None;
225        self
226    }
227
228    /// Replace the runtime settings.
229    ///
230    /// This clears any prepared runtime state.
231    pub fn set_settings(&mut self, settings: Settings) -> &mut Self {
232        self.settings = settings;
233        self.prepared = None;
234        self
235    }
236
237    /// Set the base path used for resolving local module paths.
238    ///
239    /// This clears any prepared runtime state.
240    pub fn set_base_path<P: Into<PathBuf>>(&mut self, base_path: P) -> &mut Self {
241        self.base_path = Some(base_path.into());
242        self.prepared = None;
243        self
244    }
245
246    /// Provide a custom tracing dispatch instead of initializing OpenTelemetry.
247    ///
248    /// This clears any prepared runtime state.
249    pub fn set_dispatch(&mut self, dispatch: tracing::Dispatch) -> &mut Self {
250        self.dispatch = Some(dispatch);
251        self.prepared = None;
252        self
253    }
254
255    /// Register an inline module by name.
256    ///
257    /// The module must be declared in the pipeline `modules` list.
258    /// The handler runs asynchronously inside the runtime.
259    ///
260    /// This clears any prepared runtime state.
261    pub fn set_module<S: Into<String>>(&mut self, name: S, module: PhlowModule) -> &mut Self {
262        self.inline_modules.insert(name.into(), module);
263        self.prepared = None;
264        self
265    }
266
267    /// Read-only access to the current settings.
268    pub fn settings(&self) -> &Settings {
269        &self.settings
270    }
271
272    /// Mutable access to settings.
273    ///
274    /// This clears any prepared runtime state.
275    pub fn settings_mut(&mut self) -> &mut Settings {
276        self.prepared = None;
277        &mut self.settings
278    }
279
280    /// Build and prepare the runtime (load modules, tracing, and start loop).
281    ///
282    /// Calling this multiple times is safe; it is a no-op if already prepared.
283    pub async fn build(&mut self) -> Result<(), PhlowRuntimeError> {
284        if self.prepared.is_some() {
285            return Ok(());
286        }
287
288        use_log!();
289
290        let pipeline = self
291            .pipeline
292            .as_ref()
293            .ok_or(PhlowRuntimeError::MissingPipeline)?;
294
295        let base_path = self.base_path.clone().unwrap_or_else(|| {
296            std::env::current_dir().unwrap_or_else(|_| PathBuf::from("./"))
297        });
298
299        let mut loader = Loader::from_value(pipeline, Some(base_path.as_path()))?;
300
301        if self.settings.download {
302            loader
303                .download(&self.settings.default_package_repository_url)
304                .await?;
305        }
306
307        loader.update_info();
308
309        let mut guard: Option<OtelGuard> = None;
310        let dispatch = if let Some(dispatch) = self.dispatch.clone() {
311            dispatch
312        } else {
313            let next_guard = init_tracing_subscriber(loader.app_data.clone());
314            let dispatch = next_guard.dispatch.clone();
315            guard = Some(next_guard);
316            dispatch
317        };
318
319        let debug_enabled = std::env::var("PHLOW_DEBUG")
320            .map(|value| value.eq_ignore_ascii_case("true"))
321            .unwrap_or(false);
322        if debug_enabled {
323            let controller = Arc::new(phlow_engine::debug::DebugController::new());
324            match debug_server::spawn(controller.clone()).await {
325                Ok(()) => {
326                    if phlow_engine::debug::set_debug_controller(controller).is_err() {
327                        log::warn!("Debug controller already set");
328                    }
329                    log::info!("Phlow debug enabled");
330                }
331                Err(err) => {
332                    log::error!("Failed to start debug server: {}", err);
333                }
334            }
335        }
336
337        let context = self.context.clone().unwrap_or_else(Context::new);
338        let request_data = context.get_main();
339        let context_for_runtime = context.clone();
340        let auto_start = self.settings.var_main.is_some()
341            || loader.main == -1
342            || context.get_main().is_some();
343
344        let app_name = loader
345            .app_data
346            .name
347            .clone()
348            .unwrap_or_else(|| "phlow runtime".to_string());
349
350        let settings = self.settings.clone();
351        let (tx_main_package, rx_main_package) = channel::unbounded::<Package>();
352        let tx_for_runtime = tx_main_package.clone();
353        let dispatch_for_runtime = dispatch.clone();
354        let inline_modules = self.inline_modules.clone();
355
356        let runtime_handle = tokio::spawn(async move {
357            tracing::dispatcher::with_default(&dispatch_for_runtime, || {
358                Runtime::run_script_with_modules(
359                    tx_for_runtime,
360                    rx_main_package,
361                    loader,
362                    dispatch_for_runtime.clone(),
363                    settings,
364                    context_for_runtime,
365                    inline_modules,
366                )
367            })
368            .await
369        });
370
371        self.prepared = Some(PreparedRuntime {
372            tx_main_package,
373            dispatch,
374            runtime_handle,
375            guard,
376            app_name,
377            request_data,
378            auto_start,
379        });
380
381        Ok(())
382    }
383
384    /// Execute the pipeline and return its result.
385    ///
386    /// This can be called multiple times after [`build`](Self::build). When the
387    /// pipeline cannot auto-start (for example, a main module is present and
388    /// `var_main` is not set), this returns `Value::Undefined` and shuts down
389    /// the prepared runtime. For normal execution, call [`shutdown`](Self::shutdown)
390    /// when you are done to release resources.
391    pub async fn run(&mut self) -> Result<Value, PhlowRuntimeError> {
392        self.build().await?;
393
394        let auto_start = match self.prepared.as_ref() {
395            Some(prepared) => prepared.auto_start,
396            None => return Err(PhlowRuntimeError::MissingPipeline),
397        };
398
399        if !auto_start {
400            self.shutdown().await?;
401            return Ok(Value::Undefined);
402        }
403
404        let (tx_main_package, dispatch, app_name, request_data) = match self.prepared.as_ref() {
405            Some(prepared) => (
406                prepared.tx_main_package.clone(),
407                prepared.dispatch.clone(),
408                prepared.app_name.clone(),
409                prepared.request_data.clone(),
410            ),
411            None => return Err(PhlowRuntimeError::MissingPipeline),
412        };
413
414        let (response_tx, response_rx) = tokio::sync::oneshot::channel::<Value>();
415        let package = tracing::dispatcher::with_default(&dispatch, || {
416            let span = tracing::span!(
417                tracing::Level::INFO,
418                "phlow_run",
419                otel.name = app_name.as_str()
420            );
421
422            Package {
423                response: Some(response_tx),
424                request_data,
425                origin: 0,
426                span: Some(span),
427                dispatch: Some(dispatch.clone()),
428            }
429        });
430
431        if tx_main_package.send(package).is_err() {
432            return Err(PhlowRuntimeError::PackageSendError);
433        }
434
435        let result = response_rx
436            .await
437            .map_err(|_| PhlowRuntimeError::ResponseChannelClosed)?;
438
439        Ok(result)
440    }
441
442    /// Shut down the prepared runtime and release resources.
443    ///
444    /// Call this when you are done reusing the runtime to close channels,
445    /// wait for the runtime task, and flush tracing providers.
446    pub async fn shutdown(&mut self) -> Result<(), PhlowRuntimeError> {
447        let prepared = match self.prepared.take() {
448            Some(prepared) => prepared,
449            None => return Ok(()),
450        };
451
452        drop(prepared.tx_main_package);
453
454        let runtime_result = prepared
455            .runtime_handle
456            .await
457            .map_err(PhlowRuntimeError::RuntimeJoinError)?;
458        runtime_result?;
459
460        drop(prepared.guard);
461
462        Ok(())
463    }
464}
465
466impl PhlowBuilder {
467    /// Create a new builder with default settings.
468    ///
469    /// This sets `var_main` to a default value so non-main pipelines auto-start.
470    pub fn new() -> Self {
471        let mut settings = Settings::for_runtime();
472        if settings.var_main.is_none() {
473            settings.var_main = Some("__phlow_runtime__".to_string());
474        }
475
476        Self {
477            pipeline: None,
478            context: None,
479            settings,
480            base_path: None,
481            dispatch: None,
482            inline_modules: InlineModules::default(),
483        }
484    }
485
486    /// Create a new builder using explicit settings.
487    pub fn with_settings(settings: Settings) -> Self {
488        Self {
489            pipeline: None,
490            context: None,
491            settings,
492            base_path: None,
493            dispatch: None,
494            inline_modules: InlineModules::default(),
495        }
496    }
497
498    /// Set the pipeline to be executed.
499    ///
500    /// Returns the builder for chaining.
501    pub fn set_pipeline(mut self, pipeline: Value) -> Self {
502        self.pipeline = Some(pipeline);
503        self
504    }
505
506    /// Set the execution context.
507    ///
508    /// Returns the builder for chaining.
509    pub fn set_context(mut self, context: Context) -> Self {
510        self.context = Some(context);
511        self
512    }
513
514    /// Replace the runtime settings.
515    ///
516    /// Returns the builder for chaining.
517    pub fn set_settings(mut self, settings: Settings) -> Self {
518        self.settings = settings;
519        self
520    }
521
522    /// Set the base path used for resolving local module paths.
523    ///
524    /// Returns the builder for chaining.
525    pub fn set_base_path<P: Into<PathBuf>>(mut self, base_path: P) -> Self {
526        self.base_path = Some(base_path.into());
527        self
528    }
529
530    /// Provide a custom tracing dispatch instead of initializing OpenTelemetry.
531    ///
532    /// Returns the builder for chaining.
533    pub fn set_dispatch(mut self, dispatch: tracing::Dispatch) -> Self {
534        self.dispatch = Some(dispatch);
535        self
536    }
537
538    /// Register an inline module by name.
539    ///
540    /// The module must be declared in the pipeline `modules` list.
541    /// The handler runs asynchronously inside the runtime.
542    ///
543    /// Returns the builder for chaining.
544    pub fn set_module<S: Into<String>>(mut self, name: S, module: PhlowModule) -> Self {
545        self.inline_modules.insert(name.into(), module);
546        self
547    }
548
549    /// Read-only access to the current settings.
550    pub fn settings(&self) -> &Settings {
551        &self.settings
552    }
553
554    /// Mutable access to settings.
555    pub fn settings_mut(&mut self) -> &mut Settings {
556        &mut self.settings
557    }
558
559    /// Build and return a prepared [`PhlowRuntime`].
560    ///
561    /// This consumes the builder and prepares the runtime for execution.
562    pub async fn build(mut self) -> Result<PhlowRuntime, PhlowRuntimeError> {
563        let mut runtime = PhlowRuntime::with_settings(self.settings);
564        runtime.inline_modules = self.inline_modules;
565
566        if let Some(pipeline) = self.pipeline.take() {
567            runtime.set_pipeline(pipeline);
568        }
569
570        if let Some(context) = self.context.take() {
571            runtime.set_context(context);
572        }
573
574        if let Some(base_path) = self.base_path.take() {
575            runtime.set_base_path(base_path);
576        }
577
578        if let Some(dispatch) = self.dispatch.take() {
579            runtime.set_dispatch(dispatch);
580        }
581
582        runtime.build().await?;
583        Ok(runtime)
584    }
585}
586
587impl Default for PhlowBuilder {
588    fn default() -> Self {
589        Self::new()
590    }
591}
592
593struct PreparedRuntime {
594    tx_main_package: channel::Sender<Package>,
595    dispatch: tracing::Dispatch,
596    runtime_handle: tokio::task::JoinHandle<Result<(), RuntimeError>>,
597    guard: Option<OtelGuard>,
598    app_name: String,
599    request_data: Option<Value>,
600    auto_start: bool,
601}