redisctl_core/cloud/workflows.rs
1//! Cloud workflows - multi-step operations
2//!
3//! These workflows compose Layer 1 operations with progress tracking
4//! and additional logic.
5
6use crate::error::{CoreError, Result};
7use crate::progress::{ProgressCallback, poll_task};
8use redis_cloud::databases::{
9 Database, DatabaseBackupRequest, DatabaseCreateRequest, DatabaseImportRequest,
10 DatabaseUpdateRequest,
11};
12use redis_cloud::subscriptions::{
13 BaseSubscriptionUpdateRequest, Subscription, SubscriptionCreateRequest,
14};
15use redis_cloud::{CloudClient, DatabaseHandler, SubscriptionHandler};
16use std::time::Duration;
17
18/// Create a database and wait for completion
19///
20/// This is a convenience workflow that:
21/// 1. Creates a database (returns task)
22/// 2. Polls the task until completion
23/// 3. Fetches and returns the created database
24///
25/// # Arguments
26///
27/// * `client` - The Cloud API client
28/// * `subscription_id` - The subscription to create the database in
29/// * `request` - The database creation request
30/// * `timeout` - Maximum time to wait for completion
31/// * `on_progress` - Optional callback for progress updates
32///
33/// # Example
34///
35/// ```rust,ignore
36/// use redis_cloud::databases::DatabaseCreateRequest;
37/// use redisctl_core::cloud::create_database_and_wait;
38/// use std::time::Duration;
39///
40/// let request = DatabaseCreateRequest::builder()
41/// .name("my-database")
42/// .memory_limit_in_gb(1.0)
43/// .build();
44///
45/// let database = create_database_and_wait(
46/// &client,
47/// subscription_id,
48/// &request,
49/// Duration::from_secs(600),
50/// None, // No progress callback
51/// ).await?;
52///
53/// println!("Created database: {}", database.name.unwrap_or_default());
54/// ```
55pub async fn create_database_and_wait(
56 client: &CloudClient,
57 subscription_id: i32,
58 request: &DatabaseCreateRequest,
59 timeout: Duration,
60 on_progress: Option<ProgressCallback>,
61) -> Result<Database> {
62 let handler = DatabaseHandler::new(client.clone());
63
64 // Step 1: Create (returns task)
65 let task = handler.create(subscription_id, request).await?;
66 let task_id = task
67 .task_id
68 .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
69
70 // Step 2: Poll until complete
71 let completed = poll_task(
72 client,
73 &task_id,
74 timeout,
75 Duration::from_secs(10),
76 on_progress,
77 )
78 .await?;
79
80 // Step 3: Fetch the created resource
81 let resource_id = completed
82 .response
83 .and_then(|r| r.resource_id)
84 .ok_or_else(|| CoreError::TaskFailed("No resource ID in completed task".to_string()))?;
85
86 let db = handler.get(subscription_id, resource_id as i32).await?;
87 Ok(db)
88}
89
90/// Delete a database and wait for completion
91///
92/// # Arguments
93///
94/// * `client` - The Cloud API client
95/// * `subscription_id` - The subscription containing the database
96/// * `database_id` - The database to delete
97/// * `timeout` - Maximum time to wait for completion
98/// * `on_progress` - Optional callback for progress updates
99pub async fn delete_database_and_wait(
100 client: &CloudClient,
101 subscription_id: i32,
102 database_id: i32,
103 timeout: Duration,
104 on_progress: Option<ProgressCallback>,
105) -> Result<()> {
106 let handler = DatabaseHandler::new(client.clone());
107
108 // Step 1: Delete (returns task)
109 let task = handler.delete(subscription_id, database_id).await?;
110 let task_id = task
111 .task_id
112 .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
113
114 // Step 2: Poll until complete
115 poll_task(
116 client,
117 &task_id,
118 timeout,
119 Duration::from_secs(10),
120 on_progress,
121 )
122 .await?;
123
124 Ok(())
125}
126
127/// Update a database and wait for completion
128///
129/// # Arguments
130///
131/// * `client` - The Cloud API client
132/// * `subscription_id` - The subscription containing the database
133/// * `database_id` - The database to update
134/// * `request` - The database update request
135/// * `timeout` - Maximum time to wait for completion
136/// * `on_progress` - Optional callback for progress updates
137///
138/// # Example
139///
140/// ```rust,ignore
141/// use redis_cloud::databases::DatabaseUpdateRequest;
142/// use redisctl_core::cloud::update_database_and_wait;
143/// use std::time::Duration;
144///
145/// let request = DatabaseUpdateRequest::builder()
146/// .name("new-name")
147/// .memory_limit_in_gb(2.0)
148/// .build();
149///
150/// let database = update_database_and_wait(
151/// &client,
152/// subscription_id,
153/// database_id,
154/// &request,
155/// Duration::from_secs(600),
156/// None,
157/// ).await?;
158/// ```
159pub async fn update_database_and_wait(
160 client: &CloudClient,
161 subscription_id: i32,
162 database_id: i32,
163 request: &DatabaseUpdateRequest,
164 timeout: Duration,
165 on_progress: Option<ProgressCallback>,
166) -> Result<Database> {
167 let handler = DatabaseHandler::new(client.clone());
168
169 // Step 1: Update (returns task)
170 let task = handler
171 .update(subscription_id, database_id, request)
172 .await?;
173 let task_id = task
174 .task_id
175 .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
176
177 // Step 2: Poll until complete
178 poll_task(
179 client,
180 &task_id,
181 timeout,
182 Duration::from_secs(10),
183 on_progress,
184 )
185 .await?;
186
187 // Step 3: Fetch the updated database
188 let db = handler.get(subscription_id, database_id).await?;
189 Ok(db)
190}
191
192/// Backup a database and wait for completion
193///
194/// # Arguments
195///
196/// * `client` - The Cloud API client
197/// * `subscription_id` - The subscription containing the database
198/// * `database_id` - The database to backup
199/// * `region_name` - Optional region name (required for Active-Active databases)
200/// * `timeout` - Maximum time to wait for completion
201/// * `on_progress` - Optional callback for progress updates
202pub async fn backup_database_and_wait(
203 client: &CloudClient,
204 subscription_id: i32,
205 database_id: i32,
206 region_name: Option<&str>,
207 timeout: Duration,
208 on_progress: Option<ProgressCallback>,
209) -> Result<()> {
210 let handler = DatabaseHandler::new(client.clone());
211
212 // Build backup request
213 let request = if let Some(region) = region_name {
214 DatabaseBackupRequest::builder().region_name(region).build()
215 } else {
216 DatabaseBackupRequest::builder().build()
217 };
218
219 // Step 1: Trigger backup (returns task)
220 let task = handler
221 .backup_database(subscription_id, database_id, &request)
222 .await?;
223 let task_id = task
224 .task_id
225 .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
226
227 // Step 2: Poll until complete
228 poll_task(
229 client,
230 &task_id,
231 timeout,
232 Duration::from_secs(10),
233 on_progress,
234 )
235 .await?;
236
237 Ok(())
238}
239
240/// Import data into a database and wait for completion
241///
242/// WARNING: This will overwrite existing data in the database!
243///
244/// # Arguments
245///
246/// * `client` - The Cloud API client
247/// * `subscription_id` - The subscription containing the database
248/// * `database_id` - The database to import into
249/// * `request` - The import request specifying source type and URIs
250/// * `timeout` - Maximum time to wait for completion
251/// * `on_progress` - Optional callback for progress updates
252///
253/// # Example
254///
255/// ```rust,ignore
256/// use redis_cloud::databases::DatabaseImportRequest;
257/// use redisctl_core::cloud::import_database_and_wait;
258/// use std::time::Duration;
259///
260/// let request = DatabaseImportRequest::builder()
261/// .source_type("aws-s3")
262/// .import_from_uri(vec!["s3://bucket/file.rdb".to_string()])
263/// .build();
264///
265/// import_database_and_wait(
266/// &client,
267/// subscription_id,
268/// database_id,
269/// &request,
270/// Duration::from_secs(1800), // Imports can take longer
271/// None,
272/// ).await?;
273/// ```
274pub async fn import_database_and_wait(
275 client: &CloudClient,
276 subscription_id: i32,
277 database_id: i32,
278 request: &DatabaseImportRequest,
279 timeout: Duration,
280 on_progress: Option<ProgressCallback>,
281) -> Result<()> {
282 let handler = DatabaseHandler::new(client.clone());
283
284 // Step 1: Start import (returns task)
285 let task = handler
286 .import_database(subscription_id, database_id, request)
287 .await?;
288 let task_id = task
289 .task_id
290 .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
291
292 // Step 2: Poll until complete
293 poll_task(
294 client,
295 &task_id,
296 timeout,
297 Duration::from_secs(10),
298 on_progress,
299 )
300 .await?;
301
302 Ok(())
303}
304
305/// Flush all data from a database and wait for completion
306///
307/// WARNING: This permanently deletes ALL data in the database!
308///
309/// # Arguments
310///
311/// * `client` - The Cloud API client
312/// * `subscription_id` - The subscription containing the database
313/// * `database_id` - The database to flush
314/// * `timeout` - Maximum time to wait for completion
315/// * `on_progress` - Optional callback for progress updates
316pub async fn flush_database_and_wait(
317 client: &CloudClient,
318 subscription_id: i32,
319 database_id: i32,
320 timeout: Duration,
321 on_progress: Option<ProgressCallback>,
322) -> Result<()> {
323 let handler = DatabaseHandler::new(client.clone());
324
325 // Step 1: Flush (returns task)
326 let task = handler.flush_database(subscription_id, database_id).await?;
327 let task_id = task
328 .task_id
329 .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
330
331 // Step 2: Poll until complete
332 poll_task(
333 client,
334 &task_id,
335 timeout,
336 Duration::from_secs(5),
337 on_progress,
338 )
339 .await?;
340
341 Ok(())
342}
343
344// =============================================================================
345// Subscription Workflows
346// =============================================================================
347
348/// Create a subscription and wait for completion
349///
350/// This workflow:
351/// 1. Creates a subscription (returns task)
352/// 2. Polls the task until completion
353/// 3. Fetches and returns the created subscription
354///
355/// Note: Subscription creation requires complex nested structures (cloudProviders,
356/// databases arrays). Use `SubscriptionCreateRequest::builder()` from redis-cloud
357/// to construct the request.
358///
359/// # Arguments
360///
361/// * `client` - The Cloud API client
362/// * `request` - The subscription creation request
363/// * `timeout` - Maximum time to wait for completion
364/// * `on_progress` - Optional callback for progress updates
365///
366/// # Example
367///
368/// ```rust,ignore
369/// use redis_cloud::subscriptions::{
370/// SubscriptionCreateRequest, SubscriptionSpec, SubscriptionRegionSpec,
371/// SubscriptionDatabaseSpec,
372/// };
373/// use redisctl_core::cloud::create_subscription_and_wait;
374/// use std::time::Duration;
375///
376/// let request = SubscriptionCreateRequest::builder()
377/// .name("my-subscription")
378/// .cloud_providers(vec![SubscriptionSpec {
379/// provider: Some("AWS".to_string()),
380/// cloud_account_id: Some(1),
381/// regions: vec![SubscriptionRegionSpec {
382/// region: "us-east-1".to_string(),
383/// ..Default::default()
384/// }],
385/// }])
386/// .databases(vec![SubscriptionDatabaseSpec { /* ... */ }])
387/// .build();
388///
389/// let subscription = create_subscription_and_wait(
390/// &client,
391/// &request,
392/// Duration::from_secs(1800), // Subscriptions can take longer
393/// None,
394/// ).await?;
395/// ```
396pub async fn create_subscription_and_wait(
397 client: &CloudClient,
398 request: &SubscriptionCreateRequest,
399 timeout: Duration,
400 on_progress: Option<ProgressCallback>,
401) -> Result<Subscription> {
402 let handler = SubscriptionHandler::new(client.clone());
403
404 // Step 1: Create (returns task)
405 let task = handler.create_subscription(request).await?;
406 let task_id = task
407 .task_id
408 .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
409
410 // Step 2: Poll until complete
411 let completed = poll_task(
412 client,
413 &task_id,
414 timeout,
415 Duration::from_secs(15), // Subscriptions take longer, poll less frequently
416 on_progress,
417 )
418 .await?;
419
420 // Step 3: Fetch the created resource
421 let resource_id = completed
422 .response
423 .and_then(|r| r.resource_id)
424 .ok_or_else(|| CoreError::TaskFailed("No resource ID in completed task".to_string()))?;
425
426 let subscription = handler.get_subscription_by_id(resource_id).await?;
427 Ok(subscription)
428}
429
430/// Update a subscription and wait for completion
431///
432/// # Arguments
433///
434/// * `client` - The Cloud API client
435/// * `subscription_id` - The subscription to update
436/// * `request` - The update request
437/// * `timeout` - Maximum time to wait for completion
438/// * `on_progress` - Optional callback for progress updates
439///
440/// # Example
441///
442/// ```rust,ignore
443/// use redis_cloud::subscriptions::BaseSubscriptionUpdateRequest;
444/// use redisctl_core::cloud::update_subscription_and_wait;
445/// use std::time::Duration;
446///
447/// let request = BaseSubscriptionUpdateRequest {
448/// subscription_id: None,
449/// command_type: None,
450/// };
451///
452/// let subscription = update_subscription_and_wait(
453/// &client,
454/// 123,
455/// &request,
456/// Duration::from_secs(600),
457/// None,
458/// ).await?;
459/// ```
460pub async fn update_subscription_and_wait(
461 client: &CloudClient,
462 subscription_id: i32,
463 request: &BaseSubscriptionUpdateRequest,
464 timeout: Duration,
465 on_progress: Option<ProgressCallback>,
466) -> Result<Subscription> {
467 let handler = SubscriptionHandler::new(client.clone());
468
469 // Step 1: Update (returns task)
470 let task = handler
471 .update_subscription(subscription_id, request)
472 .await?;
473 let task_id = task
474 .task_id
475 .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
476
477 // Step 2: Poll until complete
478 poll_task(
479 client,
480 &task_id,
481 timeout,
482 Duration::from_secs(10),
483 on_progress,
484 )
485 .await?;
486
487 // Step 3: Fetch the updated subscription
488 let subscription = handler.get_subscription_by_id(subscription_id).await?;
489 Ok(subscription)
490}
491
492/// Delete a subscription and wait for completion
493///
494/// WARNING: This will delete the subscription and all its databases!
495/// Ensure all databases are deleted first or this operation may fail.
496///
497/// # Arguments
498///
499/// * `client` - The Cloud API client
500/// * `subscription_id` - The subscription to delete
501/// * `timeout` - Maximum time to wait for completion
502/// * `on_progress` - Optional callback for progress updates
503pub async fn delete_subscription_and_wait(
504 client: &CloudClient,
505 subscription_id: i32,
506 timeout: Duration,
507 on_progress: Option<ProgressCallback>,
508) -> Result<()> {
509 let handler = SubscriptionHandler::new(client.clone());
510
511 // Step 1: Delete (returns task)
512 let task = handler.delete_subscription_by_id(subscription_id).await?;
513 let task_id = task
514 .task_id
515 .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
516
517 // Step 2: Poll until complete
518 poll_task(
519 client,
520 &task_id,
521 timeout,
522 Duration::from_secs(10),
523 on_progress,
524 )
525 .await?;
526
527 Ok(())
528}