pub mod bigquery;
pub mod database;
use colored::Colorize;
use crate::settings::{BigQuery, Database, Settings};
use thiserror::Error;
use tracing::info;
#[cfg(feature = "memory-database")]
use std::sync::Arc;
#[cfg(feature = "memory-database")]
use sea_orm::MockDatabase;
/// Aggregates the data backends available to the server.
///
/// Holds an optional BigQuery client together with zero or more relational
/// database clients.
#[derive(Clone)]
pub struct ServerDatabase {
/// BigQuery client; `None` when BigQuery is absent from the settings, is
/// disabled there, or when the instance was built by one of the
/// `memory-database` constructors.
pub bigquery: Option<bigquery::BigQueryClient>,
/// Database clients: one per enabled entry in the settings (in the order they
/// are listed), or a single client for the `memory-database` constructors.
pub databases: Vec<database::DatabaseClient>,
}
impl ServerDatabase {
/// Wraps a prepared [`MockDatabase`] in a `ServerDatabase`.
///
/// The mock is converted into a connection and registered as the only
/// database client under `name`; no BigQuery client is attached. Only
/// compiled with the `memory-database` feature.
#[cfg(feature = "memory-database")]
pub fn new_with_mock_database(name: String, database: MockDatabase) -> Self {
    let connection = Arc::new(database.into_connection());
    let mock_client = database::DatabaseClient { name, connection };
    Self {
        bigquery: None,
        databases: vec![mock_client],
    }
}
/// Creates a `ServerDatabase` backed by a single in-memory database client.
///
/// No BigQuery client is attached. Only compiled with the `memory-database`
/// feature.
///
/// # Errors
///
/// Returns [`DataError::Database`] carrying the underlying error message when
/// the in-memory database client cannot be created.
#[cfg(feature = "memory-database")]
pub async fn new_with_memory_database(name: String) -> Result<Self> {
    info!("Initializing ServerDatabase with memory database...");
    // `name` is owned and not used again, so it is moved rather than cloned.
    let database = database::DatabaseClient::new_with_memory_database(name)
        .await
        .map_err(|e| DataError::Database(e.to_string()))?;
    Ok(ServerDatabase {
        bigquery: None,
        databases: vec![database],
    })
}
pub(crate) async fn new_with_settings(settings: &Settings) -> Result<Self> {
info!("Initializing ServerDatabase with provided settings...");
let data = settings.data.as_ref();
let is_enabled = |flag: Option<bool>| flag.unwrap_or(true);
let bigquery = match data.and_then(|d| d.bigquery.as_ref()) {
Some(cfg) if is_enabled(cfg.enabled) => {
Some(ServerDatabase::create_bigquery_client(cfg).await?)
}
_ => None,
};
let mut databases = Vec::new();
if let Some(configs) = data.and_then(|d| d.databases.as_ref()) {
for config in configs {
if is_enabled(config.enabled) {
databases.push(ServerDatabase::create_database_client(config).await?);
}
}
}
Ok(ServerDatabase {
bigquery,
databases,
})
}
/// Creates the BigQuery client described by `settings`.
///
/// When `print_tables` is set to `true`, the tables known to the new client
/// are logged after initialization.
///
/// # Errors
///
/// * [`DataError::Configuration`] when the credential or the dataset is missing.
/// * [`DataError::BigQuery`] when the client itself cannot be constructed.
async fn create_bigquery_client(settings: &BigQuery) -> Result<bigquery::BigQueryClient> {
    info!("Setting up BigQuery client...");

    let credential = match settings.credential.as_ref() {
        Some(value) => value,
        None => {
            let message = "BigQuery credential is missing.".red().to_string();
            return Err(DataError::Configuration(message));
        }
    };
    let dataset = match settings.dataset.as_ref() {
        Some(value) => value,
        None => {
            let message = "BigQuery dataset is missing.".red().to_string();
            return Err(DataError::Configuration(message));
        }
    };

    let connected =
        bigquery::BigQueryClient::new_with_credentials_and_dataset(credential, dataset).await;
    let client = match connected {
        Ok(client) => client,
        Err(e) => return Err(DataError::BigQuery(e.to_string().red().to_string())),
    };
    info!("BigQuery client initialized.");

    // Table listing is opt-in; an unset flag means "do not print".
    let should_print_tables = settings.print_tables.unwrap_or(false);
    if should_print_tables {
        Self::log_bigquery_tables(&client);
    }
    Ok(client)
}
/// Logs a summary line with the number of tables reported by `client`,
/// followed by one indented, dimmed line per table resource.
fn log_bigquery_tables(client: &bigquery::BigQueryClient) {
    let tables = client.get_tables();
    let table_count = tables.len();
    info!(
        "{} {}",
        "Database initialized successfully.".white(),
        format!("Loaded {} BigQuery tables.", table_count).white()
    );
    for entry in tables {
        info!(
            "\t{}",
            format!("Table: {}", entry.resource()).white().dimmed()
        );
    }
}
/// Creates a database client from one `Database` settings entry and checks
/// that its connection is usable.
///
/// Both pool sizes fall back to `1` when they are not configured.
///
/// # Errors
///
/// * [`DataError::Configuration`] when the entry has no URL.
/// * [`DataError::Database`] when the client cannot be created or the
///   subsequent connection validation fails.
async fn create_database_client(
    settings: &Database,
) -> Result<database::DatabaseClient, DataError> {
    info!("Setting up [{}] Database client...", settings.name.blue());

    let url = match settings.url.as_ref() {
        Some(url) => url,
        None => {
            let message = format!(
                "Database URL is missing for [{}].",
                settings.name.blue()
            );
            return Err(DataError::Configuration(message));
        }
    };

    let min_pool_size = settings.min_pool_size.unwrap_or(1);
    let max_pool_size = settings.max_pool_size.unwrap_or(1);
    let creation = database::DatabaseClient::new(
        settings.name.clone(),
        url.clone(),
        min_pool_size,
        max_pool_size,
        settings.logging,
        settings.logging_level.clone(),
        settings.aquire_timeout,
        settings.max_lifetime,
        settings.idle_timeout,
        settings.connect_timeout,
    )
    .await;
    let database_client = match creation {
        Ok(client) => client,
        Err(e) => return Err(DataError::Database(e.to_string().red().to_string())),
    };
    info!(
        "Database client initialized for [{}].",
        settings.name.blue()
    );

    // Fail early if the freshly created connection does not answer.
    Self::validate_connection(&database_client).await?;
    Ok(database_client)
}
/// Pings the client's connection and logs the backend it reports.
///
/// # Errors
///
/// Returns [`DataError::Database`] with the ping error message when the
/// ping fails.
async fn validate_connection(client: &database::DatabaseClient) -> Result<()> {
    let connection = &client.connection;
    if let Err(e) = connection.ping().await {
        return Err(DataError::Database(e.to_string().red().to_string()));
    }

    let db_backend = connection.get_database_backend();
    info!(
        "The database connection is valid on the backend {:?} for [{}].",
        db_backend,
        client.name.blue(),
    );
    Ok(())
}
/// Attempts to close every database connection, blocking the calling thread.
///
/// Compiled only with the `memory-database` feature. A new Tokio runtime is
/// created for the duration of the call, and the result of each `close()` is
/// discarded.
///
/// # Panics
///
/// Panics if the Tokio runtime cannot be created.
///
/// NOTE(review): as written this appears to never close anything. `self`
/// keeps its own `Arc` to each connection, so the clone taken below cannot be
/// the only strong reference and `Arc::try_unwrap` should always return
/// `Err`. A real fix likely needs ownership (`self`) or a by-reference close
/// API on the connection — confirm against the SeaORM version in use.
#[cfg(feature = "memory-database")]
pub fn close(&self) {
let rt = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
rt.block_on(async {
for database in self.databases.iter() {
// Presumably `DatabaseClient: Clone` clones the inner `Arc`, producing a
// second strong reference to the same connection — verify.
let db = database.clone();
// Only the sole owner of the `Arc` may take the connection out to close it.
if let Ok(conn) = Arc::try_unwrap(db.connection) {
// Best effort: a failed close is ignored.
let _ = conn.close().await;
}
}
});
}
/// Closes every database connection, blocking the calling thread until all
/// close attempts have finished.
///
/// Compiled only without the `memory-database` feature. A new Tokio runtime
/// is created for the duration of the call. Closing is best effort: the
/// result of each `close()` is discarded.
///
/// # Panics
///
/// Panics if the Tokio runtime cannot be created.
///
/// NOTE(review): Tokio documents `Runtime::block_on` as panicking when called
/// from within an asynchronous execution context — confirm callers invoke
/// this from synchronous code only.
#[cfg(not(feature = "memory-database"))]
pub fn close(&self) {
    let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
    let close_all = async {
        for client in &self.databases {
            // `close` consumes the connection, so work on a cloned handle.
            let connection = client.connection.clone();
            let _ = connection.close().await;
        }
    };
    runtime.block_on(close_all);
}
}
/// Result alias for this module whose error type defaults to [`DataError`].
pub type Result<T, E = DataError> = std::result::Result<T, E>;
/// Errors raised while setting up or validating the data backends.
///
/// Every variant carries a pre-rendered message. Several call sites in this
/// module colour that message with `colored` (`.red()`), so the string may
/// contain ANSI escape sequences.
#[derive(Debug, Error)]
pub enum DataError {
/// A required setting is missing (BigQuery credential or dataset, or a
/// database URL).
#[error("Invalid database configuration: {0}")]
Configuration(String),
/// The BigQuery client could not be constructed.
#[error("BigQuery error: {0}")]
BigQuery(String),
/// A database client could not be created, or its connection failed the
/// ping check.
#[error("Database error: {0}")]
Database(String),
}