redis_enterprise/
cluster.rs

1//! Cluster management for Redis Enterprise
2//!
3//! ## Overview
4//! - Query cluster info, settings, topology, and license
5//! - Manage nodes (join, remove, maintenance mode)
6//! - Configure cluster policies and services
7//! - Handle certificates and LDAP configuration
8//!
9//! ## Examples
10//!
11//! ### Getting Cluster Information
12//! ```no_run
13//! use redis_enterprise::{EnterpriseClient, ClusterHandler};
14//!
15//! # async fn example(client: EnterpriseClient) -> Result<(), Box<dyn std::error::Error>> {
16//! let cluster = ClusterHandler::new(client);
17//!
18//! // Get basic cluster info
19//! let info = cluster.info().await?;
20//! println!("Cluster: {} ({})", info.name, info.version.unwrap_or_default());
21//!
22//! // Check license status
23//! let license = cluster.license().await?;
24//! println!("Licensed shards: {:?}", license.shards_limit);
25//! # Ok(())
26//! # }
27//! ```
28//!
29//! ### Node Management
30//! ```no_run
31//! # use redis_enterprise::{EnterpriseClient, ClusterHandler};
32//! # async fn example(client: EnterpriseClient) -> Result<(), Box<dyn std::error::Error>> {
33//! let cluster = ClusterHandler::new(client);
34//!
35//! // Join a new node to the cluster
36//! let result = cluster.join_node(
37//!     "192.168.1.100",
38//!     "admin",
39//!     "password"
40//! ).await?;
41//! println!("Node joined: {:?}", result);
42//!
43//! // Remove a node
44//! let action = cluster.remove_node(3).await?;
45//! println!("Removal started: {:?}", action);
46//! # Ok(())
47//! # }
48//! ```
49
50use crate::client::RestClient;
51use crate::error::Result;
52use serde::{Deserialize, Serialize};
53use serde_json::Value;
54use typed_builder::TypedBuilder;
55
56/// Response from cluster action operations
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct ClusterActionResponse {
59    /// The action UID for tracking async operations
60    pub action_uid: String,
61    /// Description of the action
62    pub description: Option<String>,
63    /// Additional fields from the response
64    #[serde(flatten)]
65    pub extra: Value,
66}
67
68/// Node information
69#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct ClusterNode {
71    pub id: u32,
72    pub address: String,
73    pub status: String,
74    pub role: Option<String>,
75    pub total_memory: Option<u64>,
76    pub used_memory: Option<u64>,
77    pub cpu_cores: Option<u32>,
78
79    #[serde(flatten)]
80    pub extra: Value,
81}
82
83/// Cluster information from the REST API
84#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct ClusterInfo {
86    /// Cluster unique ID (read-only)
87    pub uid: Option<u32>,
88
89    /// Cluster's fully qualified domain name (read-only)
90    pub name: String,
91
92    /// Cluster creation date (read-only)
93    pub created: Option<String>,
94
95    /// Last changed time (read-only)
96    pub last_changed_time: Option<String>,
97
98    /// Software version
99    pub version: Option<String>,
100
101    /// License expiration status
102    pub license_expired: Option<bool>,
103
104    /// List of node UIDs in the cluster
105    pub nodes: Option<Vec<u32>>,
106
107    /// List of database UIDs in the cluster
108    pub databases: Option<Vec<u32>>,
109
110    /// Cluster status
111    pub status: Option<String>,
112
113    /// Enables/disables node/cluster email alerts
114    pub email_alerts: Option<bool>,
115
116    /// Indicates if cluster operates in rack-aware mode
117    pub rack_aware: Option<bool>,
118
119    /// Storage engine for Auto Tiering ('speedb' or 'rocksdb')
120    pub bigstore_driver: Option<String>,
121
122    /// API HTTP listening port (range: 1024-65535)
123    pub cnm_http_port: Option<u16>,
124
125    /// API HTTPS listening port (range: 1024-65535)
126    pub cnm_https_port: Option<u16>,
127
128    // Stats
129    /// Total memory available in the cluster
130    pub total_memory: Option<u64>,
131
132    /// Total memory used in the cluster
133    pub used_memory: Option<u64>,
134
135    /// Total number of shards in the cluster
136    pub total_shards: Option<u32>,
137
138    // Additional fields from API audit
139    /// Alert settings configuration for cluster and nodes
140    pub alert_settings: Option<Value>,
141
142    /// Whether cluster changes are currently blocked (maintenance mode)
143    pub block_cluster_changes: Option<bool>,
144
145    /// Whether CCS (Cluster Configuration Store) internode encryption is enabled
146    pub ccs_internode_encryption: Option<bool>,
147
148    /// Internal port used by the cluster API
149    pub cluster_api_internal_port: Option<u32>,
150
151    /// SSH public key for cluster authentication
152    pub cluster_ssh_public_key: Option<String>,
153
154    /// Port used by Cluster Manager (CM)
155    pub cm_port: Option<u32>,
156
157    /// Version of the Cluster Manager server
158    pub cm_server_version: Option<u32>,
159
160    /// Session timeout for Cluster Manager in minutes
161    pub cm_session_timeout_minutes: Option<u32>,
162
163    /// Maximum threads per worker for CNM HTTP server
164    pub cnm_http_max_threads_per_worker: Option<u32>,
165
166    /// Number of workers for CNM HTTP server
167    pub cnm_http_workers: Option<u32>,
168
169    /// Cipher suites for control plane TLS connections
170    pub control_cipher_suites: Option<String>,
171
172    /// Cipher suites for control plane TLS 1.3 connections
173    pub control_cipher_suites_tls_1_3: Option<String>,
174
175    /// Whether CRDB coordinator should ignore incoming requests
176    pub crdb_coordinator_ignore_requests: Option<bool>,
177
178    /// Port used by CRDB (Conflict-free Replicated Database) coordinator
179    pub crdb_coordinator_port: Option<u32>,
180
181    /// Supported CRDT featureset version number
182    pub crdt_supported_featureset_version: Option<u32>,
183
184    /// List of supported CRDT protocol versions
185    pub crdt_supported_protocol_versions: Option<Vec<String>>,
186
187    /// Timestamp when the cluster was created
188    pub created_time: Option<String>,
189
190    /// Cipher list for data plane connections
191    pub data_cipher_list: Option<String>,
192
193    /// Cipher suites for data plane TLS 1.3 connections
194    pub data_cipher_suites_tls_1_3: Option<Vec<Value>>,
195
196    /// Path to debug information files
197    pub debuginfo_path: Option<String>,
198
199    /// Whether private keys should be encrypted
200    pub encrypt_pkeys: Option<bool>,
201
202    /// Time-to-live for Entra ID (Azure AD) cache in seconds
203    pub entra_id_cache_ttl: Option<u32>,
204
205    /// Admin port for Envoy proxy
206    pub envoy_admin_port: Option<u32>,
207
208    /// Whether Envoy external authorization is enabled
209    pub envoy_external_authorization: Option<bool>,
210
211    /// Maximum number of downstream connections for Envoy
212    pub envoy_max_downstream_connections: Option<u32>,
213
214    /// Port for Envoy management server
215    pub envoy_mgmt_server_port: Option<u32>,
216
217    /// Admin port for gossip Envoy proxy
218    pub gossip_envoy_admin_port: Option<u32>,
219
220    /// Whether to handle metrics endpoint redirects
221    pub handle_metrics_redirects: Option<bool>,
222
223    /// Whether to handle HTTP redirects
224    pub handle_redirects: Option<bool>,
225
226    /// Whether HTTP support is enabled (in addition to HTTPS)
227    pub http_support: Option<bool>,
228
229    /// Configuration for log rotation
230    pub logrotate_settings: Option<Value>,
231
232    /// Whether to mask database credentials in logs
233    pub mask_bdb_credentials: Option<bool>,
234
235    /// Type of metrics system in use
236    pub metrics_system: Option<u32>,
237
238    /// Minimum TLS version for control plane connections
239    #[serde(rename = "min_control_TLS_version")]
240    pub min_control_tls_version: Option<String>,
241
242    /// Minimum TLS version for data plane connections
243    #[serde(rename = "min_data_TLS_version")]
244    pub min_data_tls_version: Option<String>,
245
246    /// Minimum TLS version for sentinel connections
247    #[serde(rename = "min_sentinel_TLS_version")]
248    pub min_sentinel_tls_version: Option<String>,
249
250    /// Maximum size allowed for module uploads in megabytes
251    pub module_upload_max_size_mb: Option<u32>,
252
253    /// List of authorized subject names for mutual TLS authentication
254    pub mtls_authorized_subjects: Option<Vec<String>>,
255
256    /// Certificate authentication mode for mutual TLS
257    pub mtls_certificate_authentication: Option<bool>,
258
259    /// Validation type for MTLS client certificate subjects
260    pub mtls_client_cert_subject_validation_type: Option<String>,
261
262    /// Optimization level for multi-command operations
263    pub multi_commands_opt: Option<String>,
264
265    /// Whether HTTP OPTIONS method is forbidden
266    pub options_method_forbidden: Option<bool>,
267
268    /// Requirements for password complexity
269    pub password_complexity: Option<bool>,
270
271    /// Duration in seconds before passwords expire
272    pub password_expiration_duration: Option<u32>,
273
274    /// Algorithm used for hashing passwords
275    pub password_hashing_algorithm: Option<String>,
276
277    /// Minimum required length for passwords
278    pub password_min_length: Option<u32>,
279
280    /// Certificate used by proxy servers
281    pub proxy_certificate: Option<String>,
282
283    /// List of ports reserved for system use
284    pub reserved_ports: Option<Vec<u32>>,
285
286    /// Whether robust CRDT syncer mode is enabled
287    pub robust_crdt_syncer: Option<bool>,
288
289    /// Whether to verify S3 certificates
290    pub s3_certificate_verification: Option<bool>,
291
292    /// Cipher suites for sentinel TLS connections
293    pub sentinel_cipher_suites: Option<Vec<String>>,
294
295    /// Cipher suites for sentinel TLS 1.3 connections
296    pub sentinel_cipher_suites_tls_1_3: Option<String>,
297
298    /// TLS mode for sentinel connections
299    pub sentinel_tls_mode: Option<String>,
300
301    /// Whether slave high availability is enabled
302    pub slave_ha: Option<bool>,
303
304    /// Cooldown period for database slave HA in seconds
305    pub slave_ha_bdb_cooldown_period: Option<u32>,
306
307    /// General cooldown period for slave HA in seconds
308    pub slave_ha_cooldown_period: Option<u32>,
309
310    /// Grace period for slave HA operations in seconds
311    pub slave_ha_grace_period: Option<u32>,
312
313    /// Whether slowlog sanitization is supported
314    pub slowlog_in_sanitized_support: Option<bool>,
315
316    /// TLS mode for SMTP connections
317    pub smtp_tls_mode: Option<String>,
318
319    /// Whether to use TLS for SMTP connections
320    pub smtp_use_tls: Option<bool>,
321
322    /// Certificate used by syncer processes
323    pub syncer_certificate: Option<String>,
324
325    /// Ports reserved for system processes
326    pub system_reserved_ports: Option<Vec<u32>>,
327
328    /// Whether a cluster upgrade is currently in progress
329    pub upgrade_in_progress: Option<bool>,
330
331    /// Current upgrade mode for the cluster
332    pub upgrade_mode: Option<bool>,
333
334    /// Use external IPv6
335    pub use_external_ipv6: Option<bool>,
336
337    /// Use IPv6
338    pub use_ipv6: Option<bool>,
339
340    /// Wait command support
341    pub wait_command: Option<bool>,
342
343    #[serde(flatten)]
344    pub extra: Value,
345}
346
347/// Cluster-wide settings configuration (57 fields)
348#[derive(Debug, Clone, Serialize, Deserialize)]
349pub struct ClusterSettings {
350    /// Automatic recovery on shard failure
351    pub auto_recovery: Option<bool>,
352
353    /// Automatic migration of shards from overbooked nodes
354    pub automatic_node_offload: Option<bool>,
355
356    /// BigStore migration thresholds
357    pub bigstore_migrate_node_threshold: Option<u32>,
358    pub bigstore_migrate_node_threshold_p: Option<u32>,
359    pub bigstore_provision_node_threshold: Option<u32>,
360    pub bigstore_provision_node_threshold_p: Option<u32>,
361
362    /// Default BigStore version
363    pub default_bigstore_version: Option<u32>,
364
365    /// Data internode encryption
366    pub data_internode_encryption: Option<bool>,
367
368    /// Database connections auditing
369    pub db_conns_auditing: Option<bool>,
370
371    /// Default concurrent restore actions
372    pub default_concurrent_restore_actions: Option<u32>,
373
374    /// Default fork evict RAM
375    pub default_fork_evict_ram: Option<bool>,
376
377    /// Default proxy policies
378    pub default_non_sharded_proxy_policy: Option<String>,
379    pub default_sharded_proxy_policy: Option<String>,
380
381    /// OSS cluster defaults
382    pub default_oss_cluster: Option<bool>,
383    pub default_oss_sharding: Option<bool>,
384
385    /// Default Redis version for new databases
386    pub default_provisioned_redis_version: Option<String>,
387
388    /// Recovery settings
389    pub default_recovery_wait_time: Option<u32>,
390
391    /// Shards placement strategy
392    pub default_shards_placement: Option<String>,
393
394    /// Tracking table settings
395    pub default_tracking_table_max_keys_policy: Option<String>,
396
397    /// Additional cluster-wide settings
398    pub email_alerts: Option<bool>,
399    pub endpoint_rebind_enabled: Option<bool>,
400    pub failure_detection_sensitivity: Option<String>,
401    pub gossip_envoy_admin_port: Option<u32>,
402    pub gossip_envoy_port: Option<u32>,
403    pub gossip_envoy_proxy_mode: Option<bool>,
404    pub hot_spare: Option<bool>,
405    pub max_saved_events_per_type: Option<u32>,
406    pub max_simultaneous_backups: Option<u32>,
407    pub parallel_shards_upgrade: Option<u32>,
408    pub persistent_node_removal: Option<bool>,
409    pub rack_aware: Option<bool>,
410    pub redis_migrate_node_threshold: Option<String>,
411    pub redis_migrate_node_threshold_p: Option<u32>,
412    pub redis_provision_node_threshold: Option<String>,
413    pub redis_provision_node_threshold_p: Option<u32>,
414    pub redis_upgrade_policy: Option<String>,
415    pub resp3_default: Option<bool>,
416    pub show_internals: Option<bool>,
417    pub slave_threads_when_master: Option<bool>,
418    pub use_empty_shard_backups: Option<bool>,
419
420    #[serde(flatten)]
421    pub extra: Value,
422}
423
424/// Bootstrap request for creating a new cluster
425#[derive(Debug, Serialize, TypedBuilder)]
426pub struct BootstrapRequest {
427    #[builder(setter(into))]
428    pub action: String,
429    pub cluster: ClusterBootstrapInfo,
430    pub credentials: BootstrapCredentials,
431}
432
433/// Cluster information for bootstrap
434#[derive(Debug, Serialize, TypedBuilder)]
435pub struct ClusterBootstrapInfo {
436    #[builder(setter(into))]
437    pub name: String,
438}
439
440/// Credentials for bootstrap
441#[derive(Debug, Serialize, TypedBuilder)]
442pub struct BootstrapCredentials {
443    #[builder(setter(into))]
444    pub username: String,
445    #[builder(setter(into))]
446    pub password: String,
447}
448
449/// Cluster handler for executing cluster commands
450pub struct ClusterHandler {
451    client: RestClient,
452}
453
454impl ClusterHandler {
455    pub fn new(client: RestClient) -> Self {
456        ClusterHandler { client }
457    }
458
459    /// Get cluster information (CLUSTER.INFO)
460    pub async fn info(&self) -> Result<ClusterInfo> {
461        self.client.get("/v1/cluster").await
462    }
463
464    /// Bootstrap a new cluster (CLUSTER.BOOTSTRAP)
465    pub async fn bootstrap(&self, request: BootstrapRequest) -> Result<Value> {
466        // The bootstrap endpoint returns empty response on success
467        // Note: Despite docs saying /v1/bootstrap, the actual endpoint is /v1/bootstrap/create_cluster
468        self.client
469            .post_bootstrap("/v1/bootstrap/create_cluster", &request)
470            .await
471    }
472
473    /// Update cluster configuration (CLUSTER.UPDATE)
474    pub async fn update(&self, updates: Value) -> Result<Value> {
475        self.client.put("/v1/cluster", &updates).await
476    }
477
478    /// Get cluster stats (CLUSTER.STATS)
479    pub async fn stats(&self) -> Result<Value> {
480        self.client.get("/v1/cluster/stats").await
481    }
482
483    /// Get cluster nodes (CLUSTER.NODES)
484    pub async fn nodes(&self) -> Result<Vec<NodeInfo>> {
485        self.client.get("/v1/nodes").await
486    }
487
488    /// Get cluster license (CLUSTER.LICENSE)
489    pub async fn license(&self) -> Result<LicenseInfo> {
490        self.client.get("/v1/license").await
491    }
492
493    /// Join node to cluster (CLUSTER.JOIN)
494    pub async fn join_node(
495        &self,
496        node_address: &str,
497        username: &str,
498        password: &str,
499    ) -> Result<Value> {
500        let body = serde_json::json!({
501            "action": "join_cluster",
502            "cluster": {
503                "nodes": [node_address]
504            },
505            "credentials": {
506                "username": username,
507                "password": password
508            }
509        });
510        self.client.post("/v1/bootstrap/join", &body).await
511    }
512
513    /// Remove node from cluster (CLUSTER.REMOVE_NODE)
514    pub async fn remove_node(&self, node_uid: u32) -> Result<Value> {
515        self.client
516            .delete(&format!("/v1/nodes/{}", node_uid))
517            .await?;
518        Ok(serde_json::json!({"message": format!("Node {} removed", node_uid)}))
519    }
520
521    /// Reset cluster to factory defaults (CLUSTER.RESET) - DANGEROUS
522    pub async fn reset(&self) -> Result<ClusterActionResponse> {
523        self.client
524            .post("/v1/cluster/actions/reset", &serde_json::json!({}))
525            .await
526    }
527
528    // raw variant removed: use reset()
529
530    /// Recover cluster from failure (CLUSTER.RECOVER)
531    pub async fn recover(&self) -> Result<ClusterActionResponse> {
532        self.client
533            .post("/v1/cluster/actions/recover", &serde_json::json!({}))
534            .await
535    }
536
537    // raw variant removed: use recover()
538
539    /// Get cluster settings (CLUSTER.SETTINGS)
540    pub async fn settings(&self) -> Result<Value> {
541        self.client.get("/v1/cluster/settings").await
542    }
543
544    /// Get cluster topology (CLUSTER.TOPOLOGY)
545    pub async fn topology(&self) -> Result<Value> {
546        self.client.get("/v1/cluster/topology").await
547    }
548
549    /// List available cluster actions - GET /v1/cluster/actions
550    pub async fn actions(&self) -> Result<Value> {
551        self.client.get("/v1/cluster/actions").await
552    }
553
554    /// Get a specific cluster action details - GET /v1/cluster/actions/{action}
555    pub async fn action_detail(&self, action: &str) -> Result<Value> {
556        self.client
557            .get(&format!("/v1/cluster/actions/{}", action))
558            .await
559    }
560
561    /// Execute a specific cluster action - POST /v1/cluster/actions/{action}
562    pub async fn action_execute(&self, action: &str, body: Value) -> Result<Value> {
563        self.client
564            .post(&format!("/v1/cluster/actions/{}", action), &body)
565            .await
566    }
567
568    /// Delete a specific cluster action - DELETE /v1/cluster/actions/{action}
569    pub async fn action_delete(&self, action: &str) -> Result<()> {
570        self.client
571            .delete(&format!("/v1/cluster/actions/{}", action))
572            .await
573    }
574
575    /// Get auditing DB connections - GET /v1/cluster/auditing/db_conns
576    pub async fn auditing_db_conns(&self) -> Result<Value> {
577        self.client.get("/v1/cluster/auditing/db_conns").await
578    }
579
580    /// Update auditing DB connections - PUT /v1/cluster/auditing/db_conns
581    pub async fn auditing_db_conns_update(&self, cfg: Value) -> Result<Value> {
582        self.client.put("/v1/cluster/auditing/db_conns", &cfg).await
583    }
584
585    /// Delete auditing DB connections - DELETE /v1/cluster/auditing/db_conns
586    pub async fn auditing_db_conns_delete(&self) -> Result<()> {
587        self.client.delete("/v1/cluster/auditing/db_conns").await
588    }
589
590    /// List cluster certificates - GET /v1/cluster/certificates
591    pub async fn certificates(&self) -> Result<Value> {
592        self.client.get("/v1/cluster/certificates").await
593    }
594
595    /// Delete a certificate - DELETE /v1/cluster/certificates/{uid}
596    pub async fn certificate_delete(&self, uid: u32) -> Result<()> {
597        self.client
598            .delete(&format!("/v1/cluster/certificates/{}", uid))
599            .await
600    }
601
602    /// Rotate certificates - POST /v1/cluster/certificates/rotate
603    pub async fn certificates_rotate(&self) -> Result<Value> {
604        self.client
605            .post("/v1/cluster/certificates/rotate", &serde_json::json!({}))
606            .await
607    }
608
609    /// Update certificate bundle - PUT /v1/cluster/update_cert
610    pub async fn update_cert(&self, body: Value) -> Result<Value> {
611        self.client.put("/v1/cluster/update_cert", &body).await
612    }
613
614    /// Delete LDAP configuration - DELETE /v1/cluster/ldap
615    pub async fn ldap_delete(&self) -> Result<()> {
616        self.client.delete("/v1/cluster/ldap").await
617    }
618
619    /// Get cluster module capabilities - GET /v1/cluster/module-capabilities
620    pub async fn module_capabilities(&self) -> Result<Value> {
621        self.client.get("/v1/cluster/module-capabilities").await
622    }
623
624    /// Get cluster policy - GET /v1/cluster/policy
625    pub async fn policy(&self) -> Result<Value> {
626        self.client.get("/v1/cluster/policy").await
627    }
628
629    /// Update cluster policy - PUT /v1/cluster/policy
630    pub async fn policy_update(&self, policy: Value) -> Result<Value> {
631        self.client.put("/v1/cluster/policy", &policy).await
632    }
633
634    /// Restore default cluster policy - PUT /v1/cluster/policy/restore_default
635    pub async fn policy_restore_default(&self) -> Result<Value> {
636        self.client
637            .put("/v1/cluster/policy/restore_default", &serde_json::json!({}))
638            .await
639    }
640
641    /// Get services configuration - GET /v1/cluster/services_configuration
642    pub async fn services_configuration(&self) -> Result<Value> {
643        self.client.get("/v1/cluster/services_configuration").await
644    }
645
646    /// Update services configuration - PUT /v1/cluster/services_configuration
647    pub async fn services_configuration_update(&self, cfg: Value) -> Result<Value> {
648        self.client
649            .put("/v1/cluster/services_configuration", &cfg)
650            .await
651    }
652
653    /// Get witness disk info - GET /v1/cluster/witness_disk
654    pub async fn witness_disk(&self) -> Result<Value> {
655        self.client.get("/v1/cluster/witness_disk").await
656    }
657
658    /// Get specific cluster alert detail - GET /v1/cluster/alerts/{alert}
659    pub async fn alert_detail(&self, alert: &str) -> Result<Value> {
660        self.client
661            .get(&format!("/v1/cluster/alerts/{}", alert))
662            .await
663    }
664}
665
666/// Node information
667#[derive(Debug, Clone, Serialize, Deserialize)]
668pub struct NodeInfo {
669    pub uid: u32,
670    pub address: String,
671    pub status: String,
672    pub role: Option<String>,
673    pub shards: Option<Vec<u32>>,
674    pub total_memory: Option<u64>,
675    pub used_memory: Option<u64>,
676
677    #[serde(flatten)]
678    pub extra: Value,
679}
680
681/// License information
682#[derive(Debug, Clone, Serialize, Deserialize)]
683pub struct LicenseInfo {
684    pub license_type: Option<String>,
685    pub expired: Option<bool>,
686    pub expiration_date: Option<String>,
687    pub shards_limit: Option<u32>,
688    pub features: Option<Vec<String>>,
689
690    #[serde(flatten)]
691    pub extra: Value,
692}