Skip to main content

typedb_driver/
driver.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *   http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied.  See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20use std::{
21    collections::{HashMap, HashSet},
22    fmt,
23    sync::Arc,
24};
25
26use itertools::Itertools;
27use tracing::{debug, error, info, trace};
28use tracing_subscriber::{fmt as tracing_fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
29
30use crate::{
31    common::{
32        address::Address,
33        error::{ConnectionError, Error},
34        Result,
35    },
36    connection::{runtime::BackgroundRuntime, server_connection::ServerConnection},
37    Credentials, DatabaseManager, DriverOptions, Transaction, TransactionOptions, TransactionType, UserManager,
38};
39
40/// A connection to a TypeDB server which serves as the starting point for all interaction.
41pub struct TypeDBDriver {
42    server_connections: HashMap<Address, ServerConnection>,
43    database_manager: DatabaseManager,
44    user_manager: UserManager,
45    background_runtime: Arc<BackgroundRuntime>,
46}
47
48impl TypeDBDriver {
49    const DRIVER_LANG: &'static str = "rust";
50    const VERSION: &'static str = match option_env!("CARGO_PKG_VERSION") {
51        None => "0.0.0",
52        Some(version) => version,
53    };
54
55    pub const DEFAULT_ADDRESS: &'static str = "localhost:1729";
56
57    /// Creates a new TypeDB Server connection.
58    ///
59    /// # Arguments
60    ///
61    /// * `address` — The address (host:port) on which the TypeDB Server is running
62    /// * `credentials` — The Credentials to connect with
63    /// * `driver_options` — The DriverOptions to connect with
64    ///
65    /// # Examples
66    ///
67    /// ```rust
68    #[cfg_attr(
69        feature = "sync",
70        doc = "TypeDBDriver::new(\"127.0.0.1:1729\", Credentials::new(\"username\", \"password\"), DriverOptions::new(true, None))"
71    )]
72    #[cfg_attr(
73        not(feature = "sync"),
74        doc = "TypeDBDriver::new(\"127.0.0.1:1729\", Credentials::new(\"username\", \"password\"), DriverOptions::new(true, None)).await"
75    )]
76    /// ```
77    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
78    pub async fn new(
79        address: impl AsRef<str>,
80        credentials: Credentials,
81        driver_options: DriverOptions,
82    ) -> Result<Self> {
83        debug!("Creating new TypeDB driver connection to {}", address.as_ref());
84        Self::new_with_description(address, credentials, driver_options, Self::DRIVER_LANG).await
85    }
86
87    /// Creates a new TypeDB Server connection with a description.
88    /// This method is generally used by TypeDB drivers built on top of the Rust driver.
89    /// In other cases, use [`Self::new`] instead.
90    ///
91    /// # Arguments
92    ///
93    /// * `address` — The address (host:port) on which the TypeDB Server is running
94    /// * `credentials` — The Credentials to connect with
95    /// * `driver_options` — The DriverOptions to connect with
96    /// * `driver_lang` — The language of the driver connecting to the server
97    ///
98    /// # Examples
99    ///
100    /// ```rust
101    #[cfg_attr(
102        feature = "sync",
103        doc = "TypeDBDriver::new_with_description(\"127.0.0.1:1729\", Credentials::new(\"username\", \"password\"), DriverOptions::new(true, None), \"rust\")"
104    )]
105    #[cfg_attr(
106        not(feature = "sync"),
107        doc = "TypeDBDriver::new_with_description(\"127.0.0.1:1729\", Credentials::new(\"username\", \"password\"), DriverOptions::new(true, None), \"rust\").await"
108    )]
109    /// ```
110    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
111    pub async fn new_with_description(
112        address: impl AsRef<str>,
113        credentials: Credentials,
114        driver_options: DriverOptions,
115        driver_lang: impl AsRef<str>,
116    ) -> Result<Self> {
117        debug!("Initializing TypeDB driver with description: {}", driver_lang.as_ref());
118        let id = address.as_ref().to_string();
119        let address: Address = id.parse()?;
120
121        let background_runtime = Arc::new(BackgroundRuntime::new()?);
122
123        debug!("Establishing server connection to {}", address);
124        let (server_connection, database_info) = ServerConnection::new(
125            background_runtime.clone(),
126            address.clone(),
127            credentials,
128            driver_options,
129            driver_lang.as_ref(),
130            Self::VERSION,
131        )
132        .await?;
133        debug!("Successfully connected to server at {}", address);
134
135        // // validate
136        // let advertised_address = server_connection
137        //     .servers_all()?
138        //     .into_iter()
139        //     .exactly_one()
140        //     .map_err(|e| ConnectionError::ServerConnectionFailedStatusError { error: e.to_string() })?;
141
142        // TODO: this solidifies the assumption that servers don't change
143        let server_connections: HashMap<Address, ServerConnection> = [(address, server_connection)].into();
144        let database_manager = DatabaseManager::new(server_connections.clone(), database_info)?;
145        let user_manager = UserManager::new(server_connections.clone());
146        debug!("Created database manager and user manager");
147
148        debug!("TypeDB driver initialization completed successfully");
149        Ok(Self { server_connections, database_manager, user_manager, background_runtime })
150    }
151
152    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
153    async fn fetch_server_list(
154        background_runtime: Arc<BackgroundRuntime>,
155        addresses: impl IntoIterator<Item = impl AsRef<str>> + Clone,
156        credentials: Credentials,
157        driver_options: DriverOptions,
158    ) -> Result<HashSet<Address>> {
159        let addresses: Vec<Address> = addresses.into_iter().map(|addr| addr.as_ref().parse()).try_collect()?;
160        for address in &addresses {
161            let server_connection = ServerConnection::new(
162                background_runtime.clone(),
163                address.clone(),
164                credentials.clone(),
165                driver_options.clone(),
166                Self::DRIVER_LANG,
167                Self::VERSION,
168            )
169            .await;
170            match server_connection {
171                Ok((server_connection, _)) => match server_connection.servers_all() {
172                    Ok(servers) => return Ok(servers.into_iter().collect()),
173                    Err(Error::Connection(
174                        ConnectionError::ServerConnectionFailedStatusError { .. } | ConnectionError::ConnectionFailed,
175                    )) => (),
176                    Err(err) => Err(err)?,
177                },
178                Err(Error::Connection(
179                    ConnectionError::ServerConnectionFailedStatusError { .. } | ConnectionError::ConnectionFailed,
180                )) => (),
181                Err(err) => Err(err)?,
182            }
183        }
184        Err(ConnectionError::ServerConnectionFailed { addresses }.into())
185    }
186
187    /// Checks it this connection is opened.
188    //
189    /// # Examples
190    ///
191    /// ```rust
192    /// driver.is_open()
193    /// ```
194    pub fn is_open(&self) -> bool {
195        self.background_runtime.is_open()
196    }
197
198    pub fn databases(&self) -> &DatabaseManager {
199        &self.database_manager
200    }
201
202    pub fn users(&self) -> &UserManager {
203        &self.user_manager
204    }
205
206    /// Opens a transaction with default options.
207    /// See [`TypeDBDriver::transaction_with_options`]
208    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
209    pub async fn transaction(
210        &self,
211        database_name: impl AsRef<str>,
212        transaction_type: TransactionType,
213    ) -> Result<Transaction> {
214        self.transaction_with_options(database_name, transaction_type, TransactionOptions::new()).await
215    }
216
217    /// Performs a TypeQL query in this transaction.
218    ///
219    /// # Arguments
220    ///
221    /// * `database_name` — The name of the database to connect to
222    /// * `transaction_type` — The TransactionType to open the transaction with
223    /// * `options` — The TransactionOptions to open the transaction with
224    ///
225    /// # Examples
226    ///
227    /// ```rust
228    /// transaction.transaction_with_options(database_name, transaction_type, options)
229    /// ```
230    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
231    pub async fn transaction_with_options(
232        &self,
233        database_name: impl AsRef<str>,
234        transaction_type: TransactionType,
235        options: TransactionOptions,
236    ) -> Result<Transaction> {
237        let database_name = database_name.as_ref();
238        debug!("Opening transaction for database: {} with type: {:?}", database_name, transaction_type);
239
240        let database = self.database_manager.get_cached_or_fetch(database_name).await?;
241        let transaction_stream = database
242            .run_failsafe(|database| async move {
243                let res = database.connection().open_transaction(database.name(), transaction_type, options).await;
244                res
245            })
246            .await?;
247
248        debug!("Successfully opened transaction for database: {}", database_name);
249        Ok(Transaction::new(transaction_stream))
250    }
251
252    /// Closes this connection if it is open.
253    ///
254    /// # Examples
255    ///
256    /// ```rust
257    /// driver.force_close()
258    /// ```
259    pub fn force_close(&self) -> Result {
260        if !self.is_open() {
261            return Ok(());
262        }
263
264        debug!("Closing TypeDB driver connection");
265        let result =
266            self.server_connections.values().map(ServerConnection::force_close).try_collect().map_err(Into::into);
267        let close_result = self.background_runtime.force_close().and(result);
268
269        match &close_result {
270            Ok(_) => debug!("Successfully closed TypeDB driver connection"),
271            Err(e) => error!("Failed to close TypeDB driver connection: {}", e),
272        }
273
274        close_result
275    }
276
277    pub(crate) fn server_count(&self) -> usize {
278        self.server_connections.len()
279    }
280
281    pub(crate) fn servers(&self) -> impl Iterator<Item = &Address> {
282        self.server_connections.keys()
283    }
284
285    pub(crate) fn connection(&self, id: &Address) -> Option<&ServerConnection> {
286        self.server_connections.get(id)
287    }
288
289    pub(crate) fn connections(&self) -> impl Iterator<Item = (&Address, &ServerConnection)> + '_ {
290        self.server_connections.iter()
291    }
292
293    pub(crate) fn unable_to_connect_error(&self) -> Error {
294        Error::Connection(ConnectionError::ServerConnectionFailed {
295            addresses: self.servers().map(Address::clone).collect_vec(),
296        })
297    }
298}
299
300impl fmt::Debug for TypeDBDriver {
301    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
302        f.debug_struct("Connection").field("server_connections", &self.server_connections).finish()
303    }
304}