use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use crate::crypto::os_random;
use crate::crypto::sha256::sha256;
use crate::storage::encryption::argon2id::{derive_key, Argon2Params};
use crate::storage::engine::pager::Pager;
use super::column_policy_gate::{ColumnAccessRequest, ColumnPolicyGate, ColumnPolicyOutcome};
use super::policies::{self as iam_policies, EvalContext, Policy, ResourceRef, SimulationOutcome};
use super::privileges::{
check_grant, Action, AuthzContext, AuthzError, Grant, GrantPrincipal, GrantsView,
PermissionCache, Resource, UserAttributes,
};
use super::vault::{KeyPair, Vault, VaultState};
use super::{now_ms, ApiKey, AuthConfig, AuthError, Role, Session, User, UserId};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PrincipalRef {
User(UserId),
Group(String),
}
pub const PUBLIC_IAM_GROUP: &str = "__public__";
#[derive(Debug, Clone, Default)]
pub struct SimCtx {
pub current_tenant: Option<String>,
pub peer_ip: Option<std::net::IpAddr>,
pub mfa_present: bool,
pub now_ms: Option<u128>,
}
#[derive(Debug)]
pub struct BootstrapResult {
pub user: User,
pub api_key: ApiKey,
pub certificate: Option<String>,
}
pub struct AuthStore {
users: RwLock<HashMap<UserId, User>>,
sessions: RwLock<HashMap<String, Session>>,
api_key_index: RwLock<HashMap<String, (UserId, Role)>>,
bootstrapped: AtomicBool,
config: AuthConfig,
vault: RwLock<Option<Vault>>,
pager: Option<Arc<Pager>>,
keypair: RwLock<Option<KeyPair>>,
vault_kv: RwLock<HashMap<String, String>>,
grants: RwLock<HashMap<UserId, Vec<Grant>>>,
public_grants: RwLock<Vec<Grant>>,
user_attributes: RwLock<HashMap<UserId, UserAttributes>>,
session_count_by_user: RwLock<HashMap<UserId, u32>>,
permission_cache: RwLock<HashMap<UserId, PermissionCache>>,
policies: RwLock<HashMap<String, Arc<Policy>>>,
user_attachments: RwLock<HashMap<UserId, Vec<String>>>,
group_attachments: RwLock<HashMap<String, Vec<String>>>,
iam_effective_cache: RwLock<HashMap<UserId, Vec<Arc<Policy>>>>,
iam_authorization_enabled: AtomicBool,
visible_collections_cache: super::scope_cache::AuthCache,
}
fn auth_argon2_params() -> Argon2Params {
Argon2Params {
m_cost: 4 * 1024, t_cost: 3,
p: 1,
tag_len: 32,
}
}
impl AuthStore {
pub fn new(config: AuthConfig) -> Self {
Self {
users: RwLock::new(HashMap::new()),
sessions: RwLock::new(HashMap::new()),
api_key_index: RwLock::new(HashMap::new()),
bootstrapped: AtomicBool::new(false),
config,
vault: RwLock::new(None),
pager: None,
keypair: RwLock::new(None),
vault_kv: RwLock::new(HashMap::new()),
grants: RwLock::new(HashMap::new()),
public_grants: RwLock::new(Vec::new()),
user_attributes: RwLock::new(HashMap::new()),
session_count_by_user: RwLock::new(HashMap::new()),
permission_cache: RwLock::new(HashMap::new()),
policies: RwLock::new(HashMap::new()),
user_attachments: RwLock::new(HashMap::new()),
group_attachments: RwLock::new(HashMap::new()),
iam_effective_cache: RwLock::new(HashMap::new()),
iam_authorization_enabled: AtomicBool::new(false),
visible_collections_cache: super::scope_cache::AuthCache::new(
super::scope_cache::DEFAULT_TTL,
),
}
}
pub fn with_vault(
config: AuthConfig,
pager: Arc<Pager>,
passphrase: Option<&str>,
) -> Result<Self, AuthError> {
let vault = Vault::open(&pager, passphrase)?;
let mut store = Self::new(config);
if let Some(state) = vault.load(&pager)? {
store.restore_from_vault(state);
}
*store.vault.write().unwrap_or_else(|e| e.into_inner()) = Some(vault);
store.pager = Some(pager);
Ok(store)
}
pub fn config(&self) -> &AuthConfig {
&self.config
}
pub fn is_enabled(&self) -> bool {
self.config.enabled
}
pub fn needs_bootstrap(&self) -> bool {
!self.bootstrapped.load(Ordering::Acquire)
&& self.users.read().map(|u| u.is_empty()).unwrap_or(true)
}
fn get_user_cloned(&self, id: &UserId) -> Option<User> {
self.users.read().ok().and_then(|m| m.get(id).cloned())
}
pub fn is_bootstrapped(&self) -> bool {
self.bootstrapped.load(Ordering::Acquire)
}
pub fn bootstrap(&self, username: &str, password: &str) -> Result<BootstrapResult, AuthError> {
if self
.bootstrapped
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return Err(AuthError::Forbidden(
"bootstrap already completed — sealed permanently".to_string(),
));
}
{
let users = self.users.read().map_err(lock_err)?;
if !users.is_empty() {
return Err(AuthError::Forbidden(
"bootstrap already completed — users exist".to_string(),
));
}
}
let user = self.create_user(username, password, Role::Admin)?;
let key = self.create_api_key(username, "bootstrap", Role::Admin)?;
let certificate = if let Some(ref pager) = self.pager {
let kp = KeyPair::generate();
let cert_hex = kp.certificate_hex();
let new_vault = Vault::with_certificate_bytes(pager, &kp.certificate)
.map_err(|e| AuthError::Internal(format!("vault re-seal failed: {e}")))?;
if let Ok(mut kp_guard) = self.keypair.write() {
*kp_guard = Some(kp);
}
if let Ok(mut vault_guard) = self.vault.write() {
*vault_guard = Some(new_vault);
}
self.ensure_vault_secret_key();
self.persist_to_vault();
Some(cert_hex)
} else {
None
};
Ok(BootstrapResult {
user,
api_key: key,
certificate,
})
}
pub fn bootstrap_from_env(&self) -> Option<BootstrapResult> {
if !self.needs_bootstrap() {
return None;
}
let username = std::env::var("REDDB_USERNAME").ok()?;
let password = std::env::var("REDDB_PASSWORD").ok()?;
if username.is_empty() || password.is_empty() {
return None;
}
match self.bootstrap(&username, &password) {
Ok(result) => {
tracing::info!(
username = %reddb_wire::audit_safe_log_field(&username),
"bootstrapped admin user from REDDB_USERNAME/REDDB_PASSWORD"
);
if let Some(ref cert) = result.certificate {
eprintln!("[reddb] CERTIFICATE: {}", cert);
tracing::warn!(
"vault certificate issued — save it: ONLY way to unseal after restart"
);
}
Some(result)
}
Err(e) => {
tracing::error!(err = %e, "env bootstrap failed");
None
}
}
}
fn persist_to_vault_result(&self) -> Result<(), AuthError> {
let vault_guard = self.vault.read().unwrap_or_else(|e| e.into_inner());
if let (Some(ref vault), Some(ref pager)) = (&*vault_guard, &self.pager) {
let state = self.snapshot();
vault.save(pager, &state)?;
}
Ok(())
}
fn persist_to_vault(&self) {
if let Err(e) = self.persist_to_vault_result() {
tracing::error!(err = %e, "vault persist failed");
crate::telemetry::operator_event::OperatorEvent::SecretRotationFailed {
secret_ref: "auth_vault".to_string(),
error: e.to_string(),
}
.emit_global();
}
}
pub fn is_vault_backed(&self) -> bool {
self.pager.is_some()
&& self
.vault
.read()
.map(|guard| guard.is_some())
.unwrap_or(false)
}
pub fn vault_kv_get(&self, key: &str) -> Option<String> {
self.vault_kv
.read()
.ok()
.and_then(|kv| kv.get(key).cloned())
}
pub fn vault_kv_snapshot(&self) -> HashMap<String, String> {
self.vault_kv
.read()
.map(|kv| kv.clone())
.unwrap_or_default()
}
pub fn vault_kv_export_encrypted(&self) -> Result<Option<String>, AuthError> {
if !self.is_vault_backed() {
return Err(AuthError::Forbidden(
"vault KV export requires an enabled, unsealed vault".to_string(),
));
}
let kv = self.vault_kv_snapshot();
if kv.is_empty() {
return Ok(None);
}
let vault_guard = self.vault.read().map_err(lock_err)?;
let vault = vault_guard.as_ref().ok_or_else(|| {
AuthError::Forbidden("vault KV export requires an enabled, unsealed vault".to_string())
})?;
let state = VaultState {
users: Vec::new(),
api_keys: Vec::new(),
bootstrapped: false,
master_secret: None,
kv,
};
Ok(Some(vault.seal_logical_export(&state)?))
}
pub fn vault_kv_try_import(
&self,
entries: HashMap<String, String>,
) -> Result<usize, AuthError> {
if !self.is_vault_backed() {
return Err(AuthError::Forbidden(
"vault KV import requires an enabled, unsealed vault".to_string(),
));
}
if entries.is_empty() {
return Ok(0);
}
let mut previous = HashMap::new();
{
let mut kv = self.vault_kv.write().map_err(lock_err)?;
for (key, value) in &entries {
previous.insert(key.clone(), kv.insert(key.clone(), value.clone()));
}
}
if let Err(err) = self.persist_to_vault_result() {
if let Ok(mut kv) = self.vault_kv.write() {
for (key, old) in previous {
match old {
Some(value) => {
kv.insert(key, value);
}
None => {
kv.remove(&key);
}
}
}
}
return Err(err);
}
Ok(entries.len())
}
pub fn vault_kv_try_import_placeholders(&self, keys: &[String]) -> Result<usize, AuthError> {
let entries = keys
.iter()
.map(|key| (key.clone(), "false".to_string()))
.collect();
self.vault_kv_try_import(entries)
}
pub fn vault_kv_set(&self, key: String, value: String) {
if let Ok(mut kv) = self.vault_kv.write() {
kv.insert(key, value);
}
self.persist_to_vault();
}
pub fn vault_kv_try_set(&self, key: String, value: String) -> Result<(), AuthError> {
if !self.is_vault_backed() {
return Err(AuthError::Forbidden(
"SET SECRET requires an enabled, unsealed vault".to_string(),
));
}
let previous = {
let mut kv = self.vault_kv.write().map_err(lock_err)?;
kv.insert(key.clone(), value)
};
if let Err(err) = self.persist_to_vault_result() {
if let Ok(mut kv) = self.vault_kv.write() {
match previous {
Some(value) => {
kv.insert(key, value);
}
None => {
kv.remove(&key);
}
}
}
return Err(err);
}
Ok(())
}
pub fn vault_kv_delete(&self, key: &str) -> bool {
let existed = self
.vault_kv
.write()
.map(|mut kv| kv.remove(key).is_some())
.unwrap_or(false);
if existed {
self.persist_to_vault();
}
existed
}
pub fn vault_kv_try_delete(&self, key: &str) -> Result<bool, AuthError> {
if !self.is_vault_backed() {
return Err(AuthError::Forbidden(
"DELETE SECRET requires an enabled, unsealed vault".to_string(),
));
}
let removed = {
let mut kv = self.vault_kv.write().map_err(lock_err)?;
kv.remove(key)
};
if removed.is_none() {
return Ok(false);
}
if let Err(err) = self.persist_to_vault_result() {
if let Ok(mut kv) = self.vault_kv.write() {
if let Some(value) = removed {
kv.insert(key.to_string(), value);
}
}
return Err(err);
}
Ok(true)
}
pub fn vault_kv_keys(&self) -> Vec<String> {
self.vault_kv
.read()
.map(|kv| kv.keys().cloned().collect())
.unwrap_or_default()
}
pub fn vault_secret_key(&self) -> Option<[u8; 32]> {
let hex_str = self.vault_kv_get("red.secret.aes_key")?;
let bytes = hex::decode(hex_str).ok()?;
if bytes.len() == 32 {
let mut key = [0u8; 32];
key.copy_from_slice(&bytes);
Some(key)
} else {
None
}
}
pub fn ensure_vault_secret_key(&self) {
if self.vault_kv_get("red.secret.aes_key").is_none() {
let key = random_bytes(32);
self.vault_kv_set("red.secret.aes_key".to_string(), hex::encode(key));
}
}
fn snapshot(&self) -> VaultState {
let users_guard = self.users.read().unwrap_or_else(|e| e.into_inner());
let users: Vec<User> = users_guard.values().cloned().collect();
let mut api_keys = Vec::new();
for user in &users {
let owner = UserId::from_parts(user.tenant_id.as_deref(), &user.username);
for key in &user.api_keys {
api_keys.push((owner.clone(), key.clone()));
}
}
let master_secret = self
.keypair
.read()
.ok()
.and_then(|guard| guard.as_ref().map(|kp| kp.master_secret.clone()));
let kv = self.vault_kv.read().map(|m| m.clone()).unwrap_or_default();
VaultState {
users,
api_keys,
bootstrapped: self.bootstrapped.load(Ordering::Acquire),
master_secret,
kv,
}
}
fn restore_from_vault(&mut self, state: VaultState) {
if state.bootstrapped {
self.bootstrapped.store(true, Ordering::Release);
}
if let Some(secret) = state.master_secret {
let kp = KeyPair::from_master_secret(secret);
if let Ok(mut guard) = self.keypair.write() {
*guard = Some(kp);
}
}
if let Ok(mut kv) = self.vault_kv.write() {
*kv = state.kv;
}
let mut users = self.users.write().unwrap_or_else(|e| e.into_inner());
let mut idx = self
.api_key_index
.write()
.unwrap_or_else(|e| e.into_inner());
for user in state.users {
let id = UserId::from_parts(user.tenant_id.as_deref(), &user.username);
for key in &user.api_keys {
idx.insert(key.key.clone(), (id.clone(), key.role));
}
users.insert(id, user);
}
drop(idx);
drop(users);
self.rehydrate_acl();
self.rehydrate_iam();
}
pub fn create_user(
&self,
username: &str,
password: &str,
role: Role,
) -> Result<User, AuthError> {
self.create_user_in_tenant(None, username, password, role)
}
pub fn create_user_in_tenant(
&self,
tenant_id: Option<&str>,
username: &str,
password: &str,
role: Role,
) -> Result<User, AuthError> {
let id = UserId::from_parts(tenant_id, username);
let mut users = self.users.write().map_err(lock_err)?;
if users.contains_key(&id) {
return Err(AuthError::UserExists(id.to_string()));
}
let now = now_ms();
let user = User {
username: username.to_string(),
tenant_id: tenant_id.map(|s| s.to_string()),
password_hash: hash_password(password),
scram_verifier: Some(make_scram_verifier(password)),
role,
api_keys: Vec::new(),
created_at: now,
updated_at: now,
enabled: true,
};
users.insert(id, user.clone());
drop(users); self.persist_to_vault();
Ok(user)
}
pub fn lookup_scram_verifier(&self, id: &UserId) -> Option<crate::auth::scram::ScramVerifier> {
let users = self.users.read().ok()?;
users.get(id).and_then(|u| u.scram_verifier.clone())
}
pub fn lookup_scram_verifier_global(
&self,
username: &str,
) -> Option<crate::auth::scram::ScramVerifier> {
self.lookup_scram_verifier(&UserId::platform(username))
}
pub fn list_users(&self) -> Vec<User> {
let users = match self.users.read() {
Ok(g) => g,
Err(_) => return Vec::new(),
};
users
.values()
.map(|u| User {
password_hash: String::new(), ..u.clone()
})
.collect()
}
pub fn list_users_scoped(&self, tenant_filter: Option<Option<&str>>) -> Vec<User> {
let users = match self.users.read() {
Ok(g) => g,
Err(_) => return Vec::new(),
};
users
.values()
.filter(|u| match tenant_filter {
None => true,
Some(t) => u.tenant_id.as_deref() == t,
})
.map(|u| User {
password_hash: String::new(), ..u.clone()
})
.collect()
}
pub fn get_user(&self, tenant_id: Option<&str>, username: &str) -> Option<User> {
let id = UserId::from_parts(tenant_id, username);
self.get_user_cloned(&id).map(|u| User {
password_hash: String::new(),
..u
})
}
pub fn delete_user(&self, username: &str) -> Result<(), AuthError> {
self.delete_user_in_tenant(None, username)
}
pub fn delete_user_in_tenant(
&self,
tenant_id: Option<&str>,
username: &str,
) -> Result<(), AuthError> {
let id = UserId::from_parts(tenant_id, username);
let mut users = self.users.write().map_err(lock_err)?;
let user = users
.remove(&id)
.ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
if let Ok(mut idx) = self.api_key_index.write() {
for api_key in &user.api_keys {
idx.remove(&api_key.key);
}
}
if let Ok(mut sessions) = self.sessions.write() {
sessions
.retain(|_, s| !(s.username == username && s.tenant_id.as_deref() == tenant_id));
}
self.persist_to_vault();
Ok(())
}
pub fn change_password(
&self,
username: &str,
old_password: &str,
new_password: &str,
) -> Result<(), AuthError> {
self.change_password_in_tenant(None, username, old_password, new_password)
}
pub fn change_password_in_tenant(
&self,
tenant_id: Option<&str>,
username: &str,
old_password: &str,
new_password: &str,
) -> Result<(), AuthError> {
let id = UserId::from_parts(tenant_id, username);
let mut users = self.users.write().map_err(lock_err)?;
let user = users
.get_mut(&id)
.ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
if !verify_password(old_password, &user.password_hash) {
return Err(AuthError::InvalidCredentials);
}
user.password_hash = hash_password(new_password);
user.scram_verifier = Some(make_scram_verifier(new_password));
user.updated_at = now_ms();
drop(users); self.persist_to_vault();
Ok(())
}
pub fn change_role(&self, username: &str, new_role: Role) -> Result<(), AuthError> {
self.change_role_in_tenant(None, username, new_role)
}
pub fn change_role_in_tenant(
&self,
tenant_id: Option<&str>,
username: &str,
new_role: Role,
) -> Result<(), AuthError> {
let id = UserId::from_parts(tenant_id, username);
let mut users = self.users.write().map_err(lock_err)?;
let user = users
.get_mut(&id)
.ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
let prior_role = user.role;
user.role = new_role;
user.updated_at = now_ms();
if new_role == Role::Admin && prior_role != Role::Admin {
crate::telemetry::operator_event::OperatorEvent::AdminCapabilityGranted {
granted_to: id.to_string(),
capability: "Role::Admin".to_string(),
granted_by: "auth_store::change_role".to_string(),
}
.emit_global();
}
for key in &mut user.api_keys {
if key.role > new_role {
key.role = new_role;
}
}
if let Ok(mut idx) = self.api_key_index.write() {
for key in &user.api_keys {
if let Some(entry) = idx.get_mut(&key.key) {
entry.1 = key.role;
}
}
}
self.persist_to_vault();
Ok(())
}
pub fn authenticate(&self, username: &str, password: &str) -> Result<Session, AuthError> {
self.authenticate_in_tenant(None, username, password)
}
pub fn authenticate_in_tenant(
&self,
tenant_id: Option<&str>,
username: &str,
password: &str,
) -> Result<Session, AuthError> {
let id = UserId::from_parts(tenant_id, username);
let users = self.users.read().map_err(lock_err)?;
let user = users.get(&id).ok_or(AuthError::InvalidCredentials)?;
if !user.enabled {
return Err(AuthError::InvalidCredentials);
}
if !verify_password(password, &user.password_hash) {
return Err(AuthError::InvalidCredentials);
}
let token = match self.keypair.read().ok().and_then(|g| {
g.as_ref().map(|kp| {
let token_id = random_hex(16);
let sig = kp.sign(format!("session:{}", token_id).as_bytes());
format!("rs_{}{}", token_id, hex::encode(&sig[..16]))
})
}) {
Some(signed_token) => signed_token,
None => generate_session_token(),
};
let now = now_ms();
let session = Session {
token,
username: username.to_string(),
tenant_id: user.tenant_id.clone(),
role: user.role,
created_at: now,
expires_at: now + (self.config.session_ttl_secs as u128 * 1000),
};
drop(users);
let mut sessions = self.sessions.write().map_err(lock_err)?;
sessions.insert(session.token.clone(), session.clone());
Ok(session)
}
pub fn validate_token(&self, token: &str) -> Option<(String, Role)> {
self.validate_token_full(token)
.map(|(id, role)| (id.username, role))
}
pub fn validate_token_full(&self, token: &str) -> Option<(UserId, Role)> {
if token.starts_with("rs_") {
if let Ok(sessions) = self.sessions.read() {
if let Some(session) = sessions.get(token) {
let now = now_ms();
if now < session.expires_at {
return Some((
UserId::from_parts(session.tenant_id.as_deref(), &session.username),
session.role,
));
}
}
}
return None;
}
if token.starts_with("rk_") {
if let Ok(idx) = self.api_key_index.read() {
return idx.get(token).cloned();
}
return None;
}
None
}
pub fn create_api_key(
&self,
username: &str,
name: &str,
role: Role,
) -> Result<ApiKey, AuthError> {
self.create_api_key_in_tenant(None, username, name, role)
}
pub fn create_api_key_in_tenant(
&self,
tenant_id: Option<&str>,
username: &str,
name: &str,
role: Role,
) -> Result<ApiKey, AuthError> {
let id = UserId::from_parts(tenant_id, username);
let mut users = self.users.write().map_err(lock_err)?;
let user = users
.get_mut(&id)
.ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
if role > user.role {
return Err(AuthError::RoleExceeded {
requested: role,
ceiling: user.role,
});
}
let api_key = ApiKey {
key: generate_api_key(),
name: name.to_string(),
role,
created_at: now_ms(),
};
user.api_keys.push(api_key.clone());
user.updated_at = now_ms();
if let Ok(mut idx) = self.api_key_index.write() {
idx.insert(api_key.key.clone(), (id.clone(), api_key.role));
}
drop(users); self.persist_to_vault();
Ok(api_key)
}
pub fn revoke_api_key(&self, key: &str) -> Result<(), AuthError> {
let mut users = self.users.write().map_err(lock_err)?;
let owner_id: UserId = {
if let Ok(idx) = self.api_key_index.read() {
if let Some((id, _)) = idx.get(key) {
id.clone()
} else {
return Err(AuthError::KeyNotFound(key.to_string()));
}
} else {
let owner = users
.iter()
.find(|(_, u)| u.api_keys.iter().any(|k| k.key == key));
match owner {
Some((id, _)) => id.clone(),
None => return Err(AuthError::KeyNotFound(key.to_string())),
}
}
};
let user = users
.get_mut(&owner_id)
.ok_or_else(|| AuthError::KeyNotFound(key.to_string()))?;
user.api_keys.retain(|k| k.key != key);
user.updated_at = now_ms();
if let Ok(mut idx) = self.api_key_index.write() {
idx.remove(key);
}
self.persist_to_vault();
Ok(())
}
pub fn revoke_session(&self, token: &str) {
if let Ok(mut sessions) = self.sessions.write() {
sessions.remove(token);
}
}
pub fn purge_expired_sessions(&self) -> usize {
let now = now_ms();
if let Ok(mut sessions) = self.sessions.write() {
let before = sessions.len();
sessions.retain(|_, s| s.expires_at > now);
return before - sessions.len();
}
0
}
pub fn grant(
&self,
granter: &UserId,
granter_role: Role,
principal: GrantPrincipal,
resource: Resource,
actions: Vec<Action>,
with_grant_option: bool,
tenant: Option<String>,
) -> Result<(), AuthError> {
if granter_role != Role::Admin {
return Err(AuthError::Forbidden(format!(
"GRANT requires Admin role; granter `{}` has `{:?}`",
granter, granter_role
)));
}
if granter.tenant.is_some() && granter.tenant != tenant {
return Err(AuthError::Forbidden(format!(
"cross-tenant GRANT denied: granter tenant `{:?}` != grant tenant `{:?}`",
granter.tenant, tenant
)));
}
let mut actions_set = std::collections::BTreeSet::new();
for a in actions {
actions_set.insert(a);
}
let g = Grant {
principal: principal.clone(),
resource,
actions: actions_set,
with_grant_option,
granted_by: granter.to_string(),
granted_at: now_ms(),
tenant,
columns: None,
};
match &principal {
GrantPrincipal::User(uid) => {
self.grants
.write()
.unwrap_or_else(|e| e.into_inner())
.entry(uid.clone())
.or_default()
.push(g.clone());
self.invalidate_permission_cache(Some(uid));
}
GrantPrincipal::Public => {
self.public_grants
.write()
.unwrap_or_else(|e| e.into_inner())
.push(g.clone());
self.invalidate_permission_cache(None);
}
GrantPrincipal::Group(_) => {
return Err(AuthError::Forbidden(
"GROUP principals are not yet supported; use a USER or PUBLIC".to_string(),
));
}
}
self.invalidate_visible_collections_for_tenant(g.tenant.as_deref());
self.persist_acl_to_kv();
Ok(())
}
pub fn revoke(
&self,
granter_role: Role,
principal: &GrantPrincipal,
resource: &Resource,
actions: &[Action],
) -> Result<usize, AuthError> {
if granter_role != Role::Admin {
return Err(AuthError::Forbidden(format!(
"REVOKE requires Admin role; granter has `{:?}`",
granter_role
)));
}
let removed = match principal {
GrantPrincipal::User(uid) => {
let mut g = self.grants.write().unwrap_or_else(|e| e.into_inner());
let before = g.get(uid).map(|v| v.len()).unwrap_or(0);
if let Some(list) = g.get_mut(uid) {
list.retain(|gr| {
!(gr.resource == *resource
&& (actions.iter().any(|a| gr.actions.contains(a))
|| (gr.actions.contains(&Action::All) && !actions.is_empty())))
});
}
let after = g.get(uid).map(|v| v.len()).unwrap_or(0);
drop(g);
self.invalidate_permission_cache(Some(uid));
before - after
}
GrantPrincipal::Public => {
let mut p = self
.public_grants
.write()
.unwrap_or_else(|e| e.into_inner());
let before = p.len();
p.retain(|gr| {
!(gr.resource == *resource
&& (actions.iter().any(|a| gr.actions.contains(a))
|| (gr.actions.contains(&Action::All) && !actions.is_empty())))
});
let after = p.len();
drop(p);
self.invalidate_permission_cache(None);
before - after
}
GrantPrincipal::Group(_) => 0,
};
if removed > 0 {
match principal {
GrantPrincipal::User(uid) => {
self.invalidate_visible_collections_for_tenant(uid.tenant.as_deref());
}
GrantPrincipal::Public | GrantPrincipal::Group(_) => {
self.invalidate_visible_collections_cache();
}
}
self.persist_acl_to_kv();
}
Ok(removed)
}
pub fn visible_collections_for_scope(
&self,
tenant: Option<&str>,
role: Role,
principal: &str,
all_collections: &[String],
) -> std::collections::HashSet<String> {
let key = super::scope_cache::ScopeKey::new(tenant, principal, role);
if let Some(hit) = self.visible_collections_cache.get(&key) {
return hit;
}
let ctx = AuthzContext {
principal,
effective_role: role,
tenant,
};
let mut visible = std::collections::HashSet::new();
for collection in all_collections {
let resource = Resource::table_from_name(collection);
if self.check_grant(&ctx, Action::Select, &resource).is_ok() {
visible.insert(collection.clone());
}
}
self.visible_collections_cache.insert(key, visible.clone());
visible
}
pub fn auth_cache_stats(&self) -> super::scope_cache::AuthCacheStats {
self.visible_collections_cache.stats()
}
pub fn invalidate_visible_collections_cache(&self) {
self.visible_collections_cache.invalidate_all();
}
pub fn invalidate_visible_collections_for_tenant(&self, tenant: Option<&str>) {
self.visible_collections_cache.invalidate_tenant(tenant);
}
pub fn effective_grants(&self, uid: &UserId) -> Vec<Grant> {
let mut out = Vec::new();
if let Ok(g) = self.grants.read() {
if let Some(list) = g.get(uid) {
out.extend(list.iter().cloned());
}
}
if let Ok(p) = self.public_grants.read() {
out.extend(p.iter().cloned());
}
out
}
pub fn check_grant(
&self,
ctx: &AuthzContext<'_>,
action: Action,
resource: &Resource,
) -> Result<(), AuthzError> {
if ctx.effective_role == Role::Admin {
return Ok(());
}
let uid = UserId::from_parts(ctx.tenant, ctx.principal);
if let Ok(cache) = self.permission_cache.read() {
if let Some(pc) = cache.get(&uid) {
if pc.allows(resource, action) {
return Ok(());
}
}
}
let user_grants = self
.grants
.read()
.ok()
.and_then(|g| g.get(&uid).cloned())
.unwrap_or_default();
let any_user_grants = self
.grants
.read()
.ok()
.map(|g| g.values().any(|list| !list.is_empty()))
.unwrap_or(false);
let public_grants = self
.public_grants
.read()
.ok()
.map(|p| p.clone())
.unwrap_or_default();
if user_grants.is_empty() && public_grants.is_empty() && any_user_grants {
return Err(AuthzError::PermissionDenied {
action,
resource: resource.clone(),
principal: ctx.principal.to_string(),
});
}
let view = GrantsView {
user_grants: &user_grants,
public_grants: &public_grants,
};
let result = check_grant(ctx, action, resource, &view);
if result.is_ok() {
let pc = PermissionCache::build(&user_grants, &public_grants);
if let Ok(mut cache) = self.permission_cache.write() {
cache.insert(uid, pc);
}
}
result
}
pub fn set_user_attributes(
&self,
uid: &UserId,
attrs: UserAttributes,
) -> Result<(), AuthError> {
let users = self.users.read().map_err(lock_err)?;
if !users.contains_key(uid) {
return Err(AuthError::UserNotFound(uid.to_string()));
}
drop(users);
self.user_attributes
.write()
.unwrap_or_else(|e| e.into_inner())
.insert(uid.clone(), attrs);
self.invalidate_iam_cache(Some(uid));
self.persist_acl_to_kv();
Ok(())
}
pub fn user_attributes(&self, uid: &UserId) -> UserAttributes {
self.user_attributes
.read()
.ok()
.and_then(|m| m.get(uid).cloned())
.unwrap_or_default()
}
pub fn add_user_to_group(&self, uid: &UserId, group: &str) -> Result<(), AuthError> {
if group.trim().is_empty() {
return Err(AuthError::Forbidden("group name cannot be empty".into()));
}
let mut attrs = self.user_attributes(uid);
if !attrs.groups.iter().any(|g| g == group) {
attrs.groups.push(group.to_string());
attrs.groups.sort();
}
self.set_user_attributes(uid, attrs)
}
pub fn remove_user_from_group(&self, uid: &UserId, group: &str) -> Result<(), AuthError> {
let mut attrs = self.user_attributes(uid);
attrs.groups.retain(|g| g != group);
self.set_user_attributes(uid, attrs)
}
pub fn set_user_enabled(&self, uid: &UserId, enabled: bool) -> Result<(), AuthError> {
let mut users = self.users.write().map_err(lock_err)?;
let user = users
.get_mut(uid)
.ok_or_else(|| AuthError::UserNotFound(uid.to_string()))?;
user.enabled = enabled;
user.updated_at = now_ms();
drop(users);
self.persist_to_vault();
Ok(())
}
pub fn authenticate_with_attrs(
&self,
tenant_id: Option<&str>,
username: &str,
password: &str,
) -> Result<Session, AuthError> {
let uid = UserId::from_parts(tenant_id, username);
let attrs = self.user_attributes(&uid);
if let Some(deadline) = attrs.valid_until {
if now_ms() >= deadline {
return Err(AuthError::Forbidden(format!(
"account `{}` expired (VALID UNTIL exceeded)",
uid
)));
}
}
if let Some(limit) = attrs.connection_limit {
let current = self
.session_count_by_user
.read()
.ok()
.and_then(|m| m.get(&uid).copied())
.unwrap_or(0);
if current >= limit {
return Err(AuthError::Forbidden(format!(
"account `{}` exceeded CONNECTION LIMIT ({})",
uid, limit
)));
}
}
let session = self.authenticate_in_tenant(tenant_id, username, password)?;
if let Ok(mut counts) = self.session_count_by_user.write() {
*counts.entry(uid).or_insert(0) += 1;
}
Ok(session)
}
pub fn decrement_session_count(&self, uid: &UserId) {
if let Ok(mut counts) = self.session_count_by_user.write() {
if let Some(c) = counts.get_mut(uid) {
*c = c.saturating_sub(1);
}
}
}
pub fn rehydrate_acl(&self) {
let kv_snapshot: Vec<(String, String)> = self
.vault_kv
.read()
.map(|kv| {
kv.iter()
.filter(|(k, _)| {
k.starts_with("red.acl.grants.")
|| k.starts_with("red.acl.attrs.")
|| k == &"red.acl.public_grants"
})
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
})
.unwrap_or_default();
for (k, v) in kv_snapshot {
if k == "red.acl.public_grants" {
if let Some(parsed) = decode_grants_blob(&v) {
*self
.public_grants
.write()
.unwrap_or_else(|e| e.into_inner()) = parsed;
}
} else if let Some(suffix) = k.strip_prefix("red.acl.grants.") {
if let Some(uid) = decode_uid(suffix) {
if let Some(mut parsed) = decode_grants_blob(&v) {
for g in parsed.iter_mut() {
g.principal = GrantPrincipal::User(uid.clone());
}
self.grants
.write()
.unwrap_or_else(|e| e.into_inner())
.insert(uid, parsed);
}
}
} else if let Some(suffix) = k.strip_prefix("red.acl.attrs.") {
if let Some(uid) = decode_uid(suffix) {
if let Some(parsed) = decode_attrs_blob(&v) {
self.user_attributes
.write()
.unwrap_or_else(|e| e.into_inner())
.insert(uid, parsed);
}
}
}
}
self.permission_cache
.write()
.unwrap_or_else(|e| e.into_inner())
.clear();
}
fn persist_acl_to_kv(&self) {
let public = self
.public_grants
.read()
.ok()
.map(|p| encode_grants_blob(&p))
.unwrap_or_default();
self.vault_kv_set("red.acl.public_grants".to_string(), public);
let snapshot: Vec<(UserId, Vec<Grant>)> = self
.grants
.read()
.ok()
.map(|g| g.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
.unwrap_or_default();
for (uid, list) in snapshot {
let key = format!("red.acl.grants.{}", encode_uid(&uid));
let val = encode_grants_blob(&list);
self.vault_kv_set(key, val);
}
let attrs_snapshot: Vec<(UserId, UserAttributes)> = self
.user_attributes
.read()
.ok()
.map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
.unwrap_or_default();
for (uid, attrs) in attrs_snapshot {
let key = format!("red.acl.attrs.{}", encode_uid(&uid));
let val = encode_attrs_blob(&attrs);
self.vault_kv_set(key, val);
}
}
fn invalidate_permission_cache(&self, uid: Option<&UserId>) {
if let Ok(mut cache) = self.permission_cache.write() {
match uid {
Some(u) => {
cache.remove(u);
}
None => cache.clear(),
}
}
}
pub fn put_policy(&self, p: Policy) -> Result<(), AuthError> {
if p.id.starts_with("_grant_") || p.id.starts_with("_default_") {
return Err(AuthError::Forbidden(format!(
"policy id `{}` is reserved",
p.id
)));
}
self.put_policy_internal(p)
}
pub fn put_policy_internal(&self, p: Policy) -> Result<(), AuthError> {
p.validate()
.map_err(|e| AuthError::Forbidden(format!("invalid policy `{}`: {e}", p.id)))?;
let id = p.id.clone();
self.policies
.write()
.unwrap_or_else(|e| e.into_inner())
.insert(id, Arc::new(p));
self.iam_authorization_enabled
.store(true, Ordering::Release);
self.iam_effective_cache
.write()
.unwrap_or_else(|e| e.into_inner())
.clear();
self.invalidate_visible_collections_cache();
self.persist_iam_to_kv();
Ok(())
}
pub fn iam_authorization_enabled(&self) -> bool {
self.iam_authorization_enabled.load(Ordering::Acquire)
}
pub fn delete_policy(&self, id: &str) -> Result<(), AuthError> {
let removed = self
.policies
.write()
.unwrap_or_else(|e| e.into_inner())
.remove(id)
.is_some();
if !removed {
return Err(AuthError::Forbidden(format!("policy `{id}` not found")));
}
if let Ok(mut ua) = self.user_attachments.write() {
for ids in ua.values_mut() {
ids.retain(|p| p != id);
}
ua.retain(|_, v| !v.is_empty());
}
if let Ok(mut ga) = self.group_attachments.write() {
for ids in ga.values_mut() {
ids.retain(|p| p != id);
}
ga.retain(|_, v| !v.is_empty());
}
self.iam_effective_cache
.write()
.unwrap_or_else(|e| e.into_inner())
.clear();
self.invalidate_visible_collections_cache();
self.persist_iam_to_kv();
Ok(())
}
pub fn list_policies(&self) -> Vec<Arc<Policy>> {
let map = match self.policies.read() {
Ok(g) => g,
Err(_) => return Vec::new(),
};
let mut out: Vec<Arc<Policy>> = map.values().cloned().collect();
out.sort_by(|a, b| a.id.cmp(&b.id));
out
}
pub fn get_policy(&self, id: &str) -> Option<Arc<Policy>> {
self.policies.read().ok().and_then(|m| m.get(id).cloned())
}
pub fn group_policies(&self, group: &str) -> Vec<Arc<Policy>> {
let policies = self.policies.read();
let attachments = self.group_attachments.read();
let mut out = Vec::new();
if let (Ok(p_map), Ok(ga_map)) = (policies, attachments) {
if let Some(ids) = ga_map.get(group) {
for id in ids {
if let Some(p) = p_map.get(id) {
out.push(p.clone());
}
}
}
}
out.sort_by(|a, b| a.id.cmp(&b.id));
out
}
pub fn delete_synthetic_grant_policies(
&self,
principal: &GrantPrincipal,
resource: &Resource,
actions: &[Action],
) -> usize {
let attached = match principal {
GrantPrincipal::User(uid) => self
.user_attachments
.read()
.ok()
.and_then(|m| m.get(uid).cloned())
.unwrap_or_default(),
GrantPrincipal::Group(group) => self
.group_attachments
.read()
.ok()
.and_then(|m| m.get(group).cloned())
.unwrap_or_default(),
GrantPrincipal::Public => self
.group_attachments
.read()
.ok()
.and_then(|m| m.get(PUBLIC_IAM_GROUP).cloned())
.unwrap_or_default(),
};
if attached.is_empty() {
return 0;
}
let mut delete_ids = Vec::new();
if let Ok(policies) = self.policies.read() {
for id in attached {
let Some(policy) = policies.get(&id) else {
continue;
};
if !policy.id.starts_with("_grant_") {
continue;
}
if synthetic_grant_matches(policy, resource, actions) {
delete_ids.push(policy.id.clone());
}
}
}
let mut deleted = 0usize;
for id in delete_ids {
if self.delete_policy(&id).is_ok() {
deleted += 1;
}
}
deleted
}
pub fn attach_policy(&self, principal: PrincipalRef, policy_id: &str) -> Result<(), AuthError> {
if !self
.policies
.read()
.map(|m| m.contains_key(policy_id))
.unwrap_or(false)
{
return Err(AuthError::Forbidden(format!(
"policy `{policy_id}` not found"
)));
}
match &principal {
PrincipalRef::User(uid) => {
let mut ua = self
.user_attachments
.write()
.unwrap_or_else(|e| e.into_inner());
let list = ua.entry(uid.clone()).or_default();
if !list.iter().any(|p| p == policy_id) {
list.push(policy_id.to_string());
}
drop(ua);
self.invalidate_iam_cache(Some(uid));
}
PrincipalRef::Group(g) => {
let mut ga = self
.group_attachments
.write()
.unwrap_or_else(|e| e.into_inner());
let list = ga.entry(g.clone()).or_default();
if !list.iter().any(|p| p == policy_id) {
list.push(policy_id.to_string());
}
drop(ga);
self.invalidate_iam_cache(None);
}
}
self.persist_iam_to_kv();
Ok(())
}
pub fn detach_policy(&self, principal: PrincipalRef, policy_id: &str) -> Result<(), AuthError> {
match &principal {
PrincipalRef::User(uid) => {
if let Ok(mut ua) = self.user_attachments.write() {
if let Some(list) = ua.get_mut(uid) {
list.retain(|p| p != policy_id);
if list.is_empty() {
ua.remove(uid);
}
}
}
self.invalidate_iam_cache(Some(uid));
}
PrincipalRef::Group(g) => {
if let Ok(mut ga) = self.group_attachments.write() {
if let Some(list) = ga.get_mut(g) {
list.retain(|p| p != policy_id);
if list.is_empty() {
ga.remove(g);
}
}
}
self.invalidate_iam_cache(None);
}
}
self.persist_iam_to_kv();
Ok(())
}
pub fn effective_policies(&self, user: &UserId) -> Vec<Arc<Policy>> {
if let Ok(cache) = self.iam_effective_cache.read() {
if let Some(hit) = cache.get(user) {
return hit.clone();
}
}
let policies = self.policies.read();
let user_attachments = self.user_attachments.read();
let group_attachments = self.group_attachments.read();
let mut groups = self
.user_attributes
.read()
.ok()
.and_then(|m| m.get(user).map(|attrs| attrs.groups.clone()))
.unwrap_or_default();
groups.insert(0, PUBLIC_IAM_GROUP.to_string());
let mut out: Vec<Arc<Policy>> = Vec::new();
if let (Ok(p_map), Ok(ua_map), Ok(ga_map)) = (policies, user_attachments, group_attachments)
{
for group in groups {
if let Some(ids) = ga_map.get(&group) {
for id in ids {
if let Some(p) = p_map.get(id) {
out.push(p.clone());
}
}
}
}
if let Some(ids) = ua_map.get(user) {
for id in ids {
if let Some(p) = p_map.get(id) {
out.push(p.clone());
}
}
}
}
if let Ok(mut cache) = self.iam_effective_cache.write() {
cache.insert(user.clone(), out.clone());
}
out
}
pub fn simulate(
&self,
principal: &UserId,
action: &str,
resource: &ResourceRef,
ctx_extras: SimCtx,
) -> SimulationOutcome {
let user_role = self
.users
.read()
.ok()
.and_then(|u| u.get(principal).map(|u| u.role));
let principal_is_admin_role = user_role == Some(Role::Admin);
let now = ctx_extras.now_ms.unwrap_or_else(now_ms);
let ctx = EvalContext {
principal_tenant: principal.tenant.clone(),
current_tenant: ctx_extras.current_tenant,
peer_ip: ctx_extras.peer_ip,
mfa_present: ctx_extras.mfa_present,
now_ms: now,
principal_is_admin_role,
};
let pols = self.effective_policies(principal);
let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
iam_policies::simulate(&refs, action, resource, &ctx)
}
pub fn check_policy_authz(
&self,
principal: &UserId,
action: &str,
resource: &ResourceRef,
ctx: &EvalContext,
) -> bool {
let pols = self.effective_policies(principal);
let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
let decision = iam_policies::evaluate(&refs, action, resource, ctx);
matches!(
decision,
iam_policies::Decision::Allow { .. } | iam_policies::Decision::AdminBypass
)
}
pub fn check_column_projection_authz(
&self,
principal: &UserId,
request: &ColumnAccessRequest,
ctx: &EvalContext,
) -> ColumnPolicyOutcome {
let pols = self.effective_policies(principal);
let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
ColumnPolicyGate::new(&refs).evaluate(request, ctx)
}
fn invalidate_iam_cache(&self, uid: Option<&UserId>) {
if let Ok(mut cache) = self.iam_effective_cache.write() {
match uid {
Some(u) => {
cache.remove(u);
}
None => cache.clear(),
}
}
}
pub fn invalidate_all_iam_cache(&self) {
self.invalidate_iam_cache(None);
}
pub fn rehydrate_iam(&self) {
let mut enabled = self
.vault_kv_get("red.iam.enabled")
.map(|v| v == "true")
.unwrap_or(false);
if let Some(blob) = self.vault_kv_get("red.iam.policies") {
if let Ok(val) = crate::serde_json::from_str::<crate::serde_json::Value>(&blob) {
if let Some(obj) = val.as_object() {
let mut map = HashMap::new();
for (id, body) in obj.iter() {
let s = body.to_string_compact();
if let Ok(p) = Policy::from_json_str(&s) {
map.insert(id.clone(), Arc::new(p));
}
}
if !map.is_empty() {
enabled = true;
}
*self.policies.write().unwrap_or_else(|e| e.into_inner()) = map;
}
}
}
if let Some(blob) = self.vault_kv_get("red.iam.attachments.users") {
if let Ok(val) = crate::serde_json::from_str::<crate::serde_json::Value>(&blob) {
if let Some(obj) = val.as_object() {
let mut map: HashMap<UserId, Vec<String>> = HashMap::new();
for (encoded_uid, ids_v) in obj.iter() {
let Some(uid) = decode_uid(encoded_uid) else {
continue;
};
if let Some(arr) = ids_v.as_array() {
let ids: Vec<String> = arr
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
map.insert(uid, ids);
}
}
*self
.user_attachments
.write()
.unwrap_or_else(|e| e.into_inner()) = map;
}
}
}
if let Some(blob) = self.vault_kv_get("red.iam.attachments.groups") {
if let Ok(val) = crate::serde_json::from_str::<crate::serde_json::Value>(&blob) {
if let Some(obj) = val.as_object() {
let mut map: HashMap<String, Vec<String>> = HashMap::new();
for (g, ids_v) in obj.iter() {
if let Some(arr) = ids_v.as_array() {
let ids: Vec<String> = arr
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
map.insert(g.clone(), ids);
}
}
*self
.group_attachments
.write()
.unwrap_or_else(|e| e.into_inner()) = map;
}
}
}
self.iam_authorization_enabled
.store(enabled, Ordering::Release);
self.invalidate_iam_cache(None);
}
fn persist_iam_to_kv(&self) {
let enabled = if self.iam_authorization_enabled() {
"true"
} else {
"false"
};
self.vault_kv_set("red.iam.enabled".to_string(), enabled.to_string());
let policies_obj = {
let map = self.policies.read().unwrap_or_else(|e| e.into_inner());
let mut obj = crate::serde_json::Map::new();
for (id, p) in map.iter() {
let s = p.to_json_string();
if let Ok(v) = crate::serde_json::from_str::<crate::serde_json::Value>(&s) {
obj.insert(id.clone(), v);
}
}
crate::serde_json::Value::Object(obj).to_string_compact()
};
self.vault_kv_set("red.iam.policies".to_string(), policies_obj);
let users_obj = {
let map = self
.user_attachments
.read()
.unwrap_or_else(|e| e.into_inner());
let mut obj = crate::serde_json::Map::new();
for (uid, ids) in map.iter() {
let arr = crate::serde_json::Value::Array(
ids.iter()
.map(|s| crate::serde_json::Value::String(s.clone()))
.collect(),
);
obj.insert(encode_uid(uid), arr);
}
crate::serde_json::Value::Object(obj).to_string_compact()
};
self.vault_kv_set("red.iam.attachments.users".to_string(), users_obj);
let groups_obj = {
let map = self
.group_attachments
.read()
.unwrap_or_else(|e| e.into_inner());
let mut obj = crate::serde_json::Map::new();
for (g, ids) in map.iter() {
let arr = crate::serde_json::Value::Array(
ids.iter()
.map(|s| crate::serde_json::Value::String(s.clone()))
.collect(),
);
obj.insert(g.clone(), arr);
}
crate::serde_json::Value::Object(obj).to_string_compact()
};
self.vault_kv_set("red.iam.attachments.groups".to_string(), groups_obj);
}
}
fn synthetic_grant_matches(policy: &Policy, resource: &Resource, actions: &[Action]) -> bool {
policy.statements.iter().any(|st| {
st.effect == crate::auth::policies::Effect::Allow
&& st.condition.is_none()
&& grant_actions_overlap(&st.actions, actions)
&& grant_resource_matches(&st.resources, resource)
})
}
fn grant_actions_overlap(
patterns: &[crate::auth::policies::ActionPattern],
actions: &[Action],
) -> bool {
if actions.contains(&Action::All) {
return true;
}
patterns.iter().any(|pat| match pat {
crate::auth::policies::ActionPattern::Wildcard => true,
crate::auth::policies::ActionPattern::Exact(s) => {
actions.iter().any(|a| s.eq_ignore_ascii_case(a.as_str()))
}
crate::auth::policies::ActionPattern::Prefix(_) => false,
})
}
fn grant_resource_matches(
patterns: &[crate::auth::policies::ResourcePattern],
resource: &Resource,
) -> bool {
let expected = grant_resource_pattern(resource);
patterns.iter().any(|pat| pat == &expected)
}
fn grant_resource_pattern(resource: &Resource) -> crate::auth::policies::ResourcePattern {
use crate::auth::policies::ResourcePattern;
match resource {
Resource::Database => ResourcePattern::Glob("table:*".to_string()),
Resource::Schema(s) => ResourcePattern::Glob(format!("table:{s}.*")),
Resource::Table { schema, table } => ResourcePattern::Exact {
kind: "table".to_string(),
name: match schema {
Some(s) => format!("{s}.{table}"),
None => table.clone(),
},
},
Resource::Function { schema, name } => ResourcePattern::Exact {
kind: "function".to_string(),
name: match schema {
Some(s) => format!("{s}.{name}"),
None => name.clone(),
},
},
}
}
fn encode_uid(uid: &UserId) -> String {
match &uid.tenant {
Some(t) => format!("{}/{}", t, uid.username),
None => format!("*/{}", uid.username),
}
}
fn decode_uid(s: &str) -> Option<UserId> {
let (tenant, username) = s.split_once('/')?;
Some(if tenant == "*" {
UserId::platform(username)
} else {
UserId::scoped(tenant, username)
})
}
fn encode_resource(r: &Resource) -> String {
match r {
Resource::Database => "db".into(),
Resource::Schema(s) => format!("schema:{}", s),
Resource::Table { schema, table } => {
format!("table:{}:{}", schema.as_deref().unwrap_or("*"), table)
}
Resource::Function { schema, name } => {
format!("func:{}:{}", schema.as_deref().unwrap_or("*"), name)
}
}
}
fn decode_resource(s: &str) -> Option<Resource> {
if s == "db" {
return Some(Resource::Database);
}
if let Some(rest) = s.strip_prefix("schema:") {
return Some(Resource::Schema(rest.to_string()));
}
if let Some(rest) = s.strip_prefix("table:") {
let (schema, table) = rest.split_once(':')?;
return Some(Resource::Table {
schema: if schema == "*" {
None
} else {
Some(schema.to_string())
},
table: table.to_string(),
});
}
if let Some(rest) = s.strip_prefix("func:") {
let (schema, name) = rest.split_once(':')?;
return Some(Resource::Function {
schema: if schema == "*" {
None
} else {
Some(schema.to_string())
},
name: name.to_string(),
});
}
None
}
fn encode_grants_blob(grants: &[Grant]) -> String {
let mut out = String::new();
for g in grants {
let actions: Vec<&str> = g.actions.iter().map(|a| a.as_str()).collect();
out.push_str(&format!(
"GRANT|{}|{}|{}|{}|{}|{}\n",
encode_resource(&g.resource),
actions.join(","),
g.with_grant_option,
g.tenant.as_deref().unwrap_or("*"),
g.granted_by,
g.granted_at,
));
}
out
}
fn decode_grants_blob(s: &str) -> Option<Vec<Grant>> {
let mut out = Vec::new();
for line in s.lines() {
if line.is_empty() {
continue;
}
let parts: Vec<&str> = line.split('|').collect();
if parts.len() != 7 || parts[0] != "GRANT" {
return None;
}
let resource = decode_resource(parts[1])?;
let mut actions = std::collections::BTreeSet::new();
for token in parts[2].split(',') {
if let Some(a) = Action::from_keyword(token) {
actions.insert(a);
}
}
let with_grant_option = parts[3] == "true";
let tenant = if parts[4] == "*" {
None
} else {
Some(parts[4].to_string())
};
let granted_by = parts[5].to_string();
let granted_at: u128 = parts[6].parse().unwrap_or(0);
out.push(Grant {
principal: GrantPrincipal::Public,
resource,
actions,
with_grant_option,
granted_by,
granted_at,
tenant,
columns: None,
});
}
Some(out)
}
fn encode_attrs_blob(a: &UserAttributes) -> String {
let valid = a
.valid_until
.map(|t| t.to_string())
.unwrap_or_else(|| "*".into());
let limit = a
.connection_limit
.map(|l| l.to_string())
.unwrap_or_else(|| "*".into());
let path = a.search_path.clone().unwrap_or_else(|| "*".into());
let groups = if a.groups.is_empty() {
"*".to_string()
} else {
a.groups.join(",")
};
format!("ATTR|{}|{}|{}|{}\n", valid, limit, path, groups)
}
fn decode_attrs_blob(s: &str) -> Option<UserAttributes> {
let line = s.lines().next()?;
let parts: Vec<&str> = line.split('|').collect();
if !(parts.len() == 4 || parts.len() == 5) || parts[0] != "ATTR" {
return None;
}
let groups = if parts.get(4).copied().unwrap_or("*") == "*" {
Vec::new()
} else {
parts[4]
.split(',')
.filter(|g| !g.is_empty())
.map(|g| g.to_string())
.collect()
};
Some(UserAttributes {
valid_until: if parts[1] == "*" {
None
} else {
parts[1].parse().ok()
},
connection_limit: if parts[2] == "*" {
None
} else {
parts[2].parse().ok()
},
search_path: if parts[3] == "*" {
None
} else {
Some(parts[3].to_string())
},
groups,
})
}
fn make_scram_verifier(password: &str) -> crate::auth::scram::ScramVerifier {
let salt = random_bytes(16);
crate::auth::scram::ScramVerifier::from_password(
password,
salt,
crate::auth::scram::DEFAULT_ITER,
)
}
pub(crate) fn hash_password(password: &str) -> String {
let salt = random_bytes(16);
let params = auth_argon2_params();
let hash = derive_key(password.as_bytes(), &salt, ¶ms);
format!("argon2id${}${}", hex::encode(&salt), hex::encode(&hash))
}
pub(crate) fn verify_password(password: &str, stored_hash: &str) -> bool {
let parts: Vec<&str> = stored_hash.splitn(3, '$').collect();
if parts.len() != 3 || parts[0] != "argon2id" {
return false;
}
let salt = match hex::decode(parts[1]) {
Ok(s) => s,
Err(_) => return false,
};
let expected_hash = match hex::decode(parts[2]) {
Ok(h) => h,
Err(_) => return false,
};
let params = auth_argon2_params();
let computed = derive_key(password.as_bytes(), &salt, ¶ms);
constant_time_eq(&computed, &expected_hash)
}
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
if a.len() != b.len() {
return false;
}
let mut diff: u8 = 0;
for (x, y) in a.iter().zip(b.iter()) {
diff |= x ^ y;
}
diff == 0
}
fn generate_session_token() -> String {
format!("rs_{}", hex::encode(random_bytes(32)))
}
fn generate_api_key() -> String {
format!("rk_{}", hex::encode(random_bytes(32)))
}
fn random_hex(n: usize) -> String {
hex::encode(random_bytes(n))
}
pub(crate) fn random_bytes(n: usize) -> Vec<u8> {
let mut buf = vec![0u8; n.max(32)];
if os_random::fill_bytes(&mut buf).is_err() {
let seed = now_ms().to_le_bytes();
for (i, byte) in buf.iter_mut().enumerate() {
*byte = seed[i % seed.len()] ^ (i as u8);
}
}
let digest = sha256(&buf);
if n <= 32 {
digest[..n].to_vec()
} else {
let mut out = Vec::with_capacity(n);
let mut prev = digest;
while out.len() < n {
out.extend_from_slice(&prev[..std::cmp::min(32, n - out.len())]);
prev = sha256(&prev);
}
out
}
}
fn lock_err<T>(_: T) -> AuthError {
AuthError::Internal("lock poisoned".to_string())
}
#[cfg(test)]
mod tests {
use super::*;
fn test_config() -> AuthConfig {
AuthConfig {
enabled: true,
session_ttl_secs: 60,
require_auth: true,
auto_encrypt_storage: false,
vault_enabled: false,
cert: Default::default(),
oauth: Default::default(),
}
}
#[test]
fn test_create_and_list_users() {
let store = AuthStore::new(test_config());
store.create_user("alice", "pass1", Role::Admin).unwrap();
store.create_user("bob", "pass2", Role::Read).unwrap();
let users = store.list_users();
assert_eq!(users.len(), 2);
for u in &users {
assert!(u.password_hash.is_empty());
}
}
#[test]
fn test_create_duplicate_user() {
let store = AuthStore::new(test_config());
store.create_user("alice", "pass", Role::Admin).unwrap();
let err = store.create_user("alice", "pass2", Role::Read).unwrap_err();
assert!(matches!(err, AuthError::UserExists(_)));
}
#[test]
fn test_authenticate_and_validate() {
let store = AuthStore::new(test_config());
store.create_user("alice", "secret", Role::Write).unwrap();
let session = store.authenticate("alice", "secret").unwrap();
assert!(session.token.starts_with("rs_"));
let (username, role) = store.validate_token(&session.token).unwrap();
assert_eq!(username, "alice");
assert_eq!(role, Role::Write);
}
#[test]
fn test_authenticate_wrong_password() {
let store = AuthStore::new(test_config());
store.create_user("alice", "secret", Role::Read).unwrap();
let err = store.authenticate("alice", "wrong").unwrap_err();
assert!(matches!(err, AuthError::InvalidCredentials));
}
#[test]
fn test_api_key_lifecycle() {
let store = AuthStore::new(test_config());
store.create_user("alice", "pass", Role::Admin).unwrap();
let key = store
.create_api_key("alice", "ci-token", Role::Write)
.unwrap();
assert!(key.key.starts_with("rk_"));
let (username, role) = store.validate_token(&key.key).unwrap();
assert_eq!(username, "alice");
assert_eq!(role, Role::Write);
store.revoke_api_key(&key.key).unwrap();
assert!(store.validate_token(&key.key).is_none());
}
#[test]
fn test_api_key_role_exceeded() {
let store = AuthStore::new(test_config());
store.create_user("bob", "pass", Role::Read).unwrap();
let err = store
.create_api_key("bob", "escalate", Role::Admin)
.unwrap_err();
assert!(matches!(err, AuthError::RoleExceeded { .. }));
}
#[test]
fn test_change_password() {
let store = AuthStore::new(test_config());
store.create_user("alice", "old", Role::Write).unwrap();
store.change_password("alice", "old", "new").unwrap();
assert!(store.authenticate("alice", "old").is_err());
assert!(store.authenticate("alice", "new").is_ok());
}
#[test]
fn test_change_role() {
let store = AuthStore::new(test_config());
store.create_user("alice", "pass", Role::Admin).unwrap();
store.create_api_key("alice", "key1", Role::Admin).unwrap();
store.change_role("alice", Role::Read).unwrap();
let users = store.list_users();
let alice = users.iter().find(|u| u.username == "alice").unwrap();
assert_eq!(alice.role, Role::Read);
assert_eq!(alice.api_keys[0].role, Role::Read);
}
#[test]
fn test_delete_user() {
let store = AuthStore::new(test_config());
store.create_user("alice", "pass", Role::Admin).unwrap();
let key = store.create_api_key("alice", "key1", Role::Read).unwrap();
let session = store.authenticate("alice", "pass").unwrap();
store.delete_user("alice").unwrap();
assert!(store.validate_token(&key.key).is_none());
assert!(store.validate_token(&session.token).is_none());
assert!(store.list_users().is_empty());
}
#[test]
fn test_revoke_session() {
let store = AuthStore::new(test_config());
store.create_user("alice", "pass", Role::Read).unwrap();
let session = store.authenticate("alice", "pass").unwrap();
store.revoke_session(&session.token);
assert!(store.validate_token(&session.token).is_none());
}
#[test]
fn test_password_hash_format() {
let hash = hash_password("test");
assert!(hash.starts_with("argon2id$"));
let parts: Vec<&str> = hash.splitn(3, '$').collect();
assert_eq!(parts.len(), 3);
assert_eq!(parts[1].len(), 32);
assert_eq!(parts[2].len(), 64);
}
#[test]
fn test_constant_time_eq() {
assert!(constant_time_eq(b"hello", b"hello"));
assert!(!constant_time_eq(b"hello", b"world"));
assert!(!constant_time_eq(b"short", b"longer"));
}
#[test]
fn test_bootstrap_seals_permanently() {
let store = AuthStore::new(test_config());
assert!(store.needs_bootstrap());
assert!(!store.is_bootstrapped());
let result = store.bootstrap("admin", "secret");
assert!(result.is_ok());
let br = result.unwrap();
assert_eq!(br.user.username, "admin");
assert_eq!(br.user.role, Role::Admin);
assert!(br.api_key.key.starts_with("rk_"));
assert!(br.certificate.is_none());
assert!(!store.needs_bootstrap());
assert!(store.is_bootstrapped());
let result = store.bootstrap("admin2", "secret2");
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.to_string().contains("sealed permanently"));
assert_eq!(store.list_users().len(), 1);
assert_eq!(store.list_users()[0].username, "admin");
}
#[test]
fn test_bootstrap_after_manual_user_creation() {
let store = AuthStore::new(test_config());
store.create_user("existing", "pass", Role::Read).unwrap();
assert!(!store.needs_bootstrap()); }
#[test]
fn test_same_username_two_tenants_distinct() {
let store = AuthStore::new(test_config());
store
.create_user_in_tenant(Some("acme"), "alice", "pw-acme", Role::Write)
.unwrap();
store
.create_user_in_tenant(Some("globex"), "alice", "pw-globex", Role::Read)
.unwrap();
let users = store.list_users();
assert_eq!(users.len(), 2);
assert!(store
.authenticate_in_tenant(Some("acme"), "alice", "pw-acme")
.is_ok());
assert!(store
.authenticate_in_tenant(Some("globex"), "alice", "pw-globex")
.is_ok());
assert!(store
.authenticate_in_tenant(Some("acme"), "alice", "pw-globex")
.is_err());
assert!(store
.authenticate_in_tenant(Some("globex"), "alice", "pw-acme")
.is_err());
}
#[test]
fn test_session_carries_tenant() {
let store = AuthStore::new(test_config());
store
.create_user_in_tenant(Some("acme"), "alice", "pw", Role::Admin)
.unwrap();
let session = store
.authenticate_in_tenant(Some("acme"), "alice", "pw")
.unwrap();
assert_eq!(session.tenant_id.as_deref(), Some("acme"));
let (id, role) = store.validate_token_full(&session.token).unwrap();
assert_eq!(id.tenant.as_deref(), Some("acme"));
assert_eq!(id.username, "alice");
assert_eq!(role, Role::Admin);
}
#[test]
fn test_platform_user_has_no_tenant() {
let store = AuthStore::new(test_config());
store.create_user("admin", "pw", Role::Admin).unwrap();
let session = store.authenticate("admin", "pw").unwrap();
assert!(session.tenant_id.is_none());
let (id, _) = store.validate_token_full(&session.token).unwrap();
assert!(id.tenant.is_none());
}
#[test]
fn test_lookup_scram_verifier_global_resolves_platform() {
let store = AuthStore::new(test_config());
store.create_user("admin", "pw", Role::Admin).unwrap();
store
.create_user_in_tenant(Some("acme"), "admin", "pw", Role::Admin)
.unwrap();
let v = store.lookup_scram_verifier_global("admin");
assert!(v.is_some());
let v_acme = store.lookup_scram_verifier(&UserId::scoped("acme", "admin"));
assert!(v_acme.is_some());
assert_ne!(v.unwrap().salt, v_acme.unwrap().salt);
}
#[test]
fn test_delete_in_tenant_does_not_touch_other_tenant() {
let store = AuthStore::new(test_config());
store
.create_user_in_tenant(Some("acme"), "alice", "pw", Role::Admin)
.unwrap();
store
.create_user_in_tenant(Some("globex"), "alice", "pw", Role::Admin)
.unwrap();
store.delete_user_in_tenant(Some("acme"), "alice").unwrap();
assert!(store
.authenticate_in_tenant(Some("globex"), "alice", "pw")
.is_ok());
assert!(store
.authenticate_in_tenant(Some("acme"), "alice", "pw")
.is_err());
}
#[test]
fn test_user_id_display() {
assert_eq!(UserId::platform("admin").to_string(), "admin");
assert_eq!(UserId::scoped("acme", "alice").to_string(), "acme/alice");
}
}