text-document-common 1.4.0

Shared entities, database, events, and undo/redo infrastructure for text-document
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
// Generated by Qleany v1.5.1 from long_operation.tera
//! This module provides a framework for managing long-running operations with the ability to track
//! status, progress, and enable cancellation. It includes the infrastructure for defining, executing,
//! and monitoring such operations. For undoable operations, it is recommended to use the Undo/Redo framework.
//!
//! # Components:
//!
//! - **OperationStatus**: Enum representing the state of an operation.
//! - **OperationProgress**: Struct holding details about the progress of an operation.
//! - **LongOperation**: Trait that must be implemented by any long-running operation.
//! - **LongOperationManager**: Manager that orchestrates the execution, tracking, and cleanup of multiple operations.
//!
//! # Usage:
//!
//! 1. Implement the `LongOperation` trait for your task.
//! 2. Use `LongOperationManager` to start, track, and manage your operations.
//! 3. Access methods like:
//!     - `start_operation` to start new operations.
//!     - `get_operation_status`, `get_operation_progress` to query operation details.
//!     - `cancel_operation` to cancel operations.
//!     - `cleanup_finished_operations` to remove completed or cancelled operations.
//!
//! # Example:
//!
//! ```rust,ignore
//! // Define your long-running operation
//! use std::sync::Arc;
//! use std::sync::atomic::{AtomicBool, Ordering};
//! use std::thread;
//! use std::time::Duration;
//! use common::long_operation::{LongOperation, LongOperationManager, OperationProgress};
//!
//! pub struct MyOperation {
//!     pub total_steps: usize,
//! }
//!
//! impl LongOperation for MyOperation {
//!     fn execute(
//!         &self,
//!         progress_callback: Box<dyn Fn(OperationProgress) + Send>,
//!         cancel_flag: Arc<AtomicBool>,
//!     ) -> Result<(), String> {
//!         for i in 0..self.total_steps {
//!             if cancel_flag.load(Ordering::Relaxed) {
//!                 return Err("Operation cancelled".to_string());
//!             }
//!             thread::sleep(Duration::from_millis(500));
//!             progress_callback(OperationProgress::new(
//!                 (i as f32 / self.total_steps as f32) * 100.0,
//!                 Some(format!("Step {}/{}", i + 1, self.total_steps)),
//!             ));
//!         }
//!         Ok(())
//!     }
//! }
//!
//! let manager = LongOperationManager::new();
//! let my_operation = MyOperation { total_steps: 5 };
//! let operation_id = manager.start_operation(my_operation);
//!
//! while let Some(status) = manager.get_operation_status(&operation_id) {
//!     println!("{:?}", status);
//!     thread::sleep(Duration::from_millis(100));
//! }
//! ```
//!
//! # Notes:
//!
//! - Thread-safety is ensured through the use of `Arc<Mutex<T>>` and `AtomicBool`.
//! - Operations run in their own threads, ensuring non-blocking execution.
//! - Proper cleanup of finished operations is encouraged using `cleanup_finished_operations`.
//!
//! # Definitions:
//!
//! ## `OperationStatus`
//! Represents the state of an operation. Possible states are:
//! - `Running`: Operation is ongoing.
//! - `Completed`: Operation finished successfully.
//! - `Cancelled`: Operation was cancelled by the user.
//! - `Failed(String)`: Operation failed with an error message.
//!
//! ## `OperationProgress`
//! Describes the progress of an operation, including:
//! - `percentage` (0.0 to 100.0): Indicates completion progress.
//! - `message`: Optional user-defined progress description.
//!
//! ## `LongOperation` Trait
//! Any custom long-running operation must implement this trait:
//! - `execute`: Defines the operation logic, accepting a progress callback and cancellation flag.
//!
//! ## `LongOperationManager`
//! Provides APIs to manage operations, including:
//! - `start_operation`: Starts a new operation and returns its unique ID.
//! - `get_operation_status`: Queries the current status of an operation.
//! - `get_operation_progress`: Retrieves the progress of an operation.
//! - `cancel_operation`: Cancels an operation.
//! - `cleanup_finished_operations`: Removes completed or cancelled operations to free resources.
//!
//! ## Example Operation: FileProcessingOperation
//! Represents a long-running operation to process files. Demonstrates typical usage of the framework.
//!
//! - **Fields**:
//!     - `file_path`: Path of the file to process.
//!     - `total_files`: Number of files to process.
//! - **Behavior**:
//!   Simulates file processing with periodic progress updates. Supports cancellation.
//!
//!

// Generated by Qleany. Edit at your own risk! Be careful when regenerating this file
// as changes will be lost.

use crate::event::{Event, EventHub, LongOperationEvent, Origin};
use anyhow::Result;
use std::collections::HashMap;
use std::sync::{
    Arc, Mutex,
    atomic::{AtomicBool, Ordering},
};
use std::thread;

// Status of a long operation
#[derive(Debug, Clone, PartialEq)]
pub enum OperationStatus {
    Running,
    Completed,
    Cancelled,
    Failed(String),
}

// Progress information
#[derive(Debug, Clone)]
pub struct OperationProgress {
    pub percentage: f32, // 0.0 to 100.0
    pub message: Option<String>,
}

impl OperationProgress {
    pub fn new(percentage: f32, message: Option<String>) -> Self {
        Self {
            percentage: percentage.clamp(0.0, 100.0),
            message,
        }
    }
}

// Trait that long operations must implement
pub trait LongOperation: Send + 'static {
    type Output: Send + Sync + 'static + serde::Serialize;

    fn execute(
        &self,
        progress_callback: Box<dyn Fn(OperationProgress) + Send>,
        cancel_flag: Arc<AtomicBool>,
    ) -> Result<Self::Output>;
}

// Trait for operation handles (type-erased)
trait OperationHandleTrait: Send {
    fn get_status(&self) -> OperationStatus;
    fn get_progress(&self) -> OperationProgress;
    fn cancel(&self);
    fn is_finished(&self) -> bool;
}

// Concrete handle implementation
struct OperationHandle {
    status: Arc<Mutex<OperationStatus>>,
    progress: Arc<Mutex<OperationProgress>>,
    cancel_flag: Arc<AtomicBool>,
    _join_handle: thread::JoinHandle<()>,
}

impl OperationHandleTrait for OperationHandle {
    fn get_status(&self) -> OperationStatus {
        self.status.lock().unwrap().clone()
    }

    fn get_progress(&self) -> OperationProgress {
        self.progress.lock().unwrap().clone()
    }

    fn cancel(&self) {
        self.cancel_flag.store(true, Ordering::Relaxed);
        let mut status = self.status.lock().unwrap();
        if matches!(*status, OperationStatus::Running) {
            *status = OperationStatus::Cancelled;
        }
    }

    fn is_finished(&self) -> bool {
        matches!(
            self.get_status(),
            OperationStatus::Completed | OperationStatus::Cancelled | OperationStatus::Failed(_)
        )
    }
}

// Manager for long operations
pub struct LongOperationManager {
    operations: Arc<Mutex<HashMap<String, Box<dyn OperationHandleTrait>>>>,
    next_id: Arc<Mutex<u64>>,
    results: Arc<Mutex<HashMap<String, String>>>, // Store serialized results
    event_hub: Option<Arc<EventHub>>,
}

impl LongOperationManager {
    pub fn new() -> Self {
        Self {
            operations: Arc::new(Mutex::new(HashMap::new())),
            next_id: Arc::new(Mutex::new(0)),
            results: Arc::new(Mutex::new(HashMap::new())),
            event_hub: None,
        }
    }

    /// Inject the event hub to allow sending long operation related events
    pub fn set_event_hub(&mut self, event_hub: &Arc<EventHub>) {
        self.event_hub = Some(Arc::clone(event_hub));
    }

    /// Start a new long operation and return its ID
    pub fn start_operation<Op: LongOperation>(&self, operation: Op) -> String {
        let id = {
            let mut next_id = self.next_id.lock().unwrap();
            *next_id += 1;
            format!("op_{}", *next_id)
        };

        // Emit started event
        if let Some(event_hub) = &self.event_hub {
            event_hub.send_event(Event {
                origin: Origin::LongOperation(LongOperationEvent::Started),
                ids: vec![],
                data: Some(id.clone()),
            });
        }

        let status = Arc::new(Mutex::new(OperationStatus::Running));
        let progress = Arc::new(Mutex::new(OperationProgress::new(0.0, None)));
        let cancel_flag = Arc::new(AtomicBool::new(false));

        let status_clone = status.clone();
        let progress_clone = progress.clone();
        let cancel_flag_clone = cancel_flag.clone();
        let results_clone = self.results.clone();
        let id_clone = id.clone();
        let event_hub_opt = self.event_hub.clone();

        let join_handle = thread::spawn(move || {
            let progress_callback = {
                let progress = progress_clone.clone();
                let event_hub_opt = event_hub_opt.clone();
                let id_for_cb = id_clone.clone();
                Box::new(move |prog: OperationProgress| {
                    *progress.lock().unwrap() = prog.clone();
                    if let Some(event_hub) = &event_hub_opt {
                        let payload = serde_json::json!({
                            "id": id_for_cb,
                            "percentage": prog.percentage,
                            "message": prog.message,
                        })
                        .to_string();
                        event_hub.send_event(Event {
                            origin: Origin::LongOperation(LongOperationEvent::Progress),
                            ids: vec![],
                            data: Some(payload),
                        });
                    }
                }) as Box<dyn Fn(OperationProgress) + Send>
            };

            let operation_result = operation.execute(progress_callback, cancel_flag_clone.clone());

            let final_status = if cancel_flag_clone.load(Ordering::Relaxed) {
                OperationStatus::Cancelled
            } else {
                match &operation_result {
                    Ok(result) => {
                        // Store the result
                        if let Ok(serialized) = serde_json::to_string(result) {
                            let mut results = results_clone.lock().unwrap();
                            results.insert(id_clone.clone(), serialized);
                        }
                        OperationStatus::Completed
                    }
                    Err(e) => OperationStatus::Failed(e.to_string()),
                }
            };

            // Emit final status event
            if let Some(event_hub) = &event_hub_opt {
                let (event, data) = match &final_status {
                    OperationStatus::Completed => (
                        LongOperationEvent::Completed,
                        serde_json::json!({"id": id_clone}).to_string(),
                    ),
                    OperationStatus::Cancelled => (
                        LongOperationEvent::Cancelled,
                        serde_json::json!({"id": id_clone}).to_string(),
                    ),
                    OperationStatus::Failed(err) => (
                        LongOperationEvent::Failed,
                        serde_json::json!({"id": id_clone, "error": err}).to_string(),
                    ),
                    OperationStatus::Running => (
                        LongOperationEvent::Progress,
                        serde_json::json!({"id": id_clone}).to_string(),
                    ),
                };
                event_hub.send_event(Event {
                    origin: Origin::LongOperation(event),
                    ids: vec![],
                    data: Some(data),
                });
            }

            *status_clone.lock().unwrap() = final_status;
        });

        let handle = OperationHandle {
            status,
            progress,
            cancel_flag,
            _join_handle: join_handle,
        };

        self.operations
            .lock()
            .unwrap()
            .insert(id.clone(), Box::new(handle));

        id
    }

    /// Get the status of an operation
    pub fn get_operation_status(&self, id: &str) -> Option<OperationStatus> {
        let operations = self.operations.lock().unwrap();
        operations.get(id).map(|handle| handle.get_status())
    }

    /// Get the progress of an operation
    pub fn get_operation_progress(&self, id: &str) -> Option<OperationProgress> {
        let operations = self.operations.lock().unwrap();
        operations.get(id).map(|handle| handle.get_progress())
    }

    /// Cancel an operation
    pub fn cancel_operation(&self, id: &str) -> bool {
        let operations = self.operations.lock().unwrap();
        if let Some(handle) = operations.get(id) {
            handle.cancel();
            // Emit cancelled event immediately
            if let Some(event_hub) = &self.event_hub {
                let payload = serde_json::json!({"id": id}).to_string();
                event_hub.send_event(Event {
                    origin: Origin::LongOperation(LongOperationEvent::Cancelled),
                    ids: vec![],
                    data: Some(payload),
                });
            }
            true
        } else {
            false
        }
    }

    /// Check if an operation is finished
    pub fn is_operation_finished(&self, id: &str) -> Option<bool> {
        let operations = self.operations.lock().unwrap();
        operations.get(id).map(|handle| handle.is_finished())
    }

    /// Remove finished operations from memory
    pub fn cleanup_finished_operations(&self) {
        let mut operations = self.operations.lock().unwrap();
        operations.retain(|_, handle| !handle.is_finished());
    }

    /// Get list of all operation IDs
    pub fn list_operations(&self) -> Vec<String> {
        let operations = self.operations.lock().unwrap();
        operations.keys().cloned().collect()
    }

    /// Get summary of all operations
    pub fn get_operations_summary(&self) -> Vec<(String, OperationStatus, OperationProgress)> {
        let operations = self.operations.lock().unwrap();
        operations
            .iter()
            .map(|(id, handle)| (id.clone(), handle.get_status(), handle.get_progress()))
            .collect()
    }

    /// Store an operation result
    pub fn store_operation_result<T: serde::Serialize>(&self, id: &str, result: T) -> Result<()> {
        let serialized = serde_json::to_string(&result)?;
        let mut results = self.results.lock().unwrap();
        results.insert(id.to_string(), serialized);
        Ok(())
    }

    /// Get an operation result
    pub fn get_operation_result(&self, id: &str) -> Option<String> {
        let results = self.results.lock().unwrap();
        results.get(id).cloned()
    }
}

impl Default for LongOperationManager {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::anyhow;
    use std::time::Duration;

    // Example implementation of a long operation
    pub struct FileProcessingOperation {
        pub _file_path: String,
        pub total_files: usize,
    }

    impl LongOperation for FileProcessingOperation {
        type Output = ();

        fn execute(
            &self,
            progress_callback: Box<dyn Fn(OperationProgress) + Send>,
            cancel_flag: Arc<AtomicBool>,
        ) -> Result<Self::Output> {
            for i in 0..self.total_files {
                // Check if operation was cancelled
                if cancel_flag.load(Ordering::Relaxed) {
                    return Err(anyhow!("Operation was cancelled".to_string()));
                }

                // Simulate work
                thread::sleep(Duration::from_millis(500));

                // Report progress
                let percentage = (i as f32 / self.total_files as f32) * 100.0;
                progress_callback(OperationProgress::new(
                    percentage,
                    Some(format!("Processing file {} of {}", i + 1, self.total_files)),
                ));
            }

            // Final progress
            progress_callback(OperationProgress::new(100.0, Some("Completed".to_string())));
            Ok(())
        }
    }

    #[test]
    fn test_operation_manager() {
        let manager = LongOperationManager::new();

        let operation = FileProcessingOperation {
            _file_path: "/tmp/test".to_string(),
            total_files: 5,
        };

        let op_id = manager.start_operation(operation);

        // Check initial status
        assert_eq!(
            manager.get_operation_status(&op_id),
            Some(OperationStatus::Running)
        );

        // Wait a bit and check progress
        thread::sleep(Duration::from_millis(100));
        let progress = manager.get_operation_progress(&op_id);
        assert!(progress.is_some());

        // Test cancellation
        assert!(manager.cancel_operation(&op_id));
        thread::sleep(Duration::from_millis(100));
        assert_eq!(
            manager.get_operation_status(&op_id),
            Some(OperationStatus::Cancelled)
        );
    }
}