typedb_driver/database/database.rs

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#[cfg(not(feature = "sync"))]
use std::future::Future;
use std::{
    collections::HashMap,
    fmt,
    fs::File,
    io::{BufWriter, Write},
    path::Path,
    sync::{Arc, RwLock},
    thread::sleep,
    time::Duration,
};

use itertools::Itertools;
use prost::Message;
use tracing::{debug, error};

use crate::{
    common::{
        address::Address,
        error::ConnectionError,
        info::{DatabaseInfo, ReplicaInfo},
        Error, Result,
    },
    connection::{database::export_stream::DatabaseExportStream, server_connection::ServerConnection},
    database::migration::{try_create_export_file, try_open_existing_export_file, DatabaseExportAnswer},
    driver::TypeDBDriver,
    error::{InternalError, MigrationError},
    resolve, Transaction, TransactionOptions, TransactionType,
};

/// A TypeDB database
pub struct Database {
    name: String,
    replicas: RwLock<Vec<Replica>>,
    server_connections: HashMap<Address, ServerConnection>,
}

impl Database {
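    // Failover tuning: how many times a task is retried against the primary replica, how many
    // times the replica list is re-fetched while looking for a primary, and how long to wait
    // between attempts while a new primary is being elected.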
    const PRIMARY_REPLICA_TASK_MAX_RETRIES: usize = 10;
    const FETCH_REPLICAS_MAX_RETRIES: usize = 10;
    const WAIT_FOR_PRIMARY_REPLICA_SELECTION: Duration = Duration::from_secs(2);

    pub(super) fn new(
        database_info: DatabaseInfo,
        server_connections: HashMap<Address, ServerConnection>,
    ) -> Result<Self> {
        let name = database_info.name.clone();
        let replicas = RwLock::new(Replica::try_from_info(database_info, &server_connections)?);
        Ok(Self { name, replicas, server_connections })
    }

    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub(super) async fn get(name: String, server_connections: HashMap<Address, ServerConnection>) -> Result<Self> {
        Ok(Self {
            name: name.clone(),
            replicas: RwLock::new(Replica::fetch_all(name, &server_connections).await?),
            server_connections,
        })
    }

    /// Retrieves the database name as a string.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns information about the `Replica` instances of this database.
    /// _Only works in TypeDB Cloud / Enterprise_
    ///
    /// # Examples
    ///
    /// ```rust
    /// database.replicas_info()
    /// ```
    pub fn replicas_info(&self) -> Vec<ReplicaInfo> {
        self.replicas.read().unwrap().iter().map(Replica::to_info).collect()
    }

    /// Returns the primary replica for this database.
    /// _Only works in TypeDB Cloud / Enterprise_
    ///
    /// # Examples
    ///
    /// ```rust
    /// database.primary_replica_info()
    /// ```
    pub fn primary_replica_info(&self) -> Option<ReplicaInfo> {
        self.primary_replica().map(|replica| replica.to_info())
    }

    /// Returns the preferred replica for this database.
    /// Operations which can be run on any replica will prefer to use this replica.
    /// _Only works in TypeDB Cloud / Enterprise_
    ///
    /// # Examples
    ///
    /// ```rust
    /// database.preferred_replica_info();
    /// ```
    pub fn preferred_replica_info(&self) -> Option<ReplicaInfo> {
        self.preferred_replica().map(|replica| replica.to_info())
    }

122
123    /// Deletes this database.
124    ///
125    /// # Examples
126    ///
127    /// ```rust
128    #[cfg_attr(feature = "sync", doc = "database.delete();")]
129    #[cfg_attr(not(feature = "sync"), doc = "database.delete().await;")]
130    /// ```
131    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
132    pub async fn delete(self: Arc<Self>) -> Result {
133        self.run_on_primary_replica(|database| database.delete()).await
134    }
135
136    /// Returns a full schema text as a valid TypeQL define query string.
137    ///
138    /// # Examples
139    ///
140    /// ```rust
141    #[cfg_attr(feature = "sync", doc = "database.schema();")]
142    #[cfg_attr(not(feature = "sync"), doc = "database.schema().await;")]
143    /// ```
144    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
145    pub async fn schema(&self) -> Result<String> {
146        self.run_failsafe(|database| async move { database.schema().await }).await
147    }
148
149    /// Returns the types in the schema as a valid TypeQL define query string.
150    ///
151    /// # Examples
152    ///
153    /// ```rust
154    #[cfg_attr(feature = "sync", doc = "database.type_schema();")]
155    #[cfg_attr(not(feature = "sync"), doc = "database.type_schema().await;")]
156    /// ```
157    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
158    pub async fn type_schema(&self) -> Result<String> {
159        self.run_failsafe(|database| async move { database.type_schema().await }).await
160    }
161
    /// Exports the database into a schema definition file and a data file saved to disk.
    /// This is a blocking operation and may take a significant amount of time depending on the database size.
    ///
    /// # Arguments
    ///
    /// * `schema_file_path` — The path to the schema definition file to be created
    /// * `data_file_path` — The path to the data file to be created
    ///
    /// # Examples
    ///
    /// ```rust
    #[cfg_attr(feature = "sync", doc = "database.export_to_file(schema_path, data_path);")]
    #[cfg_attr(not(feature = "sync"), doc = "database.export_to_file(schema_path, data_path).await;")]
    /// ```
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub async fn export_to_file(&self, schema_file_path: impl AsRef<Path>, data_file_path: impl AsRef<Path>) -> Result {
        let schema_file_path = schema_file_path.as_ref();
        let data_file_path = data_file_path.as_ref();
        if schema_file_path == data_file_path {
            return Err(Error::Migration(MigrationError::CannotExportToTheSameFile));
        }

        let _ = try_create_export_file(schema_file_path)?;
        if let Err(err) = try_create_export_file(data_file_path) {
            let _ = std::fs::remove_file(schema_file_path);
            return Err(err);
        }

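        // Run the export with failover; if every attempt fails, remove the partially written files.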
        let result = self
            .run_failsafe(|database| async move {
                // File opening should be idempotent for multiple function invocations
                let schema_file = try_open_existing_export_file(schema_file_path)?;
                let data_file = try_open_existing_export_file(data_file_path)?;
                database.export_to_file(schema_file, data_file).await
            })
            .await;

        if result.is_err() {
            let _ = std::fs::remove_file(schema_file_path);
            let _ = std::fs::remove_file(data_file_path);
        }
        result
    }

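    // Runs `task` against any reachable replica first; if that replica reports it is not the
    // primary (`ClusterReplicaNotPrimary`), retries the task against the primary replica.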
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub(crate) async fn run_failsafe<F, P, R>(&self, task: F) -> Result<R>
    where
        F: Fn(ServerDatabase) -> P,
        P: Future<Output = Result<R>>,
    {
        match self.run_on_any_replica(&task).await {
            Err(Error::Connection(ConnectionError::ClusterReplicaNotPrimary)) => {
                debug!("Attempted to run on a non-primary replica, retrying on primary...");
                self.run_on_primary_replica(&task).await
            }
            res => res,
        }
    }

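    // Tries `task` on each known replica in turn, skipping replicas whose server cannot be
    // reached, and returns the first result that is not a connection failure.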
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub(super) async fn run_on_any_replica<F, P, R>(&self, task: F) -> Result<R>
    where
        F: Fn(ServerDatabase) -> P,
        P: Future<Output = Result<R>>,
    {
        let replicas = self.replicas.read().unwrap().clone();
        for replica in replicas {
            match task(replica.database.clone()).await {
                Err(Error::Connection(
                    ConnectionError::ServerConnectionFailedStatusError { .. } | ConnectionError::ConnectionFailed,
                )) => {
                    debug!("Unable to connect to {}. Attempting next server.", replica.server);
                }
                res => return res,
            }
        }
        Err(Self::unable_to_connect_error(&self.server_connections))
    }

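    // Runs `task` against the current primary replica, retrying up to
    // PRIMARY_REPLICA_TASK_MAX_RETRIES times. After a connection failure or a "not primary"
    // error, it waits for a new primary to be elected and resolves it again.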
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub(super) async fn run_on_primary_replica<F, P, R>(&self, task: F) -> Result<R>
    where
        F: Fn(ServerDatabase) -> P,
        P: Future<Output = Result<R>>,
    {
        let mut primary_replica =
            if let Some(replica) = self.primary_replica() { replica } else { self.seek_primary_replica().await? };

        for _ in 0..Self::PRIMARY_REPLICA_TASK_MAX_RETRIES {
            match task(primary_replica.database.clone()).await {
                Err(Error::Connection(
                    ConnectionError::ClusterReplicaNotPrimary
                    | ConnectionError::ServerConnectionFailedStatusError { .. }
                    | ConnectionError::ConnectionFailed,
                )) => {
                    debug!("Primary replica error, waiting...");
                    Self::wait_for_primary_replica_selection().await;
                    primary_replica = self.seek_primary_replica().await?;
                }
                res => return res,
            }
        }
        Err(Self::unable_to_connect_error(&self.server_connections))
    }

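    // Re-fetches the replica list from the servers until a primary replica appears, waiting
    // WAIT_FOR_PRIMARY_REPLICA_SELECTION between attempts, up to FETCH_REPLICAS_MAX_RETRIES times.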
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    async fn seek_primary_replica(&self) -> Result<Replica> {
        for _ in 0..Self::FETCH_REPLICAS_MAX_RETRIES {
            let replicas = Replica::fetch_all(self.name.clone(), &self.server_connections).await?;
            *self.replicas.write().unwrap() = replicas;
            if let Some(replica) = self.primary_replica() {
                return Ok(replica);
            }
            Self::wait_for_primary_replica_selection().await;
        }
        Err(Self::unable_to_connect_error(&self.server_connections))
    }

    fn unable_to_connect_error(server_connections: &HashMap<Address, ServerConnection>) -> Error {
        Error::Connection(ConnectionError::ServerConnectionFailed {
            addresses: server_connections.keys().map(Address::clone).collect_vec(),
        })
    }

    fn primary_replica(&self) -> Option<Replica> {
        self.replicas.read().unwrap().iter().filter(|r| r.is_primary).max_by_key(|r| r.term).cloned()
    }

    fn preferred_replica(&self) -> Option<Replica> {
        self.replicas.read().unwrap().iter().filter(|r| r.is_preferred).max_by_key(|r| r.term).cloned()
    }

    #[cfg(feature = "sync")]
    fn wait_for_primary_replica_selection() {
        sleep(Self::WAIT_FOR_PRIMARY_REPLICA_SELECTION);
    }

    #[cfg(not(feature = "sync"))]
    async fn wait_for_primary_replica_selection() {
        tokio::time::sleep(Self::WAIT_FOR_PRIMARY_REPLICA_SELECTION).await
    }
}

impl fmt::Debug for Database {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Database").field("name", &self.name).field("replicas", &self.replicas).finish()
    }
}

/// The metadata and state of an individual raft replica of a database.
#[derive(Clone)]
pub(super) struct Replica {
    /// The server hosting this replica
    server: Address,
    /// The name of the database for which this is a replica
    database_name: String,
    /// Whether this is the primary replica of the raft cluster.
    is_primary: bool,
    /// The raft protocol ‘term’ of this replica.
    term: i64,
    /// Whether this is the preferred replica of the raft cluster. If true, operations which can be run on any replica will prefer to use this replica.
    is_preferred: bool,
    /// The database handle on the server hosting this replica
    database: ServerDatabase,
}

impl Replica {
    fn new(name: String, metadata: ReplicaInfo, server_connection: ServerConnection) -> Self {
        Self {
            server: metadata.server,
            database_name: name.clone(),
            is_primary: metadata.is_primary,
            term: metadata.term,
            is_preferred: metadata.is_preferred,
            database: ServerDatabase::new(name, server_connection),
        }
    }

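    // Builds the replica list from a DatabaseInfo. With a single server connection every
    // replica is bound to that connection; otherwise each replica's advertised server must
    // match one of the known server addresses.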
    fn try_from_info(
        database_info: DatabaseInfo,
        server_connections: &HashMap<Address, ServerConnection>,
    ) -> Result<Vec<Self>> {
        database_info
            .replicas
            .into_iter()
            .map(|replica| {
                if server_connections.len() == 1 {
                    Ok(Self::new(
                        database_info.name.clone(),
                        replica,
                        server_connections.values().next().unwrap().clone(),
                    ))
                } else {
                    // TODO: actually check the advertised == provided, if that is the strategy we want
                    let server_connection = server_connections
                        .get(&replica.server)
                        .ok_or_else(|| InternalError::UnknownServer { server: replica.server.clone() })?;
                    Ok(Self::new(database_info.name.clone(), replica, server_connection.clone()))
                }
            })
            .collect()
    }

    fn to_info(&self) -> ReplicaInfo {
        ReplicaInfo {
            server: self.server.clone(),
            is_primary: self.is_primary,
            is_preferred: self.is_preferred,
            term: self.term,
        }
    }

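    // Asks each known server for this database's replica info and returns the first successful
    // answer; connection failures and "database not found" responses fall through to the next server.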
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    async fn fetch_all(name: String, server_connections: &HashMap<Address, ServerConnection>) -> Result<Vec<Self>> {
        for (server, server_connection) in server_connections {
            let res = server_connection.get_database_replicas(name.clone()).await;
            match res {
                Ok(info) => {
                    return Self::try_from_info(info, server_connections);
                }
                Err(Error::Connection(
                    ConnectionError::DatabaseNotFound { .. }
                    | ConnectionError::ServerConnectionFailedStatusError { .. }
                    | ConnectionError::ConnectionFailed,
                )) => {
                    error!(
                        "Failed to fetch replica info for database '{}' from {}. Attempting next server.",
                        name, server
                    );
                }
                Err(err) => return Err(err),
            }
        }
        Err(Database::unable_to_connect_error(server_connections))
    }
}

impl fmt::Debug for Replica {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Replica")
            .field("server", &self.server)
            .field("database_name", &self.database_name)
            .field("is_primary", &self.is_primary)
            .field("term", &self.term)
            .field("is_preferred", &self.is_preferred)
            .finish()
    }
}

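/// A handle to this database as it exists on one specific server; all requests made through
/// this handle are sent over that server's connection.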
#[derive(Clone, Debug)]
pub(crate) struct ServerDatabase {
    name: String,
    connection: ServerConnection,
}

impl ServerDatabase {
    fn new(name: String, connection: ServerConnection) -> Self {
        Self { name, connection }
    }

    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    pub(crate) fn connection(&self) -> &ServerConnection {
        &self.connection
    }

    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    async fn delete(self) -> Result {
        self.connection.delete_database(self.name).await
    }

    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    async fn schema(&self) -> Result<String> {
        self.connection.database_schema(self.name.clone()).await
    }

    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    async fn type_schema(&self) -> Result<String> {
        self.connection.database_type_schema(self.name.clone()).await
    }

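    // Streams the export from the server: the schema answer is written to the schema file as
    // TypeQL text, and each exported item is written to the data file as a length-delimited
    // protobuf message, until the stream reports Done.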
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    async fn export_to_file(&self, mut schema_file: File, data_file: File) -> Result {
        let mut export_stream = self.connection.database_export(self.name.clone()).await?;
        let mut data_writer = BufWriter::new(data_file);

        loop {
            match resolve!(export_stream.next())? {
                DatabaseExportAnswer::Done => break,
                DatabaseExportAnswer::Schema(schema) => {
                    schema_file.write_all(schema.as_bytes())?;
                    schema_file.flush()?;
                }
                DatabaseExportAnswer::Items(items) => {
                    for item in items {
                        let mut buf = Vec::new();
                        item.encode_length_delimited(&mut buf)
                            .map_err(|_| Error::Migration(MigrationError::CannotEncodeExportedConcept))?;
                        data_writer.write_all(&buf)?;
                    }
                }
            }
        }

        data_writer.flush()?;
        Ok(())
    }
}

impl fmt::Display for ServerDatabase {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.name)
    }
}