Skip to main content

redisctl_core/
progress.rs

1//! Progress tracking and task polling for async Cloud operations
2//!
3//! Cloud API operations return a `TaskStateUpdate` which must be polled
4//! until completion. This module provides utilities for that polling
5//! with optional progress callbacks for UI updates.
6
7use crate::error::{CoreError, Result};
8use redis_cloud::tasks::TaskStateUpdate;
9use redis_cloud::{CloudClient, TaskHandler};
10use std::time::{Duration, Instant};
11
12/// Progress events emitted during async operations
13#[derive(Debug, Clone)]
14pub enum ProgressEvent {
15    /// Task has been created/started
16    Started { task_id: String },
17    /// Polling iteration with current status
18    Polling {
19        task_id: String,
20        status: String,
21        elapsed: Duration,
22    },
23    /// Task completed successfully
24    Completed {
25        task_id: String,
26        resource_id: Option<i32>,
27    },
28    /// Task failed
29    Failed { task_id: String, error: String },
30}
31
32/// Callback type for progress updates
33///
34/// CLI can use this to update spinners/progress bars.
35/// MCP typically doesn't need this.
36pub type ProgressCallback = Box<dyn Fn(ProgressEvent) + Send + Sync>;
37
38/// Poll a Cloud task until completion
39///
40/// # Arguments
41///
42/// * `client` - The Cloud API client
43/// * `task_id` - The task ID to poll
44/// * `timeout` - Maximum time to wait for completion
45/// * `interval` - Time between polling attempts
46/// * `on_progress` - Optional callback for progress updates
47///
48/// # Returns
49///
50/// The completed task response, or an error if the task failed or timed out.
51///
52/// # Example
53///
54/// ```rust,ignore
55/// use redisctl_core::{poll_task, ProgressEvent};
56/// use std::time::Duration;
57///
58/// // Create a database (returns TaskStateUpdate)
59/// let task = handler.create(subscription_id, &request).await?;
60/// let task_id = task.task_id.unwrap();
61///
62/// // Poll with progress callback
63/// let completed = poll_task(
64///     &client,
65///     &task_id,
66///     Duration::from_secs(600),
67///     Duration::from_secs(10),
68///     Some(Box::new(|event| {
69///         match event {
70///             ProgressEvent::Polling { status, elapsed, .. } => {
71///                 println!("Status: {} ({:.0}s)", status, elapsed.as_secs());
72///             }
73///             ProgressEvent::Completed { resource_id, .. } => {
74///                 println!("Done! Resource ID: {:?}", resource_id);
75///             }
76///             _ => {}
77///         }
78///     })),
79/// ).await?;
80/// ```
81pub async fn poll_task(
82    client: &CloudClient,
83    task_id: &str,
84    timeout: Duration,
85    interval: Duration,
86    on_progress: Option<ProgressCallback>,
87) -> Result<TaskStateUpdate> {
88    let start = Instant::now();
89    let handler = TaskHandler::new(client.clone());
90
91    emit(
92        &on_progress,
93        ProgressEvent::Started {
94            task_id: task_id.to_string(),
95        },
96    );
97
98    loop {
99        let elapsed = start.elapsed();
100        if elapsed > timeout {
101            return Err(CoreError::TaskTimeout(timeout));
102        }
103
104        let task = handler.get_task_by_id(task_id.to_string()).await?;
105        let status = task.status.clone().unwrap_or_default();
106
107        emit(
108            &on_progress,
109            ProgressEvent::Polling {
110                task_id: task_id.to_string(),
111                status: status.clone(),
112                elapsed,
113            },
114        );
115
116        // Check for terminal states (case-insensitive)
117        match status.to_lowercase().as_str() {
118            // Success states
119            "processing-completed" | "completed" | "complete" | "succeeded" | "success" => {
120                let resource_id = task.response.as_ref().and_then(|r| r.resource_id);
121                emit(
122                    &on_progress,
123                    ProgressEvent::Completed {
124                        task_id: task_id.to_string(),
125                        resource_id,
126                    },
127                );
128                return Ok(task);
129            }
130            // Failure states
131            "processing-error" | "failed" | "error" => {
132                let error = task
133                    .response
134                    .as_ref()
135                    .and_then(|r| r.error.clone())
136                    .unwrap_or_else(|| format!("Task failed with status: {}", status));
137
138                emit(
139                    &on_progress,
140                    ProgressEvent::Failed {
141                        task_id: task_id.to_string(),
142                        error: error.clone(),
143                    },
144                );
145                return Err(CoreError::TaskFailed(error));
146            }
147            // Cancelled state
148            "cancelled" => {
149                emit(
150                    &on_progress,
151                    ProgressEvent::Failed {
152                        task_id: task_id.to_string(),
153                        error: "Task was cancelled".to_string(),
154                    },
155                );
156                return Err(CoreError::TaskFailed("Task was cancelled".to_string()));
157            }
158            _ => {
159                // Still processing, wait and try again
160                tokio::time::sleep(interval).await;
161            }
162        }
163    }
164}
165
166/// Helper to emit progress events
167fn emit(callback: &Option<ProgressCallback>, event: ProgressEvent) {
168    if let Some(cb) = callback {
169        cb(event);
170    }
171}