Skip to main content

things3_cli/
bulk_operations.rs

//! Bulk operations with progress tracking
2
3use crate::events::{EventBroadcaster, EventType};
4use crate::progress::{ProgressManager, ProgressTracker};
5use std::sync::Arc;
6use things3_core::models::ThingsId;
7use things3_core::Result;
8use things3_core::{Task, ThingsDatabase};
9
10/// Bulk operations manager
11pub struct BulkOperationsManager {
12    progress_manager: Arc<ProgressManager>,
13    event_broadcaster: Arc<EventBroadcaster>,
14}
15
16impl BulkOperationsManager {
17    /// Create a new bulk operations manager
18    #[must_use]
19    pub fn new() -> Self {
20        Self {
21            progress_manager: Arc::new(ProgressManager::new()),
22            event_broadcaster: Arc::new(EventBroadcaster::new()),
23        }
24    }
25
26    /// Export all tasks with progress tracking
27    ///
28    /// # Errors
29    /// Returns an error if the export operation fails
30    pub async fn export_all_tasks(&self, db: &ThingsDatabase, format: &str) -> Result<Vec<Task>> {
31        let tracker = self.progress_manager.create_tracker(
32            "Export All Tasks",
33            None, // We don't know the total count yet
34            true,
35        );
36
37        tracker.set_message("Fetching tasks from database...".to_string());
38
39        // Get all tasks
40        let tasks = db.search_tasks("").await?;
41
42        tracker.set_message(format!(
43            "Found {} tasks, exporting to {}...",
44            tasks.len(),
45            format
46        ));
47
48        // Simulate export processing
49        for (i, task) in tasks.iter().enumerate() {
50            if tracker.is_cancelled() {
51                return Err(things3_core::ThingsError::unknown("Export cancelled"));
52            }
53
54            // Simulate processing time
55            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
56
57            // Update progress
58            tracker.set_current(i as u64 + 1);
59            tracker.set_message(format!("Processing task: {}", task.title));
60
61            // Broadcast task event
62            self.event_broadcaster
63                .broadcast_task_event(
64                    EventType::TaskUpdated {
65                        task_id: task.uuid.clone(),
66                    },
67                    Some(serde_json::to_value(task)?),
68                    "bulk_export",
69                )
70                .await?;
71        }
72
73        tracker.set_message("Export completed successfully".to_string());
74        tracker.complete();
75
76        Ok(tasks)
77    }
78
79    /// Bulk update task status with progress tracking
80    ///
81    /// # Errors
82    /// Returns an error if the bulk update operation fails
83    pub async fn bulk_update_task_status(
84        &self,
85        _db: &ThingsDatabase,
86        task_ids: Vec<ThingsId>,
87        new_status: things3_core::TaskStatus,
88    ) -> Result<usize> {
89        let tracker = self.progress_manager.create_tracker(
90            "Bulk Update Task Status",
91            Some(task_ids.len() as u64),
92            true,
93        );
94
95        tracker.set_message(format!(
96            "Updating {} tasks to {:?}...",
97            task_ids.len(),
98            new_status
99        ));
100
101        let mut updated_count = 0;
102
103        for (i, task_id) in task_ids.iter().enumerate() {
104            if tracker.is_cancelled() {
105                return Err(things3_core::ThingsError::unknown("Bulk update cancelled"));
106            }
107
108            // Simulate database update
109            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
110
111            // Update progress
112            tracker.inc(1);
113            tracker.set_message(format!("Updated task {} of {}", i + 1, task_ids.len()));
114
115            // Broadcast task event
116            self.event_broadcaster
117                .broadcast_task_event(
118                    EventType::TaskUpdated {
119                        task_id: task_id.clone(),
120                    },
121                    Some(serde_json::json!({ "status": format!("{:?}", new_status) })),
122                    "bulk_update",
123                )
124                .await?;
125
126            updated_count += 1;
127        }
128
129        tracker.set_message("Bulk update completed successfully".to_string());
130        tracker.complete();
131
132        Ok(updated_count)
133    }
134
135    /// Search and process tasks with progress tracking
136    ///
137    /// # Errors
138    /// Returns an error if the search or processing operation fails
139    pub async fn search_and_process_tasks(
140        &self,
141        db: &ThingsDatabase,
142        query: &str,
143        processor: impl Fn(&Task) -> Result<()> + Send + Sync + 'static,
144    ) -> Result<Vec<Task>> {
145        let tracker = self.progress_manager.create_tracker(
146            &format!("Search and Process: {query}"),
147            None,
148            true,
149        );
150
151        tracker.set_message("Searching tasks...".to_string());
152
153        // Search tasks
154        let tasks = db.search_tasks(query).await?;
155
156        tracker.set_message(format!("Found {} tasks, processing...", tasks.len()));
157
158        let mut processed_tasks = Vec::new();
159
160        for (i, task) in tasks.iter().enumerate() {
161            if tracker.is_cancelled() {
162                return Err(things3_core::ThingsError::unknown(
163                    "Search and process cancelled",
164                ));
165            }
166
167            // Process the task
168            processor(task)?;
169
170            // Update progress
171            tracker.set_current(i as u64 + 1);
172            tracker.set_message(format!("Processing task: {}", task.title));
173
174            // Broadcast task event
175            self.event_broadcaster
176                .broadcast_task_event(
177                    EventType::TaskUpdated {
178                        task_id: task.uuid.clone(),
179                    },
180                    Some(serde_json::to_value(task)?),
181                    "search_and_process",
182                )
183                .await?;
184
185            processed_tasks.push(task.clone());
186        }
187
188        tracker.set_message("Processing completed successfully".to_string());
189        tracker.complete();
190
191        Ok(processed_tasks)
192    }
193
194    /// Get progress manager for external progress tracking
195    #[must_use]
196    pub fn progress_manager(&self) -> Arc<ProgressManager> {
197        self.progress_manager.clone()
198    }
199
200    /// Get event broadcaster for external event handling
201    #[must_use]
202    pub fn event_broadcaster(&self) -> Arc<EventBroadcaster> {
203        self.event_broadcaster.clone()
204    }
205}
206
207impl Default for BulkOperationsManager {
208    fn default() -> Self {
209        Self::new()
210    }
211}
212
213/// Helper function to create a progress tracker for any operation
214#[must_use]
215pub fn create_operation_tracker(
216    operation_name: &str,
217    total: Option<u64>,
218    progress_manager: &Arc<ProgressManager>,
219) -> ProgressTracker {
220    progress_manager.create_tracker(operation_name, total, true)
221}
222
/// Run a block under a progress tracker and settle the tracker from its outcome.
///
/// Arguments, in order: the operation name, the optional total, the progress
/// manager, and a block that evaluates to a `Result`. A tracker is created before
/// the block runs; afterwards it is completed when the block produced `Ok`, or
/// failed with the `Debug` rendering of the error when it produced `Err`. The
/// block's result is the value of the macro invocation.
///
/// The expansion calls `create_operation_tracker` by its bare name, so that
/// function must be in scope wherever the macro is invoked.
#[macro_export]
macro_rules! with_progress {
    ($name:expr, $total:expr, $progress_manager:expr, $operation:block) => {{
        let op_tracker = create_operation_tracker($name, $total, $progress_manager);
        let outcome = $operation;

        // Borrow the outcome so it can still be handed back to the caller below.
        if let Err(err) = &outcome {
            op_tracker.fail(format!("{:?}", err));
        } else {
            op_tracker.complete();
        }

        outcome
    }};
}
238
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tempfile::NamedTempFile;
    use things3_core::test_utils::create_test_database;

    // Note: the progress manager is never started in these tests to avoid hanging.
    // In real usage, the progress manager would be started separately.

    /// Build a populated test database backed by a temporary file.
    ///
    /// The [`NamedTempFile`] is returned alongside the database handle so the
    /// backing file stays alive for as long as the test holds on to it.
    async fn setup_test_db() -> (NamedTempFile, ThingsDatabase) {
        let temp_file = NamedTempFile::new().unwrap();
        create_test_database(temp_file.path()).await.unwrap();
        let db = ThingsDatabase::new(temp_file.path()).await.unwrap();
        (temp_file, db)
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_creation() {
        let manager = BulkOperationsManager::new();
        // Both collaborators must be reachable on a freshly built manager.
        let _progress_manager = manager.progress_manager();
        let _event_broadcaster = manager.event_broadcaster();
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_export_all_tasks() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = setup_test_db().await;
        let expected = db.search_tasks("").await.unwrap().len();

        // The format only affects the progress message, so every format must
        // export the same set of tasks.
        for format in ["json", "csv", "xml", "markdown", "opml"] {
            let exported = manager
                .export_all_tasks(&db, format)
                .await
                .unwrap_or_else(|e| panic!("Export failed for format {format}: {e:?}"));
            assert_eq!(exported.len(), expected);
        }
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_export_all_tasks_with_data() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = setup_test_db().await;
        let expected = db.search_tasks("").await.unwrap().len();

        let exported = manager.export_all_tasks(&db, "json").await.unwrap();
        assert_eq!(exported.len(), expected);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_bulk_update_task_status() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = setup_test_db().await;

        // An empty ID list means there is nothing to update.
        let updated_count = manager
            .bulk_update_task_status(&db, Vec::new(), things3_core::TaskStatus::Completed)
            .await
            .unwrap();
        assert_eq!(updated_count, 0);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_bulk_update_task_status_with_invalid_ids() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = setup_test_db().await;

        // The update is simulated and never consults the database, so IDs that do
        // not exist there are still counted.
        let task_ids = vec![ThingsId::new_v4(), ThingsId::new_v4()];
        let updated_count = manager
            .bulk_update_task_status(&db, task_ids, things3_core::TaskStatus::Completed)
            .await
            .unwrap();
        assert_eq!(updated_count, 2);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_bulk_update_task_status_different_statuses() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = setup_test_db().await;

        let statuses = [
            things3_core::TaskStatus::Completed,
            things3_core::TaskStatus::Canceled,
            things3_core::TaskStatus::Incomplete,
        ];

        for status in statuses {
            let updated_count = manager
                .bulk_update_task_status(&db, Vec::new(), status)
                .await
                .unwrap();
            assert_eq!(updated_count, 0);
        }
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_search_and_process_tasks() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = setup_test_db().await;
        let expected = db.search_tasks("").await.unwrap().len();

        // With a processor that always succeeds, every match is returned.
        let processed = manager
            .search_and_process_tasks(&db, "", |_task| Ok(()))
            .await
            .unwrap();
        assert_eq!(processed.len(), expected);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_search_and_process_tasks_with_query() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = setup_test_db().await;
        let expected = db.search_tasks("test").await.unwrap().len();

        let processed = manager
            .search_and_process_tasks(&db, "test", |_task| Ok(()))
            .await
            .unwrap();
        assert_eq!(processed.len(), expected);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_search_and_process_tasks_different_limits() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = setup_test_db().await;

        // The API takes no limit parameter; this checks that repeating the same
        // search on one manager keeps giving the same (empty) result.
        for _ in 0..4 {
            let processed = manager
                .search_and_process_tasks(&db, "test", |_task| Ok(()))
                .await
                .unwrap();
            assert_eq!(processed.len(), 0); // No tasks found
        }
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_progress_manager_access() {
        let manager = BulkOperationsManager::new();
        let _progress_manager = manager.progress_manager();
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_event_broadcaster_access() {
        let manager = BulkOperationsManager::new();
        let event_broadcaster = manager.event_broadcaster();

        // The broadcaster must be usable through the returned handle.
        let _subscription_count = event_broadcaster.subscription_count().await;
    }

    #[tokio::test]
    async fn test_create_operation_tracker() {
        let progress_manager = Arc::new(ProgressManager::new());
        let tracker = create_operation_tracker("test_operation", Some(100), &progress_manager);

        assert_eq!(tracker.operation_name(), "test_operation");
        assert_eq!(tracker.total(), Some(100));
        assert_eq!(tracker.current(), 0);
    }

    #[tokio::test]
    async fn test_create_operation_tracker_without_total() {
        let progress_manager = Arc::new(ProgressManager::new());
        let tracker = create_operation_tracker("test_operation", None, &progress_manager);

        assert_eq!(tracker.operation_name(), "test_operation");
        assert_eq!(tracker.total(), None);
        assert_eq!(tracker.current(), 0);
    }

    #[tokio::test]
    async fn test_create_operation_tracker_different_operations() {
        let operations = [
            ("export_tasks", Some(50)),
            ("update_status", Some(25)),
            ("search_tasks", None),
            ("bulk_operation", Some(1000)),
        ];

        let progress_manager = Arc::new(ProgressManager::new());
        for (name, total) in operations {
            let tracker = create_operation_tracker(name, total, &progress_manager);
            assert_eq!(tracker.operation_name(), name);
            assert_eq!(tracker.total(), total);
            assert_eq!(tracker.current(), 0);
        }
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_export_all_tasks_error_handling() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = setup_test_db().await;
        let expected = db.search_tasks("").await.unwrap().len();

        // An unrecognised format is not rejected: the format string is only used
        // in the progress message.
        let exported = manager
            .export_all_tasks(&db, "invalid_format")
            .await
            .unwrap();
        assert_eq!(exported.len(), expected);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_bulk_update_task_status_error_handling() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = setup_test_db().await;

        let updated_count = manager
            .bulk_update_task_status(&db, Vec::new(), things3_core::TaskStatus::Incomplete)
            .await
            .unwrap();
        assert_eq!(updated_count, 0);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_search_and_process_tasks_error_handling() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = setup_test_db().await;
        let has_tasks = !db.search_tasks("").await.unwrap().is_empty();

        // A failing processor aborts the run on the first task, so the call fails
        // exactly when the search matches at least one task.
        let result = manager
            .search_and_process_tasks(&db, "", |_task| {
                Err(things3_core::ThingsError::unknown("processor failed"))
            })
            .await;
        assert_eq!(result.is_err(), has_tasks);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_concurrent_operations() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = setup_test_db().await;

        // Operations are run back-to-back rather than concurrently to avoid
        // threading issues; this checks that one manager can be reused.
        for _ in 0..5 {
            let result = manager.export_all_tasks(&db, "json").await;
            assert!(result.is_ok());
        }
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_progress_tracking() {
        let manager = BulkOperationsManager::new();

        // Trackers created through the manager's shared progress manager start
        // at zero with the requested name and total.
        let progress_manager = manager.progress_manager();
        let tracker = progress_manager.create_tracker("test_operation", Some(10), true);

        assert_eq!(tracker.operation_name(), "test_operation");
        assert_eq!(tracker.total(), Some(10));
        assert_eq!(tracker.current(), 0);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_event_broadcasting() {
        let manager = BulkOperationsManager::new();
        let event_broadcaster = manager.event_broadcaster();

        let _subscription_count = event_broadcaster.subscription_count().await;

        // Broadcasting a hand-built event must succeed.
        let event = crate::events::Event {
            event_type: crate::events::EventType::TaskCreated {
                task_id: ThingsId::new_v4(),
            },
            id: uuid::Uuid::new_v4(),
            source: "test".to_string(),
            timestamp: chrono::Utc::now(),
            data: None,
        };

        let result = event_broadcaster.broadcast(event).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_export_all_tasks() {
        let (_temp_file, db) = setup_test_db().await;

        // Direct database query without progress tracking.
        let tasks = db.get_inbox(None).await.unwrap();
        assert!(!tasks.is_empty());

        // The tasks must be serializable to JSON.
        let json = serde_json::to_string(&tasks).unwrap();
        assert!(!json.is_empty());
    }

    #[tokio::test]
    async fn test_bulk_update_task_status() {
        let (_temp_file, db) = setup_test_db().await;

        // Core functionality without the progress manager.
        let tasks = db.get_inbox(Some(5)).await.unwrap();
        let task_ids: Vec<ThingsId> = tasks.iter().map(|t| t.uuid.clone()).collect();

        assert_eq!(task_ids.len(), tasks.len());
        for task_id in &task_ids {
            assert!(!task_id.as_str().is_empty());
        }
    }

    #[tokio::test]
    async fn test_search_and_process_tasks() {
        let (_temp_file, db) = setup_test_db().await;
        let manager = BulkOperationsManager::new();

        // Count processor invocations: it must run exactly once per returned task.
        let calls = Arc::new(AtomicUsize::new(0));
        let calls_in_processor = Arc::clone(&calls);

        let processed = manager
            .search_and_process_tasks(&db, "", move |_task| {
                calls_in_processor.fetch_add(1, Ordering::SeqCst);
                Ok(())
            })
            .await
            .unwrap();

        assert_eq!(calls.load(Ordering::SeqCst), processed.len());
    }

    #[tokio::test]
    async fn test_with_progress_macro() {
        let manager = BulkOperationsManager::new();
        let progress_manager = manager.progress_manager();

        let result = with_progress!("test_operation", Some(10), &progress_manager, {
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            Ok::<(), anyhow::Error>(())
        });

        assert!(result.is_ok());
    }
}