typedb_driver/database/
database.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *   http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied.  See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#[cfg(not(feature = "sync"))]
21use std::future::Future;
22use std::{
23    collections::HashMap,
24    fmt,
25    sync::{
26        atomic::{AtomicU64, Ordering},
27        Arc, RwLock,
28    },
29    thread::sleep,
30    time::Duration,
31};
32
33use itertools::Itertools;
34use log::{debug, error};
35
36use crate::{
37    common::{
38        address::Address,
39        error::ConnectionError,
40        info::{DatabaseInfo, ReplicaInfo},
41        Error, Result,
42    },
43    connection::server_connection::ServerConnection,
44    driver::TypeDBDriver,
45    error::InternalError,
46    Options, Transaction, TransactionType,
47};
48
/// A TypeDB database
///
/// Holds the database name, the currently known replicas of the database and
/// the server connections that replica lookups and tasks are issued through.
pub struct Database {
    /// Name of the database.
    name: String,
    /// Known replicas; replaced wholesale each time the primary replica is re-sought.
    replicas: RwLock<Vec<Replica>>,
    /// Server connections keyed by server address.
    server_connections: HashMap<Address, ServerConnection>,
}
55
56impl Database {
    /// Number of times `run_on_primary_replica` attempts a task before giving up.
    const PRIMARY_REPLICA_TASK_MAX_RETRIES: usize = 10;
    /// Number of replica-list fetches `seek_primary_replica` performs before giving up.
    const FETCH_REPLICAS_MAX_RETRIES: usize = 10;
    /// Length of the pause taken by `wait_for_primary_replica_selection`.
    const WAIT_FOR_PRIMARY_REPLICA_SELECTION: Duration = Duration::from_secs(2);
60
61    pub(super) fn new(
62        database_info: DatabaseInfo,
63        server_connections: HashMap<Address, ServerConnection>,
64    ) -> Result<Self> {
65        let name = database_info.name.clone();
66        let replicas = RwLock::new(Replica::try_from_info(database_info, &server_connections)?);
67        Ok(Self { name, replicas, server_connections })
68    }
69
70    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
71    pub(super) async fn get(name: String, server_connections: HashMap<Address, ServerConnection>) -> Result<Self> {
72        Ok(Self {
73            name: name.clone(),
74            replicas: RwLock::new(Replica::fetch_all(name, &server_connections).await?),
75            server_connections,
76        })
77    }
78
79    /// Retrieves the database name as a string.
80    pub fn name(&self) -> &str {
81        self.name.as_str()
82    }
83
84    /// Returns the `Replica` instances for this database.
85    /// _Only works in TypeDB Cloud_
86    ///
87    /// # Examples
88    ///
89    /// ```rust
90    /// database.replicas_info()
91    /// ```
92    pub fn replicas_info(&self) -> Vec<ReplicaInfo> {
93        self.replicas.read().unwrap().iter().map(Replica::to_info).collect()
94    }
95
96    /// Returns the primary replica for this database.
97    /// _Only works in TypeDB Cloud_
98    ///
99    /// # Examples
100    ///
101    /// ```rust
102    /// database.primary_replica_info()
103    /// ```
104    pub fn primary_replica_info(&self) -> Option<ReplicaInfo> {
105        self.primary_replica().map(|replica| replica.to_info())
106    }
107
108    /// Returns the preferred replica for this database.
109    /// Operations which can be run on any replica will prefer to use this replica.
110    /// _Only works in TypeDB Cloud_
111    ///
112    /// # Examples
113    ///
114    /// ```rust
115    /// database.preferred_replica_info();
116    /// ```
117    pub fn preferred_replica_info(&self) -> Option<ReplicaInfo> {
118        self.preferred_replica().map(|replica| replica.to_info())
119    }
120
121    /// Deletes this database.
122    ///
123    /// # Examples
124    ///
125    /// ```rust
126    #[cfg_attr(feature = "sync", doc = "database.delete();")]
127    #[cfg_attr(not(feature = "sync"), doc = "database.delete().await;")]
128    /// ```
129    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
130    pub async fn delete(self: Arc<Self>) -> Result {
131        self.run_on_primary_replica(|database| database.delete()).await
132    }
133
134    /// Returns a full schema text as a valid TypeQL define query string.
135    ///
136    /// # Examples
137    ///
138    /// ```rust
139    #[cfg_attr(feature = "sync", doc = "database.schema();")]
140    #[cfg_attr(not(feature = "sync"), doc = "database.schema().await;")]
141    /// ```
142    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
143    pub async fn schema(&self) -> Result<String> {
144        self.run_failsafe(|database| async move { database.schema().await }).await
145    }
146
147    /// Returns the types in the schema as a valid TypeQL define query string.
148    ///
149    /// # Examples
150    ///
151    /// ```rust
152    #[cfg_attr(feature = "sync", doc = "database.type_schema();")]
153    #[cfg_attr(not(feature = "sync"), doc = "database.type_schema().await;")]
154    /// ```
155    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
156    pub async fn type_schema(&self) -> Result<String> {
157        self.run_failsafe(|database| async move { database.type_schema().await }).await
158    }
159
160    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
161    pub(crate) async fn run_failsafe<F, P, R>(&self, task: F) -> Result<R>
162    where
163        F: Fn(ServerDatabase) -> P,
164        P: Future<Output = Result<R>>,
165    {
166        match self.run_on_any_replica(&task).await {
167            Err(Error::Connection(ConnectionError::CloudReplicaNotPrimary)) => {
168                debug!("Attempted to run on a non-primary replica, retrying on primary...");
169                self.run_on_primary_replica(&task).await
170            }
171            res => res,
172        }
173    }
174
175    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
176    pub(super) async fn run_on_any_replica<F, P, R>(&self, task: F) -> Result<R>
177    where
178        F: Fn(ServerDatabase) -> P,
179        P: Future<Output = Result<R>>,
180    {
181        let replicas = self.replicas.read().unwrap().clone();
182        for replica in replicas {
183            match task(replica.database.clone()).await {
184                Err(Error::Connection(
185                    ConnectionError::ServerConnectionFailedStatusError { .. } | ConnectionError::ConnectionFailed,
186                )) => {
187                    debug!("Unable to connect to {}. Attempting next server.", replica.server);
188                }
189                res => return res,
190            }
191        }
192        Err(Self::unable_to_connect_error(&self.server_connections))
193    }
194
195    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
196    pub(super) async fn run_on_primary_replica<F, P, R>(&self, task: F) -> Result<R>
197    where
198        F: Fn(ServerDatabase) -> P,
199        P: Future<Output = Result<R>>,
200    {
201        let mut primary_replica =
202            if let Some(replica) = self.primary_replica() { replica } else { self.seek_primary_replica().await? };
203
204        for _ in 0..Self::PRIMARY_REPLICA_TASK_MAX_RETRIES {
205            match task(primary_replica.database.clone()).await {
206                Err(Error::Connection(
207                    ConnectionError::CloudReplicaNotPrimary
208                    | ConnectionError::ServerConnectionFailedStatusError { .. }
209                    | ConnectionError::ConnectionFailed,
210                )) => {
211                    debug!("Primary replica error, waiting...");
212                    Self::wait_for_primary_replica_selection().await;
213                    primary_replica = self.seek_primary_replica().await?;
214                }
215                res => return res,
216            }
217        }
218        Err(Self::unable_to_connect_error(&self.server_connections))
219    }
220
221    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
222    async fn seek_primary_replica(&self) -> Result<Replica> {
223        for _ in 0..Self::FETCH_REPLICAS_MAX_RETRIES {
224            let replicas = Replica::fetch_all(self.name.clone(), &self.server_connections).await?;
225            *self.replicas.write().unwrap() = replicas;
226            if let Some(replica) = self.primary_replica() {
227                return Ok(replica);
228            }
229            Self::wait_for_primary_replica_selection().await;
230        }
231        Err(Self::unable_to_connect_error(&self.server_connections))
232    }
233
234    fn unable_to_connect_error(server_connections: &HashMap<Address, ServerConnection>) -> Error {
235        Error::Connection(ConnectionError::ServerConnectionFailed {
236            addresses: server_connections.keys().map(Address::clone).collect_vec(),
237        })
238    }
239
240    fn primary_replica(&self) -> Option<Replica> {
241        self.replicas.read().unwrap().iter().filter(|r| r.is_primary).max_by_key(|r| r.term).cloned()
242    }
243
244    fn preferred_replica(&self) -> Option<Replica> {
245        self.replicas.read().unwrap().iter().filter(|r| r.is_preferred).max_by_key(|r| r.term).cloned()
246    }
247
248    #[cfg(feature = "sync")]
249    fn wait_for_primary_replica_selection() {
250        sleep(Self::WAIT_FOR_PRIMARY_REPLICA_SELECTION);
251    }
252
253    #[cfg(not(feature = "sync"))]
254    async fn wait_for_primary_replica_selection() {
255        tokio::time::sleep(Self::WAIT_FOR_PRIMARY_REPLICA_SELECTION).await
256    }
257}
258
259impl fmt::Debug for Database {
260    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261        f.debug_struct("Database").field("name", &self.name).field("replicas", &self.replicas).finish()
262    }
263}
264
/// The metadata and state of an individual raft replica of a database.
#[derive(Clone)]
pub(super) struct Replica {
    /// Address of the server hosting this replica.
    server: Address,
    /// Name of the database this is a replica of.
    database_name: String,
    /// Whether this is the primary replica of the raft cluster.
    is_primary: bool,
    /// The raft protocol 'term' of this replica.
    term: i64,
    /// Whether this is the preferred replica of the raft cluster, i.e. the replica that
    /// operations which can be run on any replica are meant to favour.
    is_preferred: bool,
    /// Database name and server connection through which tasks for this replica are run.
    database: ServerDatabase,
}
281
282impl Replica {
283    fn new(name: String, metadata: ReplicaInfo, server_connection: ServerConnection) -> Self {
284        Self {
285            server: metadata.server,
286            database_name: name.clone(),
287            is_primary: metadata.is_primary,
288            term: metadata.term,
289            is_preferred: metadata.is_preferred,
290            database: ServerDatabase::new(name, server_connection),
291        }
292    }
293
294    fn try_from_info(
295        database_info: DatabaseInfo,
296        server_connections: &HashMap<Address, ServerConnection>,
297    ) -> Result<Vec<Self>> {
298        database_info
299            .replicas
300            .into_iter()
301            .map(|replica| {
302                if server_connections.len() == 1 {
303                    Ok(Self::new(
304                        database_info.name.clone(),
305                        replica,
306                        server_connections.values().next().unwrap().clone(),
307                    ))
308                } else {
309                    // TODO: actually check the advertised == provided, if that is the strategy we want
310                    let server_connection = server_connections
311                        .get(&replica.server)
312                        .ok_or_else(|| InternalError::UnknownServer { server: replica.server.clone() })?;
313                    Ok(Self::new(database_info.name.clone(), replica, server_connection.clone()))
314                }
315            })
316            .collect()
317    }
318
319    fn to_info(&self) -> ReplicaInfo {
320        ReplicaInfo {
321            server: self.server.clone(),
322            is_primary: self.is_primary,
323            is_preferred: self.is_preferred,
324            term: self.term,
325        }
326    }
327
328    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
329    async fn fetch_all(name: String, server_connections: &HashMap<Address, ServerConnection>) -> Result<Vec<Self>> {
330        for (server, server_connection) in server_connections {
331            let res = server_connection.get_database_replicas(name.clone()).await;
332            match res {
333                Ok(info) => {
334                    return Self::try_from_info(info, server_connections);
335                }
336                Err(Error::Connection(
337                    ConnectionError::DatabaseNotFound { .. }
338                    | ConnectionError::ServerConnectionFailedStatusError { .. }
339                    | ConnectionError::ConnectionFailed,
340                )) => {
341                    error!(
342                        "Failed to fetch replica info for database '{}' from {}. Attempting next server.",
343                        name, server
344                    );
345                }
346                Err(err) => return Err(err),
347            }
348        }
349        Err(Database::unable_to_connect_error(server_connections))
350    }
351}
352
353impl fmt::Debug for Replica {
354    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
355        f.debug_struct("Replica")
356            .field("server", &self.server)
357            .field("database_name", &self.database_name)
358            .field("is_primary", &self.is_primary)
359            .field("term", &self.term)
360            .field("is_preferred", &self.is_preferred)
361            .finish()
362    }
363}
364
365#[derive(Clone, Debug)]
366pub(crate) struct ServerDatabase {
367    name: String,
368    connection: ServerConnection,
369}
370
371impl ServerDatabase {
372    fn new(name: String, connection: ServerConnection) -> Self {
373        Self { name, connection }
374    }
375
376    pub fn name(&self) -> &str {
377        self.name.as_str()
378    }
379
380    pub(crate) fn connection(&self) -> &ServerConnection {
381        &self.connection
382    }
383
384    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
385    async fn delete(self) -> Result {
386        self.connection.delete_database(self.name).await
387    }
388
389    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
390    async fn schema(&self) -> Result<String> {
391        self.connection.database_schema(self.name.clone()).await
392    }
393
394    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
395    async fn type_schema(&self) -> Result<String> {
396        self.connection.database_type_schema(self.name.clone()).await
397    }
398}
399
400impl fmt::Display for ServerDatabase {
401    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
402        write!(f, "{}", self.name)
403    }
404}