1#[cfg(not(feature = "sync"))]
21use std::future::Future;
22use std::{
23 collections::HashMap,
24 fmt,
25 fs::File,
26 io::{BufWriter, Write},
27 path::Path,
28 sync::{Arc, RwLock},
29 thread::sleep,
30 time::Duration,
31};
32
33use itertools::Itertools;
34use prost::Message;
35use tracing::{debug, error};
36
37use crate::{
38 common::{
39 address::Address,
40 error::ConnectionError,
41 info::{DatabaseInfo, ReplicaInfo},
42 Error, Result,
43 },
44 connection::{database::export_stream::DatabaseExportStream, server_connection::ServerConnection},
45 database::migration::{try_create_export_file, try_open_existing_export_file, DatabaseExportAnswer},
46 driver::TypeDBDriver,
47 error::{InternalError, MigrationError},
48 resolve, Transaction, TransactionOptions, TransactionType,
49};
50
51pub struct Database {
53 name: String,
54 replicas: RwLock<Vec<Replica>>,
55 server_connections: HashMap<Address, ServerConnection>,
56}
57
58impl Database {
59 const PRIMARY_REPLICA_TASK_MAX_RETRIES: usize = 10;
60 const FETCH_REPLICAS_MAX_RETRIES: usize = 10;
61 const WAIT_FOR_PRIMARY_REPLICA_SELECTION: Duration = Duration::from_secs(2);
62
63 pub(super) fn new(
64 database_info: DatabaseInfo,
65 server_connections: HashMap<Address, ServerConnection>,
66 ) -> Result<Self> {
67 let name = database_info.name.clone();
68 let replicas = RwLock::new(Replica::try_from_info(database_info, &server_connections)?);
69 Ok(Self { name, replicas, server_connections })
70 }
71
72 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
73 pub(super) async fn get(name: String, server_connections: HashMap<Address, ServerConnection>) -> Result<Self> {
74 Ok(Self {
75 name: name.clone(),
76 replicas: RwLock::new(Replica::fetch_all(name, &server_connections).await?),
77 server_connections,
78 })
79 }
80
81 pub fn name(&self) -> &str {
83 self.name.as_str()
84 }
85
86 pub fn replicas_info(&self) -> Vec<ReplicaInfo> {
95 self.replicas.read().unwrap().iter().map(Replica::to_info).collect()
96 }
97
98 pub fn primary_replica_info(&self) -> Option<ReplicaInfo> {
107 self.primary_replica().map(|replica| replica.to_info())
108 }
109
110 pub fn preferred_replica_info(&self) -> Option<ReplicaInfo> {
120 self.preferred_replica().map(|replica| replica.to_info())
121 }
122
123 #[cfg_attr(feature = "sync", doc = "database.delete();")]
129 #[cfg_attr(not(feature = "sync"), doc = "database.delete().await;")]
130 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
132 pub async fn delete(self: Arc<Self>) -> Result {
133 self.run_on_primary_replica(|database| database.delete()).await
134 }
135
136 #[cfg_attr(feature = "sync", doc = "database.schema();")]
142 #[cfg_attr(not(feature = "sync"), doc = "database.schema().await;")]
143 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
145 pub async fn schema(&self) -> Result<String> {
146 self.run_failsafe(|database| async move { database.schema().await }).await
147 }
148
149 #[cfg_attr(feature = "sync", doc = "database.type_schema();")]
155 #[cfg_attr(not(feature = "sync"), doc = "database.type_schema().await;")]
156 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
158 pub async fn type_schema(&self) -> Result<String> {
159 self.run_failsafe(|database| async move { database.type_schema().await }).await
160 }
161
162 #[cfg_attr(feature = "sync", doc = "database.export_to_file(schema_path, data_path);")]
174 #[cfg_attr(not(feature = "sync"), doc = "database.export_to_file(schema_path, data_path).await;")]
175 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
177 pub async fn export_to_file(&self, schema_file_path: impl AsRef<Path>, data_file_path: impl AsRef<Path>) -> Result {
178 let schema_file_path = schema_file_path.as_ref();
179 let data_file_path = data_file_path.as_ref();
180 if schema_file_path == data_file_path {
181 return Err(Error::Migration(MigrationError::CannotExportToTheSameFile));
182 }
183
184 let _ = try_create_export_file(schema_file_path)?;
185 if let Err(err) = try_create_export_file(data_file_path) {
186 let _ = std::fs::remove_file(schema_file_path);
187 return Err(err);
188 }
189
190 let result = self
191 .run_failsafe(|database| async move {
192 let schema_file = try_open_existing_export_file(schema_file_path)?;
194 let data_file = try_open_existing_export_file(data_file_path)?;
195 database.export_to_file(schema_file, data_file).await
196 })
197 .await;
198
199 if result.is_err() {
200 let _ = std::fs::remove_file(schema_file_path);
201 let _ = std::fs::remove_file(data_file_path);
202 }
203 result
204 }
205
206 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
207 pub(crate) async fn run_failsafe<F, P, R>(&self, task: F) -> Result<R>
208 where
209 F: Fn(ServerDatabase) -> P,
210 P: Future<Output = Result<R>>,
211 {
212 match self.run_on_any_replica(&task).await {
213 Err(Error::Connection(ConnectionError::ClusterReplicaNotPrimary)) => {
214 debug!("Attempted to run on a non-primary replica, retrying on primary...");
215 self.run_on_primary_replica(&task).await
216 }
217 res => res,
218 }
219 }
220
221 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
222 pub(super) async fn run_on_any_replica<F, P, R>(&self, task: F) -> Result<R>
223 where
224 F: Fn(ServerDatabase) -> P,
225 P: Future<Output = Result<R>>,
226 {
227 let replicas = self.replicas.read().unwrap().clone();
228 for replica in replicas {
229 match task(replica.database.clone()).await {
230 Err(Error::Connection(
231 ConnectionError::ServerConnectionFailedStatusError { .. } | ConnectionError::ConnectionFailed,
232 )) => {
233 debug!("Unable to connect to {}. Attempting next server.", replica.server);
234 }
235 res => return res,
236 }
237 }
238 Err(Self::unable_to_connect_error(&self.server_connections))
239 }
240
241 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
242 pub(super) async fn run_on_primary_replica<F, P, R>(&self, task: F) -> Result<R>
243 where
244 F: Fn(ServerDatabase) -> P,
245 P: Future<Output = Result<R>>,
246 {
247 let mut primary_replica =
248 if let Some(replica) = self.primary_replica() { replica } else { self.seek_primary_replica().await? };
249
250 for _ in 0..Self::PRIMARY_REPLICA_TASK_MAX_RETRIES {
251 match task(primary_replica.database.clone()).await {
252 Err(Error::Connection(
253 ConnectionError::ClusterReplicaNotPrimary
254 | ConnectionError::ServerConnectionFailedStatusError { .. }
255 | ConnectionError::ConnectionFailed,
256 )) => {
257 debug!("Primary replica error, waiting...");
258 Self::wait_for_primary_replica_selection().await;
259 primary_replica = self.seek_primary_replica().await?;
260 }
261 res => return res,
262 }
263 }
264 Err(Self::unable_to_connect_error(&self.server_connections))
265 }
266
267 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
268 async fn seek_primary_replica(&self) -> Result<Replica> {
269 for _ in 0..Self::FETCH_REPLICAS_MAX_RETRIES {
270 let replicas = Replica::fetch_all(self.name.clone(), &self.server_connections).await?;
271 *self.replicas.write().unwrap() = replicas;
272 if let Some(replica) = self.primary_replica() {
273 return Ok(replica);
274 }
275 Self::wait_for_primary_replica_selection().await;
276 }
277 Err(Self::unable_to_connect_error(&self.server_connections))
278 }
279
280 fn unable_to_connect_error(server_connections: &HashMap<Address, ServerConnection>) -> Error {
281 Error::Connection(ConnectionError::ServerConnectionFailed {
282 addresses: server_connections.keys().map(Address::clone).collect_vec(),
283 })
284 }
285
286 fn primary_replica(&self) -> Option<Replica> {
287 self.replicas.read().unwrap().iter().filter(|r| r.is_primary).max_by_key(|r| r.term).cloned()
288 }
289
290 fn preferred_replica(&self) -> Option<Replica> {
291 self.replicas.read().unwrap().iter().filter(|r| r.is_preferred).max_by_key(|r| r.term).cloned()
292 }
293
294 #[cfg(feature = "sync")]
295 fn wait_for_primary_replica_selection() {
296 sleep(Self::WAIT_FOR_PRIMARY_REPLICA_SELECTION);
297 }
298
299 #[cfg(not(feature = "sync"))]
300 async fn wait_for_primary_replica_selection() {
301 tokio::time::sleep(Self::WAIT_FOR_PRIMARY_REPLICA_SELECTION).await
302 }
303}
304
305impl fmt::Debug for Database {
306 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
307 f.debug_struct("Database").field("name", &self.name).field("replicas", &self.replicas).finish()
308 }
309}
310
311#[derive(Clone)]
313pub(super) struct Replica {
314 server: Address,
316 database_name: String,
318 is_primary: bool,
320 term: i64,
322 is_preferred: bool,
324 database: ServerDatabase,
326}
327
328impl Replica {
329 fn new(name: String, metadata: ReplicaInfo, server_connection: ServerConnection) -> Self {
330 Self {
331 server: metadata.server,
332 database_name: name.clone(),
333 is_primary: metadata.is_primary,
334 term: metadata.term,
335 is_preferred: metadata.is_preferred,
336 database: ServerDatabase::new(name, server_connection),
337 }
338 }
339
340 fn try_from_info(
341 database_info: DatabaseInfo,
342 server_connections: &HashMap<Address, ServerConnection>,
343 ) -> Result<Vec<Self>> {
344 database_info
345 .replicas
346 .into_iter()
347 .map(|replica| {
348 if server_connections.len() == 1 {
349 Ok(Self::new(
350 database_info.name.clone(),
351 replica,
352 server_connections.values().next().unwrap().clone(),
353 ))
354 } else {
355 let server_connection = server_connections
357 .get(&replica.server)
358 .ok_or_else(|| InternalError::UnknownServer { server: replica.server.clone() })?;
359 Ok(Self::new(database_info.name.clone(), replica, server_connection.clone()))
360 }
361 })
362 .collect()
363 }
364
365 fn to_info(&self) -> ReplicaInfo {
366 ReplicaInfo {
367 server: self.server.clone(),
368 is_primary: self.is_primary,
369 is_preferred: self.is_preferred,
370 term: self.term,
371 }
372 }
373
374 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
375 async fn fetch_all(name: String, server_connections: &HashMap<Address, ServerConnection>) -> Result<Vec<Self>> {
376 for (server, server_connection) in server_connections {
377 let res = server_connection.get_database_replicas(name.clone()).await;
378 match res {
379 Ok(info) => {
380 return Self::try_from_info(info, server_connections);
381 }
382 Err(Error::Connection(
383 ConnectionError::DatabaseNotFound { .. }
384 | ConnectionError::ServerConnectionFailedStatusError { .. }
385 | ConnectionError::ConnectionFailed,
386 )) => {
387 error!(
388 "Failed to fetch replica info for database '{}' from {}. Attempting next server.",
389 name, server
390 );
391 }
392 Err(err) => return Err(err),
393 }
394 }
395 Err(Database::unable_to_connect_error(server_connections))
396 }
397}
398
399impl fmt::Debug for Replica {
400 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
401 f.debug_struct("Replica")
402 .field("server", &self.server)
403 .field("database_name", &self.database_name)
404 .field("is_primary", &self.is_primary)
405 .field("term", &self.term)
406 .field("is_preferred", &self.is_preferred)
407 .finish()
408 }
409}
410
411#[derive(Clone, Debug)]
412pub(crate) struct ServerDatabase {
413 name: String,
414 connection: ServerConnection,
415}
416
417impl ServerDatabase {
418 fn new(name: String, connection: ServerConnection) -> Self {
419 Self { name, connection }
420 }
421
422 pub fn name(&self) -> &str {
423 self.name.as_str()
424 }
425
426 pub(crate) fn connection(&self) -> &ServerConnection {
427 &self.connection
428 }
429
430 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
431 async fn delete(self) -> Result {
432 self.connection.delete_database(self.name).await
433 }
434
435 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
436 async fn schema(&self) -> Result<String> {
437 self.connection.database_schema(self.name.clone()).await
438 }
439
440 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
441 async fn type_schema(&self) -> Result<String> {
442 self.connection.database_type_schema(self.name.clone()).await
443 }
444
445 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
446 async fn export_to_file(&self, mut schema_file: File, data_file: File) -> Result {
447 let mut export_stream = self.connection.database_export(self.name.clone()).await?;
448 let mut data_writer = BufWriter::new(data_file);
449
450 loop {
451 match resolve!(export_stream.next())? {
452 DatabaseExportAnswer::Done => break,
453 DatabaseExportAnswer::Schema(schema) => {
454 schema_file.write_all(schema.as_bytes())?;
455 schema_file.flush()?;
456 }
457 DatabaseExportAnswer::Items(items) => {
458 for item in items {
459 let mut buf = Vec::new();
460 item.encode_length_delimited(&mut buf)
461 .map_err(|_| Error::Migration(MigrationError::CannotEncodeExportedConcept))?;
462 data_writer.write_all(&buf)?;
463 }
464 }
465 }
466 }
467
468 data_writer.flush()?;
469 Ok(())
470 }
471}
472
473impl fmt::Display for ServerDatabase {
474 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
475 write!(f, "{}", self.name)
476 }
477}