1use std::{
21 collections::{HashMap, HashSet},
22 fmt,
23 sync::Arc,
24};
25
26use itertools::Itertools;
27use tracing::{debug, error, info, trace};
28use tracing_subscriber::{fmt as tracing_fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
29
30use crate::{
31 common::{
32 address::Address,
33 error::{ConnectionError, Error},
34 Result,
35 },
36 connection::{runtime::BackgroundRuntime, server_connection::ServerConnection},
37 Credentials, DatabaseManager, DriverOptions, Transaction, TransactionOptions, TransactionType, UserManager,
38};
39
/// Handle to a TypeDB server: owns the server connections and exposes the
/// database manager, the user manager and transaction opening.
///
/// Field order is significant because Rust drops fields in declaration order.
pub struct TypeDBDriver {
    /// Server connections keyed by the address they were opened against.
    server_connections: HashMap<Address, ServerConnection>,
    /// Database manager returned by `databases()`.
    database_manager: DatabaseManager,
    /// User manager returned by `users()`.
    user_manager: UserManager,
    /// Background runtime handed to every server connection; `is_open()` and
    /// `force_close()` delegate to it.
    background_runtime: Arc<BackgroundRuntime>,
}
47
48impl TypeDBDriver {
/// Driver language string sent to the server when no custom description is given.
const DRIVER_LANG: &'static str = "rust";
/// Driver version sent to the server: the `CARGO_PKG_VERSION` compile-time
/// environment variable when available, `"0.0.0"` otherwise.
const VERSION: &'static str = match option_env!("CARGO_PKG_VERSION") {
    Some(version) => version,
    None => "0.0.0",
};

/// Default `host:port` address string.
pub const DEFAULT_ADDRESS: &'static str = "localhost:1729";
56
57 pub fn init_logging() {
68 use std::sync::Once;
69 static INIT: Once = Once::new();
70
71 INIT.call_once(|| {
72 let clib_level = if let Ok(typedb_driver_clib_log) = std::env::var("TYPEDB_DRIVER_CLIB_LOG") {
73 typedb_driver_clib_log
74 } else {
75 "info".to_owned()
76 };
77 let env_filter = if let Ok(typedb_log_level) = std::env::var("TYPEDB_DRIVER_LOG") {
79 EnvFilter::new(&format!("typedb_driver={},typedb_driver_clib={}", typedb_log_level, clib_level))
80 } else if let Ok(rust_log) = std::env::var("RUST_LOG") {
81 EnvFilter::new(&format!("typedb_driver={},typedb_driver_clib={}", rust_log, clib_level))
83 } else {
84 EnvFilter::new(&format!("typedb_driver=info,typedb_driver_clib={}", clib_level))
85 };
86
87 if let Err(e) =
89 tracing_subscriber::registry().with(env_filter).with(tracing_fmt::layer().with_target(false)).try_init()
90 {
91 eprintln!("Failed to initialize logging: {}", e);
92 }
93 });
94 }
95
96 #[cfg_attr(
108 feature = "sync",
109 doc = "TypeDBDriver::new(\"127.0.0.1:1729\", Credentials::new(\"username\", \"password\"), DriverOptions::new(true, None))"
110 )]
111 #[cfg_attr(
112 not(feature = "sync"),
113 doc = "TypeDBDriver::new(\"127.0.0.1:1729\", Credentials::new(\"username\", \"password\"), DriverOptions::new(true, None)).await"
114 )]
115 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
117 pub async fn new(
118 address: impl AsRef<str>,
119 credentials: Credentials,
120 driver_options: DriverOptions,
121 ) -> Result<Self> {
122 debug!("Creating new TypeDB driver connection to {}", address.as_ref());
123 Self::new_with_description(address, credentials, driver_options, Self::DRIVER_LANG).await
124 }
125
126 #[cfg_attr(
141 feature = "sync",
142 doc = "TypeDBDriver::new_with_description(\"127.0.0.1:1729\", Credentials::new(\"username\", \"password\"), DriverOptions::new(true, None), \"rust\")"
143 )]
144 #[cfg_attr(
145 not(feature = "sync"),
146 doc = "TypeDBDriver::new_with_description(\"127.0.0.1:1729\", Credentials::new(\"username\", \"password\"), DriverOptions::new(true, None), \"rust\").await"
147 )]
148 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
150 pub async fn new_with_description(
151 address: impl AsRef<str>,
152 credentials: Credentials,
153 driver_options: DriverOptions,
154 driver_lang: impl AsRef<str>,
155 ) -> Result<Self> {
156 Self::init_logging();
157
158 debug!("Initializing TypeDB driver with description: {}", driver_lang.as_ref());
159 let id = address.as_ref().to_string();
160 let address: Address = id.parse()?;
161
162 let background_runtime = Arc::new(BackgroundRuntime::new()?);
163
164 debug!("Establishing server connection to {}", address);
165 let (server_connection, database_info) = ServerConnection::new(
166 background_runtime.clone(),
167 address.clone(),
168 credentials,
169 driver_options,
170 driver_lang.as_ref(),
171 Self::VERSION,
172 )
173 .await?;
174 debug!("Successfully connected to server at {}", address);
175
176 let server_connections: HashMap<Address, ServerConnection> = [(address, server_connection)].into();
185 let database_manager = DatabaseManager::new(server_connections.clone(), database_info)?;
186 let user_manager = UserManager::new(server_connections.clone());
187 debug!("Created database manager and user manager");
188
189 debug!("TypeDB driver initialization completed successfully");
190 Ok(Self { server_connections, database_manager, user_manager, background_runtime })
191 }
192
193 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
194 async fn fetch_server_list(
195 background_runtime: Arc<BackgroundRuntime>,
196 addresses: impl IntoIterator<Item = impl AsRef<str>> + Clone,
197 credentials: Credentials,
198 driver_options: DriverOptions,
199 ) -> Result<HashSet<Address>> {
200 let addresses: Vec<Address> = addresses.into_iter().map(|addr| addr.as_ref().parse()).try_collect()?;
201 for address in &addresses {
202 let server_connection = ServerConnection::new(
203 background_runtime.clone(),
204 address.clone(),
205 credentials.clone(),
206 driver_options.clone(),
207 Self::DRIVER_LANG,
208 Self::VERSION,
209 )
210 .await;
211 match server_connection {
212 Ok((server_connection, _)) => match server_connection.servers_all() {
213 Ok(servers) => return Ok(servers.into_iter().collect()),
214 Err(Error::Connection(
215 ConnectionError::ServerConnectionFailedStatusError { .. } | ConnectionError::ConnectionFailed,
216 )) => (),
217 Err(err) => Err(err)?,
218 },
219 Err(Error::Connection(
220 ConnectionError::ServerConnectionFailedStatusError { .. } | ConnectionError::ConnectionFailed,
221 )) => (),
222 Err(err) => Err(err)?,
223 }
224 }
225 Err(ConnectionError::ServerConnectionFailed { addresses }.into())
226 }
227
228 pub fn is_open(&self) -> bool {
236 self.background_runtime.is_open()
237 }
238
239 pub fn databases(&self) -> &DatabaseManager {
240 &self.database_manager
241 }
242
243 pub fn users(&self) -> &UserManager {
244 &self.user_manager
245 }
246
247 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
250 pub async fn transaction(
251 &self,
252 database_name: impl AsRef<str>,
253 transaction_type: TransactionType,
254 ) -> Result<Transaction> {
255 self.transaction_with_options(database_name, transaction_type, TransactionOptions::new()).await
256 }
257
258 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
272 pub async fn transaction_with_options(
273 &self,
274 database_name: impl AsRef<str>,
275 transaction_type: TransactionType,
276 options: TransactionOptions,
277 ) -> Result<Transaction> {
278 let database_name = database_name.as_ref();
279 debug!("Opening transaction for database: {} with type: {:?}", database_name, transaction_type);
280
281 let database = self.database_manager.get_cached_or_fetch(database_name).await?;
282 let transaction_stream = database
283 .run_failsafe(|database| async move {
284 let res = database.connection().open_transaction(database.name(), transaction_type, options).await;
285 res
286 })
287 .await?;
288
289 debug!("Successfully opened transaction for database: {}", database_name);
290 Ok(Transaction::new(transaction_stream))
291 }
292
293 pub fn force_close(&self) -> Result {
301 if !self.is_open() {
302 return Ok(());
303 }
304
305 debug!("Closing TypeDB driver connection");
306 let result =
307 self.server_connections.values().map(ServerConnection::force_close).try_collect().map_err(Into::into);
308 let close_result = self.background_runtime.force_close().and(result);
309
310 match &close_result {
311 Ok(_) => debug!("Successfully closed TypeDB driver connection"),
312 Err(e) => error!("Failed to close TypeDB driver connection: {}", e),
313 }
314
315 close_result
316 }
317
318 pub(crate) fn server_count(&self) -> usize {
319 self.server_connections.len()
320 }
321
322 pub(crate) fn servers(&self) -> impl Iterator<Item = &Address> {
323 self.server_connections.keys()
324 }
325
326 pub(crate) fn connection(&self, id: &Address) -> Option<&ServerConnection> {
327 self.server_connections.get(id)
328 }
329
330 pub(crate) fn connections(&self) -> impl Iterator<Item = (&Address, &ServerConnection)> + '_ {
331 self.server_connections.iter()
332 }
333
334 pub(crate) fn unable_to_connect_error(&self) -> Error {
335 Error::Connection(ConnectionError::ServerConnectionFailed {
336 addresses: self.servers().map(Address::clone).collect_vec(),
337 })
338 }
339}
340
341impl fmt::Debug for TypeDBDriver {
342 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343 f.debug_struct("Connection").field("server_connections", &self.server_connections).finish()
344 }
345}