Skip to main content

reddb_server/auth/
store.rs

1//! AuthStore -- manages users, sessions, and API keys in memory.
2//!
3//! Password hashing delegates to the existing Argon2id implementation in
4//! `crate::storage::encryption::argon2id`.  Token generation uses the
5//! OS CSPRNG (`crate::crypto::os_random`) plus SHA-256.
6#![deny(clippy::disallowed_methods)]
7
8use std::borrow::Cow;
9use std::collections::HashMap;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::{Arc, RwLock};
12
13use crate::crypto::os_random;
14use crate::crypto::sha256::sha256;
15use crate::storage::encryption::argon2id::{derive_key, Argon2Params};
16use crate::storage::engine::pager::Pager;
17
18use super::column_policy_gate::{ColumnAccessRequest, ColumnPolicyGate, ColumnPolicyOutcome};
19use super::enforcement_mode::{legacy_rbac_decision, PolicyEnforcementMode};
20use super::managed_policy::{ManagedPolicyDecision, ManagedPolicyGate, PolicyOp};
21use super::policies::{self as iam_policies, EvalContext, Policy, ResourceRef, SimulationOutcome};
22use super::privileges::{
23    check_grant, Action, AuthzContext, AuthzError, Grant, GrantPrincipal, GrantsView,
24    PermissionCache, Resource, UserAttributes,
25};
26use super::registry::ConfigRegistry;
27use super::vault::{KeyPair, Vault, VaultState};
28use super::{now_ms, ApiKey, AuthConfig, AuthError, Role, Session, User, UserId};
29use crate::runtime::control_events::{
30    ControlEvent, ControlEventConfig, ControlEventCtx, ControlEventLedger, EventKind, Outcome,
31    Sensitivity,
32};
33
34// ---------------------------------------------------------------------------
35// PrincipalRef + SimCtx — IAM policy attachments
36// ---------------------------------------------------------------------------
37
/// Principal targeted by `attach_policy` / `detach_policy`.
///
/// Derives `Eq` and `Hash`, so values can be used as map or set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PrincipalRef {
    /// A single user, identified by its `UserId`.
    User(UserId),
    /// A group, identified by name.
    Group(String),
}
44
/// Name of the reserved IAM group that every principal belongs to. Used by
/// the GRANT-to-PUBLIC compatibility layer.
pub const PUBLIC_IAM_GROUP: &str = "__public__";
48
/// Optional context overrides for `simulate` — anything not set falls back
/// to a default value when the kernel evaluates the request.
///
/// `Default` yields no overrides: every `Option` is `None` and
/// `mfa_present` is `false`.
#[derive(Debug, Clone, Default)]
pub struct SimCtx {
    /// Tenant to evaluate the request under, when overridden.
    pub current_tenant: Option<String>,
    /// Peer IP address to evaluate the request with, when overridden.
    pub peer_ip: Option<std::net::IpAddr>,
    /// Whether the simulated request counts as MFA-authenticated.
    pub mfa_present: bool,
    /// Clock override. Presumably epoch milliseconds, matching the
    /// `now_ms` helper imported by this module — TODO confirm.
    pub now_ms: Option<u128>,
}
58
/// Control-event dependencies for audited IAM policy mutations.
///
/// The existing `put_policy` / `delete_policy` / attach / detach
/// methods remain unaudited for bootstrap and synthetic GRANT internals.
/// Public mutation surfaces that need evidence pass this context to the
/// `*_with_control_events` siblings.
///
/// Every field is borrowed for `'a`; the struct owns nothing except the
/// `ControlEventConfig` value.
pub struct PolicyMutationControl<'a> {
    /// Event context (actor, scope, request id, trace id).
    pub ctx: &'a ControlEventCtx<'a>,
    /// Ledger handed to the audited siblings.
    pub ledger: &'a dyn ControlEventLedger,
    /// Control-event configuration for this mutation.
    pub config: ControlEventConfig,
    /// Optional config registry; how it is consulted is not visible in
    /// this part of the file.
    pub registry: Option<&'a ConfigRegistry>,
    /// Presumably the user performing the mutation — TODO confirm against
    /// the `*_with_control_events` implementations.
    pub actor: &'a UserId,
    /// IAM evaluation context supplied by the caller.
    pub eval_ctx: &'a EvalContext,
}
73
/// Control-event dependencies for audited user lifecycle mutations.
///
/// Built by `bootstrap` / `bootstrap_with_control_events` and passed by
/// reference to the `emit_user_lifecycle_*` helpers.
pub struct UserLifecycleControl<'a> {
    /// Event context (actor, scope, request id, trace id).
    pub ctx: &'a ControlEventCtx<'a>,
    /// Ledger handed to the lifecycle emit helpers.
    pub ledger: &'a dyn ControlEventLedger,
    /// Control-event configuration for this mutation.
    pub config: ControlEventConfig,
}
80
/// Owned ledger + config pair installed through
/// `AuthStore::configure_control_events` and cloned out by
/// `AuthStore::configured_control_events`.
#[derive(Clone)]
struct AuthStoreControlEvents {
    /// Shared handle to the ledger; cloning only bumps the `Arc` count.
    ledger: Arc<dyn ControlEventLedger>,
    /// Configuration supplied together with the ledger.
    config: ControlEventConfig,
}
86
87// ---------------------------------------------------------------------------
88// BootstrapResult
89// ---------------------------------------------------------------------------
90
/// Result of a successful bootstrap operation.
///
/// The `certificate` is the hex-encoded string the admin must save --
/// it is the ONLY way to unseal the vault after a restart.
#[derive(Debug)]
pub struct BootstrapResult {
    /// The first user, created with `Role::Admin` and no tenant.
    pub user: User,
    /// API key named `"bootstrap"`, created for that user with `Role::Admin`.
    pub api_key: ApiKey,
    /// Certificate hex string.  `None` when vault is not configured.
    pub certificate: Option<String>,
}
102
103// ---------------------------------------------------------------------------
104// AuthStore
105// ---------------------------------------------------------------------------
106
/// Central in-process authority for auth state.
///
/// All mutations are guarded by `RwLock`s so the store is `Send + Sync`.
pub struct AuthStore {
    /// `(tenant_id, username) -> User`. Tenant scoping is built into the
    /// key so `alice@acme` and `alice@globex` are distinct identities.
    users: RwLock<HashMap<UserId, User>>,
    /// Sessions keyed by a `String` — presumably the session token;
    /// TODO confirm against the session-creation code.
    sessions: RwLock<HashMap<String, Session>>,
    /// key-string -> (owner UserId, role)
    api_key_index: RwLock<HashMap<String, (UserId, Role)>>,
    /// Once true, bootstrap() is permanently sealed.
    bootstrapped: AtomicBool,
    /// Configuration supplied at construction; exposed read-only through
    /// `config()`.
    config: AuthConfig,
    /// Optional encrypted vault for persisting auth state to pager pages.
    vault: RwLock<Option<Vault>>,
    /// Reference to the pager for vault page I/O.
    pager: Option<Arc<Pager>>,
    /// Certificate-based keypair for token signing and vault seal.
    /// Populated after bootstrap or after restoring from a sealed vault.
    keypair: RwLock<Option<KeyPair>>,
    /// Encrypted key-value store for arbitrary secrets.
    /// Persisted to vault alongside users/api_keys.
    vault_kv: RwLock<HashMap<String, String>>,
    /// Per-user GRANT rows. Persisted via `vault_kv` under the
    /// `red.acl.grants.<tenant>/<user>` key prefix so existing snapshot
    /// logic keeps working without modification. See `privileges` module
    /// for the resolution algorithm.
    grants: RwLock<HashMap<UserId, Vec<Grant>>>,
    /// PUBLIC grants — apply to every authenticated principal.
    public_grants: RwLock<Vec<Grant>>,
    /// PG-style account attributes (`VALID UNTIL`, `CONNECTION LIMIT`,
    /// `search_path`). Keyed by `UserId`. Persisted under
    /// `red.acl.attrs.<tenant>/<user>`.
    user_attributes: RwLock<HashMap<UserId, UserAttributes>>,
    /// Live session count per user — used by `CONNECTION LIMIT`
    /// enforcement on login. Bumped at authenticate, decremented at
    /// session revoke / expiry.
    session_count_by_user: RwLock<HashMap<UserId, u32>>,
    /// Pre-resolved (resource, action) cache built per-user so the
    /// hot path skips a linear scan of the user's grants on every
    /// statement. Invalidated on GRANT / REVOKE / ALTER USER.
    permission_cache: RwLock<HashMap<UserId, PermissionCache>>,
    /// IAM-style policies, keyed by id. Persisted under
    /// `red.iam.policies`. The kernel in `super::policies` owns the
    /// Policy type — this map just deduplicates and shares.
    policies: RwLock<HashMap<String, Arc<Policy>>>,
    /// Per-user policy attachments — ordered list of policy ids.
    /// Persisted under `red.iam.attachments.users`.
    user_attachments: RwLock<HashMap<UserId, Vec<String>>>,
    /// Per-group policy attachments. Users join groups through
    /// `UserAttributes::groups`; effective policies resolve group
    /// attachments before user-direct attachments.
    group_attachments: RwLock<HashMap<String, Vec<String>>>,
    /// Cached effective `Vec<Arc<Policy>>` per user. Invalidated on
    /// any policy mutation that affects the user's attachments.
    iam_effective_cache: RwLock<HashMap<UserId, Vec<Arc<Policy>>>>,
    /// Once any IAM policy is installed, authorization switches to the
    /// IAM evaluator and stays deny-by-default even if policies are
    /// later deleted. Persisted under `red.iam.enabled`.
    iam_authorization_enabled: AtomicBool,
    /// Selects the fallback behaviour when the IAM evaluator returns
    /// `DefaultDeny`. See [`PolicyEnforcementMode`] and #712 / S5A.
    /// Default is [`PolicyEnforcementMode::default_existing_install`]
    /// (`LegacyRbac`) so an existing install upgrading past #712
    /// keeps its current authorization posture until the operator
    /// flips it explicitly. The bootstrap path sets this to
    /// `PolicyOnly` for fresh installs; the boot-time config loader
    /// supersedes both when a `red.config.policy.enforcement_mode`
    /// value is present in the configured value store.
    enforcement_mode: RwLock<PolicyEnforcementMode>,
    /// Once-per-boot flag used by [`AuthStore::take_legacy_rbac_warn_once`]
    /// to deliver exactly one `warn`-level log line when the store is
    /// running under [`PolicyEnforcementMode::LegacyRbac`]. The boot
    /// path checks the flag and, if it can claim it, emits the line.
    legacy_rbac_boot_warn_emitted: AtomicBool,
    /// `(tenant, role) → HashSet<CollectionId>` cache used by the AI
    /// pipeline (issue #119). Distinct from `permission_cache`, which
    /// is keyed by `UserId` and answers `(resource, action)` lookups —
    /// this cache answers the inverse "what collections is this scope
    /// allowed to read?" query that `AuthorizedSearch` uses to
    /// pre-filter SEARCH SIMILAR / SEARCH CONTEXT candidates before any
    /// similarity score is computed. Entries TTL out at 60s and are
    /// invalidated explicitly on GRANT/REVOKE/CREATE POLICY/DROP
    /// POLICY/DROP COLLECTION.
    visible_collections_cache: super::scope_cache::AuthCache,
    /// Ledger + config installed by `configure_control_events`. `None`
    /// until then, in which case `bootstrap` takes the unaudited path.
    control_events: RwLock<Option<AuthStoreControlEvents>>,
}
194
195// Use fast-but-safe Argon2id params for auth hashing (smaller than the
196// default 64 MB so that user-management RPCs respond quickly).
197fn auth_argon2_params() -> Argon2Params {
198    Argon2Params {
199        m_cost: 4 * 1024, // 4 MB
200        t_cost: 3,
201        p: 1,
202        tag_len: 32,
203    }
204}
205
206fn policy_control_fields(
207    policy_id: &str,
208    policy: Option<&Policy>,
209    principal: Option<&PrincipalRef>,
210) -> HashMap<String, Sensitivity> {
211    let mut fields = HashMap::new();
212    fields.insert("policy_id".to_string(), Sensitivity::raw(policy_id));
213    if let Some(policy) = policy {
214        let effects = policy
215            .statements
216            .iter()
217            .map(|s| match s.effect {
218                iam_policies::Effect::Allow => "allow",
219                iam_policies::Effect::Deny => "deny",
220            })
221            .collect::<Vec<_>>()
222            .join(",");
223        let actions = policy
224            .statements
225            .iter()
226            .flat_map(|s| s.actions.iter())
227            .map(policy_action_pattern)
228            .collect::<Vec<_>>()
229            .join(",");
230        let resources = policy
231            .statements
232            .iter()
233            .flat_map(|s| s.resources.iter())
234            .map(policy_resource_pattern)
235            .collect::<Vec<_>>()
236            .join(",");
237        fields.insert("effect".to_string(), Sensitivity::raw(effects));
238        fields.insert("action".to_string(), Sensitivity::raw(actions));
239        fields.insert("resource".to_string(), Sensitivity::raw(resources));
240        if let Some(attrs) = policy_principal_attrs(policy) {
241            fields.insert("principal_attrs".to_string(), Sensitivity::raw(attrs));
242        }
243    }
244    if let Some(principal) = principal {
245        fields.insert(
246            "principal".to_string(),
247            Sensitivity::raw(policy_principal_label(principal)),
248        );
249    }
250    fields
251}
252
253fn policy_action_pattern(pattern: &iam_policies::ActionPattern) -> String {
254    match pattern {
255        iam_policies::ActionPattern::Exact(s) => s.clone(),
256        iam_policies::ActionPattern::Wildcard => "*".to_string(),
257        iam_policies::ActionPattern::Prefix(s) => format!("{s}:*"),
258    }
259}
260
261fn policy_resource_pattern(pattern: &iam_policies::ResourcePattern) -> String {
262    match pattern {
263        iam_policies::ResourcePattern::Exact { kind, name } => format!("{kind}:{name}"),
264        iam_policies::ResourcePattern::Glob(s) => s.clone(),
265        iam_policies::ResourcePattern::Wildcard => "*".to_string(),
266    }
267}
268
269fn policy_principal_attrs(policy: &Policy) -> Option<String> {
270    let mut attrs = Vec::new();
271    for statement in &policy.statements {
272        let Some(condition) = &statement.condition else {
273            continue;
274        };
275        if let Some(value) = condition.platform_scoped {
276            attrs.push(format!("platform_scoped={value}"));
277        }
278        if let Some(value) = condition.mfa {
279            attrs.push(format!("mfa={value}"));
280        }
281        if let Some(value) = condition.tenant_match {
282            attrs.push(format!("tenant_match={value}"));
283        }
284    }
285    if attrs.is_empty() {
286        None
287    } else {
288        Some(attrs.join(","))
289    }
290}
291
292fn policy_principal_label(principal: &PrincipalRef) -> String {
293    match principal {
294        PrincipalRef::User(uid) => format!("user:{uid}"),
295        PrincipalRef::Group(group) => format!("group:{group}"),
296    }
297}
298
299fn default_user_lifecycle_ctx<'a>() -> ControlEventCtx<'a> {
300    ControlEventCtx {
301        actor: crate::runtime::control_events::ActorRef::Anonymous,
302        scope: None,
303        request_id: None,
304        trace_id: None,
305    }
306}
307
308fn bootstrap_user_lifecycle_ctx<'a>() -> ControlEventCtx<'a> {
309    ControlEventCtx {
310        actor: crate::runtime::control_events::ActorRef::System("bootstrap"),
311        scope: None,
312        request_id: None,
313        trace_id: None,
314    }
315}
316
317fn user_resource(id: &UserId) -> String {
318    format!("user:{id}")
319}
320
/// Resource label for an API key: `apikey:` followed by the key id.
fn api_key_resource(api_key_id: &str) -> String {
    ["apikey:", api_key_id].concat()
}
324
325fn api_key_id(key: &str) -> String {
326    hex::encode(sha256(key.as_bytes()))
327}
328
329fn password_evidence() -> Sensitivity {
330    Sensitivity::redacted()
331}
332
333fn user_control_fields(
334    id: &UserId,
335    role: Option<Role>,
336    enabled: Option<bool>,
337    include_password: bool,
338) -> HashMap<String, Sensitivity> {
339    let mut fields = HashMap::new();
340    fields.insert(
341        "username".to_string(),
342        Sensitivity::raw(id.username.clone()),
343    );
344    fields.insert(
345        "tenant_id".to_string(),
346        Sensitivity::raw(id.tenant.clone().unwrap_or_default()),
347    );
348    if let Some(role) = role {
349        fields.insert("role".to_string(), Sensitivity::raw(role.as_str()));
350    }
351    if let Some(enabled) = enabled {
352        fields.insert("enabled".to_string(), Sensitivity::raw(enabled.to_string()));
353    }
354    if include_password {
355        fields.insert("password".to_string(), password_evidence());
356    }
357    fields
358}
359
360fn api_key_control_fields(
361    id: &UserId,
362    role: Role,
363    api_key_id: &str,
364) -> HashMap<String, Sensitivity> {
365    let mut fields = user_control_fields(id, Some(role), None, false);
366    fields.insert("api_key_id".to_string(), Sensitivity::raw(api_key_id));
367    fields.insert("api_key".to_string(), Sensitivity::redacted());
368    fields
369}
370
371fn user_error_is_denied(err: &AuthError) -> bool {
372    !matches!(err, AuthError::Internal(_))
373}
374
375impl AuthStore {
376    // -----------------------------------------------------------------
377    // Construction
378    // -----------------------------------------------------------------
379
380    pub fn new(config: AuthConfig) -> Self {
381        Self {
382            users: RwLock::new(HashMap::new()),
383            sessions: RwLock::new(HashMap::new()),
384            api_key_index: RwLock::new(HashMap::new()),
385            bootstrapped: AtomicBool::new(false),
386            config,
387            vault: RwLock::new(None),
388            pager: None,
389            keypair: RwLock::new(None),
390            vault_kv: RwLock::new(HashMap::new()),
391            grants: RwLock::new(HashMap::new()),
392            public_grants: RwLock::new(Vec::new()),
393            user_attributes: RwLock::new(HashMap::new()),
394            session_count_by_user: RwLock::new(HashMap::new()),
395            permission_cache: RwLock::new(HashMap::new()),
396            policies: RwLock::new(HashMap::new()),
397            user_attachments: RwLock::new(HashMap::new()),
398            group_attachments: RwLock::new(HashMap::new()),
399            iam_effective_cache: RwLock::new(HashMap::new()),
400            iam_authorization_enabled: AtomicBool::new(false),
401            enforcement_mode: RwLock::new(PolicyEnforcementMode::default_existing_install()),
402            legacy_rbac_boot_warn_emitted: AtomicBool::new(false),
403            visible_collections_cache: super::scope_cache::AuthCache::new(
404                super::scope_cache::DEFAULT_TTL,
405            ),
406            control_events: RwLock::new(None),
407        }
408    }
409
410    pub fn configure_control_events(
411        &self,
412        ledger: Arc<dyn ControlEventLedger>,
413        config: ControlEventConfig,
414    ) {
415        *self
416            .control_events
417            .write()
418            .unwrap_or_else(|e| e.into_inner()) = Some(AuthStoreControlEvents { ledger, config });
419    }
420
421    fn configured_control_events(&self) -> Option<AuthStoreControlEvents> {
422        self.control_events
423            .read()
424            .unwrap_or_else(|e| e.into_inner())
425            .clone()
426    }
427
428    /// Create an `AuthStore` backed by encrypted vault pages inside the
429    /// main `.rdb` database file.
430    ///
431    /// If vault pages already exist, their contents are loaded and
432    /// restored into the in-memory store.  All subsequent mutations are
433    /// automatically persisted to the vault pages via the pager.
434    pub fn with_vault(config: AuthConfig, pager: Arc<Pager>) -> Result<Self, AuthError> {
435        let certificate = crate::utils::env_with_file_fallback("REDDB_CERTIFICATE");
436        Self::with_vault_optional_certificate(config, pager, certificate.as_deref())
437    }
438
439    pub fn with_vault_certificate(
440        config: AuthConfig,
441        pager: Arc<Pager>,
442        certificate_hex: &str,
443    ) -> Result<Self, AuthError> {
444        Self::with_vault_optional_certificate(config, pager, Some(certificate_hex))
445    }
446
447    fn with_vault_optional_certificate(
448        config: AuthConfig,
449        pager: Arc<Pager>,
450        certificate_hex: Option<&str>,
451    ) -> Result<Self, AuthError> {
452        let mut store = Self::new(config);
453
454        // Restore persisted state if vault pages exist. A fresh database does
455        // not need a temporary seal: bootstrap generates the certificate and
456        // installs the first real vault atomically.
457        if Vault::has_saved_state(&pager) {
458            let vault = match certificate_hex {
459                Some(certificate) => Vault::with_certificate(&pager, certificate)?,
460                None => Vault::open(&pager)?,
461            };
462            if let Some(state) = vault.load(&pager)? {
463                store.restore_from_vault(state);
464            }
465            *store.vault.write().unwrap_or_else(|e| e.into_inner()) = Some(vault);
466        } else if let Some(certificate) = certificate_hex {
467            let vault = Vault::with_certificate(&pager, certificate)?;
468            *store.vault.write().unwrap_or_else(|e| e.into_inner()) = Some(vault);
469        }
470
471        store.pager = Some(pager);
472        Ok(store)
473    }
474
475    pub fn config(&self) -> &AuthConfig {
476        &self.config
477    }
478
479    pub fn is_enabled(&self) -> bool {
480        self.config.enabled
481    }
482
483    /// Returns true when no users exist yet and bootstrap hasn't been sealed.
484    pub fn needs_bootstrap(&self) -> bool {
485        !self.bootstrapped.load(Ordering::Acquire)
486            && self.users.read().map(|u| u.is_empty()).unwrap_or(true)
487    }
488
489    /// Internal: read-locked lookup by `UserId`.
490    fn get_user_cloned(&self, id: &UserId) -> Option<User> {
491        self.users.read().ok().and_then(|m| m.get(id).cloned())
492    }
493
494    /// Whether bootstrap has already been performed (sealed).
495    pub fn is_bootstrapped(&self) -> bool {
496        self.bootstrapped.load(Ordering::Acquire)
497    }
498
499    /// Bootstrap the first admin user. One-shot, irreversible.
500    ///
501    /// Uses an atomic compare-exchange to guarantee that even under
502    /// concurrent calls, only the first one succeeds. Once sealed,
503    /// all subsequent calls fail immediately -- there is no undo.
504    ///
505    /// When a vault/pager is configured, a certificate-based keypair is
506    /// generated and the vault is re-encrypted with the certificate-derived
507    /// key.  The certificate hex string is returned in `BootstrapResult`
508    /// so the admin can save it.
509    pub fn bootstrap(&self, username: &str, password: &str) -> Result<BootstrapResult, AuthError> {
510        if let Some(configured) = self.configured_control_events() {
511            let ctx = bootstrap_user_lifecycle_ctx();
512            let control = UserLifecycleControl {
513                ctx: &ctx,
514                ledger: configured.ledger.as_ref(),
515                config: configured.config,
516            };
517            self.bootstrap_with_control_events_inner(username, password, &control)
518        } else {
519            self.bootstrap_unaudited(username, password)
520        }
521    }
522
523    pub fn bootstrap_with_control_events(
524        &self,
525        username: &str,
526        password: &str,
527        ctx: &ControlEventCtx<'_>,
528        ledger: &dyn ControlEventLedger,
529        config: ControlEventConfig,
530    ) -> Result<BootstrapResult, AuthError> {
531        let system_ctx = ControlEventCtx {
532            actor: crate::runtime::control_events::ActorRef::System("bootstrap"),
533            scope: ctx.scope.clone(),
534            request_id: ctx.request_id.clone(),
535            trace_id: ctx.trace_id.clone(),
536        };
537        let control = UserLifecycleControl {
538            ctx: &system_ctx,
539            ledger,
540            config,
541        };
542        self.bootstrap_with_control_events_inner(username, password, &control)
543    }
544
545    /// Backwards-compatible bootstrap entry point for older callers
546    /// that used to ask for an operator-owned first admin. Authorization
547    /// is now policy-first, so this creates the same ordinary platform
548    /// admin as [`Self::bootstrap`].
549    pub fn bootstrap_system_admin(
550        &self,
551        username: &str,
552        password: &str,
553    ) -> Result<BootstrapResult, AuthError> {
554        self.bootstrap(username, password)
555    }
556
557    fn bootstrap_unaudited(
558        &self,
559        username: &str,
560        password: &str,
561    ) -> Result<BootstrapResult, AuthError> {
562        // Atomic seal: only the first caller wins.
563        if self
564            .bootstrapped
565            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
566            .is_err()
567        {
568            return Err(AuthError::Forbidden(
569                "bootstrap already completed — sealed permanently".to_string(),
570            ));
571        }
572
573        // Double-check users are actually empty (belt and suspenders).
574        {
575            let users = self.users.read().map_err(lock_err)?;
576            if !users.is_empty() {
577                return Err(AuthError::Forbidden(
578                    "bootstrap already completed — users exist".to_string(),
579                ));
580            }
581        }
582
583        let user = self.create_user_in_tenant_unaudited(None, username, password, Role::Admin)?;
584        let key =
585            self.create_api_key_in_tenant_unaudited(None, username, "bootstrap", Role::Admin)?;
586
587        // Generate a certificate-based keypair and re-seal the vault.
588        let certificate = if let Some(ref pager) = self.pager {
589            let kp = KeyPair::generate();
590            let cert_hex = kp.certificate_hex();
591
592            // Re-create the vault using the certificate-derived key.
593            let new_vault = Vault::with_certificate_bytes(pager, &kp.certificate)
594                .map_err(|e| AuthError::Internal(format!("vault re-seal failed: {e}")))?;
595
596            // Store the keypair so token signing works immediately.
597            if let Ok(mut kp_guard) = self.keypair.write() {
598                *kp_guard = Some(kp);
599            }
600
601            // Replace the vault and persist with the master secret included.
602            if let Ok(mut vault_guard) = self.vault.write() {
603                *vault_guard = Some(new_vault);
604            }
605            // Generate the AES-256 secret key for Value::Secret encryption.
606            self.ensure_vault_secret_key();
607            self.persist_to_vault();
608
609            Some(cert_hex)
610        } else {
611            None
612        };
613
614        // #712 / S5A: fresh bootstraps land in the strict posture.
615        // Persist explicitly to vault_kv so subsequent boots
616        // (rehydrate_iam) pick up `policy_only` instead of the
617        // existing-install default. Existing installs upgrading past
618        // this commit never traverse this branch — `bootstrap()` is
619        // sealed once and never re-runs.
620        self.set_enforcement_mode(PolicyEnforcementMode::default_fresh_bootstrap());
621
622        Ok(BootstrapResult {
623            user,
624            api_key: key,
625            certificate,
626        })
627    }
628
629    fn bootstrap_with_control_events_inner(
630        &self,
631        username: &str,
632        password: &str,
633        control: &UserLifecycleControl<'_>,
634    ) -> Result<BootstrapResult, AuthError> {
635        let id = UserId::from_parts(None, username);
636        match self.bootstrap_unaudited(username, password) {
637            Ok(result) => {
638                let event_result = self.emit_user_lifecycle_allowed(
639                    control,
640                    EventKind::UserCreate,
641                    "user.create",
642                    &id,
643                    user_control_fields(&id, Some(Role::Admin), Some(true), true),
644                );
645                if let Err(err) = event_result {
646                    self.rollback_bootstrap(&id);
647                    return Err(err);
648                }
649                Ok(result)
650            }
651            Err(err) => {
652                self.emit_user_lifecycle_outcome(
653                    control,
654                    if user_error_is_denied(&err) {
655                        Outcome::Denied
656                    } else {
657                        Outcome::Error
658                    },
659                    EventKind::UserCreate,
660                    "user.create",
661                    &id,
662                    Some(err.to_string()),
663                    user_control_fields(&id, Some(Role::Admin), Some(true), true),
664                );
665                Err(err)
666            }
667        }
668    }
669
670    /// Auto-bootstrap from environment variables if no users exist.
671    ///
672    /// Checks `REDDB_USERNAME` and `REDDB_PASSWORD`. If both are set and
673    /// the user store is empty, creates the first admin user automatically.
674    /// This mirrors the Docker pattern (`MYSQL_ROOT_PASSWORD`, etc.).
675    ///
676    /// Returns `Some(BootstrapResult)` if bootstrapped, `None` if skipped.
677    pub fn bootstrap_from_env(&self) -> Option<BootstrapResult> {
678        if !self.needs_bootstrap() {
679            return None;
680        }
681
682        let username = crate::utils::env_with_file_fallback("REDDB_USERNAME")?;
683        let password = crate::utils::env_with_file_fallback("REDDB_PASSWORD")?;
684
685        if username.is_empty() || password.is_empty() {
686            return None;
687        }
688
689        match self.bootstrap(&username, &password) {
690            Ok(result) => {
691                // F-04: `username` is REDDB_USERNAME — operator-supplied
692                // (env), but still routed through the LogField escaper
693                // because env strings cross trust boundaries in some
694                // deployment models (k8s downward API, Vault sidecar,
695                // external secret operator). See ADR 0010.
696                tracing::info!(
697                    username = %reddb_wire::audit_safe_log_field(&username),
698                    "bootstrapped admin user from REDDB_USERNAME/REDDB_PASSWORD"
699                );
700                if let Some(ref cert) = result.certificate {
701                    // Certificate must be readable by the operator — keep it
702                    // in the log stream but print raw to stderr too so it
703                    // survives even if the log file gets rotated.
704                    eprintln!("[reddb] CERTIFICATE: {}", cert);
705                    tracing::warn!(
706                        "vault certificate issued — save it: ONLY way to unseal after restart"
707                    );
708                }
709                Some(result)
710            }
711            Err(e) => {
712                tracing::error!(err = %e, "env bootstrap failed");
713                None
714            }
715        }
716    }
717
718    // -----------------------------------------------------------------
719    // Vault persistence
720    // -----------------------------------------------------------------
721
722    /// Persist the current auth state to the vault pages (if configured).
723    fn persist_to_vault_result(&self) -> Result<(), AuthError> {
724        let vault_guard = self.vault.read().unwrap_or_else(|e| e.into_inner());
725        if let (Some(ref vault), Some(ref pager)) = (&*vault_guard, &self.pager) {
726            let state = self.snapshot();
727            vault.save(pager, &state)?;
728        }
729        Ok(())
730    }
731
732    /// Persist the current auth state to the vault pages (if configured).
733    ///
734    /// Legacy auth mutations still treat in-memory state as authoritative.
735    /// New secret-management paths use the `try_` methods below so callers
736    /// get a hard error if the vault write fails.
737    fn persist_to_vault(&self) {
738        if let Err(e) = self.persist_to_vault_result() {
739            tracing::error!(err = %e, "vault persist failed");
740            // Issue #205 — vault persist is the secret-rotation
741            // serialization point: when this fails, freshly rotated
742            // credentials live only in memory and a process restart
743            // loses them. Operator-grade event so the operator can
744            // intervene before the next restart.
745            crate::telemetry::operator_event::OperatorEvent::SecretRotationFailed {
746                secret_ref: "auth_vault".to_string(),
747                error: e.to_string(),
748            }
749            .emit_global();
750        }
751    }
752
753    /// True when this store has an encrypted vault and pager wired in.
754    pub fn is_vault_backed(&self) -> bool {
755        self.pager.is_some()
756            && self
757                .vault
758                .read()
759                .map(|guard| guard.is_some())
760                .unwrap_or(false)
761    }
762
763    // -----------------------------------------------------------------
764    // Vault KV — encrypted key-value store for arbitrary secrets
765    // -----------------------------------------------------------------
766
767    /// Read a value from the vault KV store. Returns `None` if not set.
768    pub fn vault_kv_get(&self, key: &str) -> Option<String> {
769        self.vault_kv
770            .read()
771            .ok()
772            .and_then(|kv| kv.get(key).cloned())
773    }
774
775    /// Snapshot vault KV values for statement-local secret resolution.
776    pub fn vault_kv_snapshot(&self) -> HashMap<String, String> {
777        self.vault_kv
778            .read()
779            .map(|kv| kv.clone())
780            .unwrap_or_default()
781    }
782
783    /// Export vault KV as an encrypted logical blob for JSONL dump/restore.
784    /// Returns `None` when the vault has no KV entries.
785    pub fn vault_kv_export_encrypted(&self) -> Result<Option<String>, AuthError> {
786        if !self.is_vault_backed() {
787            return Err(AuthError::Forbidden(
788                "vault KV export requires an enabled, unsealed vault".to_string(),
789            ));
790        }
791        let kv = self.vault_kv_snapshot();
792        if kv.is_empty() {
793            return Ok(None);
794        }
795
796        let vault_guard = self.vault.read().map_err(lock_err)?;
797        let vault = vault_guard.as_ref().ok_or_else(|| {
798            AuthError::Forbidden("vault KV export requires an enabled, unsealed vault".to_string())
799        })?;
800        let state = VaultState {
801            users: Vec::new(),
802            api_keys: Vec::new(),
803            bootstrapped: false,
804            master_secret: None,
805            kv,
806        };
807        Ok(Some(vault.seal_logical_export(&state)?))
808    }
809
810    /// Merge imported vault KV entries and fail if the encrypted vault
811    /// write cannot be made durable.
812    pub fn vault_kv_try_import(
813        &self,
814        entries: HashMap<String, String>,
815    ) -> Result<usize, AuthError> {
816        if !self.is_vault_backed() {
817            return Err(AuthError::Forbidden(
818                "vault KV import requires an enabled, unsealed vault".to_string(),
819            ));
820        }
821        if entries.is_empty() {
822            return Ok(0);
823        }
824
825        let mut previous = HashMap::new();
826        {
827            let mut kv = self.vault_kv.write().map_err(lock_err)?;
828            for (key, value) in &entries {
829                previous.insert(key.clone(), kv.insert(key.clone(), value.clone()));
830            }
831        }
832
833        if let Err(err) = self.persist_to_vault_result() {
834            if let Ok(mut kv) = self.vault_kv.write() {
835                for (key, old) in previous {
836                    match old {
837                        Some(value) => {
838                            kv.insert(key, value);
839                        }
840                        None => {
841                            kv.remove(&key);
842                        }
843                    }
844                }
845            }
846            return Err(err);
847        }
848
849        Ok(entries.len())
850    }
851
852    /// Import false placeholders for secrets whose encrypted payload
853    /// could not be decrypted during logical restore.
854    pub fn vault_kv_try_import_placeholders(&self, keys: &[String]) -> Result<usize, AuthError> {
855        let entries = keys
856            .iter()
857            .map(|key| (key.clone(), "false".to_string()))
858            .collect();
859        self.vault_kv_try_import(entries)
860    }
861
862    /// Write a value to the vault KV store, persisting to disk.
863    pub fn vault_kv_set(&self, key: String, value: String) {
864        if let Ok(mut kv) = self.vault_kv.write() {
865            kv.insert(key, value);
866        }
867        self.persist_to_vault();
868    }
869
870    /// Write a value to the vault KV store and fail if the vault write
871    /// cannot be made durable.
872    pub fn vault_kv_try_set(&self, key: String, value: String) -> Result<(), AuthError> {
873        if !self.is_vault_backed() {
874            return Err(AuthError::Forbidden(
875                "SET SECRET requires an enabled, unsealed vault".to_string(),
876            ));
877        }
878
879        let previous = {
880            let mut kv = self.vault_kv.write().map_err(lock_err)?;
881            kv.insert(key.clone(), value)
882        };
883
884        if let Err(err) = self.persist_to_vault_result() {
885            if let Ok(mut kv) = self.vault_kv.write() {
886                match previous {
887                    Some(value) => {
888                        kv.insert(key, value);
889                    }
890                    None => {
891                        kv.remove(&key);
892                    }
893                }
894            }
895            return Err(err);
896        }
897
898        Ok(())
899    }
900
901    /// Delete a value from the vault KV store. Returns true if it existed.
902    pub fn vault_kv_delete(&self, key: &str) -> bool {
903        let existed = self
904            .vault_kv
905            .write()
906            .map(|mut kv| kv.remove(key).is_some())
907            .unwrap_or(false);
908        if existed {
909            self.persist_to_vault();
910        }
911        existed
912    }
913
914    /// Delete a value from the vault KV store and fail if the vault write
915    /// cannot be made durable.
916    pub fn vault_kv_try_delete(&self, key: &str) -> Result<bool, AuthError> {
917        if !self.is_vault_backed() {
918            return Err(AuthError::Forbidden(
919                "DELETE SECRET requires an enabled, unsealed vault".to_string(),
920            ));
921        }
922
923        let removed = {
924            let mut kv = self.vault_kv.write().map_err(lock_err)?;
925            kv.remove(key)
926        };
927
928        if removed.is_none() {
929            return Ok(false);
930        }
931
932        if let Err(err) = self.persist_to_vault_result() {
933            if let Ok(mut kv) = self.vault_kv.write() {
934                if let Some(value) = removed {
935                    kv.insert(key.to_string(), value);
936                }
937            }
938            return Err(err);
939        }
940
941        Ok(true)
942    }
943
944    /// List all keys in the vault KV store.
945    pub fn vault_kv_keys(&self) -> Vec<String> {
946        self.vault_kv
947            .read()
948            .map(|kv| kv.keys().cloned().collect())
949            .unwrap_or_default()
950    }
951
952    /// Convenience: get the 32-byte secret key for Value::Secret encryption.
953    /// Generated on first boot and stored at `red.secret.aes_key`.
954    pub fn vault_secret_key(&self) -> Option<[u8; 32]> {
955        let hex_str = self.vault_kv_get("red.secret.aes_key")?;
956        let bytes = hex::decode(hex_str).ok()?;
957        if bytes.len() == 32 {
958            let mut key = [0u8; 32];
959            key.copy_from_slice(&bytes);
960            Some(key)
961        } else {
962            None
963        }
964    }
965
966    /// Generate and store the AES-256 secret key on first boot if not present.
967    pub fn ensure_vault_secret_key(&self) {
968        if self.vault_kv_get("red.secret.aes_key").is_none() {
969            let key = random_bytes(32);
970            self.vault_kv_set("red.secret.aes_key".to_string(), hex::encode(key));
971        }
972    }
973
974    /// Take a snapshot of the current auth state for vault serialization.
975    fn snapshot(&self) -> VaultState {
976        let users_guard = self.users.read().unwrap_or_else(|e| e.into_inner());
977        let users: Vec<User> = users_guard.values().cloned().collect();
978
979        // Collect (owner UserId, api_key) pairs from all users so a
980        // tenant-scoped owner can be reattached on restore.
981        let mut api_keys = Vec::new();
982        for user in &users {
983            let owner = UserId::from_parts(user.tenant_id.as_deref(), &user.username);
984            for key in &user.api_keys {
985                api_keys.push((owner.clone(), key.clone()));
986            }
987        }
988
989        // Include the master secret if a keypair is loaded.
990        let master_secret = self
991            .keypair
992            .read()
993            .ok()
994            .and_then(|guard| guard.as_ref().map(|kp| kp.master_secret.clone()));
995
996        let kv = self.vault_kv.read().map(|m| m.clone()).unwrap_or_default();
997
998        VaultState {
999            users,
1000            api_keys,
1001            bootstrapped: self.bootstrapped.load(Ordering::Acquire),
1002            master_secret,
1003            kv,
1004        }
1005    }
1006
1007    /// Restore the in-memory auth state from a vault snapshot.
1008    fn restore_from_vault(&mut self, state: VaultState) {
1009        // Restore bootstrap seal.
1010        if state.bootstrapped {
1011            self.bootstrapped.store(true, Ordering::Release);
1012        }
1013
1014        // Restore keypair from master secret (if present).
1015        if let Some(secret) = state.master_secret {
1016            let kp = KeyPair::from_master_secret(secret);
1017            if let Ok(mut guard) = self.keypair.write() {
1018                *guard = Some(kp);
1019            }
1020        }
1021
1022        // Restore KV store.
1023        if let Ok(mut kv) = self.vault_kv.write() {
1024            *kv = state.kv;
1025        }
1026
1027        // Restore users.
1028        let mut users = self.users.write().unwrap_or_else(|e| e.into_inner());
1029        let mut idx = self
1030            .api_key_index
1031            .write()
1032            .unwrap_or_else(|e| e.into_inner());
1033
1034        for user in state.users {
1035            let id = UserId::from_parts(user.tenant_id.as_deref(), &user.username);
1036            // Register API keys in the index.
1037            for key in &user.api_keys {
1038                idx.insert(key.key.clone(), (id.clone(), key.role));
1039            }
1040            users.insert(id, user);
1041        }
1042        drop(idx);
1043        drop(users);
1044
1045        self.rehydrate_acl();
1046        self.rehydrate_iam();
1047    }
1048
1049    // -----------------------------------------------------------------
1050    // User management
1051    // -----------------------------------------------------------------
1052
1053    /// Create a new platform-scoped user (`tenant_id = None`).
1054    ///
1055    /// For tenant-scoped creation, use [`Self::create_user_in_tenant`].
1056    pub fn create_user(
1057        &self,
1058        username: &str,
1059        password: &str,
1060        role: Role,
1061    ) -> Result<User, AuthError> {
1062        self.create_user_in_tenant(None, username, password, role)
1063    }
1064
1065    pub fn create_user_with_control_events(
1066        &self,
1067        username: &str,
1068        password: &str,
1069        role: Role,
1070        ctx: &ControlEventCtx<'_>,
1071        ledger: &dyn ControlEventLedger,
1072        config: ControlEventConfig,
1073    ) -> Result<User, AuthError> {
1074        self.create_user_in_tenant_with_control_events(
1075            None, username, password, role, ctx, ledger, config,
1076        )
1077    }
1078
1079    /// Create a user under the given tenant scope. `tenant_id == None`
1080    /// produces a platform-wide user. `(tenant, username)` is the
1081    /// uniqueness key — the same `username` may exist independently
1082    /// under multiple tenants.
1083    pub fn create_user_in_tenant(
1084        &self,
1085        tenant_id: Option<&str>,
1086        username: &str,
1087        password: &str,
1088        role: Role,
1089    ) -> Result<User, AuthError> {
1090        if let Some(configured) = self.configured_control_events() {
1091            let ctx = default_user_lifecycle_ctx();
1092            let control = UserLifecycleControl {
1093                ctx: &ctx,
1094                ledger: configured.ledger.as_ref(),
1095                config: configured.config,
1096            };
1097            self.create_user_in_tenant_controlled(tenant_id, username, password, role, &control)
1098        } else {
1099            self.create_user_in_tenant_unaudited(tenant_id, username, password, role)
1100        }
1101    }
1102
1103    #[allow(clippy::too_many_arguments)]
1104    pub fn create_user_in_tenant_with_control_events(
1105        &self,
1106        tenant_id: Option<&str>,
1107        username: &str,
1108        password: &str,
1109        role: Role,
1110        ctx: &ControlEventCtx<'_>,
1111        ledger: &dyn ControlEventLedger,
1112        config: ControlEventConfig,
1113    ) -> Result<User, AuthError> {
1114        let control = UserLifecycleControl {
1115            ctx,
1116            ledger,
1117            config,
1118        };
1119        self.create_user_in_tenant_controlled(tenant_id, username, password, role, &control)
1120    }
1121
1122    pub fn create_admin_user(
1123        &self,
1124        username: &str,
1125        password: &str,
1126        role: Role,
1127        tenant_id: Option<&str>,
1128    ) -> Result<User, AuthError> {
1129        if let Some(configured) = self.configured_control_events() {
1130            let ctx = default_user_lifecycle_ctx();
1131            let control = UserLifecycleControl {
1132                ctx: &ctx,
1133                ledger: configured.ledger.as_ref(),
1134                config: configured.config,
1135            };
1136            self.create_user_in_tenant_controlled(tenant_id, username, password, role, &control)
1137        } else {
1138            self.create_user_in_tenant_unaudited(tenant_id, username, password, role)
1139        }
1140    }
1141
1142    fn create_user_in_tenant_unaudited(
1143        &self,
1144        tenant_id: Option<&str>,
1145        username: &str,
1146        password: &str,
1147        role: Role,
1148    ) -> Result<User, AuthError> {
1149        let id = UserId::from_parts(tenant_id, username);
1150        let mut users = self.users.write().map_err(lock_err)?;
1151        if users.contains_key(&id) {
1152            return Err(AuthError::UserExists(id.to_string()));
1153        }
1154
1155        let now = now_ms();
1156        let user = User {
1157            username: username.to_string(),
1158            tenant_id: tenant_id.map(|s| s.to_string()),
1159            password_hash: hash_password(password),
1160            scram_verifier: Some(make_scram_verifier(password)),
1161            role,
1162            api_keys: Vec::new(),
1163            created_at: now,
1164            updated_at: now,
1165            enabled: true,
1166        };
1167        users.insert(id, user.clone());
1168        drop(users); // release lock before vault I/O
1169        self.persist_to_vault();
1170        Ok(user)
1171    }
1172
1173    fn create_user_in_tenant_controlled(
1174        &self,
1175        tenant_id: Option<&str>,
1176        username: &str,
1177        password: &str,
1178        role: Role,
1179        control: &UserLifecycleControl<'_>,
1180    ) -> Result<User, AuthError> {
1181        let id = UserId::from_parts(tenant_id, username);
1182        match self.create_user_in_tenant_unaudited(tenant_id, username, password, role) {
1183            Ok(user) => {
1184                if let Err(err) = self.emit_user_lifecycle_allowed(
1185                    control,
1186                    EventKind::UserCreate,
1187                    "user.create",
1188                    &id,
1189                    user_control_fields(&id, Some(role), Some(true), true),
1190                ) {
1191                    self.rollback_create_user(&id);
1192                    return Err(err);
1193                }
1194                Ok(user)
1195            }
1196            Err(err) => {
1197                self.emit_user_lifecycle_outcome(
1198                    control,
1199                    if user_error_is_denied(&err) {
1200                        Outcome::Denied
1201                    } else {
1202                        Outcome::Error
1203                    },
1204                    EventKind::UserCreate,
1205                    "user.create",
1206                    &id,
1207                    Some(err.to_string()),
1208                    user_control_fields(&id, Some(role), Some(true), true),
1209                );
1210                Err(err)
1211            }
1212        }
1213    }
1214
1215    /// Look up a user's SCRAM verifier by full `UserId`.
1216    ///
1217    /// The wire handshake passes the tenant resolved from the session
1218    /// (or `None` for the bootstrap admin) so cross-tenant collisions
1219    /// never authenticate the wrong identity.
1220    pub fn lookup_scram_verifier(&self, id: &UserId) -> Option<crate::auth::scram::ScramVerifier> {
1221        // Synthetic platform-owner principal must never authenticate.
1222        if crate::auth::self_lock_guard::is_synthetic_principal(&id.username) {
1223            return None;
1224        }
1225        let users = self.users.read().ok()?;
1226        users.get(id).and_then(|u| u.scram_verifier.clone())
1227    }
1228
1229    /// Backwards-compatible shim for the v2 wire bootstrap path: looks
1230    /// up a user by username assuming the platform (`tenant=None`)
1231    /// scope. Use this only where the handshake hasn't yet learned the
1232    /// caller's tenant.
1233    pub fn lookup_scram_verifier_global(
1234        &self,
1235        username: &str,
1236    ) -> Option<crate::auth::scram::ScramVerifier> {
1237        self.lookup_scram_verifier(&UserId::platform(username))
1238    }
1239
1240    /// Return all users (password hashes redacted).
1241    ///
1242    /// The synthetic [`crate::auth::self_lock_guard::PLATFORM_OWNER_USERNAME`]
1243    /// principal is filtered out — it is a policy-graph anchor, not an
1244    /// operator-visible account.
1245    pub fn list_users(&self) -> Vec<User> {
1246        let users = match self.users.read() {
1247            Ok(g) => g,
1248            Err(_) => return Vec::new(),
1249        };
1250        users
1251            .values()
1252            .filter(|u| !crate::auth::self_lock_guard::is_synthetic_principal(&u.username))
1253            .map(|u| User {
1254                password_hash: String::new(), // redacted
1255                ..u.clone()
1256            })
1257            .collect()
1258    }
1259
1260    /// Return users restricted to a tenant scope.
1261    ///
1262    /// `tenant_filter`:
1263    ///   - `None` listing in `Some(None)` — only platform users
1264    ///   - `Some(Some("acme"))` — only users in tenant `acme`
1265    ///   - `None` argument — all users (admin-only callers)
1266    pub fn list_users_scoped(&self, tenant_filter: Option<Option<&str>>) -> Vec<User> {
1267        let users = match self.users.read() {
1268            Ok(g) => g,
1269            Err(_) => return Vec::new(),
1270        };
1271        users
1272            .values()
1273            .filter(|u| match tenant_filter {
1274                None => true,
1275                Some(t) => u.tenant_id.as_deref() == t,
1276            })
1277            .filter(|u| !crate::auth::self_lock_guard::is_synthetic_principal(&u.username))
1278            .map(|u| User {
1279                password_hash: String::new(), // redacted
1280                ..u.clone()
1281            })
1282            .collect()
1283    }
1284
1285    /// Resource shape used by IAM policies that govern user lifecycle
1286    /// mutations. Platform users are addressed as `user:<username>`;
1287    /// tenant users are addressed as `user:tenant/<tenant>/<username>`
1288    /// by the existing policy resource qualifier.
1289    pub fn user_lifecycle_resource(uid: &UserId) -> ResourceRef {
1290        let resource = ResourceRef::new("user", uid.username.clone());
1291        match &uid.tenant {
1292            Some(tenant) => resource.with_tenant(tenant.clone()),
1293            None => resource,
1294        }
1295    }
1296
1297    pub fn eval_context_for_principal(
1298        &self,
1299        principal: &UserId,
1300        role: Role,
1301        current_tenant: Option<String>,
1302    ) -> EvalContext {
1303        EvalContext {
1304            principal_tenant: principal.tenant.clone(),
1305            current_tenant,
1306            peer_ip: None,
1307            mfa_present: false,
1308            now_ms: now_ms(),
1309            principal_is_admin_role: role == Role::Admin,
1310            principal_is_platform_scoped: principal.tenant.is_none(),
1311        }
1312    }
1313
1314    /// Policy gate for user lifecycle mutations.
1315    ///
1316    /// This is intentionally separate from the store mutation methods:
1317    /// bootstrap and manifest loading can still seed users directly,
1318    /// while public surfaces must authorize `user:*` before calling the
1319    /// mutator. Explicit Deny wins even in `LegacyRbac` mode.
1320    pub fn check_user_lifecycle_authz(
1321        &self,
1322        actor: &UserId,
1323        actor_role: Role,
1324        action: &str,
1325        target: &UserId,
1326    ) -> bool {
1327        let resource = Self::user_lifecycle_resource(target);
1328        let ctx = self.eval_context_for_principal(actor, actor_role, target.tenant.clone());
1329        self.check_policy_authz_with_role(actor, action, &resource, &ctx, actor_role)
1330    }
1331
1332    pub fn get_user(&self, tenant_id: Option<&str>, username: &str) -> Option<User> {
1333        let id = UserId::from_parts(tenant_id, username);
1334        self.get_user_cloned(&id).map(|u| User {
1335            password_hash: String::new(),
1336            ..u
1337        })
1338    }
1339
1340    /// Delete a platform-scoped user (`tenant_id = None`) and revoke
1341    /// all of their API keys + sessions.
1342    ///
1343    /// For tenant-scoped deletes, use [`Self::delete_user_in_tenant`].
1344    pub fn delete_user(&self, username: &str) -> Result<(), AuthError> {
1345        self.delete_user_in_tenant(None, username)
1346    }
1347
1348    pub fn delete_user_with_control_events(
1349        &self,
1350        username: &str,
1351        ctx: &ControlEventCtx<'_>,
1352        ledger: &dyn ControlEventLedger,
1353        config: ControlEventConfig,
1354    ) -> Result<(), AuthError> {
1355        self.delete_user_in_tenant_with_control_events(None, username, ctx, ledger, config)
1356    }
1357
1358    /// Delete a user identified by `(tenant_id, username)` and revoke
1359    /// all of their API keys + sessions.
1360    pub fn delete_user_in_tenant(
1361        &self,
1362        tenant_id: Option<&str>,
1363        username: &str,
1364    ) -> Result<(), AuthError> {
1365        if let Some(configured) = self.configured_control_events() {
1366            let ctx = default_user_lifecycle_ctx();
1367            let control = UserLifecycleControl {
1368                ctx: &ctx,
1369                ledger: configured.ledger.as_ref(),
1370                config: configured.config,
1371            };
1372            self.delete_user_in_tenant_controlled(tenant_id, username, &control)
1373        } else {
1374            self.delete_user_in_tenant_unaudited(tenant_id, username)
1375        }
1376    }
1377
1378    pub fn delete_user_in_tenant_with_control_events(
1379        &self,
1380        tenant_id: Option<&str>,
1381        username: &str,
1382        ctx: &ControlEventCtx<'_>,
1383        ledger: &dyn ControlEventLedger,
1384        config: ControlEventConfig,
1385    ) -> Result<(), AuthError> {
1386        let control = UserLifecycleControl {
1387            ctx,
1388            ledger,
1389            config,
1390        };
1391        self.delete_user_in_tenant_controlled(tenant_id, username, &control)
1392    }
1393
1394    fn delete_user_in_tenant_unaudited(
1395        &self,
1396        tenant_id: Option<&str>,
1397        username: &str,
1398    ) -> Result<(), AuthError> {
1399        let id = UserId::from_parts(tenant_id, username);
1400        let mut users = self.users.write().map_err(lock_err)?;
1401        let user = users
1402            .remove(&id)
1403            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
1404
1405        // Remove API key index entries.
1406        if let Ok(mut idx) = self.api_key_index.write() {
1407            for api_key in &user.api_keys {
1408                idx.remove(&api_key.key);
1409            }
1410        }
1411
1412        // Remove sessions belonging to this user (match on tenant+username
1413        // so we don't tear down a same-named user in another tenant).
1414        if let Ok(mut sessions) = self.sessions.write() {
1415            sessions
1416                .retain(|_, s| !(s.username == username && s.tenant_id.as_deref() == tenant_id));
1417        }
1418
1419        self.persist_to_vault();
1420        Ok(())
1421    }
1422
1423    fn delete_user_in_tenant_controlled(
1424        &self,
1425        tenant_id: Option<&str>,
1426        username: &str,
1427        control: &UserLifecycleControl<'_>,
1428    ) -> Result<(), AuthError> {
1429        let id = UserId::from_parts(tenant_id, username);
1430        let rollback = self.user_delete_rollback_snapshot(&id, tenant_id, username);
1431        match self.delete_user_in_tenant_unaudited(tenant_id, username) {
1432            Ok(()) => {
1433                if let Err(err) = self.emit_user_lifecycle_allowed(
1434                    control,
1435                    EventKind::UserDelete,
1436                    "user.delete",
1437                    &id,
1438                    user_control_fields(&id, rollback.as_ref().map(|r| r.0.role), None, false),
1439                ) {
1440                    if let Some((user, sessions)) = rollback {
1441                        self.restore_deleted_user(&id, user, sessions);
1442                    }
1443                    return Err(err);
1444                }
1445                Ok(())
1446            }
1447            Err(err) => {
1448                self.emit_user_lifecycle_outcome(
1449                    control,
1450                    if user_error_is_denied(&err) {
1451                        Outcome::Denied
1452                    } else {
1453                        Outcome::Error
1454                    },
1455                    EventKind::UserDelete,
1456                    "user.delete",
1457                    &id,
1458                    Some(err.to_string()),
1459                    user_control_fields(&id, rollback.as_ref().map(|r| r.0.role), None, false),
1460                );
1461                Err(err)
1462            }
1463        }
1464    }
1465
1466    /// Change password (requires the old password). Defaults to
1467    /// platform tenant; use [`Self::change_password_in_tenant`] for
1468    /// scoped users.
1469    pub fn change_password(
1470        &self,
1471        username: &str,
1472        old_password: &str,
1473        new_password: &str,
1474    ) -> Result<(), AuthError> {
1475        self.change_password_in_tenant(None, username, old_password, new_password)
1476    }
1477
1478    pub fn change_password_with_control_events(
1479        &self,
1480        username: &str,
1481        old_password: &str,
1482        new_password: &str,
1483        ctx: &ControlEventCtx<'_>,
1484        ledger: &dyn ControlEventLedger,
1485        config: ControlEventConfig,
1486    ) -> Result<(), AuthError> {
1487        self.change_password_in_tenant_with_control_events(
1488            None,
1489            username,
1490            old_password,
1491            new_password,
1492            ctx,
1493            ledger,
1494            config,
1495        )
1496    }
1497
1498    #[allow(clippy::too_many_arguments)]
1499    pub fn change_password_in_tenant(
1500        &self,
1501        tenant_id: Option<&str>,
1502        username: &str,
1503        old_password: &str,
1504        new_password: &str,
1505    ) -> Result<(), AuthError> {
1506        if let Some(configured) = self.configured_control_events() {
1507            let ctx = default_user_lifecycle_ctx();
1508            let control = UserLifecycleControl {
1509                ctx: &ctx,
1510                ledger: configured.ledger.as_ref(),
1511                config: configured.config,
1512            };
1513            self.change_password_in_tenant_controlled(
1514                tenant_id,
1515                username,
1516                old_password,
1517                new_password,
1518                &control,
1519            )
1520        } else {
1521            self.change_password_in_tenant_unaudited(
1522                tenant_id,
1523                username,
1524                old_password,
1525                new_password,
1526            )
1527        }
1528    }
1529
1530    #[allow(clippy::too_many_arguments)]
1531    pub fn change_password_in_tenant_with_control_events(
1532        &self,
1533        tenant_id: Option<&str>,
1534        username: &str,
1535        old_password: &str,
1536        new_password: &str,
1537        ctx: &ControlEventCtx<'_>,
1538        ledger: &dyn ControlEventLedger,
1539        config: ControlEventConfig,
1540    ) -> Result<(), AuthError> {
1541        let control = UserLifecycleControl {
1542            ctx,
1543            ledger,
1544            config,
1545        };
1546        self.change_password_in_tenant_controlled(
1547            tenant_id,
1548            username,
1549            old_password,
1550            new_password,
1551            &control,
1552        )
1553    }
1554
1555    fn change_password_in_tenant_unaudited(
1556        &self,
1557        tenant_id: Option<&str>,
1558        username: &str,
1559        old_password: &str,
1560        new_password: &str,
1561    ) -> Result<(), AuthError> {
1562        let id = UserId::from_parts(tenant_id, username);
1563        let mut users = self.users.write().map_err(lock_err)?;
1564        let user = users
1565            .get_mut(&id)
1566            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
1567
1568        if !verify_password(old_password, &user.password_hash) {
1569            return Err(AuthError::InvalidCredentials);
1570        }
1571
1572        user.password_hash = hash_password(new_password);
1573        user.scram_verifier = Some(make_scram_verifier(new_password));
1574        user.updated_at = now_ms();
1575        drop(users); // release lock before vault I/O
1576        self.persist_to_vault();
1577        Ok(())
1578    }
1579
1580    fn change_password_in_tenant_controlled(
1581        &self,
1582        tenant_id: Option<&str>,
1583        username: &str,
1584        old_password: &str,
1585        new_password: &str,
1586        control: &UserLifecycleControl<'_>,
1587    ) -> Result<(), AuthError> {
1588        let id = UserId::from_parts(tenant_id, username);
1589        let previous = self.get_user_cloned(&id);
1590        match self.change_password_in_tenant_unaudited(
1591            tenant_id,
1592            username,
1593            old_password,
1594            new_password,
1595        ) {
1596            Ok(()) => {
1597                if let Err(err) = self.emit_user_lifecycle_allowed(
1598                    control,
1599                    EventKind::UserUpdate,
1600                    "user.update",
1601                    &id,
1602                    user_control_fields(
1603                        &id,
1604                        previous.as_ref().map(|u| u.role),
1605                        previous.as_ref().map(|u| u.enabled),
1606                        true,
1607                    ),
1608                ) {
1609                    self.restore_user_snapshot(&id, previous);
1610                    return Err(err);
1611                }
1612                Ok(())
1613            }
1614            Err(err) => {
1615                self.emit_user_lifecycle_outcome(
1616                    control,
1617                    if user_error_is_denied(&err) {
1618                        Outcome::Denied
1619                    } else {
1620                        Outcome::Error
1621                    },
1622                    EventKind::UserUpdate,
1623                    "user.update",
1624                    &id,
1625                    Some(err.to_string()),
1626                    user_control_fields(
1627                        &id,
1628                        previous.as_ref().map(|u| u.role),
1629                        previous.as_ref().map(|u| u.enabled),
1630                        true,
1631                    ),
1632                );
1633                Err(err)
1634            }
1635        }
1636    }
1637
1638    /// Change a user's role (admin-only operation). Defaults to platform
1639    /// tenant; use [`Self::change_role_in_tenant`] for scoped users.
1640    pub fn change_role(&self, username: &str, new_role: Role) -> Result<(), AuthError> {
1641        self.change_role_in_tenant(None, username, new_role)
1642    }
1643
1644    pub fn change_role_in_tenant(
1645        &self,
1646        tenant_id: Option<&str>,
1647        username: &str,
1648        new_role: Role,
1649    ) -> Result<(), AuthError> {
1650        if let Some(configured) = self.configured_control_events() {
1651            let ctx = default_user_lifecycle_ctx();
1652            let control = UserLifecycleControl {
1653                ctx: &ctx,
1654                ledger: configured.ledger.as_ref(),
1655                config: configured.config,
1656            };
1657            self.change_role_in_tenant_controlled(tenant_id, username, new_role, &control)
1658        } else {
1659            self.change_role_in_tenant_unaudited(tenant_id, username, new_role)
1660        }
1661    }
1662
1663    pub fn change_role_in_tenant_with_control_events(
1664        &self,
1665        tenant_id: Option<&str>,
1666        username: &str,
1667        new_role: Role,
1668        ctx: &ControlEventCtx<'_>,
1669        ledger: &dyn ControlEventLedger,
1670        config: ControlEventConfig,
1671    ) -> Result<(), AuthError> {
1672        let control = UserLifecycleControl {
1673            ctx,
1674            ledger,
1675            config,
1676        };
1677        self.change_role_in_tenant_controlled(tenant_id, username, new_role, &control)
1678    }
1679
1680    fn change_role_in_tenant_unaudited(
1681        &self,
1682        tenant_id: Option<&str>,
1683        username: &str,
1684        new_role: Role,
1685    ) -> Result<(), AuthError> {
1686        let id = UserId::from_parts(tenant_id, username);
1687        let mut users = self.users.write().map_err(lock_err)?;
1688        let user = users
1689            .get_mut(&id)
1690            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
1691
1692        let prior_role = user.role;
1693        user.role = new_role;
1694        user.updated_at = now_ms();
1695
1696        // Issue #205 — promotion to Admin is an operator-grade event:
1697        // the new role grants destructive capabilities (DROP, ALTER,
1698        // GRANT) that an operator must observe out-of-band even when
1699        // the auth path itself is healthy.
1700        if new_role == Role::Admin && prior_role != Role::Admin {
1701            crate::telemetry::operator_event::OperatorEvent::AdminCapabilityGranted {
1702                granted_to: id.to_string(),
1703                capability: "Role::Admin".to_string(),
1704                granted_by: "auth_store::change_role".to_string(),
1705            }
1706            .emit_global();
1707        }
1708
1709        // Downgrade any API keys that now exceed the user's role.
1710        for key in &mut user.api_keys {
1711            if key.role > new_role {
1712                key.role = new_role;
1713            }
1714        }
1715
1716        // Update the api_key_index as well.
1717        if let Ok(mut idx) = self.api_key_index.write() {
1718            for key in &user.api_keys {
1719                if let Some(entry) = idx.get_mut(&key.key) {
1720                    entry.1 = key.role;
1721                }
1722            }
1723        }
1724
1725        self.persist_to_vault();
1726        Ok(())
1727    }
1728
1729    fn change_role_in_tenant_controlled(
1730        &self,
1731        tenant_id: Option<&str>,
1732        username: &str,
1733        new_role: Role,
1734        control: &UserLifecycleControl<'_>,
1735    ) -> Result<(), AuthError> {
1736        let id = UserId::from_parts(tenant_id, username);
1737        let previous = self.get_user_cloned(&id);
1738        match self.change_role_in_tenant_unaudited(tenant_id, username, new_role) {
1739            Ok(()) => {
1740                if let Err(err) = self.emit_user_lifecycle_allowed(
1741                    control,
1742                    EventKind::UserUpdate,
1743                    "user.update",
1744                    &id,
1745                    user_control_fields(
1746                        &id,
1747                        Some(new_role),
1748                        previous.as_ref().map(|u| u.enabled),
1749                        false,
1750                    ),
1751                ) {
1752                    self.restore_user_snapshot(&id, previous);
1753                    return Err(err);
1754                }
1755                Ok(())
1756            }
1757            Err(err) => {
1758                self.emit_user_lifecycle_outcome(
1759                    control,
1760                    if user_error_is_denied(&err) {
1761                        Outcome::Denied
1762                    } else {
1763                        Outcome::Error
1764                    },
1765                    EventKind::UserUpdate,
1766                    "user.update",
1767                    &id,
1768                    Some(err.to_string()),
1769                    user_control_fields(
1770                        &id,
1771                        Some(new_role),
1772                        previous.as_ref().map(|u| u.enabled),
1773                        false,
1774                    ),
1775                );
1776                Err(err)
1777            }
1778        }
1779    }
1780
1781    // -----------------------------------------------------------------
1782    // Authentication (login)
1783    // -----------------------------------------------------------------
1784
1785    /// Verify credentials for a platform-tenant user (`tenant_id = None`)
1786    /// and create a session. For tenant-scoped login use
1787    /// [`Self::authenticate_in_tenant`].
1788    ///
1789    /// When a keypair is available (certificate-based seal), session tokens
1790    /// are signed with the master secret so the server can verify they were
1791    /// genuinely issued by this vault instance.
1792    pub fn authenticate(&self, username: &str, password: &str) -> Result<Session, AuthError> {
1793        self.authenticate_in_tenant(None, username, password)
1794    }
1795
1796    /// Verify credentials for `(tenant_id, username, password)` and
1797    /// create a session. Tenant-aware: `alice@acme` and `alice@globex`
1798    /// authenticate independently.
1799    pub fn authenticate_in_tenant(
1800        &self,
1801        tenant_id: Option<&str>,
1802        username: &str,
1803        password: &str,
1804    ) -> Result<Session, AuthError> {
1805        // The synthetic platform-owner principal exists only as a
1806        // policy-graph anchor (see `crate::auth::self_lock_guard`); it
1807        // must never authenticate via any transport.
1808        if crate::auth::self_lock_guard::is_synthetic_principal(username) {
1809            return Err(AuthError::InvalidCredentials);
1810        }
1811        let id = UserId::from_parts(tenant_id, username);
1812        let users = self.users.read().map_err(lock_err)?;
1813        let user = users.get(&id).ok_or(AuthError::InvalidCredentials)?;
1814
1815        if !user.enabled {
1816            return Err(AuthError::InvalidCredentials);
1817        }
1818
1819        if !verify_password(password, &user.password_hash) {
1820            return Err(AuthError::InvalidCredentials);
1821        }
1822
1823        // Generate token: signed if keypair is available, random otherwise.
1824        let token = match self.keypair.read().ok().and_then(|g| {
1825            g.as_ref().map(|kp| {
1826                let token_id = random_hex(16);
1827                let sig = kp.sign(format!("session:{}", token_id).as_bytes());
1828                // Take first 16 bytes of signature for compact token.
1829                format!("rs_{}{}", token_id, hex::encode(&sig[..16]))
1830            })
1831        }) {
1832            Some(signed_token) => signed_token,
1833            None => generate_session_token(),
1834        };
1835
1836        let now = now_ms();
1837        let session = Session {
1838            token,
1839            username: username.to_string(),
1840            tenant_id: user.tenant_id.clone(),
1841            role: user.role,
1842            created_at: now,
1843            expires_at: now + (self.config.session_ttl_secs as u128 * 1000),
1844        };
1845
1846        drop(users); // release read lock before acquiring write
1847
1848        let mut sessions = self.sessions.write().map_err(lock_err)?;
1849        sessions.insert(session.token.clone(), session.clone());
1850        Ok(session)
1851    }
1852
1853    // -----------------------------------------------------------------
1854    // Token validation
1855    // -----------------------------------------------------------------
1856
1857    /// Validate a token (session or API key).
1858    ///
1859    /// Returns `(username, role)` if valid, `None` otherwise. Tenant
1860    /// scope is dropped here for compatibility with the bulk of the
1861    /// existing caller surface (routing, gRPC control, redwire). Use
1862    /// [`Self::validate_token_full`] when the caller needs the
1863    /// resolved `UserId` (e.g. to pin `CURRENT_TENANT()`).
1864    pub fn validate_token(&self, token: &str) -> Option<(String, Role)> {
1865        self.validate_token_full(token)
1866            .map(|(id, role)| (id.username, role))
1867    }
1868
1869    /// Tenant-aware token validation. Returns the resolved `UserId`
1870    /// (which carries the tenant) and the granted `Role`.
1871    pub fn validate_token_full(&self, token: &str) -> Option<(UserId, Role)> {
1872        // Try session tokens first.
1873        if token.starts_with("rs_") {
1874            if let Ok(sessions) = self.sessions.read() {
1875                if let Some(session) = sessions.get(token) {
1876                    let now = now_ms();
1877                    if now < session.expires_at {
1878                        return Some((
1879                            UserId::from_parts(session.tenant_id.as_deref(), &session.username),
1880                            session.role,
1881                        ));
1882                    }
1883                }
1884            }
1885            return None;
1886        }
1887
1888        // Try API keys.
1889        if token.starts_with("rk_") {
1890            if let Ok(idx) = self.api_key_index.read() {
1891                return idx.get(token).cloned();
1892            }
1893            return None;
1894        }
1895
1896        None
1897    }
1898
1899    // -----------------------------------------------------------------
1900    // API Key management
1901    // -----------------------------------------------------------------
1902
1903    /// Create a persistent API key for a platform-tenant user.
1904    ///
1905    /// For tenant-scoped users use [`Self::create_api_key_in_tenant`].
1906    pub fn create_api_key(
1907        &self,
1908        username: &str,
1909        name: &str,
1910        role: Role,
1911    ) -> Result<ApiKey, AuthError> {
1912        self.create_api_key_in_tenant(None, username, name, role)
1913    }
1914
1915    pub fn create_api_key_with_control_events(
1916        &self,
1917        username: &str,
1918        name: &str,
1919        role: Role,
1920        ctx: &ControlEventCtx<'_>,
1921        ledger: &dyn ControlEventLedger,
1922        config: ControlEventConfig,
1923    ) -> Result<ApiKey, AuthError> {
1924        self.create_api_key_in_tenant_with_control_events(
1925            None, username, name, role, ctx, ledger, config,
1926        )
1927    }
1928
1929    pub fn create_api_key_in_tenant(
1930        &self,
1931        tenant_id: Option<&str>,
1932        username: &str,
1933        name: &str,
1934        role: Role,
1935    ) -> Result<ApiKey, AuthError> {
1936        if let Some(configured) = self.configured_control_events() {
1937            let ctx = default_user_lifecycle_ctx();
1938            let control = UserLifecycleControl {
1939                ctx: &ctx,
1940                ledger: configured.ledger.as_ref(),
1941                config: configured.config,
1942            };
1943            self.create_api_key_in_tenant_controlled(tenant_id, username, name, role, &control)
1944        } else {
1945            self.create_api_key_in_tenant_unaudited(tenant_id, username, name, role)
1946        }
1947    }
1948
1949    #[allow(clippy::too_many_arguments)]
1950    pub fn create_api_key_in_tenant_with_control_events(
1951        &self,
1952        tenant_id: Option<&str>,
1953        username: &str,
1954        name: &str,
1955        role: Role,
1956        ctx: &ControlEventCtx<'_>,
1957        ledger: &dyn ControlEventLedger,
1958        config: ControlEventConfig,
1959    ) -> Result<ApiKey, AuthError> {
1960        let control = UserLifecycleControl {
1961            ctx,
1962            ledger,
1963            config,
1964        };
1965        self.create_api_key_in_tenant_controlled(tenant_id, username, name, role, &control)
1966    }
1967
1968    fn create_api_key_in_tenant_unaudited(
1969        &self,
1970        tenant_id: Option<&str>,
1971        username: &str,
1972        name: &str,
1973        role: Role,
1974    ) -> Result<ApiKey, AuthError> {
1975        let id = UserId::from_parts(tenant_id, username);
1976        let mut users = self.users.write().map_err(lock_err)?;
1977        let user = users
1978            .get_mut(&id)
1979            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
1980
1981        // The key's role cannot exceed the user's role.
1982        if role > user.role {
1983            return Err(AuthError::RoleExceeded {
1984                requested: role,
1985                ceiling: user.role,
1986            });
1987        }
1988
1989        let api_key = ApiKey {
1990            key: generate_api_key(),
1991            name: name.to_string(),
1992            role,
1993            created_at: now_ms(),
1994        };
1995
1996        user.api_keys.push(api_key.clone());
1997        user.updated_at = now_ms();
1998
1999        // Update the index.
2000        if let Ok(mut idx) = self.api_key_index.write() {
2001            idx.insert(api_key.key.clone(), (id.clone(), api_key.role));
2002        }
2003
2004        drop(users); // release lock before vault I/O
2005        self.persist_to_vault();
2006        Ok(api_key)
2007    }
2008
2009    fn create_api_key_in_tenant_controlled(
2010        &self,
2011        tenant_id: Option<&str>,
2012        username: &str,
2013        name: &str,
2014        role: Role,
2015        control: &UserLifecycleControl<'_>,
2016    ) -> Result<ApiKey, AuthError> {
2017        let id = UserId::from_parts(tenant_id, username);
2018        match self.create_api_key_in_tenant_unaudited(tenant_id, username, name, role) {
2019            Ok(api_key) => {
2020                let key_id = api_key_id(&api_key.key);
2021                if let Err(err) = self.emit_api_key_allowed(
2022                    control,
2023                    EventKind::ApiKeyCreate,
2024                    "apikey.create",
2025                    &key_id,
2026                    api_key_control_fields(&id, role, &key_id),
2027                ) {
2028                    self.rollback_create_api_key(&id, &api_key.key);
2029                    return Err(err);
2030                }
2031                Ok(api_key)
2032            }
2033            Err(err) => {
2034                self.emit_user_lifecycle_outcome(
2035                    control,
2036                    if user_error_is_denied(&err) {
2037                        Outcome::Denied
2038                    } else {
2039                        Outcome::Error
2040                    },
2041                    EventKind::ApiKeyCreate,
2042                    "apikey.create",
2043                    &id,
2044                    Some(err.to_string()),
2045                    user_control_fields(&id, Some(role), None, false),
2046                );
2047                Err(err)
2048            }
2049        }
2050    }
2051
2052    /// Revoke (delete) an API key.
2053    pub fn revoke_api_key(&self, key: &str) -> Result<(), AuthError> {
2054        if let Some(configured) = self.configured_control_events() {
2055            let ctx = default_user_lifecycle_ctx();
2056            let control = UserLifecycleControl {
2057                ctx: &ctx,
2058                ledger: configured.ledger.as_ref(),
2059                config: configured.config,
2060            };
2061            self.revoke_api_key_controlled(key, &control)
2062        } else {
2063            self.revoke_api_key_unaudited(key)
2064        }
2065    }
2066
2067    pub fn revoke_api_key_with_control_events(
2068        &self,
2069        key: &str,
2070        ctx: &ControlEventCtx<'_>,
2071        ledger: &dyn ControlEventLedger,
2072        config: ControlEventConfig,
2073    ) -> Result<(), AuthError> {
2074        let control = UserLifecycleControl {
2075            ctx,
2076            ledger,
2077            config,
2078        };
2079        self.revoke_api_key_controlled(key, &control)
2080    }
2081
2082    fn revoke_api_key_unaudited(&self, key: &str) -> Result<(), AuthError> {
2083        let mut users = self.users.write().map_err(lock_err)?;
2084
2085        // Find which user owns this key (look up by the api_key_index
2086        // first; fall back to a scan for legacy state restored before
2087        // the index was reseeded).
2088        let owner_id: UserId = {
2089            if let Ok(idx) = self.api_key_index.read() {
2090                if let Some((id, _)) = idx.get(key) {
2091                    id.clone()
2092                } else {
2093                    return Err(AuthError::KeyNotFound(key.to_string()));
2094                }
2095            } else {
2096                let owner = users
2097                    .iter()
2098                    .find(|(_, u)| u.api_keys.iter().any(|k| k.key == key));
2099                match owner {
2100                    Some((id, _)) => id.clone(),
2101                    None => return Err(AuthError::KeyNotFound(key.to_string())),
2102                }
2103            }
2104        };
2105
2106        let user = users
2107            .get_mut(&owner_id)
2108            .ok_or_else(|| AuthError::KeyNotFound(key.to_string()))?;
2109        user.api_keys.retain(|k| k.key != key);
2110        user.updated_at = now_ms();
2111
2112        // Remove from index.
2113        if let Ok(mut idx) = self.api_key_index.write() {
2114            idx.remove(key);
2115        }
2116
2117        self.persist_to_vault();
2118        Ok(())
2119    }
2120
2121    fn revoke_api_key_controlled(
2122        &self,
2123        key: &str,
2124        control: &UserLifecycleControl<'_>,
2125    ) -> Result<(), AuthError> {
2126        let key_id = api_key_id(key);
2127        let rollback = self.api_key_rollback_snapshot(key);
2128        let id = rollback
2129            .as_ref()
2130            .map(|r| r.0.clone())
2131            .unwrap_or_else(|| UserId::from_parts(None, ""));
2132        let role = rollback.as_ref().map(|r| r.1.role).unwrap_or(Role::Read);
2133        match self.revoke_api_key_unaudited(key) {
2134            Ok(()) => {
2135                if let Err(err) = self.emit_api_key_allowed(
2136                    control,
2137                    EventKind::ApiKeyRevoke,
2138                    "apikey.revoke",
2139                    &key_id,
2140                    api_key_control_fields(&id, role, &key_id),
2141                ) {
2142                    if let Some((owner, api_key)) = rollback {
2143                        self.restore_api_key(&owner, api_key);
2144                    }
2145                    return Err(err);
2146                }
2147                Ok(())
2148            }
2149            Err(err) => {
2150                self.emit_user_lifecycle_outcome(
2151                    control,
2152                    if user_error_is_denied(&err) {
2153                        Outcome::Denied
2154                    } else {
2155                        Outcome::Error
2156                    },
2157                    EventKind::ApiKeyRevoke,
2158                    "apikey.revoke",
2159                    &id,
2160                    Some(err.to_string()),
2161                    api_key_control_fields(&id, role, &key_id),
2162                );
2163                Err(err)
2164            }
2165        }
2166    }
2167
2168    // -----------------------------------------------------------------
2169    // Session management
2170    // -----------------------------------------------------------------
2171
2172    /// Revoke a session token.
2173    pub fn revoke_session(&self, token: &str) {
2174        if let Ok(mut sessions) = self.sessions.write() {
2175            sessions.remove(token);
2176        }
2177    }
2178
2179    /// Purge expired sessions (housekeeping).
2180    pub fn purge_expired_sessions(&self) -> usize {
2181        let now = now_ms();
2182        if let Ok(mut sessions) = self.sessions.write() {
2183            let before = sessions.len();
2184            sessions.retain(|_, s| s.expires_at > now);
2185            return before - sessions.len();
2186        }
2187        0
2188    }
2189
2190    // -----------------------------------------------------------------
2191    // Granular RBAC — GRANT / REVOKE
2192    //
2193    // The privilege engine lives in `super::privileges`. These helpers
2194    // are the AuthStore facade: they keep an in-memory map of grants per
2195    // user (plus a `public_grants` list), persist additions/removals to
2196    // the existing `vault_kv` store, and rebuild the per-user
2197    // `PermissionCache` so the hot path stays O(1).
2198    //
2199    // Persistence design: rather than extend the snapshot/restore
2200    // pipeline (Agent #2's territory) we serialise grants and account
2201    // attributes to the vault KV store. That gives us atomic write +
2202    // encrypted-at-rest semantics for free without touching the
2203    // existing USER/KEY/KV serializer paths. On restart `rehydrate_acl`
2204    // reads these KV entries back into the in-memory maps.
2205    // -----------------------------------------------------------------
2206
2207    /// Persist a grant. Returns `Forbidden` when the granting user is
2208    /// not Admin or attempts a cross-tenant grant.
2209    pub fn grant(
2210        &self,
2211        granter: &UserId,
2212        granter_role: Role,
2213        principal: GrantPrincipal,
2214        resource: Resource,
2215        actions: Vec<Action>,
2216        with_grant_option: bool,
2217        tenant: Option<String>,
2218    ) -> Result<(), AuthError> {
2219        if granter_role != Role::Admin {
2220            return Err(AuthError::Forbidden(format!(
2221                "GRANT requires Admin role; granter `{}` has `{:?}`",
2222                granter, granter_role
2223            )));
2224        }
2225
2226        // Cross-tenant guard: a tenant-scoped admin cannot mint grants
2227        // outside their tenant. Platform admin (tenant=None) may grant
2228        // anywhere.
2229        if granter.tenant.is_some() && granter.tenant != tenant {
2230            return Err(AuthError::Forbidden(format!(
2231                "cross-tenant GRANT denied: granter tenant `{:?}` != grant tenant `{:?}`",
2232                granter.tenant, tenant
2233            )));
2234        }
2235
2236        let mut actions_set = std::collections::BTreeSet::new();
2237        for a in actions {
2238            actions_set.insert(a);
2239        }
2240        let g = Grant {
2241            principal: principal.clone(),
2242            resource,
2243            actions: actions_set,
2244            with_grant_option,
2245            granted_by: granter.to_string(),
2246            granted_at: now_ms(),
2247            tenant,
2248            columns: None,
2249        };
2250
2251        match &principal {
2252            GrantPrincipal::User(uid) => {
2253                self.grants
2254                    .write()
2255                    .unwrap_or_else(|e| e.into_inner())
2256                    .entry(uid.clone())
2257                    .or_default()
2258                    .push(g.clone());
2259                self.invalidate_permission_cache(Some(uid));
2260            }
2261            GrantPrincipal::Public => {
2262                self.public_grants
2263                    .write()
2264                    .unwrap_or_else(|e| e.into_inner())
2265                    .push(g.clone());
2266                self.invalidate_permission_cache(None);
2267            }
2268            GrantPrincipal::Group(_) => {
2269                return Err(AuthError::Forbidden(
2270                    "GROUP principals are not yet supported; use a USER or PUBLIC".to_string(),
2271                ));
2272            }
2273        }
2274
2275        // Issue #119: a fresh grant changes the visible-collections set
2276        // for `(tenant, role)` callers under the same tenant. Drop those
2277        // cache entries so the next AI command sees the new SELECT
2278        // privilege immediately.
2279        self.invalidate_visible_collections_for_tenant(g.tenant.as_deref());
2280
2281        self.persist_acl_to_kv();
2282        Ok(())
2283    }
2284
2285    /// Drop matching grants from a principal. Returns the number of
2286    /// grants removed.
2287    pub fn revoke(
2288        &self,
2289        granter_role: Role,
2290        principal: &GrantPrincipal,
2291        resource: &Resource,
2292        actions: &[Action],
2293    ) -> Result<usize, AuthError> {
2294        if granter_role != Role::Admin {
2295            return Err(AuthError::Forbidden(format!(
2296                "REVOKE requires Admin role; granter has `{:?}`",
2297                granter_role
2298            )));
2299        }
2300
2301        let removed = match principal {
2302            GrantPrincipal::User(uid) => {
2303                let mut g = self.grants.write().unwrap_or_else(|e| e.into_inner());
2304                let before = g.get(uid).map(|v| v.len()).unwrap_or(0);
2305                if let Some(list) = g.get_mut(uid) {
2306                    list.retain(|gr| {
2307                        !(gr.resource == *resource
2308                            && (actions.iter().any(|a| gr.actions.contains(a))
2309                                || (gr.actions.contains(&Action::All) && !actions.is_empty())))
2310                    });
2311                }
2312                let after = g.get(uid).map(|v| v.len()).unwrap_or(0);
2313                drop(g);
2314                self.invalidate_permission_cache(Some(uid));
2315                before - after
2316            }
2317            GrantPrincipal::Public => {
2318                let mut p = self
2319                    .public_grants
2320                    .write()
2321                    .unwrap_or_else(|e| e.into_inner());
2322                let before = p.len();
2323                p.retain(|gr| {
2324                    !(gr.resource == *resource
2325                        && (actions.iter().any(|a| gr.actions.contains(a))
2326                            || (gr.actions.contains(&Action::All) && !actions.is_empty())))
2327                });
2328                let after = p.len();
2329                drop(p);
2330                self.invalidate_permission_cache(None);
2331                before - after
2332            }
2333            GrantPrincipal::Group(_) => 0,
2334        };
2335
2336        if removed > 0 {
2337            // Issue #119: REVOKE may shrink the visible-collections set
2338            // for any `(tenant, role)` slot. We don't know the exact
2339            // tenant when the principal is `Public`, so a `Public`
2340            // revoke clears the whole cache; user revokes scope to the
2341            // user's tenant.
2342            match principal {
2343                GrantPrincipal::User(uid) => {
2344                    self.invalidate_visible_collections_for_tenant(uid.tenant.as_deref());
2345                }
2346                GrantPrincipal::Public | GrantPrincipal::Group(_) => {
2347                    self.invalidate_visible_collections_cache();
2348                }
2349            }
2350            self.persist_acl_to_kv();
2351        }
2352        Ok(removed)
2353    }
2354
2355    /// Compute the set of collection ids a given `(tenant, role)`
2356    /// scope can read, consulting the explicit grant table. The result
2357    /// is cached for `super::scope_cache::DEFAULT_TTL` (60s) and
2358    /// invalidated on every GRANT/REVOKE/policy/collection mutation
2359    /// that could change the answer.
2360    ///
2361    /// `all_collections` is the full list of collection ids known to
2362    /// the storage layer. The runtime hands it in so this module stays
2363    /// decoupled from the storage crate. Each collection passes through
2364    /// `check_grant(SELECT)` under a synthetic `(principal, role,
2365    /// tenant)` view. The cache key includes principal because direct
2366    /// grants can differ between users that share the same tenant and
2367    /// role.
2368    pub fn visible_collections_for_scope(
2369        &self,
2370        tenant: Option<&str>,
2371        role: Role,
2372        principal: &str,
2373        all_collections: &[String],
2374    ) -> std::collections::HashSet<String> {
2375        let key = super::scope_cache::ScopeKey::new(tenant, principal, role);
2376        if let Some(hit) = self.visible_collections_cache.get(&key) {
2377            return hit;
2378        }
2379        // Slow path: walk every collection through `check_grant`. We
2380        // build the AuthzContext once, then reuse it per resource.
2381        let ctx = AuthzContext {
2382            principal,
2383            effective_role: role,
2384            tenant,
2385        };
2386        let mut visible = std::collections::HashSet::new();
2387        for collection in all_collections {
2388            let resource = Resource::table_from_name(collection);
2389            if self.check_grant(&ctx, Action::Select, &resource).is_ok() {
2390                visible.insert(collection.clone());
2391            }
2392        }
2393        self.visible_collections_cache.insert(key, visible.clone());
2394        visible
2395    }
2396
2397    /// Stats probe required by issue #119 — exposes hit/miss counts and
2398    /// invalidations for the visible-collections cache so metrics
2399    /// pipelines can compute a hit rate.
2400    pub fn auth_cache_stats(&self) -> super::scope_cache::AuthCacheStats {
2401        self.visible_collections_cache.stats()
2402    }
2403
2404    /// Drop every cached `(tenant, role)` entry. Called from CREATE
2405    /// POLICY / DROP POLICY / DROP COLLECTION paths where the affected
2406    /// tenant set is unknown.
2407    pub fn invalidate_visible_collections_cache(&self) {
2408        self.visible_collections_cache.invalidate_all();
2409    }
2410
2411    /// Drop cached entries for one tenant. Called from GRANT / REVOKE
2412    /// where the principal's tenant is known.
2413    pub fn invalidate_visible_collections_for_tenant(&self, tenant: Option<&str>) {
2414        self.visible_collections_cache.invalidate_tenant(tenant);
2415    }
2416
2417    /// Snapshot of every grant the principal effectively has, including
2418    /// `Public` grants. Audit / introspection helper.
2419    pub fn effective_grants(&self, uid: &UserId) -> Vec<Grant> {
2420        let mut out = Vec::new();
2421        if let Ok(g) = self.grants.read() {
2422            if let Some(list) = g.get(uid) {
2423                out.extend(list.iter().cloned());
2424            }
2425        }
2426        if let Ok(p) = self.public_grants.read() {
2427            out.extend(p.iter().cloned());
2428        }
2429        out
2430    }
2431
2432    /// Run a privilege check using the in-memory grant tables. Returns
2433    /// `Ok(())` on allow, `Err(AuthzError)` on deny.
2434    pub fn check_grant(
2435        &self,
2436        ctx: &AuthzContext<'_>,
2437        action: Action,
2438        resource: &Resource,
2439    ) -> Result<(), AuthzError> {
2440        if ctx.effective_role == Role::Admin {
2441            return Ok(());
2442        }
2443
2444        let uid = UserId::from_parts(ctx.tenant, ctx.principal);
2445
2446        // Fast path: per-user pre-resolved cache.
2447        if let Ok(cache) = self.permission_cache.read() {
2448            if let Some(pc) = cache.get(&uid) {
2449                if pc.allows(resource, action) {
2450                    return Ok(());
2451                }
2452            }
2453        }
2454
2455        // Slow path: linear scan + rebuild cache as a side-effect.
2456        let user_grants = self
2457            .grants
2458            .read()
2459            .ok()
2460            .and_then(|g| g.get(&uid).cloned())
2461            .unwrap_or_default();
2462        let any_user_grants = self
2463            .grants
2464            .read()
2465            .ok()
2466            .map(|g| g.values().any(|list| !list.is_empty()))
2467            .unwrap_or(false);
2468        let public_grants = self
2469            .public_grants
2470            .read()
2471            .ok()
2472            .map(|p| p.clone())
2473            .unwrap_or_default();
2474        if user_grants.is_empty() && public_grants.is_empty() && any_user_grants {
2475            return Err(AuthzError::PermissionDenied {
2476                action,
2477                resource: resource.clone(),
2478                principal: ctx.principal.to_string(),
2479            });
2480        }
2481        let view = GrantsView {
2482            user_grants: &user_grants,
2483            public_grants: &public_grants,
2484        };
2485        let result = check_grant(ctx, action, resource, &view);
2486
2487        if result.is_ok() {
2488            let pc = PermissionCache::build(&user_grants, &public_grants);
2489            if let Ok(mut cache) = self.permission_cache.write() {
2490                cache.insert(uid, pc);
2491            }
2492        }
2493        result
2494    }
2495
2496    // -----------------------------------------------------------------
2497    // ALTER USER attributes (VALID UNTIL, CONNECTION LIMIT, etc.)
2498    // -----------------------------------------------------------------
2499
2500    /// Replace the attribute record for `uid`.
2501    pub fn set_user_attributes(
2502        &self,
2503        uid: &UserId,
2504        attrs: UserAttributes,
2505    ) -> Result<(), AuthError> {
2506        let users = self.users.read().map_err(lock_err)?;
2507        if !users.contains_key(uid) {
2508            return Err(AuthError::UserNotFound(uid.to_string()));
2509        }
2510        drop(users);
2511
2512        self.user_attributes
2513            .write()
2514            .unwrap_or_else(|e| e.into_inner())
2515            .insert(uid.clone(), attrs);
2516        self.invalidate_iam_cache(Some(uid));
2517        self.persist_acl_to_kv();
2518        Ok(())
2519    }
2520
2521    /// Read the attributes for `uid`. Returns `Default::default()` for
2522    /// users that have never been altered.
2523    pub fn user_attributes(&self, uid: &UserId) -> UserAttributes {
2524        self.user_attributes
2525            .read()
2526            .ok()
2527            .and_then(|m| m.get(uid).cloned())
2528            .unwrap_or_default()
2529    }
2530
2531    pub fn add_user_to_group(&self, uid: &UserId, group: &str) -> Result<(), AuthError> {
2532        if group.trim().is_empty() {
2533            return Err(AuthError::Forbidden("group name cannot be empty".into()));
2534        }
2535        let mut attrs = self.user_attributes(uid);
2536        if !attrs.groups.iter().any(|g| g == group) {
2537            attrs.groups.push(group.to_string());
2538            attrs.groups.sort();
2539        }
2540        self.set_user_attributes(uid, attrs)
2541    }
2542
2543    pub fn remove_user_from_group(&self, uid: &UserId, group: &str) -> Result<(), AuthError> {
2544        let mut attrs = self.user_attributes(uid);
2545        attrs.groups.retain(|g| g != group);
2546        self.set_user_attributes(uid, attrs)
2547    }
2548
2549    /// Toggle `User.enabled` without rotating credentials.
2550    pub fn set_user_enabled(&self, uid: &UserId, enabled: bool) -> Result<(), AuthError> {
2551        if let Some(configured) = self.configured_control_events() {
2552            let ctx = default_user_lifecycle_ctx();
2553            let control = UserLifecycleControl {
2554                ctx: &ctx,
2555                ledger: configured.ledger.as_ref(),
2556                config: configured.config,
2557            };
2558            self.set_user_enabled_controlled(uid, enabled, &control)
2559        } else {
2560            self.set_user_enabled_unaudited(uid, enabled)
2561        }
2562    }
2563
2564    pub fn disable_user(
2565        &self,
2566        username: &str,
2567        ctx: &ControlEventCtx<'_>,
2568        ledger: &dyn ControlEventLedger,
2569        config: ControlEventConfig,
2570    ) -> Result<(), AuthError> {
2571        self.disable_user_in_tenant(None, username, ctx, ledger, config)
2572    }
2573
2574    pub fn disable_user_in_tenant(
2575        &self,
2576        tenant_id: Option<&str>,
2577        username: &str,
2578        ctx: &ControlEventCtx<'_>,
2579        ledger: &dyn ControlEventLedger,
2580        config: ControlEventConfig,
2581    ) -> Result<(), AuthError> {
2582        let uid = UserId::from_parts(tenant_id, username);
2583        let control = UserLifecycleControl {
2584            ctx,
2585            ledger,
2586            config,
2587        };
2588        self.set_user_enabled_controlled(&uid, false, &control)
2589    }
2590
2591    fn set_user_enabled_unaudited(&self, uid: &UserId, enabled: bool) -> Result<(), AuthError> {
2592        let mut users = self.users.write().map_err(lock_err)?;
2593        let user = users
2594            .get_mut(uid)
2595            .ok_or_else(|| AuthError::UserNotFound(uid.to_string()))?;
2596        user.enabled = enabled;
2597        user.updated_at = now_ms();
2598        drop(users);
2599        self.persist_to_vault();
2600        Ok(())
2601    }
2602
2603    fn set_user_enabled_controlled(
2604        &self,
2605        uid: &UserId,
2606        enabled: bool,
2607        control: &UserLifecycleControl<'_>,
2608    ) -> Result<(), AuthError> {
2609        let previous = self.get_user_cloned(uid);
2610        let kind = if enabled {
2611            EventKind::UserUpdate
2612        } else {
2613            EventKind::UserDisable
2614        };
2615        let action = if enabled {
2616            "user.update"
2617        } else {
2618            "user.disable"
2619        };
2620        match self.set_user_enabled_unaudited(uid, enabled) {
2621            Ok(()) => {
2622                if let Err(err) = self.emit_user_lifecycle_allowed(
2623                    control,
2624                    kind,
2625                    action,
2626                    uid,
2627                    user_control_fields(
2628                        uid,
2629                        previous.as_ref().map(|u| u.role),
2630                        Some(enabled),
2631                        false,
2632                    ),
2633                ) {
2634                    self.restore_user_snapshot(uid, previous);
2635                    return Err(err);
2636                }
2637                Ok(())
2638            }
2639            Err(err) => {
2640                self.emit_user_lifecycle_outcome(
2641                    control,
2642                    if user_error_is_denied(&err) {
2643                        Outcome::Denied
2644                    } else {
2645                        Outcome::Error
2646                    },
2647                    kind,
2648                    action,
2649                    uid,
2650                    Some(err.to_string()),
2651                    user_control_fields(
2652                        uid,
2653                        previous.as_ref().map(|u| u.role),
2654                        Some(enabled),
2655                        false,
2656                    ),
2657                );
2658                Err(err)
2659            }
2660        }
2661    }
2662
2663    fn emit_user_lifecycle_allowed(
2664        &self,
2665        control: &UserLifecycleControl<'_>,
2666        kind: EventKind,
2667        action: &'static str,
2668        id: &UserId,
2669        fields: HashMap<String, Sensitivity>,
2670    ) -> Result<(), AuthError> {
2671        self.emit_user_lifecycle_event(control, Outcome::Allowed, kind, action, id, None, fields)
2672    }
2673
2674    fn emit_api_key_allowed(
2675        &self,
2676        control: &UserLifecycleControl<'_>,
2677        kind: EventKind,
2678        action: &'static str,
2679        api_key_id: &str,
2680        fields: HashMap<String, Sensitivity>,
2681    ) -> Result<(), AuthError> {
2682        self.emit_control_event(
2683            control,
2684            Outcome::Allowed,
2685            kind,
2686            action,
2687            Some(api_key_resource(api_key_id)),
2688            None,
2689            fields,
2690        )
2691    }
2692
2693    #[allow(clippy::too_many_arguments)]
2694    fn emit_user_lifecycle_outcome(
2695        &self,
2696        control: &UserLifecycleControl<'_>,
2697        outcome: Outcome,
2698        kind: EventKind,
2699        action: &'static str,
2700        id: &UserId,
2701        reason: Option<String>,
2702        fields: HashMap<String, Sensitivity>,
2703    ) {
2704        let _ = self.emit_user_lifecycle_event(control, outcome, kind, action, id, reason, fields);
2705    }
2706
2707    #[allow(clippy::too_many_arguments)]
2708    fn emit_user_lifecycle_event(
2709        &self,
2710        control: &UserLifecycleControl<'_>,
2711        outcome: Outcome,
2712        kind: EventKind,
2713        action: &'static str,
2714        id: &UserId,
2715        reason: Option<String>,
2716        fields: HashMap<String, Sensitivity>,
2717    ) -> Result<(), AuthError> {
2718        self.emit_control_event(
2719            control,
2720            outcome,
2721            kind,
2722            action,
2723            Some(user_resource(id)),
2724            reason,
2725            fields,
2726        )
2727    }
2728
2729    #[allow(clippy::too_many_arguments)]
2730    fn emit_control_event(
2731        &self,
2732        control: &UserLifecycleControl<'_>,
2733        outcome: Outcome,
2734        kind: EventKind,
2735        action: &'static str,
2736        resource: Option<String>,
2737        reason: Option<String>,
2738        fields: HashMap<String, Sensitivity>,
2739    ) -> Result<(), AuthError> {
2740        let event = ControlEvent {
2741            kind,
2742            outcome,
2743            action: Cow::Borrowed(action),
2744            resource,
2745            reason,
2746            matched_policy_id: None,
2747            fields,
2748        };
2749        match control.ledger.emit(control.ctx, event) {
2750            Ok(_) => Ok(()),
2751            Err(err) if control.config.require_persistence() => {
2752                Err(AuthError::Internal(err.to_string()))
2753            }
2754            Err(_) => Ok(()),
2755        }
2756    }
2757
2758    fn rollback_create_user(&self, id: &UserId) {
2759        let removed = self
2760            .users
2761            .write()
2762            .unwrap_or_else(|e| e.into_inner())
2763            .remove(id);
2764        if let Some(user) = removed {
2765            self.remove_api_key_index_entries(&user);
2766        }
2767        self.persist_to_vault();
2768    }
2769
2770    fn rollback_bootstrap(&self, id: &UserId) {
2771        self.bootstrapped.store(false, Ordering::Release);
2772        self.rollback_create_user(id);
2773    }
2774
2775    fn restore_user_snapshot(&self, id: &UserId, previous: Option<User>) {
2776        match previous {
2777            Some(user) => {
2778                self.users
2779                    .write()
2780                    .unwrap_or_else(|e| e.into_inner())
2781                    .insert(id.clone(), user.clone());
2782                self.index_user_api_keys(id, &user);
2783            }
2784            None => {
2785                self.rollback_create_user(id);
2786                return;
2787            }
2788        }
2789        self.persist_to_vault();
2790    }
2791
2792    fn user_delete_rollback_snapshot(
2793        &self,
2794        id: &UserId,
2795        tenant_id: Option<&str>,
2796        username: &str,
2797    ) -> Option<(User, Vec<(String, Session)>)> {
2798        let user = self.get_user_cloned(id)?;
2799        let sessions = self
2800            .sessions
2801            .read()
2802            .map(|sessions| {
2803                sessions
2804                    .iter()
2805                    .filter(|(_, session)| {
2806                        session.username == username && session.tenant_id.as_deref() == tenant_id
2807                    })
2808                    .map(|(token, session)| (token.clone(), session.clone()))
2809                    .collect()
2810            })
2811            .unwrap_or_default();
2812        Some((user, sessions))
2813    }
2814
2815    fn restore_deleted_user(&self, id: &UserId, user: User, sessions: Vec<(String, Session)>) {
2816        self.users
2817            .write()
2818            .unwrap_or_else(|e| e.into_inner())
2819            .insert(id.clone(), user.clone());
2820        self.index_user_api_keys(id, &user);
2821        if !sessions.is_empty() {
2822            let mut guard = self.sessions.write().unwrap_or_else(|e| e.into_inner());
2823            for (token, session) in sessions {
2824                guard.insert(token, session);
2825            }
2826        }
2827        self.persist_to_vault();
2828    }
2829
2830    fn rollback_create_api_key(&self, id: &UserId, key: &str) {
2831        if let Ok(mut users) = self.users.write() {
2832            if let Some(user) = users.get_mut(id) {
2833                user.api_keys.retain(|api_key| api_key.key != key);
2834                user.updated_at = now_ms();
2835            }
2836        }
2837        if let Ok(mut idx) = self.api_key_index.write() {
2838            idx.remove(key);
2839        }
2840        self.persist_to_vault();
2841    }
2842
2843    fn api_key_rollback_snapshot(&self, key: &str) -> Option<(UserId, ApiKey)> {
2844        let users = self.users.read().ok()?;
2845        users.iter().find_map(|(id, user)| {
2846            user.api_keys
2847                .iter()
2848                .find(|api_key| api_key.key == key)
2849                .cloned()
2850                .map(|api_key| (id.clone(), api_key))
2851        })
2852    }
2853
2854    fn restore_api_key(&self, id: &UserId, api_key: ApiKey) {
2855        if let Ok(mut users) = self.users.write() {
2856            if let Some(user) = users.get_mut(id) {
2857                if !user
2858                    .api_keys
2859                    .iter()
2860                    .any(|candidate| candidate.key == api_key.key)
2861                {
2862                    user.api_keys.push(api_key.clone());
2863                    user.updated_at = now_ms();
2864                }
2865            }
2866        }
2867        if let Ok(mut idx) = self.api_key_index.write() {
2868            idx.insert(api_key.key.clone(), (id.clone(), api_key.role));
2869        }
2870        self.persist_to_vault();
2871    }
2872
2873    fn remove_api_key_index_entries(&self, user: &User) {
2874        if let Ok(mut idx) = self.api_key_index.write() {
2875            for api_key in &user.api_keys {
2876                idx.remove(&api_key.key);
2877            }
2878        }
2879    }
2880
2881    fn index_user_api_keys(&self, id: &UserId, user: &User) {
2882        if let Ok(mut idx) = self.api_key_index.write() {
2883            for api_key in &user.api_keys {
2884                idx.insert(api_key.key.clone(), (id.clone(), api_key.role));
2885            }
2886        }
2887    }
2888
2889    // -----------------------------------------------------------------
2890    // Login-side enforcement (HTTP path)
2891    // -----------------------------------------------------------------
2892
2893    /// Authenticate with VALID UNTIL / CONNECTION LIMIT enforcement.
2894    /// Wraps `authenticate_in_tenant` and additionally:
2895    ///   * rejects logins after `valid_until`,
2896    ///   * rejects logins when the live session count would exceed the
2897    ///     `connection_limit` attribute.
2898    pub fn authenticate_with_attrs(
2899        &self,
2900        tenant_id: Option<&str>,
2901        username: &str,
2902        password: &str,
2903    ) -> Result<Session, AuthError> {
2904        let uid = UserId::from_parts(tenant_id, username);
2905        let attrs = self.user_attributes(&uid);
2906
2907        if let Some(deadline) = attrs.valid_until {
2908            if now_ms() >= deadline {
2909                return Err(AuthError::Forbidden(format!(
2910                    "account `{}` expired (VALID UNTIL exceeded)",
2911                    uid
2912                )));
2913            }
2914        }
2915
2916        if let Some(limit) = attrs.connection_limit {
2917            let current = self
2918                .session_count_by_user
2919                .read()
2920                .ok()
2921                .and_then(|m| m.get(&uid).copied())
2922                .unwrap_or(0);
2923            if current >= limit {
2924                return Err(AuthError::Forbidden(format!(
2925                    "account `{}` exceeded CONNECTION LIMIT ({})",
2926                    uid, limit
2927                )));
2928            }
2929        }
2930
2931        let session = self.authenticate_in_tenant(tenant_id, username, password)?;
2932
2933        if let Ok(mut counts) = self.session_count_by_user.write() {
2934            *counts.entry(uid).or_insert(0) += 1;
2935        }
2936        Ok(session)
2937    }
2938
2939    /// Decrement the live-session count for `uid`. Call from session
2940    /// revoke / expiry paths so CONNECTION LIMIT stays accurate.
2941    pub fn decrement_session_count(&self, uid: &UserId) {
2942        if let Ok(mut counts) = self.session_count_by_user.write() {
2943            if let Some(c) = counts.get_mut(uid) {
2944                *c = c.saturating_sub(1);
2945            }
2946        }
2947    }
2948
2949    // -----------------------------------------------------------------
2950    // ACL persistence — vault_kv backed
2951    // -----------------------------------------------------------------
2952
2953    /// Re-read the ACL state from `vault_kv`. Call after vault load /
2954    /// restore so the in-memory maps reflect the persisted data.
2955    pub fn rehydrate_acl(&self) {
2956        let kv_snapshot: Vec<(String, String)> = self
2957            .vault_kv
2958            .read()
2959            .map(|kv| {
2960                kv.iter()
2961                    .filter(|(k, _)| {
2962                        k.starts_with("red.acl.grants.")
2963                            || k.starts_with("red.acl.attrs.")
2964                            || k == &"red.acl.public_grants"
2965                    })
2966                    .map(|(k, v)| (k.clone(), v.clone()))
2967                    .collect()
2968            })
2969            .unwrap_or_default();
2970
2971        for (k, v) in kv_snapshot {
2972            if k == "red.acl.public_grants" {
2973                if let Some(parsed) = decode_grants_blob(&v) {
2974                    *self
2975                        .public_grants
2976                        .write()
2977                        .unwrap_or_else(|e| e.into_inner()) = parsed;
2978                }
2979            } else if let Some(suffix) = k.strip_prefix("red.acl.grants.") {
2980                if let Some(uid) = decode_uid(suffix) {
2981                    if let Some(mut parsed) = decode_grants_blob(&v) {
2982                        // Restore the principal field — the on-disk
2983                        // line stores only resource+action shape.
2984                        for g in parsed.iter_mut() {
2985                            g.principal = GrantPrincipal::User(uid.clone());
2986                        }
2987                        self.grants
2988                            .write()
2989                            .unwrap_or_else(|e| e.into_inner())
2990                            .insert(uid, parsed);
2991                    }
2992                }
2993            } else if let Some(suffix) = k.strip_prefix("red.acl.attrs.") {
2994                if let Some(uid) = decode_uid(suffix) {
2995                    if let Some(parsed) = decode_attrs_blob(&v) {
2996                        self.user_attributes
2997                            .write()
2998                            .unwrap_or_else(|e| e.into_inner())
2999                            .insert(uid, parsed);
3000                    }
3001                }
3002            }
3003        }
3004
3005        self.permission_cache
3006            .write()
3007            .unwrap_or_else(|e| e.into_inner())
3008            .clear();
3009    }
3010
3011    /// Snapshot every ACL change back into the vault KV store.
3012    fn persist_acl_to_kv(&self) {
3013        let public = self
3014            .public_grants
3015            .read()
3016            .ok()
3017            .map(|p| encode_grants_blob(&p))
3018            .unwrap_or_default();
3019        self.vault_kv_set("red.acl.public_grants".to_string(), public);
3020
3021        let snapshot: Vec<(UserId, Vec<Grant>)> = self
3022            .grants
3023            .read()
3024            .ok()
3025            .map(|g| g.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3026            .unwrap_or_default();
3027        for (uid, list) in snapshot {
3028            let key = format!("red.acl.grants.{}", encode_uid(&uid));
3029            let val = encode_grants_blob(&list);
3030            self.vault_kv_set(key, val);
3031        }
3032
3033        let attrs_snapshot: Vec<(UserId, UserAttributes)> = self
3034            .user_attributes
3035            .read()
3036            .ok()
3037            .map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3038            .unwrap_or_default();
3039        for (uid, attrs) in attrs_snapshot {
3040            let key = format!("red.acl.attrs.{}", encode_uid(&uid));
3041            let val = encode_attrs_blob(&attrs);
3042            self.vault_kv_set(key, val);
3043        }
3044    }
3045
3046    fn invalidate_permission_cache(&self, uid: Option<&UserId>) {
3047        if let Ok(mut cache) = self.permission_cache.write() {
3048            match uid {
3049                Some(u) => {
3050                    cache.remove(u);
3051                }
3052                None => cache.clear(),
3053            }
3054        }
3055    }
3056
3057    // -----------------------------------------------------------------
3058    // IAM policies — put / delete / attach / detach / simulate
3059    //
3060    // The kernel in `super::policies` owns the Policy type and the
3061    // evaluator. AuthStore handles persistence + per-user cache + the
3062    // GRANT translation layer (synthetic `_grant_*` policies).
3063    // -----------------------------------------------------------------
3064
3065    pub fn put_policy_with_control_events(
3066        &self,
3067        p: Policy,
3068        control: &PolicyMutationControl<'_>,
3069    ) -> Result<(), AuthError> {
3070        let policy_id = p.id.clone();
3071        let kind = if self.get_policy(&policy_id).is_some() {
3072            EventKind::PolicyUpdate
3073        } else {
3074            EventKind::PolicyCreate
3075        };
3076
3077        if p.id.starts_with("_grant_") || p.id.starts_with("_default_") {
3078            let err = AuthError::Forbidden(format!("policy id `{}` is reserved", p.id));
3079            self.emit_policy_error(
3080                control,
3081                kind,
3082                "policy:put",
3083                &policy_id,
3084                Some(&p),
3085                None,
3086                &err,
3087            );
3088            return Err(err);
3089        }
3090
3091        if let Some(ManagedPolicyDecision::Deny {
3092            entry_id,
3093            matched_action,
3094            matched_resource,
3095            reason,
3096            ..
3097        }) = self.managed_policy_decision(&policy_id, PolicyOp::Put, control)
3098        {
3099            self.emit_policy_denied(
3100                control,
3101                kind,
3102                "policy:put",
3103                &policy_id,
3104                Some(&p),
3105                None,
3106                &reason.to_string(),
3107                Some(entry_id),
3108                Some((matched_action, matched_resource)),
3109            );
3110            return Err(Self::managed_policy_error(&policy_id, &reason.to_string()));
3111        }
3112
3113        let previous = self.get_policy(&policy_id);
3114        let was_enabled = self.iam_authorization_enabled();
3115        match self.put_policy_internal(p.clone()) {
3116            Ok(()) => match self.emit_policy_allowed(
3117                control,
3118                kind,
3119                "policy:put",
3120                &policy_id,
3121                Some(&p),
3122                None,
3123                None,
3124            ) {
3125                Ok(()) => Ok(()),
3126                Err(err) => {
3127                    self.restore_policy_put(&policy_id, previous, was_enabled);
3128                    Err(err)
3129                }
3130            },
3131            Err(err) => {
3132                self.emit_policy_error(
3133                    control,
3134                    kind,
3135                    "policy:put",
3136                    &policy_id,
3137                    Some(&p),
3138                    None,
3139                    &err,
3140                );
3141                Err(err)
3142            }
3143        }
3144    }
3145
3146    pub fn delete_policy_with_control_events(
3147        &self,
3148        id: &str,
3149        control: &PolicyMutationControl<'_>,
3150    ) -> Result<(), AuthError> {
3151        let existing = self.get_policy(id);
3152        if let Some(ManagedPolicyDecision::Deny {
3153            entry_id,
3154            matched_action,
3155            matched_resource,
3156            reason,
3157            ..
3158        }) = self.managed_policy_decision(id, PolicyOp::Drop, control)
3159        {
3160            self.emit_policy_denied(
3161                control,
3162                EventKind::PolicyDelete,
3163                "policy:drop",
3164                id,
3165                existing.as_deref(),
3166                None,
3167                &reason.to_string(),
3168                Some(entry_id),
3169                Some((matched_action, matched_resource)),
3170            );
3171            return Err(Self::managed_policy_error(id, &reason.to_string()));
3172        }
3173
3174        let user_attachments = self
3175            .user_attachments
3176            .read()
3177            .map(|m| m.clone())
3178            .unwrap_or_default();
3179        let group_attachments = self
3180            .group_attachments
3181            .read()
3182            .map(|m| m.clone())
3183            .unwrap_or_default();
3184        let was_enabled = self.iam_authorization_enabled();
3185        match self.delete_policy(id) {
3186            Ok(()) => match self.emit_policy_allowed(
3187                control,
3188                EventKind::PolicyDelete,
3189                "policy:drop",
3190                id,
3191                existing.as_deref(),
3192                None,
3193                None,
3194            ) {
3195                Ok(()) => Ok(()),
3196                Err(err) => {
3197                    if let Some(policy) = existing {
3198                        self.restore_policy_delete(
3199                            id,
3200                            policy,
3201                            user_attachments,
3202                            group_attachments,
3203                            was_enabled,
3204                        );
3205                    }
3206                    Err(err)
3207                }
3208            },
3209            Err(err) => {
3210                self.emit_policy_error(
3211                    control,
3212                    EventKind::PolicyDelete,
3213                    "policy:drop",
3214                    id,
3215                    existing.as_deref(),
3216                    None,
3217                    &err,
3218                );
3219                Err(err)
3220            }
3221        }
3222    }
3223
3224    pub fn attach_policy_with_control_events(
3225        &self,
3226        principal: PrincipalRef,
3227        policy_id: &str,
3228        control: &PolicyMutationControl<'_>,
3229    ) -> Result<(), AuthError> {
3230        let existing = self.get_policy(policy_id);
3231        if let Some(ManagedPolicyDecision::Deny {
3232            entry_id,
3233            matched_action,
3234            matched_resource,
3235            reason,
3236            ..
3237        }) = self.managed_policy_decision(policy_id, PolicyOp::Attach, control)
3238        {
3239            self.emit_policy_denied(
3240                control,
3241                EventKind::PolicyAttach,
3242                "policy:attach",
3243                policy_id,
3244                existing.as_deref(),
3245                Some(&principal),
3246                &reason.to_string(),
3247                Some(entry_id),
3248                Some((matched_action, matched_resource)),
3249            );
3250            return Err(Self::managed_policy_error(policy_id, &reason.to_string()));
3251        }
3252
3253        if let Some(err_msg) = self.check_self_lock_invariant_for_attach(policy_id) {
3254            self.emit_policy_denied(
3255                control,
3256                EventKind::PolicyAttach,
3257                "policy:attach",
3258                policy_id,
3259                existing.as_deref(),
3260                Some(&principal),
3261                &err_msg,
3262                None,
3263                None,
3264            );
3265            return Err(AuthError::Forbidden(err_msg));
3266        }
3267
3268        let user_attachments = self
3269            .user_attachments
3270            .read()
3271            .map(|m| m.clone())
3272            .unwrap_or_default();
3273        let group_attachments = self
3274            .group_attachments
3275            .read()
3276            .map(|m| m.clone())
3277            .unwrap_or_default();
3278        match self.attach_policy(principal.clone(), policy_id) {
3279            Ok(()) => match self.emit_policy_allowed(
3280                control,
3281                EventKind::PolicyAttach,
3282                "policy:attach",
3283                policy_id,
3284                existing.as_deref(),
3285                Some(&principal),
3286                None,
3287            ) {
3288                Ok(()) => Ok(()),
3289                Err(err) => {
3290                    self.restore_policy_attachments(user_attachments, group_attachments);
3291                    Err(err)
3292                }
3293            },
3294            Err(err) => {
3295                self.emit_policy_error(
3296                    control,
3297                    EventKind::PolicyAttach,
3298                    "policy:attach",
3299                    policy_id,
3300                    existing.as_deref(),
3301                    Some(&principal),
3302                    &err,
3303                );
3304                Err(err)
3305            }
3306        }
3307    }
3308
3309    pub fn detach_policy_with_control_events(
3310        &self,
3311        principal: PrincipalRef,
3312        policy_id: &str,
3313        control: &PolicyMutationControl<'_>,
3314    ) -> Result<(), AuthError> {
3315        let existing = self.get_policy(policy_id);
3316        if let Some(ManagedPolicyDecision::Deny {
3317            entry_id,
3318            matched_action,
3319            matched_resource,
3320            reason,
3321            ..
3322        }) = self.managed_policy_decision(policy_id, PolicyOp::Detach, control)
3323        {
3324            self.emit_policy_denied(
3325                control,
3326                EventKind::PolicyDetach,
3327                "policy:detach",
3328                policy_id,
3329                existing.as_deref(),
3330                Some(&principal),
3331                &reason.to_string(),
3332                Some(entry_id),
3333                Some((matched_action, matched_resource)),
3334            );
3335            return Err(Self::managed_policy_error(policy_id, &reason.to_string()));
3336        }
3337        let Some(existing_policy) = existing else {
3338            let err = AuthError::Forbidden(format!("policy `{policy_id}` not found"));
3339            self.emit_policy_error(
3340                control,
3341                EventKind::PolicyDetach,
3342                "policy:detach",
3343                policy_id,
3344                None,
3345                Some(&principal),
3346                &err,
3347            );
3348            return Err(err);
3349        };
3350
3351        let user_attachments = self
3352            .user_attachments
3353            .read()
3354            .map(|m| m.clone())
3355            .unwrap_or_default();
3356        let group_attachments = self
3357            .group_attachments
3358            .read()
3359            .map(|m| m.clone())
3360            .unwrap_or_default();
3361        match self.detach_policy(principal.clone(), policy_id) {
3362            Ok(()) => match self.emit_policy_allowed(
3363                control,
3364                EventKind::PolicyDetach,
3365                "policy:detach",
3366                policy_id,
3367                Some(existing_policy.as_ref()),
3368                Some(&principal),
3369                None,
3370            ) {
3371                Ok(()) => Ok(()),
3372                Err(err) => {
3373                    self.restore_policy_attachments(user_attachments, group_attachments);
3374                    Err(err)
3375                }
3376            },
3377            Err(err) => {
3378                self.emit_policy_error(
3379                    control,
3380                    EventKind::PolicyDetach,
3381                    "policy:detach",
3382                    policy_id,
3383                    Some(existing_policy.as_ref()),
3384                    Some(&principal),
3385                    &err,
3386                );
3387                Err(err)
3388            }
3389        }
3390    }
3391
3392    fn managed_policy_decision(
3393        &self,
3394        policy_id: &str,
3395        op: PolicyOp,
3396        control: &PolicyMutationControl<'_>,
3397    ) -> Option<ManagedPolicyDecision> {
3398        control.registry.map(|registry| {
3399            ManagedPolicyGate::new(registry).check_mutation(
3400                self,
3401                control.actor,
3402                control.eval_ctx,
3403                policy_id,
3404                op,
3405            )
3406        })
3407    }
3408
3409    /// Run the [`crate::auth::self_lock_guard`] invariant against the
3410    /// current policy set. Returns `Some(error_message)` when an attach
3411    /// must be refused, `None` when the system would remain unlockable.
3412    ///
3413    /// `_policy_id` is informational — the invariant is on the full
3414    /// graph and doesn't currently special-case the new entry.
3415    fn check_self_lock_invariant_for_attach(&self, _policy_id: &str) -> Option<String> {
3416        use crate::auth::self_lock_guard::{
3417            check_self_lock_invariant, format_block_error, InvariantOutcome,
3418        };
3419        let policies: Vec<Arc<Policy>> = match self.policies.read() {
3420            Ok(map) => map.values().cloned().collect(),
3421            Err(_) => return None,
3422        };
3423        let outcome = check_self_lock_invariant(&policies);
3424        match &outcome {
3425            InvariantOutcome::Ok => None,
3426            InvariantOutcome::Blocked { .. } => format_block_error(&outcome),
3427        }
3428    }
3429
3430    fn managed_policy_error(policy_id: &str, reason: &str) -> AuthError {
3431        AuthError::Forbidden(format!(
3432            "managed policy mutation blocked for `{policy_id}`: {reason}"
3433        ))
3434    }
3435
3436    fn emit_policy_allowed(
3437        &self,
3438        control: &PolicyMutationControl<'_>,
3439        kind: EventKind,
3440        action: &'static str,
3441        policy_id: &str,
3442        policy: Option<&Policy>,
3443        principal: Option<&PrincipalRef>,
3444        matched_policy_id: Option<String>,
3445    ) -> Result<(), AuthError> {
3446        let event = ControlEvent {
3447            kind,
3448            outcome: Outcome::Allowed,
3449            action: std::borrow::Cow::Borrowed(action),
3450            resource: Some(format!("policy:{policy_id}")),
3451            reason: None,
3452            matched_policy_id,
3453            fields: policy_control_fields(policy_id, policy, principal),
3454        };
3455        match control.ledger.emit(control.ctx, event) {
3456            Ok(_) => Ok(()),
3457            Err(err) if control.config.require_persistence() => {
3458                Err(AuthError::Internal(err.to_string()))
3459            }
3460            Err(_) => Ok(()),
3461        }
3462    }
3463
3464    #[allow(clippy::too_many_arguments)]
3465    fn emit_policy_denied(
3466        &self,
3467        control: &PolicyMutationControl<'_>,
3468        kind: EventKind,
3469        action: &'static str,
3470        policy_id: &str,
3471        policy: Option<&Policy>,
3472        principal: Option<&PrincipalRef>,
3473        reason: &str,
3474        matched_policy_id: Option<String>,
3475        matched: Option<(String, String)>,
3476    ) {
3477        let mut fields = policy_control_fields(policy_id, policy, principal);
3478        if let Some((matched_action, matched_resource)) = matched {
3479            fields.insert(
3480                "matched_action".to_string(),
3481                Sensitivity::raw(matched_action),
3482            );
3483            fields.insert(
3484                "matched_resource".to_string(),
3485                Sensitivity::raw(matched_resource),
3486            );
3487        }
3488        let event = ControlEvent {
3489            kind,
3490            outcome: Outcome::Denied,
3491            action: std::borrow::Cow::Borrowed(action),
3492            resource: Some(format!("policy:{policy_id}")),
3493            reason: Some(reason.to_string()),
3494            matched_policy_id,
3495            fields,
3496        };
3497        let _ = control.ledger.emit(control.ctx, event);
3498    }
3499
3500    fn emit_policy_error(
3501        &self,
3502        control: &PolicyMutationControl<'_>,
3503        kind: EventKind,
3504        action: &'static str,
3505        policy_id: &str,
3506        policy: Option<&Policy>,
3507        principal: Option<&PrincipalRef>,
3508        err: &AuthError,
3509    ) {
3510        let event = ControlEvent {
3511            kind,
3512            outcome: Outcome::Error,
3513            action: std::borrow::Cow::Borrowed(action),
3514            resource: Some(format!("policy:{policy_id}")),
3515            reason: Some(err.to_string()),
3516            matched_policy_id: None,
3517            fields: policy_control_fields(policy_id, policy, principal),
3518        };
3519        let _ = control.ledger.emit(control.ctx, event);
3520    }
3521
3522    fn restore_policy_put(
3523        &self,
3524        policy_id: &str,
3525        previous: Option<Arc<Policy>>,
3526        was_enabled: bool,
3527    ) {
3528        let mut policies = self.policies.write().unwrap_or_else(|e| e.into_inner());
3529        match previous {
3530            Some(policy) => {
3531                policies.insert(policy_id.to_string(), policy);
3532            }
3533            None => {
3534                policies.remove(policy_id);
3535            }
3536        }
3537        drop(policies);
3538        self.iam_authorization_enabled
3539            .store(was_enabled, Ordering::Release);
3540        self.iam_effective_cache
3541            .write()
3542            .unwrap_or_else(|e| e.into_inner())
3543            .clear();
3544        self.invalidate_visible_collections_cache();
3545        self.persist_iam_to_kv();
3546    }
3547
3548    fn restore_policy_delete(
3549        &self,
3550        policy_id: &str,
3551        policy: Arc<Policy>,
3552        user_attachments: HashMap<UserId, Vec<String>>,
3553        group_attachments: HashMap<String, Vec<String>>,
3554        was_enabled: bool,
3555    ) {
3556        self.policies
3557            .write()
3558            .unwrap_or_else(|e| e.into_inner())
3559            .insert(policy_id.to_string(), policy);
3560        self.restore_policy_attachments(user_attachments, group_attachments);
3561        self.iam_authorization_enabled
3562            .store(was_enabled, Ordering::Release);
3563        self.persist_iam_to_kv();
3564    }
3565
3566    fn restore_policy_attachments(
3567        &self,
3568        user_attachments: HashMap<UserId, Vec<String>>,
3569        group_attachments: HashMap<String, Vec<String>>,
3570    ) {
3571        *self
3572            .user_attachments
3573            .write()
3574            .unwrap_or_else(|e| e.into_inner()) = user_attachments;
3575        *self
3576            .group_attachments
3577            .write()
3578            .unwrap_or_else(|e| e.into_inner()) = group_attachments;
3579        self.iam_effective_cache
3580            .write()
3581            .unwrap_or_else(|e| e.into_inner())
3582            .clear();
3583        self.invalidate_visible_collections_cache();
3584        self.persist_iam_to_kv();
3585    }
3586
3587    /// Insert or replace a policy by id. Rejects synthetic ids
3588    /// (`_grant_*` / `_default_*`) so callers can't hand-write them
3589    /// from the public API. Use `put_policy_internal` for synthetic
3590    /// inserts.
3591    pub fn put_policy(&self, p: Policy) -> Result<(), AuthError> {
3592        if p.id.starts_with("_grant_") || p.id.starts_with("_default_") {
3593            return Err(AuthError::Forbidden(format!(
3594                "policy id `{}` is reserved",
3595                p.id
3596            )));
3597        }
3598        self.put_policy_internal(p)
3599    }
3600
3601    /// Internal put bypassing the synthetic-namespace guard. Used by
3602    /// the GRANT translation layer; exposed publicly so integration
3603    /// tests can register synthetic `_grant_*` policies without going
3604    /// through the SQL frontend.
3605    pub fn put_policy_internal(&self, p: Policy) -> Result<(), AuthError> {
3606        p.validate()
3607            .map_err(|e| AuthError::Forbidden(format!("invalid policy `{}`: {e}", p.id)))?;
3608        let id = p.id.clone();
3609        self.policies
3610            .write()
3611            .unwrap_or_else(|e| e.into_inner())
3612            .insert(id, Arc::new(p));
3613        self.iam_authorization_enabled
3614            .store(true, Ordering::Release);
3615        self.iam_effective_cache
3616            .write()
3617            .unwrap_or_else(|e| e.into_inner())
3618            .clear();
3619        // Issue #119: a policy mutation can change the visible-
3620        // collections answer for any (tenant, role); we don't know
3621        // which up-front, so blow the whole cache.
3622        self.invalidate_visible_collections_cache();
3623        self.persist_iam_to_kv();
3624        Ok(())
3625    }
3626
3627    /// Whether the IAM evaluator should be authoritative for runtime
3628    /// authorization. This flips on the first policy write and remains
3629    /// on after deletes so dropping all policies leaves the instance in
3630    /// default-deny rather than silently returning to role fallback.
3631    pub fn iam_authorization_enabled(&self) -> bool {
3632        self.iam_authorization_enabled.load(Ordering::Acquire)
3633    }
3634
3635    /// Remove a policy and any attachments referencing it.
3636    pub fn delete_policy(&self, id: &str) -> Result<(), AuthError> {
3637        let removed = self
3638            .policies
3639            .write()
3640            .unwrap_or_else(|e| e.into_inner())
3641            .remove(id)
3642            .is_some();
3643        if !removed {
3644            return Err(AuthError::Forbidden(format!("policy `{id}` not found")));
3645        }
3646        // Detach from every user / group.
3647        if let Ok(mut ua) = self.user_attachments.write() {
3648            for ids in ua.values_mut() {
3649                ids.retain(|p| p != id);
3650            }
3651            ua.retain(|_, v| !v.is_empty());
3652        }
3653        if let Ok(mut ga) = self.group_attachments.write() {
3654            for ids in ga.values_mut() {
3655                ids.retain(|p| p != id);
3656            }
3657            ga.retain(|_, v| !v.is_empty());
3658        }
3659        self.iam_effective_cache
3660            .write()
3661            .unwrap_or_else(|e| e.into_inner())
3662            .clear();
3663        // Issue #119: dropping a policy can shrink any caller's visible
3664        // set; clear the (tenant, role) cache so AI commands re-resolve.
3665        self.invalidate_visible_collections_cache();
3666        self.persist_iam_to_kv();
3667        Ok(())
3668    }
3669
3670    /// List all policies (id-sorted for deterministic output).
3671    pub fn list_policies(&self) -> Vec<Arc<Policy>> {
3672        let map = match self.policies.read() {
3673            Ok(g) => g,
3674            Err(_) => return Vec::new(),
3675        };
3676        let mut out: Vec<Arc<Policy>> = map.values().cloned().collect();
3677        out.sort_by(|a, b| a.id.cmp(&b.id));
3678        out
3679    }
3680
3681    /// Fetch a single policy by id.
3682    pub fn get_policy(&self, id: &str) -> Option<Arc<Policy>> {
3683        self.policies.read().ok().and_then(|m| m.get(id).cloned())
3684    }
3685
3686    /// List policies directly attached to a group.
3687    pub fn group_policies(&self, group: &str) -> Vec<Arc<Policy>> {
3688        let policies = self.policies.read();
3689        let attachments = self.group_attachments.read();
3690        let mut out = Vec::new();
3691        if let (Ok(p_map), Ok(ga_map)) = (policies, attachments) {
3692            if let Some(ids) = ga_map.get(group) {
3693                for id in ids {
3694                    if let Some(p) = p_map.get(id) {
3695                        out.push(p.clone());
3696                    }
3697                }
3698            }
3699        }
3700        out.sort_by(|a, b| a.id.cmp(&b.id));
3701        out
3702    }
3703
3704    /// Delete synthetic policies produced by SQL GRANT translation.
3705    /// REVOKE uses this to keep the IAM lane and the legacy grant table
3706    /// in lock-step.
3707    pub fn delete_synthetic_grant_policies(
3708        &self,
3709        principal: &GrantPrincipal,
3710        resource: &Resource,
3711        actions: &[Action],
3712    ) -> usize {
3713        let attached = match principal {
3714            GrantPrincipal::User(uid) => self
3715                .user_attachments
3716                .read()
3717                .ok()
3718                .and_then(|m| m.get(uid).cloned())
3719                .unwrap_or_default(),
3720            GrantPrincipal::Group(group) => self
3721                .group_attachments
3722                .read()
3723                .ok()
3724                .and_then(|m| m.get(group).cloned())
3725                .unwrap_or_default(),
3726            GrantPrincipal::Public => self
3727                .group_attachments
3728                .read()
3729                .ok()
3730                .and_then(|m| m.get(PUBLIC_IAM_GROUP).cloned())
3731                .unwrap_or_default(),
3732        };
3733        if attached.is_empty() {
3734            return 0;
3735        }
3736
3737        let mut delete_ids = Vec::new();
3738        if let Ok(policies) = self.policies.read() {
3739            for id in attached {
3740                let Some(policy) = policies.get(&id) else {
3741                    continue;
3742                };
3743                if !policy.id.starts_with("_grant_") {
3744                    continue;
3745                }
3746                if synthetic_grant_matches(policy, resource, actions) {
3747                    delete_ids.push(policy.id.clone());
3748                }
3749            }
3750        }
3751
3752        let mut deleted = 0usize;
3753        for id in delete_ids {
3754            if self.delete_policy(&id).is_ok() {
3755                deleted += 1;
3756            }
3757        }
3758        deleted
3759    }
3760
3761    /// Apply the `REDDB_POLICY_BREAK_GLASS` recovery path: install or
3762    /// refresh the unlock policy, ensure the synthetic platform-owner
3763    /// user record exists, rebind the unlock policy to it, and emit a
3764    /// `policy.break_glass` audit event. Idempotent across reboots.
3765    ///
3766    /// `boot_ts_ms` is recorded in the audit event so operators can
3767    /// correlate the recovery with the boot transcript.
3768    pub fn apply_policy_break_glass(&self, boot_ts_ms: u128) -> Result<(), AuthError> {
3769        use crate::auth::self_lock_guard::{
3770            break_glass_audit_fields, unlock_policy, PLATFORM_OWNER_UNLOCK_POLICY_ID,
3771            PLATFORM_OWNER_USERNAME,
3772        };
3773
3774        // (1) Install or refresh the unlock policy. `put_policy_internal`
3775        // bypasses the synthetic-namespace guard, lets us replace any
3776        // tampered persisted copy, and persists to vault on success.
3777        let policy = unlock_policy();
3778        self.put_policy_internal(policy)?;
3779
3780        // (2) Ensure the synthetic platform-owner user record exists.
3781        // We bypass the public user-creation API to keep this user
3782        // immutable, disabled, and absent from operator listings even
3783        // if our filter ever regresses.
3784        let owner_id = UserId::platform(PLATFORM_OWNER_USERNAME);
3785        {
3786            let mut users = self.users.write().unwrap_or_else(|e| e.into_inner());
3787            users.entry(owner_id.clone()).or_insert_with(|| User {
3788                username: PLATFORM_OWNER_USERNAME.to_string(),
3789                tenant_id: None,
3790                // No usable password; even if `authenticate` were
3791                // wired to accept the principal it would fail
3792                // verification. Defence in depth: `enabled = false`
3793                // and the username filter both block it.
3794                password_hash: String::new(),
3795                scram_verifier: None,
3796                role: Role::Admin,
3797                api_keys: Vec::new(),
3798                created_at: boot_ts_ms,
3799                updated_at: boot_ts_ms,
3800                enabled: false,
3801            });
3802        }
3803
3804        // (3) Rebind the unlock policy to the synthetic principal.
3805        // `attach_policy` is idempotent (the helper checks for an
3806        // existing binding before appending).
3807        self.attach_policy(
3808            PrincipalRef::User(owner_id.clone()),
3809            PLATFORM_OWNER_UNLOCK_POLICY_ID,
3810        )?;
3811
3812        // (4) Emit a loud audit event. If the control-event ledger
3813        // isn't configured (e.g. unit tests without runtime wiring),
3814        // log a warn and proceed — recovery must not depend on audit
3815        // wiring being live.
3816        let fields_raw = break_glass_audit_fields(boot_ts_ms);
3817        let mut fields: HashMap<String, Sensitivity> = HashMap::new();
3818        for (k, v) in fields_raw {
3819            fields.insert(k, Sensitivity::raw(v));
3820        }
3821        if let Some(configured) = self.configured_control_events() {
3822            let ctx = ControlEventCtx {
3823                actor: crate::runtime::control_events::ActorRef::System("break_glass"),
3824                scope: None,
3825                request_id: None,
3826                trace_id: None,
3827            };
3828            let event = ControlEvent {
3829                kind: EventKind::PolicyBreakGlass,
3830                outcome: Outcome::Allowed,
3831                action: std::borrow::Cow::Borrowed("policy:break_glass"),
3832                resource: Some(format!("policy:{PLATFORM_OWNER_UNLOCK_POLICY_ID}")),
3833                reason: Some("REDDB_POLICY_BREAK_GLASS=1 at boot".into()),
3834                matched_policy_id: None,
3835                fields,
3836            };
3837            let _ = configured.ledger.emit(&ctx, event);
3838        }
3839        tracing::warn!(
3840            policy_id = PLATFORM_OWNER_UNLOCK_POLICY_ID,
3841            principal = PLATFORM_OWNER_USERNAME,
3842            boot_ts_ms = %boot_ts_ms,
3843            "REDDB_POLICY_BREAK_GLASS=1 — installed/refreshed platform-owner unlock policy and rebound to synthetic principal"
3844        );
3845        Ok(())
3846    }
3847
3848    /// Attach a policy to a user or group. Returns an error if the
3849    /// policy id doesn't exist.
3850    pub fn attach_policy(&self, principal: PrincipalRef, policy_id: &str) -> Result<(), AuthError> {
3851        if !self
3852            .policies
3853            .read()
3854            .map(|m| m.contains_key(policy_id))
3855            .unwrap_or(false)
3856        {
3857            return Err(AuthError::Forbidden(format!(
3858                "policy `{policy_id}` not found"
3859            )));
3860        }
3861        match &principal {
3862            PrincipalRef::User(uid) => {
3863                let mut ua = self
3864                    .user_attachments
3865                    .write()
3866                    .unwrap_or_else(|e| e.into_inner());
3867                let list = ua.entry(uid.clone()).or_default();
3868                if !list.iter().any(|p| p == policy_id) {
3869                    list.push(policy_id.to_string());
3870                }
3871                drop(ua);
3872                self.invalidate_iam_cache(Some(uid));
3873            }
3874            PrincipalRef::Group(g) => {
3875                let mut ga = self
3876                    .group_attachments
3877                    .write()
3878                    .unwrap_or_else(|e| e.into_inner());
3879                let list = ga.entry(g.clone()).or_default();
3880                if !list.iter().any(|p| p == policy_id) {
3881                    list.push(policy_id.to_string());
3882                }
3883                drop(ga);
3884                self.invalidate_iam_cache(None);
3885            }
3886        }
3887        self.persist_iam_to_kv();
3888        Ok(())
3889    }
3890
3891    /// Remove a policy attachment from a user or group.
3892    pub fn detach_policy(&self, principal: PrincipalRef, policy_id: &str) -> Result<(), AuthError> {
3893        match &principal {
3894            PrincipalRef::User(uid) => {
3895                if let Ok(mut ua) = self.user_attachments.write() {
3896                    if let Some(list) = ua.get_mut(uid) {
3897                        list.retain(|p| p != policy_id);
3898                        if list.is_empty() {
3899                            ua.remove(uid);
3900                        }
3901                    }
3902                }
3903                self.invalidate_iam_cache(Some(uid));
3904            }
3905            PrincipalRef::Group(g) => {
3906                if let Ok(mut ga) = self.group_attachments.write() {
3907                    if let Some(list) = ga.get_mut(g) {
3908                        list.retain(|p| p != policy_id);
3909                        if list.is_empty() {
3910                            ga.remove(g);
3911                        }
3912                    }
3913                }
3914                self.invalidate_iam_cache(None);
3915            }
3916        }
3917        self.persist_iam_to_kv();
3918        Ok(())
3919    }
3920
3921    /// Resolve the ordered list of effective policies for a user:
3922    /// group attachments first (least specific), then user
3923    /// attachments (most specific). Cached per user.
3924    pub fn effective_policies(&self, user: &UserId) -> Vec<Arc<Policy>> {
3925        if let Ok(cache) = self.iam_effective_cache.read() {
3926            if let Some(hit) = cache.get(user) {
3927                return hit.clone();
3928            }
3929        }
3930        let policies = self.policies.read();
3931        let user_attachments = self.user_attachments.read();
3932        let group_attachments = self.group_attachments.read();
3933        let mut groups = self
3934            .user_attributes
3935            .read()
3936            .ok()
3937            .and_then(|m| m.get(user).map(|attrs| attrs.groups.clone()))
3938            .unwrap_or_default();
3939        groups.insert(0, PUBLIC_IAM_GROUP.to_string());
3940        let mut out: Vec<Arc<Policy>> = Vec::new();
3941        if let (Ok(p_map), Ok(ua_map), Ok(ga_map)) = (policies, user_attachments, group_attachments)
3942        {
3943            for group in groups {
3944                if let Some(ids) = ga_map.get(&group) {
3945                    for id in ids {
3946                        if let Some(p) = p_map.get(id) {
3947                            out.push(p.clone());
3948                        }
3949                    }
3950                }
3951            }
3952            if let Some(ids) = ua_map.get(user) {
3953                for id in ids {
3954                    if let Some(p) = p_map.get(id) {
3955                        out.push(p.clone());
3956                    }
3957                }
3958            }
3959        }
3960        if let Ok(mut cache) = self.iam_effective_cache.write() {
3961            cache.insert(user.clone(), out.clone());
3962        }
3963        out
3964    }
3965
3966    /// Run the policy simulator for a principal. Synthesises an
3967    /// `EvalContext` from the user record + caller-supplied extras.
3968    pub fn simulate(
3969        &self,
3970        principal: &UserId,
3971        action: &str,
3972        resource: &ResourceRef,
3973        ctx_extras: SimCtx,
3974    ) -> SimulationOutcome {
3975        let user_role = self
3976            .users
3977            .read()
3978            .ok()
3979            .and_then(|u| u.get(principal).map(|u| u.role));
3980        let principal_is_admin_role = user_role == Some(Role::Admin);
3981        let now = ctx_extras.now_ms.unwrap_or_else(now_ms);
3982        let ctx = EvalContext {
3983            principal_tenant: principal.tenant.clone(),
3984            current_tenant: ctx_extras.current_tenant,
3985            peer_ip: ctx_extras.peer_ip,
3986            mfa_present: ctx_extras.mfa_present,
3987            now_ms: now,
3988            principal_is_admin_role,
3989            principal_is_platform_scoped: principal.tenant.is_none(),
3990        };
3991        let pols = self.effective_policies(principal);
3992        let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
3993        iam_policies::simulate(&refs, action, resource, &ctx)
3994    }
3995
3996    /// Production hot-path policy evaluation. Returns `true` on Allow
3997    /// / AdminBypass, `false` on Deny / DefaultDeny.
3998    ///
3999    /// This entry point is **strict**: it never consults the
4000    /// [`PolicyEnforcementMode`] fallback. Governance APIs that gate
4001    /// admin-tier mutations (managed-config writes, registry
4002    /// supersedes, managed-policy lifecycle) call this so they cannot
4003    /// accidentally pick up the lenient `LegacyRbac` posture. Runtime
4004    /// hot-path callers that should respect the mode call
4005    /// [`AuthStore::check_policy_authz_with_role`] instead.
4006    pub fn check_policy_authz(
4007        &self,
4008        principal: &UserId,
4009        action: &str,
4010        resource: &ResourceRef,
4011        ctx: &EvalContext,
4012    ) -> bool {
4013        let pols = self.effective_policies(principal);
4014        let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
4015        let decision = iam_policies::evaluate(&refs, action, resource, ctx);
4016        matches!(
4017            decision,
4018            iam_policies::Decision::Allow { .. } | iam_policies::Decision::AdminBypass
4019        )
4020    }
4021
4022    /// Mode-aware policy evaluation for runtime SQL/HTTP/wire surfaces.
4023    ///
4024    /// Returns `true` on `Allow`/`AdminBypass`, `false` on an explicit
4025    /// `Deny`. On `DefaultDeny` (no statement matched) the result
4026    /// depends on the active [`PolicyEnforcementMode`]:
4027    ///
4028    /// * `LegacyRbac` — defer to
4029    ///   [`legacy_rbac_decision`][super::enforcement_mode::legacy_rbac_decision]
4030    ///   using the caller's `role`. This preserves the pre-#712
4031    ///   behaviour where a principal with no attached policy is gated
4032    ///   only by their role.
4033    /// * `PolicyOnly` — return `false`. A principal needs an explicit
4034    ///   matching `Allow` to be authorized.
4035    ///
4036    /// An explicit `Deny` always wins, irrespective of mode and role.
4037    pub fn check_policy_authz_with_role(
4038        &self,
4039        principal: &UserId,
4040        action: &str,
4041        resource: &ResourceRef,
4042        ctx: &EvalContext,
4043        role: Role,
4044    ) -> bool {
4045        let pols = self.effective_policies(principal);
4046        let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
4047        let decision = iam_policies::evaluate(&refs, action, resource, ctx);
4048        match decision {
4049            iam_policies::Decision::Allow { .. } | iam_policies::Decision::AdminBypass => true,
4050            iam_policies::Decision::Deny { .. } => false,
4051            iam_policies::Decision::DefaultDeny => match self.enforcement_mode() {
4052                PolicyEnforcementMode::LegacyRbac => legacy_rbac_decision(role, action),
4053                PolicyEnforcementMode::PolicyOnly => false,
4054            },
4055        }
4056    }
4057
4058    // -----------------------------------------------------------------
4059    // PolicyEnforcementMode (#712)
4060    // -----------------------------------------------------------------
4061
4062    /// Read the active enforcement mode. Cheap (a single `RwLock` read);
4063    /// safe to call on the hot path.
4064    pub fn enforcement_mode(&self) -> PolicyEnforcementMode {
4065        *self
4066            .enforcement_mode
4067            .read()
4068            .unwrap_or_else(|e| e.into_inner())
4069    }
4070
4071    /// Overwrite the active enforcement mode and persist it to the
4072    /// vault KV so the new value survives a restart. Returns the
4073    /// previous value so callers logging a transition (e.g. the
4074    /// boot-time loader, the S5B migration command) can record the
4075    /// before/after.
4076    pub fn set_enforcement_mode(&self, mode: PolicyEnforcementMode) -> PolicyEnforcementMode {
4077        let prev = {
4078            let mut guard = self
4079                .enforcement_mode
4080                .write()
4081                .unwrap_or_else(|e| e.into_inner());
4082            let prev = *guard;
4083            *guard = mode;
4084            prev
4085        };
4086        self.vault_kv_set(
4087            "red.config.policy.enforcement_mode".to_string(),
4088            mode.as_str().to_string(),
4089        );
4090        prev
4091    }
4092
4093    /// Claim the "emit the one-time legacy-RBAC boot warning" token.
4094    /// Returns `true` on the first call after construction (or after a
4095    /// process restart) **iff** the active mode is `LegacyRbac`;
4096    /// returns `false` on every subsequent call so the boot path can
4097    /// guarantee the warning is logged at most once per boot.
4098    pub fn take_legacy_rbac_warn_once(&self) -> bool {
4099        if self.enforcement_mode() != PolicyEnforcementMode::LegacyRbac {
4100            return false;
4101        }
4102        !self
4103            .legacy_rbac_boot_warn_emitted
4104            .swap(true, Ordering::AcqRel)
4105    }
4106
4107    /// Evaluate a resolved table projection through the column policy
4108    /// gate. Query paths should pass already-resolved column names; this
4109    /// helper intentionally does not parse SQL projection syntax.
4110    pub fn check_column_projection_authz(
4111        &self,
4112        principal: &UserId,
4113        request: &ColumnAccessRequest,
4114        ctx: &EvalContext,
4115    ) -> ColumnPolicyOutcome {
4116        let pols = self.effective_policies(principal);
4117        let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
4118        ColumnPolicyGate::new(&refs).evaluate(request, ctx)
4119    }
4120
4121    fn invalidate_iam_cache(&self, uid: Option<&UserId>) {
4122        if let Ok(mut cache) = self.iam_effective_cache.write() {
4123            match uid {
4124                Some(u) => {
4125                    cache.remove(u);
4126                }
4127                None => cache.clear(),
4128            }
4129        }
4130    }
4131
4132    /// Drop every effective-policy cache entry. Called from execution
4133    /// paths that mutate policies/attachments without knowing which
4134    /// users will be affected.
4135    pub fn invalidate_all_iam_cache(&self) {
4136        self.invalidate_iam_cache(None);
4137    }
4138
4139    // -----------------------------------------------------------------
4140    // IAM persistence — vault_kv backed under `red.iam.*` keys
4141    // -----------------------------------------------------------------
4142
4143    /// Reload IAM state (policies + attachments) from the vault KV.
4144    /// Replaces the legacy `rehydrate_acl` reader — pre-1.0 we drop
4145    /// the old `red.acl.*` blob format entirely.
4146    pub fn rehydrate_iam(&self) {
4147        let mut enabled = self
4148            .vault_kv_get("red.iam.enabled")
4149            .map(|v| v == "true")
4150            .unwrap_or(false);
4151        // Policies — single JSON object keyed by id.
4152        if let Some(blob) = self.vault_kv_get("red.iam.policies") {
4153            if let Ok(val) = crate::serde_json::from_str::<crate::serde_json::Value>(&blob) {
4154                if let Some(obj) = val.as_object() {
4155                    let mut map = HashMap::new();
4156                    for (id, body) in obj.iter() {
4157                        let s = body.to_string_compact();
4158                        if let Ok(p) = Policy::from_json_str(&s) {
4159                            map.insert(id.clone(), Arc::new(p));
4160                        }
4161                    }
4162                    if !map.is_empty() {
4163                        enabled = true;
4164                    }
4165                    *self.policies.write().unwrap_or_else(|e| e.into_inner()) = map;
4166                }
4167            }
4168        }
4169        // User attachments.
4170        if let Some(blob) = self.vault_kv_get("red.iam.attachments.users") {
4171            if let Ok(val) = crate::serde_json::from_str::<crate::serde_json::Value>(&blob) {
4172                if let Some(obj) = val.as_object() {
4173                    let mut map: HashMap<UserId, Vec<String>> = HashMap::new();
4174                    for (encoded_uid, ids_v) in obj.iter() {
4175                        let Some(uid) = decode_uid(encoded_uid) else {
4176                            continue;
4177                        };
4178                        if let Some(arr) = ids_v.as_array() {
4179                            let ids: Vec<String> = arr
4180                                .iter()
4181                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
4182                                .collect();
4183                            map.insert(uid, ids);
4184                        }
4185                    }
4186                    *self
4187                        .user_attachments
4188                        .write()
4189                        .unwrap_or_else(|e| e.into_inner()) = map;
4190                }
4191            }
4192        }
4193        // Group attachments.
4194        if let Some(blob) = self.vault_kv_get("red.iam.attachments.groups") {
4195            if let Ok(val) = crate::serde_json::from_str::<crate::serde_json::Value>(&blob) {
4196                if let Some(obj) = val.as_object() {
4197                    let mut map: HashMap<String, Vec<String>> = HashMap::new();
4198                    for (g, ids_v) in obj.iter() {
4199                        if let Some(arr) = ids_v.as_array() {
4200                            let ids: Vec<String> = arr
4201                                .iter()
4202                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
4203                                .collect();
4204                            map.insert(g.clone(), ids);
4205                        }
4206                    }
4207                    *self
4208                        .group_attachments
4209                        .write()
4210                        .unwrap_or_else(|e| e.into_inner()) = map;
4211                }
4212            }
4213        }
4214        self.iam_authorization_enabled
4215            .store(enabled, Ordering::Release);
4216        self.invalidate_iam_cache(None);
4217
4218        // #712 / S5A: load persisted enforcement mode, if any. Absence
4219        // of the key means an existing install upgrading past #712 —
4220        // stay on the lenient default written at construction time.
4221        // Fresh installs receive `PolicyOnly` via the bootstrap path
4222        // (`mark_bootstrap_complete`) before the first restart, so this
4223        // hydrate path picks it up on every subsequent boot.
4224        if let Some(stored) = self.vault_kv_get("red.config.policy.enforcement_mode") {
4225            if let Some(mode) = PolicyEnforcementMode::parse(&stored) {
4226                *self
4227                    .enforcement_mode
4228                    .write()
4229                    .unwrap_or_else(|e| e.into_inner()) = mode;
4230            }
4231        }
4232
4233        // #712 / S5A: legacy_rbac is a transitional posture; emit one
4234        // boot-time warning so operators see the migration nudge in
4235        // their logs. The flag inside take_legacy_rbac_warn_once
4236        // guarantees we don't spam reload cycles.
4237        if self.take_legacy_rbac_warn_once() {
4238            tracing::warn!(
4239                target: "reddb::auth::enforcement_mode",
4240                mode = "legacy_rbac",
4241                hard_version = crate::auth::enforcement_mode::POLICY_ONLY_HARD_VERSION,
4242                "policy enforcement_mode=legacy_rbac (transitional). Run \
4243                 `MIGRATE POLICY MODE TO 'policy_only'` (next slice S5B) \
4244                 before {} to flip this install over to the strict posture.",
4245                crate::auth::enforcement_mode::POLICY_ONLY_HARD_VERSION,
4246            );
4247        }
4248    }
4249
4250    /// Snapshot policies + attachments into the vault KV. Called
4251    /// after every mutation.
4252    fn persist_iam_to_kv(&self) {
4253        let enabled = if self.iam_authorization_enabled() {
4254            "true"
4255        } else {
4256            "false"
4257        };
4258        self.vault_kv_set("red.iam.enabled".to_string(), enabled.to_string());
4259
4260        // Policies: `{ "<id>": <policy_json>, ... }`
4261        let policies_obj = {
4262            let map = self.policies.read().unwrap_or_else(|e| e.into_inner());
4263            let mut obj = crate::serde_json::Map::new();
4264            for (id, p) in map.iter() {
4265                let s = p.to_json_string();
4266                if let Ok(v) = crate::serde_json::from_str::<crate::serde_json::Value>(&s) {
4267                    obj.insert(id.clone(), v);
4268                }
4269            }
4270            crate::serde_json::Value::Object(obj).to_string_compact()
4271        };
4272        self.vault_kv_set("red.iam.policies".to_string(), policies_obj);
4273
4274        // User attachments: `{ "<encoded_uid>": [ "<policy_id>", ... ], ... }`
4275        let users_obj = {
4276            let map = self
4277                .user_attachments
4278                .read()
4279                .unwrap_or_else(|e| e.into_inner());
4280            let mut obj = crate::serde_json::Map::new();
4281            for (uid, ids) in map.iter() {
4282                let arr = crate::serde_json::Value::Array(
4283                    ids.iter()
4284                        .map(|s| crate::serde_json::Value::String(s.clone()))
4285                        .collect(),
4286                );
4287                obj.insert(encode_uid(uid), arr);
4288            }
4289            crate::serde_json::Value::Object(obj).to_string_compact()
4290        };
4291        self.vault_kv_set("red.iam.attachments.users".to_string(), users_obj);
4292
4293        // Group attachments.
4294        let groups_obj = {
4295            let map = self
4296                .group_attachments
4297                .read()
4298                .unwrap_or_else(|e| e.into_inner());
4299            let mut obj = crate::serde_json::Map::new();
4300            for (g, ids) in map.iter() {
4301                let arr = crate::serde_json::Value::Array(
4302                    ids.iter()
4303                        .map(|s| crate::serde_json::Value::String(s.clone()))
4304                        .collect(),
4305                );
4306                obj.insert(g.clone(), arr);
4307            }
4308            crate::serde_json::Value::Object(obj).to_string_compact()
4309        };
4310        self.vault_kv_set("red.iam.attachments.groups".to_string(), groups_obj);
4311    }
4312}
4313
4314fn synthetic_grant_matches(policy: &Policy, resource: &Resource, actions: &[Action]) -> bool {
4315    policy.statements.iter().any(|st| {
4316        st.effect == crate::auth::policies::Effect::Allow
4317            && st.condition.is_none()
4318            && grant_actions_overlap(&st.actions, actions)
4319            && grant_resource_matches(&st.resources, resource)
4320    })
4321}
4322
4323fn grant_actions_overlap(
4324    patterns: &[crate::auth::policies::ActionPattern],
4325    actions: &[Action],
4326) -> bool {
4327    if actions.contains(&Action::All) {
4328        return true;
4329    }
4330    patterns.iter().any(|pat| match pat {
4331        crate::auth::policies::ActionPattern::Wildcard => true,
4332        crate::auth::policies::ActionPattern::Exact(s) => {
4333            actions.iter().any(|a| s.eq_ignore_ascii_case(a.as_str()))
4334        }
4335        crate::auth::policies::ActionPattern::Prefix(_) => false,
4336    })
4337}
4338
4339fn grant_resource_matches(
4340    patterns: &[crate::auth::policies::ResourcePattern],
4341    resource: &Resource,
4342) -> bool {
4343    let expected = grant_resource_pattern(resource);
4344    patterns.iter().any(|pat| pat == &expected)
4345}
4346
4347fn grant_resource_pattern(resource: &Resource) -> crate::auth::policies::ResourcePattern {
4348    use crate::auth::policies::ResourcePattern;
4349
4350    match resource {
4351        Resource::Database => ResourcePattern::Glob("table:*".to_string()),
4352        Resource::Schema(s) => ResourcePattern::Glob(format!("table:{s}.*")),
4353        Resource::Table { schema, table } => ResourcePattern::Exact {
4354            kind: "table".to_string(),
4355            name: match schema {
4356                Some(s) => format!("{s}.{table}"),
4357                None => table.clone(),
4358            },
4359        },
4360        Resource::Function { schema, name } => ResourcePattern::Exact {
4361            kind: "function".to_string(),
4362            name: match schema {
4363                Some(s) => format!("{s}.{name}"),
4364                None => name.clone(),
4365            },
4366        },
4367    }
4368}
4369
4370// ===========================================================================
4371// ACL serialization helpers — line-oriented, human-readable so an
4372// operator inspecting the vault dump can spot misconfigurations.
4373//
4374// Format (one record per line):
4375//   GRANT|<resource>|<actions_csv>|<with_grant_option>|<tenant_or_*>|<granted_by>|<granted_at>
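//   ATTR|<valid_until>|<connection_limit>|<search_path>|<groups_csv>
//   (unset ATTR fields are written as `*`; a 4-field ATTR record
//   without the groups column is still accepted on read)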
4377//
4378// Resources are encoded as:
4379//   db                          → Database
4380//   schema:<name>               → Schema(name)
4381//   table:<schema_or_*>:<name>  → Table { schema, table }
4382//   func:<schema_or_*>:<name>   → Function { schema, name }
4383// ===========================================================================
4384
4385fn encode_uid(uid: &UserId) -> String {
4386    match &uid.tenant {
4387        Some(t) => format!("{}/{}", t, uid.username),
4388        None => format!("*/{}", uid.username),
4389    }
4390}
4391
4392fn decode_uid(s: &str) -> Option<UserId> {
4393    let (tenant, username) = s.split_once('/')?;
4394    Some(if tenant == "*" {
4395        UserId::platform(username)
4396    } else {
4397        UserId::scoped(tenant, username)
4398    })
4399}
4400
4401fn encode_resource(r: &Resource) -> String {
4402    match r {
4403        Resource::Database => "db".into(),
4404        Resource::Schema(s) => format!("schema:{}", s),
4405        Resource::Table { schema, table } => {
4406            format!("table:{}:{}", schema.as_deref().unwrap_or("*"), table)
4407        }
4408        Resource::Function { schema, name } => {
4409            format!("func:{}:{}", schema.as_deref().unwrap_or("*"), name)
4410        }
4411    }
4412}
4413
4414fn decode_resource(s: &str) -> Option<Resource> {
4415    if s == "db" {
4416        return Some(Resource::Database);
4417    }
4418    if let Some(rest) = s.strip_prefix("schema:") {
4419        return Some(Resource::Schema(rest.to_string()));
4420    }
4421    if let Some(rest) = s.strip_prefix("table:") {
4422        let (schema, table) = rest.split_once(':')?;
4423        return Some(Resource::Table {
4424            schema: if schema == "*" {
4425                None
4426            } else {
4427                Some(schema.to_string())
4428            },
4429            table: table.to_string(),
4430        });
4431    }
4432    if let Some(rest) = s.strip_prefix("func:") {
4433        let (schema, name) = rest.split_once(':')?;
4434        return Some(Resource::Function {
4435            schema: if schema == "*" {
4436                None
4437            } else {
4438                Some(schema.to_string())
4439            },
4440            name: name.to_string(),
4441        });
4442    }
4443    None
4444}
4445
4446fn encode_grants_blob(grants: &[Grant]) -> String {
4447    let mut out = String::new();
4448    for g in grants {
4449        let actions: Vec<&str> = g.actions.iter().map(|a| a.as_str()).collect();
4450        out.push_str(&format!(
4451            "GRANT|{}|{}|{}|{}|{}|{}\n",
4452            encode_resource(&g.resource),
4453            actions.join(","),
4454            g.with_grant_option,
4455            g.tenant.as_deref().unwrap_or("*"),
4456            g.granted_by,
4457            g.granted_at,
4458        ));
4459    }
4460    out
4461}
4462
4463fn decode_grants_blob(s: &str) -> Option<Vec<Grant>> {
4464    let mut out = Vec::new();
4465    for line in s.lines() {
4466        if line.is_empty() {
4467            continue;
4468        }
4469        let parts: Vec<&str> = line.split('|').collect();
4470        if parts.len() != 7 || parts[0] != "GRANT" {
4471            return None;
4472        }
4473        let resource = decode_resource(parts[1])?;
4474        let mut actions = std::collections::BTreeSet::new();
4475        for token in parts[2].split(',') {
4476            if let Some(a) = Action::from_keyword(token) {
4477                actions.insert(a);
4478            }
4479        }
4480        let with_grant_option = parts[3] == "true";
4481        let tenant = if parts[4] == "*" {
4482            None
4483        } else {
4484            Some(parts[4].to_string())
4485        };
4486        let granted_by = parts[5].to_string();
4487        let granted_at: u128 = parts[6].parse().unwrap_or(0);
4488        out.push(Grant {
4489            // Principal field is reconstructed by the loader from the
4490            // storage-key prefix; default to `Public` here.
4491            principal: GrantPrincipal::Public,
4492            resource,
4493            actions,
4494            with_grant_option,
4495            granted_by,
4496            granted_at,
4497            tenant,
4498            columns: None,
4499        });
4500    }
4501    Some(out)
4502}
4503
4504fn encode_attrs_blob(a: &UserAttributes) -> String {
4505    let valid = a
4506        .valid_until
4507        .map(|t| t.to_string())
4508        .unwrap_or_else(|| "*".into());
4509    let limit = a
4510        .connection_limit
4511        .map(|l| l.to_string())
4512        .unwrap_or_else(|| "*".into());
4513    let path = a.search_path.clone().unwrap_or_else(|| "*".into());
4514    let groups = if a.groups.is_empty() {
4515        "*".to_string()
4516    } else {
4517        a.groups.join(",")
4518    };
4519    format!("ATTR|{}|{}|{}|{}\n", valid, limit, path, groups)
4520}
4521
4522fn decode_attrs_blob(s: &str) -> Option<UserAttributes> {
4523    let line = s.lines().next()?;
4524    let parts: Vec<&str> = line.split('|').collect();
4525    if !(parts.len() == 4 || parts.len() == 5) || parts[0] != "ATTR" {
4526        return None;
4527    }
4528    let groups = if parts.get(4).copied().unwrap_or("*") == "*" {
4529        Vec::new()
4530    } else {
4531        parts[4]
4532            .split(',')
4533            .filter(|g| !g.is_empty())
4534            .map(|g| g.to_string())
4535            .collect()
4536    };
4537    Some(UserAttributes {
4538        valid_until: if parts[1] == "*" {
4539            None
4540        } else {
4541            parts[1].parse().ok()
4542        },
4543        connection_limit: if parts[2] == "*" {
4544            None
4545        } else {
4546            parts[2].parse().ok()
4547        },
4548        search_path: if parts[3] == "*" {
4549            None
4550        } else {
4551            Some(parts[3].to_string())
4552        },
4553        groups,
4554    })
4555}
4556
4557// ===========================================================================
4558// Password hashing
4559// ===========================================================================
4560
4561/// Derive a SCRAM-SHA-256 verifier for a fresh user / password
4562/// rotation. Salt is 16 random bytes; iter is the engine default
4563/// (`scram::DEFAULT_ITER`). Stored alongside the Argon2 password
4564/// hash so HTTP login + v2 SCRAM can both authenticate the same
4565/// user.
4566fn make_scram_verifier(password: &str) -> crate::auth::scram::ScramVerifier {
4567    let salt = random_bytes(16);
4568    crate::auth::scram::ScramVerifier::from_password(
4569        password,
4570        salt,
4571        crate::auth::scram::DEFAULT_ITER,
4572    )
4573}
4574
4575/// Hash a password using Argon2id.
4576///
4577/// Format: `argon2id$<salt_hex>$<hash_hex>`
4578pub(crate) fn hash_password(password: &str) -> String {
4579    let salt = random_bytes(16);
4580    let params = auth_argon2_params();
4581    let hash = derive_key(password.as_bytes(), &salt, &params);
4582    format!("argon2id${}${}", hex::encode(&salt), hex::encode(&hash))
4583}
4584
4585/// Verify a password against a stored `argon2id$<salt>$<hash>` string.
4586pub(crate) fn verify_password(password: &str, stored_hash: &str) -> bool {
4587    let parts: Vec<&str> = stored_hash.splitn(3, '$').collect();
4588    if parts.len() != 3 || parts[0] != "argon2id" {
4589        return false;
4590    }
4591
4592    let salt = match hex::decode(parts[1]) {
4593        Ok(s) => s,
4594        Err(_) => return false,
4595    };
4596
4597    let expected_hash = match hex::decode(parts[2]) {
4598        Ok(h) => h,
4599        Err(_) => return false,
4600    };
4601
4602    let params = auth_argon2_params();
4603    let computed = derive_key(password.as_bytes(), &salt, &params);
4604    constant_time_eq(&computed, &expected_hash)
4605}
4606
/// Compare two byte slices without stopping at the first mismatch, so
/// timing does not reveal where equal-length inputs differ. Slices of
/// different length are rejected up front.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // OR every XOR difference together; the result is zero only when all
    // byte pairs match.
    let accumulated = a
        .iter()
        .zip(b.iter())
        .fold(0u8, |acc, (left, right)| acc | (left ^ right));
    accumulated == 0
}
4618
4619// ===========================================================================
4620// Token generation
4621// ===========================================================================
4622
4623fn generate_session_token() -> String {
4624    format!("rs_{}", hex::encode(random_bytes(32)))
4625}
4626
4627fn generate_api_key() -> String {
4628    format!("rk_{}", hex::encode(random_bytes(32)))
4629}
4630
4631/// Generate `n` random bytes and return as a hex string.
4632fn random_hex(n: usize) -> String {
4633    hex::encode(random_bytes(n))
4634}
4635
4636/// Generate `n` cryptographically random bytes using the OS CSPRNG,
4637/// then mix with SHA-256 for domain separation.
4638pub(crate) fn random_bytes(n: usize) -> Vec<u8> {
4639    let mut buf = vec![0u8; n.max(32)];
4640    if os_random::fill_bytes(&mut buf).is_err() {
4641        // Fallback: use system time and pointers as entropy (best-effort).
4642        let seed = now_ms().to_le_bytes();
4643        for (i, byte) in buf.iter_mut().enumerate() {
4644            *byte = seed[i % seed.len()] ^ (i as u8);
4645        }
4646    }
4647    // SHA-256 mix to ensure uniform distribution.
4648    let digest = sha256(&buf);
4649    if n <= 32 {
4650        digest[..n].to_vec()
4651    } else {
4652        // Chain SHA-256 for longer outputs (unusual but supported).
4653        let mut out = Vec::with_capacity(n);
4654        let mut prev = digest;
4655        while out.len() < n {
4656            out.extend_from_slice(&prev[..std::cmp::min(32, n - out.len())]);
4657            prev = sha256(&prev);
4658        }
4659        out
4660    }
4661}
4662
4663// ===========================================================================
4664// Helpers
4665// ===========================================================================
4666
4667fn lock_err<T>(_: T) -> AuthError {
4668    AuthError::Internal("lock poisoned".to_string())
4669}
4670
4671// ===========================================================================
4672// Tests
4673// ===========================================================================
4674
4675#[cfg(test)]
4676mod tests {
4677    use super::*;
4678
4679    fn test_config() -> AuthConfig {
4680        AuthConfig {
4681            enabled: true,
4682            session_ttl_secs: 60,
4683            require_auth: true,
4684            auto_encrypt_storage: false,
4685            vault_enabled: false,
4686            cert: Default::default(),
4687            oauth: Default::default(),
4688        }
4689    }
4690
4691    #[test]
4692    fn test_create_and_list_users() {
4693        let store = AuthStore::new(test_config());
4694        store.create_user("alice", "pass1", Role::Admin).unwrap();
4695        store.create_user("bob", "pass2", Role::Read).unwrap();
4696
4697        let users = store.list_users();
4698        assert_eq!(users.len(), 2);
4699        // Password hashes should be redacted.
4700        for u in &users {
4701            assert!(u.password_hash.is_empty());
4702        }
4703    }
4704
4705    #[test]
4706    fn test_create_duplicate_user() {
4707        let store = AuthStore::new(test_config());
4708        store.create_user("alice", "pass", Role::Admin).unwrap();
4709        let err = store.create_user("alice", "pass2", Role::Read).unwrap_err();
4710        assert!(matches!(err, AuthError::UserExists(_)));
4711    }
4712
4713    #[test]
4714    fn test_authenticate_and_validate() {
4715        let store = AuthStore::new(test_config());
4716        store.create_user("alice", "secret", Role::Write).unwrap();
4717
4718        let session = store.authenticate("alice", "secret").unwrap();
4719        assert!(session.token.starts_with("rs_"));
4720
4721        let (username, role) = store.validate_token(&session.token).unwrap();
4722        assert_eq!(username, "alice");
4723        assert_eq!(role, Role::Write);
4724    }
4725
4726    #[test]
4727    fn test_authenticate_wrong_password() {
4728        let store = AuthStore::new(test_config());
4729        store.create_user("alice", "secret", Role::Read).unwrap();
4730
4731        let err = store.authenticate("alice", "wrong").unwrap_err();
4732        assert!(matches!(err, AuthError::InvalidCredentials));
4733    }
4734
4735    #[test]
4736    fn test_api_key_lifecycle() {
4737        let store = AuthStore::new(test_config());
4738        store.create_user("alice", "pass", Role::Admin).unwrap();
4739
4740        let key = store
4741            .create_api_key("alice", "ci-token", Role::Write)
4742            .unwrap();
4743        assert!(key.key.starts_with("rk_"));
4744
4745        let (username, role) = store.validate_token(&key.key).unwrap();
4746        assert_eq!(username, "alice");
4747        assert_eq!(role, Role::Write);
4748
4749        store.revoke_api_key(&key.key).unwrap();
4750        assert!(store.validate_token(&key.key).is_none());
4751    }
4752
4753    #[test]
4754    fn test_api_key_role_exceeded() {
4755        let store = AuthStore::new(test_config());
4756        store.create_user("bob", "pass", Role::Read).unwrap();
4757
4758        let err = store
4759            .create_api_key("bob", "escalate", Role::Admin)
4760            .unwrap_err();
4761        assert!(matches!(err, AuthError::RoleExceeded { .. }));
4762    }
4763
4764    #[test]
4765    fn test_change_password() {
4766        let store = AuthStore::new(test_config());
4767        store.create_user("alice", "old", Role::Write).unwrap();
4768
4769        store.change_password("alice", "old", "new").unwrap();
4770
4771        // Old password should fail.
4772        assert!(store.authenticate("alice", "old").is_err());
4773        // New password should succeed.
4774        assert!(store.authenticate("alice", "new").is_ok());
4775    }
4776
4777    #[test]
4778    fn test_change_role() {
4779        let store = AuthStore::new(test_config());
4780        store.create_user("alice", "pass", Role::Admin).unwrap();
4781        store.create_api_key("alice", "key1", Role::Admin).unwrap();
4782
4783        store.change_role("alice", Role::Read).unwrap();
4784
4785        // User's role should be Read now.
4786        let users = store.list_users();
4787        let alice = users.iter().find(|u| u.username == "alice").unwrap();
4788        assert_eq!(alice.role, Role::Read);
4789
4790        // API keys should have been downgraded.
4791        assert_eq!(alice.api_keys[0].role, Role::Read);
4792    }
4793
4794    #[test]
4795    fn test_admin_user_mutations_are_policy_controlled_not_flag_guarded() {
4796        let store = AuthStore::new(test_config());
4797        store
4798            .create_admin_user("system", "pass", Role::Admin, None)
4799            .unwrap();
4800
4801        let uid = UserId::platform("system");
4802        store.change_password("system", "pass", "new").unwrap();
4803        store.change_role("system", Role::Read).unwrap();
4804        store.set_user_enabled(&uid, false).unwrap();
4805
4806        let key = store
4807            .create_api_key("system", "rotation", Role::Read)
4808            .unwrap();
4809        assert!(store.validate_token(&key.key).is_some());
4810        store.revoke_api_key(&key.key).unwrap();
4811        assert!(store.validate_token(&key.key).is_none());
4812        store.delete_user("system").unwrap();
4813    }
4814
4815    #[test]
4816    fn test_user_lifecycle_deny_policy_blocks_cloud_admin_deletion() {
4817        use crate::auth::policies::Policy;
4818
4819        let store = AuthStore::new(test_config());
4820        store
4821            .create_user("cloud-admin", "pass", Role::Admin)
4822            .unwrap();
4823        store
4824            .create_user("customer-admin", "pass", Role::Admin)
4825            .unwrap();
4826
4827        store
4828            .put_policy(
4829                Policy::from_json_str(
4830                    r#"{
4831                        "id": "customer-admin-allow-all",
4832                        "version": 1,
4833                        "statements": [{
4834                            "effect": "allow",
4835                            "actions": ["*"],
4836                            "resources": ["*"]
4837                        }]
4838                    }"#,
4839                )
4840                .unwrap(),
4841            )
4842            .unwrap();
4843        store
4844            .put_policy(
4845                Policy::from_json_str(
4846                    r#"{
4847                        "id": "cloud-admin-protection",
4848                        "version": 1,
4849                        "statements": [{
4850                            "effect": "deny",
4851                            "actions": [
4852                                "user:delete",
4853                                "user:disable",
4854                                "user:password:change",
4855                                "user:role:update"
4856                            ],
4857                            "resources": ["user:cloud-admin"]
4858                        }]
4859                    }"#,
4860                )
4861                .unwrap(),
4862            )
4863            .unwrap();
4864        let customer = UserId::platform("customer-admin");
4865        store
4866            .attach_policy(
4867                PrincipalRef::User(customer.clone()),
4868                "customer-admin-allow-all",
4869            )
4870            .unwrap();
4871        store
4872            .attach_policy(
4873                PrincipalRef::User(customer.clone()),
4874                "cloud-admin-protection",
4875            )
4876            .unwrap();
4877
4878        let cloud_admin = UserId::platform("cloud-admin");
4879        assert!(!store.check_user_lifecycle_authz(
4880            &customer,
4881            Role::Admin,
4882            "user:delete",
4883            &cloud_admin,
4884        ));
4885        assert!(!store.check_user_lifecycle_authz(
4886            &customer,
4887            Role::Admin,
4888            "user:disable",
4889            &cloud_admin,
4890        ));
4891        assert!(!store.check_user_lifecycle_authz(
4892            &customer,
4893            Role::Admin,
4894            "user:password:change",
4895            &cloud_admin,
4896        ));
4897        assert!(!store.check_user_lifecycle_authz(
4898            &customer,
4899            Role::Admin,
4900            "user:role:update",
4901            &cloud_admin,
4902        ));
4903
4904        let another_user = UserId::platform("someone-else");
4905        assert!(store.check_user_lifecycle_authz(
4906            &customer,
4907            Role::Admin,
4908            "user:delete",
4909            &another_user,
4910        ));
4911    }
4912
/// A plain account can be disabled, re-enabled, have its password and role
/// changed, and finally be deleted — every mutation is expected to succeed.
#[test]
fn test_regular_user_mutations_still_work() {
    let auth = AuthStore::new(test_config());
    auth.create_user("alice", "old", Role::Admin).unwrap();
    let alice = UserId::platform("alice");

    // While the account is disabled the correct password is rejected.
    auth.set_user_enabled(&alice, false).unwrap();
    let while_disabled = auth.authenticate("alice", "old");
    assert!(matches!(
        while_disabled,
        Err(AuthError::InvalidCredentials)
    ));

    // Re-enable, then walk through the remaining lifecycle mutations.
    auth.set_user_enabled(&alice, true).unwrap();
    auth.change_password("alice", "old", "new").unwrap();
    auth.change_role("alice", Role::Read).unwrap();
    auth.delete_user("alice").unwrap();

    // Once deleted, even the updated password no longer authenticates.
    let after_delete = auth.authenticate("alice", "new");
    assert!(matches!(
        after_delete,
        Err(AuthError::InvalidCredentials)
    ));
}
4934
/// Deleting a user invalidates the API key and the session issued to it and
/// leaves the user listing empty.
#[test]
fn test_delete_user() {
    let auth = AuthStore::new(test_config());
    auth.create_user("alice", "pass", Role::Admin).unwrap();
    let api_key = auth.create_api_key("alice", "key1", Role::Read).unwrap();
    let login = auth.authenticate("alice", "pass").unwrap();

    auth.delete_user("alice").unwrap();

    // Neither credential resolves any more.
    let key_lookup = auth.validate_token(&api_key.key);
    assert!(key_lookup.is_none());
    let session_lookup = auth.validate_token(&login.token);
    assert!(session_lookup.is_none());

    // The account itself is gone as well.
    let remaining = auth.list_users();
    assert!(remaining.is_empty());
}
4948
/// A revoked session token no longer validates.
#[test]
fn test_revoke_session() {
    let auth = AuthStore::new(test_config());
    auth.create_user("alice", "pass", Role::Read).unwrap();
    let login = auth.authenticate("alice", "pass").unwrap();

    auth.revoke_session(&login.token);

    let lookup = auth.validate_token(&login.token);
    assert!(lookup.is_none());
}
4958
/// The encoded password hash has the shape `argon2id$<salt-hex>$<hash-hex>`.
#[test]
fn test_password_hash_format() {
    let encoded = hash_password("test");
    assert!(encoded.starts_with("argon2id$"));

    // Split into at most three `$`-separated fields: tag, salt, digest.
    let fields = encoded.splitn(3, '$').collect::<Vec<&str>>();
    assert_eq!(fields.len(), 3);

    // 16-byte salt rendered as hex → 32 characters.
    let salt_hex = fields[1];
    assert_eq!(salt_hex.len(), 32);

    // 32-byte digest rendered as hex → 64 characters.
    let digest_hex = fields[2];
    assert_eq!(digest_hex.len(), 64);
}
4970
/// `constant_time_eq` accepts equal inputs and rejects inputs that differ in
/// content or in length.
#[test]
fn test_constant_time_eq() {
    let identical = constant_time_eq(b"hello", b"hello");
    let same_length_mismatch = constant_time_eq(b"hello", b"world");
    let length_mismatch = constant_time_eq(b"short", b"longer");

    assert!(identical);
    assert!(!same_length_mismatch);
    assert!(!length_mismatch);
}
4977
/// The first `bootstrap` call creates the admin and seals the store; a
/// second call is rejected and creates nothing.
#[test]
fn test_bootstrap_seals_permanently() {
    let auth = AuthStore::new(test_config());

    // A fresh store still awaits bootstrap.
    assert!(auth.needs_bootstrap());
    assert!(!auth.is_bootstrapped());

    // Initial bootstrap: must succeed and hand back an admin plus API key.
    let first_attempt = auth.bootstrap("admin", "secret");
    assert!(first_attempt.is_ok());
    let bootstrapped = first_attempt.unwrap();
    assert_eq!(bootstrapped.user.username, "admin");
    assert_eq!(bootstrapped.user.role, Role::Admin);
    assert!(bootstrapped.api_key.key.starts_with("rk_"));
    // No certificate is expected from the test configuration.
    assert!(bootstrapped.certificate.is_none());

    // The store now reports itself as sealed.
    assert!(!auth.needs_bootstrap());
    assert!(auth.is_bootstrapped());

    // A repeat attempt is refused with the "sealed permanently" error.
    let second_attempt = auth.bootstrap("admin2", "secret2");
    assert!(second_attempt.is_err());
    let rejection = second_attempt.unwrap_err();
    assert!(rejection.to_string().contains("sealed permanently"));

    // Only the original admin exists.
    let users = auth.list_users();
    assert_eq!(users.len(), 1);
    assert_eq!(users[0].username, "admin");
}
5009
/// Once any user has been created by hand, the store no longer reports that
/// it needs bootstrapping.
#[test]
fn test_bootstrap_after_manual_user_creation() {
    let auth = AuthStore::new(test_config());

    // Seed the store with a manually created account.
    auth.create_user("existing", "pass", Role::Read).unwrap();

    // Users exist → bootstrap is not required.
    let pending = auth.needs_bootstrap();
    assert!(!pending);
}
5021
5022    // ---------------------------------------------------------------
5023    // Tenant scoping
5024    // ---------------------------------------------------------------
5025
/// The same username in two tenants yields two independent accounts, each
/// accepting only its own password.
#[test]
fn test_same_username_two_tenants_distinct() {
    let auth = AuthStore::new(test_config());
    auth.create_user_in_tenant(Some("acme"), "alice", "pw-acme", Role::Write)
        .unwrap();
    auth.create_user_in_tenant(Some("globex"), "alice", "pw-globex", Role::Read)
        .unwrap();

    // Both accounts are listed separately.
    assert_eq!(auth.list_users().len(), 2);

    // Matching tenant/password pairs authenticate.
    for (tenant, password) in [("acme", "pw-acme"), ("globex", "pw-globex")] {
        let outcome = auth.authenticate_in_tenant(Some(tenant), "alice", password);
        assert!(outcome.is_ok());
    }

    // Passwords belonging to the other tenant's account are refused.
    for (tenant, password) in [("acme", "pw-globex"), ("globex", "pw-acme")] {
        let outcome = auth.authenticate_in_tenant(Some(tenant), "alice", password);
        assert!(outcome.is_err());
    }
}
5056
/// A session opened for a tenant-scoped user records the tenant, and the
/// token resolves back to the same tenant, username and role.
#[test]
fn test_session_carries_tenant() {
    let auth = AuthStore::new(test_config());
    auth.create_user_in_tenant(Some("acme"), "alice", "pw", Role::Admin)
        .unwrap();

    let login = auth
        .authenticate_in_tenant(Some("acme"), "alice", "pw")
        .unwrap();
    assert_eq!(login.tenant_id.as_deref(), Some("acme"));

    let (principal, granted_role) = auth.validate_token_full(&login.token).unwrap();
    assert_eq!(principal.tenant.as_deref(), Some("acme"));
    assert_eq!(principal.username, "alice");
    assert_eq!(granted_role, Role::Admin);
}
5073
/// A user created without a tenant gets a tenant-less session, and its token
/// resolves to a tenant-less identity.
#[test]
fn test_platform_user_has_no_tenant() {
    let auth = AuthStore::new(test_config());
    auth.create_user("admin", "pw", Role::Admin).unwrap();

    let login = auth.authenticate("admin", "pw").unwrap();
    assert!(login.tenant_id.is_none());

    let (principal, _role) = auth.validate_token_full(&login.token).unwrap();
    assert!(principal.tenant.is_none());
}
5084
/// With a platform `admin` and a tenant-scoped `admin` side by side, the
/// global lookup and the scoped lookup both return a verifier, and the two
/// verifiers have different salts.
#[test]
fn test_lookup_scram_verifier_global_resolves_platform() {
    let auth = AuthStore::new(test_config());
    auth.create_user("admin", "pw", Role::Admin).unwrap();
    auth.create_user_in_tenant(Some("acme"), "admin", "pw", Role::Admin)
        .unwrap();

    // Lookup by bare username.
    let global_lookup = auth.lookup_scram_verifier_global("admin");
    assert!(global_lookup.is_some());

    // Lookup by the tenant-scoped identity.
    let acme_admin = UserId::scoped("acme", "admin");
    let scoped_lookup = auth.lookup_scram_verifier(&acme_admin);
    assert!(scoped_lookup.is_some());

    // Distinct salts show the two verifiers are independent.
    let global_verifier = global_lookup.unwrap();
    let scoped_verifier = scoped_lookup.unwrap();
    assert_ne!(global_verifier.salt, scoped_verifier.salt);
}
5104
/// Deleting `alice` in one tenant leaves the same-named user in another
/// tenant able to log in.
#[test]
fn test_delete_in_tenant_does_not_touch_other_tenant() {
    let auth = AuthStore::new(test_config());
    for tenant in ["acme", "globex"] {
        auth.create_user_in_tenant(Some(tenant), "alice", "pw", Role::Admin)
            .unwrap();
    }

    auth.delete_user_in_tenant(Some("acme"), "alice").unwrap();

    // The globex account is untouched.
    let globex_login = auth.authenticate_in_tenant(Some("globex"), "alice", "pw");
    assert!(globex_login.is_ok());

    // The acme account can no longer authenticate.
    let acme_login = auth.authenticate_in_tenant(Some("acme"), "alice", "pw");
    assert!(acme_login.is_err());
}
5126
/// `UserId` renders as the bare username for platform users and as
/// `tenant/username` for scoped users.
#[test]
fn test_user_id_display() {
    let platform = UserId::platform("admin");
    assert_eq!(platform.to_string(), "admin");

    let scoped = UserId::scoped("acme", "alice");
    assert_eq!(scoped.to_string(), "acme/alice");
}
5132
5133    // -----------------------------------------------------------------
5134    // PolicyEnforcementMode wiring (#712 / S5A)
5135    // -----------------------------------------------------------------
5136
5137    fn enforcement_eval_ctx(role: Role) -> EvalContext {
5138        EvalContext {
5139            principal_tenant: None,
5140            current_tenant: None,
5141            peer_ip: None,
5142            mfa_present: false,
5143            now_ms: 1_700_000_000_000,
5144            principal_is_admin_role: role == Role::Admin,
5145            principal_is_platform_scoped: true,
5146        }
5147    }
5148
5149    /// Helper: install a single unrelated allow policy so the IAM
5150    /// path is the authoritative one — without any installed policy
5151    /// the auth flow short-circuits before we ever consult the mode.
5152    fn install_unrelated_policy(store: &AuthStore) {
5153        store
5154            .put_policy(
5155                Policy::from_json_str(
5156                    r#"{"id":"p-unrelated","version":1,"statements":[{"effect":"allow","actions":["select"],"resources":["table:public.other"]}]}"#,
5157                )
5158                .unwrap(),
5159            )
5160            .unwrap();
5161    }
5162
/// A freshly constructed store starts in `LegacyRbac`.
#[test]
fn enforcement_mode_default_for_new_store_is_legacy_rbac() {
    // Constructing a store before bootstrap models an existing install
    // being upgraded. Starting in LegacyRbac keeps pre-#712 behaviour,
    // so operators who have not attached IAM policies yet keep their
    // access across the upgrade.
    let auth = AuthStore::new(test_config());
    let initial_mode = auth.enforcement_mode();
    assert_eq!(initial_mode, PolicyEnforcementMode::LegacyRbac);
}
5172
/// `set_enforcement_mode` stores the new mode and returns the mode that was
/// active before the call.
#[test]
fn enforcement_mode_set_round_trips_and_returns_previous() {
    let auth = AuthStore::new(test_config());

    // LegacyRbac → PolicyOnly
    let replaced = auth.set_enforcement_mode(PolicyEnforcementMode::PolicyOnly);
    assert_eq!(replaced, PolicyEnforcementMode::LegacyRbac);
    assert_eq!(auth.enforcement_mode(), PolicyEnforcementMode::PolicyOnly);

    // PolicyOnly → LegacyRbac
    let replaced = auth.set_enforcement_mode(PolicyEnforcementMode::LegacyRbac);
    assert_eq!(replaced, PolicyEnforcementMode::PolicyOnly);
}
5182
/// Acceptance #2: under `policy_only`, a principal with no matching policy
/// is denied even if the role floor for the action would have allowed it.
#[test]
fn policy_only_mode_treats_no_matching_policy_as_deny() {
    let auth = AuthStore::new(test_config());
    auth.create_user("alice", "p", Role::Admin).unwrap();
    let alice = UserId::platform("alice");
    install_unrelated_policy(&auth);
    auth.set_enforcement_mode(PolicyEnforcementMode::PolicyOnly);

    let table = ResourceRef::new("table", "public.t");
    let ctx = enforcement_eval_ctx(Role::Admin);

    // Role::Admin does not help: in `policy_only` the absence of a
    // matching policy (DefaultDeny) decides the outcome.
    let allowed = auth.check_policy_authz_with_role(&alice, "select", &table, &ctx, Role::Admin);
    assert!(!allowed);
}
5205
/// Acceptance #3: under `legacy_rbac`, a principal with no matching policy
/// falls through to the role-based decision. `select` is satisfied by
/// `Read`; `insert` needs more than `Read` and is satisfied by `Admin`.
#[test]
fn legacy_rbac_mode_falls_back_to_role_decision() {
    let auth = AuthStore::new(test_config());
    auth.create_user("alice", "p", Role::Read).unwrap();
    let alice = UserId::platform("alice");
    install_unrelated_policy(&auth);
    auth.set_enforcement_mode(PolicyEnforcementMode::LegacyRbac);

    let table = ResourceRef::new("table", "public.t");

    // Read + select → the legacy fallback allows it.
    let read_ctx = enforcement_eval_ctx(Role::Read);
    let read_select =
        auth.check_policy_authz_with_role(&alice, "select", &table, &read_ctx, Role::Read);
    assert!(read_select);

    // Read + insert → the legacy fallback refuses it.
    let read_ctx = enforcement_eval_ctx(Role::Read);
    let read_insert =
        auth.check_policy_authz_with_role(&alice, "insert", &table, &read_ctx, Role::Read);
    assert!(!read_insert);

    // Admin + insert → the legacy fallback allows it.
    let admin_ctx = enforcement_eval_ctx(Role::Admin);
    let admin_insert =
        auth.check_policy_authz_with_role(&alice, "insert", &table, &admin_ctx, Role::Admin);
    assert!(admin_insert);
}
5244
/// An explicit, matching Deny statement denies in every enforcement mode,
/// even for an Admin role. The mode only affects the `DefaultDeny` outcome,
/// never an explicit one.
#[test]
fn explicit_deny_wins_irrespective_of_mode_or_role() {
    let auth = AuthStore::new(test_config());
    auth.create_user("alice", "p", Role::Admin).unwrap();
    let alice = UserId::platform("alice");

    // Register the deny policy and attach it directly to alice.
    let deny_select = Policy::from_json_str(
        r#"{"id":"p-deny-select","version":1,"statements":[{"effect":"deny","actions":["select"],"resources":["table:public.t"]}]}"#,
    )
    .unwrap();
    auth.put_policy(deny_select).unwrap();
    auth.attach_policy(PrincipalRef::User(alice.clone()), "p-deny-select")
        .unwrap();

    let table = ResourceRef::new("table", "public.t");
    let modes = [
        PolicyEnforcementMode::LegacyRbac,
        PolicyEnforcementMode::PolicyOnly,
    ];
    for mode in modes {
        auth.set_enforcement_mode(mode);
        let ctx = enforcement_eval_ctx(Role::Admin);
        let allowed =
            auth.check_policy_authz_with_role(&alice, "select", &table, &ctx, Role::Admin);
        assert!(!allowed, "explicit deny must win under mode {mode:?}");
    }
}
5283
/// Mirror of the explicit-deny guarantee: an explicit Allow is honoured in
/// `policy_only` mode even for a role the legacy fallback would have
/// rejected (Read role + admin-tier action).
#[test]
fn explicit_allow_wins_irrespective_of_mode_or_role() {
    let auth = AuthStore::new(test_config());
    auth.create_user("alice", "p", Role::Read).unwrap();
    let alice = UserId::platform("alice");

    // Register the allow policy and attach it directly to alice.
    let allow_create = Policy::from_json_str(
        r#"{"id":"p-allow-create","version":1,"statements":[{"effect":"allow","actions":["create"],"resources":["table:public.t"]}]}"#,
    )
    .unwrap();
    auth.put_policy(allow_create).unwrap();
    auth.attach_policy(PrincipalRef::User(alice.clone()), "p-allow-create")
        .unwrap();
    auth.set_enforcement_mode(PolicyEnforcementMode::PolicyOnly);

    let table = ResourceRef::new("table", "public.t");
    let ctx = enforcement_eval_ctx(Role::Read);
    let allowed = auth.check_policy_authz_with_role(&alice, "create", &table, &ctx, Role::Read);
    assert!(allowed);
}
5314
/// The legacy-RBAC warning token can be claimed exactly once per store.
#[test]
fn take_legacy_rbac_warn_once_is_at_most_once_per_boot() {
    let auth = AuthStore::new(test_config());

    // The store starts in LegacyRbac, so the first caller gets the token.
    let first_claim = auth.take_legacy_rbac_warn_once();
    assert!(first_claim);

    // Later calls on the same AuthStore never get it again — this is
    // the "once per boot" guarantee.
    let later_claims: Vec<bool> = (0..3)
        .map(|_| auth.take_legacy_rbac_warn_once())
        .collect();
    assert!(later_claims.iter().all(|claimed| !claimed));
}
5326
/// Acceptance #5 says a `legacy_rbac` boot emits exactly one warn entry,
/// which implies a `policy_only` boot emits none. The token therefore must
/// not be claimable while the mode is strict.
#[test]
fn take_legacy_rbac_warn_once_is_silent_under_policy_only() {
    let auth = AuthStore::new(test_config());

    // Strict mode: the claim is refused, keeping the boot path quiet.
    auth.set_enforcement_mode(PolicyEnforcementMode::PolicyOnly);
    let strict_claim = auth.take_legacy_rbac_warn_once();
    assert!(!strict_claim);

    // The refused claim did not consume the token: after switching to
    // legacy_rbac the first claim succeeds and the next one does not.
    auth.set_enforcement_mode(PolicyEnforcementMode::LegacyRbac);
    let first_legacy_claim = auth.take_legacy_rbac_warn_once();
    assert!(first_legacy_claim);
    let second_legacy_claim = auth.take_legacy_rbac_warn_once();
    assert!(!second_legacy_claim);
}
5345
/// The strict `check_policy_authz` returns `false` on DefaultDeny even when
/// the store is in `LegacyRbac` mode.
#[test]
fn strict_check_policy_authz_ignores_enforcement_mode() {
    // Governance APIs (managed_config, registry, managed_policy) use the
    // strict check, so the lenient mode must never elevate an admin-tier
    // mutation through it.
    let auth = AuthStore::new(test_config());
    auth.create_user("alice", "p", Role::Admin).unwrap();
    let alice = UserId::platform("alice");
    install_unrelated_policy(&auth);
    auth.set_enforcement_mode(PolicyEnforcementMode::LegacyRbac);

    let table = ResourceRef::new("table", "public.t");
    let ctx = enforcement_eval_ctx(Role::Admin);
    let allowed = auth.check_policy_authz(&alice, "select", &table, &ctx);
    assert!(!allowed);
}
5366}