phlow_engine/
phlow.rs

1use crate::{
2    context::Context,
3    pipeline::{Pipeline, PipelineError},
4    step_worker::NextStep,
5    transform::{value_to_pipelines, TransformError},
6};
7use phlow_sdk::prelude::*;
8use phs::{build_engine, Repositories};
9use std::{collections::HashMap, sync::Arc};
10
11#[derive(Debug)]
12pub enum PhlowError {
13    TransformError(TransformError),
14    PipelineError(PipelineError),
15    PipelineNotFound,
16    ParentError,
17}
18
19pub type PipelineMap = HashMap<usize, Pipeline>;
20
21#[derive(Debug, Default)]
22pub struct Phlow {
23    pipelines: PipelineMap,
24}
25
26impl Phlow {
27    pub fn try_from_value(
28        value: &Value,
29        modules: Option<Arc<Modules>>,
30    ) -> Result<Self, PhlowError> {
31        let engine = match &modules {
32            Some(modules) => {
33                let repositories: Repositories = modules.extract_repositories();
34                build_engine(Some(repositories))
35            }
36            None => build_engine(None),
37        };
38
39        let modules = if let Some(modules) = modules {
40            modules
41        } else {
42            Arc::new(Modules::default())
43        };
44
45        let pipelines =
46            value_to_pipelines(engine, modules, value).map_err(PhlowError::TransformError)?;
47
48        Ok(Self { pipelines })
49    }
50
51    pub async fn execute(&self, context: &mut Context) -> Result<Option<Value>, PhlowError> {
52        if self.pipelines.is_empty() {
53            return Ok(None);
54        }
55
56        let mut current_pipeline = self.pipelines.len() - 1;
57        let mut current_step = 0;
58
59        loop {
60            let pipeline = self
61                .pipelines
62                .get(&current_pipeline)
63                .ok_or(PhlowError::PipelineNotFound)?;
64
65            match pipeline.execute(context, current_step).await {
66                Ok(step_output) => match step_output {
67                    Some(step_output) => match step_output.next_step {
68                        NextStep::Next | NextStep::Stop => {
69                            return Ok(step_output.output);
70                        }
71                        NextStep::Pipeline(id) => {
72                            current_pipeline = id;
73                            current_step = 0;
74                        }
75                        NextStep::GoToStep(to) => {
76                            current_pipeline = to.pipeline;
77                            current_step = to.step;
78                        }
79                    },
80                    None => {
81                        return Ok(None);
82                    }
83                },
84                Err(err) => {
85                    return Err(PhlowError::PipelineError(err));
86                }
87            }
88        }
89    }
90}