1use crate::debug_server;
129use crate::inline_module::{InlineModules, PhlowModule};
130use crate::loader::Loader;
131use crate::loader::error::Error as LoaderError;
132use crate::preprocessor::preprocessor;
133use crate::runtime::Runtime;
134use crate::runtime::RuntimeError;
135use crate::settings::Settings;
136use crossbeam::channel;
137use phlow_engine::Context;
138use phlow_sdk::otel::{OtelGuard, init_tracing_subscriber};
139use phlow_sdk::prelude::{Array, Value};
140use phlow_sdk::structs::Package;
141use phlow_sdk::{tracing, use_log};
142use std::fmt::{Display, Formatter};
143use std::path::Path;
144use std::path::PathBuf;
145use std::sync::Arc;
146
147#[derive(Debug)]
149pub enum PhlowRuntimeError {
150 MissingPipeline,
152 LoaderError(crate::loader::error::Error),
154 PackageSendError,
156 ResponseChannelClosed,
158 PreprocessError(Vec<String>),
160 ScriptParseError(serde_yaml::Error),
162 RuntimeError(RuntimeError),
164 RuntimeJoinError(tokio::task::JoinError),
166}
167
168impl Display for PhlowRuntimeError {
169 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
170 match self {
171 PhlowRuntimeError::MissingPipeline => write!(f, "Pipeline not set"),
172 PhlowRuntimeError::LoaderError(err) => write!(f, "Loader error: {}", err),
173 PhlowRuntimeError::PackageSendError => write!(f, "Failed to send package"),
174 PhlowRuntimeError::ResponseChannelClosed => write!(f, "Response channel closed"),
175 PhlowRuntimeError::PreprocessError(errs) => {
176 write!(f, "Preprocess error: {}", errs.join(", "))
177 }
178 PhlowRuntimeError::ScriptParseError(err) => write!(f, "Script parse error: {}", err),
179 PhlowRuntimeError::RuntimeError(err) => write!(f, "Runtime error: {}", err),
180 PhlowRuntimeError::RuntimeJoinError(err) => write!(f, "Runtime task error: {}", err),
181 }
182 }
183}
184
185impl std::error::Error for PhlowRuntimeError {}
186
187impl From<crate::loader::error::Error> for PhlowRuntimeError {
188 fn from(err: crate::loader::error::Error) -> Self {
189 PhlowRuntimeError::LoaderError(err)
190 }
191}
192
193impl From<RuntimeError> for PhlowRuntimeError {
194 fn from(err: RuntimeError) -> Self {
195 PhlowRuntimeError::RuntimeError(err)
196 }
197}
198
199fn preprocess_string_inner(
200 script: &str,
201 base_path: &Path,
202 print_yaml: bool,
203 print_output: crate::settings::PrintOutput,
204) -> Result<Value, PhlowRuntimeError> {
205 let processed = preprocessor(script, base_path, print_yaml, print_output)
206 .map_err(PhlowRuntimeError::PreprocessError)?;
207 let mut value: Value =
208 serde_yaml::from_str(&processed).map_err(PhlowRuntimeError::ScriptParseError)?;
209
210 if value.get("steps").is_none() {
211 return Err(PhlowRuntimeError::LoaderError(LoaderError::StepsNotDefined));
212 }
213
214 if let Some(modules) = value.get("modules") {
215 if !modules.is_array() {
216 return Err(PhlowRuntimeError::LoaderError(
217 LoaderError::ModuleLoaderError("Modules not an array".to_string()),
218 ));
219 }
220
221 value.insert("modules", modules.clone());
222 } else {
223 value.insert("modules", Value::Array(Array::new()));
224 }
225
226 Ok(value)
227}
228
229pub struct PhlowRuntime {
231 pipeline: Option<Value>,
232 context: Option<Context>,
233 settings: Settings,
234 base_path: Option<PathBuf>,
235 dispatch: Option<tracing::Dispatch>,
236 inline_modules: InlineModules,
237 prepared: Option<PreparedRuntime>,
238}
239
240pub struct PhlowBuilder {
244 pipeline: Option<Value>,
245 context: Option<Context>,
246 settings: Settings,
247 base_path: Option<PathBuf>,
248 dispatch: Option<tracing::Dispatch>,
249 inline_modules: InlineModules,
250}
251
252impl Default for PhlowRuntime {
253 fn default() -> Self {
254 Self::new()
255 }
256}
257
258impl PhlowRuntime {
259 pub fn new() -> Self {
263 let mut settings = Settings::for_runtime();
264 if settings.var_main.is_none() {
265 settings.var_main = Some("__phlow_runtime__".to_string());
266 }
267
268 Self {
269 pipeline: None,
270 context: None,
271 settings,
272 base_path: None,
273 dispatch: None,
274 inline_modules: InlineModules::default(),
275 prepared: None,
276 }
277 }
278
279 pub fn with_settings(settings: Settings) -> Self {
281 Self {
282 pipeline: None,
283 context: None,
284 settings,
285 base_path: None,
286 dispatch: None,
287 inline_modules: InlineModules::default(),
288 prepared: None,
289 }
290 }
291
292 pub fn preprocess_string(&self, script: &str) -> Result<Value, PhlowRuntimeError> {
297 let base_path = self.base_path.clone().unwrap_or_else(|| {
298 std::env::current_dir().unwrap_or_else(|_| PathBuf::from("./"))
299 });
300
301 preprocess_string_inner(
302 script,
303 base_path.as_path(),
304 self.settings.print_yaml,
305 self.settings.print_output,
306 )
307 }
308
309 pub fn set_pipeline(&mut self, pipeline: Value) -> &mut Self {
313 self.pipeline = Some(pipeline);
314 self.prepared = None;
315 self
316 }
317
318 pub fn set_context(&mut self, context: Context) -> &mut Self {
322 self.context = Some(context);
323 self.prepared = None;
324 self
325 }
326
327 pub fn set_preprocessed_pipeline(&mut self, pipeline: Value) -> &mut Self {
334 self.set_pipeline(pipeline)
335 }
336
337 pub fn set_settings(&mut self, settings: Settings) -> &mut Self {
341 self.settings = settings;
342 self.prepared = None;
343 self
344 }
345
346 pub fn set_base_path<P: Into<PathBuf>>(&mut self, base_path: P) -> &mut Self {
350 self.base_path = Some(base_path.into());
351 self.prepared = None;
352 self
353 }
354
355 pub fn set_dispatch(&mut self, dispatch: tracing::Dispatch) -> &mut Self {
359 self.dispatch = Some(dispatch);
360 self.prepared = None;
361 self
362 }
363
364 pub fn set_module<S: Into<String>>(&mut self, name: S, module: PhlowModule) -> &mut Self {
371 self.inline_modules.insert(name.into(), module);
372 self.prepared = None;
373 self
374 }
375
376 pub fn settings(&self) -> &Settings {
378 &self.settings
379 }
380
381 pub fn settings_mut(&mut self) -> &mut Settings {
385 self.prepared = None;
386 &mut self.settings
387 }
388
389 pub async fn build(&mut self) -> Result<(), PhlowRuntimeError> {
393 if self.prepared.is_some() {
394 return Ok(());
395 }
396
397 use_log!();
398
399 let pipeline = self
400 .pipeline
401 .as_ref()
402 .ok_or(PhlowRuntimeError::MissingPipeline)?;
403
404 let base_path = self.base_path.clone().unwrap_or_else(|| {
405 std::env::current_dir().unwrap_or_else(|_| PathBuf::from("./"))
406 });
407
408 let mut loader = Loader::from_value(pipeline, Some(base_path.as_path()))?;
409
410 if self.settings.download {
411 loader
412 .download(&self.settings.default_package_repository_url)
413 .await?;
414 }
415
416 loader.update_info();
417
418 let mut guard: Option<OtelGuard> = None;
419 let dispatch = if let Some(dispatch) = self.dispatch.clone() {
420 dispatch
421 } else {
422 let next_guard = init_tracing_subscriber(loader.app_data.clone());
423 let dispatch = next_guard.dispatch.clone();
424 guard = Some(next_guard);
425 dispatch
426 };
427
428 let debug_enabled = std::env::var("PHLOW_DEBUG")
429 .map(|value| value.eq_ignore_ascii_case("true"))
430 .unwrap_or(false);
431 if debug_enabled {
432 let controller = Arc::new(phlow_engine::debug::DebugController::new());
433 match debug_server::spawn(controller.clone()).await {
434 Ok(()) => {
435 if phlow_engine::debug::set_debug_controller(controller).is_err() {
436 log::warn!("Debug controller already set");
437 }
438 log::info!("Phlow debug enabled");
439 }
440 Err(err) => {
441 log::error!("Failed to start debug server: {}", err);
442 }
443 }
444 }
445
446 let context = self.context.clone().unwrap_or_else(Context::new);
447 let request_data = context.get_main();
448 let context_for_runtime = context.clone();
449 let auto_start = self.settings.var_main.is_some()
450 || loader.main == -1
451 || context.get_main().is_some();
452
453 let app_name = loader
454 .app_data
455 .name
456 .clone()
457 .unwrap_or_else(|| "phlow runtime".to_string());
458
459 let settings = self.settings.clone();
460 let (tx_main_package, rx_main_package) = channel::unbounded::<Package>();
461 let tx_for_runtime = tx_main_package.clone();
462 let dispatch_for_runtime = dispatch.clone();
463 let inline_modules = self.inline_modules.clone();
464
465 let runtime_handle = tokio::spawn(async move {
466 tracing::dispatcher::with_default(&dispatch_for_runtime, || {
467 Runtime::run_script_with_modules(
468 tx_for_runtime,
469 rx_main_package,
470 loader,
471 dispatch_for_runtime.clone(),
472 settings,
473 context_for_runtime,
474 inline_modules,
475 )
476 })
477 .await
478 });
479
480 self.prepared = Some(PreparedRuntime {
481 tx_main_package,
482 dispatch,
483 runtime_handle,
484 guard,
485 app_name,
486 request_data,
487 auto_start,
488 });
489
490 Ok(())
491 }
492
493 pub async fn run(&mut self) -> Result<Value, PhlowRuntimeError> {
501 self.build().await?;
502
503 let auto_start = match self.prepared.as_ref() {
504 Some(prepared) => prepared.auto_start,
505 None => return Err(PhlowRuntimeError::MissingPipeline),
506 };
507
508 if !auto_start {
509 self.shutdown().await?;
510 return Ok(Value::Undefined);
511 }
512
513 let (tx_main_package, dispatch, app_name, request_data) = match self.prepared.as_ref() {
514 Some(prepared) => (
515 prepared.tx_main_package.clone(),
516 prepared.dispatch.clone(),
517 prepared.app_name.clone(),
518 prepared.request_data.clone(),
519 ),
520 None => return Err(PhlowRuntimeError::MissingPipeline),
521 };
522
523 let (response_tx, response_rx) = tokio::sync::oneshot::channel::<Value>();
524 let package = tracing::dispatcher::with_default(&dispatch, || {
525 let span = tracing::span!(
526 tracing::Level::INFO,
527 "phlow_run",
528 otel.name = app_name.as_str()
529 );
530
531 Package {
532 response: Some(response_tx),
533 request_data,
534 origin: 0,
535 span: Some(span),
536 dispatch: Some(dispatch.clone()),
537 }
538 });
539
540 if tx_main_package.send(package).is_err() {
541 return Err(PhlowRuntimeError::PackageSendError);
542 }
543
544 let result = response_rx
545 .await
546 .map_err(|_| PhlowRuntimeError::ResponseChannelClosed)?;
547
548 Ok(result)
549 }
550
551 pub async fn run_preprocessed(
556 pipeline: Value,
557 context: Context,
558 ) -> Result<Value, PhlowRuntimeError> {
559 let mut runtime = PhlowRuntime::new();
560 runtime.set_preprocessed_pipeline(pipeline);
561 runtime.set_context(context);
562 let result = runtime.run().await?;
563 runtime.shutdown().await?;
564 Ok(result)
565 }
566
567 pub async fn shutdown(&mut self) -> Result<(), PhlowRuntimeError> {
572 let prepared = match self.prepared.take() {
573 Some(prepared) => prepared,
574 None => return Ok(()),
575 };
576
577 drop(prepared.tx_main_package);
578
579 let runtime_result = prepared
580 .runtime_handle
581 .await
582 .map_err(PhlowRuntimeError::RuntimeJoinError)?;
583 runtime_result?;
584
585 drop(prepared.guard);
586
587 Ok(())
588 }
589}
590
591impl PhlowBuilder {
592 pub fn new() -> Self {
596 let mut settings = Settings::for_runtime();
597 if settings.var_main.is_none() {
598 settings.var_main = Some("__phlow_runtime__".to_string());
599 }
600
601 Self {
602 pipeline: None,
603 context: None,
604 settings,
605 base_path: None,
606 dispatch: None,
607 inline_modules: InlineModules::default(),
608 }
609 }
610
611 pub fn with_settings(settings: Settings) -> Self {
613 Self {
614 pipeline: None,
615 context: None,
616 settings,
617 base_path: None,
618 dispatch: None,
619 inline_modules: InlineModules::default(),
620 }
621 }
622
623 pub fn preprocess_string(&self, script: &str) -> Result<Value, PhlowRuntimeError> {
628 let base_path = self.base_path.clone().unwrap_or_else(|| {
629 std::env::current_dir().unwrap_or_else(|_| PathBuf::from("./"))
630 });
631
632 preprocess_string_inner(
633 script,
634 base_path.as_path(),
635 self.settings.print_yaml,
636 self.settings.print_output,
637 )
638 }
639
640 pub fn set_pipeline(mut self, pipeline: Value) -> Self {
644 self.pipeline = Some(pipeline);
645 self
646 }
647
648 pub fn set_preprocessed_pipeline(mut self, pipeline: Value) -> Self {
652 self.pipeline = Some(pipeline);
653 self
654 }
655
656 pub fn set_context(mut self, context: Context) -> Self {
660 self.context = Some(context);
661 self
662 }
663
664 pub fn set_settings(mut self, settings: Settings) -> Self {
668 self.settings = settings;
669 self
670 }
671
672 pub fn set_base_path<P: Into<PathBuf>>(mut self, base_path: P) -> Self {
676 self.base_path = Some(base_path.into());
677 self
678 }
679
680 pub fn set_dispatch(mut self, dispatch: tracing::Dispatch) -> Self {
684 self.dispatch = Some(dispatch);
685 self
686 }
687
688 pub fn set_module<S: Into<String>>(mut self, name: S, module: PhlowModule) -> Self {
695 self.inline_modules.insert(name.into(), module);
696 self
697 }
698
699 pub fn settings(&self) -> &Settings {
701 &self.settings
702 }
703
704 pub fn settings_mut(&mut self) -> &mut Settings {
706 &mut self.settings
707 }
708
709 pub async fn build(mut self) -> Result<PhlowRuntime, PhlowRuntimeError> {
713 let mut runtime = PhlowRuntime::with_settings(self.settings);
714 runtime.inline_modules = self.inline_modules;
715
716 if let Some(pipeline) = self.pipeline.take() {
717 runtime.set_pipeline(pipeline);
718 }
719
720 if let Some(context) = self.context.take() {
721 runtime.set_context(context);
722 }
723
724 if let Some(base_path) = self.base_path.take() {
725 runtime.set_base_path(base_path);
726 }
727
728 if let Some(dispatch) = self.dispatch.take() {
729 runtime.set_dispatch(dispatch);
730 }
731
732 runtime.build().await?;
733 Ok(runtime)
734 }
735}
736
737impl Default for PhlowBuilder {
738 fn default() -> Self {
739 Self::new()
740 }
741}
742
743struct PreparedRuntime {
744 tx_main_package: channel::Sender<Package>,
745 dispatch: tracing::Dispatch,
746 runtime_handle: tokio::task::JoinHandle<Result<(), RuntimeError>>,
747 guard: Option<OtelGuard>,
748 app_name: String,
749 request_data: Option<Value>,
750 auto_start: bool,
751}