1#[cfg(not(feature = "sync"))]
21use std::future::Future;
22use std::{
23 collections::HashMap,
24 fmt,
25 sync::{
26 atomic::{AtomicU64, Ordering},
27 Arc, RwLock,
28 },
29 thread::sleep,
30 time::Duration,
31};
32
33use itertools::Itertools;
34use log::{debug, error};
35
36use crate::{
37 common::{
38 address::Address,
39 error::ConnectionError,
40 info::{DatabaseInfo, ReplicaInfo},
41 Error, Result,
42 },
43 connection::server_connection::ServerConnection,
44 driver::TypeDBDriver,
45 error::InternalError,
46 Options, Transaction, TransactionType,
47};
48
49pub struct Database {
51 name: String,
52 replicas: RwLock<Vec<Replica>>,
53 server_connections: HashMap<Address, ServerConnection>,
54}
55
56impl Database {
57 const PRIMARY_REPLICA_TASK_MAX_RETRIES: usize = 10;
58 const FETCH_REPLICAS_MAX_RETRIES: usize = 10;
59 const WAIT_FOR_PRIMARY_REPLICA_SELECTION: Duration = Duration::from_secs(2);
60
61 pub(super) fn new(
62 database_info: DatabaseInfo,
63 server_connections: HashMap<Address, ServerConnection>,
64 ) -> Result<Self> {
65 let name = database_info.name.clone();
66 let replicas = RwLock::new(Replica::try_from_info(database_info, &server_connections)?);
67 Ok(Self { name, replicas, server_connections })
68 }
69
70 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
71 pub(super) async fn get(name: String, server_connections: HashMap<Address, ServerConnection>) -> Result<Self> {
72 Ok(Self {
73 name: name.clone(),
74 replicas: RwLock::new(Replica::fetch_all(name, &server_connections).await?),
75 server_connections,
76 })
77 }
78
79 pub fn name(&self) -> &str {
81 self.name.as_str()
82 }
83
84 pub fn replicas_info(&self) -> Vec<ReplicaInfo> {
93 self.replicas.read().unwrap().iter().map(Replica::to_info).collect()
94 }
95
96 pub fn primary_replica_info(&self) -> Option<ReplicaInfo> {
105 self.primary_replica().map(|replica| replica.to_info())
106 }
107
108 pub fn preferred_replica_info(&self) -> Option<ReplicaInfo> {
118 self.preferred_replica().map(|replica| replica.to_info())
119 }
120
121 #[cfg_attr(feature = "sync", doc = "database.delete();")]
127 #[cfg_attr(not(feature = "sync"), doc = "database.delete().await;")]
128 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
130 pub async fn delete(self: Arc<Self>) -> Result {
131 self.run_on_primary_replica(|database| database.delete()).await
132 }
133
134 #[cfg_attr(feature = "sync", doc = "database.schema();")]
140 #[cfg_attr(not(feature = "sync"), doc = "database.schema().await;")]
141 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
143 pub async fn schema(&self) -> Result<String> {
144 self.run_failsafe(|database| async move { database.schema().await }).await
145 }
146
147 #[cfg_attr(feature = "sync", doc = "database.type_schema();")]
153 #[cfg_attr(not(feature = "sync"), doc = "database.type_schema().await;")]
154 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
156 pub async fn type_schema(&self) -> Result<String> {
157 self.run_failsafe(|database| async move { database.type_schema().await }).await
158 }
159
160 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
161 pub(crate) async fn run_failsafe<F, P, R>(&self, task: F) -> Result<R>
162 where
163 F: Fn(ServerDatabase) -> P,
164 P: Future<Output = Result<R>>,
165 {
166 match self.run_on_any_replica(&task).await {
167 Err(Error::Connection(ConnectionError::CloudReplicaNotPrimary)) => {
168 debug!("Attempted to run on a non-primary replica, retrying on primary...");
169 self.run_on_primary_replica(&task).await
170 }
171 res => res,
172 }
173 }
174
175 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
176 pub(super) async fn run_on_any_replica<F, P, R>(&self, task: F) -> Result<R>
177 where
178 F: Fn(ServerDatabase) -> P,
179 P: Future<Output = Result<R>>,
180 {
181 let replicas = self.replicas.read().unwrap().clone();
182 for replica in replicas {
183 match task(replica.database.clone()).await {
184 Err(Error::Connection(
185 ConnectionError::ServerConnectionFailedStatusError { .. } | ConnectionError::ConnectionFailed,
186 )) => {
187 debug!("Unable to connect to {}. Attempting next server.", replica.server);
188 }
189 res => return res,
190 }
191 }
192 Err(Self::unable_to_connect_error(&self.server_connections))
193 }
194
195 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
196 pub(super) async fn run_on_primary_replica<F, P, R>(&self, task: F) -> Result<R>
197 where
198 F: Fn(ServerDatabase) -> P,
199 P: Future<Output = Result<R>>,
200 {
201 let mut primary_replica =
202 if let Some(replica) = self.primary_replica() { replica } else { self.seek_primary_replica().await? };
203
204 for _ in 0..Self::PRIMARY_REPLICA_TASK_MAX_RETRIES {
205 match task(primary_replica.database.clone()).await {
206 Err(Error::Connection(
207 ConnectionError::CloudReplicaNotPrimary
208 | ConnectionError::ServerConnectionFailedStatusError { .. }
209 | ConnectionError::ConnectionFailed,
210 )) => {
211 debug!("Primary replica error, waiting...");
212 Self::wait_for_primary_replica_selection().await;
213 primary_replica = self.seek_primary_replica().await?;
214 }
215 res => return res,
216 }
217 }
218 Err(Self::unable_to_connect_error(&self.server_connections))
219 }
220
221 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
222 async fn seek_primary_replica(&self) -> Result<Replica> {
223 for _ in 0..Self::FETCH_REPLICAS_MAX_RETRIES {
224 let replicas = Replica::fetch_all(self.name.clone(), &self.server_connections).await?;
225 *self.replicas.write().unwrap() = replicas;
226 if let Some(replica) = self.primary_replica() {
227 return Ok(replica);
228 }
229 Self::wait_for_primary_replica_selection().await;
230 }
231 Err(Self::unable_to_connect_error(&self.server_connections))
232 }
233
234 fn unable_to_connect_error(server_connections: &HashMap<Address, ServerConnection>) -> Error {
235 Error::Connection(ConnectionError::ServerConnectionFailed {
236 addresses: server_connections.keys().map(Address::clone).collect_vec(),
237 })
238 }
239
240 fn primary_replica(&self) -> Option<Replica> {
241 self.replicas.read().unwrap().iter().filter(|r| r.is_primary).max_by_key(|r| r.term).cloned()
242 }
243
244 fn preferred_replica(&self) -> Option<Replica> {
245 self.replicas.read().unwrap().iter().filter(|r| r.is_preferred).max_by_key(|r| r.term).cloned()
246 }
247
248 #[cfg(feature = "sync")]
249 fn wait_for_primary_replica_selection() {
250 sleep(Self::WAIT_FOR_PRIMARY_REPLICA_SELECTION);
251 }
252
253 #[cfg(not(feature = "sync"))]
254 async fn wait_for_primary_replica_selection() {
255 tokio::time::sleep(Self::WAIT_FOR_PRIMARY_REPLICA_SELECTION).await
256 }
257}
258
259impl fmt::Debug for Database {
260 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261 f.debug_struct("Database").field("name", &self.name).field("replicas", &self.replicas).finish()
262 }
263}
264
265#[derive(Clone)]
267pub(super) struct Replica {
268 server: Address,
270 database_name: String,
272 is_primary: bool,
274 term: i64,
276 is_preferred: bool,
278 database: ServerDatabase,
280}
281
282impl Replica {
283 fn new(name: String, metadata: ReplicaInfo, server_connection: ServerConnection) -> Self {
284 Self {
285 server: metadata.server,
286 database_name: name.clone(),
287 is_primary: metadata.is_primary,
288 term: metadata.term,
289 is_preferred: metadata.is_preferred,
290 database: ServerDatabase::new(name, server_connection),
291 }
292 }
293
294 fn try_from_info(
295 database_info: DatabaseInfo,
296 server_connections: &HashMap<Address, ServerConnection>,
297 ) -> Result<Vec<Self>> {
298 database_info
299 .replicas
300 .into_iter()
301 .map(|replica| {
302 if server_connections.len() == 1 {
303 Ok(Self::new(
304 database_info.name.clone(),
305 replica,
306 server_connections.values().next().unwrap().clone(),
307 ))
308 } else {
309 let server_connection = server_connections
311 .get(&replica.server)
312 .ok_or_else(|| InternalError::UnknownServer { server: replica.server.clone() })?;
313 Ok(Self::new(database_info.name.clone(), replica, server_connection.clone()))
314 }
315 })
316 .collect()
317 }
318
319 fn to_info(&self) -> ReplicaInfo {
320 ReplicaInfo {
321 server: self.server.clone(),
322 is_primary: self.is_primary,
323 is_preferred: self.is_preferred,
324 term: self.term,
325 }
326 }
327
328 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
329 async fn fetch_all(name: String, server_connections: &HashMap<Address, ServerConnection>) -> Result<Vec<Self>> {
330 for (server, server_connection) in server_connections {
331 let res = server_connection.get_database_replicas(name.clone()).await;
332 match res {
333 Ok(info) => {
334 return Self::try_from_info(info, server_connections);
335 }
336 Err(Error::Connection(
337 ConnectionError::DatabaseNotFound { .. }
338 | ConnectionError::ServerConnectionFailedStatusError { .. }
339 | ConnectionError::ConnectionFailed,
340 )) => {
341 error!(
342 "Failed to fetch replica info for database '{}' from {}. Attempting next server.",
343 name, server
344 );
345 }
346 Err(err) => return Err(err),
347 }
348 }
349 Err(Database::unable_to_connect_error(server_connections))
350 }
351}
352
353impl fmt::Debug for Replica {
354 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
355 f.debug_struct("Replica")
356 .field("server", &self.server)
357 .field("database_name", &self.database_name)
358 .field("is_primary", &self.is_primary)
359 .field("term", &self.term)
360 .field("is_preferred", &self.is_preferred)
361 .finish()
362 }
363}
364
365#[derive(Clone, Debug)]
366pub(crate) struct ServerDatabase {
367 name: String,
368 connection: ServerConnection,
369}
370
371impl ServerDatabase {
372 fn new(name: String, connection: ServerConnection) -> Self {
373 Self { name, connection }
374 }
375
376 pub fn name(&self) -> &str {
377 self.name.as_str()
378 }
379
380 pub(crate) fn connection(&self) -> &ServerConnection {
381 &self.connection
382 }
383
384 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
385 async fn delete(self) -> Result {
386 self.connection.delete_database(self.name).await
387 }
388
389 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
390 async fn schema(&self) -> Result<String> {
391 self.connection.database_schema(self.name.clone()).await
392 }
393
394 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
395 async fn type_schema(&self) -> Result<String> {
396 self.connection.database_type_schema(self.name.clone()).await
397 }
398}
399
400impl fmt::Display for ServerDatabase {
401 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
402 write!(f, "{}", self.name)
403 }
404}