1use crate::event::{Event, EventHub, LongOperationEvent, Origin};
114use anyhow::Result;
115use std::collections::HashMap;
116use std::sync::{
117 Arc, Mutex,
118 atomic::{AtomicBool, Ordering},
119};
120use std::thread;
121
122#[derive(Debug, Clone, PartialEq)]
124pub enum OperationStatus {
125 Running,
126 Completed,
127 Cancelled,
128 Failed(String),
129}
130
131#[derive(Debug, Clone)]
133pub struct OperationProgress {
134 pub percentage: f32, pub message: Option<String>,
136}
137
138impl OperationProgress {
139 pub fn new(percentage: f32, message: Option<String>) -> Self {
140 Self {
141 percentage: percentage.clamp(0.0, 100.0),
142 message,
143 }
144 }
145}
146
147pub trait LongOperation: Send + 'static {
149 type Output: Send + Sync + 'static + serde::Serialize;
150
151 fn execute(
152 &self,
153 progress_callback: Box<dyn Fn(OperationProgress) + Send>,
154 cancel_flag: Arc<AtomicBool>,
155 ) -> Result<Self::Output>;
156}
157
158trait OperationHandleTrait: Send {
160 fn get_status(&self) -> OperationStatus;
161 fn get_progress(&self) -> OperationProgress;
162 fn cancel(&self);
163 fn is_finished(&self) -> bool;
164}
165
166struct OperationHandle {
168 status: Arc<Mutex<OperationStatus>>,
169 progress: Arc<Mutex<OperationProgress>>,
170 cancel_flag: Arc<AtomicBool>,
171 _join_handle: thread::JoinHandle<()>,
172}
173
174impl OperationHandleTrait for OperationHandle {
175 fn get_status(&self) -> OperationStatus {
176 self.status.lock().unwrap().clone()
177 }
178
179 fn get_progress(&self) -> OperationProgress {
180 self.progress.lock().unwrap().clone()
181 }
182
183 fn cancel(&self) {
184 self.cancel_flag.store(true, Ordering::Relaxed);
185 let mut status = self.status.lock().unwrap();
186 if matches!(*status, OperationStatus::Running) {
187 *status = OperationStatus::Cancelled;
188 }
189 }
190
191 fn is_finished(&self) -> bool {
192 matches!(
193 self.get_status(),
194 OperationStatus::Completed | OperationStatus::Cancelled | OperationStatus::Failed(_)
195 )
196 }
197}
198
199pub struct LongOperationManager {
201 operations: Arc<Mutex<HashMap<String, Box<dyn OperationHandleTrait>>>>,
202 next_id: Arc<Mutex<u64>>,
203 results: Arc<Mutex<HashMap<String, String>>>, event_hub: Option<Arc<EventHub>>,
205}
206
207impl LongOperationManager {
208 pub fn new() -> Self {
209 Self {
210 operations: Arc::new(Mutex::new(HashMap::new())),
211 next_id: Arc::new(Mutex::new(0)),
212 results: Arc::new(Mutex::new(HashMap::new())),
213 event_hub: None,
214 }
215 }
216
217 pub fn set_event_hub(&mut self, event_hub: &Arc<EventHub>) {
219 self.event_hub = Some(Arc::clone(event_hub));
220 }
221
222 pub fn start_operation<Op: LongOperation>(&self, operation: Op) -> String {
224 let id = {
225 let mut next_id = self.next_id.lock().unwrap();
226 *next_id += 1;
227 format!("op_{}", *next_id)
228 };
229
230 if let Some(event_hub) = &self.event_hub {
232 event_hub.send_event(Event {
233 origin: Origin::LongOperation(LongOperationEvent::Started),
234 ids: vec![],
235 data: Some(id.clone()),
236 });
237 }
238
239 let status = Arc::new(Mutex::new(OperationStatus::Running));
240 let progress = Arc::new(Mutex::new(OperationProgress::new(0.0, None)));
241 let cancel_flag = Arc::new(AtomicBool::new(false));
242
243 let status_clone = status.clone();
244 let progress_clone = progress.clone();
245 let cancel_flag_clone = cancel_flag.clone();
246 let results_clone = self.results.clone();
247 let id_clone = id.clone();
248 let event_hub_opt = self.event_hub.clone();
249
250 let join_handle = thread::spawn(move || {
251 let progress_callback = {
252 let progress = progress_clone.clone();
253 let event_hub_opt = event_hub_opt.clone();
254 let id_for_cb = id_clone.clone();
255 Box::new(move |prog: OperationProgress| {
256 *progress.lock().unwrap() = prog.clone();
257 if let Some(event_hub) = &event_hub_opt {
258 let payload = serde_json::json!({
259 "id": id_for_cb,
260 "percentage": prog.percentage,
261 "message": prog.message,
262 })
263 .to_string();
264 event_hub.send_event(Event {
265 origin: Origin::LongOperation(LongOperationEvent::Progress),
266 ids: vec![],
267 data: Some(payload),
268 });
269 }
270 }) as Box<dyn Fn(OperationProgress) + Send>
271 };
272
273 let operation_result = operation.execute(progress_callback, cancel_flag_clone.clone());
274
275 let final_status = if cancel_flag_clone.load(Ordering::Relaxed) {
276 OperationStatus::Cancelled
277 } else {
278 match &operation_result {
279 Ok(result) => {
280 if let Ok(serialized) = serde_json::to_string(result) {
282 let mut results = results_clone.lock().unwrap();
283 results.insert(id_clone.clone(), serialized);
284 }
285 OperationStatus::Completed
286 }
287 Err(e) => OperationStatus::Failed(e.to_string()),
288 }
289 };
290
291 if let Some(event_hub) = &event_hub_opt {
293 let (event, data) = match &final_status {
294 OperationStatus::Completed => (
295 LongOperationEvent::Completed,
296 serde_json::json!({"id": id_clone}).to_string(),
297 ),
298 OperationStatus::Cancelled => (
299 LongOperationEvent::Cancelled,
300 serde_json::json!({"id": id_clone}).to_string(),
301 ),
302 OperationStatus::Failed(err) => (
303 LongOperationEvent::Failed,
304 serde_json::json!({"id": id_clone, "error": err}).to_string(),
305 ),
306 OperationStatus::Running => (
307 LongOperationEvent::Progress,
308 serde_json::json!({"id": id_clone}).to_string(),
309 ),
310 };
311 event_hub.send_event(Event {
312 origin: Origin::LongOperation(event),
313 ids: vec![],
314 data: Some(data),
315 });
316 }
317
318 *status_clone.lock().unwrap() = final_status;
319 });
320
321 let handle = OperationHandle {
322 status,
323 progress,
324 cancel_flag,
325 _join_handle: join_handle,
326 };
327
328 self.operations
329 .lock()
330 .unwrap()
331 .insert(id.clone(), Box::new(handle));
332
333 id
334 }
335
336 pub fn get_operation_status(&self, id: &str) -> Option<OperationStatus> {
338 let operations = self.operations.lock().unwrap();
339 operations.get(id).map(|handle| handle.get_status())
340 }
341
342 pub fn get_operation_progress(&self, id: &str) -> Option<OperationProgress> {
344 let operations = self.operations.lock().unwrap();
345 operations.get(id).map(|handle| handle.get_progress())
346 }
347
348 pub fn cancel_operation(&self, id: &str) -> bool {
350 let operations = self.operations.lock().unwrap();
351 if let Some(handle) = operations.get(id) {
352 handle.cancel();
353 if let Some(event_hub) = &self.event_hub {
355 let payload = serde_json::json!({"id": id}).to_string();
356 event_hub.send_event(Event {
357 origin: Origin::LongOperation(LongOperationEvent::Cancelled),
358 ids: vec![],
359 data: Some(payload),
360 });
361 }
362 true
363 } else {
364 false
365 }
366 }
367
368 pub fn is_operation_finished(&self, id: &str) -> Option<bool> {
370 let operations = self.operations.lock().unwrap();
371 operations.get(id).map(|handle| handle.is_finished())
372 }
373
374 pub fn cleanup_finished_operations(&self) {
376 let mut operations = self.operations.lock().unwrap();
377 operations.retain(|_, handle| !handle.is_finished());
378 }
379
380 pub fn list_operations(&self) -> Vec<String> {
382 let operations = self.operations.lock().unwrap();
383 operations.keys().cloned().collect()
384 }
385
386 pub fn get_operations_summary(&self) -> Vec<(String, OperationStatus, OperationProgress)> {
388 let operations = self.operations.lock().unwrap();
389 operations
390 .iter()
391 .map(|(id, handle)| (id.clone(), handle.get_status(), handle.get_progress()))
392 .collect()
393 }
394
395 pub fn store_operation_result<T: serde::Serialize>(&self, id: &str, result: T) -> Result<()> {
397 let serialized = serde_json::to_string(&result)?;
398 let mut results = self.results.lock().unwrap();
399 results.insert(id.to_string(), serialized);
400 Ok(())
401 }
402
403 pub fn get_operation_result(&self, id: &str) -> Option<String> {
405 let results = self.results.lock().unwrap();
406 results.get(id).cloned()
407 }
408}
409
410impl Default for LongOperationManager {
411 fn default() -> Self {
412 Self::new()
413 }
414}
415
416#[cfg(test)]
417mod tests {
418 use super::*;
419 use anyhow::anyhow;
420 use std::time::Duration;
421
422 pub struct FileProcessingOperation {
424 pub _file_path: String,
425 pub total_files: usize,
426 }
427
428 impl LongOperation for FileProcessingOperation {
429 type Output = ();
430
431 fn execute(
432 &self,
433 progress_callback: Box<dyn Fn(OperationProgress) + Send>,
434 cancel_flag: Arc<AtomicBool>,
435 ) -> Result<Self::Output> {
436 for i in 0..self.total_files {
437 if cancel_flag.load(Ordering::Relaxed) {
439 return Err(anyhow!("Operation was cancelled".to_string()));
440 }
441
442 thread::sleep(Duration::from_millis(500));
444
445 let percentage = (i as f32 / self.total_files as f32) * 100.0;
447 progress_callback(OperationProgress::new(
448 percentage,
449 Some(format!("Processing file {} of {}", i + 1, self.total_files)),
450 ));
451 }
452
453 progress_callback(OperationProgress::new(100.0, Some("Completed".to_string())));
455 Ok(())
456 }
457 }
458
459 #[test]
460 fn test_operation_manager() {
461 let manager = LongOperationManager::new();
462
463 let operation = FileProcessingOperation {
464 _file_path: "/tmp/test".to_string(),
465 total_files: 5,
466 };
467
468 let op_id = manager.start_operation(operation);
469
470 assert_eq!(
472 manager.get_operation_status(&op_id),
473 Some(OperationStatus::Running)
474 );
475
476 thread::sleep(Duration::from_millis(100));
478 let progress = manager.get_operation_progress(&op_id);
479 assert!(progress.is_some());
480
481 assert!(manager.cancel_operation(&op_id));
483 thread::sleep(Duration::from_millis(100));
484 assert_eq!(
485 manager.get_operation_status(&op_id),
486 Some(OperationStatus::Cancelled)
487 );
488 }
489}