pub mod dialect;
pub mod find;
pub mod insert;
pub mod trait_impls;
pub mod trx_token;
pub mod update;
#[cfg(feature = "sqlite")]
pub mod sqlite_specific;
use std::sync::atomic::{AtomicBool, Ordering};
use crate::error::{WalletError, WalletResult};
use crate::storage::StorageConfig;
use crate::tables::Settings;
use crate::types::Chain;
/// SQL-backed storage state, generic over the sqlx database driver `DB`.
///
/// The backend-specific constructors in this module (`new_sqlite`,
/// `new_mysql`, `new_postgres`) build this struct with `active` set to
/// `false`, an empty `storage_identity_key`, and `settings` set to `None`.
pub struct StorageSqlx<DB: sqlx::Database> {
/// Pool on which the `begin_*_transaction` helpers start transactions.
/// Also returned by `read_pool()` when no dedicated read pool is set.
pub write_pool: sqlx::Pool<DB>,
/// Optional separate pool returned by `read_pool()`. The SQLite
/// constructor sets it; the MySQL and PostgreSQL constructors leave it `None`.
pub read_pool: Option<sqlx::Pool<DB>>,
/// The configuration that was passed to the constructor.
pub config: StorageConfig,
/// The chain that was passed to the constructor.
pub chain: Chain,
/// Flag read by `is_active()` and written by `set_active()`; starts `false`.
pub active: AtomicBool,
/// Starts as an empty string in every constructor in this module.
/// NOTE(review): presumably filled in once the stored settings are loaded — confirm.
pub storage_identity_key: String,
/// Starts as `None` in every constructor in this module.
/// NOTE(review): looks like a cache of the persisted `Settings` row — confirm against callers.
pub settings: tokio::sync::RwLock<Option<Settings>>,
}
impl<DB: sqlx::Database> StorageSqlx<DB> {
    /// Returns the pool to use for read queries.
    ///
    /// Yields the dedicated read pool when one is configured, otherwise
    /// falls back to the write pool.
    pub fn read_pool(&self) -> &sqlx::Pool<DB> {
        match &self.read_pool {
            Some(dedicated) => dedicated,
            None => &self.write_pool,
        }
    }

    /// Reports the current value of the `active` flag (relaxed atomic load).
    pub fn is_active(&self) -> bool {
        let flag = &self.active;
        flag.load(Ordering::Relaxed)
    }

    /// Overwrites the `active` flag with `active` (relaxed atomic store).
    pub fn set_active(&self, active: bool) {
        let flag = &self.active;
        flag.store(active, Ordering::Relaxed);
    }
}
/// [`StorageSqlx`] specialised to the sqlx SQLite driver.
/// Only compiled when the `sqlite` feature is enabled.
#[cfg(feature = "sqlite")]
pub type SqliteStorage = StorageSqlx<sqlx::Sqlite>;
#[cfg(feature = "sqlite")]
impl SqliteStorage {
    /// Creates a SQLite-backed storage from `config` for the given `chain`.
    ///
    /// Both pools come from `sqlite_specific::create_sqlite_pools`; the
    /// second one is installed as the dedicated read pool. The new storage
    /// starts inactive, with an empty identity key and no settings.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while creating the pools.
    pub async fn new_sqlite(config: StorageConfig, chain: Chain) -> WalletResult<Self> {
        let pools = sqlite_specific::create_sqlite_pools(&config).await?;
        let (writer, reader) = pools;
        let storage = Self {
            write_pool: writer,
            read_pool: Some(reader),
            config,
            chain,
            active: AtomicBool::new(false),
            storage_identity_key: String::new(),
            settings: tokio::sync::RwLock::new(None),
        };
        Ok(storage)
    }

    /// Begins a transaction on the write pool and wraps it in a [`trx_token::TrxToken`].
    ///
    /// The token's payload is a [`SqliteTrxInner`] holding `Some(transaction)`.
    ///
    /// # Errors
    ///
    /// Propagates the error if the pool fails to begin a transaction.
    pub async fn begin_sqlite_transaction(&self) -> WalletResult<trx_token::TrxToken> {
        let transaction = self.write_pool.begin().await?;
        let guarded = tokio::sync::Mutex::new(Some(transaction));
        let shared: SqliteTrxInner = std::sync::Arc::new(guarded);
        Ok(trx_token::TrxToken::new(shared))
    }

    /// Borrows the [`SqliteTrxInner`] carried by `trx`.
    ///
    /// # Errors
    ///
    /// Returns `WalletError::Internal` when the token's payload is not a
    /// SQLite transaction handle.
    pub fn extract_sqlite_trx(trx: &trx_token::TrxToken) -> WalletResult<&SqliteTrxInner> {
        match trx.downcast_ref::<SqliteTrxInner>() {
            Some(inner) => Ok(inner),
            None => Err(WalletError::Internal(
                "TrxToken does not contain a SQLite transaction".to_string(),
            )),
        }
    }
}
/// Shared, mutex-guarded slot holding an optional SQLite transaction.
///
/// This is the payload that `begin_sqlite_transaction` stores in a
/// `TrxToken` and that `extract_sqlite_trx` downcasts back to.
/// NOTE(review): the `Option` presumably lets the transaction be taken out
/// on commit/rollback — confirm against the code that consumes the token.
#[cfg(feature = "sqlite")]
pub type SqliteTrxInner =
std::sync::Arc<tokio::sync::Mutex<Option<sqlx::Transaction<'static, sqlx::Sqlite>>>>;
/// [`StorageSqlx`] specialised to the sqlx MySQL driver.
/// Only compiled when the `mysql` feature is enabled.
#[cfg(feature = "mysql")]
pub type MysqlStorage = StorageSqlx<sqlx::MySql>;
/// Shared, mutex-guarded slot holding an optional MySQL transaction.
///
/// This is the payload that `begin_mysql_transaction` stores in a
/// `TrxToken` and that `extract_mysql_trx` downcasts back to.
/// NOTE(review): the `Option` presumably lets the transaction be taken out
/// on commit/rollback — confirm against the code that consumes the token.
#[cfg(feature = "mysql")]
pub type MysqlTrxInner =
std::sync::Arc<tokio::sync::Mutex<Option<sqlx::Transaction<'static, sqlx::MySql>>>>;
#[cfg(feature = "mysql")]
impl MysqlStorage {
    /// Creates a MySQL-backed storage from `config` for the given `chain`.
    ///
    /// A single pool is opened against `config.url`, sized and timed from
    /// `config` (`connect_timeout` is applied as the acquire timeout). No
    /// dedicated read pool is set, so `read_pool()` returns the write pool.
    /// The new storage starts inactive, with an empty identity key and no
    /// settings.
    ///
    /// # Errors
    ///
    /// Propagates the error if connecting the pool fails.
    pub async fn new_mysql(config: StorageConfig, chain: Chain) -> WalletResult<Self> {
        use sqlx::mysql::MySqlPoolOptions;

        let options = MySqlPoolOptions::new()
            .max_connections(config.max_connections)
            .min_connections(config.min_connections)
            .idle_timeout(config.idle_timeout)
            .acquire_timeout(config.connect_timeout);
        let write_pool = options.connect(&config.url).await?;

        let storage = Self {
            write_pool,
            read_pool: None,
            config,
            chain,
            active: AtomicBool::new(false),
            storage_identity_key: String::new(),
            settings: tokio::sync::RwLock::new(None),
        };
        Ok(storage)
    }

    /// Begins a transaction on the write pool and wraps it in a [`trx_token::TrxToken`].
    ///
    /// The token's payload is a [`MysqlTrxInner`] holding `Some(transaction)`.
    ///
    /// # Errors
    ///
    /// Propagates the error if the pool fails to begin a transaction.
    pub async fn begin_mysql_transaction(&self) -> WalletResult<trx_token::TrxToken> {
        let transaction = self.write_pool.begin().await?;
        let guarded = tokio::sync::Mutex::new(Some(transaction));
        let shared: MysqlTrxInner = std::sync::Arc::new(guarded);
        Ok(trx_token::TrxToken::new(shared))
    }

    /// Borrows the [`MysqlTrxInner`] carried by `trx`.
    ///
    /// # Errors
    ///
    /// Returns `WalletError::Internal` when the token's payload is not a
    /// MySQL transaction handle.
    pub fn extract_mysql_trx(trx: &trx_token::TrxToken) -> WalletResult<&MysqlTrxInner> {
        match trx.downcast_ref::<MysqlTrxInner>() {
            Some(inner) => Ok(inner),
            None => Err(WalletError::Internal(
                "TrxToken does not contain a MySQL transaction".to_string(),
            )),
        }
    }
}
/// [`StorageSqlx`] specialised to the sqlx PostgreSQL driver.
/// Only compiled when the `postgres` feature is enabled.
#[cfg(feature = "postgres")]
pub type PgStorage = StorageSqlx<sqlx::Postgres>;
/// Shared, mutex-guarded slot holding an optional PostgreSQL transaction.
///
/// This is the payload that `begin_pg_transaction` stores in a
/// `TrxToken` and that `extract_pg_trx` downcasts back to.
/// NOTE(review): the `Option` presumably lets the transaction be taken out
/// on commit/rollback — confirm against the code that consumes the token.
#[cfg(feature = "postgres")]
pub type PgTrxInner =
std::sync::Arc<tokio::sync::Mutex<Option<sqlx::Transaction<'static, sqlx::Postgres>>>>;
#[cfg(feature = "postgres")]
impl PgStorage {
    /// Creates a PostgreSQL-backed storage from `config` for the given `chain`.
    ///
    /// A single pool is opened against `config.url`, sized and timed from
    /// `config` (`connect_timeout` is applied as the acquire timeout). No
    /// dedicated read pool is set, so `read_pool()` returns the write pool.
    /// The new storage starts inactive, with an empty identity key and no
    /// settings.
    ///
    /// # Errors
    ///
    /// Propagates the error if connecting the pool fails.
    pub async fn new_postgres(config: StorageConfig, chain: Chain) -> WalletResult<Self> {
        use sqlx::postgres::PgPoolOptions;

        let options = PgPoolOptions::new()
            .max_connections(config.max_connections)
            .min_connections(config.min_connections)
            .idle_timeout(config.idle_timeout)
            .acquire_timeout(config.connect_timeout);
        let write_pool = options.connect(&config.url).await?;

        let storage = Self {
            write_pool,
            read_pool: None,
            config,
            chain,
            active: AtomicBool::new(false),
            storage_identity_key: String::new(),
            settings: tokio::sync::RwLock::new(None),
        };
        Ok(storage)
    }

    /// Begins a transaction on the write pool and wraps it in a [`trx_token::TrxToken`].
    ///
    /// The token's payload is a [`PgTrxInner`] holding `Some(transaction)`.
    ///
    /// # Errors
    ///
    /// Propagates the error if the pool fails to begin a transaction.
    pub async fn begin_pg_transaction(&self) -> WalletResult<trx_token::TrxToken> {
        let transaction = self.write_pool.begin().await?;
        let guarded = tokio::sync::Mutex::new(Some(transaction));
        let shared: PgTrxInner = std::sync::Arc::new(guarded);
        Ok(trx_token::TrxToken::new(shared))
    }

    /// Borrows the [`PgTrxInner`] carried by `trx`.
    ///
    /// # Errors
    ///
    /// Returns `WalletError::Internal` when the token's payload is not a
    /// PostgreSQL transaction handle.
    pub fn extract_pg_trx(trx: &trx_token::TrxToken) -> WalletResult<&PgTrxInner> {
        match trx.downcast_ref::<PgTrxInner>() {
            Some(inner) => Ok(inner),
            None => Err(WalletError::Internal(
                "TrxToken does not contain a PostgreSQL transaction".to_string(),
            )),
        }
    }
}