Skip to main content

reddb_server/auth/
store.rs

1//! AuthStore -- manages users, sessions, and API keys in memory.
2//!
3//! Password hashing delegates to the existing Argon2id implementation in
4//! `crate::storage::encryption::argon2id`.  Token generation uses the
5//! OS CSPRNG (`crate::crypto::os_random`) plus SHA-256.
6#![deny(clippy::disallowed_methods)]
7
8use std::borrow::Cow;
9use std::collections::HashMap;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::{Arc, RwLock};
12
13use crate::crypto::os_random;
14use crate::crypto::sha256::sha256;
15use crate::storage::encryption::argon2id::{derive_key, Argon2Params};
16use crate::storage::engine::pager::Pager;
17
18use super::column_policy_gate::{ColumnAccessRequest, ColumnPolicyGate, ColumnPolicyOutcome};
19use super::enforcement_mode::{legacy_rbac_decision, PolicyEnforcementMode};
20use super::managed_policy::{ManagedPolicyDecision, ManagedPolicyGate, PolicyOp};
21use super::policies::{self as iam_policies, EvalContext, Policy, ResourceRef, SimulationOutcome};
22use super::privileges::{
23    check_grant, Action, AuthzContext, AuthzError, Grant, GrantPrincipal, GrantsView,
24    PermissionCache, Resource, UserAttributes,
25};
26use super::registry::ConfigRegistry;
27use super::vault::{KeyPair, Vault, VaultState};
28use super::{now_ms, ApiKey, AuthConfig, AuthError, Role, Session, User, UserId};
29use crate::runtime::control_events::{
30    ControlEvent, ControlEventConfig, ControlEventCtx, ControlEventLedger, EventKind, Outcome,
31    Sensitivity,
32};
33
34// ---------------------------------------------------------------------------
35// PrincipalRef + SimCtx — IAM policy attachments
36// ---------------------------------------------------------------------------
37
38/// Principal targeted by `attach_policy` / `detach_policy`.
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40pub enum PrincipalRef {
41    User(UserId),
42    Group(String),
43}
44
45/// Reserved IAM group that every principal belongs to. Used by the
46/// GRANT-to-PUBLIC compatibility layer.
47pub const PUBLIC_IAM_GROUP: &str = "__public__";
48
49/// Optional context overrides for `simulate` — anything not set falls back
50/// to a default value when the kernel evaluates the request.
51#[derive(Debug, Clone, Default)]
52pub struct SimCtx {
53    pub current_tenant: Option<String>,
54    pub peer_ip: Option<std::net::IpAddr>,
55    pub mfa_present: bool,
56    pub now_ms: Option<u128>,
57}
58
59/// Control-event dependencies for audited IAM policy mutations.
60///
61/// The existing `put_policy` / `delete_policy` / attach / detach
62/// methods remain unaudited for bootstrap and synthetic GRANT internals.
63/// Public mutation surfaces that need evidence pass this context to the
64/// `*_with_control_events` siblings.
65pub struct PolicyMutationControl<'a> {
66    pub ctx: &'a ControlEventCtx<'a>,
67    pub ledger: &'a dyn ControlEventLedger,
68    pub config: ControlEventConfig,
69    pub registry: Option<&'a ConfigRegistry>,
70    pub actor: &'a UserId,
71    pub eval_ctx: &'a EvalContext,
72}
73
74/// Control-event dependencies for audited user lifecycle mutations.
75pub struct UserLifecycleControl<'a> {
76    pub ctx: &'a ControlEventCtx<'a>,
77    pub ledger: &'a dyn ControlEventLedger,
78    pub config: ControlEventConfig,
79}
80
81#[derive(Clone)]
82struct AuthStoreControlEvents {
83    ledger: Arc<dyn ControlEventLedger>,
84    config: ControlEventConfig,
85}
86
87// ---------------------------------------------------------------------------
88// BootstrapResult
89// ---------------------------------------------------------------------------
90
91/// Result of a successful bootstrap operation.
92///
93/// The `certificate` is the hex-encoded string the admin must save --
94/// it is the ONLY way to unseal the vault after a restart.
95#[derive(Debug)]
96pub struct BootstrapResult {
97    pub user: User,
98    pub api_key: ApiKey,
99    /// Certificate hex string.  `None` when vault is not configured.
100    pub certificate: Option<String>,
101}
102
103// ---------------------------------------------------------------------------
104// AuthStore
105// ---------------------------------------------------------------------------
106
107/// Central in-process authority for auth state.
108///
109/// All mutations are guarded by `RwLock`s so the store is `Send + Sync`.
110pub struct AuthStore {
111    /// `(tenant_id, username) -> User`. Tenant scoping is built into the
112    /// key so `alice@acme` and `alice@globex` are distinct identities.
113    users: RwLock<HashMap<UserId, User>>,
114    sessions: RwLock<HashMap<String, Session>>,
115    /// key-string -> (owner UserId, role)
116    api_key_index: RwLock<HashMap<String, (UserId, Role)>>,
117    /// Once true, bootstrap() is permanently sealed.
118    bootstrapped: AtomicBool,
119    config: AuthConfig,
120    /// Optional encrypted vault for persisting auth state to pager pages.
121    vault: RwLock<Option<Vault>>,
122    /// Reference to the pager for vault page I/O.
123    pager: Option<Arc<Pager>>,
124    /// Certificate-based keypair for token signing and vault seal.
125    /// Populated after bootstrap or after restoring from a sealed vault.
126    keypair: RwLock<Option<KeyPair>>,
127    /// Encrypted key-value store for arbitrary secrets.
128    /// Persisted to vault alongside users/api_keys.
129    vault_kv: RwLock<HashMap<String, String>>,
130    /// Per-user GRANT rows. Persisted via `vault_kv` under the
131    /// `red.acl.grants.<tenant>/<user>` key prefix so existing snapshot
132    /// logic keeps working without modification. See `privileges` module
133    /// for the resolution algorithm.
134    grants: RwLock<HashMap<UserId, Vec<Grant>>>,
135    /// PUBLIC grants — apply to every authenticated principal.
136    public_grants: RwLock<Vec<Grant>>,
137    /// PG-style account attributes (`VALID UNTIL`, `CONNECTION LIMIT`,
138    /// `search_path`). Keyed by `UserId`. Persisted under
139    /// `red.acl.attrs.<tenant>/<user>`.
140    user_attributes: RwLock<HashMap<UserId, UserAttributes>>,
141    /// Live session count per user — used by `CONNECTION LIMIT`
142    /// enforcement on login. Bumped at authenticate, decremented at
143    /// session revoke / expiry.
144    session_count_by_user: RwLock<HashMap<UserId, u32>>,
145    /// Pre-resolved (resource, action) cache built per-user so the
146    /// hot path skips a linear scan of the user's grants on every
147    /// statement. Invalidated on GRANT / REVOKE / ALTER USER.
148    permission_cache: RwLock<HashMap<UserId, PermissionCache>>,
149    /// IAM-style policies, keyed by id. Persisted under
150    /// `red.iam.policies`. The kernel in `super::policies` owns the
151    /// Policy type — this map just deduplicates and shares.
152    policies: RwLock<HashMap<String, Arc<Policy>>>,
153    /// Per-user policy attachments — ordered list of policy ids.
154    /// Persisted under `red.iam.attachments.users`.
155    user_attachments: RwLock<HashMap<UserId, Vec<String>>>,
156    /// Per-group policy attachments. Users join groups through
157    /// `UserAttributes::groups`; effective policies resolve group
158    /// attachments before user-direct attachments.
159    group_attachments: RwLock<HashMap<String, Vec<String>>>,
160    /// Cached effective `Vec<Arc<Policy>>` per user. Invalidated on
161    /// any policy mutation that affects the user's attachments.
162    iam_effective_cache: RwLock<HashMap<UserId, Vec<Arc<Policy>>>>,
163    /// Once any IAM policy is installed, authorization switches to the
164    /// IAM evaluator and stays deny-by-default even if policies are
165    /// later deleted. Persisted under `red.iam.enabled`.
166    iam_authorization_enabled: AtomicBool,
167    /// Selects the fallback behaviour when the IAM evaluator returns
168    /// `DefaultDeny`. See [`PolicyEnforcementMode`] and #712 / S5A.
169    /// Default is [`PolicyEnforcementMode::default_existing_install`]
170    /// (`LegacyRbac`) so an existing install upgrading past #712
171    /// keeps its current authorization posture until the operator
172    /// flips it explicitly. The bootstrap path sets this to
173    /// `PolicyOnly` for fresh installs; the boot-time config loader
174    /// supersedes both when a `red.config.policy.enforcement_mode`
175    /// value is present in the configured value store.
176    enforcement_mode: RwLock<PolicyEnforcementMode>,
177    /// Once-per-boot flag used by [`AuthStore::take_legacy_rbac_warn_once`]
178    /// to deliver exactly one `warn`-level log line when the store is
179    /// running under [`PolicyEnforcementMode::LegacyRbac`]. The boot
180    /// path checks the flag and, if it can claim it, emits the line.
181    legacy_rbac_boot_warn_emitted: AtomicBool,
182    /// `(tenant, role) → HashSet<CollectionId>` cache used by the AI
183    /// pipeline (issue #119). Distinct from `permission_cache`, which
184    /// is keyed by `UserId` and answers `(resource, action)` lookups —
185    /// this cache answers the inverse "what collections is this scope
186    /// allowed to read?" query that `AuthorizedSearch` uses to
187    /// pre-filter SEARCH SIMILAR / SEARCH CONTEXT candidates before any
188    /// similarity score is computed. Entries TTL out at 60s and are
189    /// invalidated explicitly on GRANT/REVOKE/CREATE POLICY/DROP
190    /// POLICY/DROP COLLECTION.
191    visible_collections_cache: super::scope_cache::AuthCache,
192    control_events: RwLock<Option<AuthStoreControlEvents>>,
193}
194
195// Use fast-but-safe Argon2id params for auth hashing (smaller than the
196// default 64 MB so that user-management RPCs respond quickly).
197fn auth_argon2_params() -> Argon2Params {
198    Argon2Params {
199        m_cost: 4 * 1024, // 4 MB
200        t_cost: 3,
201        p: 1,
202        tag_len: 32,
203    }
204}
205
206fn policy_control_fields(
207    policy_id: &str,
208    policy: Option<&Policy>,
209    principal: Option<&PrincipalRef>,
210) -> HashMap<String, Sensitivity> {
211    let mut fields = HashMap::new();
212    fields.insert("policy_id".to_string(), Sensitivity::raw(policy_id));
213    if let Some(policy) = policy {
214        let effects = policy
215            .statements
216            .iter()
217            .map(|s| match s.effect {
218                iam_policies::Effect::Allow => "allow",
219                iam_policies::Effect::Deny => "deny",
220            })
221            .collect::<Vec<_>>()
222            .join(",");
223        let actions = policy
224            .statements
225            .iter()
226            .flat_map(|s| s.actions.iter())
227            .map(policy_action_pattern)
228            .collect::<Vec<_>>()
229            .join(",");
230        let resources = policy
231            .statements
232            .iter()
233            .flat_map(|s| s.resources.iter())
234            .map(policy_resource_pattern)
235            .collect::<Vec<_>>()
236            .join(",");
237        fields.insert("effect".to_string(), Sensitivity::raw(effects));
238        fields.insert("action".to_string(), Sensitivity::raw(actions));
239        fields.insert("resource".to_string(), Sensitivity::raw(resources));
240        if let Some(attrs) = policy_principal_attrs(policy) {
241            fields.insert("principal_attrs".to_string(), Sensitivity::raw(attrs));
242        }
243    }
244    if let Some(principal) = principal {
245        fields.insert(
246            "principal".to_string(),
247            Sensitivity::raw(policy_principal_label(principal)),
248        );
249    }
250    fields
251}
252
253fn policy_action_pattern(pattern: &iam_policies::ActionPattern) -> String {
254    match pattern {
255        iam_policies::ActionPattern::Exact(s) => s.clone(),
256        iam_policies::ActionPattern::Wildcard => "*".to_string(),
257        iam_policies::ActionPattern::Prefix(s) => format!("{s}:*"),
258    }
259}
260
261fn policy_resource_pattern(pattern: &iam_policies::ResourcePattern) -> String {
262    match pattern {
263        iam_policies::ResourcePattern::Exact { kind, name } => format!("{kind}:{name}"),
264        iam_policies::ResourcePattern::Glob(s) => s.clone(),
265        iam_policies::ResourcePattern::Wildcard => "*".to_string(),
266    }
267}
268
269fn policy_principal_attrs(policy: &Policy) -> Option<String> {
270    let mut attrs = Vec::new();
271    for statement in &policy.statements {
272        let Some(condition) = &statement.condition else {
273            continue;
274        };
275        if let Some(value) = condition.platform_scoped {
276            attrs.push(format!("platform_scoped={value}"));
277        }
278        if let Some(value) = condition.mfa {
279            attrs.push(format!("mfa={value}"));
280        }
281        if let Some(value) = condition.tenant_match {
282            attrs.push(format!("tenant_match={value}"));
283        }
284    }
285    if attrs.is_empty() {
286        None
287    } else {
288        Some(attrs.join(","))
289    }
290}
291
292fn policy_principal_label(principal: &PrincipalRef) -> String {
293    match principal {
294        PrincipalRef::User(uid) => format!("user:{uid}"),
295        PrincipalRef::Group(group) => format!("group:{group}"),
296    }
297}
298
299fn default_user_lifecycle_ctx<'a>() -> ControlEventCtx<'a> {
300    ControlEventCtx {
301        actor: crate::runtime::control_events::ActorRef::Anonymous,
302        scope: None,
303        request_id: None,
304        trace_id: None,
305    }
306}
307
308fn bootstrap_user_lifecycle_ctx<'a>() -> ControlEventCtx<'a> {
309    ControlEventCtx {
310        actor: crate::runtime::control_events::ActorRef::System("bootstrap"),
311        scope: None,
312        request_id: None,
313        trace_id: None,
314    }
315}
316
317fn user_resource(id: &UserId) -> String {
318    format!("user:{id}")
319}
320
321fn api_key_resource(api_key_id: &str) -> String {
322    format!("apikey:{api_key_id}")
323}
324
325fn api_key_id(key: &str) -> String {
326    hex::encode(sha256(key.as_bytes()))
327}
328
329fn password_evidence() -> Sensitivity {
330    Sensitivity::redacted()
331}
332
333fn user_control_fields(
334    id: &UserId,
335    role: Option<Role>,
336    enabled: Option<bool>,
337    include_password: bool,
338) -> HashMap<String, Sensitivity> {
339    let mut fields = HashMap::new();
340    fields.insert(
341        "username".to_string(),
342        Sensitivity::raw(id.username.clone()),
343    );
344    fields.insert(
345        "tenant_id".to_string(),
346        Sensitivity::raw(id.tenant.clone().unwrap_or_default()),
347    );
348    if let Some(role) = role {
349        fields.insert("role".to_string(), Sensitivity::raw(role.as_str()));
350    }
351    if let Some(enabled) = enabled {
352        fields.insert("enabled".to_string(), Sensitivity::raw(enabled.to_string()));
353    }
354    if include_password {
355        fields.insert("password".to_string(), password_evidence());
356    }
357    fields
358}
359
360fn api_key_control_fields(
361    id: &UserId,
362    role: Role,
363    api_key_id: &str,
364) -> HashMap<String, Sensitivity> {
365    let mut fields = user_control_fields(id, Some(role), None, false);
366    fields.insert("api_key_id".to_string(), Sensitivity::raw(api_key_id));
367    fields.insert("api_key".to_string(), Sensitivity::redacted());
368    fields
369}
370
371fn user_error_is_denied(err: &AuthError) -> bool {
372    !matches!(err, AuthError::Internal(_))
373}
374
375impl AuthStore {
376    // -----------------------------------------------------------------
377    // Construction
378    // -----------------------------------------------------------------
379
380    pub fn new(config: AuthConfig) -> Self {
381        Self {
382            users: RwLock::new(HashMap::new()),
383            sessions: RwLock::new(HashMap::new()),
384            api_key_index: RwLock::new(HashMap::new()),
385            bootstrapped: AtomicBool::new(false),
386            config,
387            vault: RwLock::new(None),
388            pager: None,
389            keypair: RwLock::new(None),
390            vault_kv: RwLock::new(HashMap::new()),
391            grants: RwLock::new(HashMap::new()),
392            public_grants: RwLock::new(Vec::new()),
393            user_attributes: RwLock::new(HashMap::new()),
394            session_count_by_user: RwLock::new(HashMap::new()),
395            permission_cache: RwLock::new(HashMap::new()),
396            policies: RwLock::new(HashMap::new()),
397            user_attachments: RwLock::new(HashMap::new()),
398            group_attachments: RwLock::new(HashMap::new()),
399            iam_effective_cache: RwLock::new(HashMap::new()),
400            iam_authorization_enabled: AtomicBool::new(false),
401            enforcement_mode: RwLock::new(PolicyEnforcementMode::default_existing_install()),
402            legacy_rbac_boot_warn_emitted: AtomicBool::new(false),
403            visible_collections_cache: super::scope_cache::AuthCache::new(
404                super::scope_cache::DEFAULT_TTL,
405            ),
406            control_events: RwLock::new(None),
407        }
408    }
409
410    pub fn configure_control_events(
411        &self,
412        ledger: Arc<dyn ControlEventLedger>,
413        config: ControlEventConfig,
414    ) {
415        *self
416            .control_events
417            .write()
418            .unwrap_or_else(|e| e.into_inner()) = Some(AuthStoreControlEvents { ledger, config });
419    }
420
421    fn configured_control_events(&self) -> Option<AuthStoreControlEvents> {
422        self.control_events
423            .read()
424            .unwrap_or_else(|e| e.into_inner())
425            .clone()
426    }
427
428    /// Create an `AuthStore` backed by encrypted vault pages inside the
429    /// main `.rdb` database file.
430    ///
431    /// If vault pages already exist, their contents are loaded and
432    /// restored into the in-memory store.  All subsequent mutations are
433    /// automatically persisted to the vault pages via the pager.
434    pub fn with_vault(
435        config: AuthConfig,
436        pager: Arc<Pager>,
437        passphrase: Option<&str>,
438    ) -> Result<Self, AuthError> {
439        let vault = Vault::open(&pager, passphrase)?;
440        let mut store = Self::new(config);
441
442        // Restore persisted state if vault pages exist.
443        if let Some(state) = vault.load(&pager)? {
444            store.restore_from_vault(state);
445        }
446
447        *store.vault.write().unwrap_or_else(|e| e.into_inner()) = Some(vault);
448        store.pager = Some(pager);
449        Ok(store)
450    }
451
452    pub fn config(&self) -> &AuthConfig {
453        &self.config
454    }
455
456    pub fn is_enabled(&self) -> bool {
457        self.config.enabled
458    }
459
460    /// Returns true when no users exist yet and bootstrap hasn't been sealed.
461    pub fn needs_bootstrap(&self) -> bool {
462        !self.bootstrapped.load(Ordering::Acquire)
463            && self.users.read().map(|u| u.is_empty()).unwrap_or(true)
464    }
465
466    /// Internal: read-locked lookup by `UserId`.
467    fn get_user_cloned(&self, id: &UserId) -> Option<User> {
468        self.users.read().ok().and_then(|m| m.get(id).cloned())
469    }
470
471    /// Whether bootstrap has already been performed (sealed).
472    pub fn is_bootstrapped(&self) -> bool {
473        self.bootstrapped.load(Ordering::Acquire)
474    }
475
476    /// Bootstrap the first admin user. One-shot, irreversible.
477    ///
478    /// Uses an atomic compare-exchange to guarantee that even under
479    /// concurrent calls, only the first one succeeds. Once sealed,
480    /// all subsequent calls fail immediately -- there is no undo.
481    ///
482    /// When a vault/pager is configured, a certificate-based keypair is
483    /// generated and the vault is re-encrypted with the certificate-derived
484    /// key.  The certificate hex string is returned in `BootstrapResult`
485    /// so the admin can save it.
486    pub fn bootstrap(&self, username: &str, password: &str) -> Result<BootstrapResult, AuthError> {
487        if let Some(configured) = self.configured_control_events() {
488            let ctx = bootstrap_user_lifecycle_ctx();
489            let control = UserLifecycleControl {
490                ctx: &ctx,
491                ledger: configured.ledger.as_ref(),
492                config: configured.config,
493            };
494            self.bootstrap_with_control_events_inner(username, password, &control)
495        } else {
496            self.bootstrap_unaudited(username, password)
497        }
498    }
499
500    pub fn bootstrap_with_control_events(
501        &self,
502        username: &str,
503        password: &str,
504        ctx: &ControlEventCtx<'_>,
505        ledger: &dyn ControlEventLedger,
506        config: ControlEventConfig,
507    ) -> Result<BootstrapResult, AuthError> {
508        let system_ctx = ControlEventCtx {
509            actor: crate::runtime::control_events::ActorRef::System("bootstrap"),
510            scope: ctx.scope.clone(),
511            request_id: ctx.request_id.clone(),
512            trace_id: ctx.trace_id.clone(),
513        };
514        let control = UserLifecycleControl {
515            ctx: &system_ctx,
516            ledger,
517            config,
518        };
519        self.bootstrap_with_control_events_inner(username, password, &control)
520    }
521
522    /// Backwards-compatible bootstrap entry point for older callers
523    /// that used to ask for an operator-owned first admin. Authorization
524    /// is now policy-first, so this creates the same ordinary platform
525    /// admin as [`Self::bootstrap`].
526    pub fn bootstrap_system_admin(
527        &self,
528        username: &str,
529        password: &str,
530    ) -> Result<BootstrapResult, AuthError> {
531        self.bootstrap(username, password)
532    }
533
534    fn bootstrap_unaudited(
535        &self,
536        username: &str,
537        password: &str,
538    ) -> Result<BootstrapResult, AuthError> {
539        // Atomic seal: only the first caller wins.
540        if self
541            .bootstrapped
542            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
543            .is_err()
544        {
545            return Err(AuthError::Forbidden(
546                "bootstrap already completed — sealed permanently".to_string(),
547            ));
548        }
549
550        // Double-check users are actually empty (belt and suspenders).
551        {
552            let users = self.users.read().map_err(lock_err)?;
553            if !users.is_empty() {
554                return Err(AuthError::Forbidden(
555                    "bootstrap already completed — users exist".to_string(),
556                ));
557            }
558        }
559
560        let user = self.create_user_in_tenant_unaudited(None, username, password, Role::Admin)?;
561        let key =
562            self.create_api_key_in_tenant_unaudited(None, username, "bootstrap", Role::Admin)?;
563
564        // Generate a certificate-based keypair and re-seal the vault.
565        let certificate = if let Some(ref pager) = self.pager {
566            let kp = KeyPair::generate();
567            let cert_hex = kp.certificate_hex();
568
569            // Re-create the vault using the certificate-derived key.
570            let new_vault = Vault::with_certificate_bytes(pager, &kp.certificate)
571                .map_err(|e| AuthError::Internal(format!("vault re-seal failed: {e}")))?;
572
573            // Store the keypair so token signing works immediately.
574            if let Ok(mut kp_guard) = self.keypair.write() {
575                *kp_guard = Some(kp);
576            }
577
578            // Replace the vault and persist with the master secret included.
579            if let Ok(mut vault_guard) = self.vault.write() {
580                *vault_guard = Some(new_vault);
581            }
582            // Generate the AES-256 secret key for Value::Secret encryption.
583            self.ensure_vault_secret_key();
584            self.persist_to_vault();
585
586            Some(cert_hex)
587        } else {
588            None
589        };
590
591        // #712 / S5A: fresh bootstraps land in the strict posture.
592        // Persist explicitly to vault_kv so subsequent boots
593        // (rehydrate_iam) pick up `policy_only` instead of the
594        // existing-install default. Existing installs upgrading past
595        // this commit never traverse this branch — `bootstrap()` is
596        // sealed once and never re-runs.
597        self.set_enforcement_mode(PolicyEnforcementMode::default_fresh_bootstrap());
598
599        Ok(BootstrapResult {
600            user,
601            api_key: key,
602            certificate,
603        })
604    }
605
606    fn bootstrap_with_control_events_inner(
607        &self,
608        username: &str,
609        password: &str,
610        control: &UserLifecycleControl<'_>,
611    ) -> Result<BootstrapResult, AuthError> {
612        let id = UserId::from_parts(None, username);
613        match self.bootstrap_unaudited(username, password) {
614            Ok(result) => {
615                let event_result = self.emit_user_lifecycle_allowed(
616                    control,
617                    EventKind::UserCreate,
618                    "user.create",
619                    &id,
620                    user_control_fields(&id, Some(Role::Admin), Some(true), true),
621                );
622                if let Err(err) = event_result {
623                    self.rollback_bootstrap(&id);
624                    return Err(err);
625                }
626                Ok(result)
627            }
628            Err(err) => {
629                self.emit_user_lifecycle_outcome(
630                    control,
631                    if user_error_is_denied(&err) {
632                        Outcome::Denied
633                    } else {
634                        Outcome::Error
635                    },
636                    EventKind::UserCreate,
637                    "user.create",
638                    &id,
639                    Some(err.to_string()),
640                    user_control_fields(&id, Some(Role::Admin), Some(true), true),
641                );
642                Err(err)
643            }
644        }
645    }
646
647    /// Auto-bootstrap from environment variables if no users exist.
648    ///
649    /// Checks `REDDB_USERNAME` and `REDDB_PASSWORD`. If both are set and
650    /// the user store is empty, creates the first admin user automatically.
651    /// This mirrors the Docker pattern (`MYSQL_ROOT_PASSWORD`, etc.).
652    ///
653    /// Returns `Some(BootstrapResult)` if bootstrapped, `None` if skipped.
654    pub fn bootstrap_from_env(&self) -> Option<BootstrapResult> {
655        if !self.needs_bootstrap() {
656            return None;
657        }
658
659        let username = std::env::var("REDDB_USERNAME").ok()?;
660        let password = std::env::var("REDDB_PASSWORD").ok()?;
661
662        if username.is_empty() || password.is_empty() {
663            return None;
664        }
665
666        match self.bootstrap(&username, &password) {
667            Ok(result) => {
668                // F-04: `username` is REDDB_USERNAME — operator-supplied
669                // (env), but still routed through the LogField escaper
670                // because env strings cross trust boundaries in some
671                // deployment models (k8s downward API, Vault sidecar,
672                // external secret operator). See ADR 0010.
673                tracing::info!(
674                    username = %reddb_wire::audit_safe_log_field(&username),
675                    "bootstrapped admin user from REDDB_USERNAME/REDDB_PASSWORD"
676                );
677                if let Some(ref cert) = result.certificate {
678                    // Certificate must be readable by the operator — keep it
679                    // in the log stream but print raw to stderr too so it
680                    // survives even if the log file gets rotated.
681                    eprintln!("[reddb] CERTIFICATE: {}", cert);
682                    tracing::warn!(
683                        "vault certificate issued — save it: ONLY way to unseal after restart"
684                    );
685                }
686                Some(result)
687            }
688            Err(e) => {
689                tracing::error!(err = %e, "env bootstrap failed");
690                None
691            }
692        }
693    }
694
695    // -----------------------------------------------------------------
696    // Vault persistence
697    // -----------------------------------------------------------------
698
699    /// Persist the current auth state to the vault pages (if configured).
700    fn persist_to_vault_result(&self) -> Result<(), AuthError> {
701        let vault_guard = self.vault.read().unwrap_or_else(|e| e.into_inner());
702        if let (Some(ref vault), Some(ref pager)) = (&*vault_guard, &self.pager) {
703            let state = self.snapshot();
704            vault.save(pager, &state)?;
705        }
706        Ok(())
707    }
708
709    /// Persist the current auth state to the vault pages (if configured).
710    ///
711    /// Legacy auth mutations still treat in-memory state as authoritative.
712    /// New secret-management paths use the `try_` methods below so callers
713    /// get a hard error if the vault write fails.
714    fn persist_to_vault(&self) {
715        if let Err(e) = self.persist_to_vault_result() {
716            tracing::error!(err = %e, "vault persist failed");
717            // Issue #205 — vault persist is the secret-rotation
718            // serialization point: when this fails, freshly rotated
719            // credentials live only in memory and a process restart
720            // loses them. Operator-grade event so the operator can
721            // intervene before the next restart.
722            crate::telemetry::operator_event::OperatorEvent::SecretRotationFailed {
723                secret_ref: "auth_vault".to_string(),
724                error: e.to_string(),
725            }
726            .emit_global();
727        }
728    }
729
730    /// True when this store has an encrypted vault and pager wired in.
731    pub fn is_vault_backed(&self) -> bool {
732        self.pager.is_some()
733            && self
734                .vault
735                .read()
736                .map(|guard| guard.is_some())
737                .unwrap_or(false)
738    }
739
740    // -----------------------------------------------------------------
741    // Vault KV — encrypted key-value store for arbitrary secrets
742    // -----------------------------------------------------------------
743
744    /// Read a value from the vault KV store. Returns `None` if not set.
745    pub fn vault_kv_get(&self, key: &str) -> Option<String> {
746        self.vault_kv
747            .read()
748            .ok()
749            .and_then(|kv| kv.get(key).cloned())
750    }
751
752    /// Snapshot vault KV values for statement-local secret resolution.
753    pub fn vault_kv_snapshot(&self) -> HashMap<String, String> {
754        self.vault_kv
755            .read()
756            .map(|kv| kv.clone())
757            .unwrap_or_default()
758    }
759
760    /// Export vault KV as an encrypted logical blob for JSONL dump/restore.
761    /// Returns `None` when the vault has no KV entries.
762    pub fn vault_kv_export_encrypted(&self) -> Result<Option<String>, AuthError> {
763        if !self.is_vault_backed() {
764            return Err(AuthError::Forbidden(
765                "vault KV export requires an enabled, unsealed vault".to_string(),
766            ));
767        }
768        let kv = self.vault_kv_snapshot();
769        if kv.is_empty() {
770            return Ok(None);
771        }
772
773        let vault_guard = self.vault.read().map_err(lock_err)?;
774        let vault = vault_guard.as_ref().ok_or_else(|| {
775            AuthError::Forbidden("vault KV export requires an enabled, unsealed vault".to_string())
776        })?;
777        let state = VaultState {
778            users: Vec::new(),
779            api_keys: Vec::new(),
780            bootstrapped: false,
781            master_secret: None,
782            kv,
783        };
784        Ok(Some(vault.seal_logical_export(&state)?))
785    }
786
787    /// Merge imported vault KV entries and fail if the encrypted vault
788    /// write cannot be made durable.
789    pub fn vault_kv_try_import(
790        &self,
791        entries: HashMap<String, String>,
792    ) -> Result<usize, AuthError> {
793        if !self.is_vault_backed() {
794            return Err(AuthError::Forbidden(
795                "vault KV import requires an enabled, unsealed vault".to_string(),
796            ));
797        }
798        if entries.is_empty() {
799            return Ok(0);
800        }
801
802        let mut previous = HashMap::new();
803        {
804            let mut kv = self.vault_kv.write().map_err(lock_err)?;
805            for (key, value) in &entries {
806                previous.insert(key.clone(), kv.insert(key.clone(), value.clone()));
807            }
808        }
809
810        if let Err(err) = self.persist_to_vault_result() {
811            if let Ok(mut kv) = self.vault_kv.write() {
812                for (key, old) in previous {
813                    match old {
814                        Some(value) => {
815                            kv.insert(key, value);
816                        }
817                        None => {
818                            kv.remove(&key);
819                        }
820                    }
821                }
822            }
823            return Err(err);
824        }
825
826        Ok(entries.len())
827    }
828
829    /// Import false placeholders for secrets whose encrypted payload
830    /// could not be decrypted during logical restore.
831    pub fn vault_kv_try_import_placeholders(&self, keys: &[String]) -> Result<usize, AuthError> {
832        let entries = keys
833            .iter()
834            .map(|key| (key.clone(), "false".to_string()))
835            .collect();
836        self.vault_kv_try_import(entries)
837    }
838
839    /// Write a value to the vault KV store, persisting to disk.
840    pub fn vault_kv_set(&self, key: String, value: String) {
841        if let Ok(mut kv) = self.vault_kv.write() {
842            kv.insert(key, value);
843        }
844        self.persist_to_vault();
845    }
846
847    /// Write a value to the vault KV store and fail if the vault write
848    /// cannot be made durable.
849    pub fn vault_kv_try_set(&self, key: String, value: String) -> Result<(), AuthError> {
850        if !self.is_vault_backed() {
851            return Err(AuthError::Forbidden(
852                "SET SECRET requires an enabled, unsealed vault".to_string(),
853            ));
854        }
855
856        let previous = {
857            let mut kv = self.vault_kv.write().map_err(lock_err)?;
858            kv.insert(key.clone(), value)
859        };
860
861        if let Err(err) = self.persist_to_vault_result() {
862            if let Ok(mut kv) = self.vault_kv.write() {
863                match previous {
864                    Some(value) => {
865                        kv.insert(key, value);
866                    }
867                    None => {
868                        kv.remove(&key);
869                    }
870                }
871            }
872            return Err(err);
873        }
874
875        Ok(())
876    }
877
878    /// Delete a value from the vault KV store. Returns true if it existed.
879    pub fn vault_kv_delete(&self, key: &str) -> bool {
880        let existed = self
881            .vault_kv
882            .write()
883            .map(|mut kv| kv.remove(key).is_some())
884            .unwrap_or(false);
885        if existed {
886            self.persist_to_vault();
887        }
888        existed
889    }
890
891    /// Delete a value from the vault KV store and fail if the vault write
892    /// cannot be made durable.
893    pub fn vault_kv_try_delete(&self, key: &str) -> Result<bool, AuthError> {
894        if !self.is_vault_backed() {
895            return Err(AuthError::Forbidden(
896                "DELETE SECRET requires an enabled, unsealed vault".to_string(),
897            ));
898        }
899
900        let removed = {
901            let mut kv = self.vault_kv.write().map_err(lock_err)?;
902            kv.remove(key)
903        };
904
905        if removed.is_none() {
906            return Ok(false);
907        }
908
909        if let Err(err) = self.persist_to_vault_result() {
910            if let Ok(mut kv) = self.vault_kv.write() {
911                if let Some(value) = removed {
912                    kv.insert(key.to_string(), value);
913                }
914            }
915            return Err(err);
916        }
917
918        Ok(true)
919    }
920
921    /// List all keys in the vault KV store.
922    pub fn vault_kv_keys(&self) -> Vec<String> {
923        self.vault_kv
924            .read()
925            .map(|kv| kv.keys().cloned().collect())
926            .unwrap_or_default()
927    }
928
929    /// Convenience: get the 32-byte secret key for Value::Secret encryption.
930    /// Generated on first boot and stored at `red.secret.aes_key`.
931    pub fn vault_secret_key(&self) -> Option<[u8; 32]> {
932        let hex_str = self.vault_kv_get("red.secret.aes_key")?;
933        let bytes = hex::decode(hex_str).ok()?;
934        if bytes.len() == 32 {
935            let mut key = [0u8; 32];
936            key.copy_from_slice(&bytes);
937            Some(key)
938        } else {
939            None
940        }
941    }
942
943    /// Generate and store the AES-256 secret key on first boot if not present.
944    pub fn ensure_vault_secret_key(&self) {
945        if self.vault_kv_get("red.secret.aes_key").is_none() {
946            let key = random_bytes(32);
947            self.vault_kv_set("red.secret.aes_key".to_string(), hex::encode(key));
948        }
949    }
950
951    /// Take a snapshot of the current auth state for vault serialization.
952    fn snapshot(&self) -> VaultState {
953        let users_guard = self.users.read().unwrap_or_else(|e| e.into_inner());
954        let users: Vec<User> = users_guard.values().cloned().collect();
955
956        // Collect (owner UserId, api_key) pairs from all users so a
957        // tenant-scoped owner can be reattached on restore.
958        let mut api_keys = Vec::new();
959        for user in &users {
960            let owner = UserId::from_parts(user.tenant_id.as_deref(), &user.username);
961            for key in &user.api_keys {
962                api_keys.push((owner.clone(), key.clone()));
963            }
964        }
965
966        // Include the master secret if a keypair is loaded.
967        let master_secret = self
968            .keypair
969            .read()
970            .ok()
971            .and_then(|guard| guard.as_ref().map(|kp| kp.master_secret.clone()));
972
973        let kv = self.vault_kv.read().map(|m| m.clone()).unwrap_or_default();
974
975        VaultState {
976            users,
977            api_keys,
978            bootstrapped: self.bootstrapped.load(Ordering::Acquire),
979            master_secret,
980            kv,
981        }
982    }
983
984    /// Restore the in-memory auth state from a vault snapshot.
985    fn restore_from_vault(&mut self, state: VaultState) {
986        // Restore bootstrap seal.
987        if state.bootstrapped {
988            self.bootstrapped.store(true, Ordering::Release);
989        }
990
991        // Restore keypair from master secret (if present).
992        if let Some(secret) = state.master_secret {
993            let kp = KeyPair::from_master_secret(secret);
994            if let Ok(mut guard) = self.keypair.write() {
995                *guard = Some(kp);
996            }
997        }
998
999        // Restore KV store.
1000        if let Ok(mut kv) = self.vault_kv.write() {
1001            *kv = state.kv;
1002        }
1003
1004        // Restore users.
1005        let mut users = self.users.write().unwrap_or_else(|e| e.into_inner());
1006        let mut idx = self
1007            .api_key_index
1008            .write()
1009            .unwrap_or_else(|e| e.into_inner());
1010
1011        for user in state.users {
1012            let id = UserId::from_parts(user.tenant_id.as_deref(), &user.username);
1013            // Register API keys in the index.
1014            for key in &user.api_keys {
1015                idx.insert(key.key.clone(), (id.clone(), key.role));
1016            }
1017            users.insert(id, user);
1018        }
1019        drop(idx);
1020        drop(users);
1021
1022        self.rehydrate_acl();
1023        self.rehydrate_iam();
1024    }
1025
1026    // -----------------------------------------------------------------
1027    // User management
1028    // -----------------------------------------------------------------
1029
1030    /// Create a new platform-scoped user (`tenant_id = None`).
1031    ///
1032    /// For tenant-scoped creation, use [`Self::create_user_in_tenant`].
1033    pub fn create_user(
1034        &self,
1035        username: &str,
1036        password: &str,
1037        role: Role,
1038    ) -> Result<User, AuthError> {
1039        self.create_user_in_tenant(None, username, password, role)
1040    }
1041
1042    pub fn create_user_with_control_events(
1043        &self,
1044        username: &str,
1045        password: &str,
1046        role: Role,
1047        ctx: &ControlEventCtx<'_>,
1048        ledger: &dyn ControlEventLedger,
1049        config: ControlEventConfig,
1050    ) -> Result<User, AuthError> {
1051        self.create_user_in_tenant_with_control_events(
1052            None, username, password, role, ctx, ledger, config,
1053        )
1054    }
1055
1056    /// Create a user under the given tenant scope. `tenant_id == None`
1057    /// produces a platform-wide user. `(tenant, username)` is the
1058    /// uniqueness key — the same `username` may exist independently
1059    /// under multiple tenants.
1060    pub fn create_user_in_tenant(
1061        &self,
1062        tenant_id: Option<&str>,
1063        username: &str,
1064        password: &str,
1065        role: Role,
1066    ) -> Result<User, AuthError> {
1067        if let Some(configured) = self.configured_control_events() {
1068            let ctx = default_user_lifecycle_ctx();
1069            let control = UserLifecycleControl {
1070                ctx: &ctx,
1071                ledger: configured.ledger.as_ref(),
1072                config: configured.config,
1073            };
1074            self.create_user_in_tenant_controlled(tenant_id, username, password, role, &control)
1075        } else {
1076            self.create_user_in_tenant_unaudited(tenant_id, username, password, role)
1077        }
1078    }
1079
1080    #[allow(clippy::too_many_arguments)]
1081    pub fn create_user_in_tenant_with_control_events(
1082        &self,
1083        tenant_id: Option<&str>,
1084        username: &str,
1085        password: &str,
1086        role: Role,
1087        ctx: &ControlEventCtx<'_>,
1088        ledger: &dyn ControlEventLedger,
1089        config: ControlEventConfig,
1090    ) -> Result<User, AuthError> {
1091        let control = UserLifecycleControl {
1092            ctx,
1093            ledger,
1094            config,
1095        };
1096        self.create_user_in_tenant_controlled(tenant_id, username, password, role, &control)
1097    }
1098
1099    pub fn create_admin_user(
1100        &self,
1101        username: &str,
1102        password: &str,
1103        role: Role,
1104        tenant_id: Option<&str>,
1105    ) -> Result<User, AuthError> {
1106        if let Some(configured) = self.configured_control_events() {
1107            let ctx = default_user_lifecycle_ctx();
1108            let control = UserLifecycleControl {
1109                ctx: &ctx,
1110                ledger: configured.ledger.as_ref(),
1111                config: configured.config,
1112            };
1113            self.create_user_in_tenant_controlled(tenant_id, username, password, role, &control)
1114        } else {
1115            self.create_user_in_tenant_unaudited(tenant_id, username, password, role)
1116        }
1117    }
1118
1119    fn create_user_in_tenant_unaudited(
1120        &self,
1121        tenant_id: Option<&str>,
1122        username: &str,
1123        password: &str,
1124        role: Role,
1125    ) -> Result<User, AuthError> {
1126        let id = UserId::from_parts(tenant_id, username);
1127        let mut users = self.users.write().map_err(lock_err)?;
1128        if users.contains_key(&id) {
1129            return Err(AuthError::UserExists(id.to_string()));
1130        }
1131
1132        let now = now_ms();
1133        let user = User {
1134            username: username.to_string(),
1135            tenant_id: tenant_id.map(|s| s.to_string()),
1136            password_hash: hash_password(password),
1137            scram_verifier: Some(make_scram_verifier(password)),
1138            role,
1139            api_keys: Vec::new(),
1140            created_at: now,
1141            updated_at: now,
1142            enabled: true,
1143        };
1144        users.insert(id, user.clone());
1145        drop(users); // release lock before vault I/O
1146        self.persist_to_vault();
1147        Ok(user)
1148    }
1149
1150    fn create_user_in_tenant_controlled(
1151        &self,
1152        tenant_id: Option<&str>,
1153        username: &str,
1154        password: &str,
1155        role: Role,
1156        control: &UserLifecycleControl<'_>,
1157    ) -> Result<User, AuthError> {
1158        let id = UserId::from_parts(tenant_id, username);
1159        match self.create_user_in_tenant_unaudited(tenant_id, username, password, role) {
1160            Ok(user) => {
1161                if let Err(err) = self.emit_user_lifecycle_allowed(
1162                    control,
1163                    EventKind::UserCreate,
1164                    "user.create",
1165                    &id,
1166                    user_control_fields(&id, Some(role), Some(true), true),
1167                ) {
1168                    self.rollback_create_user(&id);
1169                    return Err(err);
1170                }
1171                Ok(user)
1172            }
1173            Err(err) => {
1174                self.emit_user_lifecycle_outcome(
1175                    control,
1176                    if user_error_is_denied(&err) {
1177                        Outcome::Denied
1178                    } else {
1179                        Outcome::Error
1180                    },
1181                    EventKind::UserCreate,
1182                    "user.create",
1183                    &id,
1184                    Some(err.to_string()),
1185                    user_control_fields(&id, Some(role), Some(true), true),
1186                );
1187                Err(err)
1188            }
1189        }
1190    }
1191
1192    /// Look up a user's SCRAM verifier by full `UserId`.
1193    ///
1194    /// The wire handshake passes the tenant resolved from the session
1195    /// (or `None` for the bootstrap admin) so cross-tenant collisions
1196    /// never authenticate the wrong identity.
1197    pub fn lookup_scram_verifier(&self, id: &UserId) -> Option<crate::auth::scram::ScramVerifier> {
1198        // Synthetic platform-owner principal must never authenticate.
1199        if crate::auth::self_lock_guard::is_synthetic_principal(&id.username) {
1200            return None;
1201        }
1202        let users = self.users.read().ok()?;
1203        users.get(id).and_then(|u| u.scram_verifier.clone())
1204    }
1205
1206    /// Backwards-compatible shim for the v2 wire bootstrap path: looks
1207    /// up a user by username assuming the platform (`tenant=None`)
1208    /// scope. Use this only where the handshake hasn't yet learned the
1209    /// caller's tenant.
1210    pub fn lookup_scram_verifier_global(
1211        &self,
1212        username: &str,
1213    ) -> Option<crate::auth::scram::ScramVerifier> {
1214        self.lookup_scram_verifier(&UserId::platform(username))
1215    }
1216
1217    /// Return all users (password hashes redacted).
1218    ///
1219    /// The synthetic [`crate::auth::self_lock_guard::PLATFORM_OWNER_USERNAME`]
1220    /// principal is filtered out — it is a policy-graph anchor, not an
1221    /// operator-visible account.
1222    pub fn list_users(&self) -> Vec<User> {
1223        let users = match self.users.read() {
1224            Ok(g) => g,
1225            Err(_) => return Vec::new(),
1226        };
1227        users
1228            .values()
1229            .filter(|u| !crate::auth::self_lock_guard::is_synthetic_principal(&u.username))
1230            .map(|u| User {
1231                password_hash: String::new(), // redacted
1232                ..u.clone()
1233            })
1234            .collect()
1235    }
1236
1237    /// Return users restricted to a tenant scope.
1238    ///
1239    /// `tenant_filter`:
1240    ///   - `None` listing in `Some(None)` — only platform users
1241    ///   - `Some(Some("acme"))` — only users in tenant `acme`
1242    ///   - `None` argument — all users (admin-only callers)
1243    pub fn list_users_scoped(&self, tenant_filter: Option<Option<&str>>) -> Vec<User> {
1244        let users = match self.users.read() {
1245            Ok(g) => g,
1246            Err(_) => return Vec::new(),
1247        };
1248        users
1249            .values()
1250            .filter(|u| match tenant_filter {
1251                None => true,
1252                Some(t) => u.tenant_id.as_deref() == t,
1253            })
1254            .filter(|u| !crate::auth::self_lock_guard::is_synthetic_principal(&u.username))
1255            .map(|u| User {
1256                password_hash: String::new(), // redacted
1257                ..u.clone()
1258            })
1259            .collect()
1260    }
1261
1262    /// Resource shape used by IAM policies that govern user lifecycle
1263    /// mutations. Platform users are addressed as `user:<username>`;
1264    /// tenant users are addressed as `user:tenant/<tenant>/<username>`
1265    /// by the existing policy resource qualifier.
1266    pub fn user_lifecycle_resource(uid: &UserId) -> ResourceRef {
1267        let resource = ResourceRef::new("user", uid.username.clone());
1268        match &uid.tenant {
1269            Some(tenant) => resource.with_tenant(tenant.clone()),
1270            None => resource,
1271        }
1272    }
1273
1274    pub fn eval_context_for_principal(
1275        &self,
1276        principal: &UserId,
1277        role: Role,
1278        current_tenant: Option<String>,
1279    ) -> EvalContext {
1280        EvalContext {
1281            principal_tenant: principal.tenant.clone(),
1282            current_tenant,
1283            peer_ip: None,
1284            mfa_present: false,
1285            now_ms: now_ms(),
1286            principal_is_admin_role: role == Role::Admin,
1287            principal_is_platform_scoped: principal.tenant.is_none(),
1288        }
1289    }
1290
1291    /// Policy gate for user lifecycle mutations.
1292    ///
1293    /// This is intentionally separate from the store mutation methods:
1294    /// bootstrap and manifest loading can still seed users directly,
1295    /// while public surfaces must authorize `user:*` before calling the
1296    /// mutator. Explicit Deny wins even in `LegacyRbac` mode.
1297    pub fn check_user_lifecycle_authz(
1298        &self,
1299        actor: &UserId,
1300        actor_role: Role,
1301        action: &str,
1302        target: &UserId,
1303    ) -> bool {
1304        let resource = Self::user_lifecycle_resource(target);
1305        let ctx = self.eval_context_for_principal(actor, actor_role, target.tenant.clone());
1306        self.check_policy_authz_with_role(actor, action, &resource, &ctx, actor_role)
1307    }
1308
1309    pub fn get_user(&self, tenant_id: Option<&str>, username: &str) -> Option<User> {
1310        let id = UserId::from_parts(tenant_id, username);
1311        self.get_user_cloned(&id).map(|u| User {
1312            password_hash: String::new(),
1313            ..u
1314        })
1315    }
1316
1317    /// Delete a platform-scoped user (`tenant_id = None`) and revoke
1318    /// all of their API keys + sessions.
1319    ///
1320    /// For tenant-scoped deletes, use [`Self::delete_user_in_tenant`].
1321    pub fn delete_user(&self, username: &str) -> Result<(), AuthError> {
1322        self.delete_user_in_tenant(None, username)
1323    }
1324
1325    pub fn delete_user_with_control_events(
1326        &self,
1327        username: &str,
1328        ctx: &ControlEventCtx<'_>,
1329        ledger: &dyn ControlEventLedger,
1330        config: ControlEventConfig,
1331    ) -> Result<(), AuthError> {
1332        self.delete_user_in_tenant_with_control_events(None, username, ctx, ledger, config)
1333    }
1334
1335    /// Delete a user identified by `(tenant_id, username)` and revoke
1336    /// all of their API keys + sessions.
1337    pub fn delete_user_in_tenant(
1338        &self,
1339        tenant_id: Option<&str>,
1340        username: &str,
1341    ) -> Result<(), AuthError> {
1342        if let Some(configured) = self.configured_control_events() {
1343            let ctx = default_user_lifecycle_ctx();
1344            let control = UserLifecycleControl {
1345                ctx: &ctx,
1346                ledger: configured.ledger.as_ref(),
1347                config: configured.config,
1348            };
1349            self.delete_user_in_tenant_controlled(tenant_id, username, &control)
1350        } else {
1351            self.delete_user_in_tenant_unaudited(tenant_id, username)
1352        }
1353    }
1354
1355    pub fn delete_user_in_tenant_with_control_events(
1356        &self,
1357        tenant_id: Option<&str>,
1358        username: &str,
1359        ctx: &ControlEventCtx<'_>,
1360        ledger: &dyn ControlEventLedger,
1361        config: ControlEventConfig,
1362    ) -> Result<(), AuthError> {
1363        let control = UserLifecycleControl {
1364            ctx,
1365            ledger,
1366            config,
1367        };
1368        self.delete_user_in_tenant_controlled(tenant_id, username, &control)
1369    }
1370
1371    fn delete_user_in_tenant_unaudited(
1372        &self,
1373        tenant_id: Option<&str>,
1374        username: &str,
1375    ) -> Result<(), AuthError> {
1376        let id = UserId::from_parts(tenant_id, username);
1377        let mut users = self.users.write().map_err(lock_err)?;
1378        let user = users
1379            .remove(&id)
1380            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
1381
1382        // Remove API key index entries.
1383        if let Ok(mut idx) = self.api_key_index.write() {
1384            for api_key in &user.api_keys {
1385                idx.remove(&api_key.key);
1386            }
1387        }
1388
1389        // Remove sessions belonging to this user (match on tenant+username
1390        // so we don't tear down a same-named user in another tenant).
1391        if let Ok(mut sessions) = self.sessions.write() {
1392            sessions
1393                .retain(|_, s| !(s.username == username && s.tenant_id.as_deref() == tenant_id));
1394        }
1395
1396        self.persist_to_vault();
1397        Ok(())
1398    }
1399
1400    fn delete_user_in_tenant_controlled(
1401        &self,
1402        tenant_id: Option<&str>,
1403        username: &str,
1404        control: &UserLifecycleControl<'_>,
1405    ) -> Result<(), AuthError> {
1406        let id = UserId::from_parts(tenant_id, username);
1407        let rollback = self.user_delete_rollback_snapshot(&id, tenant_id, username);
1408        match self.delete_user_in_tenant_unaudited(tenant_id, username) {
1409            Ok(()) => {
1410                if let Err(err) = self.emit_user_lifecycle_allowed(
1411                    control,
1412                    EventKind::UserDelete,
1413                    "user.delete",
1414                    &id,
1415                    user_control_fields(&id, rollback.as_ref().map(|r| r.0.role), None, false),
1416                ) {
1417                    if let Some((user, sessions)) = rollback {
1418                        self.restore_deleted_user(&id, user, sessions);
1419                    }
1420                    return Err(err);
1421                }
1422                Ok(())
1423            }
1424            Err(err) => {
1425                self.emit_user_lifecycle_outcome(
1426                    control,
1427                    if user_error_is_denied(&err) {
1428                        Outcome::Denied
1429                    } else {
1430                        Outcome::Error
1431                    },
1432                    EventKind::UserDelete,
1433                    "user.delete",
1434                    &id,
1435                    Some(err.to_string()),
1436                    user_control_fields(&id, rollback.as_ref().map(|r| r.0.role), None, false),
1437                );
1438                Err(err)
1439            }
1440        }
1441    }
1442
1443    /// Change password (requires the old password). Defaults to
1444    /// platform tenant; use [`Self::change_password_in_tenant`] for
1445    /// scoped users.
1446    pub fn change_password(
1447        &self,
1448        username: &str,
1449        old_password: &str,
1450        new_password: &str,
1451    ) -> Result<(), AuthError> {
1452        self.change_password_in_tenant(None, username, old_password, new_password)
1453    }
1454
1455    pub fn change_password_with_control_events(
1456        &self,
1457        username: &str,
1458        old_password: &str,
1459        new_password: &str,
1460        ctx: &ControlEventCtx<'_>,
1461        ledger: &dyn ControlEventLedger,
1462        config: ControlEventConfig,
1463    ) -> Result<(), AuthError> {
1464        self.change_password_in_tenant_with_control_events(
1465            None,
1466            username,
1467            old_password,
1468            new_password,
1469            ctx,
1470            ledger,
1471            config,
1472        )
1473    }
1474
1475    #[allow(clippy::too_many_arguments)]
1476    pub fn change_password_in_tenant(
1477        &self,
1478        tenant_id: Option<&str>,
1479        username: &str,
1480        old_password: &str,
1481        new_password: &str,
1482    ) -> Result<(), AuthError> {
1483        if let Some(configured) = self.configured_control_events() {
1484            let ctx = default_user_lifecycle_ctx();
1485            let control = UserLifecycleControl {
1486                ctx: &ctx,
1487                ledger: configured.ledger.as_ref(),
1488                config: configured.config,
1489            };
1490            self.change_password_in_tenant_controlled(
1491                tenant_id,
1492                username,
1493                old_password,
1494                new_password,
1495                &control,
1496            )
1497        } else {
1498            self.change_password_in_tenant_unaudited(
1499                tenant_id,
1500                username,
1501                old_password,
1502                new_password,
1503            )
1504        }
1505    }
1506
1507    #[allow(clippy::too_many_arguments)]
1508    pub fn change_password_in_tenant_with_control_events(
1509        &self,
1510        tenant_id: Option<&str>,
1511        username: &str,
1512        old_password: &str,
1513        new_password: &str,
1514        ctx: &ControlEventCtx<'_>,
1515        ledger: &dyn ControlEventLedger,
1516        config: ControlEventConfig,
1517    ) -> Result<(), AuthError> {
1518        let control = UserLifecycleControl {
1519            ctx,
1520            ledger,
1521            config,
1522        };
1523        self.change_password_in_tenant_controlled(
1524            tenant_id,
1525            username,
1526            old_password,
1527            new_password,
1528            &control,
1529        )
1530    }
1531
1532    fn change_password_in_tenant_unaudited(
1533        &self,
1534        tenant_id: Option<&str>,
1535        username: &str,
1536        old_password: &str,
1537        new_password: &str,
1538    ) -> Result<(), AuthError> {
1539        let id = UserId::from_parts(tenant_id, username);
1540        let mut users = self.users.write().map_err(lock_err)?;
1541        let user = users
1542            .get_mut(&id)
1543            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
1544
1545        if !verify_password(old_password, &user.password_hash) {
1546            return Err(AuthError::InvalidCredentials);
1547        }
1548
1549        user.password_hash = hash_password(new_password);
1550        user.scram_verifier = Some(make_scram_verifier(new_password));
1551        user.updated_at = now_ms();
1552        drop(users); // release lock before vault I/O
1553        self.persist_to_vault();
1554        Ok(())
1555    }
1556
1557    fn change_password_in_tenant_controlled(
1558        &self,
1559        tenant_id: Option<&str>,
1560        username: &str,
1561        old_password: &str,
1562        new_password: &str,
1563        control: &UserLifecycleControl<'_>,
1564    ) -> Result<(), AuthError> {
1565        let id = UserId::from_parts(tenant_id, username);
1566        let previous = self.get_user_cloned(&id);
1567        match self.change_password_in_tenant_unaudited(
1568            tenant_id,
1569            username,
1570            old_password,
1571            new_password,
1572        ) {
1573            Ok(()) => {
1574                if let Err(err) = self.emit_user_lifecycle_allowed(
1575                    control,
1576                    EventKind::UserUpdate,
1577                    "user.update",
1578                    &id,
1579                    user_control_fields(
1580                        &id,
1581                        previous.as_ref().map(|u| u.role),
1582                        previous.as_ref().map(|u| u.enabled),
1583                        true,
1584                    ),
1585                ) {
1586                    self.restore_user_snapshot(&id, previous);
1587                    return Err(err);
1588                }
1589                Ok(())
1590            }
1591            Err(err) => {
1592                self.emit_user_lifecycle_outcome(
1593                    control,
1594                    if user_error_is_denied(&err) {
1595                        Outcome::Denied
1596                    } else {
1597                        Outcome::Error
1598                    },
1599                    EventKind::UserUpdate,
1600                    "user.update",
1601                    &id,
1602                    Some(err.to_string()),
1603                    user_control_fields(
1604                        &id,
1605                        previous.as_ref().map(|u| u.role),
1606                        previous.as_ref().map(|u| u.enabled),
1607                        true,
1608                    ),
1609                );
1610                Err(err)
1611            }
1612        }
1613    }
1614
1615    /// Change a user's role (admin-only operation). Defaults to platform
1616    /// tenant; use [`Self::change_role_in_tenant`] for scoped users.
1617    pub fn change_role(&self, username: &str, new_role: Role) -> Result<(), AuthError> {
1618        self.change_role_in_tenant(None, username, new_role)
1619    }
1620
1621    pub fn change_role_in_tenant(
1622        &self,
1623        tenant_id: Option<&str>,
1624        username: &str,
1625        new_role: Role,
1626    ) -> Result<(), AuthError> {
1627        if let Some(configured) = self.configured_control_events() {
1628            let ctx = default_user_lifecycle_ctx();
1629            let control = UserLifecycleControl {
1630                ctx: &ctx,
1631                ledger: configured.ledger.as_ref(),
1632                config: configured.config,
1633            };
1634            self.change_role_in_tenant_controlled(tenant_id, username, new_role, &control)
1635        } else {
1636            self.change_role_in_tenant_unaudited(tenant_id, username, new_role)
1637        }
1638    }
1639
1640    pub fn change_role_in_tenant_with_control_events(
1641        &self,
1642        tenant_id: Option<&str>,
1643        username: &str,
1644        new_role: Role,
1645        ctx: &ControlEventCtx<'_>,
1646        ledger: &dyn ControlEventLedger,
1647        config: ControlEventConfig,
1648    ) -> Result<(), AuthError> {
1649        let control = UserLifecycleControl {
1650            ctx,
1651            ledger,
1652            config,
1653        };
1654        self.change_role_in_tenant_controlled(tenant_id, username, new_role, &control)
1655    }
1656
1657    fn change_role_in_tenant_unaudited(
1658        &self,
1659        tenant_id: Option<&str>,
1660        username: &str,
1661        new_role: Role,
1662    ) -> Result<(), AuthError> {
1663        let id = UserId::from_parts(tenant_id, username);
1664        let mut users = self.users.write().map_err(lock_err)?;
1665        let user = users
1666            .get_mut(&id)
1667            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
1668
1669        let prior_role = user.role;
1670        user.role = new_role;
1671        user.updated_at = now_ms();
1672
1673        // Issue #205 — promotion to Admin is an operator-grade event:
1674        // the new role grants destructive capabilities (DROP, ALTER,
1675        // GRANT) that an operator must observe out-of-band even when
1676        // the auth path itself is healthy.
1677        if new_role == Role::Admin && prior_role != Role::Admin {
1678            crate::telemetry::operator_event::OperatorEvent::AdminCapabilityGranted {
1679                granted_to: id.to_string(),
1680                capability: "Role::Admin".to_string(),
1681                granted_by: "auth_store::change_role".to_string(),
1682            }
1683            .emit_global();
1684        }
1685
1686        // Downgrade any API keys that now exceed the user's role.
1687        for key in &mut user.api_keys {
1688            if key.role > new_role {
1689                key.role = new_role;
1690            }
1691        }
1692
1693        // Update the api_key_index as well.
1694        if let Ok(mut idx) = self.api_key_index.write() {
1695            for key in &user.api_keys {
1696                if let Some(entry) = idx.get_mut(&key.key) {
1697                    entry.1 = key.role;
1698                }
1699            }
1700        }
1701
1702        self.persist_to_vault();
1703        Ok(())
1704    }
1705
1706    fn change_role_in_tenant_controlled(
1707        &self,
1708        tenant_id: Option<&str>,
1709        username: &str,
1710        new_role: Role,
1711        control: &UserLifecycleControl<'_>,
1712    ) -> Result<(), AuthError> {
1713        let id = UserId::from_parts(tenant_id, username);
1714        let previous = self.get_user_cloned(&id);
1715        match self.change_role_in_tenant_unaudited(tenant_id, username, new_role) {
1716            Ok(()) => {
1717                if let Err(err) = self.emit_user_lifecycle_allowed(
1718                    control,
1719                    EventKind::UserUpdate,
1720                    "user.update",
1721                    &id,
1722                    user_control_fields(
1723                        &id,
1724                        Some(new_role),
1725                        previous.as_ref().map(|u| u.enabled),
1726                        false,
1727                    ),
1728                ) {
1729                    self.restore_user_snapshot(&id, previous);
1730                    return Err(err);
1731                }
1732                Ok(())
1733            }
1734            Err(err) => {
1735                self.emit_user_lifecycle_outcome(
1736                    control,
1737                    if user_error_is_denied(&err) {
1738                        Outcome::Denied
1739                    } else {
1740                        Outcome::Error
1741                    },
1742                    EventKind::UserUpdate,
1743                    "user.update",
1744                    &id,
1745                    Some(err.to_string()),
1746                    user_control_fields(
1747                        &id,
1748                        Some(new_role),
1749                        previous.as_ref().map(|u| u.enabled),
1750                        false,
1751                    ),
1752                );
1753                Err(err)
1754            }
1755        }
1756    }
1757
1758    // -----------------------------------------------------------------
1759    // Authentication (login)
1760    // -----------------------------------------------------------------
1761
1762    /// Verify credentials for a platform-tenant user (`tenant_id = None`)
1763    /// and create a session. For tenant-scoped login use
1764    /// [`Self::authenticate_in_tenant`].
1765    ///
1766    /// When a keypair is available (certificate-based seal), session tokens
1767    /// are signed with the master secret so the server can verify they were
1768    /// genuinely issued by this vault instance.
1769    pub fn authenticate(&self, username: &str, password: &str) -> Result<Session, AuthError> {
1770        self.authenticate_in_tenant(None, username, password)
1771    }
1772
1773    /// Verify credentials for `(tenant_id, username, password)` and
1774    /// create a session. Tenant-aware: `alice@acme` and `alice@globex`
1775    /// authenticate independently.
1776    pub fn authenticate_in_tenant(
1777        &self,
1778        tenant_id: Option<&str>,
1779        username: &str,
1780        password: &str,
1781    ) -> Result<Session, AuthError> {
1782        // The synthetic platform-owner principal exists only as a
1783        // policy-graph anchor (see `crate::auth::self_lock_guard`); it
1784        // must never authenticate via any transport.
1785        if crate::auth::self_lock_guard::is_synthetic_principal(username) {
1786            return Err(AuthError::InvalidCredentials);
1787        }
1788        let id = UserId::from_parts(tenant_id, username);
1789        let users = self.users.read().map_err(lock_err)?;
1790        let user = users.get(&id).ok_or(AuthError::InvalidCredentials)?;
1791
1792        if !user.enabled {
1793            return Err(AuthError::InvalidCredentials);
1794        }
1795
1796        if !verify_password(password, &user.password_hash) {
1797            return Err(AuthError::InvalidCredentials);
1798        }
1799
1800        // Generate token: signed if keypair is available, random otherwise.
1801        let token = match self.keypair.read().ok().and_then(|g| {
1802            g.as_ref().map(|kp| {
1803                let token_id = random_hex(16);
1804                let sig = kp.sign(format!("session:{}", token_id).as_bytes());
1805                // Take first 16 bytes of signature for compact token.
1806                format!("rs_{}{}", token_id, hex::encode(&sig[..16]))
1807            })
1808        }) {
1809            Some(signed_token) => signed_token,
1810            None => generate_session_token(),
1811        };
1812
1813        let now = now_ms();
1814        let session = Session {
1815            token,
1816            username: username.to_string(),
1817            tenant_id: user.tenant_id.clone(),
1818            role: user.role,
1819            created_at: now,
1820            expires_at: now + (self.config.session_ttl_secs as u128 * 1000),
1821        };
1822
1823        drop(users); // release read lock before acquiring write
1824
1825        let mut sessions = self.sessions.write().map_err(lock_err)?;
1826        sessions.insert(session.token.clone(), session.clone());
1827        Ok(session)
1828    }
1829
1830    // -----------------------------------------------------------------
1831    // Token validation
1832    // -----------------------------------------------------------------
1833
1834    /// Validate a token (session or API key).
1835    ///
1836    /// Returns `(username, role)` if valid, `None` otherwise. Tenant
1837    /// scope is dropped here for compatibility with the bulk of the
1838    /// existing caller surface (routing, gRPC control, redwire). Use
1839    /// [`Self::validate_token_full`] when the caller needs the
1840    /// resolved `UserId` (e.g. to pin `CURRENT_TENANT()`).
1841    pub fn validate_token(&self, token: &str) -> Option<(String, Role)> {
1842        self.validate_token_full(token)
1843            .map(|(id, role)| (id.username, role))
1844    }
1845
1846    /// Tenant-aware token validation. Returns the resolved `UserId`
1847    /// (which carries the tenant) and the granted `Role`.
1848    pub fn validate_token_full(&self, token: &str) -> Option<(UserId, Role)> {
1849        // Try session tokens first.
1850        if token.starts_with("rs_") {
1851            if let Ok(sessions) = self.sessions.read() {
1852                if let Some(session) = sessions.get(token) {
1853                    let now = now_ms();
1854                    if now < session.expires_at {
1855                        return Some((
1856                            UserId::from_parts(session.tenant_id.as_deref(), &session.username),
1857                            session.role,
1858                        ));
1859                    }
1860                }
1861            }
1862            return None;
1863        }
1864
1865        // Try API keys.
1866        if token.starts_with("rk_") {
1867            if let Ok(idx) = self.api_key_index.read() {
1868                return idx.get(token).cloned();
1869            }
1870            return None;
1871        }
1872
1873        None
1874    }
1875
1876    // -----------------------------------------------------------------
1877    // API Key management
1878    // -----------------------------------------------------------------
1879
1880    /// Create a persistent API key for a platform-tenant user.
1881    ///
1882    /// For tenant-scoped users use [`Self::create_api_key_in_tenant`].
1883    pub fn create_api_key(
1884        &self,
1885        username: &str,
1886        name: &str,
1887        role: Role,
1888    ) -> Result<ApiKey, AuthError> {
1889        self.create_api_key_in_tenant(None, username, name, role)
1890    }
1891
1892    pub fn create_api_key_with_control_events(
1893        &self,
1894        username: &str,
1895        name: &str,
1896        role: Role,
1897        ctx: &ControlEventCtx<'_>,
1898        ledger: &dyn ControlEventLedger,
1899        config: ControlEventConfig,
1900    ) -> Result<ApiKey, AuthError> {
1901        self.create_api_key_in_tenant_with_control_events(
1902            None, username, name, role, ctx, ledger, config,
1903        )
1904    }
1905
1906    pub fn create_api_key_in_tenant(
1907        &self,
1908        tenant_id: Option<&str>,
1909        username: &str,
1910        name: &str,
1911        role: Role,
1912    ) -> Result<ApiKey, AuthError> {
1913        if let Some(configured) = self.configured_control_events() {
1914            let ctx = default_user_lifecycle_ctx();
1915            let control = UserLifecycleControl {
1916                ctx: &ctx,
1917                ledger: configured.ledger.as_ref(),
1918                config: configured.config,
1919            };
1920            self.create_api_key_in_tenant_controlled(tenant_id, username, name, role, &control)
1921        } else {
1922            self.create_api_key_in_tenant_unaudited(tenant_id, username, name, role)
1923        }
1924    }
1925
1926    #[allow(clippy::too_many_arguments)]
1927    pub fn create_api_key_in_tenant_with_control_events(
1928        &self,
1929        tenant_id: Option<&str>,
1930        username: &str,
1931        name: &str,
1932        role: Role,
1933        ctx: &ControlEventCtx<'_>,
1934        ledger: &dyn ControlEventLedger,
1935        config: ControlEventConfig,
1936    ) -> Result<ApiKey, AuthError> {
1937        let control = UserLifecycleControl {
1938            ctx,
1939            ledger,
1940            config,
1941        };
1942        self.create_api_key_in_tenant_controlled(tenant_id, username, name, role, &control)
1943    }
1944
1945    fn create_api_key_in_tenant_unaudited(
1946        &self,
1947        tenant_id: Option<&str>,
1948        username: &str,
1949        name: &str,
1950        role: Role,
1951    ) -> Result<ApiKey, AuthError> {
1952        let id = UserId::from_parts(tenant_id, username);
1953        let mut users = self.users.write().map_err(lock_err)?;
1954        let user = users
1955            .get_mut(&id)
1956            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
1957
1958        // The key's role cannot exceed the user's role.
1959        if role > user.role {
1960            return Err(AuthError::RoleExceeded {
1961                requested: role,
1962                ceiling: user.role,
1963            });
1964        }
1965
1966        let api_key = ApiKey {
1967            key: generate_api_key(),
1968            name: name.to_string(),
1969            role,
1970            created_at: now_ms(),
1971        };
1972
1973        user.api_keys.push(api_key.clone());
1974        user.updated_at = now_ms();
1975
1976        // Update the index.
1977        if let Ok(mut idx) = self.api_key_index.write() {
1978            idx.insert(api_key.key.clone(), (id.clone(), api_key.role));
1979        }
1980
1981        drop(users); // release lock before vault I/O
1982        self.persist_to_vault();
1983        Ok(api_key)
1984    }
1985
1986    fn create_api_key_in_tenant_controlled(
1987        &self,
1988        tenant_id: Option<&str>,
1989        username: &str,
1990        name: &str,
1991        role: Role,
1992        control: &UserLifecycleControl<'_>,
1993    ) -> Result<ApiKey, AuthError> {
1994        let id = UserId::from_parts(tenant_id, username);
1995        match self.create_api_key_in_tenant_unaudited(tenant_id, username, name, role) {
1996            Ok(api_key) => {
1997                let key_id = api_key_id(&api_key.key);
1998                if let Err(err) = self.emit_api_key_allowed(
1999                    control,
2000                    EventKind::ApiKeyCreate,
2001                    "apikey.create",
2002                    &key_id,
2003                    api_key_control_fields(&id, role, &key_id),
2004                ) {
2005                    self.rollback_create_api_key(&id, &api_key.key);
2006                    return Err(err);
2007                }
2008                Ok(api_key)
2009            }
2010            Err(err) => {
2011                self.emit_user_lifecycle_outcome(
2012                    control,
2013                    if user_error_is_denied(&err) {
2014                        Outcome::Denied
2015                    } else {
2016                        Outcome::Error
2017                    },
2018                    EventKind::ApiKeyCreate,
2019                    "apikey.create",
2020                    &id,
2021                    Some(err.to_string()),
2022                    user_control_fields(&id, Some(role), None, false),
2023                );
2024                Err(err)
2025            }
2026        }
2027    }
2028
2029    /// Revoke (delete) an API key.
2030    pub fn revoke_api_key(&self, key: &str) -> Result<(), AuthError> {
2031        if let Some(configured) = self.configured_control_events() {
2032            let ctx = default_user_lifecycle_ctx();
2033            let control = UserLifecycleControl {
2034                ctx: &ctx,
2035                ledger: configured.ledger.as_ref(),
2036                config: configured.config,
2037            };
2038            self.revoke_api_key_controlled(key, &control)
2039        } else {
2040            self.revoke_api_key_unaudited(key)
2041        }
2042    }
2043
2044    pub fn revoke_api_key_with_control_events(
2045        &self,
2046        key: &str,
2047        ctx: &ControlEventCtx<'_>,
2048        ledger: &dyn ControlEventLedger,
2049        config: ControlEventConfig,
2050    ) -> Result<(), AuthError> {
2051        let control = UserLifecycleControl {
2052            ctx,
2053            ledger,
2054            config,
2055        };
2056        self.revoke_api_key_controlled(key, &control)
2057    }
2058
2059    fn revoke_api_key_unaudited(&self, key: &str) -> Result<(), AuthError> {
2060        let mut users = self.users.write().map_err(lock_err)?;
2061
2062        // Find which user owns this key (look up by the api_key_index
2063        // first; fall back to a scan for legacy state restored before
2064        // the index was reseeded).
2065        let owner_id: UserId = {
2066            if let Ok(idx) = self.api_key_index.read() {
2067                if let Some((id, _)) = idx.get(key) {
2068                    id.clone()
2069                } else {
2070                    return Err(AuthError::KeyNotFound(key.to_string()));
2071                }
2072            } else {
2073                let owner = users
2074                    .iter()
2075                    .find(|(_, u)| u.api_keys.iter().any(|k| k.key == key));
2076                match owner {
2077                    Some((id, _)) => id.clone(),
2078                    None => return Err(AuthError::KeyNotFound(key.to_string())),
2079                }
2080            }
2081        };
2082
2083        let user = users
2084            .get_mut(&owner_id)
2085            .ok_or_else(|| AuthError::KeyNotFound(key.to_string()))?;
2086        user.api_keys.retain(|k| k.key != key);
2087        user.updated_at = now_ms();
2088
2089        // Remove from index.
2090        if let Ok(mut idx) = self.api_key_index.write() {
2091            idx.remove(key);
2092        }
2093
2094        self.persist_to_vault();
2095        Ok(())
2096    }
2097
2098    fn revoke_api_key_controlled(
2099        &self,
2100        key: &str,
2101        control: &UserLifecycleControl<'_>,
2102    ) -> Result<(), AuthError> {
2103        let key_id = api_key_id(key);
2104        let rollback = self.api_key_rollback_snapshot(key);
2105        let id = rollback
2106            .as_ref()
2107            .map(|r| r.0.clone())
2108            .unwrap_or_else(|| UserId::from_parts(None, ""));
2109        let role = rollback.as_ref().map(|r| r.1.role).unwrap_or(Role::Read);
2110        match self.revoke_api_key_unaudited(key) {
2111            Ok(()) => {
2112                if let Err(err) = self.emit_api_key_allowed(
2113                    control,
2114                    EventKind::ApiKeyRevoke,
2115                    "apikey.revoke",
2116                    &key_id,
2117                    api_key_control_fields(&id, role, &key_id),
2118                ) {
2119                    if let Some((owner, api_key)) = rollback {
2120                        self.restore_api_key(&owner, api_key);
2121                    }
2122                    return Err(err);
2123                }
2124                Ok(())
2125            }
2126            Err(err) => {
2127                self.emit_user_lifecycle_outcome(
2128                    control,
2129                    if user_error_is_denied(&err) {
2130                        Outcome::Denied
2131                    } else {
2132                        Outcome::Error
2133                    },
2134                    EventKind::ApiKeyRevoke,
2135                    "apikey.revoke",
2136                    &id,
2137                    Some(err.to_string()),
2138                    api_key_control_fields(&id, role, &key_id),
2139                );
2140                Err(err)
2141            }
2142        }
2143    }
2144
2145    // -----------------------------------------------------------------
2146    // Session management
2147    // -----------------------------------------------------------------
2148
2149    /// Revoke a session token.
2150    pub fn revoke_session(&self, token: &str) {
2151        if let Ok(mut sessions) = self.sessions.write() {
2152            sessions.remove(token);
2153        }
2154    }
2155
2156    /// Purge expired sessions (housekeeping).
2157    pub fn purge_expired_sessions(&self) -> usize {
2158        let now = now_ms();
2159        if let Ok(mut sessions) = self.sessions.write() {
2160            let before = sessions.len();
2161            sessions.retain(|_, s| s.expires_at > now);
2162            return before - sessions.len();
2163        }
2164        0
2165    }
2166
2167    // -----------------------------------------------------------------
2168    // Granular RBAC — GRANT / REVOKE
2169    //
2170    // The privilege engine lives in `super::privileges`. These helpers
2171    // are the AuthStore facade: they keep an in-memory map of grants per
2172    // user (plus a `public_grants` list), persist additions/removals to
2173    // the existing `vault_kv` store, and rebuild the per-user
2174    // `PermissionCache` so the hot path stays O(1).
2175    //
2176    // Persistence design: rather than extend the snapshot/restore
2177    // pipeline (Agent #2's territory) we serialise grants and account
2178    // attributes to the vault KV store. That gives us atomic write +
2179    // encrypted-at-rest semantics for free without touching the
2180    // existing USER/KEY/KV serializer paths. On restart `rehydrate_acl`
2181    // reads these KV entries back into the in-memory maps.
2182    // -----------------------------------------------------------------
2183
2184    /// Persist a grant. Returns `Forbidden` when the granting user is
2185    /// not Admin or attempts a cross-tenant grant.
2186    pub fn grant(
2187        &self,
2188        granter: &UserId,
2189        granter_role: Role,
2190        principal: GrantPrincipal,
2191        resource: Resource,
2192        actions: Vec<Action>,
2193        with_grant_option: bool,
2194        tenant: Option<String>,
2195    ) -> Result<(), AuthError> {
2196        if granter_role != Role::Admin {
2197            return Err(AuthError::Forbidden(format!(
2198                "GRANT requires Admin role; granter `{}` has `{:?}`",
2199                granter, granter_role
2200            )));
2201        }
2202
2203        // Cross-tenant guard: a tenant-scoped admin cannot mint grants
2204        // outside their tenant. Platform admin (tenant=None) may grant
2205        // anywhere.
2206        if granter.tenant.is_some() && granter.tenant != tenant {
2207            return Err(AuthError::Forbidden(format!(
2208                "cross-tenant GRANT denied: granter tenant `{:?}` != grant tenant `{:?}`",
2209                granter.tenant, tenant
2210            )));
2211        }
2212
2213        let mut actions_set = std::collections::BTreeSet::new();
2214        for a in actions {
2215            actions_set.insert(a);
2216        }
2217        let g = Grant {
2218            principal: principal.clone(),
2219            resource,
2220            actions: actions_set,
2221            with_grant_option,
2222            granted_by: granter.to_string(),
2223            granted_at: now_ms(),
2224            tenant,
2225            columns: None,
2226        };
2227
2228        match &principal {
2229            GrantPrincipal::User(uid) => {
2230                self.grants
2231                    .write()
2232                    .unwrap_or_else(|e| e.into_inner())
2233                    .entry(uid.clone())
2234                    .or_default()
2235                    .push(g.clone());
2236                self.invalidate_permission_cache(Some(uid));
2237            }
2238            GrantPrincipal::Public => {
2239                self.public_grants
2240                    .write()
2241                    .unwrap_or_else(|e| e.into_inner())
2242                    .push(g.clone());
2243                self.invalidate_permission_cache(None);
2244            }
2245            GrantPrincipal::Group(_) => {
2246                return Err(AuthError::Forbidden(
2247                    "GROUP principals are not yet supported; use a USER or PUBLIC".to_string(),
2248                ));
2249            }
2250        }
2251
2252        // Issue #119: a fresh grant changes the visible-collections set
2253        // for `(tenant, role)` callers under the same tenant. Drop those
2254        // cache entries so the next AI command sees the new SELECT
2255        // privilege immediately.
2256        self.invalidate_visible_collections_for_tenant(g.tenant.as_deref());
2257
2258        self.persist_acl_to_kv();
2259        Ok(())
2260    }
2261
2262    /// Drop matching grants from a principal. Returns the number of
2263    /// grants removed.
2264    pub fn revoke(
2265        &self,
2266        granter_role: Role,
2267        principal: &GrantPrincipal,
2268        resource: &Resource,
2269        actions: &[Action],
2270    ) -> Result<usize, AuthError> {
2271        if granter_role != Role::Admin {
2272            return Err(AuthError::Forbidden(format!(
2273                "REVOKE requires Admin role; granter has `{:?}`",
2274                granter_role
2275            )));
2276        }
2277
2278        let removed = match principal {
2279            GrantPrincipal::User(uid) => {
2280                let mut g = self.grants.write().unwrap_or_else(|e| e.into_inner());
2281                let before = g.get(uid).map(|v| v.len()).unwrap_or(0);
2282                if let Some(list) = g.get_mut(uid) {
2283                    list.retain(|gr| {
2284                        !(gr.resource == *resource
2285                            && (actions.iter().any(|a| gr.actions.contains(a))
2286                                || (gr.actions.contains(&Action::All) && !actions.is_empty())))
2287                    });
2288                }
2289                let after = g.get(uid).map(|v| v.len()).unwrap_or(0);
2290                drop(g);
2291                self.invalidate_permission_cache(Some(uid));
2292                before - after
2293            }
2294            GrantPrincipal::Public => {
2295                let mut p = self
2296                    .public_grants
2297                    .write()
2298                    .unwrap_or_else(|e| e.into_inner());
2299                let before = p.len();
2300                p.retain(|gr| {
2301                    !(gr.resource == *resource
2302                        && (actions.iter().any(|a| gr.actions.contains(a))
2303                            || (gr.actions.contains(&Action::All) && !actions.is_empty())))
2304                });
2305                let after = p.len();
2306                drop(p);
2307                self.invalidate_permission_cache(None);
2308                before - after
2309            }
2310            GrantPrincipal::Group(_) => 0,
2311        };
2312
2313        if removed > 0 {
2314            // Issue #119: REVOKE may shrink the visible-collections set
2315            // for any `(tenant, role)` slot. We don't know the exact
2316            // tenant when the principal is `Public`, so a `Public`
2317            // revoke clears the whole cache; user revokes scope to the
2318            // user's tenant.
2319            match principal {
2320                GrantPrincipal::User(uid) => {
2321                    self.invalidate_visible_collections_for_tenant(uid.tenant.as_deref());
2322                }
2323                GrantPrincipal::Public | GrantPrincipal::Group(_) => {
2324                    self.invalidate_visible_collections_cache();
2325                }
2326            }
2327            self.persist_acl_to_kv();
2328        }
2329        Ok(removed)
2330    }
2331
2332    /// Compute the set of collection ids a given `(tenant, role)`
2333    /// scope can read, consulting the explicit grant table. The result
2334    /// is cached for `super::scope_cache::DEFAULT_TTL` (60s) and
2335    /// invalidated on every GRANT/REVOKE/policy/collection mutation
2336    /// that could change the answer.
2337    ///
2338    /// `all_collections` is the full list of collection ids known to
2339    /// the storage layer. The runtime hands it in so this module stays
2340    /// decoupled from the storage crate. Each collection passes through
2341    /// `check_grant(SELECT)` under a synthetic `(principal, role,
2342    /// tenant)` view. The cache key includes principal because direct
2343    /// grants can differ between users that share the same tenant and
2344    /// role.
2345    pub fn visible_collections_for_scope(
2346        &self,
2347        tenant: Option<&str>,
2348        role: Role,
2349        principal: &str,
2350        all_collections: &[String],
2351    ) -> std::collections::HashSet<String> {
2352        let key = super::scope_cache::ScopeKey::new(tenant, principal, role);
2353        if let Some(hit) = self.visible_collections_cache.get(&key) {
2354            return hit;
2355        }
2356        // Slow path: walk every collection through `check_grant`. We
2357        // build the AuthzContext once, then reuse it per resource.
2358        let ctx = AuthzContext {
2359            principal,
2360            effective_role: role,
2361            tenant,
2362        };
2363        let mut visible = std::collections::HashSet::new();
2364        for collection in all_collections {
2365            let resource = Resource::table_from_name(collection);
2366            if self.check_grant(&ctx, Action::Select, &resource).is_ok() {
2367                visible.insert(collection.clone());
2368            }
2369        }
2370        self.visible_collections_cache.insert(key, visible.clone());
2371        visible
2372    }
2373
2374    /// Stats probe required by issue #119 — exposes hit/miss counts and
2375    /// invalidations for the visible-collections cache so metrics
2376    /// pipelines can compute a hit rate.
2377    pub fn auth_cache_stats(&self) -> super::scope_cache::AuthCacheStats {
2378        self.visible_collections_cache.stats()
2379    }
2380
2381    /// Drop every cached `(tenant, role)` entry. Called from CREATE
2382    /// POLICY / DROP POLICY / DROP COLLECTION paths where the affected
2383    /// tenant set is unknown.
2384    pub fn invalidate_visible_collections_cache(&self) {
2385        self.visible_collections_cache.invalidate_all();
2386    }
2387
2388    /// Drop cached entries for one tenant. Called from GRANT / REVOKE
2389    /// where the principal's tenant is known.
2390    pub fn invalidate_visible_collections_for_tenant(&self, tenant: Option<&str>) {
2391        self.visible_collections_cache.invalidate_tenant(tenant);
2392    }
2393
2394    /// Snapshot of every grant the principal effectively has, including
2395    /// `Public` grants. Audit / introspection helper.
2396    pub fn effective_grants(&self, uid: &UserId) -> Vec<Grant> {
2397        let mut out = Vec::new();
2398        if let Ok(g) = self.grants.read() {
2399            if let Some(list) = g.get(uid) {
2400                out.extend(list.iter().cloned());
2401            }
2402        }
2403        if let Ok(p) = self.public_grants.read() {
2404            out.extend(p.iter().cloned());
2405        }
2406        out
2407    }
2408
2409    /// Run a privilege check using the in-memory grant tables. Returns
2410    /// `Ok(())` on allow, `Err(AuthzError)` on deny.
2411    pub fn check_grant(
2412        &self,
2413        ctx: &AuthzContext<'_>,
2414        action: Action,
2415        resource: &Resource,
2416    ) -> Result<(), AuthzError> {
2417        if ctx.effective_role == Role::Admin {
2418            return Ok(());
2419        }
2420
2421        let uid = UserId::from_parts(ctx.tenant, ctx.principal);
2422
2423        // Fast path: per-user pre-resolved cache.
2424        if let Ok(cache) = self.permission_cache.read() {
2425            if let Some(pc) = cache.get(&uid) {
2426                if pc.allows(resource, action) {
2427                    return Ok(());
2428                }
2429            }
2430        }
2431
2432        // Slow path: linear scan + rebuild cache as a side-effect.
2433        let user_grants = self
2434            .grants
2435            .read()
2436            .ok()
2437            .and_then(|g| g.get(&uid).cloned())
2438            .unwrap_or_default();
2439        let any_user_grants = self
2440            .grants
2441            .read()
2442            .ok()
2443            .map(|g| g.values().any(|list| !list.is_empty()))
2444            .unwrap_or(false);
2445        let public_grants = self
2446            .public_grants
2447            .read()
2448            .ok()
2449            .map(|p| p.clone())
2450            .unwrap_or_default();
2451        if user_grants.is_empty() && public_grants.is_empty() && any_user_grants {
2452            return Err(AuthzError::PermissionDenied {
2453                action,
2454                resource: resource.clone(),
2455                principal: ctx.principal.to_string(),
2456            });
2457        }
2458        let view = GrantsView {
2459            user_grants: &user_grants,
2460            public_grants: &public_grants,
2461        };
2462        let result = check_grant(ctx, action, resource, &view);
2463
2464        if result.is_ok() {
2465            let pc = PermissionCache::build(&user_grants, &public_grants);
2466            if let Ok(mut cache) = self.permission_cache.write() {
2467                cache.insert(uid, pc);
2468            }
2469        }
2470        result
2471    }
2472
2473    // -----------------------------------------------------------------
2474    // ALTER USER attributes (VALID UNTIL, CONNECTION LIMIT, etc.)
2475    // -----------------------------------------------------------------
2476
2477    /// Replace the attribute record for `uid`.
2478    pub fn set_user_attributes(
2479        &self,
2480        uid: &UserId,
2481        attrs: UserAttributes,
2482    ) -> Result<(), AuthError> {
2483        let users = self.users.read().map_err(lock_err)?;
2484        if !users.contains_key(uid) {
2485            return Err(AuthError::UserNotFound(uid.to_string()));
2486        }
2487        drop(users);
2488
2489        self.user_attributes
2490            .write()
2491            .unwrap_or_else(|e| e.into_inner())
2492            .insert(uid.clone(), attrs);
2493        self.invalidate_iam_cache(Some(uid));
2494        self.persist_acl_to_kv();
2495        Ok(())
2496    }
2497
2498    /// Read the attributes for `uid`. Returns `Default::default()` for
2499    /// users that have never been altered.
2500    pub fn user_attributes(&self, uid: &UserId) -> UserAttributes {
2501        self.user_attributes
2502            .read()
2503            .ok()
2504            .and_then(|m| m.get(uid).cloned())
2505            .unwrap_or_default()
2506    }
2507
2508    pub fn add_user_to_group(&self, uid: &UserId, group: &str) -> Result<(), AuthError> {
2509        if group.trim().is_empty() {
2510            return Err(AuthError::Forbidden("group name cannot be empty".into()));
2511        }
2512        let mut attrs = self.user_attributes(uid);
2513        if !attrs.groups.iter().any(|g| g == group) {
2514            attrs.groups.push(group.to_string());
2515            attrs.groups.sort();
2516        }
2517        self.set_user_attributes(uid, attrs)
2518    }
2519
2520    pub fn remove_user_from_group(&self, uid: &UserId, group: &str) -> Result<(), AuthError> {
2521        let mut attrs = self.user_attributes(uid);
2522        attrs.groups.retain(|g| g != group);
2523        self.set_user_attributes(uid, attrs)
2524    }
2525
2526    /// Toggle `User.enabled` without rotating credentials.
2527    pub fn set_user_enabled(&self, uid: &UserId, enabled: bool) -> Result<(), AuthError> {
2528        if let Some(configured) = self.configured_control_events() {
2529            let ctx = default_user_lifecycle_ctx();
2530            let control = UserLifecycleControl {
2531                ctx: &ctx,
2532                ledger: configured.ledger.as_ref(),
2533                config: configured.config,
2534            };
2535            self.set_user_enabled_controlled(uid, enabled, &control)
2536        } else {
2537            self.set_user_enabled_unaudited(uid, enabled)
2538        }
2539    }
2540
2541    pub fn disable_user(
2542        &self,
2543        username: &str,
2544        ctx: &ControlEventCtx<'_>,
2545        ledger: &dyn ControlEventLedger,
2546        config: ControlEventConfig,
2547    ) -> Result<(), AuthError> {
2548        self.disable_user_in_tenant(None, username, ctx, ledger, config)
2549    }
2550
2551    pub fn disable_user_in_tenant(
2552        &self,
2553        tenant_id: Option<&str>,
2554        username: &str,
2555        ctx: &ControlEventCtx<'_>,
2556        ledger: &dyn ControlEventLedger,
2557        config: ControlEventConfig,
2558    ) -> Result<(), AuthError> {
2559        let uid = UserId::from_parts(tenant_id, username);
2560        let control = UserLifecycleControl {
2561            ctx,
2562            ledger,
2563            config,
2564        };
2565        self.set_user_enabled_controlled(&uid, false, &control)
2566    }
2567
2568    fn set_user_enabled_unaudited(&self, uid: &UserId, enabled: bool) -> Result<(), AuthError> {
2569        let mut users = self.users.write().map_err(lock_err)?;
2570        let user = users
2571            .get_mut(uid)
2572            .ok_or_else(|| AuthError::UserNotFound(uid.to_string()))?;
2573        user.enabled = enabled;
2574        user.updated_at = now_ms();
2575        drop(users);
2576        self.persist_to_vault();
2577        Ok(())
2578    }
2579
2580    fn set_user_enabled_controlled(
2581        &self,
2582        uid: &UserId,
2583        enabled: bool,
2584        control: &UserLifecycleControl<'_>,
2585    ) -> Result<(), AuthError> {
2586        let previous = self.get_user_cloned(uid);
2587        let kind = if enabled {
2588            EventKind::UserUpdate
2589        } else {
2590            EventKind::UserDisable
2591        };
2592        let action = if enabled {
2593            "user.update"
2594        } else {
2595            "user.disable"
2596        };
2597        match self.set_user_enabled_unaudited(uid, enabled) {
2598            Ok(()) => {
2599                if let Err(err) = self.emit_user_lifecycle_allowed(
2600                    control,
2601                    kind,
2602                    action,
2603                    uid,
2604                    user_control_fields(
2605                        uid,
2606                        previous.as_ref().map(|u| u.role),
2607                        Some(enabled),
2608                        false,
2609                    ),
2610                ) {
2611                    self.restore_user_snapshot(uid, previous);
2612                    return Err(err);
2613                }
2614                Ok(())
2615            }
2616            Err(err) => {
2617                self.emit_user_lifecycle_outcome(
2618                    control,
2619                    if user_error_is_denied(&err) {
2620                        Outcome::Denied
2621                    } else {
2622                        Outcome::Error
2623                    },
2624                    kind,
2625                    action,
2626                    uid,
2627                    Some(err.to_string()),
2628                    user_control_fields(
2629                        uid,
2630                        previous.as_ref().map(|u| u.role),
2631                        Some(enabled),
2632                        false,
2633                    ),
2634                );
2635                Err(err)
2636            }
2637        }
2638    }
2639
2640    fn emit_user_lifecycle_allowed(
2641        &self,
2642        control: &UserLifecycleControl<'_>,
2643        kind: EventKind,
2644        action: &'static str,
2645        id: &UserId,
2646        fields: HashMap<String, Sensitivity>,
2647    ) -> Result<(), AuthError> {
2648        self.emit_user_lifecycle_event(control, Outcome::Allowed, kind, action, id, None, fields)
2649    }
2650
2651    fn emit_api_key_allowed(
2652        &self,
2653        control: &UserLifecycleControl<'_>,
2654        kind: EventKind,
2655        action: &'static str,
2656        api_key_id: &str,
2657        fields: HashMap<String, Sensitivity>,
2658    ) -> Result<(), AuthError> {
2659        self.emit_control_event(
2660            control,
2661            Outcome::Allowed,
2662            kind,
2663            action,
2664            Some(api_key_resource(api_key_id)),
2665            None,
2666            fields,
2667        )
2668    }
2669
2670    #[allow(clippy::too_many_arguments)]
2671    fn emit_user_lifecycle_outcome(
2672        &self,
2673        control: &UserLifecycleControl<'_>,
2674        outcome: Outcome,
2675        kind: EventKind,
2676        action: &'static str,
2677        id: &UserId,
2678        reason: Option<String>,
2679        fields: HashMap<String, Sensitivity>,
2680    ) {
2681        let _ = self.emit_user_lifecycle_event(control, outcome, kind, action, id, reason, fields);
2682    }
2683
2684    #[allow(clippy::too_many_arguments)]
2685    fn emit_user_lifecycle_event(
2686        &self,
2687        control: &UserLifecycleControl<'_>,
2688        outcome: Outcome,
2689        kind: EventKind,
2690        action: &'static str,
2691        id: &UserId,
2692        reason: Option<String>,
2693        fields: HashMap<String, Sensitivity>,
2694    ) -> Result<(), AuthError> {
2695        self.emit_control_event(
2696            control,
2697            outcome,
2698            kind,
2699            action,
2700            Some(user_resource(id)),
2701            reason,
2702            fields,
2703        )
2704    }
2705
2706    #[allow(clippy::too_many_arguments)]
2707    fn emit_control_event(
2708        &self,
2709        control: &UserLifecycleControl<'_>,
2710        outcome: Outcome,
2711        kind: EventKind,
2712        action: &'static str,
2713        resource: Option<String>,
2714        reason: Option<String>,
2715        fields: HashMap<String, Sensitivity>,
2716    ) -> Result<(), AuthError> {
2717        let event = ControlEvent {
2718            kind,
2719            outcome,
2720            action: Cow::Borrowed(action),
2721            resource,
2722            reason,
2723            matched_policy_id: None,
2724            fields,
2725        };
2726        match control.ledger.emit(control.ctx, event) {
2727            Ok(_) => Ok(()),
2728            Err(err) if control.config.require_persistence() => {
2729                Err(AuthError::Internal(err.to_string()))
2730            }
2731            Err(_) => Ok(()),
2732        }
2733    }
2734
2735    fn rollback_create_user(&self, id: &UserId) {
2736        let removed = self
2737            .users
2738            .write()
2739            .unwrap_or_else(|e| e.into_inner())
2740            .remove(id);
2741        if let Some(user) = removed {
2742            self.remove_api_key_index_entries(&user);
2743        }
2744        self.persist_to_vault();
2745    }
2746
2747    fn rollback_bootstrap(&self, id: &UserId) {
2748        self.bootstrapped.store(false, Ordering::Release);
2749        self.rollback_create_user(id);
2750    }
2751
2752    fn restore_user_snapshot(&self, id: &UserId, previous: Option<User>) {
2753        match previous {
2754            Some(user) => {
2755                self.users
2756                    .write()
2757                    .unwrap_or_else(|e| e.into_inner())
2758                    .insert(id.clone(), user.clone());
2759                self.index_user_api_keys(id, &user);
2760            }
2761            None => {
2762                self.rollback_create_user(id);
2763                return;
2764            }
2765        }
2766        self.persist_to_vault();
2767    }
2768
2769    fn user_delete_rollback_snapshot(
2770        &self,
2771        id: &UserId,
2772        tenant_id: Option<&str>,
2773        username: &str,
2774    ) -> Option<(User, Vec<(String, Session)>)> {
2775        let user = self.get_user_cloned(id)?;
2776        let sessions = self
2777            .sessions
2778            .read()
2779            .map(|sessions| {
2780                sessions
2781                    .iter()
2782                    .filter(|(_, session)| {
2783                        session.username == username && session.tenant_id.as_deref() == tenant_id
2784                    })
2785                    .map(|(token, session)| (token.clone(), session.clone()))
2786                    .collect()
2787            })
2788            .unwrap_or_default();
2789        Some((user, sessions))
2790    }
2791
2792    fn restore_deleted_user(&self, id: &UserId, user: User, sessions: Vec<(String, Session)>) {
2793        self.users
2794            .write()
2795            .unwrap_or_else(|e| e.into_inner())
2796            .insert(id.clone(), user.clone());
2797        self.index_user_api_keys(id, &user);
2798        if !sessions.is_empty() {
2799            let mut guard = self.sessions.write().unwrap_or_else(|e| e.into_inner());
2800            for (token, session) in sessions {
2801                guard.insert(token, session);
2802            }
2803        }
2804        self.persist_to_vault();
2805    }
2806
2807    fn rollback_create_api_key(&self, id: &UserId, key: &str) {
2808        if let Ok(mut users) = self.users.write() {
2809            if let Some(user) = users.get_mut(id) {
2810                user.api_keys.retain(|api_key| api_key.key != key);
2811                user.updated_at = now_ms();
2812            }
2813        }
2814        if let Ok(mut idx) = self.api_key_index.write() {
2815            idx.remove(key);
2816        }
2817        self.persist_to_vault();
2818    }
2819
2820    fn api_key_rollback_snapshot(&self, key: &str) -> Option<(UserId, ApiKey)> {
2821        let users = self.users.read().ok()?;
2822        users.iter().find_map(|(id, user)| {
2823            user.api_keys
2824                .iter()
2825                .find(|api_key| api_key.key == key)
2826                .cloned()
2827                .map(|api_key| (id.clone(), api_key))
2828        })
2829    }
2830
2831    fn restore_api_key(&self, id: &UserId, api_key: ApiKey) {
2832        if let Ok(mut users) = self.users.write() {
2833            if let Some(user) = users.get_mut(id) {
2834                if !user
2835                    .api_keys
2836                    .iter()
2837                    .any(|candidate| candidate.key == api_key.key)
2838                {
2839                    user.api_keys.push(api_key.clone());
2840                    user.updated_at = now_ms();
2841                }
2842            }
2843        }
2844        if let Ok(mut idx) = self.api_key_index.write() {
2845            idx.insert(api_key.key.clone(), (id.clone(), api_key.role));
2846        }
2847        self.persist_to_vault();
2848    }
2849
2850    fn remove_api_key_index_entries(&self, user: &User) {
2851        if let Ok(mut idx) = self.api_key_index.write() {
2852            for api_key in &user.api_keys {
2853                idx.remove(&api_key.key);
2854            }
2855        }
2856    }
2857
2858    fn index_user_api_keys(&self, id: &UserId, user: &User) {
2859        if let Ok(mut idx) = self.api_key_index.write() {
2860            for api_key in &user.api_keys {
2861                idx.insert(api_key.key.clone(), (id.clone(), api_key.role));
2862            }
2863        }
2864    }
2865
2866    // -----------------------------------------------------------------
2867    // Login-side enforcement (HTTP path)
2868    // -----------------------------------------------------------------
2869
2870    /// Authenticate with VALID UNTIL / CONNECTION LIMIT enforcement.
2871    /// Wraps `authenticate_in_tenant` and additionally:
2872    ///   * rejects logins after `valid_until`,
2873    ///   * rejects logins when the live session count would exceed the
2874    ///     `connection_limit` attribute.
2875    pub fn authenticate_with_attrs(
2876        &self,
2877        tenant_id: Option<&str>,
2878        username: &str,
2879        password: &str,
2880    ) -> Result<Session, AuthError> {
2881        let uid = UserId::from_parts(tenant_id, username);
2882        let attrs = self.user_attributes(&uid);
2883
2884        if let Some(deadline) = attrs.valid_until {
2885            if now_ms() >= deadline {
2886                return Err(AuthError::Forbidden(format!(
2887                    "account `{}` expired (VALID UNTIL exceeded)",
2888                    uid
2889                )));
2890            }
2891        }
2892
2893        if let Some(limit) = attrs.connection_limit {
2894            let current = self
2895                .session_count_by_user
2896                .read()
2897                .ok()
2898                .and_then(|m| m.get(&uid).copied())
2899                .unwrap_or(0);
2900            if current >= limit {
2901                return Err(AuthError::Forbidden(format!(
2902                    "account `{}` exceeded CONNECTION LIMIT ({})",
2903                    uid, limit
2904                )));
2905            }
2906        }
2907
2908        let session = self.authenticate_in_tenant(tenant_id, username, password)?;
2909
2910        if let Ok(mut counts) = self.session_count_by_user.write() {
2911            *counts.entry(uid).or_insert(0) += 1;
2912        }
2913        Ok(session)
2914    }
2915
2916    /// Decrement the live-session count for `uid`. Call from session
2917    /// revoke / expiry paths so CONNECTION LIMIT stays accurate.
2918    pub fn decrement_session_count(&self, uid: &UserId) {
2919        if let Ok(mut counts) = self.session_count_by_user.write() {
2920            if let Some(c) = counts.get_mut(uid) {
2921                *c = c.saturating_sub(1);
2922            }
2923        }
2924    }
2925
2926    // -----------------------------------------------------------------
2927    // ACL persistence — vault_kv backed
2928    // -----------------------------------------------------------------
2929
2930    /// Re-read the ACL state from `vault_kv`. Call after vault load /
2931    /// restore so the in-memory maps reflect the persisted data.
2932    pub fn rehydrate_acl(&self) {
2933        let kv_snapshot: Vec<(String, String)> = self
2934            .vault_kv
2935            .read()
2936            .map(|kv| {
2937                kv.iter()
2938                    .filter(|(k, _)| {
2939                        k.starts_with("red.acl.grants.")
2940                            || k.starts_with("red.acl.attrs.")
2941                            || k == &"red.acl.public_grants"
2942                    })
2943                    .map(|(k, v)| (k.clone(), v.clone()))
2944                    .collect()
2945            })
2946            .unwrap_or_default();
2947
2948        for (k, v) in kv_snapshot {
2949            if k == "red.acl.public_grants" {
2950                if let Some(parsed) = decode_grants_blob(&v) {
2951                    *self
2952                        .public_grants
2953                        .write()
2954                        .unwrap_or_else(|e| e.into_inner()) = parsed;
2955                }
2956            } else if let Some(suffix) = k.strip_prefix("red.acl.grants.") {
2957                if let Some(uid) = decode_uid(suffix) {
2958                    if let Some(mut parsed) = decode_grants_blob(&v) {
2959                        // Restore the principal field — the on-disk
2960                        // line stores only resource+action shape.
2961                        for g in parsed.iter_mut() {
2962                            g.principal = GrantPrincipal::User(uid.clone());
2963                        }
2964                        self.grants
2965                            .write()
2966                            .unwrap_or_else(|e| e.into_inner())
2967                            .insert(uid, parsed);
2968                    }
2969                }
2970            } else if let Some(suffix) = k.strip_prefix("red.acl.attrs.") {
2971                if let Some(uid) = decode_uid(suffix) {
2972                    if let Some(parsed) = decode_attrs_blob(&v) {
2973                        self.user_attributes
2974                            .write()
2975                            .unwrap_or_else(|e| e.into_inner())
2976                            .insert(uid, parsed);
2977                    }
2978                }
2979            }
2980        }
2981
2982        self.permission_cache
2983            .write()
2984            .unwrap_or_else(|e| e.into_inner())
2985            .clear();
2986    }
2987
2988    /// Snapshot every ACL change back into the vault KV store.
2989    fn persist_acl_to_kv(&self) {
2990        let public = self
2991            .public_grants
2992            .read()
2993            .ok()
2994            .map(|p| encode_grants_blob(&p))
2995            .unwrap_or_default();
2996        self.vault_kv_set("red.acl.public_grants".to_string(), public);
2997
2998        let snapshot: Vec<(UserId, Vec<Grant>)> = self
2999            .grants
3000            .read()
3001            .ok()
3002            .map(|g| g.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3003            .unwrap_or_default();
3004        for (uid, list) in snapshot {
3005            let key = format!("red.acl.grants.{}", encode_uid(&uid));
3006            let val = encode_grants_blob(&list);
3007            self.vault_kv_set(key, val);
3008        }
3009
3010        let attrs_snapshot: Vec<(UserId, UserAttributes)> = self
3011            .user_attributes
3012            .read()
3013            .ok()
3014            .map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3015            .unwrap_or_default();
3016        for (uid, attrs) in attrs_snapshot {
3017            let key = format!("red.acl.attrs.{}", encode_uid(&uid));
3018            let val = encode_attrs_blob(&attrs);
3019            self.vault_kv_set(key, val);
3020        }
3021    }
3022
3023    fn invalidate_permission_cache(&self, uid: Option<&UserId>) {
3024        if let Ok(mut cache) = self.permission_cache.write() {
3025            match uid {
3026                Some(u) => {
3027                    cache.remove(u);
3028                }
3029                None => cache.clear(),
3030            }
3031        }
3032    }
3033
3034    // -----------------------------------------------------------------
3035    // IAM policies — put / delete / attach / detach / simulate
3036    //
3037    // The kernel in `super::policies` owns the Policy type and the
3038    // evaluator. AuthStore handles persistence + per-user cache + the
3039    // GRANT translation layer (synthetic `_grant_*` policies).
3040    // -----------------------------------------------------------------
3041
3042    pub fn put_policy_with_control_events(
3043        &self,
3044        p: Policy,
3045        control: &PolicyMutationControl<'_>,
3046    ) -> Result<(), AuthError> {
3047        let policy_id = p.id.clone();
3048        let kind = if self.get_policy(&policy_id).is_some() {
3049            EventKind::PolicyUpdate
3050        } else {
3051            EventKind::PolicyCreate
3052        };
3053
3054        if p.id.starts_with("_grant_") || p.id.starts_with("_default_") {
3055            let err = AuthError::Forbidden(format!("policy id `{}` is reserved", p.id));
3056            self.emit_policy_error(
3057                control,
3058                kind,
3059                "policy:put",
3060                &policy_id,
3061                Some(&p),
3062                None,
3063                &err,
3064            );
3065            return Err(err);
3066        }
3067
3068        if let Some(ManagedPolicyDecision::Deny {
3069            entry_id,
3070            matched_action,
3071            matched_resource,
3072            reason,
3073            ..
3074        }) = self.managed_policy_decision(&policy_id, PolicyOp::Put, control)
3075        {
3076            self.emit_policy_denied(
3077                control,
3078                kind,
3079                "policy:put",
3080                &policy_id,
3081                Some(&p),
3082                None,
3083                &reason.to_string(),
3084                Some(entry_id),
3085                Some((matched_action, matched_resource)),
3086            );
3087            return Err(Self::managed_policy_error(&policy_id, &reason.to_string()));
3088        }
3089
3090        let previous = self.get_policy(&policy_id);
3091        let was_enabled = self.iam_authorization_enabled();
3092        match self.put_policy_internal(p.clone()) {
3093            Ok(()) => match self.emit_policy_allowed(
3094                control,
3095                kind,
3096                "policy:put",
3097                &policy_id,
3098                Some(&p),
3099                None,
3100                None,
3101            ) {
3102                Ok(()) => Ok(()),
3103                Err(err) => {
3104                    self.restore_policy_put(&policy_id, previous, was_enabled);
3105                    Err(err)
3106                }
3107            },
3108            Err(err) => {
3109                self.emit_policy_error(
3110                    control,
3111                    kind,
3112                    "policy:put",
3113                    &policy_id,
3114                    Some(&p),
3115                    None,
3116                    &err,
3117                );
3118                Err(err)
3119            }
3120        }
3121    }
3122
3123    pub fn delete_policy_with_control_events(
3124        &self,
3125        id: &str,
3126        control: &PolicyMutationControl<'_>,
3127    ) -> Result<(), AuthError> {
3128        let existing = self.get_policy(id);
3129        if let Some(ManagedPolicyDecision::Deny {
3130            entry_id,
3131            matched_action,
3132            matched_resource,
3133            reason,
3134            ..
3135        }) = self.managed_policy_decision(id, PolicyOp::Drop, control)
3136        {
3137            self.emit_policy_denied(
3138                control,
3139                EventKind::PolicyDelete,
3140                "policy:drop",
3141                id,
3142                existing.as_deref(),
3143                None,
3144                &reason.to_string(),
3145                Some(entry_id),
3146                Some((matched_action, matched_resource)),
3147            );
3148            return Err(Self::managed_policy_error(id, &reason.to_string()));
3149        }
3150
3151        let user_attachments = self
3152            .user_attachments
3153            .read()
3154            .map(|m| m.clone())
3155            .unwrap_or_default();
3156        let group_attachments = self
3157            .group_attachments
3158            .read()
3159            .map(|m| m.clone())
3160            .unwrap_or_default();
3161        let was_enabled = self.iam_authorization_enabled();
3162        match self.delete_policy(id) {
3163            Ok(()) => match self.emit_policy_allowed(
3164                control,
3165                EventKind::PolicyDelete,
3166                "policy:drop",
3167                id,
3168                existing.as_deref(),
3169                None,
3170                None,
3171            ) {
3172                Ok(()) => Ok(()),
3173                Err(err) => {
3174                    if let Some(policy) = existing {
3175                        self.restore_policy_delete(
3176                            id,
3177                            policy,
3178                            user_attachments,
3179                            group_attachments,
3180                            was_enabled,
3181                        );
3182                    }
3183                    Err(err)
3184                }
3185            },
3186            Err(err) => {
3187                self.emit_policy_error(
3188                    control,
3189                    EventKind::PolicyDelete,
3190                    "policy:drop",
3191                    id,
3192                    existing.as_deref(),
3193                    None,
3194                    &err,
3195                );
3196                Err(err)
3197            }
3198        }
3199    }
3200
3201    pub fn attach_policy_with_control_events(
3202        &self,
3203        principal: PrincipalRef,
3204        policy_id: &str,
3205        control: &PolicyMutationControl<'_>,
3206    ) -> Result<(), AuthError> {
3207        let existing = self.get_policy(policy_id);
3208        if let Some(ManagedPolicyDecision::Deny {
3209            entry_id,
3210            matched_action,
3211            matched_resource,
3212            reason,
3213            ..
3214        }) = self.managed_policy_decision(policy_id, PolicyOp::Attach, control)
3215        {
3216            self.emit_policy_denied(
3217                control,
3218                EventKind::PolicyAttach,
3219                "policy:attach",
3220                policy_id,
3221                existing.as_deref(),
3222                Some(&principal),
3223                &reason.to_string(),
3224                Some(entry_id),
3225                Some((matched_action, matched_resource)),
3226            );
3227            return Err(Self::managed_policy_error(policy_id, &reason.to_string()));
3228        }
3229
3230        if let Some(err_msg) = self.check_self_lock_invariant_for_attach(policy_id) {
3231            self.emit_policy_denied(
3232                control,
3233                EventKind::PolicyAttach,
3234                "policy:attach",
3235                policy_id,
3236                existing.as_deref(),
3237                Some(&principal),
3238                &err_msg,
3239                None,
3240                None,
3241            );
3242            return Err(AuthError::Forbidden(err_msg));
3243        }
3244
3245        let user_attachments = self
3246            .user_attachments
3247            .read()
3248            .map(|m| m.clone())
3249            .unwrap_or_default();
3250        let group_attachments = self
3251            .group_attachments
3252            .read()
3253            .map(|m| m.clone())
3254            .unwrap_or_default();
3255        match self.attach_policy(principal.clone(), policy_id) {
3256            Ok(()) => match self.emit_policy_allowed(
3257                control,
3258                EventKind::PolicyAttach,
3259                "policy:attach",
3260                policy_id,
3261                existing.as_deref(),
3262                Some(&principal),
3263                None,
3264            ) {
3265                Ok(()) => Ok(()),
3266                Err(err) => {
3267                    self.restore_policy_attachments(user_attachments, group_attachments);
3268                    Err(err)
3269                }
3270            },
3271            Err(err) => {
3272                self.emit_policy_error(
3273                    control,
3274                    EventKind::PolicyAttach,
3275                    "policy:attach",
3276                    policy_id,
3277                    existing.as_deref(),
3278                    Some(&principal),
3279                    &err,
3280                );
3281                Err(err)
3282            }
3283        }
3284    }
3285
3286    pub fn detach_policy_with_control_events(
3287        &self,
3288        principal: PrincipalRef,
3289        policy_id: &str,
3290        control: &PolicyMutationControl<'_>,
3291    ) -> Result<(), AuthError> {
3292        let existing = self.get_policy(policy_id);
3293        if let Some(ManagedPolicyDecision::Deny {
3294            entry_id,
3295            matched_action,
3296            matched_resource,
3297            reason,
3298            ..
3299        }) = self.managed_policy_decision(policy_id, PolicyOp::Detach, control)
3300        {
3301            self.emit_policy_denied(
3302                control,
3303                EventKind::PolicyDetach,
3304                "policy:detach",
3305                policy_id,
3306                existing.as_deref(),
3307                Some(&principal),
3308                &reason.to_string(),
3309                Some(entry_id),
3310                Some((matched_action, matched_resource)),
3311            );
3312            return Err(Self::managed_policy_error(policy_id, &reason.to_string()));
3313        }
3314        let Some(existing_policy) = existing else {
3315            let err = AuthError::Forbidden(format!("policy `{policy_id}` not found"));
3316            self.emit_policy_error(
3317                control,
3318                EventKind::PolicyDetach,
3319                "policy:detach",
3320                policy_id,
3321                None,
3322                Some(&principal),
3323                &err,
3324            );
3325            return Err(err);
3326        };
3327
3328        let user_attachments = self
3329            .user_attachments
3330            .read()
3331            .map(|m| m.clone())
3332            .unwrap_or_default();
3333        let group_attachments = self
3334            .group_attachments
3335            .read()
3336            .map(|m| m.clone())
3337            .unwrap_or_default();
3338        match self.detach_policy(principal.clone(), policy_id) {
3339            Ok(()) => match self.emit_policy_allowed(
3340                control,
3341                EventKind::PolicyDetach,
3342                "policy:detach",
3343                policy_id,
3344                Some(existing_policy.as_ref()),
3345                Some(&principal),
3346                None,
3347            ) {
3348                Ok(()) => Ok(()),
3349                Err(err) => {
3350                    self.restore_policy_attachments(user_attachments, group_attachments);
3351                    Err(err)
3352                }
3353            },
3354            Err(err) => {
3355                self.emit_policy_error(
3356                    control,
3357                    EventKind::PolicyDetach,
3358                    "policy:detach",
3359                    policy_id,
3360                    Some(existing_policy.as_ref()),
3361                    Some(&principal),
3362                    &err,
3363                );
3364                Err(err)
3365            }
3366        }
3367    }
3368
3369    fn managed_policy_decision(
3370        &self,
3371        policy_id: &str,
3372        op: PolicyOp,
3373        control: &PolicyMutationControl<'_>,
3374    ) -> Option<ManagedPolicyDecision> {
3375        control.registry.map(|registry| {
3376            ManagedPolicyGate::new(registry).check_mutation(
3377                self,
3378                control.actor,
3379                control.eval_ctx,
3380                policy_id,
3381                op,
3382            )
3383        })
3384    }
3385
3386    /// Run the [`crate::auth::self_lock_guard`] invariant against the
3387    /// current policy set. Returns `Some(error_message)` when an attach
3388    /// must be refused, `None` when the system would remain unlockable.
3389    ///
3390    /// `_policy_id` is informational — the invariant is on the full
3391    /// graph and doesn't currently special-case the new entry.
3392    fn check_self_lock_invariant_for_attach(&self, _policy_id: &str) -> Option<String> {
3393        use crate::auth::self_lock_guard::{
3394            check_self_lock_invariant, format_block_error, InvariantOutcome,
3395        };
3396        let policies: Vec<Arc<Policy>> = match self.policies.read() {
3397            Ok(map) => map.values().cloned().collect(),
3398            Err(_) => return None,
3399        };
3400        let outcome = check_self_lock_invariant(&policies);
3401        match &outcome {
3402            InvariantOutcome::Ok => None,
3403            InvariantOutcome::Blocked { .. } => format_block_error(&outcome),
3404        }
3405    }
3406
3407    fn managed_policy_error(policy_id: &str, reason: &str) -> AuthError {
3408        AuthError::Forbidden(format!(
3409            "managed policy mutation blocked for `{policy_id}`: {reason}"
3410        ))
3411    }
3412
3413    fn emit_policy_allowed(
3414        &self,
3415        control: &PolicyMutationControl<'_>,
3416        kind: EventKind,
3417        action: &'static str,
3418        policy_id: &str,
3419        policy: Option<&Policy>,
3420        principal: Option<&PrincipalRef>,
3421        matched_policy_id: Option<String>,
3422    ) -> Result<(), AuthError> {
3423        let event = ControlEvent {
3424            kind,
3425            outcome: Outcome::Allowed,
3426            action: std::borrow::Cow::Borrowed(action),
3427            resource: Some(format!("policy:{policy_id}")),
3428            reason: None,
3429            matched_policy_id,
3430            fields: policy_control_fields(policy_id, policy, principal),
3431        };
3432        match control.ledger.emit(control.ctx, event) {
3433            Ok(_) => Ok(()),
3434            Err(err) if control.config.require_persistence() => {
3435                Err(AuthError::Internal(err.to_string()))
3436            }
3437            Err(_) => Ok(()),
3438        }
3439    }
3440
3441    #[allow(clippy::too_many_arguments)]
3442    fn emit_policy_denied(
3443        &self,
3444        control: &PolicyMutationControl<'_>,
3445        kind: EventKind,
3446        action: &'static str,
3447        policy_id: &str,
3448        policy: Option<&Policy>,
3449        principal: Option<&PrincipalRef>,
3450        reason: &str,
3451        matched_policy_id: Option<String>,
3452        matched: Option<(String, String)>,
3453    ) {
3454        let mut fields = policy_control_fields(policy_id, policy, principal);
3455        if let Some((matched_action, matched_resource)) = matched {
3456            fields.insert(
3457                "matched_action".to_string(),
3458                Sensitivity::raw(matched_action),
3459            );
3460            fields.insert(
3461                "matched_resource".to_string(),
3462                Sensitivity::raw(matched_resource),
3463            );
3464        }
3465        let event = ControlEvent {
3466            kind,
3467            outcome: Outcome::Denied,
3468            action: std::borrow::Cow::Borrowed(action),
3469            resource: Some(format!("policy:{policy_id}")),
3470            reason: Some(reason.to_string()),
3471            matched_policy_id,
3472            fields,
3473        };
3474        let _ = control.ledger.emit(control.ctx, event);
3475    }
3476
3477    fn emit_policy_error(
3478        &self,
3479        control: &PolicyMutationControl<'_>,
3480        kind: EventKind,
3481        action: &'static str,
3482        policy_id: &str,
3483        policy: Option<&Policy>,
3484        principal: Option<&PrincipalRef>,
3485        err: &AuthError,
3486    ) {
3487        let event = ControlEvent {
3488            kind,
3489            outcome: Outcome::Error,
3490            action: std::borrow::Cow::Borrowed(action),
3491            resource: Some(format!("policy:{policy_id}")),
3492            reason: Some(err.to_string()),
3493            matched_policy_id: None,
3494            fields: policy_control_fields(policy_id, policy, principal),
3495        };
3496        let _ = control.ledger.emit(control.ctx, event);
3497    }
3498
3499    fn restore_policy_put(
3500        &self,
3501        policy_id: &str,
3502        previous: Option<Arc<Policy>>,
3503        was_enabled: bool,
3504    ) {
3505        let mut policies = self.policies.write().unwrap_or_else(|e| e.into_inner());
3506        match previous {
3507            Some(policy) => {
3508                policies.insert(policy_id.to_string(), policy);
3509            }
3510            None => {
3511                policies.remove(policy_id);
3512            }
3513        }
3514        drop(policies);
3515        self.iam_authorization_enabled
3516            .store(was_enabled, Ordering::Release);
3517        self.iam_effective_cache
3518            .write()
3519            .unwrap_or_else(|e| e.into_inner())
3520            .clear();
3521        self.invalidate_visible_collections_cache();
3522        self.persist_iam_to_kv();
3523    }
3524
3525    fn restore_policy_delete(
3526        &self,
3527        policy_id: &str,
3528        policy: Arc<Policy>,
3529        user_attachments: HashMap<UserId, Vec<String>>,
3530        group_attachments: HashMap<String, Vec<String>>,
3531        was_enabled: bool,
3532    ) {
3533        self.policies
3534            .write()
3535            .unwrap_or_else(|e| e.into_inner())
3536            .insert(policy_id.to_string(), policy);
3537        self.restore_policy_attachments(user_attachments, group_attachments);
3538        self.iam_authorization_enabled
3539            .store(was_enabled, Ordering::Release);
3540        self.persist_iam_to_kv();
3541    }
3542
3543    fn restore_policy_attachments(
3544        &self,
3545        user_attachments: HashMap<UserId, Vec<String>>,
3546        group_attachments: HashMap<String, Vec<String>>,
3547    ) {
3548        *self
3549            .user_attachments
3550            .write()
3551            .unwrap_or_else(|e| e.into_inner()) = user_attachments;
3552        *self
3553            .group_attachments
3554            .write()
3555            .unwrap_or_else(|e| e.into_inner()) = group_attachments;
3556        self.iam_effective_cache
3557            .write()
3558            .unwrap_or_else(|e| e.into_inner())
3559            .clear();
3560        self.invalidate_visible_collections_cache();
3561        self.persist_iam_to_kv();
3562    }
3563
3564    /// Insert or replace a policy by id. Rejects synthetic ids
3565    /// (`_grant_*` / `_default_*`) so callers can't hand-write them
3566    /// from the public API. Use `put_policy_internal` for synthetic
3567    /// inserts.
3568    pub fn put_policy(&self, p: Policy) -> Result<(), AuthError> {
3569        if p.id.starts_with("_grant_") || p.id.starts_with("_default_") {
3570            return Err(AuthError::Forbidden(format!(
3571                "policy id `{}` is reserved",
3572                p.id
3573            )));
3574        }
3575        self.put_policy_internal(p)
3576    }
3577
3578    /// Internal put bypassing the synthetic-namespace guard. Used by
3579    /// the GRANT translation layer; exposed publicly so integration
3580    /// tests can register synthetic `_grant_*` policies without going
3581    /// through the SQL frontend.
3582    pub fn put_policy_internal(&self, p: Policy) -> Result<(), AuthError> {
3583        p.validate()
3584            .map_err(|e| AuthError::Forbidden(format!("invalid policy `{}`: {e}", p.id)))?;
3585        let id = p.id.clone();
3586        self.policies
3587            .write()
3588            .unwrap_or_else(|e| e.into_inner())
3589            .insert(id, Arc::new(p));
3590        self.iam_authorization_enabled
3591            .store(true, Ordering::Release);
3592        self.iam_effective_cache
3593            .write()
3594            .unwrap_or_else(|e| e.into_inner())
3595            .clear();
3596        // Issue #119: a policy mutation can change the visible-
3597        // collections answer for any (tenant, role); we don't know
3598        // which up-front, so blow the whole cache.
3599        self.invalidate_visible_collections_cache();
3600        self.persist_iam_to_kv();
3601        Ok(())
3602    }
3603
3604    /// Whether the IAM evaluator should be authoritative for runtime
3605    /// authorization. This flips on the first policy write and remains
3606    /// on after deletes so dropping all policies leaves the instance in
3607    /// default-deny rather than silently returning to role fallback.
3608    pub fn iam_authorization_enabled(&self) -> bool {
3609        self.iam_authorization_enabled.load(Ordering::Acquire)
3610    }
3611
3612    /// Remove a policy and any attachments referencing it.
3613    pub fn delete_policy(&self, id: &str) -> Result<(), AuthError> {
3614        let removed = self
3615            .policies
3616            .write()
3617            .unwrap_or_else(|e| e.into_inner())
3618            .remove(id)
3619            .is_some();
3620        if !removed {
3621            return Err(AuthError::Forbidden(format!("policy `{id}` not found")));
3622        }
3623        // Detach from every user / group.
3624        if let Ok(mut ua) = self.user_attachments.write() {
3625            for ids in ua.values_mut() {
3626                ids.retain(|p| p != id);
3627            }
3628            ua.retain(|_, v| !v.is_empty());
3629        }
3630        if let Ok(mut ga) = self.group_attachments.write() {
3631            for ids in ga.values_mut() {
3632                ids.retain(|p| p != id);
3633            }
3634            ga.retain(|_, v| !v.is_empty());
3635        }
3636        self.iam_effective_cache
3637            .write()
3638            .unwrap_or_else(|e| e.into_inner())
3639            .clear();
3640        // Issue #119: dropping a policy can shrink any caller's visible
3641        // set; clear the (tenant, role) cache so AI commands re-resolve.
3642        self.invalidate_visible_collections_cache();
3643        self.persist_iam_to_kv();
3644        Ok(())
3645    }
3646
3647    /// List all policies (id-sorted for deterministic output).
3648    pub fn list_policies(&self) -> Vec<Arc<Policy>> {
3649        let map = match self.policies.read() {
3650            Ok(g) => g,
3651            Err(_) => return Vec::new(),
3652        };
3653        let mut out: Vec<Arc<Policy>> = map.values().cloned().collect();
3654        out.sort_by(|a, b| a.id.cmp(&b.id));
3655        out
3656    }
3657
3658    /// Fetch a single policy by id.
3659    pub fn get_policy(&self, id: &str) -> Option<Arc<Policy>> {
3660        self.policies.read().ok().and_then(|m| m.get(id).cloned())
3661    }
3662
3663    /// List policies directly attached to a group.
3664    pub fn group_policies(&self, group: &str) -> Vec<Arc<Policy>> {
3665        let policies = self.policies.read();
3666        let attachments = self.group_attachments.read();
3667        let mut out = Vec::new();
3668        if let (Ok(p_map), Ok(ga_map)) = (policies, attachments) {
3669            if let Some(ids) = ga_map.get(group) {
3670                for id in ids {
3671                    if let Some(p) = p_map.get(id) {
3672                        out.push(p.clone());
3673                    }
3674                }
3675            }
3676        }
3677        out.sort_by(|a, b| a.id.cmp(&b.id));
3678        out
3679    }
3680
3681    /// Delete synthetic policies produced by SQL GRANT translation.
3682    /// REVOKE uses this to keep the IAM lane and the legacy grant table
3683    /// in lock-step.
3684    pub fn delete_synthetic_grant_policies(
3685        &self,
3686        principal: &GrantPrincipal,
3687        resource: &Resource,
3688        actions: &[Action],
3689    ) -> usize {
3690        let attached = match principal {
3691            GrantPrincipal::User(uid) => self
3692                .user_attachments
3693                .read()
3694                .ok()
3695                .and_then(|m| m.get(uid).cloned())
3696                .unwrap_or_default(),
3697            GrantPrincipal::Group(group) => self
3698                .group_attachments
3699                .read()
3700                .ok()
3701                .and_then(|m| m.get(group).cloned())
3702                .unwrap_or_default(),
3703            GrantPrincipal::Public => self
3704                .group_attachments
3705                .read()
3706                .ok()
3707                .and_then(|m| m.get(PUBLIC_IAM_GROUP).cloned())
3708                .unwrap_or_default(),
3709        };
3710        if attached.is_empty() {
3711            return 0;
3712        }
3713
3714        let mut delete_ids = Vec::new();
3715        if let Ok(policies) = self.policies.read() {
3716            for id in attached {
3717                let Some(policy) = policies.get(&id) else {
3718                    continue;
3719                };
3720                if !policy.id.starts_with("_grant_") {
3721                    continue;
3722                }
3723                if synthetic_grant_matches(policy, resource, actions) {
3724                    delete_ids.push(policy.id.clone());
3725                }
3726            }
3727        }
3728
3729        let mut deleted = 0usize;
3730        for id in delete_ids {
3731            if self.delete_policy(&id).is_ok() {
3732                deleted += 1;
3733            }
3734        }
3735        deleted
3736    }
3737
3738    /// Apply the `REDDB_POLICY_BREAK_GLASS` recovery path: install or
3739    /// refresh the unlock policy, ensure the synthetic platform-owner
3740    /// user record exists, rebind the unlock policy to it, and emit a
3741    /// `policy.break_glass` audit event. Idempotent across reboots.
3742    ///
3743    /// `boot_ts_ms` is recorded in the audit event so operators can
3744    /// correlate the recovery with the boot transcript.
3745    pub fn apply_policy_break_glass(&self, boot_ts_ms: u128) -> Result<(), AuthError> {
3746        use crate::auth::self_lock_guard::{
3747            break_glass_audit_fields, unlock_policy, PLATFORM_OWNER_UNLOCK_POLICY_ID,
3748            PLATFORM_OWNER_USERNAME,
3749        };
3750
3751        // (1) Install or refresh the unlock policy. `put_policy_internal`
3752        // bypasses the synthetic-namespace guard, lets us replace any
3753        // tampered persisted copy, and persists to vault on success.
3754        let policy = unlock_policy();
3755        self.put_policy_internal(policy)?;
3756
3757        // (2) Ensure the synthetic platform-owner user record exists.
3758        // We bypass the public user-creation API to keep this user
3759        // immutable, disabled, and absent from operator listings even
3760        // if our filter ever regresses.
3761        let owner_id = UserId::platform(PLATFORM_OWNER_USERNAME);
3762        {
3763            let mut users = self.users.write().unwrap_or_else(|e| e.into_inner());
3764            users.entry(owner_id.clone()).or_insert_with(|| User {
3765                username: PLATFORM_OWNER_USERNAME.to_string(),
3766                tenant_id: None,
3767                // No usable password; even if `authenticate` were
3768                // wired to accept the principal it would fail
3769                // verification. Defence in depth: `enabled = false`
3770                // and the username filter both block it.
3771                password_hash: String::new(),
3772                scram_verifier: None,
3773                role: Role::Admin,
3774                api_keys: Vec::new(),
3775                created_at: boot_ts_ms,
3776                updated_at: boot_ts_ms,
3777                enabled: false,
3778            });
3779        }
3780
3781        // (3) Rebind the unlock policy to the synthetic principal.
3782        // `attach_policy` is idempotent (the helper checks for an
3783        // existing binding before appending).
3784        self.attach_policy(
3785            PrincipalRef::User(owner_id.clone()),
3786            PLATFORM_OWNER_UNLOCK_POLICY_ID,
3787        )?;
3788
3789        // (4) Emit a loud audit event. If the control-event ledger
3790        // isn't configured (e.g. unit tests without runtime wiring),
3791        // log a warn and proceed — recovery must not depend on audit
3792        // wiring being live.
3793        let fields_raw = break_glass_audit_fields(boot_ts_ms);
3794        let mut fields: HashMap<String, Sensitivity> = HashMap::new();
3795        for (k, v) in fields_raw {
3796            fields.insert(k, Sensitivity::raw(v));
3797        }
3798        if let Some(configured) = self.configured_control_events() {
3799            let ctx = ControlEventCtx {
3800                actor: crate::runtime::control_events::ActorRef::System("break_glass"),
3801                scope: None,
3802                request_id: None,
3803                trace_id: None,
3804            };
3805            let event = ControlEvent {
3806                kind: EventKind::PolicyBreakGlass,
3807                outcome: Outcome::Allowed,
3808                action: std::borrow::Cow::Borrowed("policy:break_glass"),
3809                resource: Some(format!("policy:{PLATFORM_OWNER_UNLOCK_POLICY_ID}")),
3810                reason: Some("REDDB_POLICY_BREAK_GLASS=1 at boot".into()),
3811                matched_policy_id: None,
3812                fields,
3813            };
3814            let _ = configured.ledger.emit(&ctx, event);
3815        }
3816        tracing::warn!(
3817            policy_id = PLATFORM_OWNER_UNLOCK_POLICY_ID,
3818            principal = PLATFORM_OWNER_USERNAME,
3819            boot_ts_ms = %boot_ts_ms,
3820            "REDDB_POLICY_BREAK_GLASS=1 — installed/refreshed platform-owner unlock policy and rebound to synthetic principal"
3821        );
3822        Ok(())
3823    }
3824
3825    /// Attach a policy to a user or group. Returns an error if the
3826    /// policy id doesn't exist.
3827    pub fn attach_policy(&self, principal: PrincipalRef, policy_id: &str) -> Result<(), AuthError> {
3828        if !self
3829            .policies
3830            .read()
3831            .map(|m| m.contains_key(policy_id))
3832            .unwrap_or(false)
3833        {
3834            return Err(AuthError::Forbidden(format!(
3835                "policy `{policy_id}` not found"
3836            )));
3837        }
3838        match &principal {
3839            PrincipalRef::User(uid) => {
3840                let mut ua = self
3841                    .user_attachments
3842                    .write()
3843                    .unwrap_or_else(|e| e.into_inner());
3844                let list = ua.entry(uid.clone()).or_default();
3845                if !list.iter().any(|p| p == policy_id) {
3846                    list.push(policy_id.to_string());
3847                }
3848                drop(ua);
3849                self.invalidate_iam_cache(Some(uid));
3850            }
3851            PrincipalRef::Group(g) => {
3852                let mut ga = self
3853                    .group_attachments
3854                    .write()
3855                    .unwrap_or_else(|e| e.into_inner());
3856                let list = ga.entry(g.clone()).or_default();
3857                if !list.iter().any(|p| p == policy_id) {
3858                    list.push(policy_id.to_string());
3859                }
3860                drop(ga);
3861                self.invalidate_iam_cache(None);
3862            }
3863        }
3864        self.persist_iam_to_kv();
3865        Ok(())
3866    }
3867
3868    /// Remove a policy attachment from a user or group.
3869    pub fn detach_policy(&self, principal: PrincipalRef, policy_id: &str) -> Result<(), AuthError> {
3870        match &principal {
3871            PrincipalRef::User(uid) => {
3872                if let Ok(mut ua) = self.user_attachments.write() {
3873                    if let Some(list) = ua.get_mut(uid) {
3874                        list.retain(|p| p != policy_id);
3875                        if list.is_empty() {
3876                            ua.remove(uid);
3877                        }
3878                    }
3879                }
3880                self.invalidate_iam_cache(Some(uid));
3881            }
3882            PrincipalRef::Group(g) => {
3883                if let Ok(mut ga) = self.group_attachments.write() {
3884                    if let Some(list) = ga.get_mut(g) {
3885                        list.retain(|p| p != policy_id);
3886                        if list.is_empty() {
3887                            ga.remove(g);
3888                        }
3889                    }
3890                }
3891                self.invalidate_iam_cache(None);
3892            }
3893        }
3894        self.persist_iam_to_kv();
3895        Ok(())
3896    }
3897
3898    /// Resolve the ordered list of effective policies for a user:
3899    /// group attachments first (least specific), then user
3900    /// attachments (most specific). Cached per user.
3901    pub fn effective_policies(&self, user: &UserId) -> Vec<Arc<Policy>> {
3902        if let Ok(cache) = self.iam_effective_cache.read() {
3903            if let Some(hit) = cache.get(user) {
3904                return hit.clone();
3905            }
3906        }
3907        let policies = self.policies.read();
3908        let user_attachments = self.user_attachments.read();
3909        let group_attachments = self.group_attachments.read();
3910        let mut groups = self
3911            .user_attributes
3912            .read()
3913            .ok()
3914            .and_then(|m| m.get(user).map(|attrs| attrs.groups.clone()))
3915            .unwrap_or_default();
3916        groups.insert(0, PUBLIC_IAM_GROUP.to_string());
3917        let mut out: Vec<Arc<Policy>> = Vec::new();
3918        if let (Ok(p_map), Ok(ua_map), Ok(ga_map)) = (policies, user_attachments, group_attachments)
3919        {
3920            for group in groups {
3921                if let Some(ids) = ga_map.get(&group) {
3922                    for id in ids {
3923                        if let Some(p) = p_map.get(id) {
3924                            out.push(p.clone());
3925                        }
3926                    }
3927                }
3928            }
3929            if let Some(ids) = ua_map.get(user) {
3930                for id in ids {
3931                    if let Some(p) = p_map.get(id) {
3932                        out.push(p.clone());
3933                    }
3934                }
3935            }
3936        }
3937        if let Ok(mut cache) = self.iam_effective_cache.write() {
3938            cache.insert(user.clone(), out.clone());
3939        }
3940        out
3941    }
3942
3943    /// Run the policy simulator for a principal. Synthesises an
3944    /// `EvalContext` from the user record + caller-supplied extras.
3945    pub fn simulate(
3946        &self,
3947        principal: &UserId,
3948        action: &str,
3949        resource: &ResourceRef,
3950        ctx_extras: SimCtx,
3951    ) -> SimulationOutcome {
3952        let user_role = self
3953            .users
3954            .read()
3955            .ok()
3956            .and_then(|u| u.get(principal).map(|u| u.role));
3957        let principal_is_admin_role = user_role == Some(Role::Admin);
3958        let now = ctx_extras.now_ms.unwrap_or_else(now_ms);
3959        let ctx = EvalContext {
3960            principal_tenant: principal.tenant.clone(),
3961            current_tenant: ctx_extras.current_tenant,
3962            peer_ip: ctx_extras.peer_ip,
3963            mfa_present: ctx_extras.mfa_present,
3964            now_ms: now,
3965            principal_is_admin_role,
3966            principal_is_platform_scoped: principal.tenant.is_none(),
3967        };
3968        let pols = self.effective_policies(principal);
3969        let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
3970        iam_policies::simulate(&refs, action, resource, &ctx)
3971    }
3972
3973    /// Production hot-path policy evaluation. Returns `true` on Allow
3974    /// / AdminBypass, `false` on Deny / DefaultDeny.
3975    ///
3976    /// This entry point is **strict**: it never consults the
3977    /// [`PolicyEnforcementMode`] fallback. Governance APIs that gate
3978    /// admin-tier mutations (managed-config writes, registry
3979    /// supersedes, managed-policy lifecycle) call this so they cannot
3980    /// accidentally pick up the lenient `LegacyRbac` posture. Runtime
3981    /// hot-path callers that should respect the mode call
3982    /// [`AuthStore::check_policy_authz_with_role`] instead.
3983    pub fn check_policy_authz(
3984        &self,
3985        principal: &UserId,
3986        action: &str,
3987        resource: &ResourceRef,
3988        ctx: &EvalContext,
3989    ) -> bool {
3990        let pols = self.effective_policies(principal);
3991        let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
3992        let decision = iam_policies::evaluate(&refs, action, resource, ctx);
3993        matches!(
3994            decision,
3995            iam_policies::Decision::Allow { .. } | iam_policies::Decision::AdminBypass
3996        )
3997    }
3998
3999    /// Mode-aware policy evaluation for runtime SQL/HTTP/wire surfaces.
4000    ///
4001    /// Returns `true` on `Allow`/`AdminBypass`, `false` on an explicit
4002    /// `Deny`. On `DefaultDeny` (no statement matched) the result
4003    /// depends on the active [`PolicyEnforcementMode`]:
4004    ///
4005    /// * `LegacyRbac` — defer to
4006    ///   [`legacy_rbac_decision`][super::enforcement_mode::legacy_rbac_decision]
4007    ///   using the caller's `role`. This preserves the pre-#712
4008    ///   behaviour where a principal with no attached policy is gated
4009    ///   only by their role.
4010    /// * `PolicyOnly` — return `false`. A principal needs an explicit
4011    ///   matching `Allow` to be authorized.
4012    ///
4013    /// An explicit `Deny` always wins, irrespective of mode and role.
4014    pub fn check_policy_authz_with_role(
4015        &self,
4016        principal: &UserId,
4017        action: &str,
4018        resource: &ResourceRef,
4019        ctx: &EvalContext,
4020        role: Role,
4021    ) -> bool {
4022        let pols = self.effective_policies(principal);
4023        let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
4024        let decision = iam_policies::evaluate(&refs, action, resource, ctx);
4025        match decision {
4026            iam_policies::Decision::Allow { .. } | iam_policies::Decision::AdminBypass => true,
4027            iam_policies::Decision::Deny { .. } => false,
4028            iam_policies::Decision::DefaultDeny => match self.enforcement_mode() {
4029                PolicyEnforcementMode::LegacyRbac => legacy_rbac_decision(role, action),
4030                PolicyEnforcementMode::PolicyOnly => false,
4031            },
4032        }
4033    }
4034
4035    // -----------------------------------------------------------------
4036    // PolicyEnforcementMode (#712)
4037    // -----------------------------------------------------------------
4038
4039    /// Read the active enforcement mode. Cheap (a single `RwLock` read);
4040    /// safe to call on the hot path.
4041    pub fn enforcement_mode(&self) -> PolicyEnforcementMode {
4042        *self
4043            .enforcement_mode
4044            .read()
4045            .unwrap_or_else(|e| e.into_inner())
4046    }
4047
4048    /// Overwrite the active enforcement mode and persist it to the
4049    /// vault KV so the new value survives a restart. Returns the
4050    /// previous value so callers logging a transition (e.g. the
4051    /// boot-time loader, the S5B migration command) can record the
4052    /// before/after.
4053    pub fn set_enforcement_mode(&self, mode: PolicyEnforcementMode) -> PolicyEnforcementMode {
4054        let prev = {
4055            let mut guard = self
4056                .enforcement_mode
4057                .write()
4058                .unwrap_or_else(|e| e.into_inner());
4059            let prev = *guard;
4060            *guard = mode;
4061            prev
4062        };
4063        self.vault_kv_set(
4064            "red.config.policy.enforcement_mode".to_string(),
4065            mode.as_str().to_string(),
4066        );
4067        prev
4068    }
4069
4070    /// Claim the "emit the one-time legacy-RBAC boot warning" token.
4071    /// Returns `true` on the first call after construction (or after a
4072    /// process restart) **iff** the active mode is `LegacyRbac`;
4073    /// returns `false` on every subsequent call so the boot path can
4074    /// guarantee the warning is logged at most once per boot.
4075    pub fn take_legacy_rbac_warn_once(&self) -> bool {
4076        if self.enforcement_mode() != PolicyEnforcementMode::LegacyRbac {
4077            return false;
4078        }
4079        !self
4080            .legacy_rbac_boot_warn_emitted
4081            .swap(true, Ordering::AcqRel)
4082    }
4083
4084    /// Evaluate a resolved table projection through the column policy
4085    /// gate. Query paths should pass already-resolved column names; this
4086    /// helper intentionally does not parse SQL projection syntax.
4087    pub fn check_column_projection_authz(
4088        &self,
4089        principal: &UserId,
4090        request: &ColumnAccessRequest,
4091        ctx: &EvalContext,
4092    ) -> ColumnPolicyOutcome {
4093        let pols = self.effective_policies(principal);
4094        let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
4095        ColumnPolicyGate::new(&refs).evaluate(request, ctx)
4096    }
4097
4098    fn invalidate_iam_cache(&self, uid: Option<&UserId>) {
4099        if let Ok(mut cache) = self.iam_effective_cache.write() {
4100            match uid {
4101                Some(u) => {
4102                    cache.remove(u);
4103                }
4104                None => cache.clear(),
4105            }
4106        }
4107    }
4108
4109    /// Drop every effective-policy cache entry. Called from execution
4110    /// paths that mutate policies/attachments without knowing which
4111    /// users will be affected.
4112    pub fn invalidate_all_iam_cache(&self) {
4113        self.invalidate_iam_cache(None);
4114    }
4115
4116    // -----------------------------------------------------------------
4117    // IAM persistence — vault_kv backed under `red.iam.*` keys
4118    // -----------------------------------------------------------------
4119
4120    /// Reload IAM state (policies + attachments) from the vault KV.
4121    /// Replaces the legacy `rehydrate_acl` reader — pre-1.0 we drop
4122    /// the old `red.acl.*` blob format entirely.
4123    pub fn rehydrate_iam(&self) {
4124        let mut enabled = self
4125            .vault_kv_get("red.iam.enabled")
4126            .map(|v| v == "true")
4127            .unwrap_or(false);
4128        // Policies — single JSON object keyed by id.
4129        if let Some(blob) = self.vault_kv_get("red.iam.policies") {
4130            if let Ok(val) = crate::serde_json::from_str::<crate::serde_json::Value>(&blob) {
4131                if let Some(obj) = val.as_object() {
4132                    let mut map = HashMap::new();
4133                    for (id, body) in obj.iter() {
4134                        let s = body.to_string_compact();
4135                        if let Ok(p) = Policy::from_json_str(&s) {
4136                            map.insert(id.clone(), Arc::new(p));
4137                        }
4138                    }
4139                    if !map.is_empty() {
4140                        enabled = true;
4141                    }
4142                    *self.policies.write().unwrap_or_else(|e| e.into_inner()) = map;
4143                }
4144            }
4145        }
4146        // User attachments.
4147        if let Some(blob) = self.vault_kv_get("red.iam.attachments.users") {
4148            if let Ok(val) = crate::serde_json::from_str::<crate::serde_json::Value>(&blob) {
4149                if let Some(obj) = val.as_object() {
4150                    let mut map: HashMap<UserId, Vec<String>> = HashMap::new();
4151                    for (encoded_uid, ids_v) in obj.iter() {
4152                        let Some(uid) = decode_uid(encoded_uid) else {
4153                            continue;
4154                        };
4155                        if let Some(arr) = ids_v.as_array() {
4156                            let ids: Vec<String> = arr
4157                                .iter()
4158                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
4159                                .collect();
4160                            map.insert(uid, ids);
4161                        }
4162                    }
4163                    *self
4164                        .user_attachments
4165                        .write()
4166                        .unwrap_or_else(|e| e.into_inner()) = map;
4167                }
4168            }
4169        }
4170        // Group attachments.
4171        if let Some(blob) = self.vault_kv_get("red.iam.attachments.groups") {
4172            if let Ok(val) = crate::serde_json::from_str::<crate::serde_json::Value>(&blob) {
4173                if let Some(obj) = val.as_object() {
4174                    let mut map: HashMap<String, Vec<String>> = HashMap::new();
4175                    for (g, ids_v) in obj.iter() {
4176                        if let Some(arr) = ids_v.as_array() {
4177                            let ids: Vec<String> = arr
4178                                .iter()
4179                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
4180                                .collect();
4181                            map.insert(g.clone(), ids);
4182                        }
4183                    }
4184                    *self
4185                        .group_attachments
4186                        .write()
4187                        .unwrap_or_else(|e| e.into_inner()) = map;
4188                }
4189            }
4190        }
4191        self.iam_authorization_enabled
4192            .store(enabled, Ordering::Release);
4193        self.invalidate_iam_cache(None);
4194
4195        // #712 / S5A: load persisted enforcement mode, if any. Absence
4196        // of the key means an existing install upgrading past #712 —
4197        // stay on the lenient default written at construction time.
4198        // Fresh installs receive `PolicyOnly` via the bootstrap path
4199        // (`mark_bootstrap_complete`) before the first restart, so this
4200        // hydrate path picks it up on every subsequent boot.
4201        if let Some(stored) = self.vault_kv_get("red.config.policy.enforcement_mode") {
4202            if let Some(mode) = PolicyEnforcementMode::parse(&stored) {
4203                *self
4204                    .enforcement_mode
4205                    .write()
4206                    .unwrap_or_else(|e| e.into_inner()) = mode;
4207            }
4208        }
4209
4210        // #712 / S5A: legacy_rbac is a transitional posture; emit one
4211        // boot-time warning so operators see the migration nudge in
4212        // their logs. The flag inside take_legacy_rbac_warn_once
4213        // guarantees we don't spam reload cycles.
4214        if self.take_legacy_rbac_warn_once() {
4215            tracing::warn!(
4216                target: "reddb::auth::enforcement_mode",
4217                mode = "legacy_rbac",
4218                hard_version = crate::auth::enforcement_mode::POLICY_ONLY_HARD_VERSION,
4219                "policy enforcement_mode=legacy_rbac (transitional). Run \
4220                 `MIGRATE POLICY MODE TO 'policy_only'` (next slice S5B) \
4221                 before {} to flip this install over to the strict posture.",
4222                crate::auth::enforcement_mode::POLICY_ONLY_HARD_VERSION,
4223            );
4224        }
4225    }
4226
4227    /// Snapshot policies + attachments into the vault KV. Called
4228    /// after every mutation.
4229    fn persist_iam_to_kv(&self) {
4230        let enabled = if self.iam_authorization_enabled() {
4231            "true"
4232        } else {
4233            "false"
4234        };
4235        self.vault_kv_set("red.iam.enabled".to_string(), enabled.to_string());
4236
4237        // Policies: `{ "<id>": <policy_json>, ... }`
4238        let policies_obj = {
4239            let map = self.policies.read().unwrap_or_else(|e| e.into_inner());
4240            let mut obj = crate::serde_json::Map::new();
4241            for (id, p) in map.iter() {
4242                let s = p.to_json_string();
4243                if let Ok(v) = crate::serde_json::from_str::<crate::serde_json::Value>(&s) {
4244                    obj.insert(id.clone(), v);
4245                }
4246            }
4247            crate::serde_json::Value::Object(obj).to_string_compact()
4248        };
4249        self.vault_kv_set("red.iam.policies".to_string(), policies_obj);
4250
4251        // User attachments: `{ "<encoded_uid>": [ "<policy_id>", ... ], ... }`
4252        let users_obj = {
4253            let map = self
4254                .user_attachments
4255                .read()
4256                .unwrap_or_else(|e| e.into_inner());
4257            let mut obj = crate::serde_json::Map::new();
4258            for (uid, ids) in map.iter() {
4259                let arr = crate::serde_json::Value::Array(
4260                    ids.iter()
4261                        .map(|s| crate::serde_json::Value::String(s.clone()))
4262                        .collect(),
4263                );
4264                obj.insert(encode_uid(uid), arr);
4265            }
4266            crate::serde_json::Value::Object(obj).to_string_compact()
4267        };
4268        self.vault_kv_set("red.iam.attachments.users".to_string(), users_obj);
4269
4270        // Group attachments.
4271        let groups_obj = {
4272            let map = self
4273                .group_attachments
4274                .read()
4275                .unwrap_or_else(|e| e.into_inner());
4276            let mut obj = crate::serde_json::Map::new();
4277            for (g, ids) in map.iter() {
4278                let arr = crate::serde_json::Value::Array(
4279                    ids.iter()
4280                        .map(|s| crate::serde_json::Value::String(s.clone()))
4281                        .collect(),
4282                );
4283                obj.insert(g.clone(), arr);
4284            }
4285            crate::serde_json::Value::Object(obj).to_string_compact()
4286        };
4287        self.vault_kv_set("red.iam.attachments.groups".to_string(), groups_obj);
4288    }
4289}
4290
4291fn synthetic_grant_matches(policy: &Policy, resource: &Resource, actions: &[Action]) -> bool {
4292    policy.statements.iter().any(|st| {
4293        st.effect == crate::auth::policies::Effect::Allow
4294            && st.condition.is_none()
4295            && grant_actions_overlap(&st.actions, actions)
4296            && grant_resource_matches(&st.resources, resource)
4297    })
4298}
4299
4300fn grant_actions_overlap(
4301    patterns: &[crate::auth::policies::ActionPattern],
4302    actions: &[Action],
4303) -> bool {
4304    if actions.contains(&Action::All) {
4305        return true;
4306    }
4307    patterns.iter().any(|pat| match pat {
4308        crate::auth::policies::ActionPattern::Wildcard => true,
4309        crate::auth::policies::ActionPattern::Exact(s) => {
4310            actions.iter().any(|a| s.eq_ignore_ascii_case(a.as_str()))
4311        }
4312        crate::auth::policies::ActionPattern::Prefix(_) => false,
4313    })
4314}
4315
4316fn grant_resource_matches(
4317    patterns: &[crate::auth::policies::ResourcePattern],
4318    resource: &Resource,
4319) -> bool {
4320    let expected = grant_resource_pattern(resource);
4321    patterns.iter().any(|pat| pat == &expected)
4322}
4323
4324fn grant_resource_pattern(resource: &Resource) -> crate::auth::policies::ResourcePattern {
4325    use crate::auth::policies::ResourcePattern;
4326
4327    match resource {
4328        Resource::Database => ResourcePattern::Glob("table:*".to_string()),
4329        Resource::Schema(s) => ResourcePattern::Glob(format!("table:{s}.*")),
4330        Resource::Table { schema, table } => ResourcePattern::Exact {
4331            kind: "table".to_string(),
4332            name: match schema {
4333                Some(s) => format!("{s}.{table}"),
4334                None => table.clone(),
4335            },
4336        },
4337        Resource::Function { schema, name } => ResourcePattern::Exact {
4338            kind: "function".to_string(),
4339            name: match schema {
4340                Some(s) => format!("{s}.{name}"),
4341                None => name.clone(),
4342            },
4343        },
4344    }
4345}
4346
4347// ===========================================================================
4348// ACL serialization helpers — line-oriented, human-readable so an
4349// operator inspecting the vault dump can spot misconfigurations.
4350//
4351// Format (one record per line):
4352//   GRANT|<resource>|<actions_csv>|<with_grant_option>|<tenant_or_*>|<granted_by>|<granted_at>
4353//   ATTR|<valid_until>|<connection_limit>|<search_path>
4354//
4355// Resources are encoded as:
4356//   db                          → Database
4357//   schema:<name>               → Schema(name)
4358//   table:<schema_or_*>:<name>  → Table { schema, table }
4359//   func:<schema_or_*>:<name>   → Function { schema, name }
4360// ===========================================================================
4361
4362fn encode_uid(uid: &UserId) -> String {
4363    match &uid.tenant {
4364        Some(t) => format!("{}/{}", t, uid.username),
4365        None => format!("*/{}", uid.username),
4366    }
4367}
4368
4369fn decode_uid(s: &str) -> Option<UserId> {
4370    let (tenant, username) = s.split_once('/')?;
4371    Some(if tenant == "*" {
4372        UserId::platform(username)
4373    } else {
4374        UserId::scoped(tenant, username)
4375    })
4376}
4377
4378fn encode_resource(r: &Resource) -> String {
4379    match r {
4380        Resource::Database => "db".into(),
4381        Resource::Schema(s) => format!("schema:{}", s),
4382        Resource::Table { schema, table } => {
4383            format!("table:{}:{}", schema.as_deref().unwrap_or("*"), table)
4384        }
4385        Resource::Function { schema, name } => {
4386            format!("func:{}:{}", schema.as_deref().unwrap_or("*"), name)
4387        }
4388    }
4389}
4390
4391fn decode_resource(s: &str) -> Option<Resource> {
4392    if s == "db" {
4393        return Some(Resource::Database);
4394    }
4395    if let Some(rest) = s.strip_prefix("schema:") {
4396        return Some(Resource::Schema(rest.to_string()));
4397    }
4398    if let Some(rest) = s.strip_prefix("table:") {
4399        let (schema, table) = rest.split_once(':')?;
4400        return Some(Resource::Table {
4401            schema: if schema == "*" {
4402                None
4403            } else {
4404                Some(schema.to_string())
4405            },
4406            table: table.to_string(),
4407        });
4408    }
4409    if let Some(rest) = s.strip_prefix("func:") {
4410        let (schema, name) = rest.split_once(':')?;
4411        return Some(Resource::Function {
4412            schema: if schema == "*" {
4413                None
4414            } else {
4415                Some(schema.to_string())
4416            },
4417            name: name.to_string(),
4418        });
4419    }
4420    None
4421}
4422
4423fn encode_grants_blob(grants: &[Grant]) -> String {
4424    let mut out = String::new();
4425    for g in grants {
4426        let actions: Vec<&str> = g.actions.iter().map(|a| a.as_str()).collect();
4427        out.push_str(&format!(
4428            "GRANT|{}|{}|{}|{}|{}|{}\n",
4429            encode_resource(&g.resource),
4430            actions.join(","),
4431            g.with_grant_option,
4432            g.tenant.as_deref().unwrap_or("*"),
4433            g.granted_by,
4434            g.granted_at,
4435        ));
4436    }
4437    out
4438}
4439
4440fn decode_grants_blob(s: &str) -> Option<Vec<Grant>> {
4441    let mut out = Vec::new();
4442    for line in s.lines() {
4443        if line.is_empty() {
4444            continue;
4445        }
4446        let parts: Vec<&str> = line.split('|').collect();
4447        if parts.len() != 7 || parts[0] != "GRANT" {
4448            return None;
4449        }
4450        let resource = decode_resource(parts[1])?;
4451        let mut actions = std::collections::BTreeSet::new();
4452        for token in parts[2].split(',') {
4453            if let Some(a) = Action::from_keyword(token) {
4454                actions.insert(a);
4455            }
4456        }
4457        let with_grant_option = parts[3] == "true";
4458        let tenant = if parts[4] == "*" {
4459            None
4460        } else {
4461            Some(parts[4].to_string())
4462        };
4463        let granted_by = parts[5].to_string();
4464        let granted_at: u128 = parts[6].parse().unwrap_or(0);
4465        out.push(Grant {
4466            // Principal field is reconstructed by the loader from the
4467            // storage-key prefix; default to `Public` here.
4468            principal: GrantPrincipal::Public,
4469            resource,
4470            actions,
4471            with_grant_option,
4472            granted_by,
4473            granted_at,
4474            tenant,
4475            columns: None,
4476        });
4477    }
4478    Some(out)
4479}
4480
4481fn encode_attrs_blob(a: &UserAttributes) -> String {
4482    let valid = a
4483        .valid_until
4484        .map(|t| t.to_string())
4485        .unwrap_or_else(|| "*".into());
4486    let limit = a
4487        .connection_limit
4488        .map(|l| l.to_string())
4489        .unwrap_or_else(|| "*".into());
4490    let path = a.search_path.clone().unwrap_or_else(|| "*".into());
4491    let groups = if a.groups.is_empty() {
4492        "*".to_string()
4493    } else {
4494        a.groups.join(",")
4495    };
4496    format!("ATTR|{}|{}|{}|{}\n", valid, limit, path, groups)
4497}
4498
4499fn decode_attrs_blob(s: &str) -> Option<UserAttributes> {
4500    let line = s.lines().next()?;
4501    let parts: Vec<&str> = line.split('|').collect();
4502    if !(parts.len() == 4 || parts.len() == 5) || parts[0] != "ATTR" {
4503        return None;
4504    }
4505    let groups = if parts.get(4).copied().unwrap_or("*") == "*" {
4506        Vec::new()
4507    } else {
4508        parts[4]
4509            .split(',')
4510            .filter(|g| !g.is_empty())
4511            .map(|g| g.to_string())
4512            .collect()
4513    };
4514    Some(UserAttributes {
4515        valid_until: if parts[1] == "*" {
4516            None
4517        } else {
4518            parts[1].parse().ok()
4519        },
4520        connection_limit: if parts[2] == "*" {
4521            None
4522        } else {
4523            parts[2].parse().ok()
4524        },
4525        search_path: if parts[3] == "*" {
4526            None
4527        } else {
4528            Some(parts[3].to_string())
4529        },
4530        groups,
4531    })
4532}
4533
4534// ===========================================================================
4535// Password hashing
4536// ===========================================================================
4537
4538/// Derive a SCRAM-SHA-256 verifier for a fresh user / password
4539/// rotation. Salt is 16 random bytes; iter is the engine default
4540/// (`scram::DEFAULT_ITER`). Stored alongside the Argon2 password
4541/// hash so HTTP login + v2 SCRAM can both authenticate the same
4542/// user.
4543fn make_scram_verifier(password: &str) -> crate::auth::scram::ScramVerifier {
4544    let salt = random_bytes(16);
4545    crate::auth::scram::ScramVerifier::from_password(
4546        password,
4547        salt,
4548        crate::auth::scram::DEFAULT_ITER,
4549    )
4550}
4551
4552/// Hash a password using Argon2id.
4553///
4554/// Format: `argon2id$<salt_hex>$<hash_hex>`
4555pub(crate) fn hash_password(password: &str) -> String {
4556    let salt = random_bytes(16);
4557    let params = auth_argon2_params();
4558    let hash = derive_key(password.as_bytes(), &salt, &params);
4559    format!("argon2id${}${}", hex::encode(&salt), hex::encode(&hash))
4560}
4561
4562/// Verify a password against a stored `argon2id$<salt>$<hash>` string.
4563pub(crate) fn verify_password(password: &str, stored_hash: &str) -> bool {
4564    let parts: Vec<&str> = stored_hash.splitn(3, '$').collect();
4565    if parts.len() != 3 || parts[0] != "argon2id" {
4566        return false;
4567    }
4568
4569    let salt = match hex::decode(parts[1]) {
4570        Ok(s) => s,
4571        Err(_) => return false,
4572    };
4573
4574    let expected_hash = match hex::decode(parts[2]) {
4575        Ok(h) => h,
4576        Err(_) => return false,
4577    };
4578
4579    let params = auth_argon2_params();
4580    let computed = derive_key(password.as_bytes(), &salt, &params);
4581    constant_time_eq(&computed, &expected_hash)
4582}
4583
4584/// Constant-time byte comparison to avoid timing side-channels.
4585fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
4586    if a.len() != b.len() {
4587        return false;
4588    }
4589    let mut diff: u8 = 0;
4590    for (x, y) in a.iter().zip(b.iter()) {
4591        diff |= x ^ y;
4592    }
4593    diff == 0
4594}
4595
4596// ===========================================================================
4597// Token generation
4598// ===========================================================================
4599
4600fn generate_session_token() -> String {
4601    format!("rs_{}", hex::encode(random_bytes(32)))
4602}
4603
4604fn generate_api_key() -> String {
4605    format!("rk_{}", hex::encode(random_bytes(32)))
4606}
4607
4608/// Generate `n` random bytes and return as a hex string.
4609fn random_hex(n: usize) -> String {
4610    hex::encode(random_bytes(n))
4611}
4612
4613/// Generate `n` cryptographically random bytes using the OS CSPRNG,
4614/// then mix with SHA-256 for domain separation.
4615pub(crate) fn random_bytes(n: usize) -> Vec<u8> {
4616    let mut buf = vec![0u8; n.max(32)];
4617    if os_random::fill_bytes(&mut buf).is_err() {
4618        // Fallback: use system time and pointers as entropy (best-effort).
4619        let seed = now_ms().to_le_bytes();
4620        for (i, byte) in buf.iter_mut().enumerate() {
4621            *byte = seed[i % seed.len()] ^ (i as u8);
4622        }
4623    }
4624    // SHA-256 mix to ensure uniform distribution.
4625    let digest = sha256(&buf);
4626    if n <= 32 {
4627        digest[..n].to_vec()
4628    } else {
4629        // Chain SHA-256 for longer outputs (unusual but supported).
4630        let mut out = Vec::with_capacity(n);
4631        let mut prev = digest;
4632        while out.len() < n {
4633            out.extend_from_slice(&prev[..std::cmp::min(32, n - out.len())]);
4634            prev = sha256(&prev);
4635        }
4636        out
4637    }
4638}
4639
4640// ===========================================================================
4641// Helpers
4642// ===========================================================================
4643
4644fn lock_err<T>(_: T) -> AuthError {
4645    AuthError::Internal("lock poisoned".to_string())
4646}
4647
4648// ===========================================================================
4649// Tests
4650// ===========================================================================
4651
4652#[cfg(test)]
4653mod tests {
4654    use super::*;
4655
4656    fn test_config() -> AuthConfig {
4657        AuthConfig {
4658            enabled: true,
4659            session_ttl_secs: 60,
4660            require_auth: true,
4661            auto_encrypt_storage: false,
4662            vault_enabled: false,
4663            cert: Default::default(),
4664            oauth: Default::default(),
4665        }
4666    }
4667
4668    #[test]
4669    fn test_create_and_list_users() {
4670        let store = AuthStore::new(test_config());
4671        store.create_user("alice", "pass1", Role::Admin).unwrap();
4672        store.create_user("bob", "pass2", Role::Read).unwrap();
4673
4674        let users = store.list_users();
4675        assert_eq!(users.len(), 2);
4676        // Password hashes should be redacted.
4677        for u in &users {
4678            assert!(u.password_hash.is_empty());
4679        }
4680    }
4681
4682    #[test]
4683    fn test_create_duplicate_user() {
4684        let store = AuthStore::new(test_config());
4685        store.create_user("alice", "pass", Role::Admin).unwrap();
4686        let err = store.create_user("alice", "pass2", Role::Read).unwrap_err();
4687        assert!(matches!(err, AuthError::UserExists(_)));
4688    }
4689
4690    #[test]
4691    fn test_authenticate_and_validate() {
4692        let store = AuthStore::new(test_config());
4693        store.create_user("alice", "secret", Role::Write).unwrap();
4694
4695        let session = store.authenticate("alice", "secret").unwrap();
4696        assert!(session.token.starts_with("rs_"));
4697
4698        let (username, role) = store.validate_token(&session.token).unwrap();
4699        assert_eq!(username, "alice");
4700        assert_eq!(role, Role::Write);
4701    }
4702
4703    #[test]
4704    fn test_authenticate_wrong_password() {
4705        let store = AuthStore::new(test_config());
4706        store.create_user("alice", "secret", Role::Read).unwrap();
4707
4708        let err = store.authenticate("alice", "wrong").unwrap_err();
4709        assert!(matches!(err, AuthError::InvalidCredentials));
4710    }
4711
4712    #[test]
4713    fn test_api_key_lifecycle() {
4714        let store = AuthStore::new(test_config());
4715        store.create_user("alice", "pass", Role::Admin).unwrap();
4716
4717        let key = store
4718            .create_api_key("alice", "ci-token", Role::Write)
4719            .unwrap();
4720        assert!(key.key.starts_with("rk_"));
4721
4722        let (username, role) = store.validate_token(&key.key).unwrap();
4723        assert_eq!(username, "alice");
4724        assert_eq!(role, Role::Write);
4725
4726        store.revoke_api_key(&key.key).unwrap();
4727        assert!(store.validate_token(&key.key).is_none());
4728    }
4729
4730    #[test]
4731    fn test_api_key_role_exceeded() {
4732        let store = AuthStore::new(test_config());
4733        store.create_user("bob", "pass", Role::Read).unwrap();
4734
4735        let err = store
4736            .create_api_key("bob", "escalate", Role::Admin)
4737            .unwrap_err();
4738        assert!(matches!(err, AuthError::RoleExceeded { .. }));
4739    }
4740
4741    #[test]
4742    fn test_change_password() {
4743        let store = AuthStore::new(test_config());
4744        store.create_user("alice", "old", Role::Write).unwrap();
4745
4746        store.change_password("alice", "old", "new").unwrap();
4747
4748        // Old password should fail.
4749        assert!(store.authenticate("alice", "old").is_err());
4750        // New password should succeed.
4751        assert!(store.authenticate("alice", "new").is_ok());
4752    }
4753
4754    #[test]
4755    fn test_change_role() {
4756        let store = AuthStore::new(test_config());
4757        store.create_user("alice", "pass", Role::Admin).unwrap();
4758        store.create_api_key("alice", "key1", Role::Admin).unwrap();
4759
4760        store.change_role("alice", Role::Read).unwrap();
4761
4762        // User's role should be Read now.
4763        let users = store.list_users();
4764        let alice = users.iter().find(|u| u.username == "alice").unwrap();
4765        assert_eq!(alice.role, Role::Read);
4766
4767        // API keys should have been downgraded.
4768        assert_eq!(alice.api_keys[0].role, Role::Read);
4769    }
4770
4771    #[test]
4772    fn test_admin_user_mutations_are_policy_controlled_not_flag_guarded() {
4773        let store = AuthStore::new(test_config());
4774        store
4775            .create_admin_user("system", "pass", Role::Admin, None)
4776            .unwrap();
4777
4778        let uid = UserId::platform("system");
4779        store.change_password("system", "pass", "new").unwrap();
4780        store.change_role("system", Role::Read).unwrap();
4781        store.set_user_enabled(&uid, false).unwrap();
4782
4783        let key = store
4784            .create_api_key("system", "rotation", Role::Read)
4785            .unwrap();
4786        assert!(store.validate_token(&key.key).is_some());
4787        store.revoke_api_key(&key.key).unwrap();
4788        assert!(store.validate_token(&key.key).is_none());
4789        store.delete_user("system").unwrap();
4790    }
4791
4792    #[test]
4793    fn test_user_lifecycle_deny_policy_blocks_cloud_admin_deletion() {
4794        use crate::auth::policies::Policy;
4795
4796        let store = AuthStore::new(test_config());
4797        store
4798            .create_user("cloud-admin", "pass", Role::Admin)
4799            .unwrap();
4800        store
4801            .create_user("customer-admin", "pass", Role::Admin)
4802            .unwrap();
4803
4804        store
4805            .put_policy(
4806                Policy::from_json_str(
4807                    r#"{
4808                        "id": "customer-admin-allow-all",
4809                        "version": 1,
4810                        "statements": [{
4811                            "effect": "allow",
4812                            "actions": ["*"],
4813                            "resources": ["*"]
4814                        }]
4815                    }"#,
4816                )
4817                .unwrap(),
4818            )
4819            .unwrap();
4820        store
4821            .put_policy(
4822                Policy::from_json_str(
4823                    r#"{
4824                        "id": "cloud-admin-protection",
4825                        "version": 1,
4826                        "statements": [{
4827                            "effect": "deny",
4828                            "actions": [
4829                                "user:delete",
4830                                "user:disable",
4831                                "user:password:change",
4832                                "user:role:update"
4833                            ],
4834                            "resources": ["user:cloud-admin"]
4835                        }]
4836                    }"#,
4837                )
4838                .unwrap(),
4839            )
4840            .unwrap();
4841        let customer = UserId::platform("customer-admin");
4842        store
4843            .attach_policy(
4844                PrincipalRef::User(customer.clone()),
4845                "customer-admin-allow-all",
4846            )
4847            .unwrap();
4848        store
4849            .attach_policy(
4850                PrincipalRef::User(customer.clone()),
4851                "cloud-admin-protection",
4852            )
4853            .unwrap();
4854
4855        let cloud_admin = UserId::platform("cloud-admin");
4856        assert!(!store.check_user_lifecycle_authz(
4857            &customer,
4858            Role::Admin,
4859            "user:delete",
4860            &cloud_admin,
4861        ));
4862        assert!(!store.check_user_lifecycle_authz(
4863            &customer,
4864            Role::Admin,
4865            "user:disable",
4866            &cloud_admin,
4867        ));
4868        assert!(!store.check_user_lifecycle_authz(
4869            &customer,
4870            Role::Admin,
4871            "user:password:change",
4872            &cloud_admin,
4873        ));
4874        assert!(!store.check_user_lifecycle_authz(
4875            &customer,
4876            Role::Admin,
4877            "user:role:update",
4878            &cloud_admin,
4879        ));
4880
4881        let another_user = UserId::platform("someone-else");
4882        assert!(store.check_user_lifecycle_authz(
4883            &customer,
4884            Role::Admin,
4885            "user:delete",
4886            &another_user,
4887        ));
4888    }
4889
4890    #[test]
4891    fn test_regular_user_mutations_still_work() {
4892        let store = AuthStore::new(test_config());
4893        store.create_user("alice", "old", Role::Admin).unwrap();
4894
4895        let uid = UserId::platform("alice");
4896        store.set_user_enabled(&uid, false).unwrap();
4897        assert!(matches!(
4898            store.authenticate("alice", "old"),
4899            Err(AuthError::InvalidCredentials)
4900        ));
4901
4902        store.set_user_enabled(&uid, true).unwrap();
4903        store.change_password("alice", "old", "new").unwrap();
4904        store.change_role("alice", Role::Read).unwrap();
4905        store.delete_user("alice").unwrap();
4906        assert!(matches!(
4907            store.authenticate("alice", "new"),
4908            Err(AuthError::InvalidCredentials)
4909        ));
4910    }
4911
4912    #[test]
4913    fn test_delete_user() {
4914        let store = AuthStore::new(test_config());
4915        store.create_user("alice", "pass", Role::Admin).unwrap();
4916        let key = store.create_api_key("alice", "key1", Role::Read).unwrap();
4917        let session = store.authenticate("alice", "pass").unwrap();
4918
4919        store.delete_user("alice").unwrap();
4920
4921        assert!(store.validate_token(&key.key).is_none());
4922        assert!(store.validate_token(&session.token).is_none());
4923        assert!(store.list_users().is_empty());
4924    }
4925
4926    #[test]
4927    fn test_revoke_session() {
4928        let store = AuthStore::new(test_config());
4929        store.create_user("alice", "pass", Role::Read).unwrap();
4930        let session = store.authenticate("alice", "pass").unwrap();
4931
4932        store.revoke_session(&session.token);
4933        assert!(store.validate_token(&session.token).is_none());
4934    }
4935
4936    #[test]
4937    fn test_password_hash_format() {
4938        let hash = hash_password("test");
4939        assert!(hash.starts_with("argon2id$"));
4940        let parts: Vec<&str> = hash.splitn(3, '$').collect();
4941        assert_eq!(parts.len(), 3);
4942        // Salt is 16 bytes = 32 hex chars.
4943        assert_eq!(parts[1].len(), 32);
4944        // Hash is 32 bytes = 64 hex chars.
4945        assert_eq!(parts[2].len(), 64);
4946    }
4947
4948    #[test]
4949    fn test_constant_time_eq() {
4950        assert!(constant_time_eq(b"hello", b"hello"));
4951        assert!(!constant_time_eq(b"hello", b"world"));
4952        assert!(!constant_time_eq(b"short", b"longer"));
4953    }
4954
4955    #[test]
4956    fn test_bootstrap_seals_permanently() {
4957        let store = AuthStore::new(test_config());
4958
4959        assert!(store.needs_bootstrap());
4960        assert!(!store.is_bootstrapped());
4961
4962        // First bootstrap succeeds
4963        let result = store.bootstrap("admin", "secret");
4964        assert!(result.is_ok());
4965        let br = result.unwrap();
4966        assert_eq!(br.user.username, "admin");
4967        assert_eq!(br.user.role, Role::Admin);
4968        assert!(br.api_key.key.starts_with("rk_"));
4969        // No vault configured, so no certificate.
4970        assert!(br.certificate.is_none());
4971
4972        // Sealed now
4973        assert!(!store.needs_bootstrap());
4974        assert!(store.is_bootstrapped());
4975
4976        // Second bootstrap fails -- sealed permanently
4977        let result = store.bootstrap("admin2", "secret2");
4978        assert!(result.is_err());
4979        let err = result.unwrap_err();
4980        assert!(err.to_string().contains("sealed permanently"));
4981
4982        // Only 1 user exists (the first one)
4983        assert_eq!(store.list_users().len(), 1);
4984        assert_eq!(store.list_users()[0].username, "admin");
4985    }
4986
4987    #[test]
4988    fn test_bootstrap_after_manual_user_creation() {
4989        let store = AuthStore::new(test_config());
4990
4991        // Create a user manually first
4992        store.create_user("existing", "pass", Role::Read).unwrap();
4993
4994        // Bootstrap sees the seal hasn't been set but users exist
4995        // The atomic seal fires first, then the users check catches it
4996        assert!(!store.needs_bootstrap()); // users exist → false
4997    }
4998
4999    // ---------------------------------------------------------------
5000    // Tenant scoping
5001    // ---------------------------------------------------------------
5002
5003    #[test]
5004    fn test_same_username_two_tenants_distinct() {
5005        let store = AuthStore::new(test_config());
5006        store
5007            .create_user_in_tenant(Some("acme"), "alice", "pw-acme", Role::Write)
5008            .unwrap();
5009        store
5010            .create_user_in_tenant(Some("globex"), "alice", "pw-globex", Role::Read)
5011            .unwrap();
5012
5013        // Two distinct users.
5014        let users = store.list_users();
5015        assert_eq!(users.len(), 2);
5016
5017        // Each verifies its own password under its own tenant.
5018        assert!(store
5019            .authenticate_in_tenant(Some("acme"), "alice", "pw-acme")
5020            .is_ok());
5021        assert!(store
5022            .authenticate_in_tenant(Some("globex"), "alice", "pw-globex")
5023            .is_ok());
5024
5025        // Cross-tenant credentials are rejected.
5026        assert!(store
5027            .authenticate_in_tenant(Some("acme"), "alice", "pw-globex")
5028            .is_err());
5029        assert!(store
5030            .authenticate_in_tenant(Some("globex"), "alice", "pw-acme")
5031            .is_err());
5032    }
5033
5034    #[test]
5035    fn test_session_carries_tenant() {
5036        let store = AuthStore::new(test_config());
5037        store
5038            .create_user_in_tenant(Some("acme"), "alice", "pw", Role::Admin)
5039            .unwrap();
5040        let session = store
5041            .authenticate_in_tenant(Some("acme"), "alice", "pw")
5042            .unwrap();
5043        assert_eq!(session.tenant_id.as_deref(), Some("acme"));
5044
5045        let (id, role) = store.validate_token_full(&session.token).unwrap();
5046        assert_eq!(id.tenant.as_deref(), Some("acme"));
5047        assert_eq!(id.username, "alice");
5048        assert_eq!(role, Role::Admin);
5049    }
5050
5051    #[test]
5052    fn test_platform_user_has_no_tenant() {
5053        let store = AuthStore::new(test_config());
5054        store.create_user("admin", "pw", Role::Admin).unwrap();
5055        let session = store.authenticate("admin", "pw").unwrap();
5056        assert!(session.tenant_id.is_none());
5057
5058        let (id, _) = store.validate_token_full(&session.token).unwrap();
5059        assert!(id.tenant.is_none());
5060    }
5061
5062    #[test]
5063    fn test_lookup_scram_verifier_global_resolves_platform() {
5064        let store = AuthStore::new(test_config());
5065        store.create_user("admin", "pw", Role::Admin).unwrap();
5066        store
5067            .create_user_in_tenant(Some("acme"), "admin", "pw", Role::Admin)
5068            .unwrap();
5069
5070        // The global helper picks the platform-tenant user only.
5071        let v = store.lookup_scram_verifier_global("admin");
5072        assert!(v.is_some());
5073
5074        // The tenant-scoped user has its own verifier.
5075        let v_acme = store.lookup_scram_verifier(&UserId::scoped("acme", "admin"));
5076        assert!(v_acme.is_some());
5077
5078        // The two verifiers carry independent salts.
5079        assert_ne!(v.unwrap().salt, v_acme.unwrap().salt);
5080    }
5081
5082    #[test]
5083    fn test_delete_in_tenant_does_not_touch_other_tenant() {
5084        let store = AuthStore::new(test_config());
5085        store
5086            .create_user_in_tenant(Some("acme"), "alice", "pw", Role::Admin)
5087            .unwrap();
5088        store
5089            .create_user_in_tenant(Some("globex"), "alice", "pw", Role::Admin)
5090            .unwrap();
5091
5092        store.delete_user_in_tenant(Some("acme"), "alice").unwrap();
5093
5094        // Globex still alive.
5095        assert!(store
5096            .authenticate_in_tenant(Some("globex"), "alice", "pw")
5097            .is_ok());
5098        // Acme gone.
5099        assert!(store
5100            .authenticate_in_tenant(Some("acme"), "alice", "pw")
5101            .is_err());
5102    }
5103
5104    #[test]
5105    fn test_user_id_display() {
5106        assert_eq!(UserId::platform("admin").to_string(), "admin");
5107        assert_eq!(UserId::scoped("acme", "alice").to_string(), "acme/alice");
5108    }
5109
5110    // -----------------------------------------------------------------
5111    // PolicyEnforcementMode wiring (#712 / S5A)
5112    // -----------------------------------------------------------------
5113
5114    fn enforcement_eval_ctx(role: Role) -> EvalContext {
5115        EvalContext {
5116            principal_tenant: None,
5117            current_tenant: None,
5118            peer_ip: None,
5119            mfa_present: false,
5120            now_ms: 1_700_000_000_000,
5121            principal_is_admin_role: role == Role::Admin,
5122            principal_is_platform_scoped: true,
5123        }
5124    }
5125
5126    /// Helper: install a single unrelated allow policy so the IAM
5127    /// path is the authoritative one — without any installed policy
5128    /// the auth flow short-circuits before we ever consult the mode.
5129    fn install_unrelated_policy(store: &AuthStore) {
5130        store
5131            .put_policy(
5132                Policy::from_json_str(
5133                    r#"{"id":"p-unrelated","version":1,"statements":[{"effect":"allow","actions":["select"],"resources":["table:public.other"]}]}"#,
5134                )
5135                .unwrap(),
5136            )
5137            .unwrap();
5138    }
5139
5140    #[test]
5141    fn enforcement_mode_default_for_new_store_is_legacy_rbac() {
5142        // Pre-bootstrap construction = "existing install" path.
5143        // Defaulting to LegacyRbac preserves pre-#712 behaviour on
5144        // upgrade so an operator that has not yet attached IAM
5145        // policies does not lose access after the upgrade.
5146        let store = AuthStore::new(test_config());
5147        assert_eq!(store.enforcement_mode(), PolicyEnforcementMode::LegacyRbac);
5148    }
5149
5150    #[test]
5151    fn enforcement_mode_set_round_trips_and_returns_previous() {
5152        let store = AuthStore::new(test_config());
5153        let prev = store.set_enforcement_mode(PolicyEnforcementMode::PolicyOnly);
5154        assert_eq!(prev, PolicyEnforcementMode::LegacyRbac);
5155        assert_eq!(store.enforcement_mode(), PolicyEnforcementMode::PolicyOnly);
5156        let prev = store.set_enforcement_mode(PolicyEnforcementMode::LegacyRbac);
5157        assert_eq!(prev, PolicyEnforcementMode::PolicyOnly);
5158    }
5159
5160    #[test]
5161    fn policy_only_mode_treats_no_matching_policy_as_deny() {
5162        // Acceptance #2: in `policy_only` mode a principal with no
5163        // matching policy is denied even when the action's role
5164        // floor would otherwise have allowed it.
5165        let store = AuthStore::new(test_config());
5166        store.create_user("alice", "p", Role::Admin).unwrap();
5167        let uid = UserId::platform("alice");
5168        install_unrelated_policy(&store);
5169        store.set_enforcement_mode(PolicyEnforcementMode::PolicyOnly);
5170
5171        let resource = ResourceRef::new("table", "public.t");
5172        // Even Role::Admin is denied — DefaultDeny is the deciding
5173        // signal in `policy_only`, regardless of role.
5174        assert!(!store.check_policy_authz_with_role(
5175            &uid,
5176            "select",
5177            &resource,
5178            &enforcement_eval_ctx(Role::Admin),
5179            Role::Admin,
5180        ));
5181    }
5182
5183    #[test]
5184    fn legacy_rbac_mode_falls_back_to_role_decision() {
5185        // Acceptance #3: in `legacy_rbac` mode the same principal
5186        // falls through to the role-based decision. `select` requires
5187        // only `Read`, so all three roles satisfy it; a write verb
5188        // requires `Write`, so `Read` does not.
5189        let store = AuthStore::new(test_config());
5190        store.create_user("alice", "p", Role::Read).unwrap();
5191        let uid = UserId::platform("alice");
5192        install_unrelated_policy(&store);
5193        store.set_enforcement_mode(PolicyEnforcementMode::LegacyRbac);
5194
5195        let table = ResourceRef::new("table", "public.t");
5196        // Read role + select action: allowed via the legacy fallback.
5197        assert!(store.check_policy_authz_with_role(
5198            &uid,
5199            "select",
5200            &table,
5201            &enforcement_eval_ctx(Role::Read),
5202            Role::Read,
5203        ));
5204        // Read role + insert action: legacy decision says no.
5205        assert!(!store.check_policy_authz_with_role(
5206            &uid,
5207            "insert",
5208            &table,
5209            &enforcement_eval_ctx(Role::Read),
5210            Role::Read,
5211        ));
5212        // Admin role + insert action: legacy decision says yes.
5213        assert!(store.check_policy_authz_with_role(
5214            &uid,
5215            "insert",
5216            &table,
5217            &enforcement_eval_ctx(Role::Admin),
5218            Role::Admin,
5219        ));
5220    }
5221
5222    #[test]
5223    fn explicit_deny_wins_irrespective_of_mode_or_role() {
5224        // Even in `legacy_rbac` with an Admin role, an explicit
5225        // matching Deny statement always denies. The mode only
5226        // influences the `DefaultDeny` arm, never an explicit one.
5227        let store = AuthStore::new(test_config());
5228        store.create_user("alice", "p", Role::Admin).unwrap();
5229        let uid = UserId::platform("alice");
5230        store
5231            .put_policy(
5232                Policy::from_json_str(
5233                    r#"{"id":"p-deny-select","version":1,"statements":[{"effect":"deny","actions":["select"],"resources":["table:public.t"]}]}"#,
5234                )
5235                .unwrap(),
5236            )
5237            .unwrap();
5238        store
5239            .attach_policy(PrincipalRef::User(uid.clone()), "p-deny-select")
5240            .unwrap();
5241
5242        let resource = ResourceRef::new("table", "public.t");
5243        for mode in [
5244            PolicyEnforcementMode::LegacyRbac,
5245            PolicyEnforcementMode::PolicyOnly,
5246        ] {
5247            store.set_enforcement_mode(mode);
5248            assert!(
5249                !store.check_policy_authz_with_role(
5250                    &uid,
5251                    "select",
5252                    &resource,
5253                    &enforcement_eval_ctx(Role::Admin),
5254                    Role::Admin,
5255                ),
5256                "explicit deny must win under mode {mode:?}"
5257            );
5258        }
5259    }
5260
5261    #[test]
5262    fn explicit_allow_wins_irrespective_of_mode_or_role() {
5263        // Symmetric guarantee: an explicit Allow is honoured even in
5264        // `policy_only` mode for a role that the legacy fallback
5265        // would have rejected (Read role + admin-tier action).
5266        let store = AuthStore::new(test_config());
5267        store.create_user("alice", "p", Role::Read).unwrap();
5268        let uid = UserId::platform("alice");
5269        store
5270            .put_policy(
5271                Policy::from_json_str(
5272                    r#"{"id":"p-allow-create","version":1,"statements":[{"effect":"allow","actions":["create"],"resources":["table:public.t"]}]}"#,
5273                )
5274                .unwrap(),
5275            )
5276            .unwrap();
5277        store
5278            .attach_policy(PrincipalRef::User(uid.clone()), "p-allow-create")
5279            .unwrap();
5280        store.set_enforcement_mode(PolicyEnforcementMode::PolicyOnly);
5281
5282        let resource = ResourceRef::new("table", "public.t");
5283        assert!(store.check_policy_authz_with_role(
5284            &uid,
5285            "create",
5286            &resource,
5287            &enforcement_eval_ctx(Role::Read),
5288            Role::Read,
5289        ));
5290    }
5291
5292    #[test]
5293    fn take_legacy_rbac_warn_once_is_at_most_once_per_boot() {
5294        let store = AuthStore::new(test_config());
5295        // Default mode = LegacyRbac, so the first call wins the token.
5296        assert!(store.take_legacy_rbac_warn_once());
5297        // Every subsequent call is a no-op for the lifetime of this
5298        // process / AuthStore — that is the "once per boot" guarantee.
5299        for _ in 0..3 {
5300            assert!(!store.take_legacy_rbac_warn_once());
5301        }
5302    }
5303
5304    #[test]
5305    fn take_legacy_rbac_warn_once_is_silent_under_policy_only() {
5306        // Acceptance #5 phrases the warning as "Boot in legacy_rbac
5307        // mode emits exactly one warn entry per boot" — implicitly,
5308        // `policy_only` boots emit zero. The token must not be
5309        // claimable when the mode is strict so the boot path stays
5310        // quiet.
5311        let store = AuthStore::new(test_config());
5312        store.set_enforcement_mode(PolicyEnforcementMode::PolicyOnly);
5313        assert!(!store.take_legacy_rbac_warn_once());
5314        // And, importantly, switching back to LegacyRbac later does
5315        // not "owe" us a warning — the token was never claimed under
5316        // policy_only, so it is still available the first time the
5317        // mode is legacy_rbac.
5318        store.set_enforcement_mode(PolicyEnforcementMode::LegacyRbac);
5319        assert!(store.take_legacy_rbac_warn_once());
5320        assert!(!store.take_legacy_rbac_warn_once());
5321    }
5322
5323    #[test]
5324    fn strict_check_policy_authz_ignores_enforcement_mode() {
5325        // Governance APIs (managed_config, registry, managed_policy)
5326        // call the strict `check_policy_authz` — it must return
5327        // `false` on DefaultDeny regardless of mode so the lenient
5328        // posture cannot accidentally elevate an admin-tier mutation.
5329        let store = AuthStore::new(test_config());
5330        store.create_user("alice", "p", Role::Admin).unwrap();
5331        let uid = UserId::platform("alice");
5332        install_unrelated_policy(&store);
5333        store.set_enforcement_mode(PolicyEnforcementMode::LegacyRbac);
5334
5335        let resource = ResourceRef::new("table", "public.t");
5336        assert!(!store.check_policy_authz(
5337            &uid,
5338            "select",
5339            &resource,
5340            &enforcement_eval_ctx(Role::Admin),
5341        ));
5342    }
5343}