Skip to main content

redis_enterprise/
bdb.rs

1//! Database (BDB) management for Redis Enterprise
2//!
3//! ## Overview
4//! - Create, list, update, and delete databases
5//! - Execute database actions (backup, restore, import, export)
6//! - Monitor database status and metrics
7//! - Configure database endpoints and sharding
8//!
9//! ## Examples
10//!
11//! ### Creating a Database
12//! ```no_run
13//! use redis_enterprise::{EnterpriseClient, CreateDatabaseRequest};
14//!
15//! # async fn example(client: EnterpriseClient) -> Result<(), Box<dyn std::error::Error>> {
16//! // Simple cache database
17//! let cache_db = CreateDatabaseRequest::builder()
18//!     .name("my-cache")
19//!     .memory_size(1_073_741_824)  // 1GB
20//!     .eviction_policy("allkeys-lru")
21//!     .persistence("disabled")
22//!     .build();
23//!
24//! let db = client.databases().create(cache_db).await?;
25//! println!("Created database with ID: {}", db.uid);
26//! # Ok(())
27//! # }
28//! ```
29//!
30//! ### Database Actions
31//! ```no_run
32//! # use redis_enterprise::EnterpriseClient;
33//! # async fn example(client: EnterpriseClient) -> Result<(), Box<dyn std::error::Error>> {
34//! let db_id = 1;
35//!
36//! // Backup database
37//! let backup = client.databases().backup(db_id).await?;
38//! println!("Backup started: {:?}", backup.action_uid);
39//!
40//! // Export to remote location
41//! let export = client.databases().export(db_id, "ftp://backup.site/db.rdb").await?;
42//! println!("Export initiated: {:?}", export.action_uid);
43//!
44//! // Import from backup
45//! let import = client.databases().import(db_id, "ftp://backup.site/db.rdb", true).await?;
46//! println!("Import started: {:?}", import.action_uid);
47//! # Ok(())
48//! # }
49//! ```
50//!
51//! ### Monitoring Databases
52//! ```no_run
53//! # use redis_enterprise::EnterpriseClient;
54//! # async fn example(client: EnterpriseClient) -> Result<(), Box<dyn std::error::Error>> {
55//! // List all databases
56//! let databases = client.databases().list().await?;
57//! for db in databases {
58//!     println!("{}: {} MB used", db.name, db.memory_used.unwrap_or(0) / 1_048_576);
59//! }
60//!
61//! // Get database endpoints
62//! let endpoints = client.databases().endpoints(1).await?;
63//! for endpoint in endpoints {
64//!     println!("Endpoint: {:?}:{:?}", endpoint.dns_name, endpoint.port);
65//! }
66//! # Ok(())
67//! # }
68//! ```
69
70use crate::client::RestClient;
71use crate::error::Result;
72use futures::stream::Stream;
73use serde::{Deserialize, Serialize};
74use serde_json::Value;
75use std::pin::Pin;
76use std::time::Duration;
77use tokio::time::sleep;
78use typed_builder::TypedBuilder;
79
// Aliases for easier use
/// Alias of [`DatabaseInfo`].
pub type Database = DatabaseInfo;
/// Alias of [`DatabaseHandler`].
pub type BdbHandler = DatabaseHandler;
// NOTE(review): the meaning of the `Option<String>` half of each stream item is not
// visible in this part of the file — confirm against the code that builds the stream.
/// Pinned, boxed, `Send` stream whose items are `Result<(DatabaseInfo, Option<String>)>`,
/// borrowed for the lifetime `'a`.
pub type DatabaseWatchStream<'a> =
    Pin<Box<dyn Stream<Item = Result<(DatabaseInfo, Option<String>)>> + Send + 'a>>;
85
/// Response from database action operations
///
/// Unlike the backup/import/export responses in this module, `action_uid` is a
/// plain `String` here, so a payload without that key does not deserialize into
/// this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseActionResponse {
    /// The action UID for tracking async operations
    pub action_uid: String,
    /// Description of the action (optional)
    pub description: Option<String>,
}
94
/// Response from backup operation
///
/// Both fields are optional, so this type also accepts a payload that carries
/// neither key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupResponse {
    /// The action UID for tracking the backup operation
    /// (left out of serialized output when `None`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub action_uid: Option<String>,
    /// Backup UID if available
    pub backup_uid: Option<String>,
}
104
/// Response from import operation
///
/// Both fields are optional, so this type also accepts a payload that carries
/// neither key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportResponse {
    /// The action UID for tracking the import operation
    /// (left out of serialized output when `None`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub action_uid: Option<String>,
    /// Import status
    pub status: Option<String>,
}
114
/// Response from export operation
///
/// Both fields are optional, so this type also accepts a payload that carries
/// neither key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportResponse {
    /// The action UID for tracking the export operation
    /// (left out of serialized output when `None`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub action_uid: Option<String>,
    /// Export status
    pub status: Option<String>,
}
124
/// Module information for database upgrade
///
/// Used as the element type of [`DatabaseUpgradeRequest::modules`]. Only
/// `module_name` is required; the optional fields are left out of the
/// serialized output when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModuleUpgrade {
    /// Module name
    pub module_name: String,
    /// New module version (optional)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub new_version: Option<String>,
    /// Module arguments (optional)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub module_args: Option<String>,
}
137
/// Request for database upgrade operation
///
/// Every field is optional. The generated builder leaves fields that were not
/// set as `None` (`#[builder(default)]`), its setters take the inner value
/// rather than an `Option` (`strip_option`), and `None` fields are left out of
/// the serialized request. The type also implements `Default`, so struct-update
/// syntax (`..Default::default()`) works as an alternative to the builder.
///
/// # Examples
///
/// ```rust,no_run
/// use redis_enterprise::bdb::DatabaseUpgradeRequest;
///
/// // Upgrade to latest Redis version with role preservation
/// let request = DatabaseUpgradeRequest::builder()
///     .preserve_roles(true)
///     .build();
///
/// // Upgrade to specific version
/// let request = DatabaseUpgradeRequest::builder()
///     .redis_version("7.4.2")
///     .preserve_roles(true)
///     .parallel_shards_upgrade(2)
///     .build();
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, Default, TypedBuilder)]
pub struct DatabaseUpgradeRequest {
    /// Target Redis version (optional, defaults to latest)
    ///
    /// The builder setter accepts anything convertible into a `String`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(into, strip_option))]
    pub redis_version: Option<String>,

    /// Preserve master/replica roles (requires extra failover)
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(strip_option))]
    pub preserve_roles: Option<bool>,

    /// Restart shards even if no version change
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(strip_option))]
    pub force_restart: Option<bool>,

    /// Allow data loss in non-replicated, non-persistent databases
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(strip_option))]
    pub may_discard_data: Option<bool>,

    /// Force data discard even if replicated/persistent
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(strip_option))]
    pub force_discard: Option<bool>,

    /// Keep current CRDT protocol version
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(strip_option))]
    pub keep_crdt_protocol_version: Option<bool>,

    /// Maximum parallel shard upgrades (default: all shards)
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(strip_option))]
    pub parallel_shards_upgrade: Option<u32>,

    /// Modules to upgrade alongside Redis
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(strip_option))]
    pub modules: Option<Vec<ModuleUpgrade>>,
}
199
/// Database information from the REST API - 100% field coverage (152/152 fields)
///
/// Only `uid` and `name` are mandatory; every other field is an `Option`, so a
/// key that is absent from the payload deserializes to `None`. Fields typed as
/// `Value` / `Vec<Value>` hold the raw JSON without further modelling.
///
/// The plain `//` comments below only group related fields; they are not part
/// of the generated documentation.
// NOTE(review): the per-field meanings are not established anywhere in this file —
// consult the REST API object reference before relying on a field's semantics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseInfo {
    // Core database identification and status
    pub uid: u32,
    pub name: String,
    pub port: Option<u16>,
    pub status: Option<String>,
    pub memory_size: Option<u64>,
    pub memory_used: Option<u64>,

    /// Database type (e.g., "redis", "memcached")
    ///
    /// Stored under the JSON key `type`; the trailing underscore only avoids
    /// the Rust keyword.
    #[serde(rename = "type")]
    pub type_: Option<String>,
    pub version: Option<String>,

    // Account and action tracking
    pub account_id: Option<u32>,
    pub action_uid: Option<String>,

    // Sharding and placement
    pub shards_count: Option<u32>,
    pub shard_list: Option<Vec<u32>>,
    pub sharding: Option<bool>,
    pub shards_placement: Option<String>,
    pub replication: Option<bool>,

    // Endpoints and networking
    pub endpoints: Option<Vec<EndpointInfo>>,
    pub endpoint: Option<String>,
    pub endpoint_ip: Option<Vec<String>>,
    pub endpoint_node: Option<u32>,
    pub dns_address_master: Option<String>,

    // Data persistence and backup
    pub persistence: Option<String>,
    pub data_persistence: Option<String>,
    pub eviction_policy: Option<String>,

    // Timestamps
    pub created_time: Option<String>,
    pub last_changed_time: Option<String>,
    pub last_backup_time: Option<String>,
    pub last_export_time: Option<String>,

    // Security and authentication
    pub mtls_allow_weak_hashing: Option<bool>,
    pub mtls_allow_outdated_certs: Option<bool>,
    pub authentication_redis_pass: Option<String>,
    pub authentication_admin_pass: Option<String>,
    pub authentication_sasl_pass: Option<String>,
    pub authentication_sasl_uname: Option<String>,
    pub authentication_ssl_client_certs: Option<Vec<Value>>,
    pub authentication_ssl_crdt_certs: Option<Vec<Value>>,
    pub authorized_subjects: Option<Vec<Value>>,
    pub data_internode_encryption: Option<bool>,
    pub ssl: Option<bool>,
    pub tls_mode: Option<String>,
    pub enforce_client_authentication: Option<String>,
    pub default_user: Option<bool>,
    /// ACL configuration
    pub acl: Option<Value>,
    /// Client certificate subject validation type
    pub client_cert_subject_validation_type: Option<String>,
    /// Compare key hslot
    pub compare_key_hslot: Option<bool>,
    /// DNS suffixes for endpoints
    pub dns_suffixes: Option<Vec<String>>,
    /// Group UID for the database
    pub group_uid: Option<u32>,
    /// Redis cluster mode enabled
    pub redis_cluster_enabled: Option<bool>,

    // CRDT/Active-Active fields
    pub crdt: Option<bool>,
    pub crdt_enabled: Option<bool>,
    pub crdt_config_version: Option<u32>,
    pub crdt_replica_id: Option<u32>,
    pub crdt_ghost_replica_ids: Option<String>,
    pub crdt_featureset_version: Option<u32>,
    pub crdt_protocol_version: Option<u32>,
    pub crdt_guid: Option<String>,
    pub crdt_modules: Option<String>,
    pub crdt_replicas: Option<String>,
    pub crdt_sources: Option<Vec<Value>>,
    pub crdt_sync: Option<String>,
    pub crdt_sync_connection_alarm_timeout_seconds: Option<u32>,
    pub crdt_sync_dist: Option<bool>,
    pub crdt_syncer_auto_oom_unlatch: Option<bool>,
    pub crdt_xadd_id_uniqueness_mode: Option<String>,
    pub crdt_causal_consistency: Option<bool>,
    pub crdt_repl_backlog_size: Option<String>,

    // Replication settings
    pub master_persistence: Option<bool>,
    pub slave_ha: Option<bool>,
    pub slave_ha_priority: Option<u32>,
    pub replica_read_only: Option<bool>,
    pub replica_sources: Option<Vec<Value>>,
    pub replica_sync: Option<String>,
    pub replica_sync_connection_alarm_timeout_seconds: Option<u32>,
    pub replica_sync_dist: Option<bool>,
    pub repl_backlog_size: Option<String>,

    // Connection and performance settings
    pub max_connections: Option<u32>,
    pub maxclients: Option<u32>,
    pub conns: Option<u32>,
    pub conns_type: Option<String>,
    pub max_client_pipeline: Option<u32>,
    pub max_pipelined: Option<u32>,

    // AOF (Append Only File) settings
    pub aof_policy: Option<String>,
    pub max_aof_file_size: Option<u64>,
    pub max_aof_load_time: Option<u32>,

    // Active defragmentation settings
    pub activedefrag: Option<String>,
    pub active_defrag_cycle_max: Option<u32>,
    pub active_defrag_cycle_min: Option<u32>,
    pub active_defrag_ignore_bytes: Option<String>,
    pub active_defrag_max_scan_fields: Option<u32>,
    pub active_defrag_threshold_lower: Option<u32>,
    pub active_defrag_threshold_upper: Option<u32>,

    // Backup settings
    pub backup: Option<bool>,
    pub backup_failure_reason: Option<String>,
    pub backup_history: Option<u32>,
    pub backup_interval: Option<u32>,
    pub backup_interval_offset: Option<u32>,
    pub backup_location: Option<Value>,
    pub backup_progress: Option<f64>,
    pub backup_status: Option<String>,

    // Import/Export settings
    pub dataset_import_sources: Option<Vec<Value>>,
    pub import_failure_reason: Option<String>,
    pub import_progress: Option<f64>,
    pub import_status: Option<String>,
    pub export_failure_reason: Option<String>,
    pub export_progress: Option<f64>,
    pub export_status: Option<String>,
    pub skip_import_analyze: Option<String>,

    // Monitoring and metrics
    pub metrics_export_all: Option<bool>,
    pub generate_text_monitor: Option<bool>,
    pub email_alerts: Option<bool>,

    // Modules and features
    pub module_list: Option<Vec<Value>>,
    /// Search configuration - can be bool or object depending on API version
    /// (hence the untyped `Value`)
    #[serde(default)]
    pub search: Option<Value>,
    /// Timeseries configuration - can be bool or object depending on API version
    /// (hence the untyped `Value`)
    #[serde(default)]
    pub timeseries: Option<Value>,

    // BigStore/Flash storage settings
    pub bigstore: Option<bool>,
    pub bigstore_ram_size: Option<u64>,
    pub bigstore_max_ram_ratio: Option<u32>,
    pub bigstore_ram_weights: Option<Vec<Value>>,
    pub bigstore_version: Option<u32>,

    // Network and proxy settings
    pub proxy_policy: Option<String>,
    pub oss_cluster: Option<bool>,
    pub oss_cluster_api_preferred_endpoint_type: Option<String>,
    pub oss_cluster_api_preferred_ip_type: Option<String>,
    pub oss_sharding: Option<bool>,

    // Redis-specific settings
    pub redis_version: Option<String>,
    pub resp3: Option<bool>,
    pub disabled_commands: Option<String>,

    // Clustering and sharding
    pub hash_slots_policy: Option<String>,
    pub shard_key_regex: Option<Vec<Value>>,
    pub shard_block_crossslot_keys: Option<bool>,
    pub shard_block_foreign_keys: Option<bool>,
    pub implicit_shard_key: Option<bool>,

    // Node placement and rack awareness
    pub avoid_nodes: Option<Vec<String>>,
    pub use_nodes: Option<Vec<String>>,
    pub rack_aware: Option<bool>,

    // Operational settings
    pub auto_upgrade: Option<bool>,
    pub internal: Option<bool>,
    pub db_conns_auditing: Option<bool>,
    pub flush_on_fullsync: Option<bool>,
    pub use_selective_flush: Option<bool>,

    // Sync and replication control
    pub sync: Option<String>,
    pub sync_sources: Option<Vec<Value>>,
    pub sync_dedicated_threads: Option<u32>,
    pub syncer_mode: Option<String>,
    pub syncer_log_level: Option<String>,
    pub support_syncer_reconf: Option<bool>,

    // Gradual sync settings
    pub gradual_src_mode: Option<String>,
    pub gradual_src_max_sources: Option<u32>,
    pub gradual_sync_mode: Option<String>,
    pub gradual_sync_max_shards_per_source: Option<u32>,

    // Slave and buffer settings
    pub slave_buffer: Option<String>,

    // Snapshot settings
    pub snapshot_policy: Option<Vec<Value>>,

    // Scheduling and recovery
    pub sched_policy: Option<String>,
    pub recovery_wait_time: Option<i32>,

    // Performance and optimization
    pub multi_commands_opt: Option<String>,
    pub throughput_ingress: Option<f64>,
    pub tracking_table_max_keys: Option<u32>,
    pub wait_command: Option<bool>,

    // Legacy and deprecated fields
    pub background_op: Option<Vec<Value>>,

    // Advanced configuration
    pub mkms: Option<bool>,
    pub roles_permissions: Option<Vec<Value>>,
    pub tags: Option<Vec<String>>,
    pub topology_epoch: Option<u32>,
}
437
/// Database endpoint information
///
/// Element type of [`DatabaseInfo::endpoints`] and of the list returned by
/// `DatabaseHandler::endpoints`. Every field is optional, so keys missing from
/// the payload deserialize to `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EndpointInfo {
    /// Unique identifier for the endpoint
    pub uid: Option<String>,
    /// List of IP addresses for the endpoint
    pub addr: Option<Vec<String>>,
    /// Port number for the endpoint
    pub port: Option<u16>,
    /// DNS name for the endpoint
    pub dns_name: Option<String>,
    /// Proxy policy for the endpoint
    pub proxy_policy: Option<String>,
    /// Address type (e.g., "internal", "external")
    pub addr_type: Option<String>,
    /// OSS cluster API preferred IP type
    pub oss_cluster_api_preferred_ip_type: Option<String>,
    /// List of proxy UIDs to exclude
    pub exclude_proxies: Option<Vec<u32>>,
    /// List of proxy UIDs to include
    pub include_proxies: Option<Vec<u32>>,
}
460
/// Module configuration for database creation
///
/// Element type of [`CreateDatabaseRequest::module_list`]. The builder requires
/// `module_name`; `module_args` defaults to `None` and is then left out of the
/// serialized output.
#[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)]
pub struct ModuleConfig {
    /// Module name (the builder setter accepts anything convertible into a `String`)
    #[builder(setter(into))]
    pub module_name: String,
    /// Module arguments (optional)
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(into, strip_option))]
    pub module_args: Option<String>,
}
470
/// Create database request
///
/// `name` is the only field the builder requires. Every other field defaults to
/// `None` when its setter is not called, and `None` fields are left out of the
/// serialized request body.
///
/// # Examples
///
/// ```rust,no_run
/// use redis_enterprise::{CreateDatabaseRequest, ModuleConfig};
///
/// let request = CreateDatabaseRequest::builder()
///     .name("my-database")
///     .memory_size(1024 * 1024 * 1024) // 1GB
///     .port(12000)
///     .replication(true)
///     .persistence("aof")
///     .eviction_policy("volatile-lru")
///     .shards_count(2)
///     .authentication_redis_pass("secure-password")
///     .build();
/// ```
#[derive(Debug, Serialize, Deserialize, TypedBuilder)]
pub struct CreateDatabaseRequest {
    /// Database name (required)
    #[builder(setter(into))]
    pub name: String,
    /// Memory size; the examples in this file pass a byte count (e.g. `1024 * 1024 * 1024` for 1GB)
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(strip_option))]
    pub memory_size: Option<u64>,
    /// Port for the database
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(strip_option))]
    pub port: Option<u16>,
    /// Replication flag
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(strip_option))]
    pub replication: Option<bool>,
    /// Persistence setting, passed as a string (the examples in this file use "aof" and "disabled")
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(into, strip_option))]
    pub persistence: Option<String>,
    /// Eviction policy, passed as a string (the examples in this file use "allkeys-lru" and "volatile-lru")
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(into, strip_option))]
    pub eviction_policy: Option<String>,
    /// Sharding flag
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(strip_option))]
    pub sharding: Option<bool>,
    /// Shard count, serialized under the key `shards_count`
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(strip_option))]
    pub shards_count: Option<u32>,
    /// Shard count, serialized under the key `shard_count`
    // NOTE(review): the alias below repeats the field's own name, so it adds no extra
    // accepted key. This field also sits next to `shards_count` above — confirm which
    // key the API expects and whether both fields are needed.
    #[serde(skip_serializing_if = "Option::is_none", alias = "shard_count")]
    #[builder(default, setter(strip_option))]
    pub shard_count: Option<u32>,
    /// Proxy policy, passed as a string
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(into, strip_option))]
    pub proxy_policy: Option<String>,
    /// Rack-awareness flag
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(strip_option))]
    pub rack_aware: Option<bool>,
    /// Modules to configure for the new database
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(strip_option))]
    pub module_list: Option<Vec<ModuleConfig>>,
    /// CRDT flag
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(strip_option))]
    pub crdt: Option<bool>,
    /// Redis password for the database
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default, setter(into, strip_option))]
    pub authentication_redis_pass: Option<String>,
}
533
/// Database handler for executing database commands
///
/// Wraps a [`RestClient`]; each method builds a `/v1/bdbs…` path and forwards
/// the call to that client.
pub struct DatabaseHandler {
    /// REST client used for every request issued by this handler
    client: RestClient,
}
538
539impl DatabaseHandler {
540    pub fn new(client: RestClient) -> Self {
541        DatabaseHandler { client }
542    }
543
544    /// List all databases (BDB.LIST)
545    pub async fn list(&self) -> Result<Vec<DatabaseInfo>> {
546        self.client.get("/v1/bdbs").await
547    }
548
549    /// Get specific database info (BDB.INFO)
550    pub async fn info(&self, uid: u32) -> Result<DatabaseInfo> {
551        self.client.get(&format!("/v1/bdbs/{}", uid)).await
552    }
553
554    /// Get specific database info (alias for info)
555    pub async fn get(&self, uid: u32) -> Result<DatabaseInfo> {
556        self.info(uid).await
557    }
558
559    /// Create a new database (BDB.CREATE)
560    pub async fn create(&self, request: CreateDatabaseRequest) -> Result<DatabaseInfo> {
561        self.client.post("/v1/bdbs", &request).await
562    }
563
564    /// Update database configuration (BDB.UPDATE)
565    pub async fn update(&self, uid: u32, updates: Value) -> Result<DatabaseInfo> {
566        self.client
567            .put(&format!("/v1/bdbs/{}", uid), &updates)
568            .await
569    }
570
571    /// Delete a database (BDB.DELETE)
572    pub async fn delete(&self, uid: u32) -> Result<()> {
573        self.client.delete(&format!("/v1/bdbs/{}", uid)).await
574    }
575
576    /// Get database stats (BDB.STATS)
577    pub async fn stats(&self, uid: u32) -> Result<Value> {
578        self.client.get(&format!("/v1/bdbs/stats/{}", uid)).await
579    }
580
581    /// Get database metrics (BDB.METRICS)
582    pub async fn metrics(&self, uid: u32) -> Result<Value> {
583        self.client.get(&format!("/v1/bdbs/metrics/{}", uid)).await
584    }
585
586    /// Export database (BDB.EXPORT)
587    pub async fn export(&self, uid: u32, export_location: &str) -> Result<ExportResponse> {
588        let body = serde_json::json!({
589            "export_location": export_location
590        });
591        self.client
592            .post(&format!("/v1/bdbs/{}/actions/export", uid), &body)
593            .await
594    }
595
596    /// Import database (BDB.IMPORT)
597    pub async fn import(
598        &self,
599        uid: u32,
600        import_location: &str,
601        flush: bool,
602    ) -> Result<ImportResponse> {
603        let body = serde_json::json!({
604            "import_location": import_location,
605            "flush": flush
606        });
607        self.client
608            .post(&format!("/v1/bdbs/{}/actions/import", uid), &body)
609            .await
610    }
611
612    /// Flush database (BDB.FLUSH)
613    pub async fn flush(&self, uid: u32) -> Result<DatabaseActionResponse> {
614        self.client
615            .post(
616                &format!("/v1/bdbs/{}/actions/flush", uid),
617                &serde_json::json!({}),
618            )
619            .await
620    }
621
622    /// Backup database (BDB.BACKUP)
623    pub async fn backup(&self, uid: u32) -> Result<BackupResponse> {
624        self.client
625            .post(
626                &format!("/v1/bdbs/{}/actions/backup", uid),
627                &serde_json::json!({}),
628            )
629            .await
630    }
631
632    /// Restore database from backup (BDB.RESTORE)
633    pub async fn restore(
634        &self,
635        uid: u32,
636        backup_uid: Option<&str>,
637    ) -> Result<DatabaseActionResponse> {
638        let body = if let Some(backup_id) = backup_uid {
639            serde_json::json!({ "backup_uid": backup_id })
640        } else {
641            serde_json::json!({})
642        };
643        self.client
644            .post(&format!("/v1/bdbs/{}/actions/restore", uid), &body)
645            .await
646    }
647
648    /// Get database shards (BDB.SHARDS)
649    pub async fn shards(&self, uid: u32) -> Result<Value> {
650        self.client.get(&format!("/v1/bdbs/{}/shards", uid)).await
651    }
652
653    /// Get database endpoints (BDB.ENDPOINTS)
654    pub async fn endpoints(&self, uid: u32) -> Result<Vec<EndpointInfo>> {
655        self.client
656            .get(&format!("/v1/bdbs/{}/endpoints", uid))
657            .await
658    }
659
660    /// Optimize shards placement (status) - GET
661    pub async fn optimize_shards_placement(&self, uid: u32) -> Result<Value> {
662        self.client
663            .get(&format!(
664                "/v1/bdbs/{}/actions/optimize_shards_placement",
665                uid
666            ))
667            .await
668    }
669
670    /// Recover database (status) - GET
671    pub async fn recover_status(&self, uid: u32) -> Result<Value> {
672        self.client
673            .get(&format!("/v1/bdbs/{}/actions/recover", uid))
674            .await
675    }
676
677    /// Recover database - POST
678    pub async fn recover(&self, uid: u32) -> Result<DatabaseActionResponse> {
679        self.client
680            .post(
681                &format!("/v1/bdbs/{}/actions/recover", uid),
682                &serde_json::json!({}),
683            )
684            .await
685    }
686
687    /// Resume traffic - POST
688    pub async fn resume_traffic(&self, uid: u32) -> Result<DatabaseActionResponse> {
689        self.client
690            .post(
691                &format!("/v1/bdbs/{}/actions/resume_traffic", uid),
692                &serde_json::json!({}),
693            )
694            .await
695    }
696
697    /// Stop traffic - POST
698    pub async fn stop_traffic(&self, uid: u32) -> Result<DatabaseActionResponse> {
699        self.client
700            .post(
701                &format!("/v1/bdbs/{}/actions/stop_traffic", uid),
702                &serde_json::json!({}),
703            )
704            .await
705    }
706
707    /// Rebalance database - PUT
708    pub async fn rebalance(&self, uid: u32) -> Result<DatabaseActionResponse> {
709        self.client
710            .put(
711                &format!("/v1/bdbs/{}/actions/rebalance", uid),
712                &serde_json::json!({}),
713            )
714            .await
715    }
716
717    /// Revamp database - PUT
718    pub async fn revamp(&self, uid: u32) -> Result<DatabaseActionResponse> {
719        self.client
720            .put(
721                &format!("/v1/bdbs/{}/actions/revamp", uid),
722                &serde_json::json!({}),
723            )
724            .await
725    }
726
727    /// Reset backup status - PUT
728    pub async fn backup_reset_status(&self, uid: u32) -> Result<Value> {
729        self.client
730            .put(
731                &format!("/v1/bdbs/{}/actions/backup_reset_status", uid),
732                &serde_json::json!({}),
733            )
734            .await
735    }
736
737    /// Reset export status - PUT
738    pub async fn export_reset_status(&self, uid: u32) -> Result<Value> {
739        self.client
740            .put(
741                &format!("/v1/bdbs/{}/actions/export_reset_status", uid),
742                &serde_json::json!({}),
743            )
744            .await
745    }
746
747    /// Reset import status - PUT
748    pub async fn import_reset_status(&self, uid: u32) -> Result<Value> {
749        self.client
750            .put(
751                &format!("/v1/bdbs/{}/actions/import_reset_status", uid),
752                &serde_json::json!({}),
753            )
754            .await
755    }
756
757    /// Peer stats for a database - GET
758    pub async fn peer_stats(&self, uid: u32) -> Result<Value> {
759        self.client
760            .get(&format!("/v1/bdbs/{}/peer_stats", uid))
761            .await
762    }
763
764    /// Peer stats for a specific peer - GET
765    pub async fn peer_stats_for(&self, uid: u32, peer_uid: u32) -> Result<Value> {
766        self.client
767            .get(&format!("/v1/bdbs/{}/peer_stats/{}", uid, peer_uid))
768            .await
769    }
770
771    /// Sync source stats for a database - GET
772    pub async fn sync_source_stats(&self, uid: u32) -> Result<Value> {
773        self.client
774            .get(&format!("/v1/bdbs/{}/sync_source_stats", uid))
775            .await
776    }
777
778    /// Sync source stats for a specific source - GET
779    pub async fn sync_source_stats_for(&self, uid: u32, src_uid: u32) -> Result<Value> {
780        self.client
781            .get(&format!("/v1/bdbs/{}/sync_source_stats/{}", uid, src_uid))
782            .await
783    }
784
785    /// Syncer state (all) - GET
786    pub async fn syncer_state(&self, uid: u32) -> Result<Value> {
787        self.client
788            .get(&format!("/v1/bdbs/{}/syncer_state", uid))
789            .await
790    }
791
792    /// Syncer state for CRDT - GET
793    pub async fn syncer_state_crdt(&self, uid: u32) -> Result<Value> {
794        self.client
795            .get(&format!("/v1/bdbs/{}/syncer_state/crdt", uid))
796            .await
797    }
798
799    /// Syncer state for replica - GET
800    pub async fn syncer_state_replica(&self, uid: u32) -> Result<Value> {
801        self.client
802            .get(&format!("/v1/bdbs/{}/syncer_state/replica", uid))
803            .await
804    }
805
806    /// Database passwords delete - DELETE
807    pub async fn passwords_delete(&self, uid: u32) -> Result<()> {
808        self.client
809            .delete(&format!("/v1/bdbs/{}/passwords", uid))
810            .await
811    }
812
813    /// List all database alerts - GET
814    pub async fn alerts_all(&self) -> Result<Value> {
815        self.client.get("/v1/bdbs/alerts").await
816    }
817
818    /// List alerts for a specific database - GET
819    pub async fn alerts_for(&self, uid: u32) -> Result<Value> {
820        self.client.get(&format!("/v1/bdbs/alerts/{}", uid)).await
821    }
822
823    /// Get a specific alert for a database - GET
824    pub async fn alert_detail(&self, uid: u32, alert: &str) -> Result<Value> {
825        self.client
826            .get(&format!("/v1/bdbs/alerts/{}/{}", uid, alert))
827            .await
828    }
829
830    /// CRDT source alerts - GET
831    pub async fn crdt_source_alerts_all(&self) -> Result<Value> {
832        self.client.get("/v1/bdbs/crdt_sources/alerts").await
833    }
834
835    /// CRDT source alerts for DB - GET
836    pub async fn crdt_source_alerts_for(&self, uid: u32) -> Result<Value> {
837        self.client
838            .get(&format!("/v1/bdbs/crdt_sources/alerts/{}", uid))
839            .await
840    }
841
842    /// CRDT source alerts for specific source - GET
843    pub async fn crdt_source_alerts_source(&self, uid: u32, source_id: u32) -> Result<Value> {
844        self.client
845            .get(&format!(
846                "/v1/bdbs/crdt_sources/alerts/{}/{}",
847                uid, source_id
848            ))
849            .await
850    }
851
852    /// CRDT source alert detail - GET
853    pub async fn crdt_source_alert_detail(
854        &self,
855        uid: u32,
856        source_id: u32,
857        alert: &str,
858    ) -> Result<Value> {
859        self.client
860            .get(&format!(
861                "/v1/bdbs/crdt_sources/alerts/{}/{}/{}",
862                uid, source_id, alert
863            ))
864            .await
865    }
866
867    /// Replica source alerts - GET
868    pub async fn replica_source_alerts_all(&self) -> Result<Value> {
869        self.client.get("/v1/bdbs/replica_sources/alerts").await
870    }
871
872    /// Replica source alerts for DB - GET
873    pub async fn replica_source_alerts_for(&self, uid: u32) -> Result<Value> {
874        self.client
875            .get(&format!("/v1/bdbs/replica_sources/alerts/{}", uid))
876            .await
877    }
878
879    /// Replica source alerts for specific source - GET
880    pub async fn replica_source_alerts_source(&self, uid: u32, source_id: u32) -> Result<Value> {
881        self.client
882            .get(&format!(
883                "/v1/bdbs/replica_sources/alerts/{}/{}",
884                uid, source_id
885            ))
886            .await
887    }
888
889    /// Replica source alert detail - GET
890    pub async fn replica_source_alert_detail(
891        &self,
892        uid: u32,
893        source_id: u32,
894        alert: &str,
895    ) -> Result<Value> {
896        self.client
897            .get(&format!(
898                "/v1/bdbs/replica_sources/alerts/{}/{}/{}",
899                uid, source_id, alert
900            ))
901            .await
902    }
903
904    /// Upgrade database with new module version (BDB.UPGRADE)
905    pub async fn upgrade(
906        &self,
907        uid: u32,
908        module_name: &str,
909        new_version: &str,
910    ) -> Result<DatabaseActionResponse> {
911        let body = serde_json::json!({
912            "module_name": module_name,
913            "new_version": new_version
914        });
915        self.client
916            .post(&format!("/v1/bdbs/{}/actions/upgrade", uid), &body)
917            .await
918    }
919
920    /// Upgrade database Redis version and/or modules (BDB.UPGRADE)
921    ///
922    /// # Examples
923    ///
924    /// ```no_run
925    /// # use redis_enterprise::EnterpriseClient;
926    /// # use redis_enterprise::bdb::DatabaseUpgradeRequest;
927    /// # async fn example() -> redis_enterprise::Result<()> {
928    /// let client = EnterpriseClient::builder()
929    ///     .base_url("https://localhost:9443")
930    ///     .username("admin")
931    ///     .password("password")
932    ///     .insecure(true)
933    ///     .build()?;
934    ///
935    /// // Upgrade to latest Redis version
936    /// let request = DatabaseUpgradeRequest {
937    ///     redis_version: None,  // defaults to latest
938    ///     preserve_roles: Some(true),
939    ///     ..Default::default()
940    /// };
941    /// client.databases().upgrade_redis_version(1, request).await?;
942    ///
943    /// // Upgrade to specific Redis version
944    /// let request = DatabaseUpgradeRequest {
945    ///     redis_version: Some("7.4.2".to_string()),
946    ///     preserve_roles: Some(true),
947    ///     ..Default::default()
948    /// };
949    /// client.databases().upgrade_redis_version(1, request).await?;
950    /// # Ok(())
951    /// # }
952    /// ```
953    pub async fn upgrade_redis_version(
954        &self,
955        uid: u32,
956        request: DatabaseUpgradeRequest,
957    ) -> Result<DatabaseActionResponse> {
958        self.client
959            .post(&format!("/v1/bdbs/{}/upgrade", uid), &request)
960            .await
961    }
962
963    /// Reset database password (BDB.RESET_PASSWORD)
964    pub async fn reset_password(
965        &self,
966        uid: u32,
967        new_password: &str,
968    ) -> Result<DatabaseActionResponse> {
969        let body = serde_json::json!({
970            "authentication_redis_pass": new_password
971        });
972        self.client
973            .post(&format!("/v1/bdbs/{}/actions/reset_password", uid), &body)
974            .await
975    }
976
977    /// Check database availability
978    pub async fn availability(&self, uid: u32) -> Result<Value> {
979        self.client
980            .get(&format!("/v1/bdbs/{}/availability", uid))
981            .await
982    }
983
984    /// Check local database endpoint availability
985    pub async fn endpoint_availability(&self, uid: u32) -> Result<Value> {
986        self.client
987            .get(&format!("/v1/local/bdbs/{}/endpoint/availability", uid))
988            .await
989    }
990
991    /// Create database using v2 API (supports recovery plan)
992    pub async fn create_v2(&self, request: Value) -> Result<DatabaseInfo> {
993        self.client.post("/v2/bdbs", &request).await
994    }
995
996    /// Watch database status changes in real-time
997    ///
998    /// Polls the database endpoint and yields updates when status changes occur.
999    /// Useful for monitoring database operations like upgrades, migrations, backups, etc.
1000    ///
1001    /// # Arguments
1002    /// * `uid` - Database ID to watch
1003    /// * `poll_interval` - Time to wait between polls
1004    ///
1005    /// # Returns
1006    /// A stream of `(DatabaseInfo, Option<String>)` tuples where:
1007    /// - `DatabaseInfo` - Current database state
1008    /// - `Option<String>` - Previous status (None on first poll, Some on status change)
1009    ///
1010    /// # Example
1011    /// ```no_run
1012    /// use redis_enterprise::EnterpriseClient;
1013    /// use futures::StreamExt;
1014    /// use std::time::Duration;
1015    ///
1016    /// # async fn example(client: EnterpriseClient) -> Result<(), Box<dyn std::error::Error>> {
1017    /// let db_handler = client.databases();
1018    /// let mut stream = db_handler.watch_database(1, Duration::from_secs(5));
1019    ///
1020    /// while let Some(result) = stream.next().await {
1021    ///     match result {
1022    ///         Ok((db_info, prev_status)) => {
1023    ///             if let Some(old_status) = prev_status {
1024    ///                 println!("Status changed: {} -> {}", old_status, db_info.status.unwrap_or_default());
1025    ///             } else {
1026    ///                 println!("Initial status: {}", db_info.status.unwrap_or_default());
1027    ///             }
1028    ///         }
1029    ///         Err(e) => eprintln!("Error: {}", e),
1030    ///     }
1031    /// }
1032    /// # Ok(())
1033    /// # }
1034    /// ```
1035    pub fn watch_database(&self, uid: u32, poll_interval: Duration) -> DatabaseWatchStream<'_> {
1036        Box::pin(async_stream::stream! {
1037            let mut last_status: Option<String> = None;
1038
1039            loop {
1040                match self.info(uid).await {
1041                    Ok(db_info) => {
1042                        let current_status = db_info.status.clone();
1043
1044                        // Check if status changed
1045                        let status_changed = match (&last_status, &current_status) {
1046                            (Some(old), Some(new)) => old != new,
1047                            (None, Some(_)) => false, // First poll, not a change
1048                            (Some(_), None) => true,  // Status disappeared
1049                            (None, None) => false,
1050                        };
1051
1052                        // Yield the database info with previous status if changed
1053                        if status_changed {
1054                            yield Ok((db_info, last_status.clone()));
1055                        } else if last_status.is_none() {
1056                            // First poll - always yield
1057                            yield Ok((db_info, None));
1058                        } else {
1059                            // Status unchanged - yield current state for monitoring
1060                            yield Ok((db_info, None));
1061                        }
1062
1063                        last_status = current_status;
1064                    }
1065                    Err(e) => {
1066                        yield Err(e);
1067                        break;
1068                    }
1069                }
1070
1071                sleep(poll_interval).await;
1072            }
1073        })
1074    }
1075}