// typedb_driver/database/database.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *   http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied.  See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#[cfg(not(feature = "sync"))]
21use std::future::Future;
22use std::{
23    collections::HashMap,
24    fmt,
25    fs::File,
26    io::{BufWriter, Write},
27    path::Path,
28    sync::{
29        atomic::{AtomicU64, Ordering},
30        Arc, RwLock,
31    },
32    thread::sleep,
33    time::Duration,
34};
35
36use itertools::Itertools;
37use log::{debug, error};
38use prost::Message;
39
40use crate::{
41    common::{
42        address::Address,
43        error::ConnectionError,
44        info::{DatabaseInfo, ReplicaInfo},
45        Error, Result,
46    },
47    connection::{database::export_stream::DatabaseExportStream, server_connection::ServerConnection},
48    database::migration::{try_create_export_file, try_open_existing_export_file, DatabaseExportAnswer},
49    driver::TypeDBDriver,
50    error::{InternalError, MigrationError},
51    resolve, Transaction, TransactionOptions, TransactionType,
52};
53
54/// A TypeDB database
/// A TypeDB database
pub struct Database {
    // The database name; shared by all replicas of this database.
    name: String,
    // Known replicas of this database. Refreshed from the servers whenever a
    // primary replica must be (re)discovered (see `seek_primary_replica`).
    replicas: RwLock<Vec<Replica>>,
    // Connections to all known servers, keyed by server address.
    server_connections: HashMap<Address, ServerConnection>,
}
60
impl Database {
    // Maximum attempts to run a task against the primary replica before giving up.
    const PRIMARY_REPLICA_TASK_MAX_RETRIES: usize = 10;
    // Maximum attempts to refresh the replica list while looking for a primary.
    const FETCH_REPLICAS_MAX_RETRIES: usize = 10;
    // Pause between retries, giving the cluster time to elect a new primary.
    const WAIT_FOR_PRIMARY_REPLICA_SELECTION: Duration = Duration::from_secs(2);

    /// Builds a `Database` from already-fetched `DatabaseInfo`, resolving each
    /// advertised replica to one of the provided server connections.
    ///
    /// # Errors
    ///
    /// Returns an error if a replica's server is not among `server_connections`
    /// (see `Replica::try_from_info`).
    pub(super) fn new(
        database_info: DatabaseInfo,
        server_connections: HashMap<Address, ServerConnection>,
    ) -> Result<Self> {
        let name = database_info.name.clone();
        let replicas = RwLock::new(Replica::try_from_info(database_info, &server_connections)?);
        Ok(Self { name, replicas, server_connections })
    }

    /// Builds a `Database` by fetching replica information for `name` from the
    /// given servers (first server that answers wins; see `Replica::fetch_all`).
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub(super) async fn get(name: String, server_connections: HashMap<Address, ServerConnection>) -> Result<Self> {
        Ok(Self {
            name: name.clone(),
            replicas: RwLock::new(Replica::fetch_all(name, &server_connections).await?),
            server_connections,
        })
    }

    /// Retrieves the database name as a string.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the `Replica` instances for this database.
    /// _Only works in TypeDB Cloud / Enterprise_
    ///
    /// # Examples
    ///
    /// ```rust
    /// database.replicas_info()
    /// ```
    pub fn replicas_info(&self) -> Vec<ReplicaInfo> {
        self.replicas.read().unwrap().iter().map(Replica::to_info).collect()
    }

    /// Returns the primary replica for this database.
    /// _Only works in TypeDB Cloud / Enterprise_
    ///
    /// # Examples
    ///
    /// ```rust
    /// database.primary_replica_info()
    /// ```
    pub fn primary_replica_info(&self) -> Option<ReplicaInfo> {
        self.primary_replica().map(|replica| replica.to_info())
    }

    /// Returns the preferred replica for this database.
    /// Operations which can be run on any replica will prefer to use this replica.
    /// _Only works in TypeDB Cloud / Enterprise_
    ///
    /// # Examples
    ///
    /// ```rust
    /// database.preferred_replica_info();
    /// ```
    pub fn preferred_replica_info(&self) -> Option<ReplicaInfo> {
        self.preferred_replica().map(|replica| replica.to_info())
    }

    /// Deletes this database.
    ///
    /// # Examples
    ///
    /// ```rust
    #[cfg_attr(feature = "sync", doc = "database.delete();")]
    #[cfg_attr(not(feature = "sync"), doc = "database.delete().await;")]
    /// ```
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub async fn delete(self: Arc<Self>) -> Result {
        // Deletion must go through the primary replica.
        self.run_on_primary_replica(|database| database.delete()).await
    }

    /// Returns a full schema text as a valid TypeQL define query string.
    ///
    /// # Examples
    ///
    /// ```rust
    #[cfg_attr(feature = "sync", doc = "database.schema();")]
    #[cfg_attr(not(feature = "sync"), doc = "database.schema().await;")]
    /// ```
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub async fn schema(&self) -> Result<String> {
        self.run_failsafe(|database| async move { database.schema().await }).await
    }

    /// Returns the types in the schema as a valid TypeQL define query string.
    ///
    /// # Examples
    ///
    /// ```rust
    #[cfg_attr(feature = "sync", doc = "database.type_schema();")]
    #[cfg_attr(not(feature = "sync"), doc = "database.type_schema().await;")]
    /// ```
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub async fn type_schema(&self) -> Result<String> {
        self.run_failsafe(|database| async move { database.type_schema().await }).await
    }

    /// Export a database into a schema definition and a data files saved to the disk.
    /// This is a blocking operation and may take a significant amount of time depending on the database size.
    ///
    /// # Arguments
    ///
    /// * `schema_file_path` — The path to the schema definition file to be created
    /// * `data_file_path` — The path to the data file to be created
    ///
    /// # Examples
    ///
    /// ```rust
    #[cfg_attr(feature = "sync", doc = "database.export_to_file(schema_path, data_path);")]
    #[cfg_attr(not(feature = "sync"), doc = "database.export_to_file(schema_path, data_path).await;")]
    /// ```
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub async fn export_to_file(&self, schema_file_path: impl AsRef<Path>, data_file_path: impl AsRef<Path>) -> Result {
        let schema_file_path = schema_file_path.as_ref();
        let data_file_path = data_file_path.as_ref();
        if schema_file_path == data_file_path {
            return Err(Error::Migration(MigrationError::CannotExportToTheSameFile));
        }

        // Create both files up front; if the second creation fails, undo the first
        // so no half-created export is left behind.
        let _ = try_create_export_file(schema_file_path)?;
        if let Err(err) = try_create_export_file(data_file_path) {
            let _ = std::fs::remove_file(schema_file_path);
            return Err(err);
        }

        let result = self
            .run_failsafe(|database| async move {
                // File opening should be idempotent for multiple function invocations
                let schema_file = try_open_existing_export_file(schema_file_path)?;
                let data_file = try_open_existing_export_file(data_file_path)?;
                database.export_to_file(schema_file, data_file).await
            })
            .await;

        // On failure, best-effort cleanup of the partially written files.
        if result.is_err() {
            let _ = std::fs::remove_file(schema_file_path);
            let _ = std::fs::remove_file(data_file_path);
        }
        result
    }

    /// Runs `task` on any replica; if the server reports that a primary replica
    /// is required, retries the task against the primary replica.
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub(crate) async fn run_failsafe<F, P, R>(&self, task: F) -> Result<R>
    where
        F: Fn(ServerDatabase) -> P,
        P: Future<Output = Result<R>>,
    {
        match self.run_on_any_replica(&task).await {
            Err(Error::Connection(ConnectionError::ClusterReplicaNotPrimary)) => {
                debug!("Attempted to run on a non-primary replica, retrying on primary...");
                self.run_on_primary_replica(&task).await
            }
            res => res,
        }
    }

    /// Runs `task` against each known replica in turn, returning the first
    /// result that is not a connection failure.
    ///
    /// # Errors
    ///
    /// Returns a `ServerConnectionFailed` error if every replica is unreachable.
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub(super) async fn run_on_any_replica<F, P, R>(&self, task: F) -> Result<R>
    where
        F: Fn(ServerDatabase) -> P,
        P: Future<Output = Result<R>>,
    {
        // Clone the replica list so the lock is not held across awaits.
        let replicas = self.replicas.read().unwrap().clone();
        for replica in replicas {
            match task(replica.database.clone()).await {
                Err(Error::Connection(
                    ConnectionError::ServerConnectionFailedStatusError { .. } | ConnectionError::ConnectionFailed,
                )) => {
                    debug!("Unable to connect to {}. Attempting next server.", replica.server);
                }
                res => return res,
            }
        }
        Err(Self::unable_to_connect_error(&self.server_connections))
    }

    /// Runs `task` against the primary replica, re-discovering the primary and
    /// retrying (up to `PRIMARY_REPLICA_TASK_MAX_RETRIES` times) when the
    /// primary is stale or unreachable.
    ///
    /// # Errors
    ///
    /// Returns a `ServerConnectionFailed` error if all retries are exhausted.
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub(super) async fn run_on_primary_replica<F, P, R>(&self, task: F) -> Result<R>
    where
        F: Fn(ServerDatabase) -> P,
        P: Future<Output = Result<R>>,
    {
        let mut primary_replica =
            if let Some(replica) = self.primary_replica() { replica } else { self.seek_primary_replica().await? };

        for _ in 0..Self::PRIMARY_REPLICA_TASK_MAX_RETRIES {
            match task(primary_replica.database.clone()).await {
                Err(Error::Connection(
                    ConnectionError::ClusterReplicaNotPrimary
                    | ConnectionError::ServerConnectionFailedStatusError { .. }
                    | ConnectionError::ConnectionFailed,
                )) => {
                    debug!("Primary replica error, waiting...");
                    // Give the cluster time to elect a new primary, then re-fetch.
                    Self::wait_for_primary_replica_selection().await;
                    primary_replica = self.seek_primary_replica().await?;
                }
                res => return res,
            }
        }
        Err(Self::unable_to_connect_error(&self.server_connections))
    }

    /// Re-fetches the replica list from the servers until a primary replica is
    /// found, waiting between attempts (up to `FETCH_REPLICAS_MAX_RETRIES` times).
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    async fn seek_primary_replica(&self) -> Result<Replica> {
        for _ in 0..Self::FETCH_REPLICAS_MAX_RETRIES {
            let replicas = Replica::fetch_all(self.name.clone(), &self.server_connections).await?;
            *self.replicas.write().unwrap() = replicas;
            if let Some(replica) = self.primary_replica() {
                return Ok(replica);
            }
            Self::wait_for_primary_replica_selection().await;
        }
        Err(Self::unable_to_connect_error(&self.server_connections))
    }

    // Builds the "could not connect to any server" error listing all known addresses.
    fn unable_to_connect_error(server_connections: &HashMap<Address, ServerConnection>) -> Error {
        Error::Connection(ConnectionError::ServerConnectionFailed {
            addresses: server_connections.keys().map(Address::clone).collect_vec(),
        })
    }

    // Returns the primary replica with the highest raft term, if any is known.
    fn primary_replica(&self) -> Option<Replica> {
        self.replicas.read().unwrap().iter().filter(|r| r.is_primary).max_by_key(|r| r.term).cloned()
    }

    // Returns the preferred replica with the highest raft term, if any is known.
    fn preferred_replica(&self) -> Option<Replica> {
        self.replicas.read().unwrap().iter().filter(|r| r.is_preferred).max_by_key(|r| r.term).cloned()
    }

    // Sync build: block the current thread between primary-election retries.
    #[cfg(feature = "sync")]
    fn wait_for_primary_replica_selection() {
        sleep(Self::WAIT_FOR_PRIMARY_REPLICA_SELECTION);
    }

    // Async build: yield to the runtime between primary-election retries.
    #[cfg(not(feature = "sync"))]
    async fn wait_for_primary_replica_selection() {
        tokio::time::sleep(Self::WAIT_FOR_PRIMARY_REPLICA_SELECTION).await
    }
}
307
308impl fmt::Debug for Database {
309    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
310        f.debug_struct("Database").field("name", &self.name).field("replicas", &self.replicas).finish()
311    }
312}
313
/// The metadata and state of an individual raft replica of a database.
#[derive(Clone)]
pub(super) struct Replica {
    /// The server hosting this replica
    server: Address,
    /// The name of the database this is a replica of
    database_name: String,
    /// Whether this is the primary replica of the raft cluster.
    is_primary: bool,
    /// The raft protocol ‘term’ of this replica.
    term: i64,
    /// Whether this is the preferred replica of the raft cluster. If true, operations which can be run on any replica will prefer to use this replica.
    is_preferred: bool,
    /// Handle to the database on the server hosting this replica
    database: ServerDatabase,
}
330
impl Replica {
    // Builds a `Replica` from its metadata and the connection to its hosting server.
    fn new(name: String, metadata: ReplicaInfo, server_connection: ServerConnection) -> Self {
        Self {
            server: metadata.server,
            database_name: name.clone(),
            is_primary: metadata.is_primary,
            term: metadata.term,
            is_preferred: metadata.is_preferred,
            database: ServerDatabase::new(name, server_connection),
        }
    }

    /// Resolves each replica advertised in `database_info` to one of the known
    /// server connections.
    ///
    /// With a single connection, every replica is bound to it regardless of the
    /// advertised address (single-server / proxied setups). Otherwise the
    /// replica's advertised server must match a known connection.
    ///
    /// # Errors
    ///
    /// Returns `InternalError::UnknownServer` if a replica advertises a server
    /// with no corresponding connection.
    fn try_from_info(
        database_info: DatabaseInfo,
        server_connections: &HashMap<Address, ServerConnection>,
    ) -> Result<Vec<Self>> {
        database_info
            .replicas
            .into_iter()
            .map(|replica| {
                if server_connections.len() == 1 {
                    Ok(Self::new(
                        database_info.name.clone(),
                        replica,
                        server_connections.values().next().unwrap().clone(),
                    ))
                } else {
                    // TODO: actually check the advertised == provided, if that is the strategy we want
                    let server_connection = server_connections
                        .get(&replica.server)
                        .ok_or_else(|| InternalError::UnknownServer { server: replica.server.clone() })?;
                    Ok(Self::new(database_info.name.clone(), replica, server_connection.clone()))
                }
            })
            .collect()
    }

    // Converts this replica back into the public `ReplicaInfo` metadata form.
    fn to_info(&self) -> ReplicaInfo {
        ReplicaInfo {
            server: self.server.clone(),
            is_primary: self.is_primary,
            is_preferred: self.is_preferred,
            term: self.term,
        }
    }

    /// Fetches the replica list for database `name`, trying each server in turn.
    /// Connection failures and "database not found" responses fall through to
    /// the next server; any other error is returned immediately.
    ///
    /// # Errors
    ///
    /// Returns a `ServerConnectionFailed` error if no server could answer.
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    async fn fetch_all(name: String, server_connections: &HashMap<Address, ServerConnection>) -> Result<Vec<Self>> {
        for (server, server_connection) in server_connections {
            let res = server_connection.get_database_replicas(name.clone()).await;
            match res {
                Ok(info) => {
                    return Self::try_from_info(info, server_connections);
                }
                Err(Error::Connection(
                    ConnectionError::DatabaseNotFound { .. }
                    | ConnectionError::ServerConnectionFailedStatusError { .. }
                    | ConnectionError::ConnectionFailed,
                )) => {
                    error!(
                        "Failed to fetch replica info for database '{}' from {}. Attempting next server.",
                        name, server
                    );
                }
                Err(err) => return Err(err),
            }
        }
        Err(Database::unable_to_connect_error(server_connections))
    }
}
401
402impl fmt::Debug for Replica {
403    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
404        f.debug_struct("Replica")
405            .field("server", &self.server)
406            .field("database_name", &self.database_name)
407            .field("is_primary", &self.is_primary)
408            .field("term", &self.term)
409            .field("is_preferred", &self.is_preferred)
410            .finish()
411    }
412}
413
// A handle to a database on one specific server (one replica's view of it).
#[derive(Clone, Debug)]
pub(crate) struct ServerDatabase {
    // The database name.
    name: String,
    // The connection to the server hosting this copy of the database.
    connection: ServerConnection,
}
419
420impl ServerDatabase {
421    fn new(name: String, connection: ServerConnection) -> Self {
422        Self { name, connection }
423    }
424
425    pub fn name(&self) -> &str {
426        self.name.as_str()
427    }
428
429    pub(crate) fn connection(&self) -> &ServerConnection {
430        &self.connection
431    }
432
433    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
434    async fn delete(self) -> Result {
435        self.connection.delete_database(self.name).await
436    }
437
438    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
439    async fn schema(&self) -> Result<String> {
440        self.connection.database_schema(self.name.clone()).await
441    }
442
443    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
444    async fn type_schema(&self) -> Result<String> {
445        self.connection.database_type_schema(self.name.clone()).await
446    }
447
448    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
449    async fn export_to_file(&self, mut schema_file: File, data_file: File) -> Result {
450        let mut export_stream = self.connection.database_export(self.name.clone()).await?;
451        let mut data_writer = BufWriter::new(data_file);
452
453        loop {
454            match resolve!(export_stream.next())? {
455                DatabaseExportAnswer::Done => break,
456                DatabaseExportAnswer::Schema(schema) => {
457                    schema_file.write_all(schema.as_bytes())?;
458                    schema_file.flush()?;
459                }
460                DatabaseExportAnswer::Items(items) => {
461                    for item in items {
462                        let mut buf = Vec::new();
463                        item.encode_length_delimited(&mut buf)
464                            .map_err(|_| Error::Migration(MigrationError::CannotEncodeExportedConcept))?;
465                        data_writer.write_all(&buf)?;
466                    }
467                }
468            }
469        }
470
471        data_writer.flush()?;
472        Ok(())
473    }
474}
475
476impl fmt::Display for ServerDatabase {
477    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
478        write!(f, "{}", self.name)
479    }
480}