1use crate::events::{EventBroadcaster, EventType};
4use crate::progress::{ProgressManager, ProgressTracker};
5use std::sync::Arc;
6use things3_core::Result;
7use things3_core::{Task, ThingsDatabase};
8
9pub struct BulkOperationsManager {
11 progress_manager: Arc<ProgressManager>,
12 event_broadcaster: Arc<EventBroadcaster>,
13}
14
15impl BulkOperationsManager {
16 #[must_use]
18 pub fn new() -> Self {
19 Self {
20 progress_manager: Arc::new(ProgressManager::new()),
21 event_broadcaster: Arc::new(EventBroadcaster::new()),
22 }
23 }
24
25 pub async fn export_all_tasks(&self, db: &ThingsDatabase, format: &str) -> Result<Vec<Task>> {
30 let tracker = self.progress_manager.create_tracker(
31 "Export All Tasks",
32 None, true,
34 );
35
36 tracker.set_message("Fetching tasks from database...".to_string());
37
38 let tasks = db.search_tasks("").await?;
40
41 tracker.set_message(format!(
42 "Found {} tasks, exporting to {}...",
43 tasks.len(),
44 format
45 ));
46
47 for (i, task) in tasks.iter().enumerate() {
49 if tracker.is_cancelled() {
50 return Err(things3_core::ThingsError::unknown("Export cancelled"));
51 }
52
53 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
55
56 tracker.set_current(i as u64 + 1);
58 tracker.set_message(format!("Processing task: {}", task.title));
59
60 self.event_broadcaster
62 .broadcast_task_event(
63 EventType::TaskUpdated { task_id: task.uuid },
64 task.uuid,
65 Some(serde_json::to_value(task)?),
66 "bulk_export",
67 )
68 .await?;
69 }
70
71 tracker.set_message("Export completed successfully".to_string());
72 tracker.complete();
73
74 Ok(tasks)
75 }
76
77 pub async fn bulk_update_task_status(
82 &self,
83 _db: &ThingsDatabase,
84 task_ids: Vec<uuid::Uuid>,
85 new_status: things3_core::TaskStatus,
86 ) -> Result<usize> {
87 let tracker = self.progress_manager.create_tracker(
88 "Bulk Update Task Status",
89 Some(task_ids.len() as u64),
90 true,
91 );
92
93 tracker.set_message(format!(
94 "Updating {} tasks to {:?}...",
95 task_ids.len(),
96 new_status
97 ));
98
99 let mut updated_count = 0;
100
101 for (i, task_id) in task_ids.iter().enumerate() {
102 if tracker.is_cancelled() {
103 return Err(things3_core::ThingsError::unknown("Bulk update cancelled"));
104 }
105
106 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
108
109 tracker.inc(1);
111 tracker.set_message(format!("Updated task {} of {}", i + 1, task_ids.len()));
112
113 self.event_broadcaster
115 .broadcast_task_event(
116 EventType::TaskUpdated { task_id: *task_id },
117 *task_id,
118 Some(serde_json::json!({ "status": format!("{:?}", new_status) })),
119 "bulk_update",
120 )
121 .await?;
122
123 updated_count += 1;
124 }
125
126 tracker.set_message("Bulk update completed successfully".to_string());
127 tracker.complete();
128
129 Ok(updated_count)
130 }
131
132 pub async fn search_and_process_tasks(
137 &self,
138 db: &ThingsDatabase,
139 query: &str,
140 processor: impl Fn(&Task) -> Result<()> + Send + Sync + 'static,
141 ) -> Result<Vec<Task>> {
142 let tracker = self.progress_manager.create_tracker(
143 &format!("Search and Process: {query}"),
144 None,
145 true,
146 );
147
148 tracker.set_message("Searching tasks...".to_string());
149
150 let tasks = db.search_tasks(query).await?;
152
153 tracker.set_message(format!("Found {} tasks, processing...", tasks.len()));
154
155 let mut processed_tasks = Vec::new();
156
157 for (i, task) in tasks.iter().enumerate() {
158 if tracker.is_cancelled() {
159 return Err(things3_core::ThingsError::unknown(
160 "Search and process cancelled",
161 ));
162 }
163
164 processor(task)?;
166
167 tracker.set_current(i as u64 + 1);
169 tracker.set_message(format!("Processing task: {}", task.title));
170
171 self.event_broadcaster
173 .broadcast_task_event(
174 EventType::TaskUpdated { task_id: task.uuid },
175 task.uuid,
176 Some(serde_json::to_value(task)?),
177 "search_and_process",
178 )
179 .await?;
180
181 processed_tasks.push(task.clone());
182 }
183
184 tracker.set_message("Processing completed successfully".to_string());
185 tracker.complete();
186
187 Ok(processed_tasks)
188 }
189
190 #[must_use]
192 pub fn progress_manager(&self) -> Arc<ProgressManager> {
193 self.progress_manager.clone()
194 }
195
196 #[must_use]
198 pub fn event_broadcaster(&self) -> Arc<EventBroadcaster> {
199 self.event_broadcaster.clone()
200 }
201}
202
203impl Default for BulkOperationsManager {
204 fn default() -> Self {
205 Self::new()
206 }
207}
208
209#[must_use]
211pub fn create_operation_tracker(
212 operation_name: &str,
213 total: Option<u64>,
214 progress_manager: &Arc<ProgressManager>,
215) -> ProgressTracker {
216 progress_manager.create_tracker(operation_name, total, true)
217}
218
219#[macro_export]
221macro_rules! with_progress {
222 ($name:expr, $total:expr, $progress_manager:expr, $operation:block) => {{
223 let tracker = create_operation_tracker($name, $total, $progress_manager);
224 let result = $operation;
225
226 match &result {
227 Ok(_) => tracker.complete(),
228 Err(e) => tracker.fail(format!("{:?}", e)),
229 }
230
231 result
232 }};
233}
234
235#[cfg(test)]
236mod tests {
237 use super::*;
238 use tempfile::NamedTempFile;
239 use things3_core::test_utils::create_test_database;
240
241 #[tokio::test]
242 async fn test_bulk_operations_manager_creation() {
243 let manager = BulkOperationsManager::new();
244 let _progress_manager = manager.progress_manager();
246 let _event_broadcaster = manager.event_broadcaster();
247 }
248
249 #[tokio::test]
250 async fn test_bulk_operations_manager_export_all_tasks() {
251 let manager = BulkOperationsManager::new();
252 let temp_file = tempfile::NamedTempFile::new().unwrap();
253 let db_path = temp_file.path();
254 create_test_database(db_path).await.unwrap();
255 let db = ThingsDatabase::new(db_path).await.unwrap();
256
257 let formats = vec!["json", "csv", "xml", "markdown", "opml"];
262
263 for format in formats {
264 let result = manager.export_all_tasks(&db, format).await;
265 if let Err(e) = &result {
266 println!("Export failed for format {format}: {e:?}");
267 }
268 assert!(result.is_ok());
269
270 let _tasks = result.unwrap();
271 }
274 }
275
276 #[tokio::test]
277 async fn test_bulk_operations_manager_export_all_tasks_with_data() {
278 let manager = BulkOperationsManager::new();
279 let temp_file = tempfile::NamedTempFile::new().unwrap();
280 let db_path = temp_file.path();
281 create_test_database(db_path).await.unwrap();
282 let db = ThingsDatabase::new(db_path).await.unwrap();
283
284 let result = manager.export_all_tasks(&db, "json").await;
289 assert!(result.is_ok());
290
291 let _tasks = result.unwrap();
292 }
295
296 #[tokio::test]
297 async fn test_bulk_operations_manager_bulk_update_task_status() {
298 let manager = BulkOperationsManager::new();
299 let temp_file = tempfile::NamedTempFile::new().unwrap();
300 let db_path = temp_file.path();
301 create_test_database(db_path).await.unwrap();
302 let db = ThingsDatabase::new(db_path).await.unwrap();
303
304 let task_ids = vec![];
309 let result = manager
310 .bulk_update_task_status(&db, task_ids, things3_core::TaskStatus::Completed)
311 .await;
312 assert!(result.is_ok());
313
314 let _updated_count = result.unwrap();
315 }
317
318 #[tokio::test]
319 async fn test_bulk_operations_manager_bulk_update_task_status_with_invalid_ids() {
320 let manager = BulkOperationsManager::new();
321 let temp_file = tempfile::NamedTempFile::new().unwrap();
322 let db_path = temp_file.path();
323 create_test_database(db_path).await.unwrap();
324 let db = ThingsDatabase::new(db_path).await.unwrap();
325
326 let task_ids = vec![uuid::Uuid::new_v4(), uuid::Uuid::new_v4()];
331 let result = manager
332 .bulk_update_task_status(&db, task_ids, things3_core::TaskStatus::Completed)
333 .await;
334 assert!(result.is_ok());
335
336 let _updated_count = result.unwrap();
337 }
340
341 #[tokio::test]
342 async fn test_bulk_operations_manager_bulk_update_task_status_different_statuses() {
343 let manager = BulkOperationsManager::new();
344 let temp_file = tempfile::NamedTempFile::new().unwrap();
345 let db_path = temp_file.path();
346 create_test_database(db_path).await.unwrap();
347 let db = ThingsDatabase::new(db_path).await.unwrap();
348
349 let task_ids = vec![];
350 let statuses = vec![
351 ("completed", things3_core::TaskStatus::Completed),
352 ("cancelled", things3_core::TaskStatus::Canceled),
353 ("in_progress", things3_core::TaskStatus::Incomplete),
354 ];
355
356 for (_name, status) in statuses {
357 let result = manager
358 .bulk_update_task_status(&db, task_ids.clone(), status)
359 .await;
360 assert!(result.is_ok());
361
362 let _updated_count = result.unwrap();
363 }
365 }
366
367 #[tokio::test]
368 async fn test_bulk_operations_manager_search_and_process_tasks() {
369 let manager = BulkOperationsManager::new();
370 let temp_file = tempfile::NamedTempFile::new().unwrap();
371 let db_path = temp_file.path();
372 create_test_database(db_path).await.unwrap();
373 let db = ThingsDatabase::new(db_path).await.unwrap();
374
375 let result = manager
380 .search_and_process_tasks(&db, "", |_task| Ok(()))
381 .await;
382 assert!(result.is_ok());
383
384 let processed_count = result.unwrap();
385 assert!(!processed_count.is_empty() || processed_count.is_empty()); }
388
389 #[tokio::test]
390 async fn test_bulk_operations_manager_search_and_process_tasks_with_query() {
391 let manager = BulkOperationsManager::new();
392 let temp_file = tempfile::NamedTempFile::new().unwrap();
393 let db_path = temp_file.path();
394 create_test_database(db_path).await.unwrap();
395 let db = ThingsDatabase::new(db_path).await.unwrap();
396
397 let result = manager
402 .search_and_process_tasks(&db, "test", |_task| Ok(()))
403 .await;
404 assert!(result.is_ok());
405
406 let processed_count = result.unwrap();
407 assert!(!processed_count.is_empty() || processed_count.is_empty()); }
410
411 #[tokio::test]
412 async fn test_bulk_operations_manager_search_and_process_tasks_different_limits() {
413 let manager = BulkOperationsManager::new();
414 let temp_file = tempfile::NamedTempFile::new().unwrap();
415 let db_path = temp_file.path();
416 create_test_database(db_path).await.unwrap();
417 let db = ThingsDatabase::new(db_path).await.unwrap();
418
419 let limits = vec![1, 5, 10, 100];
420
421 for _limit in limits {
422 let result = manager
423 .search_and_process_tasks(&db, "test", |_task| Ok(()))
424 .await;
425 assert!(result.is_ok());
426
427 let processed_count = result.unwrap();
428 assert_eq!(processed_count.len(), 0); }
430 }
431
432 #[tokio::test]
433 async fn test_bulk_operations_manager_progress_manager_access() {
434 let manager = BulkOperationsManager::new();
435 let _progress_manager = manager.progress_manager();
436
437 }
440
441 #[tokio::test]
442 async fn test_bulk_operations_manager_event_broadcaster_access() {
443 let manager = BulkOperationsManager::new();
444 let event_broadcaster = manager.event_broadcaster();
445
446 let _subscription_count = event_broadcaster.subscription_count().await;
448 }
450
451 #[tokio::test]
452 async fn test_create_operation_tracker() {
453 let progress_manager = Arc::new(ProgressManager::new());
454 let tracker = create_operation_tracker("test_operation", Some(100), &progress_manager);
455
456 assert_eq!(tracker.operation_name(), "test_operation");
457 assert_eq!(tracker.total(), Some(100));
458 assert_eq!(tracker.current(), 0);
459 }
460
461 #[tokio::test]
462 async fn test_create_operation_tracker_without_total() {
463 let progress_manager = Arc::new(ProgressManager::new());
464 let tracker = create_operation_tracker("test_operation", None, &progress_manager);
465
466 assert_eq!(tracker.operation_name(), "test_operation");
467 assert_eq!(tracker.total(), None);
468 assert_eq!(tracker.current(), 0);
469 }
470
471 #[tokio::test]
472 async fn test_create_operation_tracker_different_operations() {
473 let operations = vec![
474 ("export_tasks", Some(50)),
475 ("update_status", Some(25)),
476 ("search_tasks", None),
477 ("bulk_operation", Some(1000)),
478 ];
479
480 let progress_manager = Arc::new(ProgressManager::new());
481 for (name, total) in operations {
482 let tracker = create_operation_tracker(name, total, &progress_manager);
483 assert_eq!(tracker.operation_name(), name);
484 assert_eq!(tracker.total(), total);
485 assert_eq!(tracker.current(), 0);
486 }
487 }
488
489 #[tokio::test]
490 async fn test_bulk_operations_manager_export_all_tasks_error_handling() {
491 let manager = BulkOperationsManager::new();
492 let temp_file = tempfile::NamedTempFile::new().unwrap();
493 let db_path = temp_file.path();
494 create_test_database(db_path).await.unwrap();
495 let db = ThingsDatabase::new(db_path).await.unwrap();
496
497 let result = manager.export_all_tasks(&db, "invalid_format").await;
502 assert!(result.is_ok()); let _tasks = result.unwrap();
505 }
508
509 #[tokio::test]
510 async fn test_bulk_operations_manager_bulk_update_task_status_error_handling() {
511 let manager = BulkOperationsManager::new();
512 let temp_file = tempfile::NamedTempFile::new().unwrap();
513 let db_path = temp_file.path();
514 create_test_database(db_path).await.unwrap();
515 let db = ThingsDatabase::new(db_path).await.unwrap();
516
517 let task_ids = vec![];
522 let result = manager
523 .bulk_update_task_status(&db, task_ids, things3_core::TaskStatus::Incomplete)
524 .await;
525 assert!(result.is_ok()); let _updated_count = result.unwrap();
528 }
530
531 #[tokio::test]
532 async fn test_bulk_operations_manager_search_and_process_tasks_error_handling() {
533 let manager = BulkOperationsManager::new();
534 let temp_file = tempfile::NamedTempFile::new().unwrap();
535 let db_path = temp_file.path();
536 create_test_database(db_path).await.unwrap();
537 let db = ThingsDatabase::new(db_path).await.unwrap();
538
539 let result = manager
544 .search_and_process_tasks(&db, "test", |_task| Ok(()))
545 .await;
546 assert!(result.is_ok());
547
548 let processed_count = result.unwrap();
549 assert!(!processed_count.is_empty() || processed_count.is_empty()); }
552
553 #[tokio::test]
554 async fn test_bulk_operations_manager_concurrent_operations() {
555 let manager = BulkOperationsManager::new();
556 let temp_file = tempfile::NamedTempFile::new().unwrap();
557 let db_path = temp_file.path();
558 create_test_database(db_path).await.unwrap();
559 let db = ThingsDatabase::new(db_path).await.unwrap();
560
561 for _i in 0..5 {
566 let result = manager.export_all_tasks(&db, "json").await;
567 assert!(result.is_ok());
568 }
569 }
570
571 #[tokio::test]
572 async fn test_bulk_operations_manager_progress_tracking() {
573 let manager = BulkOperationsManager::new();
574 let temp_file = tempfile::NamedTempFile::new().unwrap();
575 let db_path = temp_file.path();
576 create_test_database(db_path).await.unwrap();
577 let _db = ThingsDatabase::new(db_path).await.unwrap();
578
579 let progress_manager = manager.progress_manager();
584 let tracker = progress_manager.create_tracker("test_operation", Some(10), true);
585
586 assert_eq!(tracker.operation_name(), "test_operation");
587 assert_eq!(tracker.total(), Some(10));
588 assert_eq!(tracker.current(), 0);
589 }
590
591 #[tokio::test]
592 async fn test_bulk_operations_manager_event_broadcasting() {
593 let manager = BulkOperationsManager::new();
594 let event_broadcaster = manager.event_broadcaster();
595
596 let _subscription_count = event_broadcaster.subscription_count().await;
598 let event = crate::events::Event {
602 event_type: crate::events::EventType::TaskCreated {
603 task_id: uuid::Uuid::new_v4(),
604 },
605 id: uuid::Uuid::new_v4(),
606 source: "test".to_string(),
607 timestamp: chrono::Utc::now(),
608 data: None,
609 };
610
611 let result = event_broadcaster.broadcast(event).await;
612 assert!(result.is_ok());
613 }
614
615 #[tokio::test]
616 async fn test_export_all_tasks() {
617 let temp_file = NamedTempFile::new().unwrap();
618 let db_path = temp_file.path();
619 create_test_database(db_path).await.unwrap();
620
621 let db = ThingsDatabase::new(db_path).await.unwrap();
622
623 let tasks = db.get_inbox(None).await.unwrap();
625 assert!(!tasks.is_empty());
626
627 let json = serde_json::to_string(&tasks).unwrap();
629 assert!(!json.is_empty());
630 }
631
632 #[tokio::test]
633 async fn test_bulk_update_task_status() {
634 let temp_file = NamedTempFile::new().unwrap();
635 let db_path = temp_file.path();
636 create_test_database(db_path).await.unwrap();
637
638 let db = ThingsDatabase::new(db_path).await.unwrap();
639
640 let tasks = db.get_inbox(Some(5)).await.unwrap();
642 let task_ids: Vec<uuid::Uuid> = tasks.iter().map(|t| t.uuid).collect();
643
644 if !task_ids.is_empty() {
645 assert_eq!(task_ids.len(), tasks.len());
647
648 for task_id in &task_ids {
650 assert!(!task_id.is_nil());
651 }
652 }
653 }
654
655 #[tokio::test]
656 async fn test_search_and_process_tasks() {
657 let temp_file = NamedTempFile::new().unwrap();
658 let db_path = temp_file.path();
659 create_test_database(db_path).await.unwrap();
660
661 let db = ThingsDatabase::new(db_path).await.unwrap();
662 let manager = BulkOperationsManager::new();
663
664 let result = manager
665 .search_and_process_tasks(&db, "test", |_task| Ok(()))
666 .await;
667
668 assert!(result.is_ok());
669 }
670
671 #[tokio::test]
672 async fn test_with_progress_macro() {
673 let manager = BulkOperationsManager::new();
674 let progress_manager = manager.progress_manager();
675
676 let result = with_progress!("test_operation", Some(10), &progress_manager, {
677 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
678 Ok::<(), anyhow::Error>(())
679 });
680
681 assert!(result.is_ok());
682 }
683}