pub mod db;
pub mod drivers;
#[cfg(feature = "db-sqlx")]
pub mod migrate;
pub mod outbox;
pub mod tx;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use futures::future::BoxFuture;
use crate::web::tenant::TenantConfig;
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum AccessIntent {
Read,
Write,
}
#[derive(Debug)]
#[non_exhaustive]
pub struct DataError {
pub kind: DataErrorKind,
pub message: String,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DataErrorKind {
Config,
Connection,
Query,
Timeout,
Conflict,
Other,
}
impl DataError {
pub fn new(kind: DataErrorKind, message: impl Into<String>) -> Self {
Self {
kind,
message: message.into(),
}
}
pub fn config(m: impl Into<String>) -> Self {
Self::new(DataErrorKind::Config, m)
}
pub fn connection(m: impl Into<String>) -> Self {
Self::new(DataErrorKind::Connection, m)
}
pub fn query(m: impl Into<String>) -> Self {
Self::new(DataErrorKind::Query, m)
}
pub fn timeout(m: impl Into<String>) -> Self {
Self::new(DataErrorKind::Timeout, m)
}
pub fn conflict(m: impl Into<String>) -> Self {
Self::new(DataErrorKind::Conflict, m)
}
pub fn other(m: impl Into<String>) -> Self {
Self::new(DataErrorKind::Other, m)
}
pub fn is_retryable(&self) -> bool {
matches!(
self.kind,
DataErrorKind::Connection | DataErrorKind::Timeout
)
}
}
impl std::fmt::Display for DataError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "datasource error ({:?}): {}", self.kind, self.message)
}
}
impl std::error::Error for DataError {}
pub trait DataSource: Send + Sync + 'static {
type Conn: Send;
fn acquire(&self, intent: AccessIntent) -> BoxFuture<'_, Result<Self::Conn, DataError>>;
fn name(&self) -> &'static str;
}
#[derive(Default)]
pub struct ReadAfterWritePin {
wrote: AtomicBool,
}
impl ReadAfterWritePin {
pub fn new() -> Self {
Self::default()
}
pub fn apply(&self, intent: AccessIntent) -> AccessIntent {
match intent {
AccessIntent::Write => {
self.wrote.store(true, Ordering::Relaxed);
AccessIntent::Write
}
AccessIntent::Read if self.wrote.load(Ordering::Relaxed) => AccessIntent::Write,
AccessIntent::Read => AccessIntent::Read,
}
}
}
pub struct DataSourceRegistry<D: DataSource> {
default: D,
by_name: HashMap<&'static str, D>,
}
impl<D: DataSource> DataSourceRegistry<D> {
pub fn new(default: D) -> Self {
Self {
default,
by_name: HashMap::new(),
}
}
pub fn with(mut self, name: &'static str, ds: D) -> Self {
self.by_name.insert(name, ds);
self
}
pub fn for_tenant(&self, tenant: Option<&TenantConfig>) -> &D {
tenant
.and_then(|t| self.by_name.get(t.datasource.as_str()))
.unwrap_or(&self.default)
}
pub fn iter(&self) -> impl Iterator<Item = (&'static str, &D)> {
let mut named: Vec<_> = self.by_name.iter().map(|(k, v)| (*k, v)).collect();
named.sort_by_key(|(k, _)| *k);
std::iter::once(("", &self.default)).chain(named)
}
pub async fn acquire(
&self,
ds: &D,
intent: AccessIntent,
pin: &ReadAfterWritePin,
) -> Result<D::Conn, DataError> {
ds.acquire(pin.apply(intent)).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::web::tenant::{TenantConfig, TenantId};
#[test]
fn pin_upgrades_reads_after_first_write() {
let pin = ReadAfterWritePin::new();
assert_eq!(pin.apply(AccessIntent::Read), AccessIntent::Read);
assert_eq!(pin.apply(AccessIntent::Write), AccessIntent::Write);
assert_eq!(pin.apply(AccessIntent::Read), AccessIntent::Write);
assert_eq!(pin.apply(AccessIntent::Read), AccessIntent::Write);
}
struct FakeDs(&'static str);
impl DataSource for FakeDs {
type Conn = ();
fn acquire(&self, _: AccessIntent) -> BoxFuture<'_, Result<(), DataError>> {
Box::pin(async { Ok(()) })
}
fn name(&self) -> &'static str {
self.0
}
}
fn tenant(ds: &str) -> TenantConfig {
TenantConfig {
id: TenantId::new("t"),
display_name: "T".into(),
datasource: ds.into(),
}
}
#[test]
fn registry_routes_by_tenant_datasource_with_default_fallback() {
let reg = DataSourceRegistry::new(FakeDs("default"))
.with("acme", FakeDs("acme"))
.with("globex", FakeDs("globex"));
assert_eq!(reg.for_tenant(Some(&tenant("acme"))).name(), "acme");
assert_eq!(reg.for_tenant(Some(&tenant("unknown"))).name(), "default");
assert_eq!(reg.for_tenant(None).name(), "default");
}
#[test]
fn data_error_taxonomy_classifies_retryability() {
assert!(DataError::connection("pool down").is_retryable());
assert!(DataError::timeout("slow").is_retryable());
assert!(!DataError::query("syntax").is_retryable());
assert!(!DataError::config("bad url").is_retryable());
assert!(!DataError::conflict("dup key").is_retryable());
let e = DataError::connection("x");
assert_eq!(e.kind, DataErrorKind::Connection);
assert!(e.to_string().contains("Connection"));
}
#[test]
fn registry_iter_is_deterministic_default_first() {
let reg = DataSourceRegistry::new(FakeDs("default"))
.with("b", FakeDs("b"))
.with("a", FakeDs("a"));
let order: Vec<&str> = reg.iter().map(|(n, _)| n).collect();
assert_eq!(order, vec!["", "a", "b"]);
}
}