Skip to main content

reddb_server/auth/
store.rs

1//! AuthStore -- manages users, sessions, and API keys in memory.
2//!
3//! Password hashing delegates to the existing Argon2id implementation in
4//! `crate::storage::encryption::argon2id`.  Token generation uses the
5//! OS CSPRNG (`crate::crypto::os_random`) plus SHA-256.
6
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, RwLock};
10
11use crate::crypto::os_random;
12use crate::crypto::sha256::sha256;
13use crate::storage::encryption::argon2id::{derive_key, Argon2Params};
14use crate::storage::engine::pager::Pager;
15
16use super::column_policy_gate::{ColumnAccessRequest, ColumnPolicyGate, ColumnPolicyOutcome};
17use super::policies::{self as iam_policies, EvalContext, Policy, ResourceRef, SimulationOutcome};
18use super::privileges::{
19    check_grant, Action, AuthzContext, AuthzError, Grant, GrantPrincipal, GrantsView,
20    PermissionCache, Resource, UserAttributes,
21};
22use super::vault::{KeyPair, Vault, VaultState};
23use super::{now_ms, ApiKey, AuthConfig, AuthError, Role, Session, User, UserId};
24
25// ---------------------------------------------------------------------------
26// PrincipalRef + SimCtx — IAM policy attachments
27// ---------------------------------------------------------------------------
28
29/// Principal targeted by `attach_policy` / `detach_policy`.
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub enum PrincipalRef {
32    User(UserId),
33    Group(String),
34}
35
36/// Reserved IAM group that every principal belongs to. Used by the
37/// GRANT-to-PUBLIC compatibility layer.
38pub const PUBLIC_IAM_GROUP: &str = "__public__";
39
40/// Optional context overrides for `simulate` — anything not set falls back
41/// to a default value when the kernel evaluates the request.
42#[derive(Debug, Clone, Default)]
43pub struct SimCtx {
44    pub current_tenant: Option<String>,
45    pub peer_ip: Option<std::net::IpAddr>,
46    pub mfa_present: bool,
47    pub now_ms: Option<u128>,
48}
49
50// ---------------------------------------------------------------------------
51// BootstrapResult
52// ---------------------------------------------------------------------------
53
54/// Result of a successful bootstrap operation.
55///
56/// The `certificate` is the hex-encoded string the admin must save --
57/// it is the ONLY way to unseal the vault after a restart.
58#[derive(Debug)]
59pub struct BootstrapResult {
60    pub user: User,
61    pub api_key: ApiKey,
62    /// Certificate hex string.  `None` when vault is not configured.
63    pub certificate: Option<String>,
64}
65
66// ---------------------------------------------------------------------------
67// AuthStore
68// ---------------------------------------------------------------------------
69
70/// Central in-process authority for auth state.
71///
72/// All mutations are guarded by `RwLock`s so the store is `Send + Sync`.
73pub struct AuthStore {
74    /// `(tenant_id, username) -> User`. Tenant scoping is built into the
75    /// key so `alice@acme` and `alice@globex` are distinct identities.
76    users: RwLock<HashMap<UserId, User>>,
77    sessions: RwLock<HashMap<String, Session>>,
78    /// key-string -> (owner UserId, role)
79    api_key_index: RwLock<HashMap<String, (UserId, Role)>>,
80    /// Once true, bootstrap() is permanently sealed.
81    bootstrapped: AtomicBool,
82    config: AuthConfig,
83    /// Optional encrypted vault for persisting auth state to pager pages.
84    vault: RwLock<Option<Vault>>,
85    /// Reference to the pager for vault page I/O.
86    pager: Option<Arc<Pager>>,
87    /// Certificate-based keypair for token signing and vault seal.
88    /// Populated after bootstrap or after restoring from a sealed vault.
89    keypair: RwLock<Option<KeyPair>>,
90    /// Encrypted key-value store for arbitrary secrets.
91    /// Persisted to vault alongside users/api_keys.
92    vault_kv: RwLock<HashMap<String, String>>,
93    /// Per-user GRANT rows. Persisted via `vault_kv` under the
94    /// `red.acl.grants.<tenant>/<user>` key prefix so existing snapshot
95    /// logic keeps working without modification. See `privileges` module
96    /// for the resolution algorithm.
97    grants: RwLock<HashMap<UserId, Vec<Grant>>>,
98    /// PUBLIC grants — apply to every authenticated principal.
99    public_grants: RwLock<Vec<Grant>>,
100    /// PG-style account attributes (`VALID UNTIL`, `CONNECTION LIMIT`,
101    /// `search_path`). Keyed by `UserId`. Persisted under
102    /// `red.acl.attrs.<tenant>/<user>`.
103    user_attributes: RwLock<HashMap<UserId, UserAttributes>>,
104    /// Live session count per user — used by `CONNECTION LIMIT`
105    /// enforcement on login. Bumped at authenticate, decremented at
106    /// session revoke / expiry.
107    session_count_by_user: RwLock<HashMap<UserId, u32>>,
108    /// Pre-resolved (resource, action) cache built per-user so the
109    /// hot path skips a linear scan of the user's grants on every
110    /// statement. Invalidated on GRANT / REVOKE / ALTER USER.
111    permission_cache: RwLock<HashMap<UserId, PermissionCache>>,
112    /// IAM-style policies, keyed by id. Persisted under
113    /// `red.iam.policies`. The kernel in `super::policies` owns the
114    /// Policy type — this map just deduplicates and shares.
115    policies: RwLock<HashMap<String, Arc<Policy>>>,
116    /// Per-user policy attachments — ordered list of policy ids.
117    /// Persisted under `red.iam.attachments.users`.
118    user_attachments: RwLock<HashMap<UserId, Vec<String>>>,
119    /// Per-group policy attachments. Users join groups through
120    /// `UserAttributes::groups`; effective policies resolve group
121    /// attachments before user-direct attachments.
122    group_attachments: RwLock<HashMap<String, Vec<String>>>,
123    /// Cached effective `Vec<Arc<Policy>>` per user. Invalidated on
124    /// any policy mutation that affects the user's attachments.
125    iam_effective_cache: RwLock<HashMap<UserId, Vec<Arc<Policy>>>>,
126    /// Once any IAM policy is installed, authorization switches to the
127    /// IAM evaluator and stays deny-by-default even if policies are
128    /// later deleted. Persisted under `red.iam.enabled`.
129    iam_authorization_enabled: AtomicBool,
130    /// `(tenant, role) → HashSet<CollectionId>` cache used by the AI
131    /// pipeline (issue #119). Distinct from `permission_cache`, which
132    /// is keyed by `UserId` and answers `(resource, action)` lookups —
133    /// this cache answers the inverse "what collections is this scope
134    /// allowed to read?" query that `AuthorizedSearch` uses to
135    /// pre-filter SEARCH SIMILAR / SEARCH CONTEXT candidates before any
136    /// similarity score is computed. Entries TTL out at 60s and are
137    /// invalidated explicitly on GRANT/REVOKE/CREATE POLICY/DROP
138    /// POLICY/DROP COLLECTION.
139    visible_collections_cache: super::scope_cache::AuthCache,
140}
141
142// Use fast-but-safe Argon2id params for auth hashing (smaller than the
143// default 64 MB so that user-management RPCs respond quickly).
144fn auth_argon2_params() -> Argon2Params {
145    Argon2Params {
146        m_cost: 4 * 1024, // 4 MB
147        t_cost: 3,
148        p: 1,
149        tag_len: 32,
150    }
151}
152
153impl AuthStore {
154    // -----------------------------------------------------------------
155    // Construction
156    // -----------------------------------------------------------------
157
158    pub fn new(config: AuthConfig) -> Self {
159        Self {
160            users: RwLock::new(HashMap::new()),
161            sessions: RwLock::new(HashMap::new()),
162            api_key_index: RwLock::new(HashMap::new()),
163            bootstrapped: AtomicBool::new(false),
164            config,
165            vault: RwLock::new(None),
166            pager: None,
167            keypair: RwLock::new(None),
168            vault_kv: RwLock::new(HashMap::new()),
169            grants: RwLock::new(HashMap::new()),
170            public_grants: RwLock::new(Vec::new()),
171            user_attributes: RwLock::new(HashMap::new()),
172            session_count_by_user: RwLock::new(HashMap::new()),
173            permission_cache: RwLock::new(HashMap::new()),
174            policies: RwLock::new(HashMap::new()),
175            user_attachments: RwLock::new(HashMap::new()),
176            group_attachments: RwLock::new(HashMap::new()),
177            iam_effective_cache: RwLock::new(HashMap::new()),
178            iam_authorization_enabled: AtomicBool::new(false),
179            visible_collections_cache: super::scope_cache::AuthCache::new(
180                super::scope_cache::DEFAULT_TTL,
181            ),
182        }
183    }
184
185    /// Create an `AuthStore` backed by encrypted vault pages inside the
186    /// main `.rdb` database file.
187    ///
188    /// If vault pages already exist, their contents are loaded and
189    /// restored into the in-memory store.  All subsequent mutations are
190    /// automatically persisted to the vault pages via the pager.
191    pub fn with_vault(
192        config: AuthConfig,
193        pager: Arc<Pager>,
194        passphrase: Option<&str>,
195    ) -> Result<Self, AuthError> {
196        let vault = Vault::open(&pager, passphrase)?;
197        let mut store = Self::new(config);
198
199        // Restore persisted state if vault pages exist.
200        if let Some(state) = vault.load(&pager)? {
201            store.restore_from_vault(state);
202        }
203
204        *store.vault.write().unwrap_or_else(|e| e.into_inner()) = Some(vault);
205        store.pager = Some(pager);
206        Ok(store)
207    }
208
209    pub fn config(&self) -> &AuthConfig {
210        &self.config
211    }
212
213    pub fn is_enabled(&self) -> bool {
214        self.config.enabled
215    }
216
217    /// Returns true when no users exist yet and bootstrap hasn't been sealed.
218    pub fn needs_bootstrap(&self) -> bool {
219        !self.bootstrapped.load(Ordering::Acquire)
220            && self.users.read().map(|u| u.is_empty()).unwrap_or(true)
221    }
222
223    /// Internal: read-locked lookup by `UserId`.
224    fn get_user_cloned(&self, id: &UserId) -> Option<User> {
225        self.users.read().ok().and_then(|m| m.get(id).cloned())
226    }
227
228    /// Whether bootstrap has already been performed (sealed).
229    pub fn is_bootstrapped(&self) -> bool {
230        self.bootstrapped.load(Ordering::Acquire)
231    }
232
233    /// Bootstrap the first admin user. One-shot, irreversible.
234    ///
235    /// Uses an atomic compare-exchange to guarantee that even under
236    /// concurrent calls, only the first one succeeds. Once sealed,
237    /// all subsequent calls fail immediately -- there is no undo.
238    ///
239    /// When a vault/pager is configured, a certificate-based keypair is
240    /// generated and the vault is re-encrypted with the certificate-derived
241    /// key.  The certificate hex string is returned in `BootstrapResult`
242    /// so the admin can save it.
243    pub fn bootstrap(&self, username: &str, password: &str) -> Result<BootstrapResult, AuthError> {
244        // Atomic seal: only the first caller wins.
245        if self
246            .bootstrapped
247            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
248            .is_err()
249        {
250            return Err(AuthError::Forbidden(
251                "bootstrap already completed — sealed permanently".to_string(),
252            ));
253        }
254
255        // Double-check users are actually empty (belt and suspenders).
256        {
257            let users = self.users.read().map_err(lock_err)?;
258            if !users.is_empty() {
259                return Err(AuthError::Forbidden(
260                    "bootstrap already completed — users exist".to_string(),
261                ));
262            }
263        }
264
265        let user = self.create_user(username, password, Role::Admin)?;
266        let key = self.create_api_key(username, "bootstrap", Role::Admin)?;
267
268        // Generate a certificate-based keypair and re-seal the vault.
269        let certificate = if let Some(ref pager) = self.pager {
270            let kp = KeyPair::generate();
271            let cert_hex = kp.certificate_hex();
272
273            // Re-create the vault using the certificate-derived key.
274            let new_vault = Vault::with_certificate_bytes(pager, &kp.certificate)
275                .map_err(|e| AuthError::Internal(format!("vault re-seal failed: {e}")))?;
276
277            // Store the keypair so token signing works immediately.
278            if let Ok(mut kp_guard) = self.keypair.write() {
279                *kp_guard = Some(kp);
280            }
281
282            // Replace the vault and persist with the master secret included.
283            if let Ok(mut vault_guard) = self.vault.write() {
284                *vault_guard = Some(new_vault);
285            }
286            // Generate the AES-256 secret key for Value::Secret encryption.
287            self.ensure_vault_secret_key();
288            self.persist_to_vault();
289
290            Some(cert_hex)
291        } else {
292            None
293        };
294
295        Ok(BootstrapResult {
296            user,
297            api_key: key,
298            certificate,
299        })
300    }
301
302    /// Auto-bootstrap from environment variables if no users exist.
303    ///
304    /// Checks `REDDB_USERNAME` and `REDDB_PASSWORD`. If both are set and
305    /// the user store is empty, creates the first admin user automatically.
306    /// This mirrors the Docker pattern (`MYSQL_ROOT_PASSWORD`, etc.).
307    ///
308    /// Returns `Some(BootstrapResult)` if bootstrapped, `None` if skipped.
309    pub fn bootstrap_from_env(&self) -> Option<BootstrapResult> {
310        if !self.needs_bootstrap() {
311            return None;
312        }
313
314        let username = std::env::var("REDDB_USERNAME").ok()?;
315        let password = std::env::var("REDDB_PASSWORD").ok()?;
316
317        if username.is_empty() || password.is_empty() {
318            return None;
319        }
320
321        match self.bootstrap(&username, &password) {
322            Ok(result) => {
323                // F-04: `username` is REDDB_USERNAME — operator-supplied
324                // (env), but still routed through the LogField escaper
325                // because env strings cross trust boundaries in some
326                // deployment models (k8s downward API, Vault sidecar,
327                // external secret operator). See ADR 0010.
328                tracing::info!(
329                    username = %reddb_wire::audit_safe_log_field(&username),
330                    "bootstrapped admin user from REDDB_USERNAME/REDDB_PASSWORD"
331                );
332                if let Some(ref cert) = result.certificate {
333                    // Certificate must be readable by the operator — keep it
334                    // in the log stream but print raw to stderr too so it
335                    // survives even if the log file gets rotated.
336                    eprintln!("[reddb] CERTIFICATE: {}", cert);
337                    tracing::warn!(
338                        "vault certificate issued — save it: ONLY way to unseal after restart"
339                    );
340                }
341                Some(result)
342            }
343            Err(e) => {
344                tracing::error!(err = %e, "env bootstrap failed");
345                None
346            }
347        }
348    }
349
350    // -----------------------------------------------------------------
351    // Vault persistence
352    // -----------------------------------------------------------------
353
354    /// Persist the current auth state to the vault pages (if configured).
355    fn persist_to_vault_result(&self) -> Result<(), AuthError> {
356        let vault_guard = self.vault.read().unwrap_or_else(|e| e.into_inner());
357        if let (Some(ref vault), Some(ref pager)) = (&*vault_guard, &self.pager) {
358            let state = self.snapshot();
359            vault.save(pager, &state)?;
360        }
361        Ok(())
362    }
363
364    /// Persist the current auth state to the vault pages (if configured).
365    ///
366    /// Legacy auth mutations still treat in-memory state as authoritative.
367    /// New secret-management paths use the `try_` methods below so callers
368    /// get a hard error if the vault write fails.
369    fn persist_to_vault(&self) {
370        if let Err(e) = self.persist_to_vault_result() {
371            tracing::error!(err = %e, "vault persist failed");
372            // Issue #205 — vault persist is the secret-rotation
373            // serialization point: when this fails, freshly rotated
374            // credentials live only in memory and a process restart
375            // loses them. Operator-grade event so the operator can
376            // intervene before the next restart.
377            crate::telemetry::operator_event::OperatorEvent::SecretRotationFailed {
378                secret_ref: "auth_vault".to_string(),
379                error: e.to_string(),
380            }
381            .emit_global();
382        }
383    }
384
385    /// True when this store has an encrypted vault and pager wired in.
386    pub fn is_vault_backed(&self) -> bool {
387        self.pager.is_some()
388            && self
389                .vault
390                .read()
391                .map(|guard| guard.is_some())
392                .unwrap_or(false)
393    }
394
395    // -----------------------------------------------------------------
396    // Vault KV — encrypted key-value store for arbitrary secrets
397    // -----------------------------------------------------------------
398
399    /// Read a value from the vault KV store. Returns `None` if not set.
400    pub fn vault_kv_get(&self, key: &str) -> Option<String> {
401        self.vault_kv
402            .read()
403            .ok()
404            .and_then(|kv| kv.get(key).cloned())
405    }
406
407    /// Snapshot vault KV values for statement-local secret resolution.
408    pub fn vault_kv_snapshot(&self) -> HashMap<String, String> {
409        self.vault_kv
410            .read()
411            .map(|kv| kv.clone())
412            .unwrap_or_default()
413    }
414
415    /// Export vault KV as an encrypted logical blob for JSONL dump/restore.
416    /// Returns `None` when the vault has no KV entries.
417    pub fn vault_kv_export_encrypted(&self) -> Result<Option<String>, AuthError> {
418        if !self.is_vault_backed() {
419            return Err(AuthError::Forbidden(
420                "vault KV export requires an enabled, unsealed vault".to_string(),
421            ));
422        }
423        let kv = self.vault_kv_snapshot();
424        if kv.is_empty() {
425            return Ok(None);
426        }
427
428        let vault_guard = self.vault.read().map_err(lock_err)?;
429        let vault = vault_guard.as_ref().ok_or_else(|| {
430            AuthError::Forbidden("vault KV export requires an enabled, unsealed vault".to_string())
431        })?;
432        let state = VaultState {
433            users: Vec::new(),
434            api_keys: Vec::new(),
435            bootstrapped: false,
436            master_secret: None,
437            kv,
438        };
439        Ok(Some(vault.seal_logical_export(&state)?))
440    }
441
442    /// Merge imported vault KV entries and fail if the encrypted vault
443    /// write cannot be made durable.
444    pub fn vault_kv_try_import(
445        &self,
446        entries: HashMap<String, String>,
447    ) -> Result<usize, AuthError> {
448        if !self.is_vault_backed() {
449            return Err(AuthError::Forbidden(
450                "vault KV import requires an enabled, unsealed vault".to_string(),
451            ));
452        }
453        if entries.is_empty() {
454            return Ok(0);
455        }
456
457        let mut previous = HashMap::new();
458        {
459            let mut kv = self.vault_kv.write().map_err(lock_err)?;
460            for (key, value) in &entries {
461                previous.insert(key.clone(), kv.insert(key.clone(), value.clone()));
462            }
463        }
464
465        if let Err(err) = self.persist_to_vault_result() {
466            if let Ok(mut kv) = self.vault_kv.write() {
467                for (key, old) in previous {
468                    match old {
469                        Some(value) => {
470                            kv.insert(key, value);
471                        }
472                        None => {
473                            kv.remove(&key);
474                        }
475                    }
476                }
477            }
478            return Err(err);
479        }
480
481        Ok(entries.len())
482    }
483
484    /// Import false placeholders for secrets whose encrypted payload
485    /// could not be decrypted during logical restore.
486    pub fn vault_kv_try_import_placeholders(&self, keys: &[String]) -> Result<usize, AuthError> {
487        let entries = keys
488            .iter()
489            .map(|key| (key.clone(), "false".to_string()))
490            .collect();
491        self.vault_kv_try_import(entries)
492    }
493
494    /// Write a value to the vault KV store, persisting to disk.
495    pub fn vault_kv_set(&self, key: String, value: String) {
496        if let Ok(mut kv) = self.vault_kv.write() {
497            kv.insert(key, value);
498        }
499        self.persist_to_vault();
500    }
501
502    /// Write a value to the vault KV store and fail if the vault write
503    /// cannot be made durable.
504    pub fn vault_kv_try_set(&self, key: String, value: String) -> Result<(), AuthError> {
505        if !self.is_vault_backed() {
506            return Err(AuthError::Forbidden(
507                "SET SECRET requires an enabled, unsealed vault".to_string(),
508            ));
509        }
510
511        let previous = {
512            let mut kv = self.vault_kv.write().map_err(lock_err)?;
513            kv.insert(key.clone(), value)
514        };
515
516        if let Err(err) = self.persist_to_vault_result() {
517            if let Ok(mut kv) = self.vault_kv.write() {
518                match previous {
519                    Some(value) => {
520                        kv.insert(key, value);
521                    }
522                    None => {
523                        kv.remove(&key);
524                    }
525                }
526            }
527            return Err(err);
528        }
529
530        Ok(())
531    }
532
533    /// Delete a value from the vault KV store. Returns true if it existed.
534    pub fn vault_kv_delete(&self, key: &str) -> bool {
535        let existed = self
536            .vault_kv
537            .write()
538            .map(|mut kv| kv.remove(key).is_some())
539            .unwrap_or(false);
540        if existed {
541            self.persist_to_vault();
542        }
543        existed
544    }
545
546    /// Delete a value from the vault KV store and fail if the vault write
547    /// cannot be made durable.
548    pub fn vault_kv_try_delete(&self, key: &str) -> Result<bool, AuthError> {
549        if !self.is_vault_backed() {
550            return Err(AuthError::Forbidden(
551                "DELETE SECRET requires an enabled, unsealed vault".to_string(),
552            ));
553        }
554
555        let removed = {
556            let mut kv = self.vault_kv.write().map_err(lock_err)?;
557            kv.remove(key)
558        };
559
560        if removed.is_none() {
561            return Ok(false);
562        }
563
564        if let Err(err) = self.persist_to_vault_result() {
565            if let Ok(mut kv) = self.vault_kv.write() {
566                if let Some(value) = removed {
567                    kv.insert(key.to_string(), value);
568                }
569            }
570            return Err(err);
571        }
572
573        Ok(true)
574    }
575
576    /// List all keys in the vault KV store.
577    pub fn vault_kv_keys(&self) -> Vec<String> {
578        self.vault_kv
579            .read()
580            .map(|kv| kv.keys().cloned().collect())
581            .unwrap_or_default()
582    }
583
584    /// Convenience: get the 32-byte secret key for Value::Secret encryption.
585    /// Generated on first boot and stored at `red.secret.aes_key`.
586    pub fn vault_secret_key(&self) -> Option<[u8; 32]> {
587        let hex_str = self.vault_kv_get("red.secret.aes_key")?;
588        let bytes = hex::decode(hex_str).ok()?;
589        if bytes.len() == 32 {
590            let mut key = [0u8; 32];
591            key.copy_from_slice(&bytes);
592            Some(key)
593        } else {
594            None
595        }
596    }
597
598    /// Generate and store the AES-256 secret key on first boot if not present.
599    pub fn ensure_vault_secret_key(&self) {
600        if self.vault_kv_get("red.secret.aes_key").is_none() {
601            let key = random_bytes(32);
602            self.vault_kv_set("red.secret.aes_key".to_string(), hex::encode(key));
603        }
604    }
605
606    /// Take a snapshot of the current auth state for vault serialization.
607    fn snapshot(&self) -> VaultState {
608        let users_guard = self.users.read().unwrap_or_else(|e| e.into_inner());
609        let users: Vec<User> = users_guard.values().cloned().collect();
610
611        // Collect (owner UserId, api_key) pairs from all users so a
612        // tenant-scoped owner can be reattached on restore.
613        let mut api_keys = Vec::new();
614        for user in &users {
615            let owner = UserId::from_parts(user.tenant_id.as_deref(), &user.username);
616            for key in &user.api_keys {
617                api_keys.push((owner.clone(), key.clone()));
618            }
619        }
620
621        // Include the master secret if a keypair is loaded.
622        let master_secret = self
623            .keypair
624            .read()
625            .ok()
626            .and_then(|guard| guard.as_ref().map(|kp| kp.master_secret.clone()));
627
628        let kv = self.vault_kv.read().map(|m| m.clone()).unwrap_or_default();
629
630        VaultState {
631            users,
632            api_keys,
633            bootstrapped: self.bootstrapped.load(Ordering::Acquire),
634            master_secret,
635            kv,
636        }
637    }
638
639    /// Restore the in-memory auth state from a vault snapshot.
640    fn restore_from_vault(&mut self, state: VaultState) {
641        // Restore bootstrap seal.
642        if state.bootstrapped {
643            self.bootstrapped.store(true, Ordering::Release);
644        }
645
646        // Restore keypair from master secret (if present).
647        if let Some(secret) = state.master_secret {
648            let kp = KeyPair::from_master_secret(secret);
649            if let Ok(mut guard) = self.keypair.write() {
650                *guard = Some(kp);
651            }
652        }
653
654        // Restore KV store.
655        if let Ok(mut kv) = self.vault_kv.write() {
656            *kv = state.kv;
657        }
658
659        // Restore users.
660        let mut users = self.users.write().unwrap_or_else(|e| e.into_inner());
661        let mut idx = self
662            .api_key_index
663            .write()
664            .unwrap_or_else(|e| e.into_inner());
665
666        for user in state.users {
667            let id = UserId::from_parts(user.tenant_id.as_deref(), &user.username);
668            // Register API keys in the index.
669            for key in &user.api_keys {
670                idx.insert(key.key.clone(), (id.clone(), key.role));
671            }
672            users.insert(id, user);
673        }
674        drop(idx);
675        drop(users);
676
677        self.rehydrate_acl();
678        self.rehydrate_iam();
679    }
680
681    // -----------------------------------------------------------------
682    // User management
683    // -----------------------------------------------------------------
684
685    /// Create a new platform-scoped user (`tenant_id = None`).
686    ///
687    /// For tenant-scoped creation, use [`Self::create_user_in_tenant`].
688    pub fn create_user(
689        &self,
690        username: &str,
691        password: &str,
692        role: Role,
693    ) -> Result<User, AuthError> {
694        self.create_user_in_tenant(None, username, password, role)
695    }
696
697    /// Create a user under the given tenant scope. `tenant_id == None`
698    /// produces a platform-wide user. `(tenant, username)` is the
699    /// uniqueness key — the same `username` may exist independently
700    /// under multiple tenants.
701    pub fn create_user_in_tenant(
702        &self,
703        tenant_id: Option<&str>,
704        username: &str,
705        password: &str,
706        role: Role,
707    ) -> Result<User, AuthError> {
708        self.create_user_in_tenant_with_ownership(tenant_id, username, password, role, false)
709    }
710
711    pub fn create_system_user(
712        &self,
713        username: &str,
714        password: &str,
715        role: Role,
716        tenant_id: Option<&str>,
717    ) -> Result<User, AuthError> {
718        self.create_user_in_tenant_with_ownership(tenant_id, username, password, role, true)
719    }
720
721    fn create_user_in_tenant_with_ownership(
722        &self,
723        tenant_id: Option<&str>,
724        username: &str,
725        password: &str,
726        role: Role,
727        system_owned: bool,
728    ) -> Result<User, AuthError> {
729        let id = UserId::from_parts(tenant_id, username);
730        let mut users = self.users.write().map_err(lock_err)?;
731        if users.contains_key(&id) {
732            return Err(AuthError::UserExists(id.to_string()));
733        }
734
735        let now = now_ms();
736        let user = User {
737            username: username.to_string(),
738            tenant_id: tenant_id.map(|s| s.to_string()),
739            password_hash: hash_password(password),
740            scram_verifier: Some(make_scram_verifier(password)),
741            role,
742            api_keys: Vec::new(),
743            created_at: now,
744            updated_at: now,
745            enabled: true,
746            system_owned,
747        };
748        users.insert(id, user.clone());
749        drop(users); // release lock before vault I/O
750        self.persist_to_vault();
751        Ok(user)
752    }
753
754    /// Look up a user's SCRAM verifier by full `UserId`.
755    ///
756    /// The wire handshake passes the tenant resolved from the session
757    /// (or `None` for the bootstrap admin) so cross-tenant collisions
758    /// never authenticate the wrong identity.
759    pub fn lookup_scram_verifier(&self, id: &UserId) -> Option<crate::auth::scram::ScramVerifier> {
760        let users = self.users.read().ok()?;
761        users.get(id).and_then(|u| u.scram_verifier.clone())
762    }
763
764    /// Backwards-compatible shim for the v2 wire bootstrap path: looks
765    /// up a user by username assuming the platform (`tenant=None`)
766    /// scope. Use this only where the handshake hasn't yet learned the
767    /// caller's tenant.
768    pub fn lookup_scram_verifier_global(
769        &self,
770        username: &str,
771    ) -> Option<crate::auth::scram::ScramVerifier> {
772        self.lookup_scram_verifier(&UserId::platform(username))
773    }
774
775    /// Return all users (password hashes redacted).
776    pub fn list_users(&self) -> Vec<User> {
777        let users = match self.users.read() {
778            Ok(g) => g,
779            Err(_) => return Vec::new(),
780        };
781        users
782            .values()
783            .map(|u| User {
784                password_hash: String::new(), // redacted
785                ..u.clone()
786            })
787            .collect()
788    }
789
790    /// Return users restricted to a tenant scope.
791    ///
792    /// `tenant_filter`:
793    ///   - `None` listing in `Some(None)` — only platform users
794    ///   - `Some(Some("acme"))` — only users in tenant `acme`
795    ///   - `None` argument — all users (admin-only callers)
796    pub fn list_users_scoped(&self, tenant_filter: Option<Option<&str>>) -> Vec<User> {
797        let users = match self.users.read() {
798            Ok(g) => g,
799            Err(_) => return Vec::new(),
800        };
801        users
802            .values()
803            .filter(|u| match tenant_filter {
804                None => true,
805                Some(t) => u.tenant_id.as_deref() == t,
806            })
807            .map(|u| User {
808                password_hash: String::new(), // redacted
809                ..u.clone()
810            })
811            .collect()
812    }
813
814    /// Look up a single user by `(tenant, username)`. Password hash
815    /// is redacted.
816    /// Whether the given principal's user record is system-owned. Returns
817    /// `false` for unknown principals — an absent record is never treated as
818    /// operator-owned. Used to populate `EvalContext::principal_is_system_owned`
819    /// on the runtime authorization hot path.
820    pub fn principal_is_system_owned(&self, principal: &UserId) -> bool {
821        self.get_user_cloned(principal)
822            .map(|u| u.system_owned)
823            .unwrap_or(false)
824    }
825
826    pub fn get_user(&self, tenant_id: Option<&str>, username: &str) -> Option<User> {
827        let id = UserId::from_parts(tenant_id, username);
828        self.get_user_cloned(&id).map(|u| User {
829            password_hash: String::new(),
830            ..u
831        })
832    }
833
834    /// Delete a platform-scoped user (`tenant_id = None`) and revoke
835    /// all of their API keys + sessions.
836    ///
837    /// For tenant-scoped deletes, use [`Self::delete_user_in_tenant`].
838    pub fn delete_user(&self, username: &str) -> Result<(), AuthError> {
839        self.delete_user_in_tenant(None, username)
840    }
841
842    /// Delete a user identified by `(tenant_id, username)` and revoke
843    /// all of their API keys + sessions.
844    pub fn delete_user_in_tenant(
845        &self,
846        tenant_id: Option<&str>,
847        username: &str,
848    ) -> Result<(), AuthError> {
849        let id = UserId::from_parts(tenant_id, username);
850        let mut users = self.users.write().map_err(lock_err)?;
851        let user = users
852            .get(&id)
853            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
854        reject_system_owned(&id, user)?;
855        let user = users
856            .remove(&id)
857            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
858
859        // Remove API key index entries.
860        if let Ok(mut idx) = self.api_key_index.write() {
861            for api_key in &user.api_keys {
862                idx.remove(&api_key.key);
863            }
864        }
865
866        // Remove sessions belonging to this user (match on tenant+username
867        // so we don't tear down a same-named user in another tenant).
868        if let Ok(mut sessions) = self.sessions.write() {
869            sessions
870                .retain(|_, s| !(s.username == username && s.tenant_id.as_deref() == tenant_id));
871        }
872
873        self.persist_to_vault();
874        Ok(())
875    }
876
877    /// Change password (requires the old password). Defaults to
878    /// platform tenant; use [`Self::change_password_in_tenant`] for
879    /// scoped users.
880    pub fn change_password(
881        &self,
882        username: &str,
883        old_password: &str,
884        new_password: &str,
885    ) -> Result<(), AuthError> {
886        self.change_password_in_tenant(None, username, old_password, new_password)
887    }
888
889    pub fn change_password_in_tenant(
890        &self,
891        tenant_id: Option<&str>,
892        username: &str,
893        old_password: &str,
894        new_password: &str,
895    ) -> Result<(), AuthError> {
896        let id = UserId::from_parts(tenant_id, username);
897        let mut users = self.users.write().map_err(lock_err)?;
898        let user = users
899            .get_mut(&id)
900            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
901        reject_system_owned(&id, user)?;
902
903        if !verify_password(old_password, &user.password_hash) {
904            return Err(AuthError::InvalidCredentials);
905        }
906
907        user.password_hash = hash_password(new_password);
908        user.scram_verifier = Some(make_scram_verifier(new_password));
909        user.updated_at = now_ms();
910        drop(users); // release lock before vault I/O
911        self.persist_to_vault();
912        Ok(())
913    }
914
915    /// Change a user's role (admin-only operation). Defaults to platform
916    /// tenant; use [`Self::change_role_in_tenant`] for scoped users.
917    pub fn change_role(&self, username: &str, new_role: Role) -> Result<(), AuthError> {
918        self.change_role_in_tenant(None, username, new_role)
919    }
920
921    pub fn change_role_in_tenant(
922        &self,
923        tenant_id: Option<&str>,
924        username: &str,
925        new_role: Role,
926    ) -> Result<(), AuthError> {
927        let id = UserId::from_parts(tenant_id, username);
928        let mut users = self.users.write().map_err(lock_err)?;
929        let user = users
930            .get_mut(&id)
931            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
932        reject_system_owned(&id, user)?;
933
934        let prior_role = user.role;
935        user.role = new_role;
936        user.updated_at = now_ms();
937
938        // Issue #205 — promotion to Admin is an operator-grade event:
939        // the new role grants destructive capabilities (DROP, ALTER,
940        // GRANT) that an operator must observe out-of-band even when
941        // the auth path itself is healthy.
942        if new_role == Role::Admin && prior_role != Role::Admin {
943            crate::telemetry::operator_event::OperatorEvent::AdminCapabilityGranted {
944                granted_to: id.to_string(),
945                capability: "Role::Admin".to_string(),
946                granted_by: "auth_store::change_role".to_string(),
947            }
948            .emit_global();
949        }
950
951        // Downgrade any API keys that now exceed the user's role.
952        for key in &mut user.api_keys {
953            if key.role > new_role {
954                key.role = new_role;
955            }
956        }
957
958        // Update the api_key_index as well.
959        if let Ok(mut idx) = self.api_key_index.write() {
960            for key in &user.api_keys {
961                if let Some(entry) = idx.get_mut(&key.key) {
962                    entry.1 = key.role;
963                }
964            }
965        }
966
967        self.persist_to_vault();
968        Ok(())
969    }
970
971    // -----------------------------------------------------------------
972    // Authentication (login)
973    // -----------------------------------------------------------------
974
975    /// Verify credentials for a platform-tenant user (`tenant_id = None`)
976    /// and create a session. For tenant-scoped login use
977    /// [`Self::authenticate_in_tenant`].
978    ///
979    /// When a keypair is available (certificate-based seal), session tokens
980    /// are signed with the master secret so the server can verify they were
981    /// genuinely issued by this vault instance.
982    pub fn authenticate(&self, username: &str, password: &str) -> Result<Session, AuthError> {
983        self.authenticate_in_tenant(None, username, password)
984    }
985
986    /// Verify credentials for `(tenant_id, username, password)` and
987    /// create a session. Tenant-aware: `alice@acme` and `alice@globex`
988    /// authenticate independently.
989    pub fn authenticate_in_tenant(
990        &self,
991        tenant_id: Option<&str>,
992        username: &str,
993        password: &str,
994    ) -> Result<Session, AuthError> {
995        let id = UserId::from_parts(tenant_id, username);
996        let users = self.users.read().map_err(lock_err)?;
997        let user = users.get(&id).ok_or(AuthError::InvalidCredentials)?;
998
999        if !user.enabled {
1000            return Err(AuthError::InvalidCredentials);
1001        }
1002
1003        if !verify_password(password, &user.password_hash) {
1004            return Err(AuthError::InvalidCredentials);
1005        }
1006
1007        // Generate token: signed if keypair is available, random otherwise.
1008        let token = match self.keypair.read().ok().and_then(|g| {
1009            g.as_ref().map(|kp| {
1010                let token_id = random_hex(16);
1011                let sig = kp.sign(format!("session:{}", token_id).as_bytes());
1012                // Take first 16 bytes of signature for compact token.
1013                format!("rs_{}{}", token_id, hex::encode(&sig[..16]))
1014            })
1015        }) {
1016            Some(signed_token) => signed_token,
1017            None => generate_session_token(),
1018        };
1019
1020        let now = now_ms();
1021        let session = Session {
1022            token,
1023            username: username.to_string(),
1024            tenant_id: user.tenant_id.clone(),
1025            role: user.role,
1026            created_at: now,
1027            expires_at: now + (self.config.session_ttl_secs as u128 * 1000),
1028        };
1029
1030        drop(users); // release read lock before acquiring write
1031
1032        let mut sessions = self.sessions.write().map_err(lock_err)?;
1033        sessions.insert(session.token.clone(), session.clone());
1034        Ok(session)
1035    }
1036
1037    // -----------------------------------------------------------------
1038    // Token validation
1039    // -----------------------------------------------------------------
1040
1041    /// Validate a token (session or API key).
1042    ///
1043    /// Returns `(username, role)` if valid, `None` otherwise. Tenant
1044    /// scope is dropped here for compatibility with the bulk of the
1045    /// existing caller surface (routing, gRPC control, redwire). Use
1046    /// [`Self::validate_token_full`] when the caller needs the
1047    /// resolved `UserId` (e.g. to pin `CURRENT_TENANT()`).
1048    pub fn validate_token(&self, token: &str) -> Option<(String, Role)> {
1049        self.validate_token_full(token)
1050            .map(|(id, role)| (id.username, role))
1051    }
1052
1053    /// Tenant-aware token validation. Returns the resolved `UserId`
1054    /// (which carries the tenant) and the granted `Role`.
1055    pub fn validate_token_full(&self, token: &str) -> Option<(UserId, Role)> {
1056        // Try session tokens first.
1057        if token.starts_with("rs_") {
1058            if let Ok(sessions) = self.sessions.read() {
1059                if let Some(session) = sessions.get(token) {
1060                    let now = now_ms();
1061                    if now < session.expires_at {
1062                        return Some((
1063                            UserId::from_parts(session.tenant_id.as_deref(), &session.username),
1064                            session.role,
1065                        ));
1066                    }
1067                }
1068            }
1069            return None;
1070        }
1071
1072        // Try API keys.
1073        if token.starts_with("rk_") {
1074            if let Ok(idx) = self.api_key_index.read() {
1075                return idx.get(token).cloned();
1076            }
1077            return None;
1078        }
1079
1080        None
1081    }
1082
1083    // -----------------------------------------------------------------
1084    // API Key management
1085    // -----------------------------------------------------------------
1086
1087    /// Create a persistent API key for a platform-tenant user.
1088    ///
1089    /// For tenant-scoped users use [`Self::create_api_key_in_tenant`].
1090    pub fn create_api_key(
1091        &self,
1092        username: &str,
1093        name: &str,
1094        role: Role,
1095    ) -> Result<ApiKey, AuthError> {
1096        self.create_api_key_in_tenant(None, username, name, role)
1097    }
1098
1099    pub fn create_api_key_in_tenant(
1100        &self,
1101        tenant_id: Option<&str>,
1102        username: &str,
1103        name: &str,
1104        role: Role,
1105    ) -> Result<ApiKey, AuthError> {
1106        let id = UserId::from_parts(tenant_id, username);
1107        let mut users = self.users.write().map_err(lock_err)?;
1108        let user = users
1109            .get_mut(&id)
1110            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
1111
1112        // The key's role cannot exceed the user's role.
1113        if role > user.role {
1114            return Err(AuthError::RoleExceeded {
1115                requested: role,
1116                ceiling: user.role,
1117            });
1118        }
1119
1120        let api_key = ApiKey {
1121            key: generate_api_key(),
1122            name: name.to_string(),
1123            role,
1124            created_at: now_ms(),
1125        };
1126
1127        user.api_keys.push(api_key.clone());
1128        user.updated_at = now_ms();
1129
1130        // Update the index.
1131        if let Ok(mut idx) = self.api_key_index.write() {
1132            idx.insert(api_key.key.clone(), (id.clone(), api_key.role));
1133        }
1134
1135        drop(users); // release lock before vault I/O
1136        self.persist_to_vault();
1137        Ok(api_key)
1138    }
1139
1140    /// Revoke (delete) an API key.
1141    pub fn revoke_api_key(&self, key: &str) -> Result<(), AuthError> {
1142        let mut users = self.users.write().map_err(lock_err)?;
1143
1144        // Find which user owns this key (look up by the api_key_index
1145        // first; fall back to a scan for legacy state restored before
1146        // the index was reseeded).
1147        let owner_id: UserId = {
1148            if let Ok(idx) = self.api_key_index.read() {
1149                if let Some((id, _)) = idx.get(key) {
1150                    id.clone()
1151                } else {
1152                    return Err(AuthError::KeyNotFound(key.to_string()));
1153                }
1154            } else {
1155                let owner = users
1156                    .iter()
1157                    .find(|(_, u)| u.api_keys.iter().any(|k| k.key == key));
1158                match owner {
1159                    Some((id, _)) => id.clone(),
1160                    None => return Err(AuthError::KeyNotFound(key.to_string())),
1161                }
1162            }
1163        };
1164
1165        let user = users
1166            .get_mut(&owner_id)
1167            .ok_or_else(|| AuthError::KeyNotFound(key.to_string()))?;
1168        user.api_keys.retain(|k| k.key != key);
1169        user.updated_at = now_ms();
1170
1171        // Remove from index.
1172        if let Ok(mut idx) = self.api_key_index.write() {
1173            idx.remove(key);
1174        }
1175
1176        self.persist_to_vault();
1177        Ok(())
1178    }
1179
1180    // -----------------------------------------------------------------
1181    // Session management
1182    // -----------------------------------------------------------------
1183
1184    /// Revoke a session token.
1185    pub fn revoke_session(&self, token: &str) {
1186        if let Ok(mut sessions) = self.sessions.write() {
1187            sessions.remove(token);
1188        }
1189    }
1190
1191    /// Purge expired sessions (housekeeping).
1192    pub fn purge_expired_sessions(&self) -> usize {
1193        let now = now_ms();
1194        if let Ok(mut sessions) = self.sessions.write() {
1195            let before = sessions.len();
1196            sessions.retain(|_, s| s.expires_at > now);
1197            return before - sessions.len();
1198        }
1199        0
1200    }
1201
1202    // -----------------------------------------------------------------
1203    // Granular RBAC — GRANT / REVOKE
1204    //
1205    // The privilege engine lives in `super::privileges`. These helpers
1206    // are the AuthStore facade: they keep an in-memory map of grants per
1207    // user (plus a `public_grants` list), persist additions/removals to
1208    // the existing `vault_kv` store, and rebuild the per-user
1209    // `PermissionCache` so the hot path stays O(1).
1210    //
1211    // Persistence design: rather than extend the snapshot/restore
1212    // pipeline (Agent #2's territory) we serialise grants and account
1213    // attributes to the vault KV store. That gives us atomic write +
1214    // encrypted-at-rest semantics for free without touching the
1215    // existing USER/KEY/KV serializer paths. On restart `rehydrate_acl`
1216    // reads these KV entries back into the in-memory maps.
1217    // -----------------------------------------------------------------
1218
1219    /// Persist a grant. Returns `Forbidden` when the granting user is
1220    /// not Admin or attempts a cross-tenant grant.
1221    pub fn grant(
1222        &self,
1223        granter: &UserId,
1224        granter_role: Role,
1225        principal: GrantPrincipal,
1226        resource: Resource,
1227        actions: Vec<Action>,
1228        with_grant_option: bool,
1229        tenant: Option<String>,
1230    ) -> Result<(), AuthError> {
1231        if granter_role != Role::Admin {
1232            return Err(AuthError::Forbidden(format!(
1233                "GRANT requires Admin role; granter `{}` has `{:?}`",
1234                granter, granter_role
1235            )));
1236        }
1237
1238        // Cross-tenant guard: a tenant-scoped admin cannot mint grants
1239        // outside their tenant. Platform admin (tenant=None) may grant
1240        // anywhere.
1241        if granter.tenant.is_some() && granter.tenant != tenant {
1242            return Err(AuthError::Forbidden(format!(
1243                "cross-tenant GRANT denied: granter tenant `{:?}` != grant tenant `{:?}`",
1244                granter.tenant, tenant
1245            )));
1246        }
1247
1248        let mut actions_set = std::collections::BTreeSet::new();
1249        for a in actions {
1250            actions_set.insert(a);
1251        }
1252        let g = Grant {
1253            principal: principal.clone(),
1254            resource,
1255            actions: actions_set,
1256            with_grant_option,
1257            granted_by: granter.to_string(),
1258            granted_at: now_ms(),
1259            tenant,
1260            columns: None,
1261        };
1262
1263        match &principal {
1264            GrantPrincipal::User(uid) => {
1265                self.grants
1266                    .write()
1267                    .unwrap_or_else(|e| e.into_inner())
1268                    .entry(uid.clone())
1269                    .or_default()
1270                    .push(g.clone());
1271                self.invalidate_permission_cache(Some(uid));
1272            }
1273            GrantPrincipal::Public => {
1274                self.public_grants
1275                    .write()
1276                    .unwrap_or_else(|e| e.into_inner())
1277                    .push(g.clone());
1278                self.invalidate_permission_cache(None);
1279            }
1280            GrantPrincipal::Group(_) => {
1281                return Err(AuthError::Forbidden(
1282                    "GROUP principals are not yet supported; use a USER or PUBLIC".to_string(),
1283                ));
1284            }
1285        }
1286
1287        // Issue #119: a fresh grant changes the visible-collections set
1288        // for `(tenant, role)` callers under the same tenant. Drop those
1289        // cache entries so the next AI command sees the new SELECT
1290        // privilege immediately.
1291        self.invalidate_visible_collections_for_tenant(g.tenant.as_deref());
1292
1293        self.persist_acl_to_kv();
1294        Ok(())
1295    }
1296
1297    /// Drop matching grants from a principal. Returns the number of
1298    /// grants removed.
1299    pub fn revoke(
1300        &self,
1301        granter_role: Role,
1302        principal: &GrantPrincipal,
1303        resource: &Resource,
1304        actions: &[Action],
1305    ) -> Result<usize, AuthError> {
1306        if granter_role != Role::Admin {
1307            return Err(AuthError::Forbidden(format!(
1308                "REVOKE requires Admin role; granter has `{:?}`",
1309                granter_role
1310            )));
1311        }
1312
1313        let removed = match principal {
1314            GrantPrincipal::User(uid) => {
1315                let mut g = self.grants.write().unwrap_or_else(|e| e.into_inner());
1316                let before = g.get(uid).map(|v| v.len()).unwrap_or(0);
1317                if let Some(list) = g.get_mut(uid) {
1318                    list.retain(|gr| {
1319                        !(gr.resource == *resource
1320                            && (actions.iter().any(|a| gr.actions.contains(a))
1321                                || (gr.actions.contains(&Action::All) && !actions.is_empty())))
1322                    });
1323                }
1324                let after = g.get(uid).map(|v| v.len()).unwrap_or(0);
1325                drop(g);
1326                self.invalidate_permission_cache(Some(uid));
1327                before - after
1328            }
1329            GrantPrincipal::Public => {
1330                let mut p = self
1331                    .public_grants
1332                    .write()
1333                    .unwrap_or_else(|e| e.into_inner());
1334                let before = p.len();
1335                p.retain(|gr| {
1336                    !(gr.resource == *resource
1337                        && (actions.iter().any(|a| gr.actions.contains(a))
1338                            || (gr.actions.contains(&Action::All) && !actions.is_empty())))
1339                });
1340                let after = p.len();
1341                drop(p);
1342                self.invalidate_permission_cache(None);
1343                before - after
1344            }
1345            GrantPrincipal::Group(_) => 0,
1346        };
1347
1348        if removed > 0 {
1349            // Issue #119: REVOKE may shrink the visible-collections set
1350            // for any `(tenant, role)` slot. We don't know the exact
1351            // tenant when the principal is `Public`, so a `Public`
1352            // revoke clears the whole cache; user revokes scope to the
1353            // user's tenant.
1354            match principal {
1355                GrantPrincipal::User(uid) => {
1356                    self.invalidate_visible_collections_for_tenant(uid.tenant.as_deref());
1357                }
1358                GrantPrincipal::Public | GrantPrincipal::Group(_) => {
1359                    self.invalidate_visible_collections_cache();
1360                }
1361            }
1362            self.persist_acl_to_kv();
1363        }
1364        Ok(removed)
1365    }
1366
1367    /// Compute the set of collection ids a given `(tenant, role)`
1368    /// scope can read, consulting the explicit grant table. The result
1369    /// is cached for `super::scope_cache::DEFAULT_TTL` (60s) and
1370    /// invalidated on every GRANT/REVOKE/policy/collection mutation
1371    /// that could change the answer.
1372    ///
1373    /// `all_collections` is the full list of collection ids known to
1374    /// the storage layer. The runtime hands it in so this module stays
1375    /// decoupled from the storage crate. Each collection passes through
1376    /// `check_grant(SELECT)` under a synthetic `(principal, role,
1377    /// tenant)` view. The cache key includes principal because direct
1378    /// grants can differ between users that share the same tenant and
1379    /// role.
1380    pub fn visible_collections_for_scope(
1381        &self,
1382        tenant: Option<&str>,
1383        role: Role,
1384        principal: &str,
1385        all_collections: &[String],
1386    ) -> std::collections::HashSet<String> {
1387        let key = super::scope_cache::ScopeKey::new(tenant, principal, role);
1388        if let Some(hit) = self.visible_collections_cache.get(&key) {
1389            return hit;
1390        }
1391        // Slow path: walk every collection through `check_grant`. We
1392        // build the AuthzContext once, then reuse it per resource.
1393        let ctx = AuthzContext {
1394            principal,
1395            effective_role: role,
1396            tenant,
1397        };
1398        let mut visible = std::collections::HashSet::new();
1399        for collection in all_collections {
1400            let resource = Resource::table_from_name(collection);
1401            if self.check_grant(&ctx, Action::Select, &resource).is_ok() {
1402                visible.insert(collection.clone());
1403            }
1404        }
1405        self.visible_collections_cache.insert(key, visible.clone());
1406        visible
1407    }
1408
1409    /// Stats probe required by issue #119 — exposes hit/miss counts and
1410    /// invalidations for the visible-collections cache so metrics
1411    /// pipelines can compute a hit rate.
1412    pub fn auth_cache_stats(&self) -> super::scope_cache::AuthCacheStats {
1413        self.visible_collections_cache.stats()
1414    }
1415
1416    /// Drop every cached `(tenant, role)` entry. Called from CREATE
1417    /// POLICY / DROP POLICY / DROP COLLECTION paths where the affected
1418    /// tenant set is unknown.
1419    pub fn invalidate_visible_collections_cache(&self) {
1420        self.visible_collections_cache.invalidate_all();
1421    }
1422
1423    /// Drop cached entries for one tenant. Called from GRANT / REVOKE
1424    /// where the principal's tenant is known.
1425    pub fn invalidate_visible_collections_for_tenant(&self, tenant: Option<&str>) {
1426        self.visible_collections_cache.invalidate_tenant(tenant);
1427    }
1428
1429    /// Snapshot of every grant the principal effectively has, including
1430    /// `Public` grants. Audit / introspection helper.
1431    pub fn effective_grants(&self, uid: &UserId) -> Vec<Grant> {
1432        let mut out = Vec::new();
1433        if let Ok(g) = self.grants.read() {
1434            if let Some(list) = g.get(uid) {
1435                out.extend(list.iter().cloned());
1436            }
1437        }
1438        if let Ok(p) = self.public_grants.read() {
1439            out.extend(p.iter().cloned());
1440        }
1441        out
1442    }
1443
1444    /// Run a privilege check using the in-memory grant tables. Returns
1445    /// `Ok(())` on allow, `Err(AuthzError)` on deny.
1446    pub fn check_grant(
1447        &self,
1448        ctx: &AuthzContext<'_>,
1449        action: Action,
1450        resource: &Resource,
1451    ) -> Result<(), AuthzError> {
1452        if ctx.effective_role == Role::Admin {
1453            return Ok(());
1454        }
1455
1456        let uid = UserId::from_parts(ctx.tenant, ctx.principal);
1457
1458        // Fast path: per-user pre-resolved cache.
1459        if let Ok(cache) = self.permission_cache.read() {
1460            if let Some(pc) = cache.get(&uid) {
1461                if pc.allows(resource, action) {
1462                    return Ok(());
1463                }
1464            }
1465        }
1466
1467        // Slow path: linear scan + rebuild cache as a side-effect.
1468        let user_grants = self
1469            .grants
1470            .read()
1471            .ok()
1472            .and_then(|g| g.get(&uid).cloned())
1473            .unwrap_or_default();
1474        let any_user_grants = self
1475            .grants
1476            .read()
1477            .ok()
1478            .map(|g| g.values().any(|list| !list.is_empty()))
1479            .unwrap_or(false);
1480        let public_grants = self
1481            .public_grants
1482            .read()
1483            .ok()
1484            .map(|p| p.clone())
1485            .unwrap_or_default();
1486        if user_grants.is_empty() && public_grants.is_empty() && any_user_grants {
1487            return Err(AuthzError::PermissionDenied {
1488                action,
1489                resource: resource.clone(),
1490                principal: ctx.principal.to_string(),
1491            });
1492        }
1493        let view = GrantsView {
1494            user_grants: &user_grants,
1495            public_grants: &public_grants,
1496        };
1497        let result = check_grant(ctx, action, resource, &view);
1498
1499        if result.is_ok() {
1500            let pc = PermissionCache::build(&user_grants, &public_grants);
1501            if let Ok(mut cache) = self.permission_cache.write() {
1502                cache.insert(uid, pc);
1503            }
1504        }
1505        result
1506    }
1507
1508    // -----------------------------------------------------------------
1509    // ALTER USER attributes (VALID UNTIL, CONNECTION LIMIT, etc.)
1510    // -----------------------------------------------------------------
1511
1512    /// Replace the attribute record for `uid`.
1513    pub fn set_user_attributes(
1514        &self,
1515        uid: &UserId,
1516        attrs: UserAttributes,
1517    ) -> Result<(), AuthError> {
1518        let users = self.users.read().map_err(lock_err)?;
1519        if !users.contains_key(uid) {
1520            return Err(AuthError::UserNotFound(uid.to_string()));
1521        }
1522        drop(users);
1523
1524        self.user_attributes
1525            .write()
1526            .unwrap_or_else(|e| e.into_inner())
1527            .insert(uid.clone(), attrs);
1528        self.invalidate_iam_cache(Some(uid));
1529        self.persist_acl_to_kv();
1530        Ok(())
1531    }
1532
1533    /// Read the attributes for `uid`. Returns `Default::default()` for
1534    /// users that have never been altered.
1535    pub fn user_attributes(&self, uid: &UserId) -> UserAttributes {
1536        self.user_attributes
1537            .read()
1538            .ok()
1539            .and_then(|m| m.get(uid).cloned())
1540            .unwrap_or_default()
1541    }
1542
1543    pub fn add_user_to_group(&self, uid: &UserId, group: &str) -> Result<(), AuthError> {
1544        if group.trim().is_empty() {
1545            return Err(AuthError::Forbidden("group name cannot be empty".into()));
1546        }
1547        let mut attrs = self.user_attributes(uid);
1548        if !attrs.groups.iter().any(|g| g == group) {
1549            attrs.groups.push(group.to_string());
1550            attrs.groups.sort();
1551        }
1552        self.set_user_attributes(uid, attrs)
1553    }
1554
1555    pub fn remove_user_from_group(&self, uid: &UserId, group: &str) -> Result<(), AuthError> {
1556        let mut attrs = self.user_attributes(uid);
1557        attrs.groups.retain(|g| g != group);
1558        self.set_user_attributes(uid, attrs)
1559    }
1560
1561    /// Toggle `User.enabled` without rotating credentials.
1562    pub fn set_user_enabled(&self, uid: &UserId, enabled: bool) -> Result<(), AuthError> {
1563        let mut users = self.users.write().map_err(lock_err)?;
1564        let user = users
1565            .get_mut(uid)
1566            .ok_or_else(|| AuthError::UserNotFound(uid.to_string()))?;
1567        reject_system_owned(uid, user)?;
1568        user.enabled = enabled;
1569        user.updated_at = now_ms();
1570        drop(users);
1571        self.persist_to_vault();
1572        Ok(())
1573    }
1574
1575    // -----------------------------------------------------------------
1576    // Login-side enforcement (HTTP path)
1577    // -----------------------------------------------------------------
1578
1579    /// Authenticate with VALID UNTIL / CONNECTION LIMIT enforcement.
1580    /// Wraps `authenticate_in_tenant` and additionally:
1581    ///   * rejects logins after `valid_until`,
1582    ///   * rejects logins when the live session count would exceed the
1583    ///     `connection_limit` attribute.
1584    pub fn authenticate_with_attrs(
1585        &self,
1586        tenant_id: Option<&str>,
1587        username: &str,
1588        password: &str,
1589    ) -> Result<Session, AuthError> {
1590        let uid = UserId::from_parts(tenant_id, username);
1591        let attrs = self.user_attributes(&uid);
1592
1593        if let Some(deadline) = attrs.valid_until {
1594            if now_ms() >= deadline {
1595                return Err(AuthError::Forbidden(format!(
1596                    "account `{}` expired (VALID UNTIL exceeded)",
1597                    uid
1598                )));
1599            }
1600        }
1601
1602        if let Some(limit) = attrs.connection_limit {
1603            let current = self
1604                .session_count_by_user
1605                .read()
1606                .ok()
1607                .and_then(|m| m.get(&uid).copied())
1608                .unwrap_or(0);
1609            if current >= limit {
1610                return Err(AuthError::Forbidden(format!(
1611                    "account `{}` exceeded CONNECTION LIMIT ({})",
1612                    uid, limit
1613                )));
1614            }
1615        }
1616
1617        let session = self.authenticate_in_tenant(tenant_id, username, password)?;
1618
1619        if let Ok(mut counts) = self.session_count_by_user.write() {
1620            *counts.entry(uid).or_insert(0) += 1;
1621        }
1622        Ok(session)
1623    }
1624
1625    /// Decrement the live-session count for `uid`. Call from session
1626    /// revoke / expiry paths so CONNECTION LIMIT stays accurate.
1627    pub fn decrement_session_count(&self, uid: &UserId) {
1628        if let Ok(mut counts) = self.session_count_by_user.write() {
1629            if let Some(c) = counts.get_mut(uid) {
1630                *c = c.saturating_sub(1);
1631            }
1632        }
1633    }
1634
1635    // -----------------------------------------------------------------
1636    // ACL persistence — vault_kv backed
1637    // -----------------------------------------------------------------
1638
1639    /// Re-read the ACL state from `vault_kv`. Call after vault load /
1640    /// restore so the in-memory maps reflect the persisted data.
1641    pub fn rehydrate_acl(&self) {
1642        let kv_snapshot: Vec<(String, String)> = self
1643            .vault_kv
1644            .read()
1645            .map(|kv| {
1646                kv.iter()
1647                    .filter(|(k, _)| {
1648                        k.starts_with("red.acl.grants.")
1649                            || k.starts_with("red.acl.attrs.")
1650                            || k == &"red.acl.public_grants"
1651                    })
1652                    .map(|(k, v)| (k.clone(), v.clone()))
1653                    .collect()
1654            })
1655            .unwrap_or_default();
1656
1657        for (k, v) in kv_snapshot {
1658            if k == "red.acl.public_grants" {
1659                if let Some(parsed) = decode_grants_blob(&v) {
1660                    *self
1661                        .public_grants
1662                        .write()
1663                        .unwrap_or_else(|e| e.into_inner()) = parsed;
1664                }
1665            } else if let Some(suffix) = k.strip_prefix("red.acl.grants.") {
1666                if let Some(uid) = decode_uid(suffix) {
1667                    if let Some(mut parsed) = decode_grants_blob(&v) {
1668                        // Restore the principal field — the on-disk
1669                        // line stores only resource+action shape.
1670                        for g in parsed.iter_mut() {
1671                            g.principal = GrantPrincipal::User(uid.clone());
1672                        }
1673                        self.grants
1674                            .write()
1675                            .unwrap_or_else(|e| e.into_inner())
1676                            .insert(uid, parsed);
1677                    }
1678                }
1679            } else if let Some(suffix) = k.strip_prefix("red.acl.attrs.") {
1680                if let Some(uid) = decode_uid(suffix) {
1681                    if let Some(parsed) = decode_attrs_blob(&v) {
1682                        self.user_attributes
1683                            .write()
1684                            .unwrap_or_else(|e| e.into_inner())
1685                            .insert(uid, parsed);
1686                    }
1687                }
1688            }
1689        }
1690
1691        self.permission_cache
1692            .write()
1693            .unwrap_or_else(|e| e.into_inner())
1694            .clear();
1695    }
1696
1697    /// Snapshot every ACL change back into the vault KV store.
1698    fn persist_acl_to_kv(&self) {
1699        let public = self
1700            .public_grants
1701            .read()
1702            .ok()
1703            .map(|p| encode_grants_blob(&p))
1704            .unwrap_or_default();
1705        self.vault_kv_set("red.acl.public_grants".to_string(), public);
1706
1707        let snapshot: Vec<(UserId, Vec<Grant>)> = self
1708            .grants
1709            .read()
1710            .ok()
1711            .map(|g| g.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
1712            .unwrap_or_default();
1713        for (uid, list) in snapshot {
1714            let key = format!("red.acl.grants.{}", encode_uid(&uid));
1715            let val = encode_grants_blob(&list);
1716            self.vault_kv_set(key, val);
1717        }
1718
1719        let attrs_snapshot: Vec<(UserId, UserAttributes)> = self
1720            .user_attributes
1721            .read()
1722            .ok()
1723            .map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
1724            .unwrap_or_default();
1725        for (uid, attrs) in attrs_snapshot {
1726            let key = format!("red.acl.attrs.{}", encode_uid(&uid));
1727            let val = encode_attrs_blob(&attrs);
1728            self.vault_kv_set(key, val);
1729        }
1730    }
1731
1732    fn invalidate_permission_cache(&self, uid: Option<&UserId>) {
1733        if let Ok(mut cache) = self.permission_cache.write() {
1734            match uid {
1735                Some(u) => {
1736                    cache.remove(u);
1737                }
1738                None => cache.clear(),
1739            }
1740        }
1741    }
1742
1743    // -----------------------------------------------------------------
1744    // IAM policies — put / delete / attach / detach / simulate
1745    //
1746    // The kernel in `super::policies` owns the Policy type and the
1747    // evaluator. AuthStore handles persistence + per-user cache + the
1748    // GRANT translation layer (synthetic `_grant_*` policies).
1749    // -----------------------------------------------------------------
1750
1751    /// Insert or replace a policy by id. Rejects synthetic ids
1752    /// (`_grant_*` / `_default_*`) so callers can't hand-write them
1753    /// from the public API. Use `put_policy_internal` for synthetic
1754    /// inserts.
1755    pub fn put_policy(&self, p: Policy) -> Result<(), AuthError> {
1756        if p.id.starts_with("_grant_") || p.id.starts_with("_default_") {
1757            return Err(AuthError::Forbidden(format!(
1758                "policy id `{}` is reserved",
1759                p.id
1760            )));
1761        }
1762        self.put_policy_internal(p)
1763    }
1764
1765    /// Internal put bypassing the synthetic-namespace guard. Used by
1766    /// the GRANT translation layer; exposed publicly so integration
1767    /// tests can register synthetic `_grant_*` policies without going
1768    /// through the SQL frontend.
1769    pub fn put_policy_internal(&self, p: Policy) -> Result<(), AuthError> {
1770        p.validate()
1771            .map_err(|e| AuthError::Forbidden(format!("invalid policy `{}`: {e}", p.id)))?;
1772        let id = p.id.clone();
1773        self.policies
1774            .write()
1775            .unwrap_or_else(|e| e.into_inner())
1776            .insert(id, Arc::new(p));
1777        self.iam_authorization_enabled
1778            .store(true, Ordering::Release);
1779        self.iam_effective_cache
1780            .write()
1781            .unwrap_or_else(|e| e.into_inner())
1782            .clear();
1783        // Issue #119: a policy mutation can change the visible-
1784        // collections answer for any (tenant, role); we don't know
1785        // which up-front, so blow the whole cache.
1786        self.invalidate_visible_collections_cache();
1787        self.persist_iam_to_kv();
1788        Ok(())
1789    }
1790
1791    /// Whether the IAM evaluator should be authoritative for runtime
1792    /// authorization. This flips on the first policy write and remains
1793    /// on after deletes so dropping all policies leaves the instance in
1794    /// default-deny rather than silently returning to role fallback.
1795    pub fn iam_authorization_enabled(&self) -> bool {
1796        self.iam_authorization_enabled.load(Ordering::Acquire)
1797    }
1798
1799    /// Remove a policy and any attachments referencing it.
1800    pub fn delete_policy(&self, id: &str) -> Result<(), AuthError> {
1801        let removed = self
1802            .policies
1803            .write()
1804            .unwrap_or_else(|e| e.into_inner())
1805            .remove(id)
1806            .is_some();
1807        if !removed {
1808            return Err(AuthError::Forbidden(format!("policy `{id}` not found")));
1809        }
1810        // Detach from every user / group.
1811        if let Ok(mut ua) = self.user_attachments.write() {
1812            for ids in ua.values_mut() {
1813                ids.retain(|p| p != id);
1814            }
1815            ua.retain(|_, v| !v.is_empty());
1816        }
1817        if let Ok(mut ga) = self.group_attachments.write() {
1818            for ids in ga.values_mut() {
1819                ids.retain(|p| p != id);
1820            }
1821            ga.retain(|_, v| !v.is_empty());
1822        }
1823        self.iam_effective_cache
1824            .write()
1825            .unwrap_or_else(|e| e.into_inner())
1826            .clear();
1827        // Issue #119: dropping a policy can shrink any caller's visible
1828        // set; clear the (tenant, role) cache so AI commands re-resolve.
1829        self.invalidate_visible_collections_cache();
1830        self.persist_iam_to_kv();
1831        Ok(())
1832    }
1833
1834    /// List all policies (id-sorted for deterministic output).
1835    pub fn list_policies(&self) -> Vec<Arc<Policy>> {
1836        let map = match self.policies.read() {
1837            Ok(g) => g,
1838            Err(_) => return Vec::new(),
1839        };
1840        let mut out: Vec<Arc<Policy>> = map.values().cloned().collect();
1841        out.sort_by(|a, b| a.id.cmp(&b.id));
1842        out
1843    }
1844
1845    /// Fetch a single policy by id.
1846    pub fn get_policy(&self, id: &str) -> Option<Arc<Policy>> {
1847        self.policies.read().ok().and_then(|m| m.get(id).cloned())
1848    }
1849
1850    /// List policies directly attached to a group.
1851    pub fn group_policies(&self, group: &str) -> Vec<Arc<Policy>> {
1852        let policies = self.policies.read();
1853        let attachments = self.group_attachments.read();
1854        let mut out = Vec::new();
1855        if let (Ok(p_map), Ok(ga_map)) = (policies, attachments) {
1856            if let Some(ids) = ga_map.get(group) {
1857                for id in ids {
1858                    if let Some(p) = p_map.get(id) {
1859                        out.push(p.clone());
1860                    }
1861                }
1862            }
1863        }
1864        out.sort_by(|a, b| a.id.cmp(&b.id));
1865        out
1866    }
1867
1868    /// Delete synthetic policies produced by SQL GRANT translation.
1869    /// REVOKE uses this to keep the IAM lane and the legacy grant table
1870    /// in lock-step.
1871    pub fn delete_synthetic_grant_policies(
1872        &self,
1873        principal: &GrantPrincipal,
1874        resource: &Resource,
1875        actions: &[Action],
1876    ) -> usize {
1877        let attached = match principal {
1878            GrantPrincipal::User(uid) => self
1879                .user_attachments
1880                .read()
1881                .ok()
1882                .and_then(|m| m.get(uid).cloned())
1883                .unwrap_or_default(),
1884            GrantPrincipal::Group(group) => self
1885                .group_attachments
1886                .read()
1887                .ok()
1888                .and_then(|m| m.get(group).cloned())
1889                .unwrap_or_default(),
1890            GrantPrincipal::Public => self
1891                .group_attachments
1892                .read()
1893                .ok()
1894                .and_then(|m| m.get(PUBLIC_IAM_GROUP).cloned())
1895                .unwrap_or_default(),
1896        };
1897        if attached.is_empty() {
1898            return 0;
1899        }
1900
1901        let mut delete_ids = Vec::new();
1902        if let Ok(policies) = self.policies.read() {
1903            for id in attached {
1904                let Some(policy) = policies.get(&id) else {
1905                    continue;
1906                };
1907                if !policy.id.starts_with("_grant_") {
1908                    continue;
1909                }
1910                if synthetic_grant_matches(policy, resource, actions) {
1911                    delete_ids.push(policy.id.clone());
1912                }
1913            }
1914        }
1915
1916        let mut deleted = 0usize;
1917        for id in delete_ids {
1918            if self.delete_policy(&id).is_ok() {
1919                deleted += 1;
1920            }
1921        }
1922        deleted
1923    }
1924
1925    /// Attach a policy to a user or group. Returns an error if the
1926    /// policy id doesn't exist.
1927    pub fn attach_policy(&self, principal: PrincipalRef, policy_id: &str) -> Result<(), AuthError> {
1928        if !self
1929            .policies
1930            .read()
1931            .map(|m| m.contains_key(policy_id))
1932            .unwrap_or(false)
1933        {
1934            return Err(AuthError::Forbidden(format!(
1935                "policy `{policy_id}` not found"
1936            )));
1937        }
1938        match &principal {
1939            PrincipalRef::User(uid) => {
1940                let mut ua = self
1941                    .user_attachments
1942                    .write()
1943                    .unwrap_or_else(|e| e.into_inner());
1944                let list = ua.entry(uid.clone()).or_default();
1945                if !list.iter().any(|p| p == policy_id) {
1946                    list.push(policy_id.to_string());
1947                }
1948                drop(ua);
1949                self.invalidate_iam_cache(Some(uid));
1950            }
1951            PrincipalRef::Group(g) => {
1952                let mut ga = self
1953                    .group_attachments
1954                    .write()
1955                    .unwrap_or_else(|e| e.into_inner());
1956                let list = ga.entry(g.clone()).or_default();
1957                if !list.iter().any(|p| p == policy_id) {
1958                    list.push(policy_id.to_string());
1959                }
1960                drop(ga);
1961                self.invalidate_iam_cache(None);
1962            }
1963        }
1964        self.persist_iam_to_kv();
1965        Ok(())
1966    }
1967
1968    /// Remove a policy attachment from a user or group.
1969    pub fn detach_policy(&self, principal: PrincipalRef, policy_id: &str) -> Result<(), AuthError> {
1970        match &principal {
1971            PrincipalRef::User(uid) => {
1972                if let Ok(mut ua) = self.user_attachments.write() {
1973                    if let Some(list) = ua.get_mut(uid) {
1974                        list.retain(|p| p != policy_id);
1975                        if list.is_empty() {
1976                            ua.remove(uid);
1977                        }
1978                    }
1979                }
1980                self.invalidate_iam_cache(Some(uid));
1981            }
1982            PrincipalRef::Group(g) => {
1983                if let Ok(mut ga) = self.group_attachments.write() {
1984                    if let Some(list) = ga.get_mut(g) {
1985                        list.retain(|p| p != policy_id);
1986                        if list.is_empty() {
1987                            ga.remove(g);
1988                        }
1989                    }
1990                }
1991                self.invalidate_iam_cache(None);
1992            }
1993        }
1994        self.persist_iam_to_kv();
1995        Ok(())
1996    }
1997
1998    /// Resolve the ordered list of effective policies for a user:
1999    /// group attachments first (least specific), then user
2000    /// attachments (most specific). Cached per user.
2001    pub fn effective_policies(&self, user: &UserId) -> Vec<Arc<Policy>> {
2002        if let Ok(cache) = self.iam_effective_cache.read() {
2003            if let Some(hit) = cache.get(user) {
2004                return hit.clone();
2005            }
2006        }
2007        let policies = self.policies.read();
2008        let user_attachments = self.user_attachments.read();
2009        let group_attachments = self.group_attachments.read();
2010        let mut groups = self
2011            .user_attributes
2012            .read()
2013            .ok()
2014            .and_then(|m| m.get(user).map(|attrs| attrs.groups.clone()))
2015            .unwrap_or_default();
2016        groups.insert(0, PUBLIC_IAM_GROUP.to_string());
2017        let mut out: Vec<Arc<Policy>> = Vec::new();
2018        if let (Ok(p_map), Ok(ua_map), Ok(ga_map)) = (policies, user_attachments, group_attachments)
2019        {
2020            for group in groups {
2021                if let Some(ids) = ga_map.get(&group) {
2022                    for id in ids {
2023                        if let Some(p) = p_map.get(id) {
2024                            out.push(p.clone());
2025                        }
2026                    }
2027                }
2028            }
2029            if let Some(ids) = ua_map.get(user) {
2030                for id in ids {
2031                    if let Some(p) = p_map.get(id) {
2032                        out.push(p.clone());
2033                    }
2034                }
2035            }
2036        }
2037        if let Ok(mut cache) = self.iam_effective_cache.write() {
2038            cache.insert(user.clone(), out.clone());
2039        }
2040        out
2041    }
2042
2043    /// Run the policy simulator for a principal. Synthesises an
2044    /// `EvalContext` from the user record + caller-supplied extras.
2045    pub fn simulate(
2046        &self,
2047        principal: &UserId,
2048        action: &str,
2049        resource: &ResourceRef,
2050        ctx_extras: SimCtx,
2051    ) -> SimulationOutcome {
2052        let (user_role, user_system_owned) = self
2053            .users
2054            .read()
2055            .ok()
2056            .and_then(|u| u.get(principal).map(|u| (Some(u.role), u.system_owned)))
2057            .unwrap_or((None, false));
2058        let principal_is_admin_role = user_role == Some(Role::Admin);
2059        let now = ctx_extras.now_ms.unwrap_or_else(now_ms);
2060        let ctx = EvalContext {
2061            principal_tenant: principal.tenant.clone(),
2062            current_tenant: ctx_extras.current_tenant,
2063            peer_ip: ctx_extras.peer_ip,
2064            mfa_present: ctx_extras.mfa_present,
2065            now_ms: now,
2066            principal_is_admin_role,
2067            principal_is_system_owned: user_system_owned,
2068            principal_is_platform_scoped: principal.tenant.is_none(),
2069        };
2070        let pols = self.effective_policies(principal);
2071        let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
2072        iam_policies::simulate(&refs, action, resource, &ctx)
2073    }
2074
2075    /// Production hot-path policy evaluation. Returns `true` on Allow
2076    /// / AdminBypass, `false` on Deny / DefaultDeny.
2077    pub fn check_policy_authz(
2078        &self,
2079        principal: &UserId,
2080        action: &str,
2081        resource: &ResourceRef,
2082        ctx: &EvalContext,
2083    ) -> bool {
2084        let pols = self.effective_policies(principal);
2085        let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
2086        let decision = iam_policies::evaluate(&refs, action, resource, ctx);
2087        matches!(
2088            decision,
2089            iam_policies::Decision::Allow { .. } | iam_policies::Decision::AdminBypass
2090        )
2091    }
2092
2093    /// Evaluate a resolved table projection through the column policy
2094    /// gate. Query paths should pass already-resolved column names; this
2095    /// helper intentionally does not parse SQL projection syntax.
2096    pub fn check_column_projection_authz(
2097        &self,
2098        principal: &UserId,
2099        request: &ColumnAccessRequest,
2100        ctx: &EvalContext,
2101    ) -> ColumnPolicyOutcome {
2102        let pols = self.effective_policies(principal);
2103        let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
2104        ColumnPolicyGate::new(&refs).evaluate(request, ctx)
2105    }
2106
2107    fn invalidate_iam_cache(&self, uid: Option<&UserId>) {
2108        if let Ok(mut cache) = self.iam_effective_cache.write() {
2109            match uid {
2110                Some(u) => {
2111                    cache.remove(u);
2112                }
2113                None => cache.clear(),
2114            }
2115        }
2116    }
2117
2118    /// Drop every effective-policy cache entry. Called from execution
2119    /// paths that mutate policies/attachments without knowing which
2120    /// users will be affected.
2121    pub fn invalidate_all_iam_cache(&self) {
2122        self.invalidate_iam_cache(None);
2123    }
2124
2125    // -----------------------------------------------------------------
2126    // IAM persistence — vault_kv backed under `red.iam.*` keys
2127    // -----------------------------------------------------------------
2128
2129    /// Reload IAM state (policies + attachments) from the vault KV.
2130    /// Replaces the legacy `rehydrate_acl` reader — pre-1.0 we drop
2131    /// the old `red.acl.*` blob format entirely.
2132    pub fn rehydrate_iam(&self) {
2133        let mut enabled = self
2134            .vault_kv_get("red.iam.enabled")
2135            .map(|v| v == "true")
2136            .unwrap_or(false);
2137        // Policies — single JSON object keyed by id.
2138        if let Some(blob) = self.vault_kv_get("red.iam.policies") {
2139            if let Ok(val) = crate::serde_json::from_str::<crate::serde_json::Value>(&blob) {
2140                if let Some(obj) = val.as_object() {
2141                    let mut map = HashMap::new();
2142                    for (id, body) in obj.iter() {
2143                        let s = body.to_string_compact();
2144                        if let Ok(p) = Policy::from_json_str(&s) {
2145                            map.insert(id.clone(), Arc::new(p));
2146                        }
2147                    }
2148                    if !map.is_empty() {
2149                        enabled = true;
2150                    }
2151                    *self.policies.write().unwrap_or_else(|e| e.into_inner()) = map;
2152                }
2153            }
2154        }
2155        // User attachments.
2156        if let Some(blob) = self.vault_kv_get("red.iam.attachments.users") {
2157            if let Ok(val) = crate::serde_json::from_str::<crate::serde_json::Value>(&blob) {
2158                if let Some(obj) = val.as_object() {
2159                    let mut map: HashMap<UserId, Vec<String>> = HashMap::new();
2160                    for (encoded_uid, ids_v) in obj.iter() {
2161                        let Some(uid) = decode_uid(encoded_uid) else {
2162                            continue;
2163                        };
2164                        if let Some(arr) = ids_v.as_array() {
2165                            let ids: Vec<String> = arr
2166                                .iter()
2167                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
2168                                .collect();
2169                            map.insert(uid, ids);
2170                        }
2171                    }
2172                    *self
2173                        .user_attachments
2174                        .write()
2175                        .unwrap_or_else(|e| e.into_inner()) = map;
2176                }
2177            }
2178        }
2179        // Group attachments.
2180        if let Some(blob) = self.vault_kv_get("red.iam.attachments.groups") {
2181            if let Ok(val) = crate::serde_json::from_str::<crate::serde_json::Value>(&blob) {
2182                if let Some(obj) = val.as_object() {
2183                    let mut map: HashMap<String, Vec<String>> = HashMap::new();
2184                    for (g, ids_v) in obj.iter() {
2185                        if let Some(arr) = ids_v.as_array() {
2186                            let ids: Vec<String> = arr
2187                                .iter()
2188                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
2189                                .collect();
2190                            map.insert(g.clone(), ids);
2191                        }
2192                    }
2193                    *self
2194                        .group_attachments
2195                        .write()
2196                        .unwrap_or_else(|e| e.into_inner()) = map;
2197                }
2198            }
2199        }
2200        self.iam_authorization_enabled
2201            .store(enabled, Ordering::Release);
2202        self.invalidate_iam_cache(None);
2203    }
2204
2205    /// Snapshot policies + attachments into the vault KV. Called
2206    /// after every mutation.
2207    fn persist_iam_to_kv(&self) {
2208        let enabled = if self.iam_authorization_enabled() {
2209            "true"
2210        } else {
2211            "false"
2212        };
2213        self.vault_kv_set("red.iam.enabled".to_string(), enabled.to_string());
2214
2215        // Policies: `{ "<id>": <policy_json>, ... }`
2216        let policies_obj = {
2217            let map = self.policies.read().unwrap_or_else(|e| e.into_inner());
2218            let mut obj = crate::serde_json::Map::new();
2219            for (id, p) in map.iter() {
2220                let s = p.to_json_string();
2221                if let Ok(v) = crate::serde_json::from_str::<crate::serde_json::Value>(&s) {
2222                    obj.insert(id.clone(), v);
2223                }
2224            }
2225            crate::serde_json::Value::Object(obj).to_string_compact()
2226        };
2227        self.vault_kv_set("red.iam.policies".to_string(), policies_obj);
2228
2229        // User attachments: `{ "<encoded_uid>": [ "<policy_id>", ... ], ... }`
2230        let users_obj = {
2231            let map = self
2232                .user_attachments
2233                .read()
2234                .unwrap_or_else(|e| e.into_inner());
2235            let mut obj = crate::serde_json::Map::new();
2236            for (uid, ids) in map.iter() {
2237                let arr = crate::serde_json::Value::Array(
2238                    ids.iter()
2239                        .map(|s| crate::serde_json::Value::String(s.clone()))
2240                        .collect(),
2241                );
2242                obj.insert(encode_uid(uid), arr);
2243            }
2244            crate::serde_json::Value::Object(obj).to_string_compact()
2245        };
2246        self.vault_kv_set("red.iam.attachments.users".to_string(), users_obj);
2247
2248        // Group attachments.
2249        let groups_obj = {
2250            let map = self
2251                .group_attachments
2252                .read()
2253                .unwrap_or_else(|e| e.into_inner());
2254            let mut obj = crate::serde_json::Map::new();
2255            for (g, ids) in map.iter() {
2256                let arr = crate::serde_json::Value::Array(
2257                    ids.iter()
2258                        .map(|s| crate::serde_json::Value::String(s.clone()))
2259                        .collect(),
2260                );
2261                obj.insert(g.clone(), arr);
2262            }
2263            crate::serde_json::Value::Object(obj).to_string_compact()
2264        };
2265        self.vault_kv_set("red.iam.attachments.groups".to_string(), groups_obj);
2266    }
2267}
2268
2269fn synthetic_grant_matches(policy: &Policy, resource: &Resource, actions: &[Action]) -> bool {
2270    policy.statements.iter().any(|st| {
2271        st.effect == crate::auth::policies::Effect::Allow
2272            && st.condition.is_none()
2273            && grant_actions_overlap(&st.actions, actions)
2274            && grant_resource_matches(&st.resources, resource)
2275    })
2276}
2277
2278fn grant_actions_overlap(
2279    patterns: &[crate::auth::policies::ActionPattern],
2280    actions: &[Action],
2281) -> bool {
2282    if actions.contains(&Action::All) {
2283        return true;
2284    }
2285    patterns.iter().any(|pat| match pat {
2286        crate::auth::policies::ActionPattern::Wildcard => true,
2287        crate::auth::policies::ActionPattern::Exact(s) => {
2288            actions.iter().any(|a| s.eq_ignore_ascii_case(a.as_str()))
2289        }
2290        crate::auth::policies::ActionPattern::Prefix(_) => false,
2291    })
2292}
2293
2294fn grant_resource_matches(
2295    patterns: &[crate::auth::policies::ResourcePattern],
2296    resource: &Resource,
2297) -> bool {
2298    let expected = grant_resource_pattern(resource);
2299    patterns.iter().any(|pat| pat == &expected)
2300}
2301
2302fn grant_resource_pattern(resource: &Resource) -> crate::auth::policies::ResourcePattern {
2303    use crate::auth::policies::ResourcePattern;
2304
2305    match resource {
2306        Resource::Database => ResourcePattern::Glob("table:*".to_string()),
2307        Resource::Schema(s) => ResourcePattern::Glob(format!("table:{s}.*")),
2308        Resource::Table { schema, table } => ResourcePattern::Exact {
2309            kind: "table".to_string(),
2310            name: match schema {
2311                Some(s) => format!("{s}.{table}"),
2312                None => table.clone(),
2313            },
2314        },
2315        Resource::Function { schema, name } => ResourcePattern::Exact {
2316            kind: "function".to_string(),
2317            name: match schema {
2318                Some(s) => format!("{s}.{name}"),
2319                None => name.clone(),
2320            },
2321        },
2322    }
2323}
2324
2325// ===========================================================================
2326// ACL serialization helpers — line-oriented, human-readable so an
2327// operator inspecting the vault dump can spot misconfigurations.
2328//
2329// Format (one record per line):
2330//   GRANT|<resource>|<actions_csv>|<with_grant_option>|<tenant_or_*>|<granted_by>|<granted_at>
2331//   ATTR|<valid_until>|<connection_limit>|<search_path>
2332//
2333// Resources are encoded as:
2334//   db                          → Database
2335//   schema:<name>               → Schema(name)
2336//   table:<schema_or_*>:<name>  → Table { schema, table }
2337//   func:<schema_or_*>:<name>   → Function { schema, name }
2338// ===========================================================================
2339
2340fn encode_uid(uid: &UserId) -> String {
2341    match &uid.tenant {
2342        Some(t) => format!("{}/{}", t, uid.username),
2343        None => format!("*/{}", uid.username),
2344    }
2345}
2346
2347fn decode_uid(s: &str) -> Option<UserId> {
2348    let (tenant, username) = s.split_once('/')?;
2349    Some(if tenant == "*" {
2350        UserId::platform(username)
2351    } else {
2352        UserId::scoped(tenant, username)
2353    })
2354}
2355
2356fn encode_resource(r: &Resource) -> String {
2357    match r {
2358        Resource::Database => "db".into(),
2359        Resource::Schema(s) => format!("schema:{}", s),
2360        Resource::Table { schema, table } => {
2361            format!("table:{}:{}", schema.as_deref().unwrap_or("*"), table)
2362        }
2363        Resource::Function { schema, name } => {
2364            format!("func:{}:{}", schema.as_deref().unwrap_or("*"), name)
2365        }
2366    }
2367}
2368
2369fn decode_resource(s: &str) -> Option<Resource> {
2370    if s == "db" {
2371        return Some(Resource::Database);
2372    }
2373    if let Some(rest) = s.strip_prefix("schema:") {
2374        return Some(Resource::Schema(rest.to_string()));
2375    }
2376    if let Some(rest) = s.strip_prefix("table:") {
2377        let (schema, table) = rest.split_once(':')?;
2378        return Some(Resource::Table {
2379            schema: if schema == "*" {
2380                None
2381            } else {
2382                Some(schema.to_string())
2383            },
2384            table: table.to_string(),
2385        });
2386    }
2387    if let Some(rest) = s.strip_prefix("func:") {
2388        let (schema, name) = rest.split_once(':')?;
2389        return Some(Resource::Function {
2390            schema: if schema == "*" {
2391                None
2392            } else {
2393                Some(schema.to_string())
2394            },
2395            name: name.to_string(),
2396        });
2397    }
2398    None
2399}
2400
2401fn encode_grants_blob(grants: &[Grant]) -> String {
2402    let mut out = String::new();
2403    for g in grants {
2404        let actions: Vec<&str> = g.actions.iter().map(|a| a.as_str()).collect();
2405        out.push_str(&format!(
2406            "GRANT|{}|{}|{}|{}|{}|{}\n",
2407            encode_resource(&g.resource),
2408            actions.join(","),
2409            g.with_grant_option,
2410            g.tenant.as_deref().unwrap_or("*"),
2411            g.granted_by,
2412            g.granted_at,
2413        ));
2414    }
2415    out
2416}
2417
2418fn decode_grants_blob(s: &str) -> Option<Vec<Grant>> {
2419    let mut out = Vec::new();
2420    for line in s.lines() {
2421        if line.is_empty() {
2422            continue;
2423        }
2424        let parts: Vec<&str> = line.split('|').collect();
2425        if parts.len() != 7 || parts[0] != "GRANT" {
2426            return None;
2427        }
2428        let resource = decode_resource(parts[1])?;
2429        let mut actions = std::collections::BTreeSet::new();
2430        for token in parts[2].split(',') {
2431            if let Some(a) = Action::from_keyword(token) {
2432                actions.insert(a);
2433            }
2434        }
2435        let with_grant_option = parts[3] == "true";
2436        let tenant = if parts[4] == "*" {
2437            None
2438        } else {
2439            Some(parts[4].to_string())
2440        };
2441        let granted_by = parts[5].to_string();
2442        let granted_at: u128 = parts[6].parse().unwrap_or(0);
2443        out.push(Grant {
2444            // Principal field is reconstructed by the loader from the
2445            // storage-key prefix; default to `Public` here.
2446            principal: GrantPrincipal::Public,
2447            resource,
2448            actions,
2449            with_grant_option,
2450            granted_by,
2451            granted_at,
2452            tenant,
2453            columns: None,
2454        });
2455    }
2456    Some(out)
2457}
2458
2459fn encode_attrs_blob(a: &UserAttributes) -> String {
2460    let valid = a
2461        .valid_until
2462        .map(|t| t.to_string())
2463        .unwrap_or_else(|| "*".into());
2464    let limit = a
2465        .connection_limit
2466        .map(|l| l.to_string())
2467        .unwrap_or_else(|| "*".into());
2468    let path = a.search_path.clone().unwrap_or_else(|| "*".into());
2469    let groups = if a.groups.is_empty() {
2470        "*".to_string()
2471    } else {
2472        a.groups.join(",")
2473    };
2474    format!("ATTR|{}|{}|{}|{}\n", valid, limit, path, groups)
2475}
2476
2477fn decode_attrs_blob(s: &str) -> Option<UserAttributes> {
2478    let line = s.lines().next()?;
2479    let parts: Vec<&str> = line.split('|').collect();
2480    if !(parts.len() == 4 || parts.len() == 5) || parts[0] != "ATTR" {
2481        return None;
2482    }
2483    let groups = if parts.get(4).copied().unwrap_or("*") == "*" {
2484        Vec::new()
2485    } else {
2486        parts[4]
2487            .split(',')
2488            .filter(|g| !g.is_empty())
2489            .map(|g| g.to_string())
2490            .collect()
2491    };
2492    Some(UserAttributes {
2493        valid_until: if parts[1] == "*" {
2494            None
2495        } else {
2496            parts[1].parse().ok()
2497        },
2498        connection_limit: if parts[2] == "*" {
2499            None
2500        } else {
2501            parts[2].parse().ok()
2502        },
2503        search_path: if parts[3] == "*" {
2504            None
2505        } else {
2506            Some(parts[3].to_string())
2507        },
2508        groups,
2509    })
2510}
2511
2512// ===========================================================================
2513// Password hashing
2514// ===========================================================================
2515
2516/// Derive a SCRAM-SHA-256 verifier for a fresh user / password
2517/// rotation. Salt is 16 random bytes; iter is the engine default
2518/// (`scram::DEFAULT_ITER`). Stored alongside the Argon2 password
2519/// hash so HTTP login + v2 SCRAM can both authenticate the same
2520/// user.
2521fn make_scram_verifier(password: &str) -> crate::auth::scram::ScramVerifier {
2522    let salt = random_bytes(16);
2523    crate::auth::scram::ScramVerifier::from_password(
2524        password,
2525        salt,
2526        crate::auth::scram::DEFAULT_ITER,
2527    )
2528}
2529
2530/// Hash a password using Argon2id.
2531///
2532/// Format: `argon2id$<salt_hex>$<hash_hex>`
2533pub(crate) fn hash_password(password: &str) -> String {
2534    let salt = random_bytes(16);
2535    let params = auth_argon2_params();
2536    let hash = derive_key(password.as_bytes(), &salt, &params);
2537    format!("argon2id${}${}", hex::encode(&salt), hex::encode(&hash))
2538}
2539
2540/// Verify a password against a stored `argon2id$<salt>$<hash>` string.
2541pub(crate) fn verify_password(password: &str, stored_hash: &str) -> bool {
2542    let parts: Vec<&str> = stored_hash.splitn(3, '$').collect();
2543    if parts.len() != 3 || parts[0] != "argon2id" {
2544        return false;
2545    }
2546
2547    let salt = match hex::decode(parts[1]) {
2548        Ok(s) => s,
2549        Err(_) => return false,
2550    };
2551
2552    let expected_hash = match hex::decode(parts[2]) {
2553        Ok(h) => h,
2554        Err(_) => return false,
2555    };
2556
2557    let params = auth_argon2_params();
2558    let computed = derive_key(password.as_bytes(), &salt, &params);
2559    constant_time_eq(&computed, &expected_hash)
2560}
2561
2562/// Constant-time byte comparison to avoid timing side-channels.
2563fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
2564    if a.len() != b.len() {
2565        return false;
2566    }
2567    let mut diff: u8 = 0;
2568    for (x, y) in a.iter().zip(b.iter()) {
2569        diff |= x ^ y;
2570    }
2571    diff == 0
2572}
2573
2574// ===========================================================================
2575// Token generation
2576// ===========================================================================
2577
2578fn generate_session_token() -> String {
2579    format!("rs_{}", hex::encode(random_bytes(32)))
2580}
2581
2582fn generate_api_key() -> String {
2583    format!("rk_{}", hex::encode(random_bytes(32)))
2584}
2585
2586/// Generate `n` random bytes and return as a hex string.
2587fn random_hex(n: usize) -> String {
2588    hex::encode(random_bytes(n))
2589}
2590
2591/// Generate `n` cryptographically random bytes using the OS CSPRNG,
2592/// then mix with SHA-256 for domain separation.
2593pub(crate) fn random_bytes(n: usize) -> Vec<u8> {
2594    let mut buf = vec![0u8; n.max(32)];
2595    if os_random::fill_bytes(&mut buf).is_err() {
2596        // Fallback: use system time and pointers as entropy (best-effort).
2597        let seed = now_ms().to_le_bytes();
2598        for (i, byte) in buf.iter_mut().enumerate() {
2599            *byte = seed[i % seed.len()] ^ (i as u8);
2600        }
2601    }
2602    // SHA-256 mix to ensure uniform distribution.
2603    let digest = sha256(&buf);
2604    if n <= 32 {
2605        digest[..n].to_vec()
2606    } else {
2607        // Chain SHA-256 for longer outputs (unusual but supported).
2608        let mut out = Vec::with_capacity(n);
2609        let mut prev = digest;
2610        while out.len() < n {
2611            out.extend_from_slice(&prev[..std::cmp::min(32, n - out.len())]);
2612            prev = sha256(&prev);
2613        }
2614        out
2615    }
2616}
2617
2618// ===========================================================================
2619// Helpers
2620// ===========================================================================
2621
2622fn lock_err<T>(_: T) -> AuthError {
2623    AuthError::Internal("lock poisoned".to_string())
2624}
2625
2626fn reject_system_owned(uid: &UserId, user: &User) -> Result<(), AuthError> {
2627    if user.system_owned {
2628        return Err(AuthError::SystemUserImmutable {
2629            username: uid.to_string(),
2630        });
2631    }
2632    Ok(())
2633}
2634
2635// ===========================================================================
2636// Tests
2637// ===========================================================================
2638
2639#[cfg(test)]
2640mod tests {
2641    use super::*;
2642
2643    fn test_config() -> AuthConfig {
2644        AuthConfig {
2645            enabled: true,
2646            session_ttl_secs: 60,
2647            require_auth: true,
2648            auto_encrypt_storage: false,
2649            vault_enabled: false,
2650            cert: Default::default(),
2651            oauth: Default::default(),
2652        }
2653    }
2654
2655    #[test]
2656    fn test_create_and_list_users() {
2657        let store = AuthStore::new(test_config());
2658        store.create_user("alice", "pass1", Role::Admin).unwrap();
2659        store.create_user("bob", "pass2", Role::Read).unwrap();
2660
2661        let users = store.list_users();
2662        assert_eq!(users.len(), 2);
2663        // Password hashes should be redacted.
2664        for u in &users {
2665            assert!(u.password_hash.is_empty());
2666        }
2667    }
2668
2669    #[test]
2670    fn test_create_duplicate_user() {
2671        let store = AuthStore::new(test_config());
2672        store.create_user("alice", "pass", Role::Admin).unwrap();
2673        let err = store.create_user("alice", "pass2", Role::Read).unwrap_err();
2674        assert!(matches!(err, AuthError::UserExists(_)));
2675    }
2676
2677    #[test]
2678    fn test_authenticate_and_validate() {
2679        let store = AuthStore::new(test_config());
2680        store.create_user("alice", "secret", Role::Write).unwrap();
2681
2682        let session = store.authenticate("alice", "secret").unwrap();
2683        assert!(session.token.starts_with("rs_"));
2684
2685        let (username, role) = store.validate_token(&session.token).unwrap();
2686        assert_eq!(username, "alice");
2687        assert_eq!(role, Role::Write);
2688    }
2689
2690    #[test]
2691    fn test_authenticate_wrong_password() {
2692        let store = AuthStore::new(test_config());
2693        store.create_user("alice", "secret", Role::Read).unwrap();
2694
2695        let err = store.authenticate("alice", "wrong").unwrap_err();
2696        assert!(matches!(err, AuthError::InvalidCredentials));
2697    }
2698
2699    #[test]
2700    fn test_api_key_lifecycle() {
2701        let store = AuthStore::new(test_config());
2702        store.create_user("alice", "pass", Role::Admin).unwrap();
2703
2704        let key = store
2705            .create_api_key("alice", "ci-token", Role::Write)
2706            .unwrap();
2707        assert!(key.key.starts_with("rk_"));
2708
2709        let (username, role) = store.validate_token(&key.key).unwrap();
2710        assert_eq!(username, "alice");
2711        assert_eq!(role, Role::Write);
2712
2713        store.revoke_api_key(&key.key).unwrap();
2714        assert!(store.validate_token(&key.key).is_none());
2715    }
2716
2717    #[test]
2718    fn test_api_key_role_exceeded() {
2719        let store = AuthStore::new(test_config());
2720        store.create_user("bob", "pass", Role::Read).unwrap();
2721
2722        let err = store
2723            .create_api_key("bob", "escalate", Role::Admin)
2724            .unwrap_err();
2725        assert!(matches!(err, AuthError::RoleExceeded { .. }));
2726    }
2727
2728    #[test]
2729    fn test_change_password() {
2730        let store = AuthStore::new(test_config());
2731        store.create_user("alice", "old", Role::Write).unwrap();
2732
2733        store.change_password("alice", "old", "new").unwrap();
2734
2735        // Old password should fail.
2736        assert!(store.authenticate("alice", "old").is_err());
2737        // New password should succeed.
2738        assert!(store.authenticate("alice", "new").is_ok());
2739    }
2740
2741    #[test]
2742    fn test_change_role() {
2743        let store = AuthStore::new(test_config());
2744        store.create_user("alice", "pass", Role::Admin).unwrap();
2745        store.create_api_key("alice", "key1", Role::Admin).unwrap();
2746
2747        store.change_role("alice", Role::Read).unwrap();
2748
2749        // User's role should be Read now.
2750        let users = store.list_users();
2751        let alice = users.iter().find(|u| u.username == "alice").unwrap();
2752        assert_eq!(alice.role, Role::Read);
2753
2754        // API keys should have been downgraded.
2755        assert_eq!(alice.api_keys[0].role, Role::Read);
2756    }
2757
2758    #[test]
2759    fn test_system_owned_user_blocks_destructive_mutations() {
2760        let store = AuthStore::new(test_config());
2761        store
2762            .create_system_user("system", "pass", Role::Admin, None)
2763            .unwrap();
2764
2765        let uid = UserId::platform("system");
2766        let err = store.delete_user("system").unwrap_err();
2767        assert!(matches!(err, AuthError::SystemUserImmutable { .. }));
2768
2769        let err = store.change_password("system", "pass", "new").unwrap_err();
2770        assert!(matches!(err, AuthError::SystemUserImmutable { .. }));
2771
2772        let err = store.change_role("system", Role::Read).unwrap_err();
2773        assert!(matches!(err, AuthError::SystemUserImmutable { .. }));
2774
2775        let err = store.set_user_enabled(&uid, false).unwrap_err();
2776        assert!(matches!(err, AuthError::SystemUserImmutable { .. }));
2777
2778        let key = store
2779            .create_api_key("system", "rotation", Role::Admin)
2780            .unwrap();
2781        assert!(store.validate_token(&key.key).is_some());
2782        store.revoke_api_key(&key.key).unwrap();
2783        assert!(store.validate_token(&key.key).is_none());
2784    }
2785
2786    #[test]
2787    fn test_tenant_scoped_system_owned_user_guardrail_integration() {
2788        // Acceptance #1 + #2 + #4 (tenant-scoped half) for issue #647.
2789        //
2790        // The destructive-mutation guard (`reject_system_owned`) is on
2791        // the tenant-aware code paths (`*_in_tenant`), but the existing
2792        // `test_system_owned_user_blocks_destructive_mutations` only
2793        // exercises the platform shim. Pin the tenant-scoped path so a
2794        // future refactor that loses the guard on `*_in_tenant` is
2795        // caught.
2796        let store = AuthStore::new(test_config());
2797        store
2798            .create_system_user("ops", "pass", Role::Admin, Some("acme"))
2799            .unwrap();
2800
2801        let uid = UserId::scoped("acme", "ops");
2802
2803        // delete / disable / password-change / role-change — all four
2804        // destructive paths block on a tenant-scoped system-owned user.
2805        let err = store
2806            .delete_user_in_tenant(Some("acme"), "ops")
2807            .unwrap_err();
2808        assert!(matches!(err, AuthError::SystemUserImmutable { .. }));
2809
2810        let err = store
2811            .change_password_in_tenant(Some("acme"), "ops", "pass", "new")
2812            .unwrap_err();
2813        assert!(matches!(err, AuthError::SystemUserImmutable { .. }));
2814
2815        let err = store
2816            .change_role_in_tenant(Some("acme"), "ops", Role::Read)
2817            .unwrap_err();
2818        assert!(matches!(err, AuthError::SystemUserImmutable { .. }));
2819
2820        let err = store.set_user_enabled(&uid, false).unwrap_err();
2821        assert!(matches!(err, AuthError::SystemUserImmutable { .. }));
2822
2823        // Tenant-isolation sanity: a same-named non-system user under a
2824        // different tenant remains fully mutable. Without this the test
2825        // above could be passing because the guard refuses all `ops`
2826        // users by name rather than by id.
2827        store
2828            .create_user_in_tenant(Some("globex"), "ops", "pass", Role::Admin)
2829            .unwrap();
2830        store
2831            .change_role_in_tenant(Some("globex"), "ops", Role::Read)
2832            .unwrap();
2833        store.delete_user_in_tenant(Some("globex"), "ops").unwrap();
2834
2835        // API-key rotation path still works for the system-owned user
2836        // (intentionally bypasses `reject_system_owned`).
2837        let key = store
2838            .create_api_key_in_tenant(Some("acme"), "ops", "rotation", Role::Admin)
2839            .unwrap();
2840        assert!(store.validate_token(&key.key).is_some());
2841        store.revoke_api_key(&key.key).unwrap();
2842        assert!(store.validate_token(&key.key).is_none());
2843    }
2844
2845    #[test]
2846    fn test_regular_user_mutations_still_work() {
2847        let store = AuthStore::new(test_config());
2848        store.create_user("alice", "old", Role::Admin).unwrap();
2849
2850        let uid = UserId::platform("alice");
2851        store.set_user_enabled(&uid, false).unwrap();
2852        assert!(matches!(
2853            store.authenticate("alice", "old"),
2854            Err(AuthError::InvalidCredentials)
2855        ));
2856
2857        store.set_user_enabled(&uid, true).unwrap();
2858        store.change_password("alice", "old", "new").unwrap();
2859        store.change_role("alice", Role::Read).unwrap();
2860        store.delete_user("alice").unwrap();
2861        assert!(matches!(
2862            store.authenticate("alice", "new"),
2863            Err(AuthError::InvalidCredentials)
2864        ));
2865    }
2866
2867    #[test]
2868    fn test_delete_user() {
2869        let store = AuthStore::new(test_config());
2870        store.create_user("alice", "pass", Role::Admin).unwrap();
2871        let key = store.create_api_key("alice", "key1", Role::Read).unwrap();
2872        let session = store.authenticate("alice", "pass").unwrap();
2873
2874        store.delete_user("alice").unwrap();
2875
2876        assert!(store.validate_token(&key.key).is_none());
2877        assert!(store.validate_token(&session.token).is_none());
2878        assert!(store.list_users().is_empty());
2879    }
2880
2881    #[test]
2882    fn test_revoke_session() {
2883        let store = AuthStore::new(test_config());
2884        store.create_user("alice", "pass", Role::Read).unwrap();
2885        let session = store.authenticate("alice", "pass").unwrap();
2886
2887        store.revoke_session(&session.token);
2888        assert!(store.validate_token(&session.token).is_none());
2889    }
2890
2891    #[test]
2892    fn test_password_hash_format() {
2893        let hash = hash_password("test");
2894        assert!(hash.starts_with("argon2id$"));
2895        let parts: Vec<&str> = hash.splitn(3, '$').collect();
2896        assert_eq!(parts.len(), 3);
2897        // Salt is 16 bytes = 32 hex chars.
2898        assert_eq!(parts[1].len(), 32);
2899        // Hash is 32 bytes = 64 hex chars.
2900        assert_eq!(parts[2].len(), 64);
2901    }
2902
2903    #[test]
2904    fn test_constant_time_eq() {
2905        assert!(constant_time_eq(b"hello", b"hello"));
2906        assert!(!constant_time_eq(b"hello", b"world"));
2907        assert!(!constant_time_eq(b"short", b"longer"));
2908    }
2909
2910    #[test]
2911    fn test_bootstrap_seals_permanently() {
2912        let store = AuthStore::new(test_config());
2913
2914        assert!(store.needs_bootstrap());
2915        assert!(!store.is_bootstrapped());
2916
2917        // First bootstrap succeeds
2918        let result = store.bootstrap("admin", "secret");
2919        assert!(result.is_ok());
2920        let br = result.unwrap();
2921        assert_eq!(br.user.username, "admin");
2922        assert_eq!(br.user.role, Role::Admin);
2923        assert!(br.api_key.key.starts_with("rk_"));
2924        // No vault configured, so no certificate.
2925        assert!(br.certificate.is_none());
2926
2927        // Sealed now
2928        assert!(!store.needs_bootstrap());
2929        assert!(store.is_bootstrapped());
2930
2931        // Second bootstrap fails -- sealed permanently
2932        let result = store.bootstrap("admin2", "secret2");
2933        assert!(result.is_err());
2934        let err = result.unwrap_err();
2935        assert!(err.to_string().contains("sealed permanently"));
2936
2937        // Only 1 user exists (the first one)
2938        assert_eq!(store.list_users().len(), 1);
2939        assert_eq!(store.list_users()[0].username, "admin");
2940    }
2941
2942    #[test]
2943    fn test_bootstrap_after_manual_user_creation() {
2944        let store = AuthStore::new(test_config());
2945
2946        // Create a user manually first
2947        store.create_user("existing", "pass", Role::Read).unwrap();
2948
2949        // Bootstrap sees the seal hasn't been set but users exist
2950        // The atomic seal fires first, then the users check catches it
2951        assert!(!store.needs_bootstrap()); // users exist → false
2952    }
2953
2954    // ---------------------------------------------------------------
2955    // Tenant scoping
2956    // ---------------------------------------------------------------
2957
2958    #[test]
2959    fn test_same_username_two_tenants_distinct() {
2960        let store = AuthStore::new(test_config());
2961        store
2962            .create_user_in_tenant(Some("acme"), "alice", "pw-acme", Role::Write)
2963            .unwrap();
2964        store
2965            .create_user_in_tenant(Some("globex"), "alice", "pw-globex", Role::Read)
2966            .unwrap();
2967
2968        // Two distinct users.
2969        let users = store.list_users();
2970        assert_eq!(users.len(), 2);
2971
2972        // Each verifies its own password under its own tenant.
2973        assert!(store
2974            .authenticate_in_tenant(Some("acme"), "alice", "pw-acme")
2975            .is_ok());
2976        assert!(store
2977            .authenticate_in_tenant(Some("globex"), "alice", "pw-globex")
2978            .is_ok());
2979
2980        // Cross-tenant credentials are rejected.
2981        assert!(store
2982            .authenticate_in_tenant(Some("acme"), "alice", "pw-globex")
2983            .is_err());
2984        assert!(store
2985            .authenticate_in_tenant(Some("globex"), "alice", "pw-acme")
2986            .is_err());
2987    }
2988
2989    #[test]
2990    fn test_session_carries_tenant() {
2991        let store = AuthStore::new(test_config());
2992        store
2993            .create_user_in_tenant(Some("acme"), "alice", "pw", Role::Admin)
2994            .unwrap();
2995        let session = store
2996            .authenticate_in_tenant(Some("acme"), "alice", "pw")
2997            .unwrap();
2998        assert_eq!(session.tenant_id.as_deref(), Some("acme"));
2999
3000        let (id, role) = store.validate_token_full(&session.token).unwrap();
3001        assert_eq!(id.tenant.as_deref(), Some("acme"));
3002        assert_eq!(id.username, "alice");
3003        assert_eq!(role, Role::Admin);
3004    }
3005
3006    #[test]
3007    fn test_platform_user_has_no_tenant() {
3008        let store = AuthStore::new(test_config());
3009        store.create_user("admin", "pw", Role::Admin).unwrap();
3010        let session = store.authenticate("admin", "pw").unwrap();
3011        assert!(session.tenant_id.is_none());
3012
3013        let (id, _) = store.validate_token_full(&session.token).unwrap();
3014        assert!(id.tenant.is_none());
3015    }
3016
3017    #[test]
3018    fn test_lookup_scram_verifier_global_resolves_platform() {
3019        let store = AuthStore::new(test_config());
3020        store.create_user("admin", "pw", Role::Admin).unwrap();
3021        store
3022            .create_user_in_tenant(Some("acme"), "admin", "pw", Role::Admin)
3023            .unwrap();
3024
3025        // The global helper picks the platform-tenant user only.
3026        let v = store.lookup_scram_verifier_global("admin");
3027        assert!(v.is_some());
3028
3029        // The tenant-scoped user has its own verifier.
3030        let v_acme = store.lookup_scram_verifier(&UserId::scoped("acme", "admin"));
3031        assert!(v_acme.is_some());
3032
3033        // The two verifiers carry independent salts.
3034        assert_ne!(v.unwrap().salt, v_acme.unwrap().salt);
3035    }
3036
3037    #[test]
3038    fn test_delete_in_tenant_does_not_touch_other_tenant() {
3039        let store = AuthStore::new(test_config());
3040        store
3041            .create_user_in_tenant(Some("acme"), "alice", "pw", Role::Admin)
3042            .unwrap();
3043        store
3044            .create_user_in_tenant(Some("globex"), "alice", "pw", Role::Admin)
3045            .unwrap();
3046
3047        store.delete_user_in_tenant(Some("acme"), "alice").unwrap();
3048
3049        // Globex still alive.
3050        assert!(store
3051            .authenticate_in_tenant(Some("globex"), "alice", "pw")
3052            .is_ok());
3053        // Acme gone.
3054        assert!(store
3055            .authenticate_in_tenant(Some("acme"), "alice", "pw")
3056            .is_err());
3057    }
3058
3059    #[test]
3060    fn test_user_id_display() {
3061        assert_eq!(UserId::platform("admin").to_string(), "admin");
3062        assert_eq!(UserId::scoped("acme", "alice").to_string(), "acme/alice");
3063    }
3064}