1#[cfg(not(feature = "sync"))]
21use std::future::Future;
22use std::{fmt, sync::RwLock, thread::sleep, time::Duration};
23
24use log::{debug, error};
25
26use crate::{
27 common::{
28 address::Address,
29 error::ConnectionError,
30 info::{DatabaseInfo, ReplicaInfo},
31 Error, Result,
32 },
33 connection::ServerConnection,
34 error::InternalError,
35 Connection,
36};
37
38pub struct Database {
40 name: String,
41 replicas: RwLock<Vec<Replica>>,
42 connection: Connection,
43}
44
45impl Database {
46 const PRIMARY_REPLICA_TASK_MAX_RETRIES: usize = 10;
47 const FETCH_REPLICAS_MAX_RETRIES: usize = 10;
48 const WAIT_FOR_PRIMARY_REPLICA_SELECTION: Duration = Duration::from_secs(2);
49
50 pub(super) fn new(database_info: DatabaseInfo, connection: Connection) -> Result<Self> {
51 let name = database_info.name.clone();
52 let replicas = RwLock::new(Replica::try_from_info(database_info, &connection)?);
53 Ok(Self { name, replicas, connection })
54 }
55
56 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
57 pub(super) async fn get(name: String, connection: Connection) -> Result<Self> {
58 Ok(Self {
59 name: name.clone(),
60 replicas: RwLock::new(Replica::fetch_all(name, connection.clone()).await?),
61 connection,
62 })
63 }
64
65 pub fn name(&self) -> &str {
67 self.name.as_str()
68 }
69
70 pub fn replicas_info(&self) -> Vec<ReplicaInfo> {
79 self.replicas.read().unwrap().iter().map(Replica::to_info).collect()
80 }
81
82 pub fn primary_replica_info(&self) -> Option<ReplicaInfo> {
91 self.primary_replica().map(|replica| replica.to_info())
92 }
93
94 pub fn preferred_replica_info(&self) -> Option<ReplicaInfo> {
104 self.preferred_replica().map(|replica| replica.to_info())
105 }
106
107 #[cfg_attr(feature = "sync", doc = "database.delete();")]
113 #[cfg_attr(not(feature = "sync"), doc = "database.delete().await;")]
114 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
116 pub async fn delete(self) -> Result {
117 self.run_on_primary_replica(|database| database.delete()).await
118 }
119
120 #[cfg_attr(feature = "sync", doc = "database.schema();")]
126 #[cfg_attr(not(feature = "sync"), doc = "database.schema().await;")]
127 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
129 pub async fn schema(&self) -> Result<String> {
130 self.run_failsafe(|database| async move { database.schema().await }).await
131 }
132
133 #[cfg_attr(feature = "sync", doc = "database.type_schema();")]
139 #[cfg_attr(not(feature = "sync"), doc = "database.type_schema().await;")]
140 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
142 pub async fn type_schema(&self) -> Result<String> {
143 self.run_failsafe(|database| async move { database.type_schema().await }).await
144 }
145
146 #[cfg_attr(feature = "sync", doc = "database.rule_schema();")]
152 #[cfg_attr(not(feature = "sync"), doc = "database.rule_schema().await;")]
153 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
155 pub async fn rule_schema(&self) -> Result<String> {
156 self.run_failsafe(|database| async move { database.rule_schema().await }).await
157 }
158
159 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
160 pub(crate) async fn run_failsafe<F, P, R>(&self, task: F) -> Result<R>
161 where
162 F: Fn(ServerDatabase) -> P,
163 P: Future<Output = Result<R>>,
164 {
165 match self.run_on_any_replica(&task).await {
166 Err(Error::Connection(ConnectionError::CloudReplicaNotPrimary)) => {
167 debug!("Attempted to run on a non-primary replica, retrying on primary...");
168 self.run_on_primary_replica(&task).await
169 }
170 res => res,
171 }
172 }
173
174 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
175 pub(super) async fn run_on_any_replica<F, P, R>(&self, task: F) -> Result<R>
176 where
177 F: Fn(ServerDatabase) -> P,
178 P: Future<Output = Result<R>>,
179 {
180 let replicas = self.replicas.read().unwrap().clone();
181 for replica in replicas {
182 match task(replica.database.clone()).await {
183 Err(Error::Connection(
184 ConnectionError::ServerConnectionFailedStatusError { .. } | ConnectionError::ConnectionFailed,
185 )) => {
186 debug!("Unable to connect to {}. Attempting next server.", replica.server);
187 }
188 res => return res,
189 }
190 }
191 Err(self.connection.unable_to_connect_error())
192 }
193
194 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
195 pub(super) async fn run_on_primary_replica<F, P, R>(&self, task: F) -> Result<R>
196 where
197 F: Fn(ServerDatabase) -> P,
198 P: Future<Output = Result<R>>,
199 {
200 let mut primary_replica =
201 if let Some(replica) = self.primary_replica() { replica } else { self.seek_primary_replica().await? };
202
203 for _ in 0..Self::PRIMARY_REPLICA_TASK_MAX_RETRIES {
204 match task(primary_replica.database.clone()).await {
205 Err(Error::Connection(
206 ConnectionError::CloudReplicaNotPrimary
207 | ConnectionError::ServerConnectionFailedStatusError { .. }
208 | ConnectionError::ConnectionFailed,
209 )) => {
210 debug!("Primary replica error, waiting...");
211 Self::wait_for_primary_replica_selection().await;
212 primary_replica = self.seek_primary_replica().await?;
213 }
214 res => return res,
215 }
216 }
217 Err(self.connection.unable_to_connect_error())
218 }
219
220 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
221 async fn seek_primary_replica(&self) -> Result<Replica> {
222 for _ in 0..Self::FETCH_REPLICAS_MAX_RETRIES {
223 let replicas = Replica::fetch_all(self.name.clone(), self.connection.clone()).await?;
224 *self.replicas.write().unwrap() = replicas;
225 if let Some(replica) = self.primary_replica() {
226 return Ok(replica);
227 }
228 Self::wait_for_primary_replica_selection().await;
229 }
230 Err(self.connection.unable_to_connect_error())
231 }
232
233 fn primary_replica(&self) -> Option<Replica> {
234 self.replicas.read().unwrap().iter().filter(|r| r.is_primary).max_by_key(|r| r.term).cloned()
235 }
236
237 fn preferred_replica(&self) -> Option<Replica> {
238 self.replicas.read().unwrap().iter().filter(|r| r.is_preferred).max_by_key(|r| r.term).cloned()
239 }
240
241 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
242 async fn wait_for_primary_replica_selection() {
243 sleep(Self::WAIT_FOR_PRIMARY_REPLICA_SELECTION);
245 }
246}
247
248impl fmt::Debug for Database {
249 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250 f.debug_struct("Database").field("name", &self.name).field("replicas", &self.replicas).finish()
251 }
252}
253
254#[derive(Clone)]
256pub(super) struct Replica {
257 server: Address,
259 database_name: String,
261 is_primary: bool,
263 term: i64,
265 is_preferred: bool,
267 database: ServerDatabase,
269}
270
271impl Replica {
272 fn new(name: String, metadata: ReplicaInfo, server_connection: ServerConnection) -> Self {
273 Self {
274 server: metadata.server,
275 database_name: name.clone(),
276 is_primary: metadata.is_primary,
277 term: metadata.term,
278 is_preferred: metadata.is_preferred,
279 database: ServerDatabase::new(name, server_connection),
280 }
281 }
282
283 fn try_from_info(database_info: DatabaseInfo, connection: &Connection) -> Result<Vec<Self>> {
284 database_info
285 .replicas
286 .into_iter()
287 .map(|replica| {
288 let server_connection = connection
289 .connection(&replica.server)
290 .ok_or_else(|| InternalError::UnknownServer { server: replica.server.clone() })?;
291 Ok(Self::new(database_info.name.clone(), replica, server_connection.clone()))
292 })
293 .collect()
294 }
295
296 fn to_info(&self) -> ReplicaInfo {
297 ReplicaInfo {
298 server: self.server.clone(),
299 is_primary: self.is_primary,
300 is_preferred: self.is_preferred,
301 term: self.term,
302 }
303 }
304
305 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
306 async fn fetch_all(name: String, connection: Connection) -> Result<Vec<Self>> {
307 for (server, server_connection) in connection.connections() {
308 let res = server_connection.get_database_replicas(name.clone()).await;
309 match res {
310 Ok(info) => {
311 return Self::try_from_info(info, &connection);
312 }
313 Err(Error::Connection(
314 ConnectionError::DatabaseDoesNotExist { .. }
315 | ConnectionError::ServerConnectionFailedStatusError { .. }
316 | ConnectionError::ConnectionFailed,
317 )) => {
318 error!(
319 "Failed to fetch replica info for database '{}' from {}. Attempting next server.",
320 name, server
321 );
322 }
323 Err(err) => return Err(err),
324 }
325 }
326 Err(connection.unable_to_connect_error())
327 }
328}
329
330impl fmt::Debug for Replica {
331 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
332 f.debug_struct("Replica")
333 .field("server", &self.server)
334 .field("database_name", &self.database_name)
335 .field("is_primary", &self.is_primary)
336 .field("term", &self.term)
337 .field("is_preferred", &self.is_preferred)
338 .finish()
339 }
340}
341
342#[derive(Clone, Debug)]
343pub(crate) struct ServerDatabase {
344 name: String,
345 connection: ServerConnection,
346}
347
348impl ServerDatabase {
349 fn new(name: String, connection: ServerConnection) -> Self {
350 Self { name, connection }
351 }
352
353 pub(super) fn name(&self) -> &str {
354 self.name.as_str()
355 }
356
357 pub(crate) fn connection(&self) -> &ServerConnection {
358 &self.connection
359 }
360
361 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
362 async fn delete(self) -> Result {
363 self.connection.delete_database(self.name).await
364 }
365
366 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
367 async fn schema(&self) -> Result<String> {
368 self.connection.database_schema(self.name.clone()).await
369 }
370
371 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
372 async fn type_schema(&self) -> Result<String> {
373 self.connection.database_type_schema(self.name.clone()).await
374 }
375
376 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
377 async fn rule_schema(&self) -> Result<String> {
378 self.connection.database_rule_schema(self.name.clone()).await
379 }
380}
381
382impl fmt::Display for ServerDatabase {
383 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
384 write!(f, "{}", self.name)
385 }
386}