Skip to main content

typedb_driver/database/
database_manager.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *   http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied.  See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#[cfg(not(feature = "sync"))]
21use std::future::Future;
22
23use super::Database;
24use crate::{
25    common::{error::ConnectionError, Result},
26    connection::ServerConnection,
27    Connection, Error,
28};
29
30/// Provides access to all database management methods.
31#[derive(Clone, Debug)]
32pub struct DatabaseManager {
33    connection: Connection,
34}
35
36/// Provides access to all database management methods.
37impl DatabaseManager {
38    pub fn new(connection: Connection) -> Self {
39        Self { connection }
40    }
41
42    /// Retrieve the database with the given name.
43    ///
44    /// # Arguments
45    ///
46    /// * `name` -- The name of the database to retrieve
47    ///
48    /// # Examples
49    ///
50    /// ```rust
51    #[cfg_attr(feature = "sync", doc = "driver.databases().get(name);")]
52    #[cfg_attr(not(feature = "sync"), doc = "driver.databases().get(name).await;")]
53    /// ```
54    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
55    pub async fn get(&self, name: impl Into<String>) -> Result<Database> {
56        let name = name.into();
57        if !self.contains(name.clone()).await? {
58            return Err(ConnectionError::DatabaseDoesNotExist { name }.into());
59        }
60        Database::get(name, self.connection.clone()).await
61    }
62
63    /// Checks if a database with the given name exists
64    ///
65    /// # Arguments
66    ///
67    /// * `name` -- The database name to be checked
68    ///
69    /// # Examples
70    ///
71    /// ```rust
72    #[cfg_attr(feature = "sync", doc = "driver.databases().contains(name);")]
73    #[cfg_attr(not(feature = "sync"), doc = "driver.databases().contains(name).await;")]
74    /// ```
75    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
76    pub async fn contains(&self, name: impl Into<String>) -> Result<bool> {
77        let name = name.into();
78        self.run_failsafe(name, |server_connection, name| async move { server_connection.database_exists(name).await })
79            .await
80    }
81
82    /// Create a database with the given name
83    ///
84    /// # Arguments
85    ///
86    /// * `name` -- The name of the database to be created
87    ///
88    /// # Examples
89    ///
90    /// ```rust
91    #[cfg_attr(feature = "sync", doc = "driver.databases().create(name);")]
92    #[cfg_attr(not(feature = "sync"), doc = "driver.databases().create(name).await;")]
93    /// ```
94    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
95    pub async fn create(&self, name: impl Into<String>) -> Result {
96        let name = name.into();
97        self.run_failsafe(name, |server_connection, name| async move { server_connection.create_database(name).await })
98            .await
99    }
100
101    /// Retrieves all databases present on the TypeDB server
102    ///
103    /// # Examples
104    ///
105    /// ```rust
106    #[cfg_attr(feature = "sync", doc = "driver.databases().all();")]
107    #[cfg_attr(not(feature = "sync"), doc = "driver.databases().all().await;")]
108    /// ```
109    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
110    pub async fn all(&self) -> Result<Vec<Database>> {
111        let mut error_buffer = Vec::with_capacity(self.connection.server_count());
112        for (server_id, server_connection) in self.connection.connections() {
113            match server_connection.all_databases().await {
114                Ok(list) => {
115                    return list.into_iter().map(|db_info| Database::new(db_info, self.connection.clone())).collect()
116                }
117                Err(err) => error_buffer.push(format!("- {}: {}", server_id, err)),
118            }
119        }
120        Err(ConnectionError::ServerConnectionFailedWithError { error: error_buffer.join("\n") })?
121    }
122
123    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
124    async fn run_failsafe<F, P, R>(&self, name: String, task: F) -> Result<R>
125    where
126        F: Fn(ServerConnection, String) -> P,
127        P: Future<Output = Result<R>>,
128    {
129        let mut error_buffer = Vec::with_capacity(self.connection.server_count());
130        for (server_id, server_connection) in self.connection.connections() {
131            match task(server_connection.clone(), name.clone()).await {
132                Ok(res) => return Ok(res),
133                Err(Error::Connection(ConnectionError::CloudReplicaNotPrimary)) => {
134                    return Database::get(name, self.connection.clone())
135                        .await?
136                        .run_on_primary_replica(|database| {
137                            let task = &task;
138                            async move { task(database.connection().clone(), database.name().to_owned()).await }
139                        })
140                        .await
141                }
142                err @ Err(Error::Connection(ConnectionError::ConnectionIsClosed)) => return err,
143                Err(err) => error_buffer.push(format!("- {}: {}", server_id, err)),
144            }
145        }
146        Err(ConnectionError::ServerConnectionFailedWithError { error: error_buffer.join("\n") })?
147    }
148}