ricecoder_workflows/
status_reporter.rs1use crate::models::WorkflowState;
4use crate::progress::{ProgressTracker, StatusReport};
5use chrono::Utc;
6
7use std::sync::{Arc, Mutex};
8
/// Reports workflow status by pairing a shared [`ProgressTracker`] with a
/// cache of the most recently generated [`StatusReport`].
///
/// Both fields are held behind `Arc<Mutex<..>>`, so values produced by the
/// derived `Clone` share the same tracker and the same cached report.
9#[derive(Debug, Clone)]
11pub struct StatusReporter {
    /// Tracker that step durations are recorded into and that status reports
    /// are generated from.
12 progress_tracker: Arc<Mutex<ProgressTracker>>,
    /// Last report produced by `get_status`; `None` until one has been generated.
14 last_status: Arc<Mutex<Option<StatusReport>>>,
16}
17
18impl StatusReporter {
19 pub fn new(total_steps: usize) -> Self {
21 StatusReporter {
22 progress_tracker: Arc::new(Mutex::new(ProgressTracker::new(total_steps))),
23 last_status: Arc::new(Mutex::new(None)),
24 }
25 }
26
27 pub fn record_step_duration(&self, duration_ms: u64) {
29 if let Ok(mut tracker) = self.progress_tracker.lock() {
30 tracker.record_step_duration(duration_ms);
31 }
32 }
33
34 pub fn get_status(&self, state: &WorkflowState) -> StatusReport {
36 let now = Utc::now();
37 let tracker = self.progress_tracker.lock().unwrap();
38 let report = tracker.generate_status_report(state, now);
39
40 if let Ok(mut last_status) = self.last_status.lock() {
42 *last_status = Some(report.clone());
43 }
44
45 report
46 }
47
48 pub fn get_last_status(&self) -> Option<StatusReport> {
50 self.last_status
51 .lock()
52 .ok()
53 .and_then(|status| status.clone())
54 }
55
56 pub fn get_average_step_duration(&self) -> Option<u64> {
58 self.progress_tracker
59 .lock()
60 .ok()
61 .and_then(|tracker| tracker.get_average_step_duration())
62 }
63
64 pub fn get_min_step_duration(&self) -> Option<u64> {
66 self.progress_tracker
67 .lock()
68 .ok()
69 .and_then(|tracker| tracker.get_min_step_duration())
70 }
71
72 pub fn get_max_step_duration(&self) -> Option<u64> {
74 self.progress_tracker
75 .lock()
76 .ok()
77 .and_then(|tracker| tracker.get_max_step_duration())
78 }
79
80 pub fn format_status(&self, state: &WorkflowState) -> String {
82 let report = self.get_status(state);
83
84 let mut output = String::new();
85 output.push_str(&format!("Workflow Status: {:?}\n", report.workflow_status));
86 output.push_str(&format!(
87 "Progress: {}/{} steps ({}%)\n",
88 report.completed_steps_count, report.total_steps, report.progress_percentage
89 ));
90
91 if let Some(current_step) = &report.current_step {
92 output.push_str(&format!("Current Step: {}\n", current_step));
93 }
94
95 if let Some(eta) = report.estimated_completion_time {
96 output.push_str(&format!(
97 "Estimated Completion: {}\n",
98 eta.format("%Y-%m-%d %H:%M:%S")
99 ));
100 }
101
102 output
103 }
104}
105
/// Boxed callback that receives a reference to a [`StatusReport`].
///
/// The `Send + Sync` bounds allow the boxed closure to be stored behind the
/// `Arc<Mutex<..>>` used by `StatusUpdateListener` and shared across threads.
106pub type StatusUpdateCallback = Box<dyn Fn(&StatusReport) + Send + Sync>;
108
/// Registry of status-update callbacks.
///
/// The callback list lives behind an `Arc<Mutex<..>>`, so values produced by
/// the derived `Clone` share one list: a callback registered through any clone
/// is visible to all of them.
109#[derive(Clone)]
111pub struct StatusUpdateListener {
    /// Registered callbacks, kept in registration order (they are appended with `push`).
112 callbacks: Arc<Mutex<Vec<StatusUpdateCallback>>>,
113}
114
115impl StatusUpdateListener {
116 pub fn new() -> Self {
118 StatusUpdateListener {
119 callbacks: Arc::new(Mutex::new(Vec::new())),
120 }
121 }
122
123 pub fn on_status_update<F>(&self, callback: F)
125 where
126 F: Fn(&StatusReport) + Send + Sync + 'static,
127 {
128 if let Ok(mut callbacks) = self.callbacks.lock() {
129 callbacks.push(Box::new(callback));
130 }
131 }
132
133 pub fn notify(&self, report: &StatusReport) {
135 if let Ok(callbacks) = self.callbacks.lock() {
136 for callback in callbacks.iter() {
137 callback(report);
138 }
139 }
140 }
141
142 pub fn clear(&self) {
144 if let Ok(mut callbacks) = self.callbacks.lock() {
145 callbacks.clear();
146 }
147 }
148}
149
150impl Default for StatusUpdateListener {
151 fn default() -> Self {
152 Self::new()
153 }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::WorkflowStatus;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Builds a running workflow with one completed step (`step0`) and
    /// `step1` in progress.
    fn create_test_workflow_state() -> WorkflowState {
        WorkflowState {
            workflow_id: "test-workflow".to_string(),
            status: WorkflowStatus::Running,
            current_step: Some("step1".to_string()),
            completed_steps: vec!["step0".to_string()],
            step_results: Default::default(),
            started_at: Utc::now(),
            updated_at: Utc::now(),
        }
    }

    #[test]
    fn test_create_status_reporter() {
        let reporter = StatusReporter::new(10);
        let state = create_test_workflow_state();

        let report = reporter.get_status(&state);
        assert_eq!(report.total_steps, 10);
        assert_eq!(report.completed_steps_count, 1);
    }

    #[test]
    fn test_record_step_duration() {
        let reporter = StatusReporter::new(10);
        reporter.record_step_duration(100);
        reporter.record_step_duration(200);

        assert_eq!(reporter.get_average_step_duration(), Some(150));
    }

    #[test]
    fn test_get_status() {
        let reporter = StatusReporter::new(10);
        let state = create_test_workflow_state();

        let report = reporter.get_status(&state);
        assert_eq!(report.current_step, Some("step1".to_string()));
        assert_eq!(report.progress_percentage, 10);
        assert_eq!(report.completed_steps_count, 1);
    }

    #[test]
    fn test_get_last_status() {
        let reporter = StatusReporter::new(10);
        let state = create_test_workflow_state();

        assert!(reporter.get_last_status().is_none());

        reporter.get_status(&state);
        assert!(reporter.get_last_status().is_some());
    }

    #[test]
    fn test_format_status() {
        let reporter = StatusReporter::new(10);
        let state = create_test_workflow_state();

        let formatted = reporter.format_status(&state);
        assert!(formatted.contains("Workflow Status"));
        // Pin the rendered values, not just the labels, so a broken format
        // string or swapped arguments are caught.
        assert!(formatted.contains("Progress: 1/10 steps (10%)\n"));
        assert!(formatted.contains("Current Step: step1\n"));
    }

    #[test]
    fn test_status_update_listener() {
        let listener = StatusUpdateListener::new();
        let calls = Arc::new(AtomicUsize::new(0));
        let calls_in_callback = Arc::clone(&calls);

        listener.on_status_update(move |_report| {
            calls_in_callback.fetch_add(1, Ordering::SeqCst);
        });

        let state = create_test_workflow_state();
        let reporter = StatusReporter::new(10);
        let report = reporter.get_status(&state);

        listener.notify(&report);
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_status_update_listener_clear() {
        let listener = StatusUpdateListener::new();
        let calls = Arc::new(AtomicUsize::new(0));
        let calls_in_callback = Arc::clone(&calls);

        listener.on_status_update(move |_report| {
            calls_in_callback.fetch_add(1, Ordering::SeqCst);
        });

        let state = create_test_workflow_state();
        let reporter = StatusReporter::new(10);
        let report = reporter.get_status(&state);

        // Sanity check: the callback is live before clearing.
        listener.notify(&report);
        assert_eq!(calls.load(Ordering::SeqCst), 1);

        // After `clear`, a notification must not reach the removed callback.
        listener.clear();
        listener.notify(&report);
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }
}