redisctl_core/progress.rs
1//! Progress tracking and task polling for async Cloud operations
2//!
3//! Cloud API operations return a `TaskStateUpdate` which must be polled
4//! until completion. This module provides utilities for that polling
5//! with optional progress callbacks for UI updates.
6
7use crate::error::{CoreError, Result};
8use redis_cloud::tasks::TaskStateUpdate;
9use redis_cloud::{CloudClient, TaskHandler};
10use std::time::{Duration, Instant};
11
12/// Progress events emitted during async operations
13#[derive(Debug, Clone)]
14pub enum ProgressEvent {
15 /// Task has been created/started
16 Started { task_id: String },
17 /// Polling iteration with current status
18 Polling {
19 task_id: String,
20 status: String,
21 elapsed: Duration,
22 },
23 /// Task completed successfully
24 Completed {
25 task_id: String,
26 resource_id: Option<i32>,
27 },
28 /// Task failed
29 Failed { task_id: String, error: String },
30}
31
32/// Callback type for progress updates
33///
34/// CLI can use this to update spinners/progress bars.
35/// MCP typically doesn't need this.
36pub type ProgressCallback = Box<dyn Fn(ProgressEvent) + Send + Sync>;
37
38/// Poll a Cloud task until completion
39///
40/// # Arguments
41///
42/// * `client` - The Cloud API client
43/// * `task_id` - The task ID to poll
44/// * `timeout` - Maximum time to wait for completion
45/// * `interval` - Time between polling attempts
46/// * `on_progress` - Optional callback for progress updates
47///
48/// # Returns
49///
50/// The completed task response, or an error if the task failed or timed out.
51///
52/// # Example
53///
54/// ```rust,ignore
55/// use redisctl_core::{poll_task, ProgressEvent};
56/// use std::time::Duration;
57///
58/// // Create a database (returns TaskStateUpdate)
59/// let task = handler.create(subscription_id, &request).await?;
60/// let task_id = task.task_id.unwrap();
61///
62/// // Poll with progress callback
63/// let completed = poll_task(
64/// &client,
65/// &task_id,
66/// Duration::from_secs(600),
67/// Duration::from_secs(10),
68/// Some(Box::new(|event| {
69/// match event {
70/// ProgressEvent::Polling { status, elapsed, .. } => {
71/// println!("Status: {} ({:.0}s)", status, elapsed.as_secs());
72/// }
73/// ProgressEvent::Completed { resource_id, .. } => {
74/// println!("Done! Resource ID: {:?}", resource_id);
75/// }
76/// _ => {}
77/// }
78/// })),
79/// ).await?;
80/// ```
81pub async fn poll_task(
82 client: &CloudClient,
83 task_id: &str,
84 timeout: Duration,
85 interval: Duration,
86 on_progress: Option<ProgressCallback>,
87) -> Result<TaskStateUpdate> {
88 let start = Instant::now();
89 let handler = TaskHandler::new(client.clone());
90
91 emit(
92 &on_progress,
93 ProgressEvent::Started {
94 task_id: task_id.to_string(),
95 },
96 );
97
98 loop {
99 let elapsed = start.elapsed();
100 if elapsed > timeout {
101 return Err(CoreError::TaskTimeout(timeout));
102 }
103
104 let task = handler.get_task_by_id(task_id.to_string()).await?;
105 let status = task.status.clone().unwrap_or_default();
106
107 emit(
108 &on_progress,
109 ProgressEvent::Polling {
110 task_id: task_id.to_string(),
111 status: status.clone(),
112 elapsed,
113 },
114 );
115
116 // Check for terminal states (case-insensitive)
117 match status.to_lowercase().as_str() {
118 // Success states
119 "processing-completed" | "completed" | "complete" | "succeeded" | "success" => {
120 let resource_id = task.response.as_ref().and_then(|r| r.resource_id);
121 emit(
122 &on_progress,
123 ProgressEvent::Completed {
124 task_id: task_id.to_string(),
125 resource_id,
126 },
127 );
128 return Ok(task);
129 }
130 // Failure states
131 "processing-error" | "failed" | "error" => {
132 let error = task
133 .response
134 .as_ref()
135 .and_then(|r| r.error.clone())
136 .unwrap_or_else(|| format!("Task failed with status: {}", status));
137
138 emit(
139 &on_progress,
140 ProgressEvent::Failed {
141 task_id: task_id.to_string(),
142 error: error.clone(),
143 },
144 );
145 return Err(CoreError::TaskFailed(error));
146 }
147 // Cancelled state
148 "cancelled" => {
149 emit(
150 &on_progress,
151 ProgressEvent::Failed {
152 task_id: task_id.to_string(),
153 error: "Task was cancelled".to_string(),
154 },
155 );
156 return Err(CoreError::TaskFailed("Task was cancelled".to_string()));
157 }
158 _ => {
159 // Still processing, wait and try again
160 tokio::time::sleep(interval).await;
161 }
162 }
163 }
164}
165
166/// Helper to emit progress events
167fn emit(callback: &Option<ProgressCallback>, event: ProgressEvent) {
168 if let Some(cb) = callback {
169 cb(event);
170 }
171}