flowbuilder_core/
flow_builder.rs1use crate::Flow;
2use anyhow::Result;
3use flowbuilder_context::{FlowContext, SharedContext};
4use std::{future::Future, pin::Pin, time::Duration};
5
6pub type StepFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
8pub type Step = Box<dyn FnOnce(SharedContext) -> StepFuture + Send>;
9
10pub struct FlowBuilder {
12 steps: Vec<Step>,
13}
14
15impl Default for FlowBuilder {
16 fn default() -> Self {
17 Self::new()
18 }
19}
20
21impl FlowBuilder {
22 pub fn new() -> Self {
24 Self { steps: Vec::new() }
25 }
26
27 pub fn step<Fut, F>(mut self, mut f: F) -> Self
29 where
30 F: FnMut(SharedContext) -> Fut + Send + 'static,
31 Fut: Future<Output = Result<()>> + Send + 'static,
32 {
33 self.steps.push(Box::new(move |ctx| Box::pin(f(ctx))));
34 self
35 }
36
37 pub fn named_step<Fut, F>(mut self, name: &'static str, mut f: F) -> Self
39 where
40 F: FnMut(SharedContext) -> Fut + Send + 'static,
41 Fut: Future<Output = Result<()>> + Send + 'static,
42 {
43 self.steps.push(Box::new(move |ctx| {
44 let ctx2 = ctx.clone();
45 Box::pin(async move {
46 {
48 let mut guard = ctx2.lock().await;
49 guard.start_step(name.to_string());
50 }
51
52 let result = f(ctx2.clone()).await;
53
54 {
56 let mut guard = ctx2.lock().await;
57 match &result {
58 Ok(()) => guard.end_step_success(name),
59 Err(e) => guard.end_step_failed(name, &e.to_string()),
60 }
61 }
62
63 result
64 })
65 }));
66 self
67 }
68
69 pub fn step_if<Fut, F, Cond>(mut self, cond: Cond, mut f: F) -> Self
71 where
72 Cond: Fn(&FlowContext) -> bool + Send + Sync + 'static,
73 F: FnMut(SharedContext) -> Fut + Send + 'static,
74 Fut: Future<Output = Result<()>> + Send + 'static,
75 {
76 self.steps.push(Box::new(move |ctx| {
77 let ctx2 = ctx.clone();
78 Box::pin(async move {
79 let guard = ctx2.lock().await;
80 if cond(&guard) {
81 drop(guard);
82 f(ctx2).await
83 } else {
84 let trace_id = guard.trace_id.clone();
85 drop(guard);
86 println!(
87 "[trace_id:{trace_id}] [step_if] condition not met, skipping step"
88 );
89 Ok(())
90 }
91 })
92 }));
93 self
94 }
95
96 pub fn wait_until<Cond>(
98 mut self,
99 cond: Cond,
100 interval: Duration,
101 max_retry: usize,
102 ) -> Self
103 where
104 Cond: Fn(&FlowContext) -> bool + Send + Sync + 'static,
105 {
106 self.steps.push(Box::new(move |ctx| {
107 Box::pin(async move {
108 for attempt in 0..max_retry {
109 {
110 let guard = ctx.lock().await;
111 if cond(&guard) {
112 println!(
113 "[wait_until] condition met on attempt {}",
114 attempt + 1
115 );
116 return Ok(());
117 }
118 }
119
120 if attempt < max_retry - 1 {
121 tokio::time::sleep(interval).await;
122 }
123 }
124
125 anyhow::bail!(
126 "[wait_until] condition not met after {} attempts",
127 max_retry
128 )
129 })
130 }));
131 self
132 }
133
134 pub fn build(self) -> Flow {
136 Flow::new(self.steps)
137 }
138
139 pub fn into_steps(self) -> Vec<Step> {
141 self.steps
142 }
143
144 pub async fn execute(self) -> Result<FlowContext> {
146 self.build().execute().await
147 }
148
149 pub async fn execute_with_context(
151 self,
152 context: FlowContext,
153 ) -> Result<FlowContext> {
154 self.build().execute_with_context(context).await
155 }
156}