Skip to main content

typedb_driver/database/
database.rs

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
20#[cfg(not(feature = "sync"))]
21use std::future::Future;
22use std::{fmt, sync::RwLock, thread::sleep, time::Duration};
23
24use log::{debug, error};
25
26use crate::{
27    common::{
28        address::Address,
29        error::ConnectionError,
30        info::{DatabaseInfo, ReplicaInfo},
31        Error, Result,
32    },
33    connection::ServerConnection,
34    error::InternalError,
35    Connection,
36};
37
38/// A TypeDB database
39pub struct Database {
40    name: String,
41    replicas: RwLock<Vec<Replica>>,
42    connection: Connection,
43}
44
45impl Database {
46    const PRIMARY_REPLICA_TASK_MAX_RETRIES: usize = 10;
47    const FETCH_REPLICAS_MAX_RETRIES: usize = 10;
48    const WAIT_FOR_PRIMARY_REPLICA_SELECTION: Duration = Duration::from_secs(2);
49
50    pub(super) fn new(database_info: DatabaseInfo, connection: Connection) -> Result<Self> {
51        let name = database_info.name.clone();
52        let replicas = RwLock::new(Replica::try_from_info(database_info, &connection)?);
53        Ok(Self { name, replicas, connection })
54    }
55
56    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
57    pub(super) async fn get(name: String, connection: Connection) -> Result<Self> {
58        Ok(Self {
59            name: name.clone(),
60            replicas: RwLock::new(Replica::fetch_all(name, connection.clone()).await?),
61            connection,
62        })
63    }
64
65    /// Retrieves the database name as a string.
66    pub fn name(&self) -> &str {
67        self.name.as_str()
68    }
69
70    /// Returns the `Replica` instances for this database.
71    /// _Only works in TypeDB Cloud_
72    ///
73    /// # Examples
74    ///
75    /// ```rust
76    /// database.replicas_info()
77    /// ```
78    pub fn replicas_info(&self) -> Vec<ReplicaInfo> {
79        self.replicas.read().unwrap().iter().map(Replica::to_info).collect()
80    }
81
82    /// Returns the primary replica for this database.
83    /// _Only works in TypeDB Cloud_
84    ///
85    /// # Examples
86    ///
87    /// ```rust
88    /// database.primary_replica_info()
89    /// ```
90    pub fn primary_replica_info(&self) -> Option<ReplicaInfo> {
91        self.primary_replica().map(|replica| replica.to_info())
92    }
93
94    /// Returns the preferred replica for this database.
95    /// Operations which can be run on any replica will prefer to use this replica.
96    /// _Only works in TypeDB Cloud_
97    ///
98    /// # Examples
99    ///
100    /// ```rust
101    /// database.preferred_replica_info();
102    /// ```
103    pub fn preferred_replica_info(&self) -> Option<ReplicaInfo> {
104        self.preferred_replica().map(|replica| replica.to_info())
105    }
106
107    /// Deletes this database.
108    ///
109    /// # Examples
110    ///
111    /// ```rust
112    #[cfg_attr(feature = "sync", doc = "database.delete();")]
113    #[cfg_attr(not(feature = "sync"), doc = "database.delete().await;")]
114    /// ```
115    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
116    pub async fn delete(self) -> Result {
117        self.run_on_primary_replica(|database| database.delete()).await
118    }
119
120    /// Returns a full schema text as a valid TypeQL define query string.
121    ///
122    /// # Examples
123    ///
124    /// ```rust
125    #[cfg_attr(feature = "sync", doc = "database.schema();")]
126    #[cfg_attr(not(feature = "sync"), doc = "database.schema().await;")]
127    /// ```
128    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
129    pub async fn schema(&self) -> Result<String> {
130        self.run_failsafe(|database| async move { database.schema().await }).await
131    }
132
133    /// Returns the types in the schema as a valid TypeQL define query string.
134    ///
135    /// # Examples
136    ///
137    /// ```rust
138    #[cfg_attr(feature = "sync", doc = "database.type_schema();")]
139    #[cfg_attr(not(feature = "sync"), doc = "database.type_schema().await;")]
140    /// ```
141    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
142    pub async fn type_schema(&self) -> Result<String> {
143        self.run_failsafe(|database| async move { database.type_schema().await }).await
144    }
145
146    /// Returns the rules in the schema as a valid TypeQL define query string.
147    ///
148    /// # Examples
149    ///
150    /// ```rust
151    #[cfg_attr(feature = "sync", doc = "database.rule_schema();")]
152    #[cfg_attr(not(feature = "sync"), doc = "database.rule_schema().await;")]
153    /// ```
154    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
155    pub async fn rule_schema(&self) -> Result<String> {
156        self.run_failsafe(|database| async move { database.rule_schema().await }).await
157    }
158
159    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
160    pub(crate) async fn run_failsafe<F, P, R>(&self, task: F) -> Result<R>
161    where
162        F: Fn(ServerDatabase) -> P,
163        P: Future<Output = Result<R>>,
164    {
165        match self.run_on_any_replica(&task).await {
166            Err(Error::Connection(ConnectionError::CloudReplicaNotPrimary)) => {
167                debug!("Attempted to run on a non-primary replica, retrying on primary...");
168                self.run_on_primary_replica(&task).await
169            }
170            res => res,
171        }
172    }
173
174    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
175    pub(super) async fn run_on_any_replica<F, P, R>(&self, task: F) -> Result<R>
176    where
177        F: Fn(ServerDatabase) -> P,
178        P: Future<Output = Result<R>>,
179    {
180        let replicas = self.replicas.read().unwrap().clone();
181        for replica in replicas {
182            match task(replica.database.clone()).await {
183                Err(Error::Connection(
184                    ConnectionError::ServerConnectionFailedStatusError { .. } | ConnectionError::ConnectionFailed,
185                )) => {
186                    debug!("Unable to connect to {}. Attempting next server.", replica.server);
187                }
188                res => return res,
189            }
190        }
191        Err(self.connection.unable_to_connect_error())
192    }
193
194    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
195    pub(super) async fn run_on_primary_replica<F, P, R>(&self, task: F) -> Result<R>
196    where
197        F: Fn(ServerDatabase) -> P,
198        P: Future<Output = Result<R>>,
199    {
200        let mut primary_replica =
201            if let Some(replica) = self.primary_replica() { replica } else { self.seek_primary_replica().await? };
202
203        for _ in 0..Self::PRIMARY_REPLICA_TASK_MAX_RETRIES {
204            match task(primary_replica.database.clone()).await {
205                Err(Error::Connection(
206                    ConnectionError::CloudReplicaNotPrimary
207                    | ConnectionError::ServerConnectionFailedStatusError { .. }
208                    | ConnectionError::ConnectionFailed,
209                )) => {
210                    debug!("Primary replica error, waiting...");
211                    Self::wait_for_primary_replica_selection().await;
212                    primary_replica = self.seek_primary_replica().await?;
213                }
214                res => return res,
215            }
216        }
217        Err(self.connection.unable_to_connect_error())
218    }
219
220    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
221    async fn seek_primary_replica(&self) -> Result<Replica> {
222        for _ in 0..Self::FETCH_REPLICAS_MAX_RETRIES {
223            let replicas = Replica::fetch_all(self.name.clone(), self.connection.clone()).await?;
224            *self.replicas.write().unwrap() = replicas;
225            if let Some(replica) = self.primary_replica() {
226                return Ok(replica);
227            }
228            Self::wait_for_primary_replica_selection().await;
229        }
230        Err(self.connection.unable_to_connect_error())
231    }
232
233    fn primary_replica(&self) -> Option<Replica> {
234        self.replicas.read().unwrap().iter().filter(|r| r.is_primary).max_by_key(|r| r.term).cloned()
235    }
236
237    fn preferred_replica(&self) -> Option<Replica> {
238        self.replicas.read().unwrap().iter().filter(|r| r.is_preferred).max_by_key(|r| r.term).cloned()
239    }
240
241    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
242    async fn wait_for_primary_replica_selection() {
243        // FIXME: blocking sleep! Can't do agnostic async sleep.
244        sleep(Self::WAIT_FOR_PRIMARY_REPLICA_SELECTION);
245    }
246}
247
248impl fmt::Debug for Database {
249    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250        f.debug_struct("Database").field("name", &self.name).field("replicas", &self.replicas).finish()
251    }
252}
253
254/// The metadata and state of an individual raft replica of a database.
255#[derive(Clone)]
256pub(super) struct Replica {
257    /// The server hosting this replica
258    server: Address,
259    /// Retrieves the database name for which this is a replica
260    database_name: String,
261    /// Checks whether this is the primary replica of the raft cluster.
262    is_primary: bool,
263    /// The raft protocol ‘term’ of this replica.
264    term: i64,
265    /// Checks whether this is the preferred replica of the raft cluster. If true, Operations which can be run on any replica will prefer to use this replica.
266    is_preferred: bool,
267    /// Retrieves the database for which this is a replica
268    database: ServerDatabase,
269}
270
271impl Replica {
272    fn new(name: String, metadata: ReplicaInfo, server_connection: ServerConnection) -> Self {
273        Self {
274            server: metadata.server,
275            database_name: name.clone(),
276            is_primary: metadata.is_primary,
277            term: metadata.term,
278            is_preferred: metadata.is_preferred,
279            database: ServerDatabase::new(name, server_connection),
280        }
281    }
282
283    fn try_from_info(database_info: DatabaseInfo, connection: &Connection) -> Result<Vec<Self>> {
284        database_info
285            .replicas
286            .into_iter()
287            .map(|replica| {
288                let server_connection = connection
289                    .connection(&replica.server)
290                    .ok_or_else(|| InternalError::UnknownServer { server: replica.server.clone() })?;
291                Ok(Self::new(database_info.name.clone(), replica, server_connection.clone()))
292            })
293            .collect()
294    }
295
296    fn to_info(&self) -> ReplicaInfo {
297        ReplicaInfo {
298            server: self.server.clone(),
299            is_primary: self.is_primary,
300            is_preferred: self.is_preferred,
301            term: self.term,
302        }
303    }
304
305    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
306    async fn fetch_all(name: String, connection: Connection) -> Result<Vec<Self>> {
307        for (server, server_connection) in connection.connections() {
308            let res = server_connection.get_database_replicas(name.clone()).await;
309            match res {
310                Ok(info) => {
311                    return Self::try_from_info(info, &connection);
312                }
313                Err(Error::Connection(
314                    ConnectionError::DatabaseDoesNotExist { .. }
315                    | ConnectionError::ServerConnectionFailedStatusError { .. }
316                    | ConnectionError::ConnectionFailed,
317                )) => {
318                    error!(
319                        "Failed to fetch replica info for database '{}' from {}. Attempting next server.",
320                        name, server
321                    );
322                }
323                Err(err) => return Err(err),
324            }
325        }
326        Err(connection.unable_to_connect_error())
327    }
328}
329
330impl fmt::Debug for Replica {
331    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
332        f.debug_struct("Replica")
333            .field("server", &self.server)
334            .field("database_name", &self.database_name)
335            .field("is_primary", &self.is_primary)
336            .field("term", &self.term)
337            .field("is_preferred", &self.is_preferred)
338            .finish()
339    }
340}
341
342#[derive(Clone, Debug)]
343pub(crate) struct ServerDatabase {
344    name: String,
345    connection: ServerConnection,
346}
347
348impl ServerDatabase {
349    fn new(name: String, connection: ServerConnection) -> Self {
350        Self { name, connection }
351    }
352
353    pub(super) fn name(&self) -> &str {
354        self.name.as_str()
355    }
356
357    pub(crate) fn connection(&self) -> &ServerConnection {
358        &self.connection
359    }
360
361    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
362    async fn delete(self) -> Result {
363        self.connection.delete_database(self.name).await
364    }
365
366    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
367    async fn schema(&self) -> Result<String> {
368        self.connection.database_schema(self.name.clone()).await
369    }
370
371    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
372    async fn type_schema(&self) -> Result<String> {
373        self.connection.database_type_schema(self.name.clone()).await
374    }
375
376    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
377    async fn rule_schema(&self) -> Result<String> {
378        self.connection.database_rule_schema(self.name.clone()).await
379    }
380}
381
382impl fmt::Display for ServerDatabase {
383    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
384        write!(f, "{}", self.name)
385    }
386}