use crate::error::{CoreError, Result};
use redis_cloud::tasks::TaskStateUpdate;
use redis_cloud::{CloudClient, TaskHandler};
use std::time::{Duration, Instant};
/// Progress notifications reported by `poll_task` while it waits on a task.
///
/// Every variant carries the id of the task being polled.
#[derive(Debug, Clone)]
pub enum ProgressEvent {
/// Emitted once, before the first poll request is made.
Started { task_id: String },
/// Emitted after each successful status fetch, including the fetch that
/// turns out to be terminal.
Polling {
task_id: String,
/// Status string as reported for the task (empty when none was reported).
status: String,
/// Time since polling began, sampled just before this fetch was issued.
elapsed: Duration,
},
/// Emitted when the task reports a success status.
Completed {
task_id: String,
/// Resource id taken from the task response, when one is present.
resource_id: Option<i32>,
},
/// Emitted when the task reports a failure or cancellation status.
Failed { task_id: String, error: String },
}
/// Boxed callback that receives each [`ProgressEvent`] by value.
///
/// The `Send + Sync` bounds are part of the type, so any closure stored here
/// must satisfy them.
pub type ProgressCallback = Box<dyn Fn(ProgressEvent) + Send + Sync>;
/// Polls a task until it reaches a terminal status or `timeout` elapses.
///
/// The task is fetched through a [`TaskHandler`] built from a clone of
/// `client`. Its status is compared case-insensitively against the known
/// success, failure and cancellation strings; any other status (including a
/// missing one) is treated as "still running" and polling continues after a
/// pause of at most `interval`.
///
/// # Arguments
///
/// * `client` - client used to query the task.
/// * `task_id` - id of the task to poll.
/// * `timeout` - maximum time to keep polling, measured from the call.
/// * `interval` - pause between consecutive polls.
/// * `on_progress` - optional callback notified with [`ProgressEvent`]s:
///   `Started` once up front, `Polling` after every fetch, and `Completed`
///   or `Failed` when the task reports a terminal status.
///
/// # Errors
///
/// * [`CoreError::TaskTimeout`] when more than `timeout` has elapsed at the
///   start of a poll iteration. No event is emitted for this case.
/// * [`CoreError::TaskFailed`] when the task reports a failure status (with
///   the error from the task response, or a generic message containing the
///   status) or is cancelled.
/// * Any error returned while fetching the task is propagated via `?`; no
///   event is emitted for this case either.
pub async fn poll_task(
    client: &CloudClient,
    task_id: &str,
    timeout: Duration,
    interval: Duration,
    on_progress: Option<ProgressCallback>,
) -> Result<TaskStateUpdate> {
    let start = Instant::now();
    let handler = TaskHandler::new(client.clone());

    emit(
        &on_progress,
        ProgressEvent::Started {
            task_id: task_id.to_string(),
        },
    );

    loop {
        let elapsed = start.elapsed();
        if elapsed > timeout {
            return Err(CoreError::TaskTimeout(timeout));
        }

        let task = handler.get_task_by_id(task_id.to_string()).await?;
        // A task without a status falls through to the "keep polling" arm.
        let status = task.status.clone().unwrap_or_default();

        emit(
            &on_progress,
            ProgressEvent::Polling {
                task_id: task_id.to_string(),
                status: status.clone(),
                elapsed,
            },
        );

        match status.to_lowercase().as_str() {
            "processing-completed" | "completed" | "complete" | "succeeded" | "success" => {
                let resource_id = task.response.as_ref().and_then(|r| r.resource_id);
                emit(
                    &on_progress,
                    ProgressEvent::Completed {
                        task_id: task_id.to_string(),
                        resource_id,
                    },
                );
                return Ok(task);
            }
            "processing-error" | "failed" | "error" => {
                // Prefer the error carried in the response; otherwise fall
                // back to a message that at least names the status.
                let error = task
                    .response
                    .as_ref()
                    .and_then(|r| r.error.clone())
                    .unwrap_or_else(|| format!("Task failed with status: {}", status));
                emit(
                    &on_progress,
                    ProgressEvent::Failed {
                        task_id: task_id.to_string(),
                        error: error.clone(),
                    },
                );
                return Err(CoreError::TaskFailed(error));
            }
            "cancelled" => {
                emit(
                    &on_progress,
                    ProgressEvent::Failed {
                        task_id: task_id.to_string(),
                        error: "Task was cancelled".to_string(),
                    },
                );
                return Err(CoreError::TaskFailed("Task was cancelled".to_string()));
            }
            _ => {
                // Never sleep past the deadline: with a long `interval` and a
                // short `timeout` a full-interval sleep would keep the caller
                // blocked well beyond the requested timeout.
                let remaining = timeout.saturating_sub(start.elapsed());
                tokio::time::sleep(interval.min(remaining)).await;
            }
        }
    }
}
/// Forwards `event` to the progress callback, if one was supplied.
///
/// When `callback` is `None` the event is simply dropped.
fn emit(callback: &Option<ProgressCallback>, event: ProgressEvent) {
    let notify = match callback {
        Some(notify) => notify,
        // Nobody is listening; nothing to do.
        None => return,
    };
    notify(event);
}