typedb_driver/
driver.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *   http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied.  See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20use std::{
21    collections::{HashMap, HashSet},
22    fmt,
23    sync::Arc,
24};
25
26use itertools::Itertools;
27use tracing::{debug, error, info, trace};
28use tracing_subscriber::{fmt as tracing_fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
29
30use crate::{
31    common::{
32        address::Address,
33        error::{ConnectionError, Error},
34        Result,
35    },
36    connection::{runtime::BackgroundRuntime, server_connection::ServerConnection},
37    Credentials, DatabaseManager, DriverOptions, Transaction, TransactionOptions, TransactionType, UserManager,
38};
39
40/// A connection to a TypeDB server which serves as the starting point for all interaction.
41pub struct TypeDBDriver {
42    server_connections: HashMap<Address, ServerConnection>,
43    database_manager: DatabaseManager,
44    user_manager: UserManager,
45    background_runtime: Arc<BackgroundRuntime>,
46}
47
48impl TypeDBDriver {
49    const DRIVER_LANG: &'static str = "rust";
50    const VERSION: &'static str = match option_env!("CARGO_PKG_VERSION") {
51        None => "0.0.0",
52        Some(version) => version,
53    };
54
55    pub const DEFAULT_ADDRESS: &'static str = "localhost:1729";
56
57    /// Initialize logging configuration for the TypeDB driver.
58    ///
59    /// This function sets up tracing with the following priority:
60    /// 1. TYPEDB_DRIVER_LOG environment variable (if set). Use TYPEDB_DRIVER_CLIB_LOG to see memory exchanges
61    /// 1.  environment variable (if set)
62    /// 2. RUST_LOG environment variable (if set)
63    /// 3. Default level (INFO)
64    ///
65    /// The logging is initialized only once using a static flag to prevent
66    /// multiple initializations in applications that create multiple drivers.
67    pub fn init_logging() {
68        use std::sync::Once;
69        static INIT: Once = Once::new();
70
71        INIT.call_once(|| {
72            let clib_level = if let Ok(typedb_driver_clib_log) = std::env::var("TYPEDB_DRIVER_CLIB_LOG") {
73                typedb_driver_clib_log
74            } else {
75                "info".to_owned()
76            };
77            // Try to get log level from TYPEDB_DRIVER_LOG first
78            let env_filter = if let Ok(typedb_log_level) = std::env::var("TYPEDB_DRIVER_LOG") {
79                EnvFilter::new(&format!("typedb_driver={},typedb_driver_clib={}", typedb_log_level, clib_level))
80            } else if let Ok(rust_log) = std::env::var("RUST_LOG") {
81                // If RUST_LOG is set, use it but scope it to typedb_driver only
82                EnvFilter::new(&format!("typedb_driver={},typedb_driver_clib={}", rust_log, clib_level))
83            } else {
84                EnvFilter::new(&format!("typedb_driver=info,typedb_driver_clib={}", clib_level))
85            };
86
87            // Initialize the tracing subscriber
88            if let Err(e) =
89                tracing_subscriber::registry().with(env_filter).with(tracing_fmt::layer().with_target(false)).try_init()
90            {
91                eprintln!("Failed to initialize logging: {}", e);
92            }
93        });
94    }
95
96    /// Creates a new TypeDB Server connection.
97    ///
98    /// # Arguments
99    ///
100    /// * `address` — The address (host:port) on which the TypeDB Server is running
101    /// * `credentials` — The Credentials to connect with
102    /// * `driver_options` — The DriverOptions to connect with
103    ///
104    /// # Examples
105    ///
106    /// ```rust
107    #[cfg_attr(
108        feature = "sync",
109        doc = "TypeDBDriver::new(\"127.0.0.1:1729\", Credentials::new(\"username\", \"password\"), DriverOptions::new(true, None))"
110    )]
111    #[cfg_attr(
112        not(feature = "sync"),
113        doc = "TypeDBDriver::new(\"127.0.0.1:1729\", Credentials::new(\"username\", \"password\"), DriverOptions::new(true, None)).await"
114    )]
115    /// ```
116    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
117    pub async fn new(
118        address: impl AsRef<str>,
119        credentials: Credentials,
120        driver_options: DriverOptions,
121    ) -> Result<Self> {
122        debug!("Creating new TypeDB driver connection to {}", address.as_ref());
123        Self::new_with_description(address, credentials, driver_options, Self::DRIVER_LANG).await
124    }
125
126    /// Creates a new TypeDB Server connection with a description.
127    /// This method is generally used by TypeDB drivers built on top of the Rust driver.
128    /// In other cases, use [`Self::new`] instead.
129    ///
130    /// # Arguments
131    ///
132    /// * `address` — The address (host:port) on which the TypeDB Server is running
133    /// * `credentials` — The Credentials to connect with
134    /// * `driver_options` — The DriverOptions to connect with
135    /// * `driver_lang` — The language of the driver connecting to the server
136    ///
137    /// # Examples
138    ///
139    /// ```rust
140    #[cfg_attr(
141        feature = "sync",
142        doc = "TypeDBDriver::new_with_description(\"127.0.0.1:1729\", Credentials::new(\"username\", \"password\"), DriverOptions::new(true, None), \"rust\")"
143    )]
144    #[cfg_attr(
145        not(feature = "sync"),
146        doc = "TypeDBDriver::new_with_description(\"127.0.0.1:1729\", Credentials::new(\"username\", \"password\"), DriverOptions::new(true, None), \"rust\").await"
147    )]
148    /// ```
149    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
150    pub async fn new_with_description(
151        address: impl AsRef<str>,
152        credentials: Credentials,
153        driver_options: DriverOptions,
154        driver_lang: impl AsRef<str>,
155    ) -> Result<Self> {
156        Self::init_logging();
157
158        debug!("Initializing TypeDB driver with description: {}", driver_lang.as_ref());
159        let id = address.as_ref().to_string();
160        let address: Address = id.parse()?;
161
162        let background_runtime = Arc::new(BackgroundRuntime::new()?);
163
164        debug!("Establishing server connection to {}", address);
165        let (server_connection, database_info) = ServerConnection::new(
166            background_runtime.clone(),
167            address.clone(),
168            credentials,
169            driver_options,
170            driver_lang.as_ref(),
171            Self::VERSION,
172        )
173        .await?;
174        debug!("Successfully connected to server at {}", address);
175
176        // // validate
177        // let advertised_address = server_connection
178        //     .servers_all()?
179        //     .into_iter()
180        //     .exactly_one()
181        //     .map_err(|e| ConnectionError::ServerConnectionFailedStatusError { error: e.to_string() })?;
182
183        // TODO: this solidifies the assumption that servers don't change
184        let server_connections: HashMap<Address, ServerConnection> = [(address, server_connection)].into();
185        let database_manager = DatabaseManager::new(server_connections.clone(), database_info)?;
186        let user_manager = UserManager::new(server_connections.clone());
187        debug!("Created database manager and user manager");
188
189        debug!("TypeDB driver initialization completed successfully");
190        Ok(Self { server_connections, database_manager, user_manager, background_runtime })
191    }
192
193    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
194    async fn fetch_server_list(
195        background_runtime: Arc<BackgroundRuntime>,
196        addresses: impl IntoIterator<Item = impl AsRef<str>> + Clone,
197        credentials: Credentials,
198        driver_options: DriverOptions,
199    ) -> Result<HashSet<Address>> {
200        let addresses: Vec<Address> = addresses.into_iter().map(|addr| addr.as_ref().parse()).try_collect()?;
201        for address in &addresses {
202            let server_connection = ServerConnection::new(
203                background_runtime.clone(),
204                address.clone(),
205                credentials.clone(),
206                driver_options.clone(),
207                Self::DRIVER_LANG,
208                Self::VERSION,
209            )
210            .await;
211            match server_connection {
212                Ok((server_connection, _)) => match server_connection.servers_all() {
213                    Ok(servers) => return Ok(servers.into_iter().collect()),
214                    Err(Error::Connection(
215                        ConnectionError::ServerConnectionFailedStatusError { .. } | ConnectionError::ConnectionFailed,
216                    )) => (),
217                    Err(err) => Err(err)?,
218                },
219                Err(Error::Connection(
220                    ConnectionError::ServerConnectionFailedStatusError { .. } | ConnectionError::ConnectionFailed,
221                )) => (),
222                Err(err) => Err(err)?,
223            }
224        }
225        Err(ConnectionError::ServerConnectionFailed { addresses }.into())
226    }
227
228    /// Checks it this connection is opened.
229    //
230    /// # Examples
231    ///
232    /// ```rust
233    /// driver.is_open()
234    /// ```
235    pub fn is_open(&self) -> bool {
236        self.background_runtime.is_open()
237    }
238
239    pub fn databases(&self) -> &DatabaseManager {
240        &self.database_manager
241    }
242
243    pub fn users(&self) -> &UserManager {
244        &self.user_manager
245    }
246
247    /// Opens a transaction with default options.
248    /// See [`TypeDBDriver::transaction_with_options`]
249    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
250    pub async fn transaction(
251        &self,
252        database_name: impl AsRef<str>,
253        transaction_type: TransactionType,
254    ) -> Result<Transaction> {
255        self.transaction_with_options(database_name, transaction_type, TransactionOptions::new()).await
256    }
257
258    /// Performs a TypeQL query in this transaction.
259    ///
260    /// # Arguments
261    ///
262    /// * `database_name` — The name of the database to connect to
263    /// * `transaction_type` — The TransactionType to open the transaction with
264    /// * `options` — The TransactionOptions to open the transaction with
265    ///
266    /// # Examples
267    ///
268    /// ```rust
269    /// transaction.transaction_with_options(database_name, transaction_type, options)
270    /// ```
271    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
272    pub async fn transaction_with_options(
273        &self,
274        database_name: impl AsRef<str>,
275        transaction_type: TransactionType,
276        options: TransactionOptions,
277    ) -> Result<Transaction> {
278        let database_name = database_name.as_ref();
279        debug!("Opening transaction for database: {} with type: {:?}", database_name, transaction_type);
280
281        let database = self.database_manager.get_cached_or_fetch(database_name).await?;
282        let transaction_stream = database
283            .run_failsafe(|database| async move {
284                let res = database.connection().open_transaction(database.name(), transaction_type, options).await;
285                res
286            })
287            .await?;
288
289        debug!("Successfully opened transaction for database: {}", database_name);
290        Ok(Transaction::new(transaction_stream))
291    }
292
293    /// Closes this connection if it is open.
294    ///
295    /// # Examples
296    ///
297    /// ```rust
298    /// driver.force_close()
299    /// ```
300    pub fn force_close(&self) -> Result {
301        if !self.is_open() {
302            return Ok(());
303        }
304
305        debug!("Closing TypeDB driver connection");
306        let result =
307            self.server_connections.values().map(ServerConnection::force_close).try_collect().map_err(Into::into);
308        let close_result = self.background_runtime.force_close().and(result);
309
310        match &close_result {
311            Ok(_) => debug!("Successfully closed TypeDB driver connection"),
312            Err(e) => error!("Failed to close TypeDB driver connection: {}", e),
313        }
314
315        close_result
316    }
317
318    pub(crate) fn server_count(&self) -> usize {
319        self.server_connections.len()
320    }
321
322    pub(crate) fn servers(&self) -> impl Iterator<Item = &Address> {
323        self.server_connections.keys()
324    }
325
326    pub(crate) fn connection(&self, id: &Address) -> Option<&ServerConnection> {
327        self.server_connections.get(id)
328    }
329
330    pub(crate) fn connections(&self) -> impl Iterator<Item = (&Address, &ServerConnection)> + '_ {
331        self.server_connections.iter()
332    }
333
334    pub(crate) fn unable_to_connect_error(&self) -> Error {
335        Error::Connection(ConnectionError::ServerConnectionFailed {
336            addresses: self.servers().map(Address::clone).collect_vec(),
337        })
338    }
339}
340
341impl fmt::Debug for TypeDBDriver {
342    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343        f.debug_struct("Connection").field("server_connections", &self.server_connections).finish()
344    }
345}