1#![allow(deprecated)]
3
4use crate::events::{EventBroadcaster, EventType};
5use crate::progress::{ProgressManager, ProgressTracker};
6use std::sync::Arc;
7use things3_core::models::ThingsId;
8use things3_core::Result;
9use things3_core::{Task, ThingsDatabase};
10
11pub struct BulkOperationsManager {
13 progress_manager: Arc<ProgressManager>,
14 event_broadcaster: Arc<EventBroadcaster>,
15}
16
17impl BulkOperationsManager {
18 #[must_use]
20 pub fn new() -> Self {
21 Self {
22 progress_manager: Arc::new(ProgressManager::new()),
23 event_broadcaster: Arc::new(EventBroadcaster::new()),
24 }
25 }
26
27 pub async fn export_all_tasks(&self, db: &ThingsDatabase, format: &str) -> Result<Vec<Task>> {
32 let tracker = self.progress_manager.create_tracker(
33 "Export All Tasks",
34 None, true,
36 );
37
38 tracker.set_message("Fetching tasks from database...".to_string());
39
40 let tasks = db.search_tasks("").await?;
42
43 tracker.set_message(format!(
44 "Found {} tasks, exporting to {}...",
45 tasks.len(),
46 format
47 ));
48
49 for (i, task) in tasks.iter().enumerate() {
51 if tracker.is_cancelled() {
52 return Err(things3_core::ThingsError::unknown("Export cancelled"));
53 }
54
55 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
57
58 tracker.set_current(i as u64 + 1);
60 tracker.set_message(format!("Processing task: {}", task.title));
61
62 self.event_broadcaster
64 .broadcast_task_event(
65 EventType::TaskUpdated {
66 task_id: task.uuid.clone(),
67 },
68 Some(serde_json::to_value(task)?),
69 "bulk_export",
70 )
71 .await?;
72 }
73
74 tracker.set_message("Export completed successfully".to_string());
75 tracker.complete();
76
77 Ok(tasks)
78 }
79
80 pub async fn bulk_update_task_status(
85 &self,
86 _db: &ThingsDatabase,
87 task_ids: Vec<ThingsId>,
88 new_status: things3_core::TaskStatus,
89 ) -> Result<usize> {
90 let tracker = self.progress_manager.create_tracker(
91 "Bulk Update Task Status",
92 Some(task_ids.len() as u64),
93 true,
94 );
95
96 tracker.set_message(format!(
97 "Updating {} tasks to {:?}...",
98 task_ids.len(),
99 new_status
100 ));
101
102 let mut updated_count = 0;
103
104 for (i, task_id) in task_ids.iter().enumerate() {
105 if tracker.is_cancelled() {
106 return Err(things3_core::ThingsError::unknown("Bulk update cancelled"));
107 }
108
109 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
111
112 tracker.inc(1);
114 tracker.set_message(format!("Updated task {} of {}", i + 1, task_ids.len()));
115
116 self.event_broadcaster
118 .broadcast_task_event(
119 EventType::TaskUpdated {
120 task_id: task_id.clone(),
121 },
122 Some(serde_json::json!({ "status": format!("{:?}", new_status) })),
123 "bulk_update",
124 )
125 .await?;
126
127 updated_count += 1;
128 }
129
130 tracker.set_message("Bulk update completed successfully".to_string());
131 tracker.complete();
132
133 Ok(updated_count)
134 }
135
136 pub async fn search_and_process_tasks(
141 &self,
142 db: &ThingsDatabase,
143 query: &str,
144 processor: impl Fn(&Task) -> Result<()> + Send + Sync + 'static,
145 ) -> Result<Vec<Task>> {
146 let tracker = self.progress_manager.create_tracker(
147 &format!("Search and Process: {query}"),
148 None,
149 true,
150 );
151
152 tracker.set_message("Searching tasks...".to_string());
153
154 let tasks = db.search_tasks(query).await?;
156
157 tracker.set_message(format!("Found {} tasks, processing...", tasks.len()));
158
159 let mut processed_tasks = Vec::new();
160
161 for (i, task) in tasks.iter().enumerate() {
162 if tracker.is_cancelled() {
163 return Err(things3_core::ThingsError::unknown(
164 "Search and process cancelled",
165 ));
166 }
167
168 processor(task)?;
170
171 tracker.set_current(i as u64 + 1);
173 tracker.set_message(format!("Processing task: {}", task.title));
174
175 self.event_broadcaster
177 .broadcast_task_event(
178 EventType::TaskUpdated {
179 task_id: task.uuid.clone(),
180 },
181 Some(serde_json::to_value(task)?),
182 "search_and_process",
183 )
184 .await?;
185
186 processed_tasks.push(task.clone());
187 }
188
189 tracker.set_message("Processing completed successfully".to_string());
190 tracker.complete();
191
192 Ok(processed_tasks)
193 }
194
195 #[must_use]
197 pub fn progress_manager(&self) -> Arc<ProgressManager> {
198 self.progress_manager.clone()
199 }
200
201 #[must_use]
203 pub fn event_broadcaster(&self) -> Arc<EventBroadcaster> {
204 self.event_broadcaster.clone()
205 }
206}
207
208impl Default for BulkOperationsManager {
209 fn default() -> Self {
210 Self::new()
211 }
212}
213
214#[must_use]
216pub fn create_operation_tracker(
217 operation_name: &str,
218 total: Option<u64>,
219 progress_manager: &Arc<ProgressManager>,
220) -> ProgressTracker {
221 progress_manager.create_tracker(operation_name, total, true)
222}
223
/// Runs a block under a progress tracker and finalises the tracker from its result.
///
/// Arguments: the operation name, the optional total, a `&Arc<ProgressManager>`
/// and a block evaluating to a `Result`. The tracker is completed when the
/// block yields `Ok` and failed with the `Debug` rendering of the error when it
/// yields `Err`; the block's result is the value of the whole expression.
///
/// Notes:
/// - `create_operation_tracker` is referenced without a path, so it must be in
///   scope wherever the macro is invoked.
/// - The block is expanded inline, so leaving it early (`?`, `return`) skips
///   the finalisation below.
#[macro_export]
macro_rules! with_progress {
    ($name:expr, $total:expr, $progress_manager:expr, $operation:block) => {{
        let progress = create_operation_tracker($name, $total, $progress_manager);
        let outcome = $operation;

        if let Err(error) = &outcome {
            progress.fail(format!("{:?}", error));
        } else {
            progress.complete();
        }

        outcome
    }};
}
239
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tempfile::NamedTempFile;
    use things3_core::test_utils::create_test_database;

    /// Creates a temporary test database and opens it.
    ///
    /// The temp file is returned with the handle so that it stays alive for as
    /// long as the database is in use.
    async fn open_test_db() -> (NamedTempFile, ThingsDatabase) {
        let temp_file = NamedTempFile::new().expect("create temp file");
        create_test_database(temp_file.path())
            .await
            .expect("create test database");
        let db = ThingsDatabase::new(temp_file.path())
            .await
            .expect("open test database");
        (temp_file, db)
    }

    /// Returns a call counter and a processor that bumps it and always succeeds.
    fn counting_processor() -> (
        Arc<AtomicUsize>,
        impl Fn(&Task) -> things3_core::Result<()> + Send + Sync + 'static,
    ) {
        let calls = Arc::new(AtomicUsize::new(0));
        let seen = Arc::clone(&calls);
        let processor = move |_task: &Task| -> things3_core::Result<()> {
            seen.fetch_add(1, Ordering::SeqCst);
            Ok(())
        };
        (calls, processor)
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_creation() {
        let manager = BulkOperationsManager::new();
        let _progress_manager = manager.progress_manager();
        let _event_broadcaster = manager.event_broadcaster();
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_export_all_tasks() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        for format in ["json", "csv", "xml", "markdown", "opml"] {
            manager
                .export_all_tasks(&db, format)
                .await
                .unwrap_or_else(|e| panic!("Export failed for format {format}: {e:?}"));
        }
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_export_all_tasks_with_data() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        let result = manager.export_all_tasks(&db, "json").await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_bulk_update_task_status() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        let updated = manager
            .bulk_update_task_status(&db, Vec::new(), things3_core::TaskStatus::Completed)
            .await
            .expect("empty bulk update succeeds");

        // No ids in, no updates out.
        assert_eq!(updated, 0);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_bulk_update_task_status_with_invalid_ids() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        let task_ids = vec![ThingsId::new_v4(), ThingsId::new_v4()];
        let supplied = task_ids.len();
        let updated = manager
            .bulk_update_task_status(&db, task_ids, things3_core::TaskStatus::Completed)
            .await
            .expect("bulk update with unknown ids succeeds");

        // The reported count can never exceed the number of ids supplied.
        assert!(updated <= supplied);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_bulk_update_task_status_different_statuses() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        let statuses = [
            things3_core::TaskStatus::Completed,
            things3_core::TaskStatus::Canceled,
            things3_core::TaskStatus::Incomplete,
        ];

        for status in statuses {
            let updated = manager
                .bulk_update_task_status(&db, Vec::new(), status)
                .await
                .expect("empty bulk update succeeds");
            assert_eq!(updated, 0);
        }
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_search_and_process_tasks() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        let (calls, processor) = counting_processor();
        let processed = manager
            .search_and_process_tasks(&db, "", processor)
            .await
            .expect("search and process succeeds");

        // The processor runs exactly once per returned task.
        assert_eq!(calls.load(Ordering::SeqCst), processed.len());
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_search_and_process_tasks_with_query() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        let (calls, processor) = counting_processor();
        let processed = manager
            .search_and_process_tasks(&db, "test", processor)
            .await
            .expect("search and process succeeds");

        assert_eq!(calls.load(Ordering::SeqCst), processed.len());
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_search_and_process_tasks_different_limits() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        // The API takes no limit parameter; the repeated runs only check that
        // the same query keeps yielding the same (empty) result.
        for _limit in [1, 5, 10, 100] {
            let processed = manager
                .search_and_process_tasks(&db, "test", |_task| Ok(()))
                .await
                .expect("search and process succeeds");
            assert!(processed.is_empty());
        }
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_progress_manager_access() {
        let manager = BulkOperationsManager::new();
        let _progress_manager = manager.progress_manager();
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_event_broadcaster_access() {
        let manager = BulkOperationsManager::new();
        let event_broadcaster = manager.event_broadcaster();

        let _subscription_count = event_broadcaster.subscription_count().await;
    }

    #[tokio::test]
    async fn test_create_operation_tracker() {
        let progress_manager = Arc::new(ProgressManager::new());
        let tracker = create_operation_tracker("test_operation", Some(100), &progress_manager);

        assert_eq!(tracker.operation_name(), "test_operation");
        assert_eq!(tracker.total(), Some(100));
        assert_eq!(tracker.current(), 0);
    }

    #[tokio::test]
    async fn test_create_operation_tracker_without_total() {
        let progress_manager = Arc::new(ProgressManager::new());
        let tracker = create_operation_tracker("test_operation", None, &progress_manager);

        assert_eq!(tracker.operation_name(), "test_operation");
        assert_eq!(tracker.total(), None);
        assert_eq!(tracker.current(), 0);
    }

    #[tokio::test]
    async fn test_create_operation_tracker_different_operations() {
        let operations = [
            ("export_tasks", Some(50)),
            ("update_status", Some(25)),
            ("search_tasks", None),
            ("bulk_operation", Some(1000)),
        ];

        let progress_manager = Arc::new(ProgressManager::new());
        for (name, total) in operations {
            let tracker = create_operation_tracker(name, total, &progress_manager);
            assert_eq!(tracker.operation_name(), name);
            assert_eq!(tracker.total(), total);
            assert_eq!(tracker.current(), 0);
        }
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_export_all_tasks_error_handling() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        // The format string is not validated, so an unknown format still succeeds.
        let result = manager.export_all_tasks(&db, "invalid_format").await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_bulk_update_task_status_error_handling() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        let updated = manager
            .bulk_update_task_status(&db, Vec::new(), things3_core::TaskStatus::Incomplete)
            .await
            .expect("empty bulk update succeeds");
        assert_eq!(updated, 0);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_search_and_process_tasks_error_handling() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        let calls = Arc::new(AtomicUsize::new(0));
        let seen = Arc::clone(&calls);
        let result = manager
            .search_and_process_tasks(&db, "", move |_task| {
                seen.fetch_add(1, Ordering::SeqCst);
                Err(things3_core::ThingsError::unknown("processor failed"))
            })
            .await;

        let invoked = calls.load(Ordering::SeqCst);
        // A failing processor aborts the run at the first task it sees...
        assert!(invoked <= 1);
        // ...and the operation fails exactly when the processor was reached.
        assert_eq!(result.is_err(), invoked > 0);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_concurrent_operations() {
        let manager = BulkOperationsManager::new();
        let (_temp_file, db) = open_test_db().await;

        // Drive five exports concurrently on the same manager and database.
        let (first, second, third, fourth, fifth) = tokio::join!(
            manager.export_all_tasks(&db, "json"),
            manager.export_all_tasks(&db, "json"),
            manager.export_all_tasks(&db, "json"),
            manager.export_all_tasks(&db, "json"),
            manager.export_all_tasks(&db, "json"),
        );

        for result in [first, second, third, fourth, fifth] {
            assert!(result.is_ok());
        }
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_progress_tracking() {
        let manager = BulkOperationsManager::new();

        let progress_manager = manager.progress_manager();
        let tracker = progress_manager.create_tracker("test_operation", Some(10), true);

        assert_eq!(tracker.operation_name(), "test_operation");
        assert_eq!(tracker.total(), Some(10));
        assert_eq!(tracker.current(), 0);
    }

    #[tokio::test]
    async fn test_bulk_operations_manager_event_broadcasting() {
        let manager = BulkOperationsManager::new();
        let event_broadcaster = manager.event_broadcaster();

        let _subscription_count = event_broadcaster.subscription_count().await;
        let event = crate::events::Event {
            event_type: crate::events::EventType::TaskCreated {
                task_id: ThingsId::new_v4(),
            },
            id: uuid::Uuid::new_v4(),
            source: "test".to_string(),
            timestamp: chrono::Utc::now(),
            data: None,
        };

        let result = event_broadcaster.broadcast(event).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_export_all_tasks() {
        let (_temp_file, db) = open_test_db().await;

        let tasks = db.get_inbox(None).await.unwrap();
        assert!(!tasks.is_empty());

        let json = serde_json::to_string(&tasks).unwrap();
        assert!(!json.is_empty());
    }

    #[tokio::test]
    async fn test_bulk_update_task_status() {
        let (_temp_file, db) = open_test_db().await;

        let tasks = db.get_inbox(Some(5)).await.unwrap();
        let task_ids: Vec<ThingsId> = tasks.iter().map(|t| t.uuid.clone()).collect();

        // Both checks hold trivially for an empty inbox, so no guard is needed.
        assert_eq!(task_ids.len(), tasks.len());
        assert!(task_ids.iter().all(|id| !id.as_str().is_empty()));
    }

    #[tokio::test]
    async fn test_search_and_process_tasks() {
        let (_temp_file, db) = open_test_db().await;
        let manager = BulkOperationsManager::new();

        let result = manager
            .search_and_process_tasks(&db, "test", |_task| Ok(()))
            .await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_with_progress_macro() {
        let manager = BulkOperationsManager::new();
        let progress_manager = manager.progress_manager();

        let result = with_progress!("test_operation", Some(10), &progress_manager, {
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            Ok::<(), anyhow::Error>(())
        });

        assert!(result.is_ok());
    }
}