typedb_driver/database/
database_manager.rs1#[cfg(not(feature = "sync"))]
21use std::future::Future;
22
23use super::Database;
24use crate::{
25 common::{error::ConnectionError, Result},
26 connection::ServerConnection,
27 Connection, Error,
28};
29
30#[derive(Clone, Debug)]
32pub struct DatabaseManager {
33 connection: Connection,
34}
35
36impl DatabaseManager {
38 pub fn new(connection: Connection) -> Self {
39 Self { connection }
40 }
41
42 #[cfg_attr(feature = "sync", doc = "driver.databases().get(name);")]
52 #[cfg_attr(not(feature = "sync"), doc = "driver.databases().get(name).await;")]
53 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
55 pub async fn get(&self, name: impl Into<String>) -> Result<Database> {
56 let name = name.into();
57 if !self.contains(name.clone()).await? {
58 return Err(ConnectionError::DatabaseDoesNotExist { name }.into());
59 }
60 Database::get(name, self.connection.clone()).await
61 }
62
63 #[cfg_attr(feature = "sync", doc = "driver.databases().contains(name);")]
73 #[cfg_attr(not(feature = "sync"), doc = "driver.databases().contains(name).await;")]
74 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
76 pub async fn contains(&self, name: impl Into<String>) -> Result<bool> {
77 let name = name.into();
78 self.run_failsafe(name, |server_connection, name| async move { server_connection.database_exists(name).await })
79 .await
80 }
81
82 #[cfg_attr(feature = "sync", doc = "driver.databases().create(name);")]
92 #[cfg_attr(not(feature = "sync"), doc = "driver.databases().create(name).await;")]
93 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
95 pub async fn create(&self, name: impl Into<String>) -> Result {
96 let name = name.into();
97 self.run_failsafe(name, |server_connection, name| async move { server_connection.create_database(name).await })
98 .await
99 }
100
101 #[cfg_attr(feature = "sync", doc = "driver.databases().all();")]
107 #[cfg_attr(not(feature = "sync"), doc = "driver.databases().all().await;")]
108 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
110 pub async fn all(&self) -> Result<Vec<Database>> {
111 let mut error_buffer = Vec::with_capacity(self.connection.server_count());
112 for (server_id, server_connection) in self.connection.connections() {
113 match server_connection.all_databases().await {
114 Ok(list) => {
115 return list.into_iter().map(|db_info| Database::new(db_info, self.connection.clone())).collect()
116 }
117 Err(err) => error_buffer.push(format!("- {}: {}", server_id, err)),
118 }
119 }
120 Err(ConnectionError::ServerConnectionFailedWithError { error: error_buffer.join("\n") })?
121 }
122
123 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
124 async fn run_failsafe<F, P, R>(&self, name: String, task: F) -> Result<R>
125 where
126 F: Fn(ServerConnection, String) -> P,
127 P: Future<Output = Result<R>>,
128 {
129 let mut error_buffer = Vec::with_capacity(self.connection.server_count());
130 for (server_id, server_connection) in self.connection.connections() {
131 match task(server_connection.clone(), name.clone()).await {
132 Ok(res) => return Ok(res),
133 Err(Error::Connection(ConnectionError::CloudReplicaNotPrimary)) => {
134 return Database::get(name, self.connection.clone())
135 .await?
136 .run_on_primary_replica(|database| {
137 let task = &task;
138 async move { task(database.connection().clone(), database.name().to_owned()).await }
139 })
140 .await
141 }
142 err @ Err(Error::Connection(ConnectionError::ConnectionIsClosed)) => return err,
143 Err(err) => error_buffer.push(format!("- {}: {}", server_id, err)),
144 }
145 }
146 Err(ConnectionError::ServerConnectionFailedWithError { error: error_buffer.join("\n") })?
147 }
148}