Skip to main content

redisctl_core/
progress.rs

1//! Progress tracking and task polling for async Cloud operations
2//!
3//! Cloud API operations return a `TaskStateUpdate` which must be polled
4//! until completion. This module provides utilities for that polling
5//! with optional progress callbacks for UI updates.
6
7use crate::error::{CoreError, Result};
8use redis_cloud::tasks::TaskStateUpdate;
9use redis_cloud::types::TaskStatus;
10use redis_cloud::{CloudClient, TaskHandler};
11use std::time::{Duration, Instant};
12
/// Progress events emitted while an async Cloud task is being polled.
///
/// Events are plain data and compare by value, so callers and tests can
/// check them with `==` / `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProgressEvent {
    /// Polling for the task has started.
    Started {
        /// ID of the task being polled.
        task_id: String,
    },
    /// One polling iteration, carrying the status observed in it.
    Polling {
        /// ID of the task being polled.
        task_id: String,
        /// Human-readable label of the status reported by the API.
        status: String,
        /// Time elapsed since polling started.
        elapsed: Duration,
    },
    /// The task completed successfully.
    Completed {
        /// ID of the task that completed.
        task_id: String,
        /// ID of the affected resource, when the task response carries one.
        resource_id: Option<i32>,
    },
    /// The task failed.
    Failed {
        /// ID of the task that failed.
        task_id: String,
        /// Description of the failure.
        error: String,
    },
}
32
33/// Callback type for progress updates
34///
35/// CLI can use this to update spinners/progress bars.
36/// MCP typically doesn't need this.
37pub type ProgressCallback = Box<dyn Fn(ProgressEvent) + Send + Sync>;
38
39/// Poll a Cloud task until completion
40///
41/// # Arguments
42///
43/// * `client` - The Cloud API client
44/// * `task_id` - The task ID to poll
45/// * `timeout` - Maximum time to wait for completion
46/// * `interval` - Time between polling attempts
47/// * `on_progress` - Optional callback for progress updates
48///
49/// # Returns
50///
51/// The completed task response, or an error if the task failed or timed out.
52///
53/// # Example
54///
55/// ```rust,ignore
56/// use redisctl_core::{poll_task, ProgressEvent};
57/// use std::time::Duration;
58///
59/// // Create a database (returns TaskStateUpdate)
60/// let task = handler.create(subscription_id, &request).await?;
61/// let task_id = task.task_id.unwrap();
62///
63/// // Poll with progress callback
64/// let completed = poll_task(
65///     &client,
66///     &task_id,
67///     Duration::from_secs(600),
68///     Duration::from_secs(10),
69///     Some(Box::new(|event| {
70///         match event {
71///             ProgressEvent::Polling { status, elapsed, .. } => {
72///                 println!("Status: {} ({:.0}s)", status, elapsed.as_secs());
73///             }
74///             ProgressEvent::Completed { resource_id, .. } => {
75///                 println!("Done! Resource ID: {:?}", resource_id);
76///             }
77///             _ => {}
78///         }
79///     })),
80/// ).await?;
81/// ```
82pub async fn poll_task(
83    client: &CloudClient,
84    task_id: &str,
85    timeout: Duration,
86    interval: Duration,
87    on_progress: Option<ProgressCallback>,
88) -> Result<TaskStateUpdate> {
89    let start = Instant::now();
90    let handler = TaskHandler::new(client.clone());
91
92    emit(
93        &on_progress,
94        ProgressEvent::Started {
95            task_id: task_id.to_string(),
96        },
97    );
98
99    loop {
100        let elapsed = start.elapsed();
101        if elapsed > timeout {
102            return Err(CoreError::TaskTimeout(timeout));
103        }
104
105        let task = handler.get_task_by_id(task_id.to_string()).await?;
106        let status = task.status.clone();
107        let status_label = task_status_label(status.as_ref());
108
109        emit(
110            &on_progress,
111            ProgressEvent::Polling {
112                task_id: task_id.to_string(),
113                status: status_label.clone(),
114                elapsed,
115            },
116        );
117
118        // Check for terminal states. `TaskStatus` only models the two terminal
119        // states explicitly; everything else (Initialized, Received,
120        // ProcessingInProgress, and any Unknown wire value) is still in flight.
121        match status {
122            // Success
123            Some(TaskStatus::ProcessingCompleted) => {
124                let resource_id = task.response.as_ref().and_then(|r| r.resource_id);
125                emit(
126                    &on_progress,
127                    ProgressEvent::Completed {
128                        task_id: task_id.to_string(),
129                        resource_id,
130                    },
131                );
132                return Ok(task);
133            }
134            // Failure
135            Some(TaskStatus::ProcessingError) => {
136                let error = task
137                    .response
138                    .as_ref()
139                    .and_then(|r| r.error_message())
140                    .unwrap_or_else(|| format!("Task failed with status: {}", status_label));
141
142                emit(
143                    &on_progress,
144                    ProgressEvent::Failed {
145                        task_id: task_id.to_string(),
146                        error: error.clone(),
147                    },
148                );
149                return Err(CoreError::TaskFailed(error));
150            }
151            // Still processing, wait and try again
152            _ => {
153                tokio::time::sleep(interval).await;
154            }
155        }
156    }
157}
158
159/// Helper to emit progress events
160fn emit(callback: &Option<ProgressCallback>, event: ProgressEvent) {
161    if let Some(cb) = callback {
162        cb(event);
163    }
164}
165
166/// Human-readable label for a task status, mirroring the kebab-case wire form.
167///
168/// Used for progress events and failure messages. A missing status (or one the
169/// client doesn't recognize) is reported as `"unknown"`.
170fn task_status_label(status: Option<&TaskStatus>) -> String {
171    match status {
172        Some(TaskStatus::Initialized) => "initialized",
173        Some(TaskStatus::Received) => "received",
174        Some(TaskStatus::ProcessingInProgress) => "processing-in-progress",
175        Some(TaskStatus::ProcessingCompleted) => "processing-completed",
176        Some(TaskStatus::ProcessingError) => "processing-error",
177        Some(TaskStatus::Unknown) | None => "unknown",
178    }
179    .to_string()
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Build a `CloudClient` with placeholder credentials that talks to `uri`.
    fn test_client(uri: String) -> CloudClient {
        CloudClient::builder()
            .api_key("test-key".to_string())
            .api_secret("test-secret".to_string())
            .base_url(uri)
            .build()
            .unwrap()
    }

    // Regression: when a failed task carries a structured object in
    // `response.error` (e.g. a failed database backup), polling must report
    // the human-readable description instead of failing to deserialize.
    // See redis-cloud-rs ProcessorResponse::error_message.
    #[tokio::test]
    async fn poll_task_surfaces_object_error_description() {
        // Task payload as served by the mocked API: a terminal error whose
        // `error` field is an object rather than a plain string.
        let failed_backup = json!({
            "taskId": "task-backup",
            "commandType": "DATABASE_BACKUP",
            "status": "processing-error",
            "response": {
                "error": {
                    "type": "BACKUP_FAILED",
                    "status": "400 BAD_REQUEST",
                    "description": "Remote backup location is not configured"
                }
            }
        });

        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/tasks/task-backup"))
            .respond_with(ResponseTemplate::new(200).set_body_json(failed_backup))
            .mount(&server)
            .await;

        let client = test_client(server.uri());
        let outcome = poll_task(
            &client,
            "task-backup",
            Duration::from_secs(5),
            Duration::from_millis(10),
            None,
        )
        .await;

        if let Err(CoreError::TaskFailed(message)) = &outcome {
            assert_eq!(message.as_str(), "Remote backup location is not configured");
        } else {
            panic!("expected TaskFailed with description, got {outcome:?}");
        }
    }
}
239}