flowbuilder_context/
lib.rs

1//! # FlowBuilder Context
2//!
3//! Context management and shared state for FlowBuilder
4
5use std::sync::Arc;
6use tokio::sync::Mutex;
7use uuid::Uuid;
8
9#[derive(Debug, Clone)]
10pub struct FlowContext {
11    pub trace_id: String,
12    pub ok: bool,
13    pub errors: Vec<String>,
14    pub step_logs: Vec<StepLog>,
15    pub variables: std::collections::HashMap<String, String>,
16    pub snapshots: std::collections::HashMap<String, ContextSnapshot>,
17}
18
/// Point-in-time copy of the restorable parts of a `FlowContext`.
#[derive(Clone, Debug)]
pub struct ContextSnapshot {
    /// Identifier the snapshot is stored under.
    pub snapshot_id: String,
    /// Monotonic instant at which the snapshot was taken.
    pub timestamp: std::time::Instant,
    /// Copy of the context variables at snapshot time.
    pub variables: std::collections::HashMap<String, String>,
    /// Value of the context's `ok` flag at snapshot time.
    pub ok: bool,
    /// Copy of the context's error list at snapshot time.
    pub errors: Vec<String>,
    /// Free-form, caller-supplied description.
    pub description: String,
}
28
29#[derive(Debug, Clone)]
30pub struct StepLog {
31    pub step_name: String,
32    pub start_time: std::time::Instant,
33    pub end_time: Option<std::time::Instant>,
34    pub status: StepStatus,
35    pub error_message: Option<String>,
36    pub trace_id: String,
37}
38
/// Lifecycle state of a step recorded in a `StepLog`.
///
/// The enum is field-less, so it is `Copy` and supports `==` comparisons in
/// addition to pattern matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepStatus {
    /// The step has been started and has not finished yet.
    Running,
    /// The step finished successfully.
    Success,
    /// The step finished with an error.
    Failed,
    /// The step was skipped.
    Skipped,
    /// The step timed out.
    Timeout,
}
47
48impl Default for FlowContext {
49    fn default() -> Self {
50        Self {
51            trace_id: Uuid::new_v4().to_string(),
52            ok: true,
53            errors: Vec::new(),
54            step_logs: Vec::new(),
55            variables: std::collections::HashMap::new(),
56            snapshots: std::collections::HashMap::new(),
57        }
58    }
59}
60
61impl FlowContext {
62    pub fn new_with_trace_id(trace_id: String) -> Self {
63        Self {
64            trace_id,
65            ok: true,
66            errors: Vec::new(),
67            step_logs: Vec::new(),
68            variables: std::collections::HashMap::new(),
69            snapshots: std::collections::HashMap::new(),
70        }
71    }
72
73    /// 创建快照
74    pub fn create_snapshot(
75        &mut self,
76        snapshot_id: String,
77        description: String,
78    ) -> Result<(), String> {
79        if self.snapshots.contains_key(&snapshot_id) {
80            return Err(format!(
81                "Snapshot with id '{snapshot_id}' already exists"
82            ));
83        }
84
85        let snapshot = ContextSnapshot {
86            snapshot_id: snapshot_id.clone(),
87            timestamp: std::time::Instant::now(),
88            variables: self.variables.clone(),
89            ok: self.ok,
90            errors: self.errors.clone(),
91            description,
92        };
93
94        self.snapshots.insert(snapshot_id.clone(), snapshot);
95
96        #[cfg(feature = "logger")]
97        tracing::info!(
98            "[trace_id:{}] Created snapshot: {}",
99            self.trace_id,
100            snapshot_id
101        );
102        #[cfg(not(feature = "logger"))]
103        println!(
104            "[trace_id:{}] Created snapshot: {}",
105            self.trace_id, snapshot_id
106        );
107
108        Ok(())
109    }
110
111    /// 回滚到快照
112    pub fn rollback_to_snapshot(
113        &mut self,
114        snapshot_id: &str,
115    ) -> Result<(), String> {
116        let snapshot = self
117            .snapshots
118            .get(snapshot_id)
119            .ok_or_else(|| format!("Snapshot '{snapshot_id}' not found"))?
120            .clone();
121
122        // 保留 trace_id 和快照信息,回滚其他状态
123        let old_variables_count = self.variables.len();
124        let old_errors_count = self.errors.len();
125
126        self.variables = snapshot.variables;
127        self.ok = snapshot.ok;
128        self.errors = snapshot.errors;
129
130        #[cfg(feature = "logger")]
131        tracing::info!(
132            "[trace_id:{}] Rolled back to snapshot '{}' ({}). Variables: {} -> {}, Errors: {} -> {}",
133            self.trace_id,
134            snapshot_id,
135            snapshot.description,
136            old_variables_count,
137            self.variables.len(),
138            old_errors_count,
139            self.errors.len()
140        );
141        #[cfg(not(feature = "logger"))]
142        println!(
143            "[trace_id:{}] Rolled back to snapshot '{}' ({}). Variables: {} -> {}, Errors: {} -> {}",
144            self.trace_id,
145            snapshot_id,
146            snapshot.description,
147            old_variables_count,
148            self.variables.len(),
149            old_errors_count,
150            self.errors.len()
151        );
152
153        Ok(())
154    }
155
156    /// 删除快照
157    pub fn remove_snapshot(&mut self, snapshot_id: &str) -> Result<(), String> {
158        self.snapshots
159            .remove(snapshot_id)
160            .ok_or_else(|| format!("Snapshot '{snapshot_id}' not found"))?;
161
162        #[cfg(feature = "logger")]
163        tracing::info!(
164            "[trace_id:{}] Removed snapshot: {}",
165            self.trace_id,
166            snapshot_id
167        );
168        #[cfg(not(feature = "logger"))]
169        println!(
170            "[trace_id:{}] Removed snapshot: {}",
171            self.trace_id, snapshot_id
172        );
173
174        Ok(())
175    }
176
177    /// 列出所有快照
178    pub fn list_snapshots(&self) -> Vec<&ContextSnapshot> {
179        self.snapshots.values().collect()
180    }
181
182    pub fn start_step(&mut self, step_name: String) {
183        let step_log = StepLog {
184            step_name: step_name.clone(),
185            start_time: std::time::Instant::now(),
186            end_time: None,
187            status: StepStatus::Running,
188            error_message: None,
189            trace_id: self.trace_id.clone(),
190        };
191        self.step_logs.push(step_log);
192
193        #[cfg(feature = "logger")]
194        tracing::info!(
195            "[trace_id:{}] [step:{}] starting...",
196            self.trace_id,
197            step_name
198        );
199        #[cfg(not(feature = "logger"))]
200        println!(
201            "[trace_id:{}] [step:{}] starting...",
202            self.trace_id, step_name
203        );
204    }
205
206    pub fn end_step_success(&mut self, step_name: &str) {
207        if let Some(log) = self
208            .step_logs
209            .iter_mut()
210            .rev()
211            .find(|log| log.step_name == step_name)
212        {
213            log.end_time = Some(std::time::Instant::now());
214            log.status = StepStatus::Success;
215            let duration = log.end_time.unwrap().duration_since(log.start_time);
216
217            #[cfg(feature = "logger")]
218            tracing::info!(
219                "[trace_id:{}] [step:{}] completed successfully in {:?}",
220                self.trace_id,
221                step_name,
222                duration
223            );
224            #[cfg(not(feature = "logger"))]
225            println!(
226                "[trace_id:{}] [step:{}] completed successfully in {:?}",
227                self.trace_id, step_name, duration
228            );
229        }
230    }
231
232    pub fn end_step_failed(&mut self, step_name: &str, error: &str) {
233        if let Some(log) = self
234            .step_logs
235            .iter_mut()
236            .rev()
237            .find(|log| log.step_name == step_name)
238        {
239            log.end_time = Some(std::time::Instant::now());
240            log.status = StepStatus::Failed;
241            log.error_message = Some(error.to_string());
242            let duration = log.end_time.unwrap().duration_since(log.start_time);
243
244            #[cfg(feature = "logger")]
245            tracing::error!(
246                "[trace_id:{}] [step:{}] failed after {:?}: {}",
247                self.trace_id,
248                step_name,
249                duration,
250                error
251            );
252            #[cfg(not(feature = "logger"))]
253            println!(
254                "[trace_id:{}] [step:{}] failed after {:?}: {}",
255                self.trace_id, step_name, duration, error
256            );
257        }
258        self.errors
259            .push(format!("[{}] {}: {}", self.trace_id, step_name, error));
260    }
261
262    pub fn end_step_skipped(&mut self, step_name: &str, reason: &str) {
263        if let Some(log) = self
264            .step_logs
265            .iter_mut()
266            .rev()
267            .find(|log| log.step_name == step_name)
268        {
269            log.end_time = Some(std::time::Instant::now());
270            log.status = StepStatus::Skipped;
271            let duration = log.end_time.unwrap().duration_since(log.start_time);
272
273            #[cfg(feature = "logger")]
274            tracing::warn!(
275                "[trace_id:{}] [step:{}] skipped after {:?}: {}",
276                self.trace_id,
277                step_name,
278                duration,
279                reason
280            );
281            #[cfg(not(feature = "logger"))]
282            println!(
283                "[trace_id:{}] [step:{}] skipped after {:?}: {}",
284                self.trace_id, step_name, duration, reason
285            );
286        }
287    }
288
289    pub fn end_step_timeout(&mut self, step_name: &str) {
290        if let Some(log) = self
291            .step_logs
292            .iter_mut()
293            .rev()
294            .find(|log| log.step_name == step_name)
295        {
296            log.end_time = Some(std::time::Instant::now());
297            log.status = StepStatus::Timeout;
298            let duration = log.end_time.unwrap().duration_since(log.start_time);
299
300            #[cfg(feature = "logger")]
301            tracing::error!(
302                "[trace_id:{}] [step:{}] timed out after {:?}",
303                self.trace_id,
304                step_name,
305                duration
306            );
307            #[cfg(not(feature = "logger"))]
308            println!(
309                "[trace_id:{}] [step:{}] timed out after {:?}",
310                self.trace_id, step_name, duration
311            );
312        }
313        self.errors
314            .push(format!("[{}] {}: timeout", self.trace_id, step_name));
315    }
316
317    pub fn set_variable(&mut self, key: String, value: String) {
318        #[cfg(feature = "logger")]
319        tracing::debug!(
320            "[trace_id:{}] setting variable {} = {}",
321            self.trace_id,
322            key,
323            value
324        );
325        #[cfg(not(feature = "logger"))]
326        println!(
327            "[trace_id:{}] setting variable {} = {}",
328            self.trace_id, key, value
329        );
330
331        self.variables.insert(key, value);
332    }
333
334    pub fn get_variable(&self, key: &str) -> Option<&String> {
335        self.variables.get(key)
336    }
337
338    pub fn print_summary(&self) {
339        let summary =
340            format!("\n=== Flow Summary [trace_id: {}] ===", self.trace_id);
341
342        #[cfg(feature = "logger")]
343        tracing::info!("{}", summary);
344        #[cfg(not(feature = "logger"))]
345        println!("{}", summary);
346
347        println!("Total steps: {}", self.step_logs.len());
348
349        let success_count = self
350            .step_logs
351            .iter()
352            .filter(|log| matches!(log.status, StepStatus::Success))
353            .count();
354        let failed_count = self
355            .step_logs
356            .iter()
357            .filter(|log| matches!(log.status, StepStatus::Failed))
358            .count();
359        let skipped_count = self
360            .step_logs
361            .iter()
362            .filter(|log| matches!(log.status, StepStatus::Skipped))
363            .count();
364        let timeout_count = self
365            .step_logs
366            .iter()
367            .filter(|log| matches!(log.status, StepStatus::Timeout))
368            .count();
369
370        println!(
371            "Success: {success_count}, Failed: {failed_count}, Skipped: {skipped_count}, Timeout: {timeout_count}"
372        );
373
374        if !self.errors.is_empty() {
375            println!("Errors: {}", self.errors.len());
376            for error in &self.errors {
377                println!("  - {error}");
378            }
379        }
380
381        if !self.variables.is_empty() {
382            println!("Variables:");
383            for (key, value) in &self.variables {
384                println!("  {key} = {value}");
385            }
386        }
387        println!("==============================\n");
388    }
389}
390
391pub type SharedContext = Arc<Mutex<FlowContext>>;