1use crate::{
2 context::Context,
3 pipeline::{Pipeline, PipelineError},
4 step_worker::NextStep,
5 transform::{value_to_pipelines, TransformError},
6};
7use phlow_sdk::prelude::*;
8use phs::{build_engine, Repositories};
9use std::{collections::HashMap, sync::Arc};
10
11#[derive(Debug)]
12pub enum PhlowError {
13 TransformError(TransformError),
14 PipelineError(PipelineError),
15 PipelineNotFound,
16 ParentError,
17}
18
/// Pipelines indexed by their numeric id; the id is the key used when
/// execution jumps from one pipeline to another.
pub type PipelineMap = HashMap<usize, Pipeline>;
20
/// A set of pipelines that can be executed against a [`Context`].
///
/// The default value holds no pipelines.
#[derive(Debug, Default)]
pub struct Phlow {
    /// All pipelines of this flow, keyed by pipeline id. Execution starts at
    /// the pipeline whose id is `pipelines.len() - 1`.
    pipelines: PipelineMap,
}
25
26impl Phlow {
27 pub fn try_from_value(
28 value: &Value,
29 modules: Option<Arc<Modules>>,
30 ) -> Result<Self, PhlowError> {
31 let engine = match &modules {
32 Some(modules) => {
33 let repositories: Repositories = modules.extract_repositories();
34 build_engine(Some(repositories))
35 }
36 None => build_engine(None),
37 };
38
39 let modules = if let Some(modules) = modules {
40 modules
41 } else {
42 Arc::new(Modules::default())
43 };
44
45 let pipelines =
46 value_to_pipelines(engine, modules, value).map_err(PhlowError::TransformError)?;
47
48 Ok(Self { pipelines })
49 }
50
51 pub async fn execute(&self, context: &mut Context) -> Result<Option<Value>, PhlowError> {
52 if self.pipelines.is_empty() {
53 return Ok(None);
54 }
55
56 let mut current_pipeline = self.pipelines.len() - 1;
57 let mut current_step = 0;
58
59 loop {
60 let pipeline = self
61 .pipelines
62 .get(¤t_pipeline)
63 .ok_or(PhlowError::PipelineNotFound)?;
64
65 match pipeline.execute(context, current_step).await {
66 Ok(step_output) => match step_output {
67 Some(step_output) => match step_output.next_step {
68 NextStep::Next | NextStep::Stop => {
69 return Ok(step_output.output);
70 }
71 NextStep::Pipeline(id) => {
72 current_pipeline = id;
73 current_step = 0;
74 }
75 NextStep::GoToStep(to) => {
76 current_pipeline = to.pipeline;
77 current_step = to.step;
78 }
79 },
80 None => {
81 return Ok(None);
82 }
83 },
84 Err(err) => {
85 return Err(PhlowError::PipelineError(err));
86 }
87 }
88 }
89 }
90}