use crate::error::{CoreError, Result};
use redis_enterprise::EnterpriseClient;
use redis_enterprise::actions::Action;
use std::time::{Duration, Instant};
/// Progress notifications produced while polling a Redis Enterprise action.
///
/// Events are delivered by value to an [`EnterpriseProgressCallback`]. The type
/// derives `PartialEq` so that observers and tests can compare events directly.
#[derive(Debug, Clone, PartialEq)]
pub enum EnterpriseProgressEvent {
    /// Emitted once, before the first status fetch.
    Started {
        /// UID of the action being tracked.
        action_uid: String,
    },
    /// Emitted after every successful status fetch, including the fetch that
    /// observes a terminal status.
    Polling {
        /// UID of the action being tracked.
        action_uid: String,
        /// Raw status string as returned by the API.
        status: String,
        /// Progress value reported by the API, if any (scale is defined by the API,
        /// not by this module).
        progress: Option<f32>,
        /// Time elapsed since polling began, sampled at the start of the iteration.
        elapsed: Duration,
    },
    /// Emitted when the action reports the status `"completed"`.
    Completed {
        /// UID of the action that finished.
        action_uid: String,
    },
    /// Emitted when the action reports the status `"failed"` or `"cancelled"`.
    Failed {
        /// UID of the action that did not finish successfully.
        action_uid: String,
        /// Error text reported by the action, or `"Action <status>"` when none was given.
        error: String,
    },
}
/// Boxed `Send + Sync` closure that receives each [`EnterpriseProgressEvent`] by value.
pub type EnterpriseProgressCallback = Box<dyn Fn(EnterpriseProgressEvent) + Send + Sync>;
/// Polls a Redis Enterprise action until it reaches a terminal status or the
/// time budget runs out.
///
/// The action is fetched through `client.actions()` once per loop iteration.
/// When `on_progress` is provided it receives one `Started` event up front, a
/// `Polling` event after every successful fetch, and a final `Completed` or
/// `Failed` event once the action's status is terminal.
///
/// # Arguments
///
/// * `client` - Enterprise API client used to fetch the action.
/// * `action_uid` - UID of the action to track.
/// * `timeout` - Total time budget, measured from the moment this function is called.
/// * `interval` - Pause between polls. The pause is shortened to the remaining
///   budget so the timeout is reported as soon as the budget is exhausted
///   rather than up to one full `interval` later.
/// * `on_progress` - Optional observer for progress events.
///
/// # Errors
///
/// * [`CoreError::TaskTimeout`] if the budget is exceeded before the action
///   reaches a terminal status. The check runs at the top of each iteration,
///   so a request that is already in flight is not interrupted.
/// * [`CoreError::TaskFailed`] if the action reports `"failed"` or
///   `"cancelled"`; the message is the action's error text, or
///   `"Action <status>"` when none was supplied.
/// * Any error returned while fetching the action is propagated with `?`.
///
/// No `Failed` event is emitted for a timeout or for a fetch error.
pub async fn poll_action(
    client: &EnterpriseClient,
    action_uid: &str,
    timeout: Duration,
    interval: Duration,
    on_progress: Option<EnterpriseProgressCallback>,
) -> Result<Action> {
    let start = Instant::now();
    let handler = client.actions();

    emit(
        &on_progress,
        EnterpriseProgressEvent::Started {
            action_uid: action_uid.to_string(),
        },
    );

    loop {
        // Sampled before the request so the reported `elapsed` reflects when
        // this poll started.
        let elapsed = start.elapsed();
        if elapsed > timeout {
            return Err(CoreError::TaskTimeout(timeout));
        }

        let action = handler.get(action_uid).await?;
        // Own a copy of the status: `action` itself is returned on success.
        let status = action.status.clone();

        emit(
            &on_progress,
            EnterpriseProgressEvent::Polling {
                action_uid: action_uid.to_string(),
                status: status.clone(),
                progress: action.progress,
                elapsed,
            },
        );

        match status.as_str() {
            "completed" => {
                emit(
                    &on_progress,
                    EnterpriseProgressEvent::Completed {
                        action_uid: action_uid.to_string(),
                    },
                );
                return Ok(action);
            }
            "failed" | "cancelled" => {
                let error = action
                    .error
                    .clone()
                    .unwrap_or_else(|| format!("Action {}", status));
                emit(
                    &on_progress,
                    EnterpriseProgressEvent::Failed {
                        action_uid: action_uid.to_string(),
                        error: error.clone(),
                    },
                );
                return Err(CoreError::TaskFailed(error));
            }
            _ => {
                // Never sleep past the deadline: re-measure after the request
                // and cap the pause at whatever budget is left, so the next
                // iteration reports the timeout promptly.
                let remaining = timeout.saturating_sub(start.elapsed());
                tokio::time::sleep(interval.min(remaining)).await;
            }
        }
    }
}
/// Forwards `event` to the progress callback when one is registered.
///
/// With no callback the event is simply dropped.
fn emit(callback: &Option<EnterpriseProgressCallback>, event: EnterpriseProgressEvent) {
    match callback {
        Some(notify) => notify(event),
        None => (),
    }
}