1use crate::client::RestClient;
71use crate::error::Result;
72use futures::stream::Stream;
73use serde::{Deserialize, Serialize};
74use serde_json::Value;
75use std::pin::Pin;
76use std::time::Duration;
77use tokio::time::sleep;
78use typed_builder::TypedBuilder;
79
80pub type Database = DatabaseInfo;
82pub type BdbHandler = DatabaseHandler;
83pub type DatabaseWatchStream<'a> =
84 Pin<Box<dyn Stream<Item = Result<(DatabaseInfo, Option<String>)>> + Send + 'a>>;
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct DatabaseActionResponse {
89 pub action_uid: String,
91 pub description: Option<String>,
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct BackupResponse {
98 #[serde(skip_serializing_if = "Option::is_none")]
100 pub action_uid: Option<String>,
101 pub backup_uid: Option<String>,
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct ImportResponse {
108 #[serde(skip_serializing_if = "Option::is_none")]
110 pub action_uid: Option<String>,
111 pub status: Option<String>,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct ExportResponse {
118 #[serde(skip_serializing_if = "Option::is_none")]
120 pub action_uid: Option<String>,
121 pub status: Option<String>,
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
127pub struct ModuleUpgrade {
128 pub module_name: String,
130 #[serde(skip_serializing_if = "Option::is_none")]
132 pub new_version: Option<String>,
133 #[serde(skip_serializing_if = "Option::is_none")]
135 pub module_args: Option<String>,
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize, Default, TypedBuilder)]
158pub struct DatabaseUpgradeRequest {
159 #[serde(skip_serializing_if = "Option::is_none")]
161 #[builder(default, setter(into, strip_option))]
162 pub redis_version: Option<String>,
163
164 #[serde(skip_serializing_if = "Option::is_none")]
166 #[builder(default, setter(strip_option))]
167 pub preserve_roles: Option<bool>,
168
169 #[serde(skip_serializing_if = "Option::is_none")]
171 #[builder(default, setter(strip_option))]
172 pub force_restart: Option<bool>,
173
174 #[serde(skip_serializing_if = "Option::is_none")]
176 #[builder(default, setter(strip_option))]
177 pub may_discard_data: Option<bool>,
178
179 #[serde(skip_serializing_if = "Option::is_none")]
181 #[builder(default, setter(strip_option))]
182 pub force_discard: Option<bool>,
183
184 #[serde(skip_serializing_if = "Option::is_none")]
186 #[builder(default, setter(strip_option))]
187 pub keep_crdt_protocol_version: Option<bool>,
188
189 #[serde(skip_serializing_if = "Option::is_none")]
191 #[builder(default, setter(strip_option))]
192 pub parallel_shards_upgrade: Option<u32>,
193
194 #[serde(skip_serializing_if = "Option::is_none")]
196 #[builder(default, setter(strip_option))]
197 pub modules: Option<Vec<ModuleUpgrade>>,
198}
199
200#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct DatabaseInfo {
203 pub uid: u32,
205 pub name: String,
206 pub port: Option<u16>,
207 pub status: Option<String>,
208 pub memory_size: Option<u64>,
209 pub memory_used: Option<u64>,
210
211 #[serde(rename = "type")]
213 pub type_: Option<String>,
214 pub version: Option<String>,
215
216 pub account_id: Option<u32>,
218 pub action_uid: Option<String>,
219
220 pub shards_count: Option<u32>,
222 pub shard_list: Option<Vec<u32>>,
223 pub sharding: Option<bool>,
224 pub shards_placement: Option<String>,
225 pub replication: Option<bool>,
226
227 pub endpoints: Option<Vec<EndpointInfo>>,
229 pub endpoint: Option<String>,
230 pub endpoint_ip: Option<Vec<String>>,
231 pub endpoint_node: Option<u32>,
232 pub dns_address_master: Option<String>,
233
234 pub persistence: Option<String>,
236 pub data_persistence: Option<String>,
237 pub eviction_policy: Option<String>,
238
239 pub created_time: Option<String>,
241 pub last_changed_time: Option<String>,
242 pub last_backup_time: Option<String>,
243 pub last_export_time: Option<String>,
244
245 pub mtls_allow_weak_hashing: Option<bool>,
247 pub mtls_allow_outdated_certs: Option<bool>,
248 pub authentication_redis_pass: Option<String>,
249 pub authentication_admin_pass: Option<String>,
250 pub authentication_sasl_pass: Option<String>,
251 pub authentication_sasl_uname: Option<String>,
252 pub authentication_ssl_client_certs: Option<Vec<Value>>,
253 pub authentication_ssl_crdt_certs: Option<Vec<Value>>,
254 pub authorized_subjects: Option<Vec<Value>>,
255 pub data_internode_encryption: Option<bool>,
256 pub ssl: Option<bool>,
257 pub tls_mode: Option<String>,
258 pub enforce_client_authentication: Option<String>,
259 pub default_user: Option<bool>,
260 pub acl: Option<Value>,
262 pub client_cert_subject_validation_type: Option<String>,
264 pub compare_key_hslot: Option<bool>,
266 pub dns_suffixes: Option<Vec<String>>,
268 pub group_uid: Option<u32>,
270 pub redis_cluster_enabled: Option<bool>,
272
273 pub crdt: Option<bool>,
275 pub crdt_enabled: Option<bool>,
276 pub crdt_config_version: Option<u32>,
277 pub crdt_replica_id: Option<u32>,
278 pub crdt_ghost_replica_ids: Option<String>,
279 pub crdt_featureset_version: Option<u32>,
280 pub crdt_protocol_version: Option<u32>,
281 pub crdt_guid: Option<String>,
282 pub crdt_modules: Option<String>,
283 pub crdt_replicas: Option<String>,
284 pub crdt_sources: Option<Vec<Value>>,
285 pub crdt_sync: Option<String>,
286 pub crdt_sync_connection_alarm_timeout_seconds: Option<u32>,
287 pub crdt_sync_dist: Option<bool>,
288 pub crdt_syncer_auto_oom_unlatch: Option<bool>,
289 pub crdt_xadd_id_uniqueness_mode: Option<String>,
290 pub crdt_causal_consistency: Option<bool>,
291 pub crdt_repl_backlog_size: Option<String>,
292
293 pub master_persistence: Option<bool>,
295 pub slave_ha: Option<bool>,
296 pub slave_ha_priority: Option<u32>,
297 pub replica_read_only: Option<bool>,
298 pub replica_sources: Option<Vec<Value>>,
299 pub replica_sync: Option<String>,
300 pub replica_sync_connection_alarm_timeout_seconds: Option<u32>,
301 pub replica_sync_dist: Option<bool>,
302 pub repl_backlog_size: Option<String>,
303
304 pub max_connections: Option<u32>,
306 pub maxclients: Option<u32>,
307 pub conns: Option<u32>,
308 pub conns_type: Option<String>,
309 pub max_client_pipeline: Option<u32>,
310 pub max_pipelined: Option<u32>,
311
312 pub aof_policy: Option<String>,
314 pub max_aof_file_size: Option<u64>,
315 pub max_aof_load_time: Option<u32>,
316
317 pub activedefrag: Option<String>,
319 pub active_defrag_cycle_max: Option<u32>,
320 pub active_defrag_cycle_min: Option<u32>,
321 pub active_defrag_ignore_bytes: Option<String>,
322 pub active_defrag_max_scan_fields: Option<u32>,
323 pub active_defrag_threshold_lower: Option<u32>,
324 pub active_defrag_threshold_upper: Option<u32>,
325
326 pub backup: Option<bool>,
328 pub backup_failure_reason: Option<String>,
329 pub backup_history: Option<u32>,
330 pub backup_interval: Option<u32>,
331 pub backup_interval_offset: Option<u32>,
332 pub backup_location: Option<Value>,
333 pub backup_progress: Option<f64>,
334 pub backup_status: Option<String>,
335
336 pub dataset_import_sources: Option<Vec<Value>>,
338 pub import_failure_reason: Option<String>,
339 pub import_progress: Option<f64>,
340 pub import_status: Option<String>,
341 pub export_failure_reason: Option<String>,
342 pub export_progress: Option<f64>,
343 pub export_status: Option<String>,
344 pub skip_import_analyze: Option<String>,
345
346 pub metrics_export_all: Option<bool>,
348 pub generate_text_monitor: Option<bool>,
349 pub email_alerts: Option<bool>,
350
351 pub module_list: Option<Vec<Value>>,
353 #[serde(default)]
355 pub search: Option<Value>,
356 #[serde(default)]
358 pub timeseries: Option<Value>,
359
360 pub bigstore: Option<bool>,
362 pub bigstore_ram_size: Option<u64>,
363 pub bigstore_max_ram_ratio: Option<u32>,
364 pub bigstore_ram_weights: Option<Vec<Value>>,
365 pub bigstore_version: Option<u32>,
366
367 pub proxy_policy: Option<String>,
369 pub oss_cluster: Option<bool>,
370 pub oss_cluster_api_preferred_endpoint_type: Option<String>,
371 pub oss_cluster_api_preferred_ip_type: Option<String>,
372 pub oss_sharding: Option<bool>,
373
374 pub redis_version: Option<String>,
376 pub resp3: Option<bool>,
377 pub disabled_commands: Option<String>,
378
379 pub hash_slots_policy: Option<String>,
381 pub shard_key_regex: Option<Vec<Value>>,
382 pub shard_block_crossslot_keys: Option<bool>,
383 pub shard_block_foreign_keys: Option<bool>,
384 pub implicit_shard_key: Option<bool>,
385
386 pub avoid_nodes: Option<Vec<String>>,
388 pub use_nodes: Option<Vec<String>>,
389 pub rack_aware: Option<bool>,
390
391 pub auto_upgrade: Option<bool>,
393 pub internal: Option<bool>,
394 pub db_conns_auditing: Option<bool>,
395 pub flush_on_fullsync: Option<bool>,
396 pub use_selective_flush: Option<bool>,
397
398 pub sync: Option<String>,
400 pub sync_sources: Option<Vec<Value>>,
401 pub sync_dedicated_threads: Option<u32>,
402 pub syncer_mode: Option<String>,
403 pub syncer_log_level: Option<String>,
404 pub support_syncer_reconf: Option<bool>,
405
406 pub gradual_src_mode: Option<String>,
408 pub gradual_src_max_sources: Option<u32>,
409 pub gradual_sync_mode: Option<String>,
410 pub gradual_sync_max_shards_per_source: Option<u32>,
411
412 pub slave_buffer: Option<String>,
414
415 pub snapshot_policy: Option<Vec<Value>>,
417
418 pub sched_policy: Option<String>,
420 pub recovery_wait_time: Option<i32>,
421
422 pub multi_commands_opt: Option<String>,
424 pub throughput_ingress: Option<f64>,
425 pub tracking_table_max_keys: Option<u32>,
426 pub wait_command: Option<bool>,
427
428 pub background_op: Option<Vec<Value>>,
430
431 pub mkms: Option<bool>,
433 pub roles_permissions: Option<Vec<Value>>,
434 pub tags: Option<Vec<String>>,
435 pub topology_epoch: Option<u32>,
436}
437
438#[derive(Debug, Clone, Serialize, Deserialize)]
440pub struct EndpointInfo {
441 pub uid: Option<String>,
443 pub addr: Option<Vec<String>>,
445 pub port: Option<u16>,
447 pub dns_name: Option<String>,
449 pub proxy_policy: Option<String>,
451 pub addr_type: Option<String>,
453 pub oss_cluster_api_preferred_ip_type: Option<String>,
455 pub exclude_proxies: Option<Vec<u32>>,
457 pub include_proxies: Option<Vec<u32>>,
459}
460
461#[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)]
463pub struct ModuleConfig {
464 #[builder(setter(into))]
465 pub module_name: String,
466 #[serde(skip_serializing_if = "Option::is_none")]
467 #[builder(default, setter(into, strip_option))]
468 pub module_args: Option<String>,
469}
470
471#[derive(Debug, Serialize, Deserialize, TypedBuilder)]
490pub struct CreateDatabaseRequest {
491 #[builder(setter(into))]
492 pub name: String,
493 #[serde(skip_serializing_if = "Option::is_none")]
494 #[builder(default, setter(strip_option))]
495 pub memory_size: Option<u64>,
496 #[serde(skip_serializing_if = "Option::is_none")]
497 #[builder(default, setter(strip_option))]
498 pub port: Option<u16>,
499 #[serde(skip_serializing_if = "Option::is_none")]
500 #[builder(default, setter(strip_option))]
501 pub replication: Option<bool>,
502 #[serde(skip_serializing_if = "Option::is_none")]
503 #[builder(default, setter(into, strip_option))]
504 pub persistence: Option<String>,
505 #[serde(skip_serializing_if = "Option::is_none")]
506 #[builder(default, setter(into, strip_option))]
507 pub eviction_policy: Option<String>,
508 #[serde(skip_serializing_if = "Option::is_none")]
509 #[builder(default, setter(strip_option))]
510 pub sharding: Option<bool>,
511 #[serde(skip_serializing_if = "Option::is_none")]
512 #[builder(default, setter(strip_option))]
513 pub shards_count: Option<u32>,
514 #[serde(skip_serializing_if = "Option::is_none", alias = "shard_count")]
515 #[builder(default, setter(strip_option))]
516 pub shard_count: Option<u32>,
517 #[serde(skip_serializing_if = "Option::is_none")]
518 #[builder(default, setter(into, strip_option))]
519 pub proxy_policy: Option<String>,
520 #[serde(skip_serializing_if = "Option::is_none")]
521 #[builder(default, setter(strip_option))]
522 pub rack_aware: Option<bool>,
523 #[serde(skip_serializing_if = "Option::is_none")]
524 #[builder(default, setter(strip_option))]
525 pub module_list: Option<Vec<ModuleConfig>>,
526 #[serde(skip_serializing_if = "Option::is_none")]
527 #[builder(default, setter(strip_option))]
528 pub crdt: Option<bool>,
529 #[serde(skip_serializing_if = "Option::is_none")]
530 #[builder(default, setter(into, strip_option))]
531 pub authentication_redis_pass: Option<String>,
532}
533
534pub struct DatabaseHandler {
536 client: RestClient,
537}
538
539impl DatabaseHandler {
540 pub fn new(client: RestClient) -> Self {
541 DatabaseHandler { client }
542 }
543
544 pub async fn list(&self) -> Result<Vec<DatabaseInfo>> {
546 self.client.get("/v1/bdbs").await
547 }
548
549 pub async fn info(&self, uid: u32) -> Result<DatabaseInfo> {
551 self.client.get(&format!("/v1/bdbs/{}", uid)).await
552 }
553
554 pub async fn get(&self, uid: u32) -> Result<DatabaseInfo> {
556 self.info(uid).await
557 }
558
559 pub async fn create(&self, request: CreateDatabaseRequest) -> Result<DatabaseInfo> {
561 self.client.post("/v1/bdbs", &request).await
562 }
563
564 pub async fn update(&self, uid: u32, updates: Value) -> Result<DatabaseInfo> {
566 self.client
567 .put(&format!("/v1/bdbs/{}", uid), &updates)
568 .await
569 }
570
571 pub async fn delete(&self, uid: u32) -> Result<()> {
573 self.client.delete(&format!("/v1/bdbs/{}", uid)).await
574 }
575
576 pub async fn stats(&self, uid: u32) -> Result<Value> {
578 self.client.get(&format!("/v1/bdbs/stats/{}", uid)).await
579 }
580
581 pub async fn metrics(&self, uid: u32) -> Result<Value> {
583 self.client.get(&format!("/v1/bdbs/metrics/{}", uid)).await
584 }
585
586 pub async fn export(&self, uid: u32, export_location: &str) -> Result<ExportResponse> {
588 let body = serde_json::json!({
589 "export_location": export_location
590 });
591 self.client
592 .post(&format!("/v1/bdbs/{}/actions/export", uid), &body)
593 .await
594 }
595
596 pub async fn import(
598 &self,
599 uid: u32,
600 import_location: &str,
601 flush: bool,
602 ) -> Result<ImportResponse> {
603 let body = serde_json::json!({
604 "import_location": import_location,
605 "flush": flush
606 });
607 self.client
608 .post(&format!("/v1/bdbs/{}/actions/import", uid), &body)
609 .await
610 }
611
612 pub async fn flush(&self, uid: u32) -> Result<DatabaseActionResponse> {
614 self.client
615 .post(
616 &format!("/v1/bdbs/{}/actions/flush", uid),
617 &serde_json::json!({}),
618 )
619 .await
620 }
621
622 pub async fn backup(&self, uid: u32) -> Result<BackupResponse> {
624 self.client
625 .post(
626 &format!("/v1/bdbs/{}/actions/backup", uid),
627 &serde_json::json!({}),
628 )
629 .await
630 }
631
632 pub async fn restore(
634 &self,
635 uid: u32,
636 backup_uid: Option<&str>,
637 ) -> Result<DatabaseActionResponse> {
638 let body = if let Some(backup_id) = backup_uid {
639 serde_json::json!({ "backup_uid": backup_id })
640 } else {
641 serde_json::json!({})
642 };
643 self.client
644 .post(&format!("/v1/bdbs/{}/actions/restore", uid), &body)
645 .await
646 }
647
648 pub async fn shards(&self, uid: u32) -> Result<Value> {
650 self.client.get(&format!("/v1/bdbs/{}/shards", uid)).await
651 }
652
653 pub async fn endpoints(&self, uid: u32) -> Result<Vec<EndpointInfo>> {
655 self.client
656 .get(&format!("/v1/bdbs/{}/endpoints", uid))
657 .await
658 }
659
660 pub async fn optimize_shards_placement(&self, uid: u32) -> Result<Value> {
662 self.client
663 .get(&format!(
664 "/v1/bdbs/{}/actions/optimize_shards_placement",
665 uid
666 ))
667 .await
668 }
669
670 pub async fn recover_status(&self, uid: u32) -> Result<Value> {
672 self.client
673 .get(&format!("/v1/bdbs/{}/actions/recover", uid))
674 .await
675 }
676
677 pub async fn recover(&self, uid: u32) -> Result<DatabaseActionResponse> {
679 self.client
680 .post(
681 &format!("/v1/bdbs/{}/actions/recover", uid),
682 &serde_json::json!({}),
683 )
684 .await
685 }
686
687 pub async fn resume_traffic(&self, uid: u32) -> Result<DatabaseActionResponse> {
689 self.client
690 .post(
691 &format!("/v1/bdbs/{}/actions/resume_traffic", uid),
692 &serde_json::json!({}),
693 )
694 .await
695 }
696
697 pub async fn stop_traffic(&self, uid: u32) -> Result<DatabaseActionResponse> {
699 self.client
700 .post(
701 &format!("/v1/bdbs/{}/actions/stop_traffic", uid),
702 &serde_json::json!({}),
703 )
704 .await
705 }
706
707 pub async fn rebalance(&self, uid: u32) -> Result<DatabaseActionResponse> {
709 self.client
710 .put(
711 &format!("/v1/bdbs/{}/actions/rebalance", uid),
712 &serde_json::json!({}),
713 )
714 .await
715 }
716
717 pub async fn revamp(&self, uid: u32) -> Result<DatabaseActionResponse> {
719 self.client
720 .put(
721 &format!("/v1/bdbs/{}/actions/revamp", uid),
722 &serde_json::json!({}),
723 )
724 .await
725 }
726
727 pub async fn backup_reset_status(&self, uid: u32) -> Result<Value> {
729 self.client
730 .put(
731 &format!("/v1/bdbs/{}/actions/backup_reset_status", uid),
732 &serde_json::json!({}),
733 )
734 .await
735 }
736
737 pub async fn export_reset_status(&self, uid: u32) -> Result<Value> {
739 self.client
740 .put(
741 &format!("/v1/bdbs/{}/actions/export_reset_status", uid),
742 &serde_json::json!({}),
743 )
744 .await
745 }
746
747 pub async fn import_reset_status(&self, uid: u32) -> Result<Value> {
749 self.client
750 .put(
751 &format!("/v1/bdbs/{}/actions/import_reset_status", uid),
752 &serde_json::json!({}),
753 )
754 .await
755 }
756
757 pub async fn peer_stats(&self, uid: u32) -> Result<Value> {
759 self.client
760 .get(&format!("/v1/bdbs/{}/peer_stats", uid))
761 .await
762 }
763
764 pub async fn peer_stats_for(&self, uid: u32, peer_uid: u32) -> Result<Value> {
766 self.client
767 .get(&format!("/v1/bdbs/{}/peer_stats/{}", uid, peer_uid))
768 .await
769 }
770
771 pub async fn sync_source_stats(&self, uid: u32) -> Result<Value> {
773 self.client
774 .get(&format!("/v1/bdbs/{}/sync_source_stats", uid))
775 .await
776 }
777
778 pub async fn sync_source_stats_for(&self, uid: u32, src_uid: u32) -> Result<Value> {
780 self.client
781 .get(&format!("/v1/bdbs/{}/sync_source_stats/{}", uid, src_uid))
782 .await
783 }
784
785 pub async fn syncer_state(&self, uid: u32) -> Result<Value> {
787 self.client
788 .get(&format!("/v1/bdbs/{}/syncer_state", uid))
789 .await
790 }
791
792 pub async fn syncer_state_crdt(&self, uid: u32) -> Result<Value> {
794 self.client
795 .get(&format!("/v1/bdbs/{}/syncer_state/crdt", uid))
796 .await
797 }
798
799 pub async fn syncer_state_replica(&self, uid: u32) -> Result<Value> {
801 self.client
802 .get(&format!("/v1/bdbs/{}/syncer_state/replica", uid))
803 .await
804 }
805
806 pub async fn passwords_delete(&self, uid: u32) -> Result<()> {
808 self.client
809 .delete(&format!("/v1/bdbs/{}/passwords", uid))
810 .await
811 }
812
813 pub async fn alerts_all(&self) -> Result<Value> {
815 self.client.get("/v1/bdbs/alerts").await
816 }
817
818 pub async fn alerts_for(&self, uid: u32) -> Result<Value> {
820 self.client.get(&format!("/v1/bdbs/alerts/{}", uid)).await
821 }
822
823 pub async fn alert_detail(&self, uid: u32, alert: &str) -> Result<Value> {
825 self.client
826 .get(&format!("/v1/bdbs/alerts/{}/{}", uid, alert))
827 .await
828 }
829
830 pub async fn crdt_source_alerts_all(&self) -> Result<Value> {
832 self.client.get("/v1/bdbs/crdt_sources/alerts").await
833 }
834
835 pub async fn crdt_source_alerts_for(&self, uid: u32) -> Result<Value> {
837 self.client
838 .get(&format!("/v1/bdbs/crdt_sources/alerts/{}", uid))
839 .await
840 }
841
842 pub async fn crdt_source_alerts_source(&self, uid: u32, source_id: u32) -> Result<Value> {
844 self.client
845 .get(&format!(
846 "/v1/bdbs/crdt_sources/alerts/{}/{}",
847 uid, source_id
848 ))
849 .await
850 }
851
852 pub async fn crdt_source_alert_detail(
854 &self,
855 uid: u32,
856 source_id: u32,
857 alert: &str,
858 ) -> Result<Value> {
859 self.client
860 .get(&format!(
861 "/v1/bdbs/crdt_sources/alerts/{}/{}/{}",
862 uid, source_id, alert
863 ))
864 .await
865 }
866
867 pub async fn replica_source_alerts_all(&self) -> Result<Value> {
869 self.client.get("/v1/bdbs/replica_sources/alerts").await
870 }
871
872 pub async fn replica_source_alerts_for(&self, uid: u32) -> Result<Value> {
874 self.client
875 .get(&format!("/v1/bdbs/replica_sources/alerts/{}", uid))
876 .await
877 }
878
879 pub async fn replica_source_alerts_source(&self, uid: u32, source_id: u32) -> Result<Value> {
881 self.client
882 .get(&format!(
883 "/v1/bdbs/replica_sources/alerts/{}/{}",
884 uid, source_id
885 ))
886 .await
887 }
888
889 pub async fn replica_source_alert_detail(
891 &self,
892 uid: u32,
893 source_id: u32,
894 alert: &str,
895 ) -> Result<Value> {
896 self.client
897 .get(&format!(
898 "/v1/bdbs/replica_sources/alerts/{}/{}/{}",
899 uid, source_id, alert
900 ))
901 .await
902 }
903
904 pub async fn upgrade(
906 &self,
907 uid: u32,
908 module_name: &str,
909 new_version: &str,
910 ) -> Result<DatabaseActionResponse> {
911 let body = serde_json::json!({
912 "module_name": module_name,
913 "new_version": new_version
914 });
915 self.client
916 .post(&format!("/v1/bdbs/{}/actions/upgrade", uid), &body)
917 .await
918 }
919
920 pub async fn upgrade_redis_version(
954 &self,
955 uid: u32,
956 request: DatabaseUpgradeRequest,
957 ) -> Result<DatabaseActionResponse> {
958 self.client
959 .post(&format!("/v1/bdbs/{}/upgrade", uid), &request)
960 .await
961 }
962
963 pub async fn reset_password(
965 &self,
966 uid: u32,
967 new_password: &str,
968 ) -> Result<DatabaseActionResponse> {
969 let body = serde_json::json!({
970 "authentication_redis_pass": new_password
971 });
972 self.client
973 .post(&format!("/v1/bdbs/{}/actions/reset_password", uid), &body)
974 .await
975 }
976
977 pub async fn availability(&self, uid: u32) -> Result<Value> {
979 self.client
980 .get(&format!("/v1/bdbs/{}/availability", uid))
981 .await
982 }
983
984 pub async fn endpoint_availability(&self, uid: u32) -> Result<Value> {
986 self.client
987 .get(&format!("/v1/local/bdbs/{}/endpoint/availability", uid))
988 .await
989 }
990
991 pub async fn create_v2(&self, request: Value) -> Result<DatabaseInfo> {
993 self.client.post("/v2/bdbs", &request).await
994 }
995
996 pub fn watch_database(&self, uid: u32, poll_interval: Duration) -> DatabaseWatchStream<'_> {
1036 Box::pin(async_stream::stream! {
1037 let mut last_status: Option<String> = None;
1038
1039 loop {
1040 match self.info(uid).await {
1041 Ok(db_info) => {
1042 let current_status = db_info.status.clone();
1043
1044 let status_changed = match (&last_status, ¤t_status) {
1046 (Some(old), Some(new)) => old != new,
1047 (None, Some(_)) => false, (Some(_), None) => true, (None, None) => false,
1050 };
1051
1052 if status_changed {
1054 yield Ok((db_info, last_status.clone()));
1055 } else if last_status.is_none() {
1056 yield Ok((db_info, None));
1058 } else {
1059 yield Ok((db_info, None));
1061 }
1062
1063 last_status = current_status;
1064 }
1065 Err(e) => {
1066 yield Err(e);
1067 break;
1068 }
1069 }
1070
1071 sleep(poll_interval).await;
1072 }
1073 })
1074 }
1075}