redis_enterprise/bdb.rs
1//! Database (BDB) management for Redis Enterprise
2//!
3//! ## Overview
4//! - Create, list, update, and delete databases
5//! - Execute database actions (backup, restore, import, export)
6//! - Monitor database status and metrics
7//! - Configure database endpoints and sharding
8//!
9//! ## Examples
10//!
11//! ### Creating a Database
12//! ```no_run
13//! use redis_enterprise::{EnterpriseClient, CreateDatabaseRequest};
14//!
15//! # async fn example(client: EnterpriseClient) -> Result<(), Box<dyn std::error::Error>> {
16//! // Simple cache database
17//! let cache_db = CreateDatabaseRequest::builder()
18//! .name("my-cache")
19//! .memory_size(1_073_741_824) // 1GB
20//! .eviction_policy("allkeys-lru")
21//! .persistence("disabled")
22//! .build();
23//!
24//! let db = client.databases().create(cache_db).await?;
25//! println!("Created database with ID: {}", db.uid);
26//! # Ok(())
27//! # }
28//! ```
29//!
30//! ### Database Actions
31//! ```no_run
32//! # use redis_enterprise::EnterpriseClient;
33//! # async fn example(client: EnterpriseClient) -> Result<(), Box<dyn std::error::Error>> {
34//! let db_id = 1;
35//!
36//! // Export to remote location
37//! let export = client.databases().export(db_id, "ftp://backup.site/db.rdb").await?;
38//! println!("Export initiated: {:?}", export.action_uid);
39//!
40//! // Import from backup
41//! let import = client.databases().import(db_id, "ftp://backup.site/db.rdb", true).await?;
42//! println!("Import started: {:?}", import.action_uid);
43//! # Ok(())
44//! # }
45//! ```
46//!
47//! ### Monitoring Databases
48//! ```no_run
49//! # use redis_enterprise::EnterpriseClient;
50//! # async fn example(client: EnterpriseClient) -> Result<(), Box<dyn std::error::Error>> {
51//! // List all databases
52//! let databases = client.databases().list().await?;
53//! for db in databases {
54//! println!("{}: {} MB used", db.name, db.memory_used.unwrap_or(0) / 1_048_576);
55//! }
56//!
57//! // Get database endpoints
58//! let endpoints = client.databases().endpoints(1).await?;
59//! for endpoint in endpoints {
60//! println!("Endpoint: {:?}:{:?}", endpoint.dns_name, endpoint.port);
61//! }
62//! # Ok(())
63//! # }
64//! ```
65
66use crate::client::RestClient;
67use crate::error::Result;
68use futures::stream::Stream;
69use serde::{Deserialize, Serialize};
70use serde_json::Value;
71use std::pin::Pin;
72use std::time::Duration;
73use tokio::time::sleep;
74use typed_builder::TypedBuilder;
75
76// Aliases for easier use
77/// Alias for [`DatabaseInfo`]; the BDB ("Berkeley DB") naming is legacy Redis Enterprise terminology.
78pub type Database = DatabaseInfo;
79/// Alias for [`DatabaseHandler`]; retained for backwards compatibility.
80pub type BdbHandler = DatabaseHandler;
81/// Stream of database state updates yielded by the database watcher.
82pub type DatabaseWatchStream<'a> =
83 Pin<Box<dyn Stream<Item = Result<(DatabaseInfo, Option<String>)>> + Send + 'a>>;
84
85/// Response from database action operations
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct DatabaseActionResponse {
88 /// The action UID for tracking async operations
89 pub action_uid: String,
90 /// Description of the action
91 pub description: Option<String>,
92}
93
94/// Response from import operation
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct ImportResponse {
97 /// The action UID for tracking the import operation
98 #[serde(skip_serializing_if = "Option::is_none")]
99 pub action_uid: Option<String>,
100 /// Import status
101 pub status: Option<String>,
102}
103
104/// Response from export operation
105#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct ExportResponse {
107 /// The action UID for tracking the export operation
108 #[serde(skip_serializing_if = "Option::is_none")]
109 pub action_uid: Option<String>,
110 /// Export status
111 pub status: Option<String>,
112}
113
114/// Module information for database upgrade
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct ModuleUpgrade {
117 /// Module name
118 pub module_name: String,
119 /// Module version
120 #[serde(skip_serializing_if = "Option::is_none")]
121 pub new_version: Option<String>,
122 /// Module arguments
123 #[serde(skip_serializing_if = "Option::is_none")]
124 pub module_args: Option<String>,
125}
126
127/// Request for database upgrade operation
128///
129/// # Examples
130///
131/// ```rust,no_run
132/// use redis_enterprise::bdb::DatabaseUpgradeRequest;
133///
134/// // Upgrade to latest Redis version with role preservation
135/// let request = DatabaseUpgradeRequest::builder()
136/// .preserve_roles(true)
137/// .build();
138///
139/// // Upgrade to specific version
140/// let request = DatabaseUpgradeRequest::builder()
141/// .redis_version("7.4.2")
142/// .preserve_roles(true)
143/// .parallel_shards_upgrade(2)
144/// .build();
145/// ```
146#[derive(Debug, Clone, Serialize, Deserialize, Default, TypedBuilder)]
147pub struct DatabaseUpgradeRequest {
148 /// Target Redis version (optional, defaults to latest)
149 #[serde(skip_serializing_if = "Option::is_none")]
150 #[builder(default, setter(into, strip_option))]
151 pub redis_version: Option<String>,
152
153 /// Preserve master/replica roles (requires extra failover)
154 #[serde(skip_serializing_if = "Option::is_none")]
155 #[builder(default, setter(strip_option))]
156 pub preserve_roles: Option<bool>,
157
158 /// Restart shards even if no version change
159 #[serde(skip_serializing_if = "Option::is_none")]
160 #[builder(default, setter(strip_option))]
161 pub force_restart: Option<bool>,
162
163 /// Allow data loss in non-replicated, non-persistent databases
164 #[serde(skip_serializing_if = "Option::is_none")]
165 #[builder(default, setter(strip_option))]
166 pub may_discard_data: Option<bool>,
167
168 /// Force data discard even if replicated/persistent
169 #[serde(skip_serializing_if = "Option::is_none")]
170 #[builder(default, setter(strip_option))]
171 pub force_discard: Option<bool>,
172
173 /// Keep current CRDT protocol version
174 #[serde(skip_serializing_if = "Option::is_none")]
175 #[builder(default, setter(strip_option))]
176 pub keep_crdt_protocol_version: Option<bool>,
177
178 /// Maximum parallel shard upgrades (default: all shards)
179 #[serde(skip_serializing_if = "Option::is_none")]
180 #[builder(default, setter(strip_option))]
181 pub parallel_shards_upgrade: Option<u32>,
182
183 /// Modules to upgrade alongside Redis
184 #[serde(skip_serializing_if = "Option::is_none")]
185 #[builder(default, setter(strip_option))]
186 pub modules: Option<Vec<ModuleUpgrade>>,
187}
188
189/// Database information from the REST API - 100% field coverage (152/152 fields)
190#[derive(Debug, Clone, Serialize, Deserialize)]
191pub struct DatabaseInfo {
192 // Core database identification and status
193 /// Database's unique ID (read-only).
194 pub uid: u32,
195 /// Database name.
196 pub name: String,
197 /// TCP port on which the database is available (read-only).
198 pub port: Option<u16>,
199 /// Current status of the database (e.g. `"active"`, `"pending"`).
200 pub status: Option<String>,
201 /// Database memory limit in bytes (0 for unlimited).
202 pub memory_size: Option<u64>,
203 /// Current memory usage in bytes (read-only).
204 pub memory_used: Option<u64>,
205
206 /// Database type (e.g., "redis", "memcached")
207 #[serde(rename = "type")]
208 pub type_: Option<String>,
209 /// Database version (read-only).
210 pub version: Option<String>,
211
212 /// Account and action tracking
213 pub account_id: Option<u32>,
214 /// UID of the most recent action affecting this database (read-only).
215 pub action_uid: Option<String>,
216
217 // Sharding and placement
218 /// Number of database shards.
219 pub shards_count: Option<u32>,
220 /// List of shard UIDs that compose the database.
221 pub shard_list: Option<Vec<u32>>,
222 /// Whether the database is sharded.
223 pub sharding: Option<bool>,
224 /// Shard placement strategy (e.g. `"dense"`, `"sparse"`).
225 pub shards_placement: Option<String>,
226 /// Whether in-memory replication is enabled.
227 pub replication: Option<bool>,
228
229 // Endpoints and networking
230 /// Endpoints exposed by this database. See [`EndpointInfo`].
231 pub endpoints: Option<Vec<EndpointInfo>>,
232 /// Primary endpoint address (read-only).
233 pub endpoint: Option<String>,
234 /// List of endpoint IP addresses (read-only).
235 pub endpoint_ip: Option<Vec<String>>,
236 /// Node UID that currently hosts the endpoint (read-only).
237 pub endpoint_node: Option<u32>,
238 /// DNS name pointing to the master endpoint (read-only).
239 pub dns_address_master: Option<String>,
240
241 // Data persistence and backup
242 /// Persistence policy (e.g. `"disabled"`, `"aof"`, `"snapshot"`).
243 pub persistence: Option<String>,
244 /// Data persistence mode (alias for `persistence` on some API versions).
245 pub data_persistence: Option<String>,
246 /// Eviction policy when the database reaches its memory limit (e.g. `"allkeys-lru"`).
247 pub eviction_policy: Option<String>,
248
249 // Timestamps
250 /// Timestamp when the database was created (ISO-8601).
251 pub created_time: Option<String>,
252 /// Timestamp of the most recent configuration change (ISO-8601).
253 pub last_changed_time: Option<String>,
254 /// Timestamp of the most recent successful backup (ISO-8601).
255 pub last_backup_time: Option<String>,
256 /// Timestamp of the most recent successful export (ISO-8601).
257 pub last_export_time: Option<String>,
258
259 // Security and authentication
260 /// Whether weak hashing is permitted for mTLS connections.
261 pub mtls_allow_weak_hashing: Option<bool>,
262 /// Whether outdated/expired certificates are permitted for mTLS connections.
263 pub mtls_allow_outdated_certs: Option<bool>,
264 /// Redis password used for client authentication.
265 pub authentication_redis_pass: Option<String>,
266 /// Admin password used for elevated authentication.
267 pub authentication_admin_pass: Option<String>,
268 /// SASL password (Memcached databases).
269 pub authentication_sasl_pass: Option<String>,
270 /// SASL username (Memcached databases).
271 pub authentication_sasl_uname: Option<String>,
272 /// List of SSL client certificates accepted for authentication.
273 pub authentication_ssl_client_certs: Option<Vec<Value>>,
274 /// List of SSL certificates trusted for CRDT (Active-Active) connections.
275 pub authentication_ssl_crdt_certs: Option<Vec<Value>>,
276 /// List of authorized subjects for certificate-based authentication.
277 pub authorized_subjects: Option<Vec<Value>>,
278 /// Whether inter-node data encryption is enabled.
279 pub data_internode_encryption: Option<bool>,
280 /// Whether SSL/TLS is required for client connections.
281 pub ssl: Option<bool>,
282 /// TLS mode for client connections (e.g. `"enabled"`, `"disabled"`).
283 pub tls_mode: Option<String>,
284 /// Client certificate enforcement mode (e.g. `"enabled"`, `"disabled"`).
285 pub enforce_client_authentication: Option<String>,
286 /// Whether the default Redis user is enabled.
287 pub default_user: Option<bool>,
288 /// ACL configuration
289 pub acl: Option<Value>,
290 /// Client certificate subject validation type
291 pub client_cert_subject_validation_type: Option<String>,
292 /// Compare key hslot
293 pub compare_key_hslot: Option<bool>,
294 /// DNS suffixes for endpoints
295 pub dns_suffixes: Option<Vec<String>>,
296 /// Group UID for the database
297 pub group_uid: Option<u32>,
298 /// Redis cluster mode enabled
299 pub redis_cluster_enabled: Option<bool>,
300
301 // CRDT/Active-Active fields
302 /// Whether this database participates in a CRDT (Active-Active) deployment.
303 pub crdt: Option<bool>,
304 /// Whether CRDT (Active-Active) functionality is enabled.
305 pub crdt_enabled: Option<bool>,
306 /// CRDT configuration version.
307 pub crdt_config_version: Option<u32>,
308 /// Replica ID assigned to this database in the CRDT cluster.
309 pub crdt_replica_id: Option<u32>,
310 /// Comma-separated list of replica IDs that have been removed from the CRDT.
311 pub crdt_ghost_replica_ids: Option<String>,
312 /// CRDT feature-set version.
313 pub crdt_featureset_version: Option<u32>,
314 /// CRDT protocol version.
315 pub crdt_protocol_version: Option<u32>,
316 /// Globally unique identifier for the CRDT.
317 pub crdt_guid: Option<String>,
318 /// Modules configuration for the CRDT.
319 pub crdt_modules: Option<String>,
320 /// Comma-separated list of CRDT replica endpoints.
321 pub crdt_replicas: Option<String>,
322 /// List of CRDT replica sources.
323 pub crdt_sources: Option<Vec<Value>>,
324 /// CRDT sync state (e.g. `"enabled"`, `"disabled"`).
325 pub crdt_sync: Option<String>,
326 /// Seconds before a stalled CRDT sync connection raises an alarm.
327 pub crdt_sync_connection_alarm_timeout_seconds: Option<u32>,
328 /// Whether CRDT sync is distributed across shards.
329 pub crdt_sync_dist: Option<bool>,
330 /// Whether the CRDT syncer auto-unlatches on out-of-memory conditions.
331 pub crdt_syncer_auto_oom_unlatch: Option<bool>,
332 /// XADD stream ID uniqueness mode for CRDT.
333 pub crdt_xadd_id_uniqueness_mode: Option<String>,
334 /// Whether CRDT causal consistency is enabled.
335 pub crdt_causal_consistency: Option<bool>,
336 /// Replication backlog size for CRDT (bytes, or `"auto"`).
337 pub crdt_repl_backlog_size: Option<String>,
338
339 // Replication settings
340 /// Whether persistence is enabled on the master shard.
341 pub master_persistence: Option<bool>,
342 /// Whether slave (replica) high availability is enabled.
343 pub slave_ha: Option<bool>,
344 /// Priority for slave HA replica selection.
345 pub slave_ha_priority: Option<u32>,
346 /// Whether replica shards are read-only.
347 pub replica_read_only: Option<bool>,
348 /// List of upstream replica sources for Replica-Of mode.
349 pub replica_sources: Option<Vec<Value>>,
350 /// Replica sync state (e.g. `"enabled"`, `"paused"`).
351 pub replica_sync: Option<String>,
352 /// Seconds before a stalled replica sync connection raises an alarm.
353 pub replica_sync_connection_alarm_timeout_seconds: Option<u32>,
354 /// Whether replica sync is distributed across shards.
355 pub replica_sync_dist: Option<bool>,
356 /// Replication backlog size (bytes, or `"auto"`).
357 pub repl_backlog_size: Option<String>,
358
359 // Connection and performance settings
360 /// Maximum number of client connections (alias for `maxclients` on some versions).
361 pub max_connections: Option<u32>,
362 /// Maximum number of concurrent client connections.
363 pub maxclients: Option<u32>,
364 /// Number of proxy connection threads.
365 pub conns: Option<u32>,
366 /// Proxy connection type (e.g. `"per-thread"`, `"per-shard"`).
367 pub conns_type: Option<String>,
368 /// Maximum number of pipelined commands per client.
369 pub max_client_pipeline: Option<u32>,
370 /// Maximum number of pipelined commands.
371 pub max_pipelined: Option<u32>,
372
373 // AOF (Append Only File) settings
374 /// AOF (append-only file) fsync policy (e.g. `"appendfsync-every-sec"`).
375 pub aof_policy: Option<String>,
376 /// Maximum AOF file size in bytes.
377 pub max_aof_file_size: Option<u64>,
378 /// Maximum AOF load time in seconds.
379 pub max_aof_load_time: Option<u32>,
380
381 // Active defragmentation settings
382 /// Active defragmentation toggle (e.g. `"enabled"`, `"disabled"`).
383 pub activedefrag: Option<String>,
384 /// Maximum CPU percentage used by active defrag.
385 pub active_defrag_cycle_max: Option<u32>,
386 /// Minimum CPU percentage used by active defrag.
387 pub active_defrag_cycle_min: Option<u32>,
388 /// Minimum amount of fragmentation waste (bytes) before defrag starts.
389 pub active_defrag_ignore_bytes: Option<String>,
390 /// Maximum number of fields scanned per defrag cycle.
391 pub active_defrag_max_scan_fields: Option<u32>,
392 /// Lower fragmentation threshold (percent) for active defrag.
393 pub active_defrag_threshold_lower: Option<u32>,
394 /// Upper fragmentation threshold (percent) for active defrag.
395 pub active_defrag_threshold_upper: Option<u32>,
396
397 // Backup settings
398 /// Whether periodic backup is enabled.
399 pub backup: Option<bool>,
400 /// Reason for the most recent backup failure (read-only).
401 pub backup_failure_reason: Option<String>,
402 /// Number of backup snapshots retained.
403 pub backup_history: Option<u32>,
404 /// Backup interval in seconds.
405 pub backup_interval: Option<u32>,
406 /// Offset (in seconds) from midnight when scheduled backups run.
407 pub backup_interval_offset: Option<u32>,
408 /// Backup destination location (URI or storage config object).
409 pub backup_location: Option<Value>,
410 /// Percent progress (0–100) of the in-flight backup (read-only).
411 pub backup_progress: Option<f64>,
412 /// Current backup status (e.g. `"idle"`, `"running"`, `"failed"`).
413 pub backup_status: Option<String>,
414
415 // Import/Export settings
416 /// List of dataset import source descriptors.
417 pub dataset_import_sources: Option<Vec<Value>>,
418 /// Reason for the most recent import failure (read-only).
419 pub import_failure_reason: Option<String>,
420 /// Percent progress (0–100) of the in-flight import (read-only).
421 pub import_progress: Option<f64>,
422 /// Current import status (e.g. `"idle"`, `"running"`, `"failed"`).
423 pub import_status: Option<String>,
424 /// Reason for the most recent export failure (read-only).
425 pub export_failure_reason: Option<String>,
426 /// Percent progress (0–100) of the in-flight export (read-only).
427 pub export_progress: Option<f64>,
428 /// Current export status (e.g. `"idle"`, `"running"`, `"failed"`).
429 pub export_status: Option<String>,
430 /// Skip-analyze policy applied during import.
431 pub skip_import_analyze: Option<String>,
432
433 // Monitoring and metrics
434 /// Whether all metrics are exported.
435 pub metrics_export_all: Option<bool>,
436 /// Whether the text monitor log is generated for this database.
437 pub generate_text_monitor: Option<bool>,
438 /// Whether email alerts are enabled for this database.
439 pub email_alerts: Option<bool>,
440
441 // Modules and features
442 /// List of Redis modules loaded into the database.
443 pub module_list: Option<Vec<Value>>,
444 /// Search configuration - can be bool or object depending on API version
445 #[serde(default)]
446 pub search: Option<Value>,
447 /// Timeseries configuration - can be bool or object depending on API version
448 #[serde(default)]
449 pub timeseries: Option<Value>,
450
451 // BigStore/Flash storage settings
452 /// Whether Redis on Flash (BigStore) is enabled.
453 pub bigstore: Option<bool>,
454 /// RAM portion of the database (bytes) when BigStore is enabled.
455 pub bigstore_ram_size: Option<u64>,
456 /// Maximum percentage of memory used by RAM in BigStore.
457 pub bigstore_max_ram_ratio: Option<u32>,
458 /// Per-shard RAM weights for BigStore.
459 pub bigstore_ram_weights: Option<Vec<Value>>,
460 /// BigStore version in use.
461 pub bigstore_version: Option<u32>,
462
463 // Network and proxy settings
464 /// Proxy policy (e.g. `"single"`, `"all-master-shards"`, `"all-nodes"`).
465 pub proxy_policy: Option<String>,
466 /// Whether OSS-cluster API compatibility is enabled.
467 pub oss_cluster: Option<bool>,
468 /// Preferred endpoint type advertised by the OSS-cluster API (e.g. `"hostname"`, `"ip"`).
469 pub oss_cluster_api_preferred_endpoint_type: Option<String>,
470 /// Preferred IP type advertised by the OSS-cluster API (e.g. `"internal"`, `"external"`).
471 pub oss_cluster_api_preferred_ip_type: Option<String>,
472 /// Whether OSS-style hash-slot sharding is enabled.
473 pub oss_sharding: Option<bool>,
474
475 // Redis-specific settings
476 /// Redis Enterprise database server version (read-only).
477 pub redis_version: Option<String>,
478 /// Whether RESP3 protocol is enabled.
479 pub resp3: Option<bool>,
480 /// Comma-separated list of disabled Redis commands.
481 pub disabled_commands: Option<String>,
482
483 // Clustering and sharding
484 /// Hash-slot policy (e.g. `"legacy"`, `"16k"`).
485 pub hash_slots_policy: Option<String>,
486 /// List of regular expressions used to extract shard keys.
487 pub shard_key_regex: Option<Vec<Value>>,
488 /// Whether cross-slot multi-key commands are blocked.
489 pub shard_block_crossslot_keys: Option<bool>,
490 /// Whether commands referencing foreign keys are blocked.
491 pub shard_block_foreign_keys: Option<bool>,
492 /// Whether implicit shard keys are used.
493 pub implicit_shard_key: Option<bool>,
494
495 // Node placement and rack awareness
496 /// List of node UIDs to avoid when placing shards.
497 pub avoid_nodes: Option<Vec<String>>,
498 /// List of node UIDs preferred for placing shards.
499 pub use_nodes: Option<Vec<String>>,
500 /// Whether rack-aware shard placement is enabled.
501 pub rack_aware: Option<bool>,
502
503 // Operational settings
504 /// Whether automatic Redis upgrades are enabled.
505 pub auto_upgrade: Option<bool>,
506 /// Whether this is an internal/system database.
507 pub internal: Option<bool>,
508 /// Whether database connection auditing is enabled.
509 pub db_conns_auditing: Option<bool>,
510 /// Whether the replica flushes its dataset on full sync.
511 pub flush_on_fullsync: Option<bool>,
512 /// Whether selective flush is used for replica sync.
513 pub use_selective_flush: Option<bool>,
514
515 // Sync and replication control
516 /// Database sync mode.
517 pub sync: Option<String>,
518 /// List of sync sources for Replica-Of configurations.
519 pub sync_sources: Option<Vec<Value>>,
520 /// Number of dedicated threads used by the syncer.
521 pub sync_dedicated_threads: Option<u32>,
522 /// Syncer mode (e.g. `"distributed"`, `"centralized"`).
523 pub syncer_mode: Option<String>,
524 /// Log level used by the syncer.
525 pub syncer_log_level: Option<String>,
526 /// Whether the syncer supports on-the-fly reconfiguration.
527 pub support_syncer_reconf: Option<bool>,
528
529 // Gradual sync settings
530 /// Gradual-source sync mode.
531 pub gradual_src_mode: Option<String>,
532 /// Maximum number of sources synced concurrently in gradual mode.
533 pub gradual_src_max_sources: Option<u32>,
534 /// Gradual sync mode.
535 pub gradual_sync_mode: Option<String>,
536 /// Maximum number of shards synced concurrently per source in gradual mode.
537 pub gradual_sync_max_shards_per_source: Option<u32>,
538
539 // Slave and buffer settings
540 /// Replica output buffer limits.
541 pub slave_buffer: Option<String>,
542
543 // Snapshot settings
544 /// Snapshot policy entries (RDB save points).
545 pub snapshot_policy: Option<Vec<Value>>,
546
547 // Scheduling and recovery
548 /// Proxy scheduling policy (e.g. `"cmp"`, `"mnp"`, `"mru"`).
549 pub sched_policy: Option<String>,
550 /// Seconds to wait before automatic recovery (negative disables).
551 pub recovery_wait_time: Option<i32>,
552
553 // Performance and optimization
554 /// MULTI command optimization mode.
555 pub multi_commands_opt: Option<String>,
556 /// Configured ingress throughput cap.
557 pub throughput_ingress: Option<f64>,
558 /// Maximum number of keys tracked by the client-side caching tracking table.
559 pub tracking_table_max_keys: Option<u32>,
560 /// Whether the WAIT command is allowed.
561 pub wait_command: Option<bool>,
562
563 // Legacy and deprecated fields
564 /// Background operations currently running on the database (read-only).
565 pub background_op: Option<Vec<Value>>,
566
567 // Advanced configuration
568 /// Whether MKMS (multi-key multi-slot) commands are allowed.
569 pub mkms: Option<bool>,
570 /// Role-based permissions for the database.
571 pub roles_permissions: Option<Vec<Value>>,
572 /// Free-form tags attached to the database.
573 pub tags: Option<Vec<String>>,
574 /// Current topology epoch counter (read-only).
575 pub topology_epoch: Option<u32>,
576}
577
578/// Database endpoint information
579#[derive(Debug, Clone, Serialize, Deserialize)]
580pub struct EndpointInfo {
581 /// Unique identifier for the endpoint
582 pub uid: Option<String>,
583 /// List of IP addresses for the endpoint
584 pub addr: Option<Vec<String>>,
585 /// Port number for the endpoint
586 pub port: Option<u16>,
587 /// DNS name for the endpoint
588 pub dns_name: Option<String>,
589 /// Proxy policy for the endpoint
590 pub proxy_policy: Option<String>,
591 /// Address type (e.g., "internal", "external")
592 pub addr_type: Option<String>,
593 /// OSS cluster API preferred IP type
594 pub oss_cluster_api_preferred_ip_type: Option<String>,
595 /// List of proxy UIDs to exclude
596 pub exclude_proxies: Option<Vec<u32>>,
597 /// List of proxy UIDs to include
598 pub include_proxies: Option<Vec<u32>>,
599}
600
601/// Module configuration for database creation
602#[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)]
603pub struct ModuleConfig {
604 /// Module name to load (e.g. `"search"`, `"timeseries"`, `"bf"`).
605 #[builder(setter(into))]
606 pub module_name: String,
607 /// Arguments passed to the module when it loads.
608 #[serde(skip_serializing_if = "Option::is_none")]
609 #[builder(default, setter(into, strip_option))]
610 pub module_args: Option<String>,
611}
612
613/// Create database request
614///
615/// # Examples
616///
617/// ```rust,no_run
618/// use redis_enterprise::{CreateDatabaseRequest, ModuleConfig};
619///
620/// let request = CreateDatabaseRequest::builder()
621/// .name("my-database")
622/// .memory_size(1024 * 1024 * 1024) // 1GB
623/// .port(12000)
624/// .replication(true)
625/// .persistence("aof")
626/// .eviction_policy("volatile-lru")
627/// .shards_count(2)
628/// .authentication_redis_pass("secure-password")
629/// .build();
630/// ```
631#[derive(Debug, Serialize, Deserialize, TypedBuilder)]
632pub struct CreateDatabaseRequest {
633 /// Database name.
634 #[builder(setter(into))]
635 pub name: String,
636 /// Database memory limit in bytes (0 for unlimited).
637 #[serde(skip_serializing_if = "Option::is_none")]
638 #[builder(default, setter(strip_option))]
639 pub memory_size: Option<u64>,
640 /// TCP port on which the database is available (read-only).
641 #[serde(skip_serializing_if = "Option::is_none")]
642 #[builder(default, setter(strip_option))]
643 pub port: Option<u16>,
644 /// Whether in-memory replication is enabled.
645 #[serde(skip_serializing_if = "Option::is_none")]
646 #[builder(default, setter(strip_option))]
647 pub replication: Option<bool>,
648 /// Persistence policy (e.g. `"disabled"`, `"aof"`, `"snapshot"`).
649 #[serde(skip_serializing_if = "Option::is_none")]
650 #[builder(default, setter(into, strip_option))]
651 pub persistence: Option<String>,
652 /// Eviction policy when the database reaches its memory limit (e.g. `"allkeys-lru"`).
653 #[serde(skip_serializing_if = "Option::is_none")]
654 #[builder(default, setter(into, strip_option))]
655 pub eviction_policy: Option<String>,
656 /// Whether the database is sharded.
657 #[serde(skip_serializing_if = "Option::is_none")]
658 #[builder(default, setter(strip_option))]
659 pub sharding: Option<bool>,
660 /// Number of database shards.
661 #[serde(skip_serializing_if = "Option::is_none")]
662 #[builder(default, setter(strip_option))]
663 pub shards_count: Option<u32>,
664 /// Shard count.
665 #[serde(skip_serializing_if = "Option::is_none", alias = "shard_count")]
666 #[builder(default, setter(strip_option))]
667 pub shard_count: Option<u32>,
668 /// Proxy policy (e.g. `"single"`, `"all-master-shards"`, `"all-nodes"`).
669 #[serde(skip_serializing_if = "Option::is_none")]
670 #[builder(default, setter(into, strip_option))]
671 pub proxy_policy: Option<String>,
672 /// Whether rack-aware shard placement is enabled.
673 #[serde(skip_serializing_if = "Option::is_none")]
674 #[builder(default, setter(strip_option))]
675 pub rack_aware: Option<bool>,
676 /// List of Redis modules loaded into the database.
677 #[serde(skip_serializing_if = "Option::is_none")]
678 #[builder(default, setter(strip_option))]
679 pub module_list: Option<Vec<ModuleConfig>>,
680 /// Whether this database participates in a CRDT (Active-Active) deployment.
681 #[serde(skip_serializing_if = "Option::is_none")]
682 #[builder(default, setter(strip_option))]
683 pub crdt: Option<bool>,
684 /// Redis password used for client authentication.
685 #[serde(skip_serializing_if = "Option::is_none")]
686 #[builder(default, setter(into, strip_option))]
687 pub authentication_redis_pass: Option<String>,
688}
689
690/// Database handler for executing database commands
691pub struct DatabaseHandler {
692 client: RestClient,
693}
694
695impl DatabaseHandler {
696 /// New.
697 pub fn new(client: RestClient) -> Self {
698 DatabaseHandler { client }
699 }
700
701 /// List all databases (BDB.LIST)
702 pub async fn list(&self) -> Result<Vec<DatabaseInfo>> {
703 self.client.get("/v1/bdbs").await
704 }
705
706 /// Get specific database info (BDB.INFO)
707 pub async fn info(&self, uid: u32) -> Result<DatabaseInfo> {
708 self.client.get(&format!("/v1/bdbs/{}", uid)).await
709 }
710
711 /// Get specific database info (alias for info)
712 pub async fn get(&self, uid: u32) -> Result<DatabaseInfo> {
713 self.info(uid).await
714 }
715
716 /// Create a new database (BDB.CREATE)
717 pub async fn create(&self, request: CreateDatabaseRequest) -> Result<DatabaseInfo> {
718 self.client.post("/v1/bdbs", &request).await
719 }
720
721 /// Update database configuration (BDB.UPDATE)
722 pub async fn update(&self, uid: u32, updates: Value) -> Result<DatabaseInfo> {
723 self.client
724 .put(&format!("/v1/bdbs/{}", uid), &updates)
725 .await
726 }
727
728 /// Delete a database (BDB.DELETE)
729 pub async fn delete(&self, uid: u32) -> Result<()> {
730 self.client.delete(&format!("/v1/bdbs/{}", uid)).await
731 }
732
733 /// Get database stats (BDB.STATS)
734 pub async fn stats(&self, uid: u32) -> Result<Value> {
735 self.client.get(&format!("/v1/bdbs/stats/{}", uid)).await
736 }
737
738 /// Get database metrics (BDB.METRICS)
739 pub async fn metrics(&self, uid: u32) -> Result<Value> {
740 self.client.get(&format!("/v1/bdbs/metrics/{}", uid)).await
741 }
742
743 /// Export database (BDB.EXPORT)
744 pub async fn export(&self, uid: u32, export_location: &str) -> Result<ExportResponse> {
745 let body = serde_json::json!({
746 "export_location": export_location
747 });
748 self.client
749 .post(&format!("/v1/bdbs/{}/actions/export", uid), &body)
750 .await
751 }
752
753 /// Import database (BDB.IMPORT)
754 pub async fn import(
755 &self,
756 uid: u32,
757 import_location: &str,
758 flush: bool,
759 ) -> Result<ImportResponse> {
760 let body = serde_json::json!({
761 "import_location": import_location,
762 "flush": flush
763 });
764 self.client
765 .post(&format!("/v1/bdbs/{}/actions/import", uid), &body)
766 .await
767 }
768
769 /// Flush all keys from a database.
770 ///
771 /// `PUT /v1/bdbs/{uid}/flush`. This is the path-segment-style action
772 /// the REST API documents; the previous implementation POSTed to
773 /// `/v1/bdbs/{uid}/actions/flush`, which is not in the spec.
774 pub async fn flush(&self, uid: u32) -> Result<DatabaseActionResponse> {
775 let response = self
776 .client
777 .put_raw(&format!("/v1/bdbs/{}/flush", uid), serde_json::json!({}))
778 .await?;
779 serde_json::from_value(response).map_err(Into::into)
780 }
781
782 /// Get database shards (BDB.SHARDS)
783 pub async fn shards(&self, uid: u32) -> Result<Value> {
784 self.client.get(&format!("/v1/bdbs/{}/shards", uid)).await
785 }
786
787 /// Get database endpoints (BDB.ENDPOINTS)
788 pub async fn endpoints(&self, uid: u32) -> Result<Vec<EndpointInfo>> {
789 self.client
790 .get(&format!("/v1/bdbs/{}/endpoints", uid))
791 .await
792 }
793
794 /// Optimize shards placement (status) - GET
795 pub async fn optimize_shards_placement(&self, uid: u32) -> Result<Value> {
796 self.client
797 .get(&format!(
798 "/v1/bdbs/{}/actions/optimize_shards_placement",
799 uid
800 ))
801 .await
802 }
803
804 /// Recover database (status) - GET
805 pub async fn recover_status(&self, uid: u32) -> Result<Value> {
806 self.client
807 .get(&format!("/v1/bdbs/{}/actions/recover", uid))
808 .await
809 }
810
811 /// Recover database - POST
812 pub async fn recover(&self, uid: u32) -> Result<DatabaseActionResponse> {
813 self.client
814 .post(
815 &format!("/v1/bdbs/{}/actions/recover", uid),
816 &serde_json::json!({}),
817 )
818 .await
819 }
820
821 /// Resume traffic - POST
822 pub async fn resume_traffic(&self, uid: u32) -> Result<DatabaseActionResponse> {
823 self.client
824 .post(
825 &format!("/v1/bdbs/{}/actions/resume_traffic", uid),
826 &serde_json::json!({}),
827 )
828 .await
829 }
830
831 /// Stop traffic - POST
832 pub async fn stop_traffic(&self, uid: u32) -> Result<DatabaseActionResponse> {
833 self.client
834 .post(
835 &format!("/v1/bdbs/{}/actions/stop_traffic", uid),
836 &serde_json::json!({}),
837 )
838 .await
839 }
840
841 /// Rebalance database - PUT
842 pub async fn rebalance(&self, uid: u32) -> Result<DatabaseActionResponse> {
843 self.client
844 .put(
845 &format!("/v1/bdbs/{}/actions/rebalance", uid),
846 &serde_json::json!({}),
847 )
848 .await
849 }
850
851 /// Revamp database - PUT
852 pub async fn revamp(&self, uid: u32) -> Result<DatabaseActionResponse> {
853 self.client
854 .put(
855 &format!("/v1/bdbs/{}/actions/revamp", uid),
856 &serde_json::json!({}),
857 )
858 .await
859 }
860
861 /// Reset backup status - PUT
862 pub async fn backup_reset_status(&self, uid: u32) -> Result<Value> {
863 self.client
864 .put(
865 &format!("/v1/bdbs/{}/actions/backup_reset_status", uid),
866 &serde_json::json!({}),
867 )
868 .await
869 }
870
871 /// Reset export status - PUT
872 pub async fn export_reset_status(&self, uid: u32) -> Result<Value> {
873 self.client
874 .put(
875 &format!("/v1/bdbs/{}/actions/export_reset_status", uid),
876 &serde_json::json!({}),
877 )
878 .await
879 }
880
881 /// Reset import status - PUT
882 pub async fn import_reset_status(&self, uid: u32) -> Result<Value> {
883 self.client
884 .put(
885 &format!("/v1/bdbs/{}/actions/import_reset_status", uid),
886 &serde_json::json!({}),
887 )
888 .await
889 }
890
891 /// Peer stats for a database - GET
892 pub async fn peer_stats(&self, uid: u32) -> Result<Value> {
893 self.client
894 .get(&format!("/v1/bdbs/{}/peer_stats", uid))
895 .await
896 }
897
898 /// Peer stats for a specific peer - GET
899 pub async fn peer_stats_for(&self, uid: u32, peer_uid: u32) -> Result<Value> {
900 self.client
901 .get(&format!("/v1/bdbs/{}/peer_stats/{}", uid, peer_uid))
902 .await
903 }
904
905 /// Sync source stats for a database - GET
906 pub async fn sync_source_stats(&self, uid: u32) -> Result<Value> {
907 self.client
908 .get(&format!("/v1/bdbs/{}/sync_source_stats", uid))
909 .await
910 }
911
912 /// Sync source stats for a specific source - GET
913 pub async fn sync_source_stats_for(&self, uid: u32, src_uid: u32) -> Result<Value> {
914 self.client
915 .get(&format!("/v1/bdbs/{}/sync_source_stats/{}", uid, src_uid))
916 .await
917 }
918
919 /// Syncer state (all) - GET
920 pub async fn syncer_state(&self, uid: u32) -> Result<Value> {
921 self.client
922 .get(&format!("/v1/bdbs/{}/syncer_state", uid))
923 .await
924 }
925
926 /// Syncer state for CRDT - GET
927 pub async fn syncer_state_crdt(&self, uid: u32) -> Result<Value> {
928 self.client
929 .get(&format!("/v1/bdbs/{}/syncer_state/crdt", uid))
930 .await
931 }
932
933 /// Syncer state for replica - GET
934 pub async fn syncer_state_replica(&self, uid: u32) -> Result<Value> {
935 self.client
936 .get(&format!("/v1/bdbs/{}/syncer_state/replica", uid))
937 .await
938 }
939
940 /// Database passwords delete - DELETE
941 pub async fn passwords_delete(&self, uid: u32) -> Result<()> {
942 self.client
943 .delete(&format!("/v1/bdbs/{}/passwords", uid))
944 .await
945 }
946
947 /// List all database alerts - GET
948 pub async fn alerts_all(&self) -> Result<Value> {
949 self.client.get("/v1/bdbs/alerts").await
950 }
951
952 /// List alerts for a specific database - GET
953 pub async fn alerts_for(&self, uid: u32) -> Result<Value> {
954 self.client.get(&format!("/v1/bdbs/alerts/{}", uid)).await
955 }
956
957 /// Get a specific alert for a database - GET
958 pub async fn alert_detail(&self, uid: u32, alert: &str) -> Result<Value> {
959 self.client
960 .get(&format!("/v1/bdbs/alerts/{}/{}", uid, alert))
961 .await
962 }
963
964 /// CRDT source alerts - GET
965 pub async fn crdt_source_alerts_all(&self) -> Result<Value> {
966 self.client.get("/v1/bdbs/crdt_sources/alerts").await
967 }
968
969 /// CRDT source alerts for DB - GET
970 pub async fn crdt_source_alerts_for(&self, uid: u32) -> Result<Value> {
971 self.client
972 .get(&format!("/v1/bdbs/crdt_sources/alerts/{}", uid))
973 .await
974 }
975
976 /// CRDT source alerts for specific source - GET
977 pub async fn crdt_source_alerts_source(&self, uid: u32, source_id: u32) -> Result<Value> {
978 self.client
979 .get(&format!(
980 "/v1/bdbs/crdt_sources/alerts/{}/{}",
981 uid, source_id
982 ))
983 .await
984 }
985
986 /// CRDT source alert detail - GET
987 pub async fn crdt_source_alert_detail(
988 &self,
989 uid: u32,
990 source_id: u32,
991 alert: &str,
992 ) -> Result<Value> {
993 self.client
994 .get(&format!(
995 "/v1/bdbs/crdt_sources/alerts/{}/{}/{}",
996 uid, source_id, alert
997 ))
998 .await
999 }
1000
1001 /// Replica source alerts - GET
1002 pub async fn replica_source_alerts_all(&self) -> Result<Value> {
1003 self.client.get("/v1/bdbs/replica_sources/alerts").await
1004 }
1005
1006 /// Replica source alerts for DB - GET
1007 pub async fn replica_source_alerts_for(&self, uid: u32) -> Result<Value> {
1008 self.client
1009 .get(&format!("/v1/bdbs/replica_sources/alerts/{}", uid))
1010 .await
1011 }
1012
1013 /// Replica source alerts for specific source - GET
1014 pub async fn replica_source_alerts_source(&self, uid: u32, source_id: u32) -> Result<Value> {
1015 self.client
1016 .get(&format!(
1017 "/v1/bdbs/replica_sources/alerts/{}/{}",
1018 uid, source_id
1019 ))
1020 .await
1021 }
1022
1023 /// Replica source alert detail - GET
1024 pub async fn replica_source_alert_detail(
1025 &self,
1026 uid: u32,
1027 source_id: u32,
1028 alert: &str,
1029 ) -> Result<Value> {
1030 self.client
1031 .get(&format!(
1032 "/v1/bdbs/replica_sources/alerts/{}/{}/{}",
1033 uid, source_id, alert
1034 ))
1035 .await
1036 }
1037
1038 /// Upgrade database Redis version and/or modules (BDB.UPGRADE)
1039 ///
1040 /// # Examples
1041 ///
1042 /// ```no_run
1043 /// # use redis_enterprise::EnterpriseClient;
1044 /// # use redis_enterprise::bdb::DatabaseUpgradeRequest;
1045 /// # async fn example() -> redis_enterprise::Result<()> {
1046 /// let client = EnterpriseClient::builder()
1047 /// .base_url("https://localhost:9443")
1048 /// .username("admin")
1049 /// .password("password")
1050 /// .insecure(true)
1051 /// .build()?;
1052 ///
1053 /// // Upgrade to latest Redis version
1054 /// let request = DatabaseUpgradeRequest {
1055 /// redis_version: None, // defaults to latest
1056 /// preserve_roles: Some(true),
1057 /// ..Default::default()
1058 /// };
1059 /// client.databases().upgrade_redis_version(1, request).await?;
1060 ///
1061 /// // Upgrade to specific Redis version
1062 /// let request = DatabaseUpgradeRequest {
1063 /// redis_version: Some("7.4.2".to_string()),
1064 /// preserve_roles: Some(true),
1065 /// ..Default::default()
1066 /// };
1067 /// client.databases().upgrade_redis_version(1, request).await?;
1068 /// # Ok(())
1069 /// # }
1070 /// ```
1071 pub async fn upgrade_redis_version(
1072 &self,
1073 uid: u32,
1074 request: DatabaseUpgradeRequest,
1075 ) -> Result<DatabaseActionResponse> {
1076 self.client
1077 .post(&format!("/v1/bdbs/{}/upgrade", uid), &request)
1078 .await
1079 }
1080
1081 /// Reset the database admin password.
1082 ///
1083 /// `PUT /v1/bdbs/{uid}/reset_admin_pass`. This is the path-segment-style
1084 /// action the REST API documents. The new password is sent in the body
1085 /// under the `authentication_redis_pass` key.
1086 pub async fn reset_admin_pass(
1087 &self,
1088 uid: u32,
1089 new_password: &str,
1090 ) -> Result<DatabaseActionResponse> {
1091 let body = serde_json::json!({
1092 "authentication_redis_pass": new_password
1093 });
1094 let response = self
1095 .client
1096 .put_raw(&format!("/v1/bdbs/{}/reset_admin_pass", uid), body)
1097 .await?;
1098 serde_json::from_value(response).map_err(Into::into)
1099 }
1100
1101 /// Reset database password.
1102 ///
1103 /// Deprecated alias for [`Self::reset_admin_pass`]. The previous
1104 /// implementation POSTed to `/v1/bdbs/{uid}/actions/reset_password`,
1105 /// which is not in the REST API spec.
1106 #[deprecated(
1107 since = "0.9.0",
1108 note = "use `reset_admin_pass`; the action POST path was not in the REST API spec"
1109 )]
1110 pub async fn reset_password(
1111 &self,
1112 uid: u32,
1113 new_password: &str,
1114 ) -> Result<DatabaseActionResponse> {
1115 self.reset_admin_pass(uid, new_password).await
1116 }
1117
1118 /// Check database availability
1119 pub async fn availability(&self, uid: u32) -> Result<Value> {
1120 self.client
1121 .get(&format!("/v1/bdbs/{}/availability", uid))
1122 .await
1123 }
1124
1125 /// Check local database endpoint availability
1126 pub async fn endpoint_availability(&self, uid: u32) -> Result<Value> {
1127 self.client
1128 .get(&format!("/v1/local/bdbs/{}/endpoint/availability", uid))
1129 .await
1130 }
1131
1132 /// Create database using v2 API (supports recovery plan)
1133 pub async fn create_v2(&self, request: Value) -> Result<DatabaseInfo> {
1134 self.client.post("/v2/bdbs", &request).await
1135 }
1136
1137 /// Watch database status changes in real-time
1138 ///
1139 /// Polls the database endpoint and yields updates when status changes occur.
1140 /// Useful for monitoring database operations like upgrades, migrations, backups, etc.
1141 ///
1142 /// # Arguments
1143 /// * `uid` - Database ID to watch
1144 /// * `poll_interval` - Time to wait between polls
1145 ///
1146 /// # Returns
1147 /// A stream of `(DatabaseInfo, Option<String>)` tuples where:
1148 /// - `DatabaseInfo` - Current database state
1149 /// - `Option<String>` - Previous status (None on first poll, Some on status change)
1150 ///
1151 /// # Example
1152 /// ```no_run
1153 /// use redis_enterprise::EnterpriseClient;
1154 /// use futures::StreamExt;
1155 /// use std::time::Duration;
1156 ///
1157 /// # async fn example(client: EnterpriseClient) -> Result<(), Box<dyn std::error::Error>> {
1158 /// let db_handler = client.databases();
1159 /// let mut stream = db_handler.watch_database(1, Duration::from_secs(5));
1160 ///
1161 /// while let Some(result) = stream.next().await {
1162 /// match result {
1163 /// Ok((db_info, prev_status)) => {
1164 /// if let Some(old_status) = prev_status {
1165 /// println!("Status changed: {} -> {}", old_status, db_info.status.unwrap_or_default());
1166 /// } else {
1167 /// println!("Initial status: {}", db_info.status.unwrap_or_default());
1168 /// }
1169 /// }
1170 /// Err(e) => eprintln!("Error: {}", e),
1171 /// }
1172 /// }
1173 /// # Ok(())
1174 /// # }
1175 /// ```
1176 pub fn watch_database(&self, uid: u32, poll_interval: Duration) -> DatabaseWatchStream<'_> {
1177 Box::pin(async_stream::stream! {
1178 let mut last_status: Option<String> = None;
1179
1180 loop {
1181 match self.info(uid).await {
1182 Ok(db_info) => {
1183 let current_status = db_info.status.clone();
1184
1185 // Check if status changed
1186 let status_changed = match (&last_status, ¤t_status) {
1187 (Some(old), Some(new)) => old != new,
1188 (None, Some(_)) => false, // First poll, not a change
1189 (Some(_), None) => true, // Status disappeared
1190 (None, None) => false,
1191 };
1192
1193 // Yield the database info with previous status if changed
1194 if status_changed {
1195 yield Ok((db_info, last_status.clone()));
1196 } else if last_status.is_none() {
1197 // First poll - always yield
1198 yield Ok((db_info, None));
1199 } else {
1200 // Status unchanged - yield current state for monitoring
1201 yield Ok((db_info, None));
1202 }
1203
1204 last_status = current_status;
1205 }
1206 Err(e) => {
1207 yield Err(e);
1208 break;
1209 }
1210 }
1211
1212 sleep(poll_interval).await;
1213 }
1214 })
1215 }
1216}