use crate::{
StorageError, StorageResult, UnifiedStorage,
config::{ConfigError, ENV_COLD_PATH, ENV_COLD_SQL_URL, ENV_HOT_PATH},
either::Either,
};
use signet_cold::ColdConnect;
use signet_cold_mdbx::MdbxConnector;
use signet_hot::HotConnect;
use std::env;
use tokio_util::sync::CancellationToken;
#[cfg(any(feature = "postgres", feature = "sqlite"))]
use signet_cold_sql::SqlConnector;
#[cfg(any(feature = "postgres", feature = "sqlite"))]
type EnvColdConnector = Either<MdbxConnector, SqlConnector>;
#[cfg(not(any(feature = "postgres", feature = "sqlite")))]
type EnvColdConnector = Either<MdbxConnector, ()>;
#[derive(Default, Debug)]
pub struct StorageBuilder<H = (), C = ()> {
hot_connector: H,
cold_connector: C,
cancel_token: Option<CancellationToken>,
}
impl StorageBuilder<(), ()> {
pub fn new() -> Self {
Self::default()
}
}
impl<H, C> StorageBuilder<H, C> {
pub fn hot<NewH>(self, hot_connector: NewH) -> StorageBuilder<NewH, C> {
StorageBuilder {
hot_connector,
cold_connector: self.cold_connector,
cancel_token: self.cancel_token,
}
}
pub fn cold<NewC>(self, cold_connector: NewC) -> StorageBuilder<H, NewC> {
StorageBuilder {
hot_connector: self.hot_connector,
cold_connector,
cancel_token: self.cancel_token,
}
}
#[must_use]
pub fn cancel_token(mut self, token: CancellationToken) -> Self {
self.cancel_token = Some(token);
self
}
}
impl<H, C> StorageBuilder<H, C>
where
H: HotConnect,
C: ColdConnect,
{
pub async fn build(self) -> StorageResult<UnifiedStorage<H::Hot, C::Cold>> {
let hot = self
.hot_connector
.connect()
.map_err(|e| StorageError::Config(format!("hot connection failed: {e}")))?;
let cold = self
.cold_connector
.connect()
.await
.map_err(|e| StorageError::Config(format!("cold connection failed: {e}")))?;
let cancel_token = self.cancel_token.unwrap_or_default();
Ok(UnifiedStorage::spawn(hot, cold, cancel_token))
}
}
impl StorageBuilder<MdbxConnector, EnvColdConnector> {
pub fn from_env() -> Result<Self, ConfigError> {
let hot_connector = MdbxConnector::from_env(ENV_HOT_PATH)?;
let has_mdbx = env::var(ENV_COLD_PATH).is_ok();
let has_sql = env::var(ENV_COLD_SQL_URL).is_ok();
let cold_connector = match (has_mdbx, has_sql) {
(true, false) => {
let mdbx = MdbxConnector::from_env(ENV_COLD_PATH)?;
Either::left(mdbx)
}
(false, true) => {
#[cfg(any(feature = "postgres", feature = "sqlite"))]
{
let sql = SqlConnector::from_env(ENV_COLD_SQL_URL)?;
Either::right(sql)
}
#[cfg(not(any(feature = "postgres", feature = "sqlite")))]
{
return Err(ConfigError::FeatureNotEnabled {
feature: "postgres or sqlite",
env_var: ENV_COLD_SQL_URL,
});
}
}
(true, true) => {
return Err(ConfigError::AmbiguousColdBackend);
}
(false, false) => {
return Err(ConfigError::MissingColdBackend);
}
};
Ok(Self { hot_connector, cold_connector, cancel_token: None })
}
}
#[cfg(test)]
mod tests {
use super::*;
use serial_test::serial;
fn _assert_send<T: Send>(_: T) {}
fn _build_is_send(b: StorageBuilder<MdbxConnector, MdbxConnector>) {
_assert_send(b.build());
}
#[test]
#[serial]
fn from_env_missing_hot_path() {
unsafe {
env::remove_var(ENV_HOT_PATH);
}
assert!(StorageBuilder::from_env().is_err());
}
#[test]
#[serial]
fn from_env_missing_cold_backend() {
unsafe {
env::set_var(ENV_HOT_PATH, "/tmp/hot");
env::remove_var(ENV_COLD_PATH);
env::remove_var(ENV_COLD_SQL_URL);
}
let result = StorageBuilder::from_env();
assert!(matches!(result, Err(ConfigError::MissingColdBackend)));
}
#[test]
#[serial]
fn from_env_ambiguous_cold_backend() {
unsafe {
env::set_var(ENV_HOT_PATH, "/tmp/hot");
env::set_var(ENV_COLD_PATH, "/tmp/cold");
env::set_var(ENV_COLD_SQL_URL, "postgres://localhost/db");
}
let result = StorageBuilder::from_env();
assert!(matches!(result, Err(ConfigError::AmbiguousColdBackend)));
}
#[test]
#[serial]
fn from_env_mdbx_cold() {
unsafe {
env::set_var(ENV_HOT_PATH, "/tmp/hot");
env::set_var(ENV_COLD_PATH, "/tmp/cold");
env::remove_var(ENV_COLD_SQL_URL);
}
let builder = StorageBuilder::from_env().unwrap();
assert!(matches!(builder.cold_connector, Either::Left(_)));
}
}