1use crate::{
2    context::Context,
3    pipeline::{Pipeline, PipelineError},
4    step_worker::NextStep,
5    transform::{value_to_pipelines, TransformError},
6};
7use phlow_sdk::prelude::*;
8use phs::build_engine;
9use std::{collections::HashMap, fmt::Display, sync::Arc};
10
11#[derive(Debug)]
12pub enum PhlowError {
13    TransformError(TransformError),
14    PipelineError(PipelineError),
15    PipelineNotFound,
16    ParentError,
17}
18
19impl Display for PhlowError {
20    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21        match self {
22            PhlowError::TransformError(err) => write!(f, "Transform error: {}", err),
23            PhlowError::PipelineError(err) => write!(f, "Pipeline error: {}", err),
24            PhlowError::PipelineNotFound => write!(f, "Pipeline not found"),
25            PhlowError::ParentError => write!(f, "Parent error"),
26        }
27    }
28}
29
30pub type PipelineMap = HashMap<usize, Pipeline>;
31
32#[derive(Debug, Default)]
33pub struct Phlow {
34    pipelines: PipelineMap,
35}
36
37impl Phlow {
38    pub fn try_from_value(
39        value: &Value,
40        modules: Option<Arc<Modules>>,
41    ) -> Result<Self, PhlowError> {
42        let engine = match &modules {
43            Some(modules) => {
44                let repositories = modules.extract_repositories();
45                build_engine(Some(repositories))
46            }
47            None => build_engine(None),
48        };
49
50        let modules = if let Some(modules) = modules {
51            modules
52        } else {
53            Arc::new(Modules::default())
54        };
55
56        let pipelines =
57            value_to_pipelines(engine, modules, value).map_err(PhlowError::TransformError)?;
58
59        Ok(Self { pipelines })
60    }
61
62    pub async fn execute(&self, context: &mut Context) -> Result<Option<Value>, PhlowError> {
63        if self.pipelines.is_empty() {
64            return Ok(None);
65        }
66
67        let mut current_pipeline = self.pipelines.len() - 1;
68        let mut current_step = 0;
69
70        loop {
71            let pipeline = self
72                .pipelines
73                .get(¤t_pipeline)
74                .ok_or(PhlowError::PipelineNotFound)?;
75
76            match pipeline.execute(context, current_step).await {
77                Ok(step_output) => match step_output {
78                    Some(step_output) => match step_output.next_step {
79                        NextStep::Next | NextStep::Stop => {
80                            return Ok(step_output.output);
81                        }
82                        NextStep::Pipeline(id) => {
83                            current_pipeline = id;
84                            current_step = 0;
85                        }
86                        NextStep::GoToStep(to) => {
87                            current_pipeline = to.pipeline;
88                            current_step = to.step;
89                        }
90                    },
91                    None => {
92                        return Ok(None);
93                    }
94                },
95                Err(err) => {
96                    return Err(PhlowError::PipelineError(err));
97                }
98            }
99        }
100    }
101}