pub mod db;
pub mod drivers;
#[cfg(feature = "db-sqlx")]
pub mod migrate;
pub mod outbox;
pub mod tx;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use futures::future::BoxFuture;
use crate::web::tenant::TenantConfig;
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum AccessIntent {
Read,
Write,
}
#[derive(Debug)]
pub struct DataError(pub String);
impl std::fmt::Display for DataError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "datasource error: {}", self.0)
}
}
impl std::error::Error for DataError {}
pub trait DataSource: Send + Sync + 'static {
type Conn: Send;
fn acquire(&self, intent: AccessIntent) -> BoxFuture<'_, Result<Self::Conn, DataError>>;
fn name(&self) -> &'static str;
}
#[derive(Default)]
pub struct ReadAfterWritePin {
wrote: AtomicBool,
}
impl ReadAfterWritePin {
pub fn new() -> Self {
Self::default()
}
pub fn apply(&self, intent: AccessIntent) -> AccessIntent {
match intent {
AccessIntent::Write => {
self.wrote.store(true, Ordering::Relaxed);
AccessIntent::Write
}
AccessIntent::Read if self.wrote.load(Ordering::Relaxed) => AccessIntent::Write,
AccessIntent::Read => AccessIntent::Read,
}
}
}
pub struct DataSourceRegistry<D: DataSource> {
default: D,
by_name: HashMap<&'static str, D>,
}
impl<D: DataSource> DataSourceRegistry<D> {
pub fn new(default: D) -> Self {
Self {
default,
by_name: HashMap::new(),
}
}
pub fn with(mut self, name: &'static str, ds: D) -> Self {
self.by_name.insert(name, ds);
self
}
pub fn for_tenant(&self, tenant: Option<&TenantConfig>) -> &D {
tenant
.and_then(|t| self.by_name.get(t.datasource.as_str()))
.unwrap_or(&self.default)
}
pub fn iter(&self) -> impl Iterator<Item = (&'static str, &D)> {
let mut named: Vec<_> = self.by_name.iter().map(|(k, v)| (*k, v)).collect();
named.sort_by_key(|(k, _)| *k);
std::iter::once(("", &self.default)).chain(named)
}
pub async fn acquire(
&self,
ds: &D,
intent: AccessIntent,
pin: &ReadAfterWritePin,
) -> Result<D::Conn, DataError> {
ds.acquire(pin.apply(intent)).await
}
}