1use crate::event::{Event, EventHub, LongOperationEvent, Origin};
114use anyhow::Result;
115use std::collections::HashMap;
116use std::sync::{
117 Arc, Mutex,
118 atomic::{AtomicBool, Ordering},
119};
120use std::thread;
121
/// Lifecycle state of an operation tracked by the manager.
///
/// `Running` is the only non-terminal state; the other three variants are all
/// treated as "finished" by `OperationHandleTrait::is_finished`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OperationStatus {
    /// The operation has been started and no terminal state has been recorded yet.
    Running,
    /// The operation returned `Ok` and was not cancelled.
    Completed,
    /// Cancellation was requested for the operation.
    Cancelled,
    /// The operation returned an error; the payload is the error's display text.
    Failed(String),
}
130
/// Snapshot of how far an operation has advanced.
#[derive(Debug, Clone)]
pub struct OperationProgress {
    /// Completion percentage; values built through [`OperationProgress::new`]
    /// are always finite and within `0.0..=100.0`.
    pub percentage: f32,
    /// Optional human-readable description of the current step.
    pub message: Option<String>,
}

impl OperationProgress {
    /// Builds a progress snapshot, forcing `percentage` into `0.0..=100.0`.
    ///
    /// Out-of-range values (including infinities) are clamped to the nearest
    /// bound. `NaN` is mapped to `0.0`: `f32::clamp` passes `NaN` through
    /// unchanged, which would otherwise let a `0.0 / 0.0` style computation
    /// leak a non-numeric percentage to every consumer.
    pub fn new(percentage: f32, message: Option<String>) -> Self {
        let percentage = if percentage.is_nan() {
            0.0
        } else {
            percentage.clamp(0.0, 100.0)
        };
        Self {
            percentage,
            message,
        }
    }
}
146
147pub trait LongOperation: Send + 'static {
149 type Output: Send + Sync + 'static + serde::Serialize;
150
151 fn execute(
152 &self,
153 progress_callback: Box<dyn Fn(OperationProgress) + Send>,
154 cancel_flag: Arc<AtomicBool>,
155 ) -> Result<Self::Output>;
156}
157
158trait OperationHandleTrait: Send {
160 fn get_status(&self) -> OperationStatus;
161 fn get_progress(&self) -> OperationProgress;
162 fn cancel(&self);
163 fn is_finished(&self) -> bool;
164}
165
166struct OperationHandle {
168 status: Arc<Mutex<OperationStatus>>,
169 progress: Arc<Mutex<OperationProgress>>,
170 cancel_flag: Arc<AtomicBool>,
171 _join_handle: thread::JoinHandle<()>,
172}
173
/// Locks `mutex`, ignoring poisoning.
///
/// A mutex becomes poisoned when a thread panics while holding its guard. The
/// protected value is still handed back in that case instead of propagating
/// the panic to every later caller.
fn lock_or_recover<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    match mutex.lock() {
        Ok(guard) => guard,
        // Poisoned: take the guard out of the error and carry on.
        Err(poisoned) => poisoned.into_inner(),
    }
}
184
185impl OperationHandleTrait for OperationHandle {
186 fn get_status(&self) -> OperationStatus {
187 lock_or_recover(&self.status).clone()
188 }
189
190 fn get_progress(&self) -> OperationProgress {
191 lock_or_recover(&self.progress).clone()
192 }
193
194 fn cancel(&self) {
195 self.cancel_flag.store(true, Ordering::Relaxed);
196 let mut status = lock_or_recover(&self.status);
197 if matches!(*status, OperationStatus::Running) {
198 *status = OperationStatus::Cancelled;
199 }
200 }
201
202 fn is_finished(&self) -> bool {
203 matches!(
204 self.get_status(),
205 OperationStatus::Completed | OperationStatus::Cancelled | OperationStatus::Failed(_)
206 )
207 }
208}
209
210pub struct LongOperationManager {
212 operations: Arc<Mutex<HashMap<String, Box<dyn OperationHandleTrait>>>>,
213 next_id: Arc<Mutex<u64>>,
214 results: Arc<Mutex<HashMap<String, String>>>, event_hub: Option<Arc<EventHub>>,
216}
217
218impl LongOperationManager {
219 pub fn new() -> Self {
220 Self {
221 operations: Arc::new(Mutex::new(HashMap::new())),
222 next_id: Arc::new(Mutex::new(0)),
223 results: Arc::new(Mutex::new(HashMap::new())),
224 event_hub: None,
225 }
226 }
227
228 pub fn set_event_hub(&mut self, event_hub: &Arc<EventHub>) {
230 self.event_hub = Some(Arc::clone(event_hub));
231 }
232
233 pub fn start_operation<Op: LongOperation>(&self, operation: Op) -> String {
235 let id = {
236 let mut next_id = lock_or_recover(&self.next_id);
237 *next_id += 1;
238 format!("op_{}", *next_id)
239 };
240
241 if let Some(event_hub) = &self.event_hub {
243 event_hub.send_event(Event {
244 origin: Origin::LongOperation(LongOperationEvent::Started),
245 ids: vec![],
246 data: Some(id.clone()),
247 });
248 }
249
250 let status = Arc::new(Mutex::new(OperationStatus::Running));
251 let progress = Arc::new(Mutex::new(OperationProgress::new(0.0, None)));
252 let cancel_flag = Arc::new(AtomicBool::new(false));
253
254 let status_clone = status.clone();
255 let progress_clone = progress.clone();
256 let cancel_flag_clone = cancel_flag.clone();
257 let results_clone = self.results.clone();
258 let id_clone = id.clone();
259 let event_hub_opt = self.event_hub.clone();
260
261 let join_handle = thread::spawn(move || {
262 let progress_callback = {
263 let progress = progress_clone.clone();
264 let event_hub_opt = event_hub_opt.clone();
265 let id_for_cb = id_clone.clone();
266 Box::new(move |prog: OperationProgress| {
267 *lock_or_recover(&progress) = prog.clone();
268 if let Some(event_hub) = &event_hub_opt {
269 let payload = serde_json::json!({
270 "id": id_for_cb,
271 "percentage": prog.percentage,
272 "message": prog.message,
273 })
274 .to_string();
275 event_hub.send_event(Event {
276 origin: Origin::LongOperation(LongOperationEvent::Progress),
277 ids: vec![],
278 data: Some(payload),
279 });
280 }
281 }) as Box<dyn Fn(OperationProgress) + Send>
282 };
283
284 let operation_result = operation.execute(progress_callback, cancel_flag_clone.clone());
285
286 let final_status = if cancel_flag_clone.load(Ordering::Relaxed) {
287 OperationStatus::Cancelled
288 } else {
289 match &operation_result {
290 Ok(result) => {
291 if let Ok(serialized) = serde_json::to_string(result) {
293 let mut results = lock_or_recover(&results_clone);
294 results.insert(id_clone.clone(), serialized);
295 }
296 OperationStatus::Completed
297 }
298 Err(e) => OperationStatus::Failed(e.to_string()),
299 }
300 };
301
302 if let Some(event_hub) = &event_hub_opt {
304 let (event, data) = match &final_status {
305 OperationStatus::Completed => (
306 LongOperationEvent::Completed,
307 serde_json::json!({"id": id_clone}).to_string(),
308 ),
309 OperationStatus::Cancelled => (
310 LongOperationEvent::Cancelled,
311 serde_json::json!({"id": id_clone}).to_string(),
312 ),
313 OperationStatus::Failed(err) => (
314 LongOperationEvent::Failed,
315 serde_json::json!({"id": id_clone, "error": err}).to_string(),
316 ),
317 OperationStatus::Running => (
318 LongOperationEvent::Progress,
319 serde_json::json!({"id": id_clone}).to_string(),
320 ),
321 };
322 event_hub.send_event(Event {
323 origin: Origin::LongOperation(event),
324 ids: vec![],
325 data: Some(data),
326 });
327 }
328
329 *lock_or_recover(&status_clone) = final_status;
330 });
331
332 let handle = OperationHandle {
333 status,
334 progress,
335 cancel_flag,
336 _join_handle: join_handle,
337 };
338
339 lock_or_recover(&self.operations).insert(id.clone(), Box::new(handle));
340
341 id
342 }
343
344 pub fn get_operation_status(&self, id: &str) -> Option<OperationStatus> {
346 let operations = lock_or_recover(&self.operations);
347 operations.get(id).map(|handle| handle.get_status())
348 }
349
350 pub fn get_operation_progress(&self, id: &str) -> Option<OperationProgress> {
352 let operations = lock_or_recover(&self.operations);
353 operations.get(id).map(|handle| handle.get_progress())
354 }
355
356 pub fn cancel_operation(&self, id: &str) -> bool {
358 let operations = lock_or_recover(&self.operations);
359 if let Some(handle) = operations.get(id) {
360 handle.cancel();
361 if let Some(event_hub) = &self.event_hub {
363 let payload = serde_json::json!({"id": id}).to_string();
364 event_hub.send_event(Event {
365 origin: Origin::LongOperation(LongOperationEvent::Cancelled),
366 ids: vec![],
367 data: Some(payload),
368 });
369 }
370 true
371 } else {
372 false
373 }
374 }
375
376 pub fn is_operation_finished(&self, id: &str) -> Option<bool> {
378 let operations = lock_or_recover(&self.operations);
379 operations.get(id).map(|handle| handle.is_finished())
380 }
381
382 pub fn cleanup_finished_operations(&self) {
384 let mut operations = lock_or_recover(&self.operations);
385 operations.retain(|_, handle| !handle.is_finished());
386 }
387
388 pub fn list_operations(&self) -> Vec<String> {
390 let operations = lock_or_recover(&self.operations);
391 operations.keys().cloned().collect()
392 }
393
394 pub fn get_operations_summary(&self) -> Vec<(String, OperationStatus, OperationProgress)> {
396 let operations = lock_or_recover(&self.operations);
397 operations
398 .iter()
399 .map(|(id, handle)| (id.clone(), handle.get_status(), handle.get_progress()))
400 .collect()
401 }
402
403 pub fn store_operation_result<T: serde::Serialize>(&self, id: &str, result: T) -> Result<()> {
405 let serialized = serde_json::to_string(&result)?;
406 let mut results = lock_or_recover(&self.results);
407 results.insert(id.to_string(), serialized);
408 Ok(())
409 }
410
411 pub fn get_operation_result(&self, id: &str) -> Option<String> {
413 let results = lock_or_recover(&self.results);
414 results.get(id).cloned()
415 }
416}
417
418impl Default for LongOperationManager {
419 fn default() -> Self {
420 Self::new()
421 }
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::anyhow;
    use std::time::Duration;

    /// Fake workload that "processes" `total_files` files, sleeping 500 ms per
    /// file and reporting progress after each one.
    pub struct FileProcessingOperation {
        pub file_path: String,
        pub total_files: usize,
    }

    impl LongOperation for FileProcessingOperation {
        type Output = ();

        fn execute(
            &self,
            progress_callback: Box<dyn Fn(OperationProgress) + Send>,
            cancel_flag: Arc<AtomicBool>,
        ) -> Result<Self::Output> {
            let total = self.total_files;

            (0..total).try_for_each(|index| {
                // Bail out before starting the next file once cancellation
                // has been requested.
                if cancel_flag.load(Ordering::Relaxed) {
                    return Err(anyhow!("Operation was cancelled".to_string()));
                }

                // Simulated per-file work.
                thread::sleep(Duration::from_millis(500));

                let percentage = (index as f32 / total as f32) * 100.0;
                let message = format!("Processing file {} of {}", index + 1, total);
                progress_callback(OperationProgress::new(percentage, Some(message)));
                Ok(())
            })?;

            let done = OperationProgress::new(100.0, Some("Completed".to_string()));
            progress_callback(done);
            Ok(())
        }
    }

    /// Starts an operation, checks it is reported as running with progress
    /// available, then cancels it and expects the `Cancelled` status.
    #[test]
    fn test_operation_manager() {
        let manager = LongOperationManager::new();
        let op_id = manager.start_operation(FileProcessingOperation {
            file_path: "/tmp/test".to_string(),
            total_files: 5,
        });

        let initial_status = manager.get_operation_status(&op_id);
        assert_eq!(initial_status, Some(OperationStatus::Running));

        thread::sleep(Duration::from_millis(100));
        assert!(manager.get_operation_progress(&op_id).is_some());

        let cancel_accepted = manager.cancel_operation(&op_id);
        assert!(cancel_accepted);

        thread::sleep(Duration::from_millis(100));
        let status_after_cancel = manager.get_operation_status(&op_id);
        assert_eq!(status_after_cancel, Some(OperationStatus::Cancelled));
    }
}