use crate::events::{EventBroadcaster, EventType};
use crate::progress::{ProgressManager, ProgressTracker};
use std::sync::Arc;
use things3_core::Result;
use things3_core::{Task, ThingsDatabase};
pub struct BulkOperationsManager {
progress_manager: Arc<ProgressManager>,
event_broadcaster: Arc<EventBroadcaster>,
}
impl BulkOperationsManager {
#[must_use]
pub fn new() -> Self {
Self {
progress_manager: Arc::new(ProgressManager::new()),
event_broadcaster: Arc::new(EventBroadcaster::new()),
}
}
pub async fn export_all_tasks(&self, db: &ThingsDatabase, format: &str) -> Result<Vec<Task>> {
let tracker = self.progress_manager.create_tracker(
"Export All Tasks",
None, true,
);
tracker.set_message("Fetching tasks from database...".to_string());
let tasks = db.search_tasks("").await?;
tracker.set_message(format!(
"Found {} tasks, exporting to {}...",
tasks.len(),
format
));
for (i, task) in tasks.iter().enumerate() {
if tracker.is_cancelled() {
return Err(things3_core::ThingsError::unknown("Export cancelled"));
}
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
tracker.set_current(i as u64 + 1);
tracker.set_message(format!("Processing task: {}", task.title));
self.event_broadcaster
.broadcast_task_event(
EventType::TaskUpdated { task_id: task.uuid },
task.uuid,
Some(serde_json::to_value(task)?),
"bulk_export",
)
.await?;
}
tracker.set_message("Export completed successfully".to_string());
tracker.complete();
Ok(tasks)
}
pub async fn bulk_update_task_status(
&self,
_db: &ThingsDatabase,
task_ids: Vec<uuid::Uuid>,
new_status: things3_core::TaskStatus,
) -> Result<usize> {
let tracker = self.progress_manager.create_tracker(
"Bulk Update Task Status",
Some(task_ids.len() as u64),
true,
);
tracker.set_message(format!(
"Updating {} tasks to {:?}...",
task_ids.len(),
new_status
));
let mut updated_count = 0;
for (i, task_id) in task_ids.iter().enumerate() {
if tracker.is_cancelled() {
return Err(things3_core::ThingsError::unknown("Bulk update cancelled"));
}
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
tracker.inc(1);
tracker.set_message(format!("Updated task {} of {}", i + 1, task_ids.len()));
self.event_broadcaster
.broadcast_task_event(
EventType::TaskUpdated { task_id: *task_id },
*task_id,
Some(serde_json::json!({ "status": format!("{:?}", new_status) })),
"bulk_update",
)
.await?;
updated_count += 1;
}
tracker.set_message("Bulk update completed successfully".to_string());
tracker.complete();
Ok(updated_count)
}
pub async fn search_and_process_tasks(
&self,
db: &ThingsDatabase,
query: &str,
processor: impl Fn(&Task) -> Result<()> + Send + Sync + 'static,
) -> Result<Vec<Task>> {
let tracker = self.progress_manager.create_tracker(
&format!("Search and Process: {query}"),
None,
true,
);
tracker.set_message("Searching tasks...".to_string());
let tasks = db.search_tasks(query).await?;
tracker.set_message(format!("Found {} tasks, processing...", tasks.len()));
let mut processed_tasks = Vec::new();
for (i, task) in tasks.iter().enumerate() {
if tracker.is_cancelled() {
return Err(things3_core::ThingsError::unknown(
"Search and process cancelled",
));
}
processor(task)?;
tracker.set_current(i as u64 + 1);
tracker.set_message(format!("Processing task: {}", task.title));
self.event_broadcaster
.broadcast_task_event(
EventType::TaskUpdated { task_id: task.uuid },
task.uuid,
Some(serde_json::to_value(task)?),
"search_and_process",
)
.await?;
processed_tasks.push(task.clone());
}
tracker.set_message("Processing completed successfully".to_string());
tracker.complete();
Ok(processed_tasks)
}
#[must_use]
pub fn progress_manager(&self) -> Arc<ProgressManager> {
self.progress_manager.clone()
}
#[must_use]
pub fn event_broadcaster(&self) -> Arc<EventBroadcaster> {
self.event_broadcaster.clone()
}
}
impl Default for BulkOperationsManager {
fn default() -> Self {
Self::new()
}
}
#[must_use]
pub fn create_operation_tracker(
operation_name: &str,
total: Option<u64>,
progress_manager: &Arc<ProgressManager>,
) -> ProgressTracker {
progress_manager.create_tracker(operation_name, total, true)
}
#[macro_export]
macro_rules! with_progress {
($name:expr, $total:expr, $progress_manager:expr, $operation:block) => {{
let tracker = create_operation_tracker($name, $total, $progress_manager);
let result = $operation;
match &result {
Ok(_) => tracker.complete(),
Err(e) => tracker.fail(format!("{:?}", e)),
}
result
}};
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::NamedTempFile;
use things3_core::test_utils::create_test_database;
#[tokio::test]
async fn test_bulk_operations_manager_creation() {
let manager = BulkOperationsManager::new();
let _progress_manager = manager.progress_manager();
let _event_broadcaster = manager.event_broadcaster();
}
#[tokio::test]
async fn test_bulk_operations_manager_export_all_tasks() {
let manager = BulkOperationsManager::new();
let temp_file = tempfile::NamedTempFile::new().unwrap();
let db_path = temp_file.path();
create_test_database(db_path).await.unwrap();
let db = ThingsDatabase::new(db_path).await.unwrap();
let formats = vec!["json", "csv", "xml", "markdown", "opml"];
for format in formats {
let result = manager.export_all_tasks(&db, format).await;
if let Err(e) = &result {
println!("Export failed for format {format}: {e:?}");
}
assert!(result.is_ok());
let _tasks = result.unwrap();
}
}
#[tokio::test]
async fn test_bulk_operations_manager_export_all_tasks_with_data() {
let manager = BulkOperationsManager::new();
let temp_file = tempfile::NamedTempFile::new().unwrap();
let db_path = temp_file.path();
create_test_database(db_path).await.unwrap();
let db = ThingsDatabase::new(db_path).await.unwrap();
let result = manager.export_all_tasks(&db, "json").await;
assert!(result.is_ok());
let _tasks = result.unwrap();
}
#[tokio::test]
async fn test_bulk_operations_manager_bulk_update_task_status() {
let manager = BulkOperationsManager::new();
let temp_file = tempfile::NamedTempFile::new().unwrap();
let db_path = temp_file.path();
create_test_database(db_path).await.unwrap();
let db = ThingsDatabase::new(db_path).await.unwrap();
let task_ids = vec![];
let result = manager
.bulk_update_task_status(&db, task_ids, things3_core::TaskStatus::Completed)
.await;
assert!(result.is_ok());
let _updated_count = result.unwrap();
}
#[tokio::test]
async fn test_bulk_operations_manager_bulk_update_task_status_with_invalid_ids() {
let manager = BulkOperationsManager::new();
let temp_file = tempfile::NamedTempFile::new().unwrap();
let db_path = temp_file.path();
create_test_database(db_path).await.unwrap();
let db = ThingsDatabase::new(db_path).await.unwrap();
let task_ids = vec![uuid::Uuid::new_v4(), uuid::Uuid::new_v4()];
let result = manager
.bulk_update_task_status(&db, task_ids, things3_core::TaskStatus::Completed)
.await;
assert!(result.is_ok());
let _updated_count = result.unwrap();
}
#[tokio::test]
async fn test_bulk_operations_manager_bulk_update_task_status_different_statuses() {
let manager = BulkOperationsManager::new();
let temp_file = tempfile::NamedTempFile::new().unwrap();
let db_path = temp_file.path();
create_test_database(db_path).await.unwrap();
let db = ThingsDatabase::new(db_path).await.unwrap();
let task_ids = vec![];
let statuses = vec![
("completed", things3_core::TaskStatus::Completed),
("cancelled", things3_core::TaskStatus::Canceled),
("in_progress", things3_core::TaskStatus::Incomplete),
];
for (_name, status) in statuses {
let result = manager
.bulk_update_task_status(&db, task_ids.clone(), status)
.await;
assert!(result.is_ok());
let _updated_count = result.unwrap();
}
}
#[tokio::test]
async fn test_bulk_operations_manager_search_and_process_tasks() {
let manager = BulkOperationsManager::new();
let temp_file = tempfile::NamedTempFile::new().unwrap();
let db_path = temp_file.path();
create_test_database(db_path).await.unwrap();
let db = ThingsDatabase::new(db_path).await.unwrap();
let result = manager
.search_and_process_tasks(&db, "", |_task| Ok(()))
.await;
assert!(result.is_ok());
let processed_count = result.unwrap();
assert!(!processed_count.is_empty() || processed_count.is_empty()); }
#[tokio::test]
async fn test_bulk_operations_manager_search_and_process_tasks_with_query() {
let manager = BulkOperationsManager::new();
let temp_file = tempfile::NamedTempFile::new().unwrap();
let db_path = temp_file.path();
create_test_database(db_path).await.unwrap();
let db = ThingsDatabase::new(db_path).await.unwrap();
let result = manager
.search_and_process_tasks(&db, "test", |_task| Ok(()))
.await;
assert!(result.is_ok());
let processed_count = result.unwrap();
assert!(!processed_count.is_empty() || processed_count.is_empty()); }
#[tokio::test]
async fn test_bulk_operations_manager_search_and_process_tasks_different_limits() {
let manager = BulkOperationsManager::new();
let temp_file = tempfile::NamedTempFile::new().unwrap();
let db_path = temp_file.path();
create_test_database(db_path).await.unwrap();
let db = ThingsDatabase::new(db_path).await.unwrap();
let limits = vec![1, 5, 10, 100];
for _limit in limits {
let result = manager
.search_and_process_tasks(&db, "test", |_task| Ok(()))
.await;
assert!(result.is_ok());
let processed_count = result.unwrap();
assert_eq!(processed_count.len(), 0); }
}
#[tokio::test]
async fn test_bulk_operations_manager_progress_manager_access() {
let manager = BulkOperationsManager::new();
let _progress_manager = manager.progress_manager();
}
#[tokio::test]
async fn test_bulk_operations_manager_event_broadcaster_access() {
let manager = BulkOperationsManager::new();
let event_broadcaster = manager.event_broadcaster();
let _subscription_count = event_broadcaster.subscription_count().await;
}
#[tokio::test]
async fn test_create_operation_tracker() {
let progress_manager = Arc::new(ProgressManager::new());
let tracker = create_operation_tracker("test_operation", Some(100), &progress_manager);
assert_eq!(tracker.operation_name(), "test_operation");
assert_eq!(tracker.total(), Some(100));
assert_eq!(tracker.current(), 0);
}
#[tokio::test]
async fn test_create_operation_tracker_without_total() {
let progress_manager = Arc::new(ProgressManager::new());
let tracker = create_operation_tracker("test_operation", None, &progress_manager);
assert_eq!(tracker.operation_name(), "test_operation");
assert_eq!(tracker.total(), None);
assert_eq!(tracker.current(), 0);
}
#[tokio::test]
async fn test_create_operation_tracker_different_operations() {
let operations = vec![
("export_tasks", Some(50)),
("update_status", Some(25)),
("search_tasks", None),
("bulk_operation", Some(1000)),
];
let progress_manager = Arc::new(ProgressManager::new());
for (name, total) in operations {
let tracker = create_operation_tracker(name, total, &progress_manager);
assert_eq!(tracker.operation_name(), name);
assert_eq!(tracker.total(), total);
assert_eq!(tracker.current(), 0);
}
}
#[tokio::test]
async fn test_bulk_operations_manager_export_all_tasks_error_handling() {
let manager = BulkOperationsManager::new();
let temp_file = tempfile::NamedTempFile::new().unwrap();
let db_path = temp_file.path();
create_test_database(db_path).await.unwrap();
let db = ThingsDatabase::new(db_path).await.unwrap();
let result = manager.export_all_tasks(&db, "invalid_format").await;
assert!(result.is_ok());
let _tasks = result.unwrap();
}
#[tokio::test]
async fn test_bulk_operations_manager_bulk_update_task_status_error_handling() {
let manager = BulkOperationsManager::new();
let temp_file = tempfile::NamedTempFile::new().unwrap();
let db_path = temp_file.path();
create_test_database(db_path).await.unwrap();
let db = ThingsDatabase::new(db_path).await.unwrap();
let task_ids = vec![];
let result = manager
.bulk_update_task_status(&db, task_ids, things3_core::TaskStatus::Incomplete)
.await;
assert!(result.is_ok());
let _updated_count = result.unwrap();
}
#[tokio::test]
async fn test_bulk_operations_manager_search_and_process_tasks_error_handling() {
let manager = BulkOperationsManager::new();
let temp_file = tempfile::NamedTempFile::new().unwrap();
let db_path = temp_file.path();
create_test_database(db_path).await.unwrap();
let db = ThingsDatabase::new(db_path).await.unwrap();
let result = manager
.search_and_process_tasks(&db, "test", |_task| Ok(()))
.await;
assert!(result.is_ok());
let processed_count = result.unwrap();
assert!(!processed_count.is_empty() || processed_count.is_empty()); }
#[tokio::test]
async fn test_bulk_operations_manager_concurrent_operations() {
let manager = BulkOperationsManager::new();
let temp_file = tempfile::NamedTempFile::new().unwrap();
let db_path = temp_file.path();
create_test_database(db_path).await.unwrap();
let db = ThingsDatabase::new(db_path).await.unwrap();
for _i in 0..5 {
let result = manager.export_all_tasks(&db, "json").await;
assert!(result.is_ok());
}
}
#[tokio::test]
async fn test_bulk_operations_manager_progress_tracking() {
let manager = BulkOperationsManager::new();
let temp_file = tempfile::NamedTempFile::new().unwrap();
let db_path = temp_file.path();
create_test_database(db_path).await.unwrap();
let _db = ThingsDatabase::new(db_path).await.unwrap();
let progress_manager = manager.progress_manager();
let tracker = progress_manager.create_tracker("test_operation", Some(10), true);
assert_eq!(tracker.operation_name(), "test_operation");
assert_eq!(tracker.total(), Some(10));
assert_eq!(tracker.current(), 0);
}
#[tokio::test]
async fn test_bulk_operations_manager_event_broadcasting() {
let manager = BulkOperationsManager::new();
let event_broadcaster = manager.event_broadcaster();
let _subscription_count = event_broadcaster.subscription_count().await;
let event = crate::events::Event {
event_type: crate::events::EventType::TaskCreated {
task_id: uuid::Uuid::new_v4(),
},
id: uuid::Uuid::new_v4(),
source: "test".to_string(),
timestamp: chrono::Utc::now(),
data: None,
};
let result = event_broadcaster.broadcast(event).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_export_all_tasks() {
let temp_file = NamedTempFile::new().unwrap();
let db_path = temp_file.path();
create_test_database(db_path).await.unwrap();
let db = ThingsDatabase::new(db_path).await.unwrap();
let tasks = db.get_inbox(None).await.unwrap();
assert!(!tasks.is_empty());
let json = serde_json::to_string(&tasks).unwrap();
assert!(!json.is_empty());
}
#[tokio::test]
async fn test_bulk_update_task_status() {
let temp_file = NamedTempFile::new().unwrap();
let db_path = temp_file.path();
create_test_database(db_path).await.unwrap();
let db = ThingsDatabase::new(db_path).await.unwrap();
let tasks = db.get_inbox(Some(5)).await.unwrap();
let task_ids: Vec<uuid::Uuid> = tasks.iter().map(|t| t.uuid).collect();
if !task_ids.is_empty() {
assert_eq!(task_ids.len(), tasks.len());
for task_id in &task_ids {
assert!(!task_id.is_nil());
}
}
}
#[tokio::test]
async fn test_search_and_process_tasks() {
let temp_file = NamedTempFile::new().unwrap();
let db_path = temp_file.path();
create_test_database(db_path).await.unwrap();
let db = ThingsDatabase::new(db_path).await.unwrap();
let manager = BulkOperationsManager::new();
let result = manager
.search_and_process_tasks(&db, "test", |_task| Ok(()))
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_with_progress_macro() {
let manager = BulkOperationsManager::new();
let progress_manager = manager.progress_manager();
let result = with_progress!("test_operation", Some(10), &progress_manager, {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
Ok::<(), anyhow::Error>(())
});
assert!(result.is_ok());
}
}