1#[cfg(not(feature = "sync"))]
21use std::future::Future;
22use std::{
23 collections::HashMap,
24 fmt,
25 fs::File,
26 io::{BufWriter, Write},
27 path::Path,
28 sync::{
29 atomic::{AtomicU64, Ordering},
30 Arc, RwLock,
31 },
32 thread::sleep,
33 time::Duration,
34};
35
36use itertools::Itertools;
37use log::{debug, error};
38use prost::Message;
39
40use crate::{
41 common::{
42 address::Address,
43 error::ConnectionError,
44 info::{DatabaseInfo, ReplicaInfo},
45 Error, Result,
46 },
47 connection::{database::export_stream::DatabaseExportStream, server_connection::ServerConnection},
48 database::migration::{try_create_export_file, try_open_existing_export_file, DatabaseExportAnswer},
49 driver::TypeDBDriver,
50 error::{InternalError, MigrationError},
51 resolve, Transaction, TransactionOptions, TransactionType,
52};
53
/// A database on a TypeDB server or cluster, addressed by name and backed by
/// one or more replicas.
pub struct Database {
    /// The database name, fixed at construction.
    name: String,
    /// The currently known replica set; rewritten when the primary is re-sought.
    replicas: RwLock<Vec<Replica>>,
    /// Connections to every known server, keyed by server address.
    server_connections: HashMap<Address, ServerConnection>,
}
60
impl Database {
    /// Maximum attempts to run a task against the primary replica before giving up.
    const PRIMARY_REPLICA_TASK_MAX_RETRIES: usize = 10;
    /// Maximum attempts to refetch the replica list while looking for a primary.
    const FETCH_REPLICAS_MAX_RETRIES: usize = 10;
    /// Pause between retries, giving the cluster time to select a new primary.
    const WAIT_FOR_PRIMARY_REPLICA_SELECTION: Duration = Duration::from_secs(2);

    /// Creates a `Database` from already-fetched info, resolving each replica
    /// against the provided server connections.
    pub(super) fn new(
        database_info: DatabaseInfo,
        server_connections: HashMap<Address, ServerConnection>,
    ) -> Result<Self> {
        let name = database_info.name.clone();
        let replicas = RwLock::new(Replica::try_from_info(database_info, &server_connections)?);
        Ok(Self { name, replicas, server_connections })
    }

    /// Creates a `Database` handle for `name`, fetching its replica list from
    /// the first server that answers.
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub(super) async fn get(name: String, server_connections: HashMap<Address, ServerConnection>) -> Result<Self> {
        Ok(Self {
            name: name.clone(),
            replicas: RwLock::new(Replica::fetch_all(name, &server_connections).await?),
            server_connections,
        })
    }

    /// Returns the name of this database.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the `ReplicaInfo` for every known replica of this database.
    pub fn replicas_info(&self) -> Vec<ReplicaInfo> {
        self.replicas.read().unwrap().iter().map(Replica::to_info).collect()
    }

    /// Returns the `ReplicaInfo` of the primary replica, if one is currently known.
    pub fn primary_replica_info(&self) -> Option<ReplicaInfo> {
        self.primary_replica().map(|replica| replica.to_info())
    }

    /// Returns the `ReplicaInfo` of the preferred replica, if one is currently known.
    pub fn preferred_replica_info(&self) -> Option<ReplicaInfo> {
        self.preferred_replica().map(|replica| replica.to_info())
    }

    /// Deletes this database, routing the request to the primary replica.
    ///
    /// # Examples
    ///
    #[cfg_attr(feature = "sync", doc = "database.delete();")]
    #[cfg_attr(not(feature = "sync"), doc = "database.delete().await;")]
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub async fn delete(self: Arc<Self>) -> Result {
        self.run_on_primary_replica(|database| database.delete()).await
    }

    /// Retrieves this database's schema from a reachable replica,
    /// falling over to the primary replica if necessary.
    ///
    /// # Examples
    ///
    #[cfg_attr(feature = "sync", doc = "database.schema();")]
    #[cfg_attr(not(feature = "sync"), doc = "database.schema().await;")]
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub async fn schema(&self) -> Result<String> {
        self.run_failsafe(|database| async move { database.schema().await }).await
    }

    /// Retrieves this database's type schema from a reachable replica,
    /// falling over to the primary replica if necessary.
    ///
    /// # Examples
    ///
    #[cfg_attr(feature = "sync", doc = "database.type_schema();")]
    #[cfg_attr(not(feature = "sync"), doc = "database.type_schema().await;")]
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub async fn type_schema(&self) -> Result<String> {
        self.run_failsafe(|database| async move { database.type_schema().await }).await
    }

    /// Exports this database into the given schema and data files.
    ///
    /// The two paths must differ. Both files are created up front, and both are
    /// removed again (best-effort) if the export fails at any point.
    ///
    /// # Examples
    ///
    #[cfg_attr(feature = "sync", doc = "database.export_to_file(schema_path, data_path);")]
    #[cfg_attr(not(feature = "sync"), doc = "database.export_to_file(schema_path, data_path).await;")]
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub async fn export_to_file(&self, schema_file_path: impl AsRef<Path>, data_file_path: impl AsRef<Path>) -> Result {
        let schema_file_path = schema_file_path.as_ref();
        let data_file_path = data_file_path.as_ref();
        if schema_file_path == data_file_path {
            return Err(Error::Migration(MigrationError::CannotExportToTheSameFile));
        }

        // Create both target files before streaming anything, so a failure on
        // the second create can undo the first and bail out cleanly.
        let _ = try_create_export_file(schema_file_path)?;
        if let Err(err) = try_create_export_file(data_file_path) {
            // The schema file was already created: remove it before returning.
            let _ = std::fs::remove_file(schema_file_path);
            return Err(err);
        }

        let result = self
            .run_failsafe(|database| async move {
                let schema_file = try_open_existing_export_file(schema_file_path)?;
                let data_file = try_open_existing_export_file(data_file_path)?;
                database.export_to_file(schema_file, data_file).await
            })
            .await;

        // Best-effort cleanup: do not leave partial export files behind.
        if result.is_err() {
            let _ = std::fs::remove_file(schema_file_path);
            let _ = std::fs::remove_file(data_file_path);
        }
        result
    }

    /// Runs `task` on some replica; if the contacted replica reports it is not
    /// the primary, retries the task against the primary replica.
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub(crate) async fn run_failsafe<F, P, R>(&self, task: F) -> Result<R>
    where
        F: Fn(ServerDatabase) -> P,
        P: Future<Output = Result<R>>,
    {
        match self.run_on_any_replica(&task).await {
            Err(Error::Connection(ConnectionError::ClusterReplicaNotPrimary)) => {
                debug!("Attempted to run on a non-primary replica, retrying on primary...");
                self.run_on_primary_replica(&task).await
            }
            res => res,
        }
    }

    /// Runs `task` against each known replica in turn, moving on to the next
    /// one only on connection-level failures; any other outcome (success or a
    /// different error) is returned immediately.
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub(super) async fn run_on_any_replica<F, P, R>(&self, task: F) -> Result<R>
    where
        F: Fn(ServerDatabase) -> P,
        P: Future<Output = Result<R>>,
    {
        // Snapshot the replica list so the lock is not held across awaits.
        let replicas = self.replicas.read().unwrap().clone();
        for replica in replicas {
            match task(replica.database.clone()).await {
                Err(Error::Connection(
                    ConnectionError::ServerConnectionFailedStatusError { .. } | ConnectionError::ConnectionFailed,
                )) => {
                    debug!("Unable to connect to {}. Attempting next server.", replica.server);
                }
                res => return res,
            }
        }
        Err(Self::unable_to_connect_error(&self.server_connections))
    }

    /// Runs `task` against the primary replica, re-discovering the primary and
    /// retrying (up to `PRIMARY_REPLICA_TASK_MAX_RETRIES`) when the contacted
    /// replica is not primary or the connection fails.
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub(super) async fn run_on_primary_replica<F, P, R>(&self, task: F) -> Result<R>
    where
        F: Fn(ServerDatabase) -> P,
        P: Future<Output = Result<R>>,
    {
        let mut primary_replica =
            if let Some(replica) = self.primary_replica() { replica } else { self.seek_primary_replica().await? };

        for _ in 0..Self::PRIMARY_REPLICA_TASK_MAX_RETRIES {
            match task(primary_replica.database.clone()).await {
                Err(Error::Connection(
                    ConnectionError::ClusterReplicaNotPrimary
                    | ConnectionError::ServerConnectionFailedStatusError { .. }
                    | ConnectionError::ConnectionFailed,
                )) => {
                    debug!("Primary replica error, waiting...");
                    // Give the cluster time to select a new primary, then refetch.
                    Self::wait_for_primary_replica_selection().await;
                    primary_replica = self.seek_primary_replica().await?;
                }
                res => return res,
            }
        }
        Err(Self::unable_to_connect_error(&self.server_connections))
    }

    /// Refetches the replica list until one replica is marked primary, waiting
    /// between attempts; fails after `FETCH_REPLICAS_MAX_RETRIES` tries.
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    async fn seek_primary_replica(&self) -> Result<Replica> {
        for _ in 0..Self::FETCH_REPLICAS_MAX_RETRIES {
            let replicas = Replica::fetch_all(self.name.clone(), &self.server_connections).await?;
            // Replace the cached replica set with the freshly fetched one.
            *self.replicas.write().unwrap() = replicas;
            if let Some(replica) = self.primary_replica() {
                return Ok(replica);
            }
            Self::wait_for_primary_replica_selection().await;
        }
        Err(Self::unable_to_connect_error(&self.server_connections))
    }

    /// Builds the error reported when no server in the cluster could be reached.
    fn unable_to_connect_error(server_connections: &HashMap<Address, ServerConnection>) -> Error {
        Error::Connection(ConnectionError::ServerConnectionFailed {
            addresses: server_connections.keys().map(Address::clone).collect_vec(),
        })
    }

    /// Returns the primary-flagged replica with the greatest term, if any.
    fn primary_replica(&self) -> Option<Replica> {
        self.replicas.read().unwrap().iter().filter(|r| r.is_primary).max_by_key(|r| r.term).cloned()
    }

    /// Returns the preferred-flagged replica with the greatest term, if any.
    fn preferred_replica(&self) -> Option<Replica> {
        self.replicas.read().unwrap().iter().filter(|r| r.is_preferred).max_by_key(|r| r.term).cloned()
    }

    /// Blocking wait used by the sync driver between primary-selection retries.
    #[cfg(feature = "sync")]
    fn wait_for_primary_replica_selection() {
        sleep(Self::WAIT_FOR_PRIMARY_REPLICA_SELECTION);
    }

    /// Non-blocking wait used by the async driver between primary-selection retries.
    #[cfg(not(feature = "sync"))]
    async fn wait_for_primary_replica_selection() {
        tokio::time::sleep(Self::WAIT_FOR_PRIMARY_REPLICA_SELECTION).await
    }
}
307
308impl fmt::Debug for Database {
309 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
310 f.debug_struct("Database").field("name", &self.name).field("replicas", &self.replicas).finish()
311 }
312}
313
/// Metadata and a connection handle for a single replica of a database.
#[derive(Clone)]
pub(super) struct Replica {
    /// The address of the server hosting this replica.
    server: Address,
    /// The name of the database this replica belongs to.
    database_name: String,
    /// Whether this replica is currently flagged as the primary.
    is_primary: bool,
    /// Replica term; among flagged replicas, the greatest term wins.
    term: i64,
    /// Whether this replica is flagged as preferred.
    is_preferred: bool,
    /// Handle for running database operations against this replica's server.
    database: ServerDatabase,
}
330
impl Replica {
    /// Creates a replica record from its metadata and the connection to the
    /// server that hosts it.
    fn new(name: String, metadata: ReplicaInfo, server_connection: ServerConnection) -> Self {
        Self {
            server: metadata.server,
            database_name: name.clone(),
            is_primary: metadata.is_primary,
            term: metadata.term,
            is_preferred: metadata.is_preferred,
            database: ServerDatabase::new(name, server_connection),
        }
    }

    /// Builds the replica set described by `database_info`.
    ///
    /// When exactly one server connection exists, every replica is bound to it
    /// (the replicas' advertised addresses may not match the connected address).
    /// Otherwise each replica's address must map to a known connection, or
    /// `InternalError::UnknownServer` is raised.
    fn try_from_info(
        database_info: DatabaseInfo,
        server_connections: &HashMap<Address, ServerConnection>,
    ) -> Result<Vec<Self>> {
        database_info
            .replicas
            .into_iter()
            .map(|replica| {
                if server_connections.len() == 1 {
                    Ok(Self::new(
                        database_info.name.clone(),
                        replica,
                        server_connections.values().next().unwrap().clone(),
                    ))
                } else {
                    let server_connection = server_connections
                        .get(&replica.server)
                        .ok_or_else(|| InternalError::UnknownServer { server: replica.server.clone() })?;
                    Ok(Self::new(database_info.name.clone(), replica, server_connection.clone()))
                }
            })
            .collect()
    }

    /// Converts this replica back into a plain `ReplicaInfo` snapshot.
    fn to_info(&self) -> ReplicaInfo {
        ReplicaInfo {
            server: self.server.clone(),
            is_primary: self.is_primary,
            is_preferred: self.is_preferred,
            term: self.term,
        }
    }

    /// Fetches the replica list for database `name` by asking each server in turn.
    ///
    /// Servers that fail with a connection-level error, or that report the
    /// database as missing, are skipped with a logged error; any other error
    /// aborts immediately. Fails with a connection error if no server answered.
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    async fn fetch_all(name: String, server_connections: &HashMap<Address, ServerConnection>) -> Result<Vec<Self>> {
        for (server, server_connection) in server_connections {
            let res = server_connection.get_database_replicas(name.clone()).await;
            match res {
                Ok(info) => {
                    return Self::try_from_info(info, server_connections);
                }
                Err(Error::Connection(
                    ConnectionError::DatabaseNotFound { .. }
                    | ConnectionError::ServerConnectionFailedStatusError { .. }
                    | ConnectionError::ConnectionFailed,
                )) => {
                    error!(
                        "Failed to fetch replica info for database '{}' from {}. Attempting next server.",
                        name, server
                    );
                }
                Err(err) => return Err(err),
            }
        }
        Err(Database::unable_to_connect_error(server_connections))
    }
}
401
402impl fmt::Debug for Replica {
403 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
404 f.debug_struct("Replica")
405 .field("server", &self.server)
406 .field("database_name", &self.database_name)
407 .field("is_primary", &self.is_primary)
408 .field("term", &self.term)
409 .field("is_preferred", &self.is_preferred)
410 .finish()
411 }
412}
413
/// A handle to a database on one specific server: the database name paired
/// with the connection used to reach that server.
#[derive(Clone, Debug)]
pub(crate) struct ServerDatabase {
    /// The database name.
    name: String,
    /// Connection to the server hosting this database.
    connection: ServerConnection,
}
419
impl ServerDatabase {
    /// Pairs a database name with the connection to one server hosting it.
    fn new(name: String, connection: ServerConnection) -> Self {
        Self { name, connection }
    }

    /// Returns the database name.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the underlying server connection.
    pub(crate) fn connection(&self) -> &ServerConnection {
        &self.connection
    }

    /// Deletes this database on its server, consuming the handle.
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    async fn delete(self) -> Result {
        self.connection.delete_database(self.name).await
    }

    /// Fetches the database schema from this server.
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    async fn schema(&self) -> Result<String> {
        self.connection.database_schema(self.name.clone()).await
    }

    /// Fetches the database's type schema from this server.
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    async fn type_schema(&self) -> Result<String> {
        self.connection.database_type_schema(self.name.clone()).await
    }

    /// Streams a database export from this server, writing the schema text to
    /// `schema_file` and each exported item, length-delimited protobuf-encoded,
    /// to `data_file` (buffered). Both outputs are flushed before returning.
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    async fn export_to_file(&self, mut schema_file: File, data_file: File) -> Result {
        let mut export_stream = self.connection.database_export(self.name.clone()).await?;
        let mut data_writer = BufWriter::new(data_file);

        loop {
            match resolve!(export_stream.next())? {
                // Server signalled the end of the export stream.
                DatabaseExportAnswer::Done => break,
                DatabaseExportAnswer::Schema(schema) => {
                    schema_file.write_all(schema.as_bytes())?;
                    schema_file.flush()?;
                }
                DatabaseExportAnswer::Items(items) => {
                    for item in items {
                        let mut buf = Vec::new();
                        item.encode_length_delimited(&mut buf)
                            .map_err(|_| Error::Migration(MigrationError::CannotEncodeExportedConcept))?;
                        data_writer.write_all(&buf)?;
                    }
                }
            }
        }

        data_writer.flush()?;
        Ok(())
    }
}
475
476impl fmt::Display for ServerDatabase {
477 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
478 write!(f, "{}", self.name)
479 }
480}