ricecoder_workflows/
status_reporter.rs

1//! Status reporting for workflow execution
2
3use crate::models::WorkflowState;
4use crate::progress::{ProgressTracker, StatusReport};
5use chrono::Utc;
6
7use std::sync::{Arc, Mutex};
8
9/// Status reporter for providing real-time workflow status updates
10#[derive(Debug, Clone)]
11pub struct StatusReporter {
12    /// Progress tracker
13    progress_tracker: Arc<Mutex<ProgressTracker>>,
14    /// Last reported status
15    last_status: Arc<Mutex<Option<StatusReport>>>,
16}
17
18impl StatusReporter {
19    /// Create a new status reporter
20    pub fn new(total_steps: usize) -> Self {
21        StatusReporter {
22            progress_tracker: Arc::new(Mutex::new(ProgressTracker::new(total_steps))),
23            last_status: Arc::new(Mutex::new(None)),
24        }
25    }
26
27    /// Record a step duration
28    pub fn record_step_duration(&self, duration_ms: u64) {
29        if let Ok(mut tracker) = self.progress_tracker.lock() {
30            tracker.record_step_duration(duration_ms);
31        }
32    }
33
34    /// Get current status report
35    pub fn get_status(&self, state: &WorkflowState) -> StatusReport {
36        let now = Utc::now();
37        let tracker = self.progress_tracker.lock().unwrap();
38        let report = tracker.generate_status_report(state, now);
39
40        // Update last status
41        if let Ok(mut last_status) = self.last_status.lock() {
42            *last_status = Some(report.clone());
43        }
44
45        report
46    }
47
48    /// Get last reported status
49    pub fn get_last_status(&self) -> Option<StatusReport> {
50        self.last_status
51            .lock()
52            .ok()
53            .and_then(|status| status.clone())
54    }
55
56    /// Get average step duration
57    pub fn get_average_step_duration(&self) -> Option<u64> {
58        self.progress_tracker
59            .lock()
60            .ok()
61            .and_then(|tracker| tracker.get_average_step_duration())
62    }
63
64    /// Get minimum step duration
65    pub fn get_min_step_duration(&self) -> Option<u64> {
66        self.progress_tracker
67            .lock()
68            .ok()
69            .and_then(|tracker| tracker.get_min_step_duration())
70    }
71
72    /// Get maximum step duration
73    pub fn get_max_step_duration(&self) -> Option<u64> {
74        self.progress_tracker
75            .lock()
76            .ok()
77            .and_then(|tracker| tracker.get_max_step_duration())
78    }
79
80    /// Format status report as a human-readable string
81    pub fn format_status(&self, state: &WorkflowState) -> String {
82        let report = self.get_status(state);
83
84        let mut output = String::new();
85        output.push_str(&format!("Workflow Status: {:?}\n", report.workflow_status));
86        output.push_str(&format!(
87            "Progress: {}/{} steps ({}%)\n",
88            report.completed_steps_count, report.total_steps, report.progress_percentage
89        ));
90
91        if let Some(current_step) = &report.current_step {
92            output.push_str(&format!("Current Step: {}\n", current_step));
93        }
94
95        if let Some(eta) = report.estimated_completion_time {
96            output.push_str(&format!(
97                "Estimated Completion: {}\n",
98                eta.format("%Y-%m-%d %H:%M:%S")
99            ));
100        }
101
102        output
103    }
104}
105
106/// Real-time status update callback
107pub type StatusUpdateCallback = Box<dyn Fn(&StatusReport) + Send + Sync>;
108
109/// Status update listener for receiving real-time updates
110#[derive(Clone)]
111pub struct StatusUpdateListener {
112    callbacks: Arc<Mutex<Vec<StatusUpdateCallback>>>,
113}
114
115impl StatusUpdateListener {
116    /// Create a new status update listener
117    pub fn new() -> Self {
118        StatusUpdateListener {
119            callbacks: Arc::new(Mutex::new(Vec::new())),
120        }
121    }
122
123    /// Register a callback for status updates
124    pub fn on_status_update<F>(&self, callback: F)
125    where
126        F: Fn(&StatusReport) + Send + Sync + 'static,
127    {
128        if let Ok(mut callbacks) = self.callbacks.lock() {
129            callbacks.push(Box::new(callback));
130        }
131    }
132
133    /// Notify all listeners of a status update
134    pub fn notify(&self, report: &StatusReport) {
135        if let Ok(callbacks) = self.callbacks.lock() {
136            for callback in callbacks.iter() {
137                callback(report);
138            }
139        }
140    }
141
142    /// Clear all callbacks
143    pub fn clear(&self) {
144        if let Ok(mut callbacks) = self.callbacks.lock() {
145            callbacks.clear();
146        }
147    }
148}
149
150impl Default for StatusUpdateListener {
151    fn default() -> Self {
152        Self::new()
153    }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::WorkflowStatus;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    /// Build a running workflow state with one completed step (`step0`) and `step1`
    /// as the current step.
    fn create_test_workflow_state() -> WorkflowState {
        WorkflowState {
            workflow_id: "test-workflow".to_string(),
            status: WorkflowStatus::Running,
            current_step: Some("step1".to_string()),
            completed_steps: vec!["step0".to_string()],
            step_results: Default::default(),
            started_at: Utc::now(),
            updated_at: Utc::now(),
        }
    }

    #[test]
    fn test_create_status_reporter() {
        let reporter = StatusReporter::new(10);
        let state = create_test_workflow_state();

        let report = reporter.get_status(&state);
        assert_eq!(report.total_steps, 10);
        assert_eq!(report.completed_steps_count, 1);
    }

    #[test]
    fn test_record_step_duration() {
        let reporter = StatusReporter::new(10);
        reporter.record_step_duration(100);
        reporter.record_step_duration(200);

        assert_eq!(reporter.get_average_step_duration(), Some(150));
    }

    #[test]
    fn test_get_status() {
        let reporter = StatusReporter::new(10);
        let state = create_test_workflow_state();

        let report = reporter.get_status(&state);
        assert_eq!(report.current_step, Some("step1".to_string()));
        assert_eq!(report.progress_percentage, 10);
        assert_eq!(report.completed_steps_count, 1);
    }

    #[test]
    fn test_get_last_status() {
        let reporter = StatusReporter::new(10);
        let state = create_test_workflow_state();

        // Nothing has been reported yet.
        assert!(reporter.get_last_status().is_none());

        reporter.get_status(&state);
        assert!(reporter.get_last_status().is_some());
    }

    #[test]
    fn test_format_status() {
        let reporter = StatusReporter::new(10);
        let state = create_test_workflow_state();

        let formatted = reporter.format_status(&state);
        assert!(formatted.contains("Workflow Status"));
        assert!(formatted.contains("Progress"));
        assert!(formatted.contains("Current Step"));
    }

    #[test]
    fn test_status_update_listener() {
        let listener = StatusUpdateListener::new();
        let called = Arc::new(AtomicBool::new(false));
        let called_in_callback = Arc::clone(&called);

        listener.on_status_update(move |_report| {
            called_in_callback.store(true, Ordering::SeqCst);
        });

        let state = create_test_workflow_state();
        let reporter = StatusReporter::new(10);
        let report = reporter.get_status(&state);

        listener.notify(&report);
        assert!(called.load(Ordering::SeqCst));
    }

    #[test]
    fn test_status_update_listener_clear() {
        let listener = StatusUpdateListener::new();
        let calls = Arc::new(AtomicUsize::new(0));
        let calls_in_callback = Arc::clone(&calls);

        listener.on_status_update(move |_report| {
            calls_in_callback.fetch_add(1, Ordering::SeqCst);
        });

        let state = create_test_workflow_state();
        let reporter = StatusReporter::new(10);
        let report = reporter.get_status(&state);

        // Sanity check: the callback is registered and fires before the clear.
        listener.notify(&report);
        assert_eq!(calls.load(Ordering::SeqCst), 1);

        // After clearing, a notification must not reach the callback any more.
        listener.clear();
        listener.notify(&report);
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }
}
256}