phlow_engine/
phlow.rs

1use crate::{
2    context::Context,
3    pipeline::{Pipeline, PipelineError},
4    step_worker::NextStep,
5    transform::{value_to_pipelines, TransformError},
6};
7use phlow_sdk::prelude::{log::error, *};
8use phs::build_engine;
9use std::{collections::HashMap, fmt::Display, sync::Arc};
10
11#[derive(Debug)]
12pub enum PhlowError {
13    TransformError(TransformError),
14    PipelineError(PipelineError),
15    PipelineNotFound,
16    ParentError,
17}
18
19impl Display for PhlowError {
20    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21        match self {
22            PhlowError::TransformError(err) => write!(f, "Transform error: {}", err),
23            PhlowError::PipelineError(err) => write!(f, "Pipeline error: {}", err),
24            PhlowError::PipelineNotFound => write!(f, "Pipeline not found"),
25            PhlowError::ParentError => write!(f, "Parent error"),
26        }
27    }
28}
29
30impl std::error::Error for PhlowError {
31    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
32        match self {
33            PhlowError::TransformError(err) => Some(err),
34            PhlowError::PipelineError(err) => Some(err),
35            PhlowError::PipelineNotFound => None,
36            PhlowError::ParentError => None,
37        }
38    }
39}
40
41pub type PipelineMap = HashMap<usize, Pipeline>;
42
43#[derive(Debug, Default)]
44pub struct Phlow {
45    pipelines: PipelineMap,
46}
47
48impl Phlow {
49    pub fn try_from_value(
50        value: &Value,
51        modules: Option<Arc<Modules>>,
52    ) -> Result<Self, PhlowError> {
53        let engine = match &modules {
54            Some(modules) => {
55                let repositories = modules.extract_repositories();
56                build_engine(Some(repositories))
57            }
58            None => build_engine(None),
59        };
60
61        let modules = if let Some(modules) = modules {
62            modules
63        } else {
64            Arc::new(Modules::default())
65        };
66
67        let pipelines =
68            value_to_pipelines(engine, modules, value).map_err(PhlowError::TransformError)?;
69
70        Ok(Self { pipelines })
71    }
72
73    pub async fn execute(&self, context: &mut Context) -> Result<Option<Value>, PhlowError> {
74        if self.pipelines.is_empty() {
75            return Ok(None);
76        }
77
78        let mut current_pipeline = self.pipelines.len() - 1;
79        let mut current_step = 0;
80
81        loop {
82            log::debug!(
83                "Executing pipeline {} step {}",
84                current_pipeline,
85                current_step
86            );
87            let pipeline = self
88                .pipelines
89                .get(&current_pipeline)
90                .ok_or(PhlowError::PipelineNotFound)?;
91
92            match pipeline.execute(context, current_step).await {
93                Ok(step_output) => match step_output {
94                    Some(step_output) => {
95                        log::debug!(
96                            "Next step decision: {:?}, payload: {:?}",
97                            step_output.next_step,
98                            step_output.output
99                        );
100                        match step_output.next_step {
101                            NextStep::Stop => {
102                                log::debug!("NextStep::Stop - terminating execution");
103                                return Ok(step_output.output);
104                            }
105                            NextStep::Next => {
106                                log::debug!("NextStep::Next - checking if sub-pipeline needs to return to parent");
107                                // Check if this is the main pipeline (highest index)
108                                let main_pipeline = self.pipelines.len() - 1;
109                                if current_pipeline == main_pipeline {
110                                    log::debug!("NextStep::Next - terminating execution (main pipeline completed)");
111                                    return Ok(step_output.output);
112                                } else {
113                                    log::debug!("NextStep::Next - sub-pipeline completed, checking for parent return");
114                                    // This is a sub-pipeline that completed - we should return to parent
115                                    // For now, terminate execution but this needs proper parent tracking
116                                    return Ok(step_output.output);
117                                }
118                            }
119                            NextStep::Pipeline(id) => {
120                                log::debug!("NextStep::Pipeline({}) - jumping to pipeline", id);
121                                current_pipeline = id;
122                                current_step = 0;
123                            }
124                            NextStep::GoToStep(to) => {
125                                log::debug!("NextStep::GoToStep({:?}) - jumping to step", to);
126                                current_pipeline = to.pipeline;
127                                current_step = to.step;
128                            }
129                        }
130                    }
131                    None => {
132                        return Ok(None);
133                    }
134                },
135                Err(err) => {
136                    error!("Error executing step: {:?}", err);
137                    return Err(PhlowError::PipelineError(err));
138                }
139            }
140        }
141    }
142}