Skip to main content

redisctl_core/cloud/
workflows.rs

1//! Cloud workflows - multi-step operations
2//!
3//! These workflows compose Layer 1 operations with progress tracking
4//! and additional logic.
5
6use crate::error::{CoreError, Result};
7use crate::progress::{ProgressCallback, poll_task};
8use redis_cloud::databases::{
9    Database, DatabaseBackupRequest, DatabaseCreateRequest, DatabaseImportRequest,
10    DatabaseUpdateRequest,
11};
12use redis_cloud::subscriptions::{
13    BaseSubscriptionUpdateRequest, Subscription, SubscriptionCreateRequest,
14};
15use redis_cloud::{CloudClient, DatabaseHandler, SubscriptionHandler};
16use std::time::Duration;
17
18/// Create a database and wait for completion
19///
20/// This is a convenience workflow that:
21/// 1. Creates a database (returns task)
22/// 2. Polls the task until completion
23/// 3. Fetches and returns the created database
24///
25/// # Arguments
26///
27/// * `client` - The Cloud API client
28/// * `subscription_id` - The subscription to create the database in
29/// * `request` - The database creation request
30/// * `timeout` - Maximum time to wait for completion
31/// * `on_progress` - Optional callback for progress updates
32///
33/// # Example
34///
35/// ```rust,ignore
36/// use redis_cloud::databases::DatabaseCreateRequest;
37/// use redisctl_core::cloud::create_database_and_wait;
38/// use std::time::Duration;
39///
40/// let request = DatabaseCreateRequest::builder()
41///     .name("my-database")
42///     .memory_limit_in_gb(1.0)
43///     .build();
44///
45/// let database = create_database_and_wait(
46///     &client,
47///     subscription_id,
48///     &request,
49///     Duration::from_secs(600),
50///     None,  // No progress callback
51/// ).await?;
52///
53/// println!("Created database: {}", database.name.unwrap_or_default());
54/// ```
55pub async fn create_database_and_wait(
56    client: &CloudClient,
57    subscription_id: i32,
58    request: &DatabaseCreateRequest,
59    timeout: Duration,
60    on_progress: Option<ProgressCallback>,
61) -> Result<Database> {
62    let handler = DatabaseHandler::new(client.clone());
63
64    // Step 1: Create (returns task)
65    let task = handler.create(subscription_id, request).await?;
66    let task_id = task
67        .task_id
68        .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
69
70    // Step 2: Poll until complete
71    let completed = poll_task(
72        client,
73        &task_id,
74        timeout,
75        Duration::from_secs(10),
76        on_progress,
77    )
78    .await?;
79
80    // Step 3: Fetch the created resource
81    let resource_id = completed
82        .response
83        .and_then(|r| r.resource_id)
84        .ok_or_else(|| CoreError::TaskFailed("No resource ID in completed task".to_string()))?;
85
86    let db = handler.get(subscription_id, resource_id as i32).await?;
87    Ok(db)
88}
89
90/// Delete a database and wait for completion
91///
92/// # Arguments
93///
94/// * `client` - The Cloud API client
95/// * `subscription_id` - The subscription containing the database
96/// * `database_id` - The database to delete
97/// * `timeout` - Maximum time to wait for completion
98/// * `on_progress` - Optional callback for progress updates
99pub async fn delete_database_and_wait(
100    client: &CloudClient,
101    subscription_id: i32,
102    database_id: i32,
103    timeout: Duration,
104    on_progress: Option<ProgressCallback>,
105) -> Result<()> {
106    let handler = DatabaseHandler::new(client.clone());
107
108    // Step 1: Delete (returns task)
109    let task = handler.delete(subscription_id, database_id).await?;
110    let task_id = task
111        .task_id
112        .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
113
114    // Step 2: Poll until complete
115    poll_task(
116        client,
117        &task_id,
118        timeout,
119        Duration::from_secs(10),
120        on_progress,
121    )
122    .await?;
123
124    Ok(())
125}
126
127/// Update a database and wait for completion
128///
129/// # Arguments
130///
131/// * `client` - The Cloud API client
132/// * `subscription_id` - The subscription containing the database
133/// * `database_id` - The database to update
134/// * `request` - The database update request
135/// * `timeout` - Maximum time to wait for completion
136/// * `on_progress` - Optional callback for progress updates
137///
138/// # Example
139///
140/// ```rust,ignore
141/// use redis_cloud::databases::DatabaseUpdateRequest;
142/// use redisctl_core::cloud::update_database_and_wait;
143/// use std::time::Duration;
144///
145/// let request = DatabaseUpdateRequest::builder()
146///     .name("new-name")
147///     .memory_limit_in_gb(2.0)
148///     .build();
149///
150/// let database = update_database_and_wait(
151///     &client,
152///     subscription_id,
153///     database_id,
154///     &request,
155///     Duration::from_secs(600),
156///     None,
157/// ).await?;
158/// ```
159pub async fn update_database_and_wait(
160    client: &CloudClient,
161    subscription_id: i32,
162    database_id: i32,
163    request: &DatabaseUpdateRequest,
164    timeout: Duration,
165    on_progress: Option<ProgressCallback>,
166) -> Result<Database> {
167    let handler = DatabaseHandler::new(client.clone());
168
169    // Step 1: Update (returns task)
170    let task = handler
171        .update(subscription_id, database_id, request)
172        .await?;
173    let task_id = task
174        .task_id
175        .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
176
177    // Step 2: Poll until complete
178    poll_task(
179        client,
180        &task_id,
181        timeout,
182        Duration::from_secs(10),
183        on_progress,
184    )
185    .await?;
186
187    // Step 3: Fetch the updated database
188    let db = handler.get(subscription_id, database_id).await?;
189    Ok(db)
190}
191
192/// Backup a database and wait for completion
193///
194/// # Arguments
195///
196/// * `client` - The Cloud API client
197/// * `subscription_id` - The subscription containing the database
198/// * `database_id` - The database to backup
199/// * `region_name` - Optional region name (required for Active-Active databases)
200/// * `timeout` - Maximum time to wait for completion
201/// * `on_progress` - Optional callback for progress updates
202pub async fn backup_database_and_wait(
203    client: &CloudClient,
204    subscription_id: i32,
205    database_id: i32,
206    region_name: Option<&str>,
207    timeout: Duration,
208    on_progress: Option<ProgressCallback>,
209) -> Result<()> {
210    let handler = DatabaseHandler::new(client.clone());
211
212    // Build backup request
213    let request = if let Some(region) = region_name {
214        DatabaseBackupRequest::builder().region_name(region).build()
215    } else {
216        DatabaseBackupRequest::builder().build()
217    };
218
219    // Step 1: Trigger backup (returns task)
220    let task = handler
221        .backup_database(subscription_id, database_id, &request)
222        .await?;
223    let task_id = task
224        .task_id
225        .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
226
227    // Step 2: Poll until complete
228    poll_task(
229        client,
230        &task_id,
231        timeout,
232        Duration::from_secs(10),
233        on_progress,
234    )
235    .await?;
236
237    Ok(())
238}
239
240/// Import data into a database and wait for completion
241///
242/// WARNING: This will overwrite existing data in the database!
243///
244/// # Arguments
245///
246/// * `client` - The Cloud API client
247/// * `subscription_id` - The subscription containing the database
248/// * `database_id` - The database to import into
249/// * `request` - The import request specifying source type and URIs
250/// * `timeout` - Maximum time to wait for completion
251/// * `on_progress` - Optional callback for progress updates
252///
253/// # Example
254///
255/// ```rust,ignore
256/// use redis_cloud::databases::DatabaseImportRequest;
257/// use redisctl_core::cloud::import_database_and_wait;
258/// use std::time::Duration;
259///
260/// let request = DatabaseImportRequest::builder()
261///     .source_type("aws-s3")
262///     .import_from_uri(vec!["s3://bucket/file.rdb".to_string()])
263///     .build();
264///
265/// import_database_and_wait(
266///     &client,
267///     subscription_id,
268///     database_id,
269///     &request,
270///     Duration::from_secs(1800), // Imports can take longer
271///     None,
272/// ).await?;
273/// ```
274pub async fn import_database_and_wait(
275    client: &CloudClient,
276    subscription_id: i32,
277    database_id: i32,
278    request: &DatabaseImportRequest,
279    timeout: Duration,
280    on_progress: Option<ProgressCallback>,
281) -> Result<()> {
282    let handler = DatabaseHandler::new(client.clone());
283
284    // Step 1: Start import (returns task)
285    let task = handler
286        .import_database(subscription_id, database_id, request)
287        .await?;
288    let task_id = task
289        .task_id
290        .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
291
292    // Step 2: Poll until complete
293    poll_task(
294        client,
295        &task_id,
296        timeout,
297        Duration::from_secs(10),
298        on_progress,
299    )
300    .await?;
301
302    Ok(())
303}
304
305/// Flush all data from a database and wait for completion
306///
307/// WARNING: This permanently deletes ALL data in the database!
308///
309/// # Arguments
310///
311/// * `client` - The Cloud API client
312/// * `subscription_id` - The subscription containing the database
313/// * `database_id` - The database to flush
314/// * `timeout` - Maximum time to wait for completion
315/// * `on_progress` - Optional callback for progress updates
316pub async fn flush_database_and_wait(
317    client: &CloudClient,
318    subscription_id: i32,
319    database_id: i32,
320    timeout: Duration,
321    on_progress: Option<ProgressCallback>,
322) -> Result<()> {
323    let handler = DatabaseHandler::new(client.clone());
324
325    // Step 1: Flush (returns task)
326    let task = handler.flush_database(subscription_id, database_id).await?;
327    let task_id = task
328        .task_id
329        .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
330
331    // Step 2: Poll until complete
332    poll_task(
333        client,
334        &task_id,
335        timeout,
336        Duration::from_secs(5),
337        on_progress,
338    )
339    .await?;
340
341    Ok(())
342}
343
344// =============================================================================
345// Subscription Workflows
346// =============================================================================
347
348/// Create a subscription and wait for completion
349///
350/// This workflow:
351/// 1. Creates a subscription (returns task)
352/// 2. Polls the task until completion
353/// 3. Fetches and returns the created subscription
354///
355/// Note: Subscription creation requires complex nested structures (cloudProviders,
356/// databases arrays). Use `SubscriptionCreateRequest::builder()` from redis-cloud
357/// to construct the request.
358///
359/// # Arguments
360///
361/// * `client` - The Cloud API client
362/// * `request` - The subscription creation request
363/// * `timeout` - Maximum time to wait for completion
364/// * `on_progress` - Optional callback for progress updates
365///
366/// # Example
367///
368/// ```rust,ignore
369/// use redis_cloud::subscriptions::{
370///     SubscriptionCreateRequest, SubscriptionSpec, SubscriptionRegionSpec,
371///     SubscriptionDatabaseSpec,
372/// };
373/// use redisctl_core::cloud::create_subscription_and_wait;
374/// use std::time::Duration;
375///
376/// let request = SubscriptionCreateRequest::builder()
377///     .name("my-subscription")
378///     .cloud_providers(vec![SubscriptionSpec {
379///         provider: Some("AWS".to_string()),
380///         cloud_account_id: Some(1),
381///         regions: vec![SubscriptionRegionSpec {
382///             region: "us-east-1".to_string(),
383///             ..Default::default()
384///         }],
385///     }])
386///     .databases(vec![SubscriptionDatabaseSpec { /* ... */ }])
387///     .build();
388///
389/// let subscription = create_subscription_and_wait(
390///     &client,
391///     &request,
392///     Duration::from_secs(1800), // Subscriptions can take longer
393///     None,
394/// ).await?;
395/// ```
396pub async fn create_subscription_and_wait(
397    client: &CloudClient,
398    request: &SubscriptionCreateRequest,
399    timeout: Duration,
400    on_progress: Option<ProgressCallback>,
401) -> Result<Subscription> {
402    let handler = SubscriptionHandler::new(client.clone());
403
404    // Step 1: Create (returns task)
405    let task = handler.create_subscription(request).await?;
406    let task_id = task
407        .task_id
408        .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
409
410    // Step 2: Poll until complete
411    let completed = poll_task(
412        client,
413        &task_id,
414        timeout,
415        Duration::from_secs(15), // Subscriptions take longer, poll less frequently
416        on_progress,
417    )
418    .await?;
419
420    // Step 3: Fetch the created resource
421    let resource_id = completed
422        .response
423        .and_then(|r| r.resource_id)
424        .ok_or_else(|| CoreError::TaskFailed("No resource ID in completed task".to_string()))?;
425
426    let subscription = handler.get_subscription_by_id(resource_id).await?;
427    Ok(subscription)
428}
429
430/// Update a subscription and wait for completion
431///
432/// # Arguments
433///
434/// * `client` - The Cloud API client
435/// * `subscription_id` - The subscription to update
436/// * `request` - The update request
437/// * `timeout` - Maximum time to wait for completion
438/// * `on_progress` - Optional callback for progress updates
439///
440/// # Example
441///
442/// ```rust,ignore
443/// use redis_cloud::subscriptions::BaseSubscriptionUpdateRequest;
444/// use redisctl_core::cloud::update_subscription_and_wait;
445/// use std::time::Duration;
446///
447/// let request = BaseSubscriptionUpdateRequest {
448///     subscription_id: None,
449///     command_type: None,
450/// };
451///
452/// let subscription = update_subscription_and_wait(
453///     &client,
454///     123,
455///     &request,
456///     Duration::from_secs(600),
457///     None,
458/// ).await?;
459/// ```
460pub async fn update_subscription_and_wait(
461    client: &CloudClient,
462    subscription_id: i32,
463    request: &BaseSubscriptionUpdateRequest,
464    timeout: Duration,
465    on_progress: Option<ProgressCallback>,
466) -> Result<Subscription> {
467    let handler = SubscriptionHandler::new(client.clone());
468
469    // Step 1: Update (returns task)
470    let task = handler
471        .update_subscription(subscription_id, request)
472        .await?;
473    let task_id = task
474        .task_id
475        .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
476
477    // Step 2: Poll until complete
478    poll_task(
479        client,
480        &task_id,
481        timeout,
482        Duration::from_secs(10),
483        on_progress,
484    )
485    .await?;
486
487    // Step 3: Fetch the updated subscription
488    let subscription = handler.get_subscription_by_id(subscription_id).await?;
489    Ok(subscription)
490}
491
492/// Delete a subscription and wait for completion
493///
494/// WARNING: This will delete the subscription and all its databases!
495/// Ensure all databases are deleted first or this operation may fail.
496///
497/// # Arguments
498///
499/// * `client` - The Cloud API client
500/// * `subscription_id` - The subscription to delete
501/// * `timeout` - Maximum time to wait for completion
502/// * `on_progress` - Optional callback for progress updates
503pub async fn delete_subscription_and_wait(
504    client: &CloudClient,
505    subscription_id: i32,
506    timeout: Duration,
507    on_progress: Option<ProgressCallback>,
508) -> Result<()> {
509    let handler = SubscriptionHandler::new(client.clone());
510
511    // Step 1: Delete (returns task)
512    let task = handler.delete_subscription_by_id(subscription_id).await?;
513    let task_id = task
514        .task_id
515        .ok_or_else(|| CoreError::TaskFailed("No task ID returned".to_string()))?;
516
517    // Step 2: Poll until complete
518    poll_task(
519        client,
520        &task_id,
521        timeout,
522        Duration::from_secs(10),
523        on_progress,
524    )
525    .await?;
526
527    Ok(())
528}