flowbuilder_core/
flow_builder.rs

1use crate::Flow;
2use anyhow::Result;
3use flowbuilder_context::{FlowContext, SharedContext};
4use std::{future::Future, pin::Pin, time::Duration};
5
6/// Type alias for step functions
7pub type StepFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
8pub type Step = Box<dyn FnOnce(SharedContext) -> StepFuture + Send>;
9
10/// Builder for creating flows with a fluent API
11pub struct FlowBuilder {
12    steps: Vec<Step>,
13}
14
15impl Default for FlowBuilder {
16    fn default() -> Self {
17        Self::new()
18    }
19}
20
21impl FlowBuilder {
22    /// Creates a new FlowBuilder
23    pub fn new() -> Self {
24        Self { steps: Vec::new() }
25    }
26
27    /// Adds a simple step to the flow
28    pub fn step<Fut, F>(mut self, mut f: F) -> Self
29    where
30        F: FnMut(SharedContext) -> Fut + Send + 'static,
31        Fut: Future<Output = Result<()>> + Send + 'static,
32    {
33        self.steps.push(Box::new(move |ctx| Box::pin(f(ctx))));
34        self
35    }
36
37    /// Adds a named step to the flow with automatic logging
38    pub fn named_step<Fut, F>(mut self, name: &'static str, mut f: F) -> Self
39    where
40        F: FnMut(SharedContext) -> Fut + Send + 'static,
41        Fut: Future<Output = Result<()>> + Send + 'static,
42    {
43        self.steps.push(Box::new(move |ctx| {
44            let ctx2 = ctx.clone();
45            Box::pin(async move {
46                // Start step logging
47                {
48                    let mut guard = ctx2.lock().await;
49                    guard.start_step(name.to_string());
50                }
51
52                let result = f(ctx2.clone()).await;
53
54                // End step logging
55                {
56                    let mut guard = ctx2.lock().await;
57                    match &result {
58                        Ok(()) => guard.end_step_success(name),
59                        Err(e) => guard.end_step_failed(name, &e.to_string()),
60                    }
61                }
62
63                result
64            })
65        }));
66        self
67    }
68
69    /// Adds a conditional step that only executes if the condition is met
70    pub fn step_if<Fut, F, Cond>(mut self, cond: Cond, mut f: F) -> Self
71    where
72        Cond: Fn(&FlowContext) -> bool + Send + Sync + 'static,
73        F: FnMut(SharedContext) -> Fut + Send + 'static,
74        Fut: Future<Output = Result<()>> + Send + 'static,
75    {
76        self.steps.push(Box::new(move |ctx| {
77            let ctx2 = ctx.clone();
78            Box::pin(async move {
79                let guard = ctx2.lock().await;
80                if cond(&guard) {
81                    drop(guard);
82                    f(ctx2).await
83                } else {
84                    let trace_id = guard.trace_id.clone();
85                    drop(guard);
86                    println!(
87                        "[trace_id:{trace_id}] [step_if] condition not met, skipping step"
88                    );
89                    Ok(())
90                }
91            })
92        }));
93        self
94    }
95
96    /// Adds a wait step that waits until a condition is met
97    pub fn wait_until<Cond>(
98        mut self,
99        cond: Cond,
100        interval: Duration,
101        max_retry: usize,
102    ) -> Self
103    where
104        Cond: Fn(&FlowContext) -> bool + Send + Sync + 'static,
105    {
106        self.steps.push(Box::new(move |ctx| {
107            Box::pin(async move {
108                for attempt in 0..max_retry {
109                    {
110                        let guard = ctx.lock().await;
111                        if cond(&guard) {
112                            println!(
113                                "[wait_until] condition met on attempt {}",
114                                attempt + 1
115                            );
116                            return Ok(());
117                        }
118                    }
119
120                    if attempt < max_retry - 1 {
121                        tokio::time::sleep(interval).await;
122                    }
123                }
124
125                anyhow::bail!(
126                    "[wait_until] condition not met after {} attempts",
127                    max_retry
128                )
129            })
130        }));
131        self
132    }
133
134    /// Builds the flow
135    pub fn build(self) -> Flow {
136        Flow::new(self.steps)
137    }
138
139    /// Access steps for runtime extensions
140    pub fn into_steps(self) -> Vec<Step> {
141        self.steps
142    }
143
144    /// Builds and executes the flow immediately
145    pub async fn execute(self) -> Result<FlowContext> {
146        self.build().execute().await
147    }
148
149    /// Builds and executes the flow with a custom context
150    pub async fn execute_with_context(
151        self,
152        context: FlowContext,
153    ) -> Result<FlowContext> {
154        self.build().execute_with_context(context).await
155    }
156}