use crate::client::RestClient;
use crate::error::Result;
use futures::stream::Stream;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::pin::Pin;
use std::time::Duration;
use tokio::time::sleep;
use typed_builder::TypedBuilder;
/// Alternate name for [`DatabaseInfo`].
pub type Database = DatabaseInfo;
/// Alternate name for [`DatabaseHandler`].
pub type BdbHandler = DatabaseHandler;
/// Boxed, pinned, `Send` stream produced by [`DatabaseHandler::watch_database`].
///
/// Each item is a `Result` wrapping a `(DatabaseInfo, Option<String>)` pair: the
/// database record from one poll and, when the status differs from the one seen
/// on the previous poll, that previous status (`None` otherwise).
pub type DatabaseWatchStream<'a> =
Pin<Box<dyn Stream<Item = Result<(DatabaseInfo, Option<String>)>> + Send + 'a>>;
/// Response body deserialized by the action-style calls on [`DatabaseHandler`]
/// (for example `flush`, `restore`, `recover`, `upgrade` and `reset_password`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseActionResponse {
/// Identifier of the action. Required: deserialization fails when it is missing.
pub action_uid: String,
/// Optional description; a missing or `null` value deserializes to `None`.
pub description: Option<String>,
}
/// Response body deserialized by [`DatabaseHandler::backup`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupResponse {
/// Identifier of the action, if the response carried one.
/// Left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub action_uid: Option<String>,
/// Identifier of the backup, if the response carried one.
/// Unlike `action_uid`, this is serialized as `null` when `None`.
pub backup_uid: Option<String>,
}
/// Response body deserialized by [`DatabaseHandler::import`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportResponse {
/// Identifier of the action, if the response carried one.
/// Left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub action_uid: Option<String>,
/// Status string, if the response carried one; serialized as `null` when `None`.
pub status: Option<String>,
}
/// Response body deserialized by [`DatabaseHandler::export`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportResponse {
/// Identifier of the action, if the response carried one.
/// Left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub action_uid: Option<String>,
/// Status string, if the response carried one; serialized as `null` when `None`.
pub status: Option<String>,
}
/// One module entry inside [`DatabaseUpgradeRequest::modules`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModuleUpgrade {
/// Name of the module. Required.
pub module_name: String,
/// Target version for the module; omitted from the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub new_version: Option<String>,
/// Module arguments as a single string; omitted from the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub module_args: Option<String>,
}
/// Request body sent by [`DatabaseHandler::upgrade_redis_version`].
///
/// Every field is optional and is omitted from the serialized JSON when `None`.
/// Values can be constructed through `Default` or through the generated
/// `TypedBuilder` builder, whose setters take the inner value directly
/// (`strip_option`) rather than an `Option`.
#[derive(Debug, Clone, Serialize, Deserialize, Default, TypedBuilder)]
pub struct DatabaseUpgradeRequest {
// The builder setter accepts anything convertible into `String`.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(into, strip_option))]
pub redis_version: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub preserve_roles: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub force_restart: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub may_discard_data: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub force_discard: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub keep_crdt_protocol_version: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub parallel_shards_upgrade: Option<u32>,
// Per-module upgrade entries; see `ModuleUpgrade`.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub modules: Option<Vec<ModuleUpgrade>>,
}
/// Database record as deserialized from the `/v1/bdbs` endpoints
/// (see [`DatabaseHandler::list`], [`DatabaseHandler::info`],
/// [`DatabaseHandler::create`] and [`DatabaseHandler::update`]).
///
/// Only `uid` and `name` are required; every other field is an `Option` and
/// deserializes to `None` when the key is missing or `null`. Fields typed as
/// `Value` / `Vec<Value>` are kept as raw JSON without a dedicated Rust type.
///
/// NOTE: this type derives `Debug` and contains the `authentication_*_pass`
/// fields, so debug-printing a value will include those strings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseInfo {
// Required: deserialization fails if either `uid` or `name` is missing.
pub uid: u32,
pub name: String,
pub port: Option<u16>,
// Compared between polls by `DatabaseHandler::watch_database`.
pub status: Option<String>,
pub memory_size: Option<u64>,
pub memory_used: Option<u64>,
// `type` is a Rust keyword, so the JSON key "type" is mapped onto `type_`.
#[serde(rename = "type")]
pub type_: Option<String>,
pub version: Option<String>,
pub account_id: Option<u32>,
pub action_uid: Option<String>,
pub shards_count: Option<u32>,
pub shard_list: Option<Vec<u32>>,
pub sharding: Option<bool>,
pub shards_placement: Option<String>,
pub replication: Option<bool>,
pub endpoints: Option<Vec<EndpointInfo>>,
pub endpoint: Option<String>,
pub endpoint_ip: Option<Vec<String>>,
pub endpoint_node: Option<u32>,
pub dns_address_master: Option<String>,
pub persistence: Option<String>,
pub data_persistence: Option<String>,
pub eviction_policy: Option<String>,
// Timestamps are kept as the raw strings from the response; no parsing is done here.
pub created_time: Option<String>,
pub last_changed_time: Option<String>,
pub last_backup_time: Option<String>,
pub last_export_time: Option<String>,
pub mtls_allow_weak_hashing: Option<bool>,
pub mtls_allow_outdated_certs: Option<bool>,
// Credential material held as plain `String`s (see the `Debug` note on the struct).
pub authentication_redis_pass: Option<String>,
pub authentication_admin_pass: Option<String>,
pub authentication_sasl_pass: Option<String>,
pub authentication_sasl_uname: Option<String>,
pub authentication_ssl_client_certs: Option<Vec<Value>>,
pub authentication_ssl_crdt_certs: Option<Vec<Value>>,
pub authorized_subjects: Option<Vec<Value>>,
pub data_internode_encryption: Option<bool>,
pub ssl: Option<bool>,
pub tls_mode: Option<String>,
pub enforce_client_authentication: Option<String>,
pub default_user: Option<bool>,
pub acl: Option<Value>,
pub client_cert_subject_validation_type: Option<String>,
pub compare_key_hslot: Option<bool>,
pub dns_suffixes: Option<Vec<String>>,
pub group_uid: Option<u32>,
pub redis_cluster_enabled: Option<bool>,
pub crdt: Option<bool>,
pub crdt_enabled: Option<bool>,
pub crdt_config_version: Option<u32>,
pub crdt_replica_id: Option<u32>,
pub crdt_ghost_replica_ids: Option<String>,
pub crdt_featureset_version: Option<u32>,
pub crdt_protocol_version: Option<u32>,
pub crdt_guid: Option<String>,
pub crdt_modules: Option<String>,
pub crdt_replicas: Option<String>,
pub crdt_sources: Option<Vec<Value>>,
pub crdt_sync: Option<String>,
pub crdt_sync_connection_alarm_timeout_seconds: Option<u32>,
pub crdt_sync_dist: Option<bool>,
pub crdt_syncer_auto_oom_unlatch: Option<bool>,
pub crdt_xadd_id_uniqueness_mode: Option<String>,
pub crdt_causal_consistency: Option<bool>,
pub crdt_repl_backlog_size: Option<String>,
pub master_persistence: Option<bool>,
pub slave_ha: Option<bool>,
pub slave_ha_priority: Option<u32>,
pub replica_read_only: Option<bool>,
pub replica_sources: Option<Vec<Value>>,
pub replica_sync: Option<String>,
pub replica_sync_connection_alarm_timeout_seconds: Option<u32>,
pub replica_sync_dist: Option<bool>,
pub repl_backlog_size: Option<String>,
pub max_connections: Option<u32>,
pub maxclients: Option<u32>,
pub conns: Option<u32>,
pub conns_type: Option<String>,
pub max_client_pipeline: Option<u32>,
pub max_pipelined: Option<u32>,
pub aof_policy: Option<String>,
pub max_aof_file_size: Option<u64>,
pub max_aof_load_time: Option<u32>,
pub activedefrag: Option<String>,
pub active_defrag_cycle_max: Option<u32>,
pub active_defrag_cycle_min: Option<u32>,
pub active_defrag_ignore_bytes: Option<String>,
pub active_defrag_max_scan_fields: Option<u32>,
pub active_defrag_threshold_lower: Option<u32>,
pub active_defrag_threshold_upper: Option<u32>,
pub backup: Option<bool>,
pub backup_failure_reason: Option<String>,
pub backup_history: Option<u32>,
pub backup_interval: Option<u32>,
pub backup_interval_offset: Option<u32>,
pub backup_location: Option<Value>,
pub backup_progress: Option<f64>,
pub backup_status: Option<String>,
pub dataset_import_sources: Option<Vec<Value>>,
pub import_failure_reason: Option<String>,
pub import_progress: Option<f64>,
pub import_status: Option<String>,
pub export_failure_reason: Option<String>,
pub export_progress: Option<f64>,
pub export_status: Option<String>,
pub skip_import_analyze: Option<String>,
pub metrics_export_all: Option<bool>,
pub generate_text_monitor: Option<bool>,
pub email_alerts: Option<bool>,
pub module_list: Option<Vec<Value>>,
// NOTE(review): `Option` fields already become `None` when the key is missing,
// so `#[serde(default)]` on the next two fields looks redundant — confirm before removing.
#[serde(default)]
pub search: Option<Value>,
#[serde(default)]
pub timeseries: Option<Value>,
pub bigstore: Option<bool>,
pub bigstore_ram_size: Option<u64>,
pub bigstore_max_ram_ratio: Option<u32>,
pub bigstore_ram_weights: Option<Vec<Value>>,
pub bigstore_version: Option<u32>,
pub proxy_policy: Option<String>,
pub oss_cluster: Option<bool>,
pub oss_cluster_api_preferred_endpoint_type: Option<String>,
pub oss_cluster_api_preferred_ip_type: Option<String>,
pub oss_sharding: Option<bool>,
pub redis_version: Option<String>,
pub resp3: Option<bool>,
pub disabled_commands: Option<String>,
pub hash_slots_policy: Option<String>,
pub shard_key_regex: Option<Vec<Value>>,
pub shard_block_crossslot_keys: Option<bool>,
pub shard_block_foreign_keys: Option<bool>,
pub implicit_shard_key: Option<bool>,
pub avoid_nodes: Option<Vec<String>>,
pub use_nodes: Option<Vec<String>>,
pub rack_aware: Option<bool>,
pub auto_upgrade: Option<bool>,
pub internal: Option<bool>,
pub db_conns_auditing: Option<bool>,
pub flush_on_fullsync: Option<bool>,
pub use_selective_flush: Option<bool>,
pub sync: Option<String>,
pub sync_sources: Option<Vec<Value>>,
pub sync_dedicated_threads: Option<u32>,
pub syncer_mode: Option<String>,
pub syncer_log_level: Option<String>,
pub support_syncer_reconf: Option<bool>,
pub gradual_src_mode: Option<String>,
pub gradual_src_max_sources: Option<u32>,
pub gradual_sync_mode: Option<String>,
pub gradual_sync_max_shards_per_source: Option<u32>,
pub slave_buffer: Option<String>,
pub snapshot_policy: Option<Vec<Value>>,
pub sched_policy: Option<String>,
// The only signed integer field in this struct.
pub recovery_wait_time: Option<i32>,
pub multi_commands_opt: Option<String>,
pub throughput_ingress: Option<f64>,
pub tracking_table_max_keys: Option<u32>,
pub wait_command: Option<bool>,
pub background_op: Option<Vec<Value>>,
pub mkms: Option<bool>,
pub roles_permissions: Option<Vec<Value>>,
pub tags: Option<Vec<String>>,
pub topology_epoch: Option<u32>,
}
/// One endpoint entry, as found in [`DatabaseInfo::endpoints`] and as returned
/// by [`DatabaseHandler::endpoints`].
///
/// Every field is optional; a missing or `null` key deserializes to `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EndpointInfo {
// Note: unlike `DatabaseInfo::uid`, this identifier is a string.
pub uid: Option<String>,
pub addr: Option<Vec<String>>,
pub port: Option<u16>,
pub dns_name: Option<String>,
pub proxy_policy: Option<String>,
pub addr_type: Option<String>,
pub oss_cluster_api_preferred_ip_type: Option<String>,
pub exclude_proxies: Option<Vec<u32>>,
pub include_proxies: Option<Vec<u32>>,
}
/// One module entry inside [`CreateDatabaseRequest::module_list`].
///
/// Built through the generated `TypedBuilder` builder; `module_name` must be
/// supplied, `module_args` defaults to `None`.
#[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)]
pub struct ModuleConfig {
/// Name of the module. The builder setter accepts anything convertible into `String`.
#[builder(setter(into))]
pub module_name: String,
/// Module arguments as a single string; omitted from the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(into, strip_option))]
pub module_args: Option<String>,
}
/// Request body sent by [`DatabaseHandler::create`] (`POST /v1/bdbs`).
///
/// `name` is the only required field. Every other field defaults to `None` in
/// the generated `TypedBuilder` builder and is omitted from the serialized JSON
/// while it is `None`. Builder setters take the inner value directly
/// (`strip_option`); string setters additionally accept anything convertible
/// into `String` (`into`).
///
/// Unlike the other request/response types in this file, this one does not
/// derive `Clone`.
#[derive(Debug, Serialize, Deserialize, TypedBuilder)]
pub struct CreateDatabaseRequest {
#[builder(setter(into))]
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub memory_size: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub port: Option<u16>,
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub replication: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(into, strip_option))]
pub persistence: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(into, strip_option))]
pub eviction_policy: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub sharding: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub shards_count: Option<u32>,
// NOTE(review): `shards_count` (above) and `shard_count` (below) are separate
// fields serialized under separate keys, and the alias below repeats the
// field's own name, so it adds nothing. Presumably one of the two was meant
// to alias the other — confirm which key the server expects.
#[serde(skip_serializing_if = "Option::is_none", alias = "shard_count")]
#[builder(default, setter(strip_option))]
pub shard_count: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(into, strip_option))]
pub proxy_policy: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub rack_aware: Option<bool>,
// Modules to attach at creation time; see `ModuleConfig`.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub module_list: Option<Vec<ModuleConfig>>,
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub crdt: Option<bool>,
// Password held as a plain `String`; it is included in this type's `Debug` output.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default, setter(into, strip_option))]
pub authentication_redis_pass: Option<String>,
}
/// Handler for the database (`bdbs`) endpoints.
///
/// Owns the [`RestClient`] through which all of its requests are issued.
pub struct DatabaseHandler {
// REST client used by every method on this handler.
client: RestClient,
}
impl DatabaseHandler {
    /// Creates a handler that issues all of its requests through `client`.
    pub fn new(client: RestClient) -> Self {
        Self { client }
    }

    /// Lists all databases (`GET /v1/bdbs`).
    pub async fn list(&self) -> Result<Vec<DatabaseInfo>> {
        self.client.get("/v1/bdbs").await
    }

    /// Fetches a single database (`GET /v1/bdbs/{uid}`).
    pub async fn info(&self, uid: u32) -> Result<DatabaseInfo> {
        self.client.get(&format!("/v1/bdbs/{}", uid)).await
    }

    /// Same as [`Self::info`]; provided under a second name.
    pub async fn get(&self, uid: u32) -> Result<DatabaseInfo> {
        self.info(uid).await
    }

    /// Creates a database by posting `request` to `/v1/bdbs`.
    pub async fn create(&self, request: CreateDatabaseRequest) -> Result<DatabaseInfo> {
        self.client.post("/v1/bdbs", &request).await
    }

    /// Updates a database by sending the raw JSON `updates`
    /// (`PUT /v1/bdbs/{uid}`).
    pub async fn update(&self, uid: u32, updates: Value) -> Result<DatabaseInfo> {
        self.client
            .put(&format!("/v1/bdbs/{}", uid), &updates)
            .await
    }

    /// Deletes a database (`DELETE /v1/bdbs/{uid}`).
    pub async fn delete(&self, uid: u32) -> Result<()> {
        self.client.delete(&format!("/v1/bdbs/{}", uid)).await
    }

    /// Fetches statistics for a database as raw JSON (`GET /v1/bdbs/stats/{uid}`).
    pub async fn stats(&self, uid: u32) -> Result<Value> {
        self.client.get(&format!("/v1/bdbs/stats/{}", uid)).await
    }

    /// Fetches metrics for a database as raw JSON (`GET /v1/bdbs/metrics/{uid}`).
    pub async fn metrics(&self, uid: u32) -> Result<Value> {
        self.client.get(&format!("/v1/bdbs/metrics/{}", uid)).await
    }

    /// Requests an export to `export_location`
    /// (`POST /v1/bdbs/{uid}/actions/export`).
    pub async fn export(&self, uid: u32, export_location: &str) -> Result<ExportResponse> {
        let body = serde_json::json!({
            "export_location": export_location
        });
        self.client
            .post(&format!("/v1/bdbs/{}/actions/export", uid), &body)
            .await
    }

    /// Requests an import from `import_location`
    /// (`POST /v1/bdbs/{uid}/actions/import`).
    ///
    /// `flush` is forwarded verbatim in the request body.
    pub async fn import(
        &self,
        uid: u32,
        import_location: &str,
        flush: bool,
    ) -> Result<ImportResponse> {
        let body = serde_json::json!({
            "import_location": import_location,
            "flush": flush
        });
        self.client
            .post(&format!("/v1/bdbs/{}/actions/import", uid), &body)
            .await
    }

    /// Requests the flush action with an empty JSON body
    /// (`POST /v1/bdbs/{uid}/actions/flush`).
    pub async fn flush(&self, uid: u32) -> Result<DatabaseActionResponse> {
        self.client
            .post(
                &format!("/v1/bdbs/{}/actions/flush", uid),
                &serde_json::json!({}),
            )
            .await
    }

    /// Requests the backup action with an empty JSON body
    /// (`POST /v1/bdbs/{uid}/actions/backup`).
    pub async fn backup(&self, uid: u32) -> Result<BackupResponse> {
        self.client
            .post(
                &format!("/v1/bdbs/{}/actions/backup", uid),
                &serde_json::json!({}),
            )
            .await
    }

    /// Requests the restore action (`POST /v1/bdbs/{uid}/actions/restore`).
    ///
    /// When `backup_uid` is `Some`, it is sent as `"backup_uid"`; otherwise the
    /// body is an empty JSON object.
    pub async fn restore(
        &self,
        uid: u32,
        backup_uid: Option<&str>,
    ) -> Result<DatabaseActionResponse> {
        let body = match backup_uid {
            Some(backup_id) => serde_json::json!({ "backup_uid": backup_id }),
            None => serde_json::json!({}),
        };
        self.client
            .post(&format!("/v1/bdbs/{}/actions/restore", uid), &body)
            .await
    }

    /// Fetches shard information as raw JSON (`GET /v1/bdbs/{uid}/shards`).
    pub async fn shards(&self, uid: u32) -> Result<Value> {
        self.client.get(&format!("/v1/bdbs/{}/shards", uid)).await
    }

    /// Fetches the endpoints of a database (`GET /v1/bdbs/{uid}/endpoints`).
    pub async fn endpoints(&self, uid: u32) -> Result<Vec<EndpointInfo>> {
        self.client
            .get(&format!("/v1/bdbs/{}/endpoints", uid))
            .await
    }

    /// Queries `GET /v1/bdbs/{uid}/actions/optimize_shards_placement` and
    /// returns the raw JSON response.
    pub async fn optimize_shards_placement(&self, uid: u32) -> Result<Value> {
        self.client
            .get(&format!(
                "/v1/bdbs/{}/actions/optimize_shards_placement",
                uid
            ))
            .await
    }

    /// Queries `GET /v1/bdbs/{uid}/actions/recover` and returns the raw JSON
    /// response. See [`Self::recover`] for the `POST` counterpart.
    pub async fn recover_status(&self, uid: u32) -> Result<Value> {
        self.client
            .get(&format!("/v1/bdbs/{}/actions/recover", uid))
            .await
    }

    /// Requests the recover action with an empty JSON body
    /// (`POST /v1/bdbs/{uid}/actions/recover`).
    pub async fn recover(&self, uid: u32) -> Result<DatabaseActionResponse> {
        self.client
            .post(
                &format!("/v1/bdbs/{}/actions/recover", uid),
                &serde_json::json!({}),
            )
            .await
    }

    /// Requests the resume-traffic action with an empty JSON body
    /// (`POST /v1/bdbs/{uid}/actions/resume_traffic`).
    pub async fn resume_traffic(&self, uid: u32) -> Result<DatabaseActionResponse> {
        self.client
            .post(
                &format!("/v1/bdbs/{}/actions/resume_traffic", uid),
                &serde_json::json!({}),
            )
            .await
    }

    /// Requests the stop-traffic action with an empty JSON body
    /// (`POST /v1/bdbs/{uid}/actions/stop_traffic`).
    pub async fn stop_traffic(&self, uid: u32) -> Result<DatabaseActionResponse> {
        self.client
            .post(
                &format!("/v1/bdbs/{}/actions/stop_traffic", uid),
                &serde_json::json!({}),
            )
            .await
    }

    /// Requests the rebalance action with an empty JSON body
    /// (`PUT /v1/bdbs/{uid}/actions/rebalance`).
    pub async fn rebalance(&self, uid: u32) -> Result<DatabaseActionResponse> {
        self.client
            .put(
                &format!("/v1/bdbs/{}/actions/rebalance", uid),
                &serde_json::json!({}),
            )
            .await
    }

    /// Requests the revamp action with an empty JSON body
    /// (`PUT /v1/bdbs/{uid}/actions/revamp`).
    pub async fn revamp(&self, uid: u32) -> Result<DatabaseActionResponse> {
        self.client
            .put(
                &format!("/v1/bdbs/{}/actions/revamp", uid),
                &serde_json::json!({}),
            )
            .await
    }

    /// Sends `PUT /v1/bdbs/{uid}/actions/backup_reset_status` with an empty
    /// JSON body and returns the raw JSON response.
    pub async fn backup_reset_status(&self, uid: u32) -> Result<Value> {
        self.client
            .put(
                &format!("/v1/bdbs/{}/actions/backup_reset_status", uid),
                &serde_json::json!({}),
            )
            .await
    }

    /// Sends `PUT /v1/bdbs/{uid}/actions/export_reset_status` with an empty
    /// JSON body and returns the raw JSON response.
    pub async fn export_reset_status(&self, uid: u32) -> Result<Value> {
        self.client
            .put(
                &format!("/v1/bdbs/{}/actions/export_reset_status", uid),
                &serde_json::json!({}),
            )
            .await
    }

    /// Sends `PUT /v1/bdbs/{uid}/actions/import_reset_status` with an empty
    /// JSON body and returns the raw JSON response.
    pub async fn import_reset_status(&self, uid: u32) -> Result<Value> {
        self.client
            .put(
                &format!("/v1/bdbs/{}/actions/import_reset_status", uid),
                &serde_json::json!({}),
            )
            .await
    }

    /// Fetches peer statistics as raw JSON (`GET /v1/bdbs/{uid}/peer_stats`).
    pub async fn peer_stats(&self, uid: u32) -> Result<Value> {
        self.client
            .get(&format!("/v1/bdbs/{}/peer_stats", uid))
            .await
    }

    /// Fetches statistics for one peer as raw JSON
    /// (`GET /v1/bdbs/{uid}/peer_stats/{peer_uid}`).
    pub async fn peer_stats_for(&self, uid: u32, peer_uid: u32) -> Result<Value> {
        self.client
            .get(&format!("/v1/bdbs/{}/peer_stats/{}", uid, peer_uid))
            .await
    }

    /// Fetches sync-source statistics as raw JSON
    /// (`GET /v1/bdbs/{uid}/sync_source_stats`).
    pub async fn sync_source_stats(&self, uid: u32) -> Result<Value> {
        self.client
            .get(&format!("/v1/bdbs/{}/sync_source_stats", uid))
            .await
    }

    /// Fetches statistics for one sync source as raw JSON
    /// (`GET /v1/bdbs/{uid}/sync_source_stats/{src_uid}`).
    pub async fn sync_source_stats_for(&self, uid: u32, src_uid: u32) -> Result<Value> {
        self.client
            .get(&format!("/v1/bdbs/{}/sync_source_stats/{}", uid, src_uid))
            .await
    }

    /// Fetches the syncer state as raw JSON (`GET /v1/bdbs/{uid}/syncer_state`).
    pub async fn syncer_state(&self, uid: u32) -> Result<Value> {
        self.client
            .get(&format!("/v1/bdbs/{}/syncer_state", uid))
            .await
    }

    /// Fetches the CRDT syncer state as raw JSON
    /// (`GET /v1/bdbs/{uid}/syncer_state/crdt`).
    pub async fn syncer_state_crdt(&self, uid: u32) -> Result<Value> {
        self.client
            .get(&format!("/v1/bdbs/{}/syncer_state/crdt", uid))
            .await
    }

    /// Fetches the replica syncer state as raw JSON
    /// (`GET /v1/bdbs/{uid}/syncer_state/replica`).
    pub async fn syncer_state_replica(&self, uid: u32) -> Result<Value> {
        self.client
            .get(&format!("/v1/bdbs/{}/syncer_state/replica", uid))
            .await
    }

    /// Sends `DELETE /v1/bdbs/{uid}/passwords`.
    pub async fn passwords_delete(&self, uid: u32) -> Result<()> {
        self.client
            .delete(&format!("/v1/bdbs/{}/passwords", uid))
            .await
    }

    /// Fetches alerts for all databases as raw JSON (`GET /v1/bdbs/alerts`).
    pub async fn alerts_all(&self) -> Result<Value> {
        self.client.get("/v1/bdbs/alerts").await
    }

    /// Fetches alerts for one database as raw JSON (`GET /v1/bdbs/alerts/{uid}`).
    pub async fn alerts_for(&self, uid: u32) -> Result<Value> {
        self.client.get(&format!("/v1/bdbs/alerts/{}", uid)).await
    }

    /// Fetches one named alert as raw JSON (`GET /v1/bdbs/alerts/{uid}/{alert}`).
    ///
    /// `alert` is inserted into the path as-is, without any escaping.
    pub async fn alert_detail(&self, uid: u32, alert: &str) -> Result<Value> {
        self.client
            .get(&format!("/v1/bdbs/alerts/{}/{}", uid, alert))
            .await
    }

    /// Fetches CRDT-source alerts for all databases as raw JSON
    /// (`GET /v1/bdbs/crdt_sources/alerts`).
    pub async fn crdt_source_alerts_all(&self) -> Result<Value> {
        self.client.get("/v1/bdbs/crdt_sources/alerts").await
    }

    /// Fetches CRDT-source alerts for one database as raw JSON
    /// (`GET /v1/bdbs/crdt_sources/alerts/{uid}`).
    pub async fn crdt_source_alerts_for(&self, uid: u32) -> Result<Value> {
        self.client
            .get(&format!("/v1/bdbs/crdt_sources/alerts/{}", uid))
            .await
    }

    /// Fetches alerts for one CRDT source as raw JSON
    /// (`GET /v1/bdbs/crdt_sources/alerts/{uid}/{source_id}`).
    pub async fn crdt_source_alerts_source(&self, uid: u32, source_id: u32) -> Result<Value> {
        self.client
            .get(&format!(
                "/v1/bdbs/crdt_sources/alerts/{}/{}",
                uid, source_id
            ))
            .await
    }

    /// Fetches one named alert of a CRDT source as raw JSON
    /// (`GET /v1/bdbs/crdt_sources/alerts/{uid}/{source_id}/{alert}`).
    ///
    /// `alert` is inserted into the path as-is, without any escaping.
    pub async fn crdt_source_alert_detail(
        &self,
        uid: u32,
        source_id: u32,
        alert: &str,
    ) -> Result<Value> {
        self.client
            .get(&format!(
                "/v1/bdbs/crdt_sources/alerts/{}/{}/{}",
                uid, source_id, alert
            ))
            .await
    }

    /// Fetches replica-source alerts for all databases as raw JSON
    /// (`GET /v1/bdbs/replica_sources/alerts`).
    pub async fn replica_source_alerts_all(&self) -> Result<Value> {
        self.client.get("/v1/bdbs/replica_sources/alerts").await
    }

    /// Fetches replica-source alerts for one database as raw JSON
    /// (`GET /v1/bdbs/replica_sources/alerts/{uid}`).
    pub async fn replica_source_alerts_for(&self, uid: u32) -> Result<Value> {
        self.client
            .get(&format!("/v1/bdbs/replica_sources/alerts/{}", uid))
            .await
    }

    /// Fetches alerts for one replica source as raw JSON
    /// (`GET /v1/bdbs/replica_sources/alerts/{uid}/{source_id}`).
    pub async fn replica_source_alerts_source(&self, uid: u32, source_id: u32) -> Result<Value> {
        self.client
            .get(&format!(
                "/v1/bdbs/replica_sources/alerts/{}/{}",
                uid, source_id
            ))
            .await
    }

    /// Fetches one named alert of a replica source as raw JSON
    /// (`GET /v1/bdbs/replica_sources/alerts/{uid}/{source_id}/{alert}`).
    ///
    /// `alert` is inserted into the path as-is, without any escaping.
    pub async fn replica_source_alert_detail(
        &self,
        uid: u32,
        source_id: u32,
        alert: &str,
    ) -> Result<Value> {
        self.client
            .get(&format!(
                "/v1/bdbs/replica_sources/alerts/{}/{}/{}",
                uid, source_id, alert
            ))
            .await
    }

    /// Requests a module upgrade by posting `module_name` and `new_version`
    /// to `/v1/bdbs/{uid}/actions/upgrade`.
    pub async fn upgrade(
        &self,
        uid: u32,
        module_name: &str,
        new_version: &str,
    ) -> Result<DatabaseActionResponse> {
        let body = serde_json::json!({
            "module_name": module_name,
            "new_version": new_version
        });
        self.client
            .post(&format!("/v1/bdbs/{}/actions/upgrade", uid), &body)
            .await
    }

    /// Posts `request` to `/v1/bdbs/{uid}/upgrade`.
    ///
    /// Note that this path has no `actions/` segment, unlike [`Self::upgrade`].
    pub async fn upgrade_redis_version(
        &self,
        uid: u32,
        request: DatabaseUpgradeRequest,
    ) -> Result<DatabaseActionResponse> {
        self.client
            .post(&format!("/v1/bdbs/{}/upgrade", uid), &request)
            .await
    }

    /// Posts `new_password` as `"authentication_redis_pass"` to
    /// `/v1/bdbs/{uid}/actions/reset_password`.
    pub async fn reset_password(
        &self,
        uid: u32,
        new_password: &str,
    ) -> Result<DatabaseActionResponse> {
        let body = serde_json::json!({
            "authentication_redis_pass": new_password
        });
        self.client
            .post(&format!("/v1/bdbs/{}/actions/reset_password", uid), &body)
            .await
    }

    /// Queries `GET /v1/bdbs/{uid}/availability` and returns the raw JSON response.
    pub async fn availability(&self, uid: u32) -> Result<Value> {
        self.client
            .get(&format!("/v1/bdbs/{}/availability", uid))
            .await
    }

    /// Queries `GET /v1/local/bdbs/{uid}/endpoint/availability` and returns the
    /// raw JSON response.
    pub async fn endpoint_availability(&self, uid: u32) -> Result<Value> {
        self.client
            .get(&format!("/v1/local/bdbs/{}/endpoint/availability", uid))
            .await
    }

    /// Creates a database by posting the raw JSON `request` to `/v2/bdbs`.
    pub async fn create_v2(&self, request: Value) -> Result<DatabaseInfo> {
        self.client.post("/v2/bdbs", &request).await
    }

    /// Polls a database and streams every observation.
    ///
    /// The returned stream repeatedly calls [`Self::info`] for `uid`, waiting
    /// `poll_interval` between polls, and yields one item per poll:
    ///
    /// * `Ok((info, Some(previous)))` when a status was recorded on the
    ///   previous poll and the current status differs from it (including the
    ///   status becoming absent);
    /// * `Ok((info, None))` otherwise — the first poll, an unchanged status, or
    ///   no status recorded on the previous poll;
    /// * `Err(e)` when the poll fails, after which the stream ends.
    ///
    /// Without an error the stream never terminates on its own; drop it to
    /// stop polling.
    pub fn watch_database(&self, uid: u32, poll_interval: Duration) -> DatabaseWatchStream<'_> {
        Box::pin(async_stream::stream! {
            let mut last_status: Option<String> = None;
            loop {
                match self.info(uid).await {
                    Ok(db_info) => {
                        let current_status = db_info.status.clone();
                        // A change can only be reported once a status has been
                        // recorded. `take()` hands the old value to the caller
                        // without cloning; `last_status` is overwritten below.
                        let previous_status =
                            if last_status.is_some() && last_status != current_status {
                                last_status.take()
                            } else {
                                None
                            };
                        yield Ok((db_info, previous_status));
                        last_status = current_status;
                    }
                    Err(e) => {
                        // Surface the failure once, then stop polling.
                        yield Err(e);
                        break;
                    }
                }
                sleep(poll_interval).await;
            }
        })
    }
}