redisctl_core/progress.rs
1//! Progress tracking and task polling for async Cloud operations
2//!
3//! Cloud API operations return a `TaskStateUpdate` which must be polled
4//! until completion. This module provides utilities for that polling
5//! with optional progress callbacks for UI updates.
6
7use crate::error::{CoreError, Result};
8use redis_cloud::tasks::TaskStateUpdate;
9use redis_cloud::types::TaskStatus;
10use redis_cloud::{CloudClient, TaskHandler};
11use std::time::{Duration, Instant};
12
/// Progress events emitted during async operations
///
/// Events compare by value (`PartialEq`/`Eq`), so callbacks and tests can
/// check for an exact event with `==` / `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProgressEvent {
    /// Task has been created/started
    Started {
        /// ID of the task being tracked.
        task_id: String,
    },
    /// Polling iteration with current status
    Polling {
        /// ID of the task being tracked.
        task_id: String,
        /// Human-readable label of the status seen on this poll.
        status: String,
        /// Time elapsed since tracking of the task began.
        elapsed: Duration,
    },
    /// Task completed successfully
    Completed {
        /// ID of the task that completed.
        task_id: String,
        /// ID of the affected resource, when the task response carries one.
        resource_id: Option<i32>,
    },
    /// Task failed
    Failed {
        /// ID of the task that failed.
        task_id: String,
        /// Description of why the task failed.
        error: String,
    },
}
32
33/// Callback type for progress updates
34///
35/// CLI can use this to update spinners/progress bars.
36/// MCP typically doesn't need this.
37pub type ProgressCallback = Box<dyn Fn(ProgressEvent) + Send + Sync>;
38
39/// Poll a Cloud task until completion
40///
41/// # Arguments
42///
43/// * `client` - The Cloud API client
44/// * `task_id` - The task ID to poll
45/// * `timeout` - Maximum time to wait for completion
46/// * `interval` - Time between polling attempts
47/// * `on_progress` - Optional callback for progress updates
48///
49/// # Returns
50///
51/// The completed task response, or an error if the task failed or timed out.
52///
53/// # Example
54///
55/// ```rust,ignore
56/// use redisctl_core::{poll_task, ProgressEvent};
57/// use std::time::Duration;
58///
59/// // Create a database (returns TaskStateUpdate)
60/// let task = handler.create(subscription_id, &request).await?;
61/// let task_id = task.task_id.unwrap();
62///
63/// // Poll with progress callback
64/// let completed = poll_task(
65/// &client,
66/// &task_id,
67/// Duration::from_secs(600),
68/// Duration::from_secs(10),
69/// Some(Box::new(|event| {
70/// match event {
71/// ProgressEvent::Polling { status, elapsed, .. } => {
72/// println!("Status: {} ({:.0}s)", status, elapsed.as_secs());
73/// }
74/// ProgressEvent::Completed { resource_id, .. } => {
75/// println!("Done! Resource ID: {:?}", resource_id);
76/// }
77/// _ => {}
78/// }
79/// })),
80/// ).await?;
81/// ```
82pub async fn poll_task(
83 client: &CloudClient,
84 task_id: &str,
85 timeout: Duration,
86 interval: Duration,
87 on_progress: Option<ProgressCallback>,
88) -> Result<TaskStateUpdate> {
89 let start = Instant::now();
90 let handler = TaskHandler::new(client.clone());
91
92 emit(
93 &on_progress,
94 ProgressEvent::Started {
95 task_id: task_id.to_string(),
96 },
97 );
98
99 loop {
100 let elapsed = start.elapsed();
101 if elapsed > timeout {
102 return Err(CoreError::TaskTimeout(timeout));
103 }
104
105 let task = handler.get_task_by_id(task_id.to_string()).await?;
106 let status = task.status.clone();
107 let status_label = task_status_label(status.as_ref());
108
109 emit(
110 &on_progress,
111 ProgressEvent::Polling {
112 task_id: task_id.to_string(),
113 status: status_label.clone(),
114 elapsed,
115 },
116 );
117
118 // Check for terminal states. `TaskStatus` only models the two terminal
119 // states explicitly; everything else (Initialized, Received,
120 // ProcessingInProgress, and any Unknown wire value) is still in flight.
121 match status {
122 // Success
123 Some(TaskStatus::ProcessingCompleted) => {
124 let resource_id = task.response.as_ref().and_then(|r| r.resource_id);
125 emit(
126 &on_progress,
127 ProgressEvent::Completed {
128 task_id: task_id.to_string(),
129 resource_id,
130 },
131 );
132 return Ok(task);
133 }
134 // Failure
135 Some(TaskStatus::ProcessingError) => {
136 let error = task
137 .response
138 .as_ref()
139 .and_then(|r| r.error_message())
140 .unwrap_or_else(|| format!("Task failed with status: {}", status_label));
141
142 emit(
143 &on_progress,
144 ProgressEvent::Failed {
145 task_id: task_id.to_string(),
146 error: error.clone(),
147 },
148 );
149 return Err(CoreError::TaskFailed(error));
150 }
151 // Still processing, wait and try again
152 _ => {
153 tokio::time::sleep(interval).await;
154 }
155 }
156 }
157}
158
159/// Helper to emit progress events
160fn emit(callback: &Option<ProgressCallback>, event: ProgressEvent) {
161 if let Some(cb) = callback {
162 cb(event);
163 }
164}
165
166/// Human-readable label for a task status, mirroring the kebab-case wire form.
167///
168/// Used for progress events and failure messages. A missing status (or one the
169/// client doesn't recognize) is reported as `"unknown"`.
170fn task_status_label(status: Option<&TaskStatus>) -> String {
171 match status {
172 Some(TaskStatus::Initialized) => "initialized",
173 Some(TaskStatus::Received) => "received",
174 Some(TaskStatus::ProcessingInProgress) => "processing-in-progress",
175 Some(TaskStatus::ProcessingCompleted) => "processing-completed",
176 Some(TaskStatus::ProcessingError) => "processing-error",
177 Some(TaskStatus::Unknown) | None => "unknown",
178 }
179 .to_string()
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Build a `CloudClient` with dummy credentials that talks to `uri`.
    fn test_client(uri: String) -> CloudClient {
        CloudClient::builder()
            .api_key("test-key".to_string())
            .api_secret("test-secret".to_string())
            .base_url(uri)
            .build()
            .unwrap()
    }

    // Regression: a failed task whose `response.error` is a structured object
    // (e.g. a failed database backup) must surface the human-readable
    // description, not fail to deserialize. See redis-cloud-rs
    // ProcessorResponse::error_message.
    #[tokio::test]
    async fn poll_task_surfaces_object_error_description() {
        // Task payload in which `response.error` is an object, not a string.
        let failed_backup = json!({
            "taskId": "task-backup",
            "commandType": "DATABASE_BACKUP",
            "status": "processing-error",
            "response": {
                "error": {
                    "type": "BACKUP_FAILED",
                    "status": "400 BAD_REQUEST",
                    "description": "Remote backup location is not configured"
                }
            }
        });

        let mock_server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/tasks/task-backup"))
            .respond_with(ResponseTemplate::new(200).set_body_json(failed_backup))
            .mount(&mock_server)
            .await;

        let client = test_client(mock_server.uri());
        let outcome = poll_task(
            &client,
            "task-backup",
            Duration::from_secs(5),
            Duration::from_millis(10),
            None,
        )
        .await;

        // Anything other than `TaskFailed` is a test failure.
        let message = match outcome {
            Err(CoreError::TaskFailed(message)) => message,
            other => panic!("expected TaskFailed with description, got {other:?}"),
        };
        assert_eq!(message, "Remote backup location is not configured");
    }
}