use std::sync::{
Arc,
atomic::{AtomicBool, AtomicU8, Ordering},
};
use arc_swap::ArcSwap;
use dashmap::DashMap;
use fraiseql_core::{db::traits::DatabaseAdapter, runtime::Executor};
use fraiseql_error::FraiseQLError;
use serde::Deserialize;
use tokio::sync::Semaphore;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum TenantStatus {
Active = 0,
Suspended = 1,
}
impl TenantStatus {
const fn from_u8(v: u8) -> Self {
match v {
1 => Self::Suspended,
_ => Self::Active,
}
}
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::Active => "active",
Self::Suspended => "suspended",
}
}
}
#[derive(Debug, Clone, Default, Deserialize)]
pub struct TenantQuota {
#[serde(default)]
pub max_requests_per_sec: Option<u32>,
#[serde(default)]
pub max_concurrent: Option<u32>,
#[serde(default)]
pub max_storage_bytes: Option<u64>,
}
struct TenantEntry<A: DatabaseAdapter> {
executor: Arc<ArcSwap<Executor<A>>>,
status: AtomicU8,
concurrency: Option<Arc<Semaphore>>,
quota_exceeded: AtomicBool,
quota: TenantQuota,
}
impl<A: DatabaseAdapter> TenantEntry<A> {
fn new(executor: Arc<Executor<A>>) -> Self {
Self {
executor: Arc::new(ArcSwap::from(executor)),
status: AtomicU8::new(TenantStatus::Active as u8),
concurrency: None,
quota_exceeded: AtomicBool::new(false),
quota: TenantQuota::default(),
}
}
fn with_quota(mut self, quota: TenantQuota) -> Self {
self.concurrency = quota.max_concurrent.map(|n| Arc::new(Semaphore::new(n as usize)));
self.quota = quota;
self
}
fn status(&self) -> TenantStatus {
TenantStatus::from_u8(self.status.load(Ordering::Relaxed))
}
fn set_status(&self, status: TenantStatus) {
self.status.store(status as u8, Ordering::Relaxed);
}
}
const SUSPENDED_RETRY_AFTER_SECS: u64 = 60;
pub struct TenantExecutorRegistry<A: DatabaseAdapter> {
default: Arc<ArcSwap<Executor<A>>>,
tenants: DashMap<String, TenantEntry<A>>,
}
impl<A: DatabaseAdapter> TenantExecutorRegistry<A> {
#[must_use]
pub fn new(default: Arc<ArcSwap<Executor<A>>>) -> Self {
Self {
default,
tenants: DashMap::new(),
}
}
pub fn executor_for(
&self,
tenant_key: Option<&str>,
) -> fraiseql_error::Result<arc_swap::Guard<Arc<Executor<A>>>> {
match tenant_key {
None => Ok(self.default.load()),
Some(key) => {
let entry = self.tenants.get(key).ok_or_else(|| {
FraiseQLError::unauthorized(format!("Tenant '{key}' is not registered"))
})?;
self.require_active(key, entry.value())?;
Ok(entry.value().executor.load())
},
}
}
fn require_active(&self, key: &str, entry: &TenantEntry<A>) -> fraiseql_error::Result<()> {
if entry.status() == TenantStatus::Suspended {
return Err(FraiseQLError::ServiceUnavailable {
message: format!("Tenant '{key}' is suspended"),
retry_after: Some(SUSPENDED_RETRY_AFTER_SECS),
});
}
Ok(())
}
pub fn executor_for_admin(
&self,
key: &str,
) -> fraiseql_error::Result<arc_swap::Guard<Arc<Executor<A>>>> {
let entry = self.tenants.get(key).ok_or_else(|| {
FraiseQLError::unauthorized(format!("Tenant '{key}' is not registered"))
})?;
Ok(entry.value().executor.load())
}
pub fn upsert(&self, key: impl Into<String>, executor: Arc<Executor<A>>) -> bool {
let key = key.into();
if let Some(existing) = self.tenants.get(&key) {
existing.value().executor.store(executor);
false
} else {
self.tenants.insert(key, TenantEntry::new(executor));
true
}
}
pub fn upsert_with_quota(
&self,
key: impl Into<String>,
executor: Arc<Executor<A>>,
quota: TenantQuota,
) -> bool {
let key = key.into();
if let Some(existing) = self.tenants.get(&key) {
let prev_status = existing.value().status();
drop(existing);
self.tenants.remove(&key);
let entry = TenantEntry::new(executor).with_quota(quota);
entry.set_status(prev_status);
self.tenants.insert(key, entry);
false
} else {
self.tenants.insert(key, TenantEntry::new(executor).with_quota(quota));
true
}
}
pub fn try_acquire_concurrency(
&self,
key: &str,
) -> fraiseql_error::Result<Option<tokio::sync::OwnedSemaphorePermit>> {
let entry = self.tenants.get(key).ok_or_else(|| FraiseQLError::not_found("tenant", key))?;
if let Some(ref sem) = entry.value().concurrency {
match sem.clone().try_acquire_owned() {
Ok(permit) => Ok(Some(permit)),
Err(_) => Err(FraiseQLError::RateLimited {
message: format!(
"Tenant '{key}' concurrency limit reached (max {})",
entry.value().quota.max_concurrent.unwrap_or(0)
),
retry_after_secs: 1,
}),
}
} else {
Ok(None)
}
}
#[must_use]
pub fn is_quota_exceeded(&self, key: &str) -> bool {
self.tenants
.get(key)
.is_some_and(|e| e.value().quota_exceeded.load(Ordering::Relaxed))
}
pub fn set_quota_exceeded(&self, key: &str, exceeded: bool) {
if let Some(entry) = self.tenants.get(key) {
entry.value().quota_exceeded.store(exceeded, Ordering::Relaxed);
}
}
pub fn tenant_quota(&self, key: &str) -> fraiseql_error::Result<TenantQuota> {
let entry = self.tenants.get(key).ok_or_else(|| FraiseQLError::not_found("tenant", key))?;
Ok(entry.value().quota.clone())
}
pub fn remove(&self, key: &str) -> fraiseql_error::Result<()> {
self.tenants
.remove(key)
.map(|_| ())
.ok_or_else(|| FraiseQLError::not_found("tenant", key))
}
pub fn suspend(&self, key: &str) -> fraiseql_error::Result<()> {
let entry = self.tenants.get(key).ok_or_else(|| FraiseQLError::not_found("tenant", key))?;
entry.value().set_status(TenantStatus::Suspended);
Ok(())
}
pub fn resume(&self, key: &str) -> fraiseql_error::Result<()> {
let entry = self.tenants.get(key).ok_or_else(|| FraiseQLError::not_found("tenant", key))?;
entry.value().set_status(TenantStatus::Active);
Ok(())
}
pub fn tenant_status(&self, key: &str) -> fraiseql_error::Result<TenantStatus> {
let entry = self.tenants.get(key).ok_or_else(|| FraiseQLError::not_found("tenant", key))?;
Ok(entry.value().status())
}
#[must_use]
pub fn tenant_keys(&self) -> Vec<String> {
self.tenants.iter().map(|e| e.key().clone()).collect()
}
#[must_use]
pub fn len(&self) -> usize {
self.tenants.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.tenants.is_empty()
}
#[must_use]
pub fn default_executor(&self) -> arc_swap::Guard<Arc<Executor<A>>> {
self.default.load()
}
pub async fn health_check(&self, key: &str) -> fraiseql_error::Result<()> {
let entry = self.tenants.get(key).ok_or_else(|| FraiseQLError::not_found("tenant", key))?;
let executor = entry.value().executor.load();
executor.adapter().health_check().await
}
}