1use crate::events::{EventBroadcaster, EventType};
4use crate::progress::{ProgressManager, ProgressTracker};
5use std::sync::Arc;
6use things3_core::models::ThingsId;
7use things3_core::Result;
8use things3_core::{Task, ThingsDatabase};
9
10pub struct BulkOperationsManager {
12 progress_manager: Arc<ProgressManager>,
13 event_broadcaster: Arc<EventBroadcaster>,
14}
15
16impl BulkOperationsManager {
17 #[must_use]
19 pub fn new() -> Self {
20 Self {
21 progress_manager: Arc::new(ProgressManager::new()),
22 event_broadcaster: Arc::new(EventBroadcaster::new()),
23 }
24 }
25
26 pub async fn export_all_tasks(&self, db: &ThingsDatabase, format: &str) -> Result<Vec<Task>> {
31 let tracker = self.progress_manager.create_tracker(
32 "Export All Tasks",
33 None, true,
35 );
36
37 tracker.set_message("Fetching tasks from database...".to_string());
38
39 let tasks = db.search_tasks("").await?;
41
42 tracker.set_message(format!(
43 "Found {} tasks, exporting to {}...",
44 tasks.len(),
45 format
46 ));
47
48 for (i, task) in tasks.iter().enumerate() {
50 if tracker.is_cancelled() {
51 return Err(things3_core::ThingsError::unknown("Export cancelled"));
52 }
53
54 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
56
57 tracker.set_current(i as u64 + 1);
59 tracker.set_message(format!("Processing task: {}", task.title));
60
61 self.event_broadcaster
63 .broadcast_task_event(
64 EventType::TaskUpdated {
65 task_id: task.uuid.clone(),
66 },
67 Some(serde_json::to_value(task)?),
68 "bulk_export",
69 )
70 .await?;
71 }
72
73 tracker.set_message("Export completed successfully".to_string());
74 tracker.complete();
75
76 Ok(tasks)
77 }
78
79 pub async fn bulk_update_task_status(
84 &self,
85 _db: &ThingsDatabase,
86 task_ids: Vec<ThingsId>,
87 new_status: things3_core::TaskStatus,
88 ) -> Result<usize> {
89 let tracker = self.progress_manager.create_tracker(
90 "Bulk Update Task Status",
91 Some(task_ids.len() as u64),
92 true,
93 );
94
95 tracker.set_message(format!(
96 "Updating {} tasks to {:?}...",
97 task_ids.len(),
98 new_status
99 ));
100
101 let mut updated_count = 0;
102
103 for (i, task_id) in task_ids.iter().enumerate() {
104 if tracker.is_cancelled() {
105 return Err(things3_core::ThingsError::unknown("Bulk update cancelled"));
106 }
107
108 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
110
111 tracker.inc(1);
113 tracker.set_message(format!("Updated task {} of {}", i + 1, task_ids.len()));
114
115 self.event_broadcaster
117 .broadcast_task_event(
118 EventType::TaskUpdated {
119 task_id: task_id.clone(),
120 },
121 Some(serde_json::json!({ "status": format!("{:?}", new_status) })),
122 "bulk_update",
123 )
124 .await?;
125
126 updated_count += 1;
127 }
128
129 tracker.set_message("Bulk update completed successfully".to_string());
130 tracker.complete();
131
132 Ok(updated_count)
133 }
134
135 pub async fn search_and_process_tasks(
140 &self,
141 db: &ThingsDatabase,
142 query: &str,
143 processor: impl Fn(&Task) -> Result<()> + Send + Sync + 'static,
144 ) -> Result<Vec<Task>> {
145 let tracker = self.progress_manager.create_tracker(
146 &format!("Search and Process: {query}"),
147 None,
148 true,
149 );
150
151 tracker.set_message("Searching tasks...".to_string());
152
153 let tasks = db.search_tasks(query).await?;
155
156 tracker.set_message(format!("Found {} tasks, processing...", tasks.len()));
157
158 let mut processed_tasks = Vec::new();
159
160 for (i, task) in tasks.iter().enumerate() {
161 if tracker.is_cancelled() {
162 return Err(things3_core::ThingsError::unknown(
163 "Search and process cancelled",
164 ));
165 }
166
167 processor(task)?;
169
170 tracker.set_current(i as u64 + 1);
172 tracker.set_message(format!("Processing task: {}", task.title));
173
174 self.event_broadcaster
176 .broadcast_task_event(
177 EventType::TaskUpdated {
178 task_id: task.uuid.clone(),
179 },
180 Some(serde_json::to_value(task)?),
181 "search_and_process",
182 )
183 .await?;
184
185 processed_tasks.push(task.clone());
186 }
187
188 tracker.set_message("Processing completed successfully".to_string());
189 tracker.complete();
190
191 Ok(processed_tasks)
192 }
193
194 #[must_use]
196 pub fn progress_manager(&self) -> Arc<ProgressManager> {
197 self.progress_manager.clone()
198 }
199
200 #[must_use]
202 pub fn event_broadcaster(&self) -> Arc<EventBroadcaster> {
203 self.event_broadcaster.clone()
204 }
205}
206
207impl Default for BulkOperationsManager {
208 fn default() -> Self {
209 Self::new()
210 }
211}
212
213#[must_use]
215pub fn create_operation_tracker(
216 operation_name: &str,
217 total: Option<u64>,
218 progress_manager: &Arc<ProgressManager>,
219) -> ProgressTracker {
220 progress_manager.create_tracker(operation_name, total, true)
221}
222
/// Runs `$operation` under a progress tracker and reports how it ended.
///
/// A tracker is created with `create_operation_tracker($name, $total,
/// $progress_manager)` before the block is evaluated. The block must evaluate
/// to a `Result`: on `Ok` the tracker is completed, on `Err` it is failed with
/// the error's `Debug` output. The block's result is the value of the macro.
///
/// `create_operation_tracker` is referenced by its bare name, so it has to be
/// in scope wherever the macro is invoked.
#[macro_export]
macro_rules! with_progress {
    ($name:expr, $total:expr, $progress_manager:expr, $operation:block) => {{
        let progress = create_operation_tracker($name, $total, $progress_manager);
        let outcome = $operation;

        if let Err(error) = &outcome {
            progress.fail(format!("{:?}", error));
        } else {
            progress.complete();
        }

        outcome
    }};
}
238
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tempfile::NamedTempFile;
    use things3_core::test_utils::create_test_database;

    /// Creates a test database in a temporary file and opens it.
    ///
    /// The temp file is returned alongside the handle so that it is not
    /// deleted while the test is still using the database.
    async fn open_test_db() -> (NamedTempFile, ThingsDatabase) {
        let temp_file = NamedTempFile::new().unwrap();
        create_test_database(temp_file.path()).await.unwrap();
        let db = ThingsDatabase::new(temp_file.path()).await.unwrap();
        (temp_file, db)
    }

    /// Runs `search_and_process_tasks` with a counting processor and checks
    /// that the processor ran exactly once per returned task.
    async fn search_with_counting_processor(db: &ThingsDatabase, query: &str) -> Vec<Task> {
        let manager = BulkOperationsManager::new();
        let calls = Arc::new(AtomicUsize::new(0));
        let seen = Arc::clone(&calls);

        let processed = manager
            .search_and_process_tasks(db, query, move |_task| {
                seen.fetch_add(1, Ordering::SeqCst);
                Ok(())
            })
            .await
            .unwrap();

        assert_eq!(calls.load(Ordering::SeqCst), processed.len());
        processed
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_creation() {
        let manager = BulkOperationsManager::new();

        // Each accessor must keep handing out the same shared instance.
        assert!(Arc::ptr_eq(
            &manager.progress_manager(),
            &manager.progress_manager()
        ));
        assert!(Arc::ptr_eq(
            &manager.event_broadcaster(),
            &manager.event_broadcaster()
        ));
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_export_all_tasks() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        for format in ["json", "csv", "xml", "markdown", "opml"] {
            let result = manager.export_all_tasks(&db, format).await;
            assert!(
                result.is_ok(),
                "Export failed for format {format}: {:?}",
                result.as_ref().err()
            );
        }
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_export_all_tasks_with_data() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        let exported = manager.export_all_tasks(&db, "json").await.unwrap();

        // The export returns exactly what the empty-query search yields.
        let expected = db.search_tasks("").await.unwrap();
        assert_eq!(exported.len(), expected.len());
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_bulk_update_task_status() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        let updated = manager
            .bulk_update_task_status(&db, Vec::new(), things3_core::TaskStatus::Completed)
            .await
            .unwrap();

        assert_eq!(updated, 0);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_bulk_update_task_status_with_invalid_ids() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        // Ids that do not exist in the database are still counted: the
        // operation does not look them up.
        let task_ids = vec![ThingsId::new_v4(), ThingsId::new_v4()];
        let updated = manager
            .bulk_update_task_status(&db, task_ids, things3_core::TaskStatus::Completed)
            .await
            .unwrap();

        assert_eq!(updated, 2);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_bulk_update_task_status_different_statuses() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        let statuses = [
            ("completed", things3_core::TaskStatus::Completed),
            ("cancelled", things3_core::TaskStatus::Canceled),
            ("in_progress", things3_core::TaskStatus::Incomplete),
        ];

        for (name, status) in statuses {
            let updated = manager
                .bulk_update_task_status(&db, Vec::new(), status)
                .await
                .unwrap();
            assert_eq!(updated, 0, "unexpected count for status {name}");
        }
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_search_and_process_tasks() {
        let (_temp_file, db) = open_test_db().await;

        search_with_counting_processor(&db, "").await;
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_search_and_process_tasks_with_query() {
        let (_temp_file, db) = open_test_db().await;

        search_with_counting_processor(&db, "test").await;
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_search_and_process_tasks_different_limits() {
        let (_temp_file, db) = open_test_db().await;

        // `search_and_process_tasks` takes no limit argument, so this only
        // checks that repeated runs of the same query stay consistent.
        for _limit in [1, 5, 10, 100] {
            let processed = search_with_counting_processor(&db, "test").await;
            assert_eq!(processed.len(), 0);
        }
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_progress_manager_access() {
        let manager = BulkOperationsManager::new();

        assert!(Arc::ptr_eq(
            &manager.progress_manager(),
            &manager.progress_manager()
        ));
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_event_broadcaster_access() {
        let manager = BulkOperationsManager::new();
        let event_broadcaster = manager.event_broadcaster();

        let _subscription_count = event_broadcaster.subscription_count().await;
    }

    #[tokio::test]
    async fn test_create_operation_tracker() {
        let progress_manager = Arc::new(ProgressManager::new());
        let tracker = create_operation_tracker("test_operation", Some(100), &progress_manager);

        assert_eq!(tracker.operation_name(), "test_operation");
        assert_eq!(tracker.total(), Some(100));
        assert_eq!(tracker.current(), 0);
    }

    #[tokio::test]
    async fn test_create_operation_tracker_without_total() {
        let progress_manager = Arc::new(ProgressManager::new());
        let tracker = create_operation_tracker("test_operation", None, &progress_manager);

        assert_eq!(tracker.operation_name(), "test_operation");
        assert_eq!(tracker.total(), None);
        assert_eq!(tracker.current(), 0);
    }

    #[tokio::test]
    async fn test_create_operation_tracker_different_operations() {
        let operations = [
            ("export_tasks", Some(50)),
            ("update_status", Some(25)),
            ("search_tasks", None),
            ("bulk_operation", Some(1000)),
        ];

        let progress_manager = Arc::new(ProgressManager::new());
        for (name, total) in operations {
            let tracker = create_operation_tracker(name, total, &progress_manager);
            assert_eq!(tracker.operation_name(), name);
            assert_eq!(tracker.total(), total);
            assert_eq!(tracker.current(), 0);
        }
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_export_all_tasks_error_handling() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        // The format string is not validated, so an unknown format still succeeds.
        let result = manager.export_all_tasks(&db, "invalid_format").await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_bulk_update_task_status_error_handling() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        let updated = manager
            .bulk_update_task_status(&db, Vec::new(), things3_core::TaskStatus::Incomplete)
            .await
            .unwrap();

        assert_eq!(updated, 0);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_search_and_process_tasks_error_handling() {
        let (_temp_file, db) = open_test_db().await;

        search_with_counting_processor(&db, "test").await;
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_concurrent_operations() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        // The exports run one after another on the same manager.
        for _ in 0..5 {
            let result = manager.export_all_tasks(&db, "json").await;
            assert!(result.is_ok());
        }
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_progress_tracking() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, _db) = open_test_db().await;

        let progress_manager = manager.progress_manager();
        let tracker = progress_manager.create_tracker("test_operation", Some(10), true);

        assert_eq!(tracker.operation_name(), "test_operation");
        assert_eq!(tracker.total(), Some(10));
        assert_eq!(tracker.current(), 0);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_event_broadcasting() {
        let manager = BulkOperationsManager::new();
        let event_broadcaster = manager.event_broadcaster();

        let _subscription_count = event_broadcaster.subscription_count().await;
        let event = crate::events::Event {
            event_type: crate::events::EventType::TaskCreated {
                task_id: ThingsId::new_v4(),
            },
            id: uuid::Uuid::new_v4(),
            source: "test".to_string(),
            timestamp: chrono::Utc::now(),
            data: None,
        };

        let result = event_broadcaster.broadcast(event).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_export_all_tasks() {
        let (_temp_file, db) = open_test_db().await;

        let tasks = db.get_inbox(None).await.unwrap();
        assert!(!tasks.is_empty());

        let json = serde_json::to_string(&tasks).unwrap();
        assert!(!json.is_empty());
    }

    #[tokio::test]
    async fn test_bulk_update_task_status() {
        let (_temp_file, db) = open_test_db().await;

        let tasks = db.get_inbox(Some(5)).await.unwrap();
        let task_ids: Vec<ThingsId> = tasks.iter().map(|task| task.uuid.clone()).collect();

        assert_eq!(task_ids.len(), tasks.len());
        for task_id in &task_ids {
            assert!(!task_id.as_str().is_empty());
        }
    }

    #[tokio::test]
    async fn test_search_and_process_tasks() {
        let (_temp_file, db) = open_test_db().await;
        let manager = BulkOperationsManager::new();

        let result = manager
            .search_and_process_tasks(&db, "test", |_task| Ok(()))
            .await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_with_progress_macro() {
        let manager = BulkOperationsManager::new();
        let progress_manager = manager.progress_manager();

        let result = with_progress!("test_operation", Some(10), &progress_manager, {
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            Ok::<(), anyhow::Error>(())
        });

        assert!(result.is_ok());
    }
}