use crate::debug_server;
use crate::inline_module::{InlineModules, PhlowModule};
use crate::loader::Loader;
use crate::loader::error::Error as LoaderError;
use crate::preprocessor::preprocessor;
use crate::runtime::Runtime;
use crate::runtime::RuntimeError;
use crate::settings::Settings;
use crossbeam::channel;
use phlow_engine::Context;
use phlow_sdk::otel::{OtelGuard, init_tracing_subscriber};
use phlow_sdk::prelude::{Array, Value};
use phlow_sdk::structs::Package;
use phlow_sdk::{tracing, use_log};
use std::fmt::{Display, Formatter};
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
#[derive(Debug)]
pub enum PhlowRuntimeError {
MissingPipeline,
LoaderError(crate::loader::error::Error),
PackageSendError,
ResponseChannelClosed,
PreprocessError(Vec<String>),
ScriptParseError(serde_yaml::Error),
RuntimeError(RuntimeError),
RuntimeJoinError(tokio::task::JoinError),
}
impl Display for PhlowRuntimeError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
PhlowRuntimeError::MissingPipeline => write!(f, "Pipeline not set"),
PhlowRuntimeError::LoaderError(err) => write!(f, "Loader error: {}", err),
PhlowRuntimeError::PackageSendError => write!(f, "Failed to send package"),
PhlowRuntimeError::ResponseChannelClosed => write!(f, "Response channel closed"),
PhlowRuntimeError::PreprocessError(errs) => {
write!(f, "Preprocess error: {}", errs.join(", "))
}
PhlowRuntimeError::ScriptParseError(err) => write!(f, "Script parse error: {}", err),
PhlowRuntimeError::RuntimeError(err) => write!(f, "Runtime error: {}", err),
PhlowRuntimeError::RuntimeJoinError(err) => write!(f, "Runtime task error: {}", err),
}
}
}
impl std::error::Error for PhlowRuntimeError {}
impl From<crate::loader::error::Error> for PhlowRuntimeError {
fn from(err: crate::loader::error::Error) -> Self {
PhlowRuntimeError::LoaderError(err)
}
}
impl From<RuntimeError> for PhlowRuntimeError {
fn from(err: RuntimeError) -> Self {
PhlowRuntimeError::RuntimeError(err)
}
}
fn preprocess_string_inner(
script: &str,
base_path: &Path,
print_yaml: bool,
print_output: crate::settings::PrintOutput,
) -> Result<Value, PhlowRuntimeError> {
let processed = preprocessor(script, base_path, print_yaml, print_output)
.map_err(PhlowRuntimeError::PreprocessError)?;
let mut value: Value =
serde_yaml::from_str(&processed).map_err(PhlowRuntimeError::ScriptParseError)?;
if value.get("steps").is_none() {
return Err(PhlowRuntimeError::LoaderError(LoaderError::StepsNotDefined));
}
if let Some(modules) = value.get("modules") {
if !modules.is_array() {
return Err(PhlowRuntimeError::LoaderError(
LoaderError::ModuleLoaderError("Modules not an array".to_string()),
));
}
value.insert("modules", modules.clone());
} else {
value.insert("modules", Value::Array(Array::new()));
}
Ok(value)
}
pub struct PhlowRuntime {
pipeline: Option<Value>,
context: Option<Context>,
settings: Settings,
base_path: Option<PathBuf>,
dispatch: Option<tracing::Dispatch>,
inline_modules: InlineModules,
prepared: Option<PreparedRuntime>,
}
pub struct PhlowBuilder {
pipeline: Option<Value>,
context: Option<Context>,
settings: Settings,
base_path: Option<PathBuf>,
dispatch: Option<tracing::Dispatch>,
inline_modules: InlineModules,
}
impl Default for PhlowRuntime {
fn default() -> Self {
Self::new()
}
}
impl PhlowRuntime {
pub fn new() -> Self {
let mut settings = Settings::for_runtime();
if settings.var_main.is_none() {
settings.var_main = Some("__phlow_runtime__".to_string());
}
Self {
pipeline: None,
context: None,
settings,
base_path: None,
dispatch: None,
inline_modules: InlineModules::default(),
prepared: None,
}
}
pub fn with_settings(settings: Settings) -> Self {
Self {
pipeline: None,
context: None,
settings,
base_path: None,
dispatch: None,
inline_modules: InlineModules::default(),
prepared: None,
}
}
pub fn preprocess_string(&self, script: &str) -> Result<Value, PhlowRuntimeError> {
let base_path = self.base_path.clone().unwrap_or_else(|| {
std::env::current_dir().unwrap_or_else(|_| PathBuf::from("./"))
});
preprocess_string_inner(
script,
base_path.as_path(),
self.settings.print_yaml,
self.settings.print_output,
)
}
pub fn set_pipeline(&mut self, pipeline: Value) -> &mut Self {
self.pipeline = Some(pipeline);
self.prepared = None;
self
}
pub fn set_context(&mut self, context: Context) -> &mut Self {
self.context = Some(context);
self.prepared = None;
self
}
pub fn set_preprocessed_pipeline(&mut self, pipeline: Value) -> &mut Self {
self.set_pipeline(pipeline)
}
pub fn set_settings(&mut self, settings: Settings) -> &mut Self {
self.settings = settings;
self.prepared = None;
self
}
pub fn set_base_path<P: Into<PathBuf>>(&mut self, base_path: P) -> &mut Self {
self.base_path = Some(base_path.into());
self.prepared = None;
self
}
pub fn set_dispatch(&mut self, dispatch: tracing::Dispatch) -> &mut Self {
self.dispatch = Some(dispatch);
self.prepared = None;
self
}
pub fn set_module<S: Into<String>>(&mut self, name: S, module: PhlowModule) -> &mut Self {
self.inline_modules.insert(name.into(), module);
self.prepared = None;
self
}
pub fn settings(&self) -> &Settings {
&self.settings
}
pub fn settings_mut(&mut self) -> &mut Settings {
self.prepared = None;
&mut self.settings
}
pub async fn build(&mut self) -> Result<(), PhlowRuntimeError> {
if self.prepared.is_some() {
return Ok(());
}
use_log!();
let pipeline = self
.pipeline
.as_ref()
.ok_or(PhlowRuntimeError::MissingPipeline)?;
let base_path = self.base_path.clone().unwrap_or_else(|| {
std::env::current_dir().unwrap_or_else(|_| PathBuf::from("./"))
});
let mut loader = Loader::from_value(pipeline, Some(base_path.as_path()))?;
if self.settings.download {
loader
.download(&self.settings.default_package_repository_url)
.await?;
}
loader.update_info();
let mut guard: Option<OtelGuard> = None;
let dispatch = if let Some(dispatch) = self.dispatch.clone() {
dispatch
} else {
let next_guard = init_tracing_subscriber(loader.app_data.clone());
let dispatch = next_guard.dispatch.clone();
guard = Some(next_guard);
dispatch
};
let debug_enabled = std::env::var("PHLOW_DEBUG")
.map(|value| value.eq_ignore_ascii_case("true"))
.unwrap_or(false);
if debug_enabled {
let controller = Arc::new(phlow_engine::debug::DebugController::new());
match debug_server::spawn(controller.clone()).await {
Ok(()) => {
if phlow_engine::debug::set_debug_controller(controller).is_err() {
log::warn!("Debug controller already set");
}
log::info!("Phlow debug enabled");
}
Err(err) => {
log::error!("Failed to start debug server: {}", err);
}
}
}
let context = self.context.clone().unwrap_or_else(Context::new);
let request_data = context.get_main();
let context_for_runtime = context.clone();
let auto_start = self.settings.var_main.is_some()
|| loader.main == -1
|| context.get_main().is_some();
let app_name = loader
.app_data
.name
.clone()
.unwrap_or_else(|| "phlow runtime".to_string());
let settings = self.settings.clone();
let (tx_main_package, rx_main_package) = channel::unbounded::<Package>();
let tx_for_runtime = tx_main_package.clone();
let dispatch_for_runtime = dispatch.clone();
let inline_modules = self.inline_modules.clone();
let runtime_handle = tokio::spawn(async move {
tracing::dispatcher::with_default(&dispatch_for_runtime, || {
Runtime::run_script_with_modules(
tx_for_runtime,
rx_main_package,
loader,
dispatch_for_runtime.clone(),
settings,
context_for_runtime,
inline_modules,
)
})
.await
});
self.prepared = Some(PreparedRuntime {
tx_main_package,
dispatch,
runtime_handle,
guard,
app_name,
request_data,
auto_start,
});
Ok(())
}
pub async fn run(&mut self) -> Result<Value, PhlowRuntimeError> {
self.build().await?;
let auto_start = match self.prepared.as_ref() {
Some(prepared) => prepared.auto_start,
None => return Err(PhlowRuntimeError::MissingPipeline),
};
if !auto_start {
self.shutdown().await?;
return Ok(Value::Undefined);
}
let (tx_main_package, dispatch, app_name, request_data) = match self.prepared.as_ref() {
Some(prepared) => (
prepared.tx_main_package.clone(),
prepared.dispatch.clone(),
prepared.app_name.clone(),
prepared.request_data.clone(),
),
None => return Err(PhlowRuntimeError::MissingPipeline),
};
let (response_tx, response_rx) = tokio::sync::oneshot::channel::<Value>();
let package = tracing::dispatcher::with_default(&dispatch, || {
let span = tracing::span!(
tracing::Level::INFO,
"phlow_run",
otel.name = app_name.as_str()
);
Package {
response: Some(response_tx),
request_data,
origin: 0,
span: Some(span),
dispatch: Some(dispatch.clone()),
}
});
if tx_main_package.send(package).is_err() {
return Err(PhlowRuntimeError::PackageSendError);
}
let result = response_rx
.await
.map_err(|_| PhlowRuntimeError::ResponseChannelClosed)?;
Ok(result)
}
pub async fn run_preprocessed(
pipeline: Value,
context: Context,
) -> Result<Value, PhlowRuntimeError> {
let mut runtime = PhlowRuntime::new();
runtime.set_preprocessed_pipeline(pipeline);
runtime.set_context(context);
let result = runtime.run().await?;
runtime.shutdown().await?;
Ok(result)
}
pub async fn shutdown(&mut self) -> Result<(), PhlowRuntimeError> {
let prepared = match self.prepared.take() {
Some(prepared) => prepared,
None => return Ok(()),
};
drop(prepared.tx_main_package);
let runtime_result = prepared
.runtime_handle
.await
.map_err(PhlowRuntimeError::RuntimeJoinError)?;
runtime_result?;
drop(prepared.guard);
Ok(())
}
}
impl PhlowBuilder {
pub fn new() -> Self {
let mut settings = Settings::for_runtime();
if settings.var_main.is_none() {
settings.var_main = Some("__phlow_runtime__".to_string());
}
Self {
pipeline: None,
context: None,
settings,
base_path: None,
dispatch: None,
inline_modules: InlineModules::default(),
}
}
pub fn with_settings(settings: Settings) -> Self {
Self {
pipeline: None,
context: None,
settings,
base_path: None,
dispatch: None,
inline_modules: InlineModules::default(),
}
}
pub fn preprocess_string(&self, script: &str) -> Result<Value, PhlowRuntimeError> {
let base_path = self.base_path.clone().unwrap_or_else(|| {
std::env::current_dir().unwrap_or_else(|_| PathBuf::from("./"))
});
preprocess_string_inner(
script,
base_path.as_path(),
self.settings.print_yaml,
self.settings.print_output,
)
}
pub fn set_pipeline(mut self, pipeline: Value) -> Self {
self.pipeline = Some(pipeline);
self
}
pub fn set_preprocessed_pipeline(mut self, pipeline: Value) -> Self {
self.pipeline = Some(pipeline);
self
}
pub fn set_context(mut self, context: Context) -> Self {
self.context = Some(context);
self
}
pub fn set_settings(mut self, settings: Settings) -> Self {
self.settings = settings;
self
}
pub fn set_base_path<P: Into<PathBuf>>(mut self, base_path: P) -> Self {
self.base_path = Some(base_path.into());
self
}
pub fn set_dispatch(mut self, dispatch: tracing::Dispatch) -> Self {
self.dispatch = Some(dispatch);
self
}
pub fn set_module<S: Into<String>>(mut self, name: S, module: PhlowModule) -> Self {
self.inline_modules.insert(name.into(), module);
self
}
pub fn settings(&self) -> &Settings {
&self.settings
}
pub fn settings_mut(&mut self) -> &mut Settings {
&mut self.settings
}
pub async fn build(mut self) -> Result<PhlowRuntime, PhlowRuntimeError> {
let mut runtime = PhlowRuntime::with_settings(self.settings);
runtime.inline_modules = self.inline_modules;
if let Some(pipeline) = self.pipeline.take() {
runtime.set_pipeline(pipeline);
}
if let Some(context) = self.context.take() {
runtime.set_context(context);
}
if let Some(base_path) = self.base_path.take() {
runtime.set_base_path(base_path);
}
if let Some(dispatch) = self.dispatch.take() {
runtime.set_dispatch(dispatch);
}
runtime.build().await?;
Ok(runtime)
}
}
impl Default for PhlowBuilder {
fn default() -> Self {
Self::new()
}
}
struct PreparedRuntime {
tx_main_package: channel::Sender<Package>,
dispatch: tracing::Dispatch,
runtime_handle: tokio::task::JoinHandle<Result<(), RuntimeError>>,
guard: Option<OtelGuard>,
app_name: String,
request_data: Option<Value>,
auto_start: bool,
}