1use std::{
21 collections::{HashMap, HashSet},
22 fmt,
23 sync::Arc,
24};
25
26use itertools::Itertools;
27use tracing::{debug, error, info, trace};
28use tracing_subscriber::{fmt as tracing_fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
29
30use crate::{
31 common::{
32 address::Address,
33 error::{ConnectionError, Error},
34 Result,
35 },
36 connection::{runtime::BackgroundRuntime, server_connection::ServerConnection},
37 Credentials, DatabaseManager, DriverOptions, Transaction, TransactionOptions, TransactionType, UserManager,
38};
39
/// Driver handle that owns the connections to TypeDB servers together with the
/// database and user managers built on top of those connections.
pub struct TypeDBDriver {
    /// Open connections, keyed by the address of the server each one talks to.
    server_connections: HashMap<Address, ServerConnection>,
    /// Returned by `databases()`; also used to resolve a database when opening a transaction.
    database_manager: DatabaseManager,
    /// Returned by `users()`.
    user_manager: UserManager,
    /// Runtime handed to each `ServerConnection` at construction; `is_open()` reports its
    /// state and `force_close()` closes it.
    background_runtime: Arc<BackgroundRuntime>,
}
47
48impl TypeDBDriver {
49 const DRIVER_LANG: &'static str = "rust";
50 const VERSION: &'static str = match option_env!("CARGO_PKG_VERSION") {
51 None => "0.0.0",
52 Some(version) => version,
53 };
54
55 pub const DEFAULT_ADDRESS: &'static str = "localhost:1729";
56
57 #[cfg_attr(
69 feature = "sync",
70 doc = "TypeDBDriver::new(\"127.0.0.1:1729\", Credentials::new(\"username\", \"password\"), DriverOptions::new(true, None))"
71 )]
72 #[cfg_attr(
73 not(feature = "sync"),
74 doc = "TypeDBDriver::new(\"127.0.0.1:1729\", Credentials::new(\"username\", \"password\"), DriverOptions::new(true, None)).await"
75 )]
76 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
78 pub async fn new(
79 address: impl AsRef<str>,
80 credentials: Credentials,
81 driver_options: DriverOptions,
82 ) -> Result<Self> {
83 debug!("Creating new TypeDB driver connection to {}", address.as_ref());
84 Self::new_with_description(address, credentials, driver_options, Self::DRIVER_LANG).await
85 }
86
87 #[cfg_attr(
102 feature = "sync",
103 doc = "TypeDBDriver::new_with_description(\"127.0.0.1:1729\", Credentials::new(\"username\", \"password\"), DriverOptions::new(true, None), \"rust\")"
104 )]
105 #[cfg_attr(
106 not(feature = "sync"),
107 doc = "TypeDBDriver::new_with_description(\"127.0.0.1:1729\", Credentials::new(\"username\", \"password\"), DriverOptions::new(true, None), \"rust\").await"
108 )]
109 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
111 pub async fn new_with_description(
112 address: impl AsRef<str>,
113 credentials: Credentials,
114 driver_options: DriverOptions,
115 driver_lang: impl AsRef<str>,
116 ) -> Result<Self> {
117 debug!("Initializing TypeDB driver with description: {}", driver_lang.as_ref());
118 let id = address.as_ref().to_string();
119 let address: Address = id.parse()?;
120
121 let background_runtime = Arc::new(BackgroundRuntime::new()?);
122
123 debug!("Establishing server connection to {}", address);
124 let (server_connection, database_info) = ServerConnection::new(
125 background_runtime.clone(),
126 address.clone(),
127 credentials,
128 driver_options,
129 driver_lang.as_ref(),
130 Self::VERSION,
131 )
132 .await?;
133 debug!("Successfully connected to server at {}", address);
134
135 let server_connections: HashMap<Address, ServerConnection> = [(address, server_connection)].into();
144 let database_manager = DatabaseManager::new(server_connections.clone(), database_info)?;
145 let user_manager = UserManager::new(server_connections.clone());
146 debug!("Created database manager and user manager");
147
148 debug!("TypeDB driver initialization completed successfully");
149 Ok(Self { server_connections, database_manager, user_manager, background_runtime })
150 }
151
152 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
153 async fn fetch_server_list(
154 background_runtime: Arc<BackgroundRuntime>,
155 addresses: impl IntoIterator<Item = impl AsRef<str>> + Clone,
156 credentials: Credentials,
157 driver_options: DriverOptions,
158 ) -> Result<HashSet<Address>> {
159 let addresses: Vec<Address> = addresses.into_iter().map(|addr| addr.as_ref().parse()).try_collect()?;
160 for address in &addresses {
161 let server_connection = ServerConnection::new(
162 background_runtime.clone(),
163 address.clone(),
164 credentials.clone(),
165 driver_options.clone(),
166 Self::DRIVER_LANG,
167 Self::VERSION,
168 )
169 .await;
170 match server_connection {
171 Ok((server_connection, _)) => match server_connection.servers_all() {
172 Ok(servers) => return Ok(servers.into_iter().collect()),
173 Err(Error::Connection(
174 ConnectionError::ServerConnectionFailedStatusError { .. } | ConnectionError::ConnectionFailed,
175 )) => (),
176 Err(err) => Err(err)?,
177 },
178 Err(Error::Connection(
179 ConnectionError::ServerConnectionFailedStatusError { .. } | ConnectionError::ConnectionFailed,
180 )) => (),
181 Err(err) => Err(err)?,
182 }
183 }
184 Err(ConnectionError::ServerConnectionFailed { addresses }.into())
185 }
186
187 pub fn is_open(&self) -> bool {
195 self.background_runtime.is_open()
196 }
197
198 pub fn databases(&self) -> &DatabaseManager {
199 &self.database_manager
200 }
201
202 pub fn users(&self) -> &UserManager {
203 &self.user_manager
204 }
205
206 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
209 pub async fn transaction(
210 &self,
211 database_name: impl AsRef<str>,
212 transaction_type: TransactionType,
213 ) -> Result<Transaction> {
214 self.transaction_with_options(database_name, transaction_type, TransactionOptions::new()).await
215 }
216
217 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
231 pub async fn transaction_with_options(
232 &self,
233 database_name: impl AsRef<str>,
234 transaction_type: TransactionType,
235 options: TransactionOptions,
236 ) -> Result<Transaction> {
237 let database_name = database_name.as_ref();
238 debug!("Opening transaction for database: {} with type: {:?}", database_name, transaction_type);
239
240 let database = self.database_manager.get_cached_or_fetch(database_name).await?;
241 let transaction_stream = database
242 .run_failsafe(|database| async move {
243 let res = database.connection().open_transaction(database.name(), transaction_type, options).await;
244 res
245 })
246 .await?;
247
248 debug!("Successfully opened transaction for database: {}", database_name);
249 Ok(Transaction::new(transaction_stream))
250 }
251
252 pub fn force_close(&self) -> Result {
260 if !self.is_open() {
261 return Ok(());
262 }
263
264 debug!("Closing TypeDB driver connection");
265 let result =
266 self.server_connections.values().map(ServerConnection::force_close).try_collect().map_err(Into::into);
267 let close_result = self.background_runtime.force_close().and(result);
268
269 match &close_result {
270 Ok(_) => debug!("Successfully closed TypeDB driver connection"),
271 Err(e) => error!("Failed to close TypeDB driver connection: {}", e),
272 }
273
274 close_result
275 }
276
277 pub(crate) fn server_count(&self) -> usize {
278 self.server_connections.len()
279 }
280
281 pub(crate) fn servers(&self) -> impl Iterator<Item = &Address> {
282 self.server_connections.keys()
283 }
284
285 pub(crate) fn connection(&self, id: &Address) -> Option<&ServerConnection> {
286 self.server_connections.get(id)
287 }
288
289 pub(crate) fn connections(&self) -> impl Iterator<Item = (&Address, &ServerConnection)> + '_ {
290 self.server_connections.iter()
291 }
292
293 pub(crate) fn unable_to_connect_error(&self) -> Error {
294 Error::Connection(ConnectionError::ServerConnectionFailed {
295 addresses: self.servers().map(Address::clone).collect_vec(),
296 })
297 }
298}
299
300impl fmt::Debug for TypeDBDriver {
301 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
302 f.debug_struct("Connection").field("server_connections", &self.server_connections).finish()
303 }
304}