1use crate::{
2 context::Context,
3 pipeline::{Pipeline, PipelineError},
4 step_worker::NextStep,
5 transform::{value_to_pipelines, TransformError},
6};
7use phlow_sdk::prelude::*;
8use phs::build_engine;
9use std::{collections::HashMap, fmt::Display, sync::Arc};
10
11#[derive(Debug)]
12pub enum PhlowError {
13 TransformError(TransformError),
14 PipelineError(PipelineError),
15 PipelineNotFound,
16 ParentError,
17}
18
19impl Display for PhlowError {
20 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21 match self {
22 PhlowError::TransformError(err) => write!(f, "Transform error: {}", err),
23 PhlowError::PipelineError(err) => write!(f, "Pipeline error: {}", err),
24 PhlowError::PipelineNotFound => write!(f, "Pipeline not found"),
25 PhlowError::ParentError => write!(f, "Parent error"),
26 }
27 }
28}
29
30pub type PipelineMap = HashMap<usize, Pipeline>;
31
32#[derive(Debug, Default)]
33pub struct Phlow {
34 pipelines: PipelineMap,
35}
36
37impl Phlow {
38 pub fn try_from_value(
39 value: &Value,
40 modules: Option<Arc<Modules>>,
41 ) -> Result<Self, PhlowError> {
42 let engine = match &modules {
43 Some(modules) => {
44 let repositories = modules.extract_repositories();
45 build_engine(Some(repositories))
46 }
47 None => build_engine(None),
48 };
49
50 let modules = if let Some(modules) = modules {
51 modules
52 } else {
53 Arc::new(Modules::default())
54 };
55
56 let pipelines =
57 value_to_pipelines(engine, modules, value).map_err(PhlowError::TransformError)?;
58
59 Ok(Self { pipelines })
60 }
61
62 pub async fn execute(&self, context: &mut Context) -> Result<Option<Value>, PhlowError> {
63 if self.pipelines.is_empty() {
64 return Ok(None);
65 }
66
67 let mut current_pipeline = self.pipelines.len() - 1;
68 let mut current_step = 0;
69
70 loop {
71 let pipeline = self
72 .pipelines
73 .get(¤t_pipeline)
74 .ok_or(PhlowError::PipelineNotFound)?;
75
76 match pipeline.execute(context, current_step).await {
77 Ok(step_output) => match step_output {
78 Some(step_output) => match step_output.next_step {
79 NextStep::Next | NextStep::Stop => {
80 return Ok(step_output.output);
81 }
82 NextStep::Pipeline(id) => {
83 current_pipeline = id;
84 current_step = 0;
85 }
86 NextStep::GoToStep(to) => {
87 current_pipeline = to.pipeline;
88 current_step = to.step;
89 }
90 },
91 None => {
92 return Ok(None);
93 }
94 },
95 Err(err) => {
96 return Err(PhlowError::PipelineError(err));
97 }
98 }
99 }
100 }
101}