Skip to main content

reddb_server/auth/
store.rs

1//! AuthStore -- manages users, sessions, and API keys in memory.
2//!
3//! Password hashing delegates to the existing Argon2id implementation in
4//! `crate::storage::encryption::argon2id`.  Token generation uses the
5//! OS CSPRNG (`crate::crypto::os_random`) plus SHA-256.
6#![deny(clippy::disallowed_methods)]
7
8use std::borrow::Cow;
9use std::collections::HashMap;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::{Arc, RwLock};
12
13use crate::crypto::os_random;
14use crate::crypto::sha256::sha256;
15use crate::storage::encryption::argon2id::{derive_key, Argon2Params};
16use crate::storage::engine::pager::Pager;
17
18use super::column_policy_gate::{ColumnAccessRequest, ColumnPolicyGate, ColumnPolicyOutcome};
19use super::enforcement_mode::{legacy_rbac_decision, PolicyEnforcementMode};
20use super::managed_policy::{ManagedPolicyDecision, ManagedPolicyGate, PolicyOp};
21use super::policies::{self as iam_policies, EvalContext, Policy, ResourceRef, SimulationOutcome};
22use super::privileges::{
23    check_grant, Action, AuthzContext, AuthzError, Grant, GrantPrincipal, GrantsView,
24    PermissionCache, Resource, UserAttributes,
25};
26use super::registry::ConfigRegistry;
27use super::vault::{KeyPair, Vault, VaultState};
28use super::{now_ms, ApiKey, AuthConfig, AuthError, Role, Session, User, UserId};
29use crate::runtime::control_events::{
30    ControlEvent, ControlEventConfig, ControlEventCtx, ControlEventLedger, EventKind, Outcome,
31    Sensitivity,
32};
33
34// ---------------------------------------------------------------------------
35// PrincipalRef + SimCtx — IAM policy attachments
36// ---------------------------------------------------------------------------
37
38/// Principal targeted by `attach_policy` / `detach_policy`.
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40pub enum PrincipalRef {
41    User(UserId),
42    Group(String),
43}
44
45/// Reserved IAM group that every principal belongs to. Used by the
46/// GRANT-to-PUBLIC compatibility layer.
47pub const PUBLIC_IAM_GROUP: &str = "__public__";
48
49/// Optional context overrides for `simulate` — anything not set falls back
50/// to a default value when the kernel evaluates the request.
51#[derive(Debug, Clone, Default)]
52pub struct SimCtx {
53    pub current_tenant: Option<String>,
54    pub peer_ip: Option<std::net::IpAddr>,
55    pub mfa_present: bool,
56    pub now_ms: Option<u128>,
57}
58
59/// Control-event dependencies for audited IAM policy mutations.
60///
61/// The existing `put_policy` / `delete_policy` / attach / detach
62/// methods remain unaudited for bootstrap and synthetic GRANT internals.
63/// Public mutation surfaces that need evidence pass this context to the
64/// `*_with_control_events` siblings.
65pub struct PolicyMutationControl<'a> {
66    pub ctx: &'a ControlEventCtx<'a>,
67    pub ledger: &'a dyn ControlEventLedger,
68    pub config: ControlEventConfig,
69    pub registry: Option<&'a ConfigRegistry>,
70    pub actor: &'a UserId,
71    pub eval_ctx: &'a EvalContext,
72}
73
74/// Control-event dependencies for audited user lifecycle mutations.
75pub struct UserLifecycleControl<'a> {
76    pub ctx: &'a ControlEventCtx<'a>,
77    pub ledger: &'a dyn ControlEventLedger,
78    pub config: ControlEventConfig,
79}
80
81#[derive(Clone)]
82struct AuthStoreControlEvents {
83    ledger: Arc<dyn ControlEventLedger>,
84    config: ControlEventConfig,
85}
86
87// ---------------------------------------------------------------------------
88// BootstrapResult
89// ---------------------------------------------------------------------------
90
91/// Result of a successful bootstrap operation.
92///
93/// The `certificate` is the hex-encoded string the admin must save --
94/// it is the ONLY way to unseal the vault after a restart.
95#[derive(Debug)]
96pub struct BootstrapResult {
97    pub user: User,
98    pub api_key: ApiKey,
99    /// Certificate hex string.  `None` when vault is not configured.
100    pub certificate: Option<String>,
101}
102
103// ---------------------------------------------------------------------------
104// AuthStore
105// ---------------------------------------------------------------------------
106
107/// Central in-process authority for auth state.
108///
109/// All mutations are guarded by `RwLock`s so the store is `Send + Sync`.
110pub struct AuthStore {
111    /// `(tenant_id, username) -> User`. Tenant scoping is built into the
112    /// key so `alice@acme` and `alice@globex` are distinct identities.
113    users: RwLock<HashMap<UserId, User>>,
114    sessions: RwLock<HashMap<String, Session>>,
115    /// key-string -> (owner UserId, role)
116    api_key_index: RwLock<HashMap<String, (UserId, Role)>>,
117    /// Once true, bootstrap() is permanently sealed.
118    bootstrapped: AtomicBool,
119    config: AuthConfig,
120    /// Optional encrypted vault for persisting auth state to pager pages.
121    vault: RwLock<Option<Vault>>,
122    /// Reference to the pager for vault page I/O.
123    pager: Option<Arc<Pager>>,
124    /// Certificate-based keypair for token signing and vault seal.
125    /// Populated after bootstrap or after restoring from a sealed vault.
126    keypair: RwLock<Option<KeyPair>>,
127    /// Encrypted key-value store for arbitrary secrets.
128    /// Persisted to vault alongside users/api_keys.
129    vault_kv: RwLock<HashMap<String, String>>,
130    /// Per-user GRANT rows. Persisted via `vault_kv` under the
131    /// `red.acl.grants.<tenant>/<user>` key prefix so existing snapshot
132    /// logic keeps working without modification. See `privileges` module
133    /// for the resolution algorithm.
134    grants: RwLock<HashMap<UserId, Vec<Grant>>>,
135    /// PUBLIC grants — apply to every authenticated principal.
136    public_grants: RwLock<Vec<Grant>>,
137    /// PG-style account attributes (`VALID UNTIL`, `CONNECTION LIMIT`,
138    /// `search_path`). Keyed by `UserId`. Persisted under
139    /// `red.acl.attrs.<tenant>/<user>`.
140    user_attributes: RwLock<HashMap<UserId, UserAttributes>>,
141    /// Live session count per user — used by `CONNECTION LIMIT`
142    /// enforcement on login. Bumped at authenticate, decremented at
143    /// session revoke / expiry.
144    session_count_by_user: RwLock<HashMap<UserId, u32>>,
145    /// Pre-resolved (resource, action) cache built per-user so the
146    /// hot path skips a linear scan of the user's grants on every
147    /// statement. Invalidated on GRANT / REVOKE / ALTER USER.
148    permission_cache: RwLock<HashMap<UserId, PermissionCache>>,
149    /// IAM-style policies, keyed by id. Persisted under
150    /// `red.iam.policies`. The kernel in `super::policies` owns the
151    /// Policy type — this map just deduplicates and shares.
152    policies: RwLock<HashMap<String, Arc<Policy>>>,
153    /// Per-user policy attachments — ordered list of policy ids.
154    /// Persisted under `red.iam.attachments.users`.
155    user_attachments: RwLock<HashMap<UserId, Vec<String>>>,
156    /// Per-group policy attachments. Users join groups through
157    /// `UserAttributes::groups`; effective policies resolve group
158    /// attachments before user-direct attachments.
159    group_attachments: RwLock<HashMap<String, Vec<String>>>,
160    /// Cached effective `Vec<Arc<Policy>>` per user. Invalidated on
161    /// any policy mutation that affects the user's attachments.
162    iam_effective_cache: RwLock<HashMap<UserId, Vec<Arc<Policy>>>>,
163    /// Once any IAM policy is installed, authorization switches to the
164    /// IAM evaluator and stays deny-by-default even if policies are
165    /// later deleted. Persisted under `red.iam.enabled`.
166    iam_authorization_enabled: AtomicBool,
167    /// Selects the fallback behaviour when the IAM evaluator returns
168    /// `DefaultDeny`. See [`PolicyEnforcementMode`] and #712 / S5A.
169    /// Default is [`PolicyEnforcementMode::default_existing_install`]
170    /// (`LegacyRbac`) so an existing install upgrading past #712
171    /// keeps its current authorization posture until the operator
172    /// flips it explicitly. The bootstrap path sets this to
173    /// `PolicyOnly` for fresh installs; the boot-time config loader
174    /// supersedes both when a `red.config.policy.enforcement_mode`
175    /// value is present in the configured value store.
176    enforcement_mode: RwLock<PolicyEnforcementMode>,
177    /// Once-per-boot flag used by [`AuthStore::take_legacy_rbac_warn_once`]
178    /// to deliver exactly one `warn`-level log line when the store is
179    /// running under [`PolicyEnforcementMode::LegacyRbac`]. The boot
180    /// path checks the flag and, if it can claim it, emits the line.
181    legacy_rbac_boot_warn_emitted: AtomicBool,
182    /// `(tenant, role) → HashSet<CollectionId>` cache used by the AI
183    /// pipeline (issue #119). Distinct from `permission_cache`, which
184    /// is keyed by `UserId` and answers `(resource, action)` lookups —
185    /// this cache answers the inverse "what collections is this scope
186    /// allowed to read?" query that `AuthorizedSearch` uses to
187    /// pre-filter SEARCH SIMILAR / SEARCH CONTEXT candidates before any
188    /// similarity score is computed. Entries TTL out at 60s and are
189    /// invalidated explicitly on GRANT/REVOKE/CREATE POLICY/DROP
190    /// POLICY/DROP COLLECTION.
191    visible_collections_cache: super::scope_cache::AuthCache,
192    control_events: RwLock<Option<AuthStoreControlEvents>>,
193}
194
195// Use fast-but-safe Argon2id params for auth hashing (smaller than the
196// default 64 MB so that user-management RPCs respond quickly).
197fn auth_argon2_params() -> Argon2Params {
198    Argon2Params {
199        m_cost: 4 * 1024, // 4 MB
200        t_cost: 3,
201        p: 1,
202        tag_len: 32,
203    }
204}
205
206fn policy_control_fields(
207    policy_id: &str,
208    policy: Option<&Policy>,
209    principal: Option<&PrincipalRef>,
210) -> HashMap<String, Sensitivity> {
211    let mut fields = HashMap::new();
212    fields.insert("policy_id".to_string(), Sensitivity::raw(policy_id));
213    if let Some(policy) = policy {
214        let effects = policy
215            .statements
216            .iter()
217            .map(|s| match s.effect {
218                iam_policies::Effect::Allow => "allow",
219                iam_policies::Effect::Deny => "deny",
220            })
221            .collect::<Vec<_>>()
222            .join(",");
223        let actions = policy
224            .statements
225            .iter()
226            .flat_map(|s| s.actions.iter())
227            .map(policy_action_pattern)
228            .collect::<Vec<_>>()
229            .join(",");
230        let resources = policy
231            .statements
232            .iter()
233            .flat_map(|s| s.resources.iter())
234            .map(policy_resource_pattern)
235            .collect::<Vec<_>>()
236            .join(",");
237        fields.insert("effect".to_string(), Sensitivity::raw(effects));
238        fields.insert("action".to_string(), Sensitivity::raw(actions));
239        fields.insert("resource".to_string(), Sensitivity::raw(resources));
240        if let Some(attrs) = policy_principal_attrs(policy) {
241            fields.insert("principal_attrs".to_string(), Sensitivity::raw(attrs));
242        }
243    }
244    if let Some(principal) = principal {
245        fields.insert(
246            "principal".to_string(),
247            Sensitivity::raw(policy_principal_label(principal)),
248        );
249    }
250    fields
251}
252
253fn policy_action_pattern(pattern: &iam_policies::ActionPattern) -> String {
254    match pattern {
255        iam_policies::ActionPattern::Exact(s) => s.clone(),
256        iam_policies::ActionPattern::Wildcard => "*".to_string(),
257        iam_policies::ActionPattern::Prefix(s) => format!("{s}:*"),
258    }
259}
260
261fn policy_resource_pattern(pattern: &iam_policies::ResourcePattern) -> String {
262    match pattern {
263        iam_policies::ResourcePattern::Exact { kind, name } => format!("{kind}:{name}"),
264        iam_policies::ResourcePattern::Glob(s) => s.clone(),
265        iam_policies::ResourcePattern::Wildcard => "*".to_string(),
266    }
267}
268
269fn policy_principal_attrs(policy: &Policy) -> Option<String> {
270    let mut attrs = Vec::new();
271    for statement in &policy.statements {
272        let Some(condition) = &statement.condition else {
273            continue;
274        };
275        if let Some(value) = condition.system_owned {
276            attrs.push(format!("system_owned={value}"));
277        }
278        if let Some(value) = condition.platform_scoped {
279            attrs.push(format!("platform_scoped={value}"));
280        }
281        if let Some(value) = condition.mfa {
282            attrs.push(format!("mfa={value}"));
283        }
284        if let Some(value) = condition.tenant_match {
285            attrs.push(format!("tenant_match={value}"));
286        }
287    }
288    if attrs.is_empty() {
289        None
290    } else {
291        Some(attrs.join(","))
292    }
293}
294
295fn policy_principal_label(principal: &PrincipalRef) -> String {
296    match principal {
297        PrincipalRef::User(uid) => format!("user:{uid}"),
298        PrincipalRef::Group(group) => format!("group:{group}"),
299    }
300}
301
302fn default_user_lifecycle_ctx<'a>() -> ControlEventCtx<'a> {
303    ControlEventCtx {
304        actor: crate::runtime::control_events::ActorRef::Anonymous,
305        scope: None,
306        request_id: None,
307        trace_id: None,
308    }
309}
310
311fn bootstrap_user_lifecycle_ctx<'a>() -> ControlEventCtx<'a> {
312    ControlEventCtx {
313        actor: crate::runtime::control_events::ActorRef::System("bootstrap"),
314        scope: None,
315        request_id: None,
316        trace_id: None,
317    }
318}
319
320fn user_resource(id: &UserId) -> String {
321    format!("user:{id}")
322}
323
324fn api_key_resource(api_key_id: &str) -> String {
325    format!("apikey:{api_key_id}")
326}
327
328fn api_key_id(key: &str) -> String {
329    hex::encode(sha256(key.as_bytes()))
330}
331
332fn password_evidence() -> Sensitivity {
333    Sensitivity::redacted()
334}
335
336fn user_control_fields(
337    id: &UserId,
338    role: Option<Role>,
339    enabled: Option<bool>,
340    include_password: bool,
341) -> HashMap<String, Sensitivity> {
342    let mut fields = HashMap::new();
343    fields.insert(
344        "username".to_string(),
345        Sensitivity::raw(id.username.clone()),
346    );
347    fields.insert(
348        "tenant_id".to_string(),
349        Sensitivity::raw(id.tenant.clone().unwrap_or_default()),
350    );
351    if let Some(role) = role {
352        fields.insert("role".to_string(), Sensitivity::raw(role.as_str()));
353    }
354    if let Some(enabled) = enabled {
355        fields.insert("enabled".to_string(), Sensitivity::raw(enabled.to_string()));
356    }
357    if include_password {
358        fields.insert("password".to_string(), password_evidence());
359    }
360    fields
361}
362
363fn api_key_control_fields(
364    id: &UserId,
365    role: Role,
366    api_key_id: &str,
367) -> HashMap<String, Sensitivity> {
368    let mut fields = user_control_fields(id, Some(role), None, false);
369    fields.insert("api_key_id".to_string(), Sensitivity::raw(api_key_id));
370    fields.insert("api_key".to_string(), Sensitivity::redacted());
371    fields
372}
373
374fn user_error_is_denied(err: &AuthError) -> bool {
375    !matches!(err, AuthError::Internal(_))
376}
377
378impl AuthStore {
379    // -----------------------------------------------------------------
380    // Construction
381    // -----------------------------------------------------------------
382
383    pub fn new(config: AuthConfig) -> Self {
384        Self {
385            users: RwLock::new(HashMap::new()),
386            sessions: RwLock::new(HashMap::new()),
387            api_key_index: RwLock::new(HashMap::new()),
388            bootstrapped: AtomicBool::new(false),
389            config,
390            vault: RwLock::new(None),
391            pager: None,
392            keypair: RwLock::new(None),
393            vault_kv: RwLock::new(HashMap::new()),
394            grants: RwLock::new(HashMap::new()),
395            public_grants: RwLock::new(Vec::new()),
396            user_attributes: RwLock::new(HashMap::new()),
397            session_count_by_user: RwLock::new(HashMap::new()),
398            permission_cache: RwLock::new(HashMap::new()),
399            policies: RwLock::new(HashMap::new()),
400            user_attachments: RwLock::new(HashMap::new()),
401            group_attachments: RwLock::new(HashMap::new()),
402            iam_effective_cache: RwLock::new(HashMap::new()),
403            iam_authorization_enabled: AtomicBool::new(false),
404            enforcement_mode: RwLock::new(PolicyEnforcementMode::default_existing_install()),
405            legacy_rbac_boot_warn_emitted: AtomicBool::new(false),
406            visible_collections_cache: super::scope_cache::AuthCache::new(
407                super::scope_cache::DEFAULT_TTL,
408            ),
409            control_events: RwLock::new(None),
410        }
411    }
412
413    pub fn configure_control_events(
414        &self,
415        ledger: Arc<dyn ControlEventLedger>,
416        config: ControlEventConfig,
417    ) {
418        *self
419            .control_events
420            .write()
421            .unwrap_or_else(|e| e.into_inner()) = Some(AuthStoreControlEvents { ledger, config });
422    }
423
424    fn configured_control_events(&self) -> Option<AuthStoreControlEvents> {
425        self.control_events
426            .read()
427            .unwrap_or_else(|e| e.into_inner())
428            .clone()
429    }
430
431    /// Create an `AuthStore` backed by encrypted vault pages inside the
432    /// main `.rdb` database file.
433    ///
434    /// If vault pages already exist, their contents are loaded and
435    /// restored into the in-memory store.  All subsequent mutations are
436    /// automatically persisted to the vault pages via the pager.
437    pub fn with_vault(
438        config: AuthConfig,
439        pager: Arc<Pager>,
440        passphrase: Option<&str>,
441    ) -> Result<Self, AuthError> {
442        let vault = Vault::open(&pager, passphrase)?;
443        let mut store = Self::new(config);
444
445        // Restore persisted state if vault pages exist.
446        if let Some(state) = vault.load(&pager)? {
447            store.restore_from_vault(state);
448        }
449
450        *store.vault.write().unwrap_or_else(|e| e.into_inner()) = Some(vault);
451        store.pager = Some(pager);
452        Ok(store)
453    }
454
455    pub fn config(&self) -> &AuthConfig {
456        &self.config
457    }
458
459    pub fn is_enabled(&self) -> bool {
460        self.config.enabled
461    }
462
463    /// Returns true when no users exist yet and bootstrap hasn't been sealed.
464    pub fn needs_bootstrap(&self) -> bool {
465        !self.bootstrapped.load(Ordering::Acquire)
466            && self.users.read().map(|u| u.is_empty()).unwrap_or(true)
467    }
468
469    /// Internal: read-locked lookup by `UserId`.
470    fn get_user_cloned(&self, id: &UserId) -> Option<User> {
471        self.users.read().ok().and_then(|m| m.get(id).cloned())
472    }
473
474    /// Whether bootstrap has already been performed (sealed).
475    pub fn is_bootstrapped(&self) -> bool {
476        self.bootstrapped.load(Ordering::Acquire)
477    }
478
479    /// Bootstrap the first admin user. One-shot, irreversible.
480    ///
481    /// Uses an atomic compare-exchange to guarantee that even under
482    /// concurrent calls, only the first one succeeds. Once sealed,
483    /// all subsequent calls fail immediately -- there is no undo.
484    ///
485    /// When a vault/pager is configured, a certificate-based keypair is
486    /// generated and the vault is re-encrypted with the certificate-derived
487    /// key.  The certificate hex string is returned in `BootstrapResult`
488    /// so the admin can save it.
489    pub fn bootstrap(&self, username: &str, password: &str) -> Result<BootstrapResult, AuthError> {
490        if let Some(configured) = self.configured_control_events() {
491            let ctx = bootstrap_user_lifecycle_ctx();
492            let control = UserLifecycleControl {
493                ctx: &ctx,
494                ledger: configured.ledger.as_ref(),
495                config: configured.config,
496            };
497            self.bootstrap_with_ownership_and_control_events(username, password, false, &control)
498        } else {
499            self.bootstrap_with_ownership(username, password, false)
500        }
501    }
502
503    pub fn bootstrap_with_control_events(
504        &self,
505        username: &str,
506        password: &str,
507        ctx: &ControlEventCtx<'_>,
508        ledger: &dyn ControlEventLedger,
509        config: ControlEventConfig,
510    ) -> Result<BootstrapResult, AuthError> {
511        let system_ctx = ControlEventCtx {
512            actor: crate::runtime::control_events::ActorRef::System("bootstrap"),
513            scope: ctx.scope.clone(),
514            request_id: ctx.request_id.clone(),
515            trace_id: ctx.trace_id.clone(),
516        };
517        let control = UserLifecycleControl {
518            ctx: &system_ctx,
519            ledger,
520            config,
521        };
522        self.bootstrap_with_ownership_and_control_events(username, password, false, &control)
523    }
524
525    /// Bootstrap the first admin with `system_owned = true` and platform
526    /// scope (`tenant = None`). The resulting principal carries both
527    /// `principal_is_system_owned` and `principal_is_platform_scoped`,
528    /// which is what #649's `ManagedConfigGate` requires before allowing
529    /// writes to managed config keys. Used by the `production` bootstrap
530    /// preset (issue #650).
531    pub fn bootstrap_system_admin(
532        &self,
533        username: &str,
534        password: &str,
535    ) -> Result<BootstrapResult, AuthError> {
536        if let Some(configured) = self.configured_control_events() {
537            let ctx = bootstrap_user_lifecycle_ctx();
538            let control = UserLifecycleControl {
539                ctx: &ctx,
540                ledger: configured.ledger.as_ref(),
541                config: configured.config,
542            };
543            self.bootstrap_with_ownership_and_control_events(username, password, true, &control)
544        } else {
545            self.bootstrap_with_ownership(username, password, true)
546        }
547    }
548
549    fn bootstrap_with_ownership(
550        &self,
551        username: &str,
552        password: &str,
553        system_owned: bool,
554    ) -> Result<BootstrapResult, AuthError> {
555        self.bootstrap_with_ownership_unaudited(username, password, system_owned)
556    }
557
558    fn bootstrap_with_ownership_unaudited(
559        &self,
560        username: &str,
561        password: &str,
562        system_owned: bool,
563    ) -> Result<BootstrapResult, AuthError> {
564        // Atomic seal: only the first caller wins.
565        if self
566            .bootstrapped
567            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
568            .is_err()
569        {
570            return Err(AuthError::Forbidden(
571                "bootstrap already completed — sealed permanently".to_string(),
572            ));
573        }
574
575        // Double-check users are actually empty (belt and suspenders).
576        {
577            let users = self.users.read().map_err(lock_err)?;
578            if !users.is_empty() {
579                return Err(AuthError::Forbidden(
580                    "bootstrap already completed — users exist".to_string(),
581                ));
582            }
583        }
584
585        let user = if system_owned {
586            self.create_user_in_tenant_with_ownership_unaudited(
587                None,
588                username,
589                password,
590                Role::Admin,
591                true,
592            )?
593        } else {
594            self.create_user_in_tenant_with_ownership_unaudited(
595                None,
596                username,
597                password,
598                Role::Admin,
599                false,
600            )?
601        };
602        let key =
603            self.create_api_key_in_tenant_unaudited(None, username, "bootstrap", Role::Admin)?;
604
605        // Generate a certificate-based keypair and re-seal the vault.
606        let certificate = if let Some(ref pager) = self.pager {
607            let kp = KeyPair::generate();
608            let cert_hex = kp.certificate_hex();
609
610            // Re-create the vault using the certificate-derived key.
611            let new_vault = Vault::with_certificate_bytes(pager, &kp.certificate)
612                .map_err(|e| AuthError::Internal(format!("vault re-seal failed: {e}")))?;
613
614            // Store the keypair so token signing works immediately.
615            if let Ok(mut kp_guard) = self.keypair.write() {
616                *kp_guard = Some(kp);
617            }
618
619            // Replace the vault and persist with the master secret included.
620            if let Ok(mut vault_guard) = self.vault.write() {
621                *vault_guard = Some(new_vault);
622            }
623            // Generate the AES-256 secret key for Value::Secret encryption.
624            self.ensure_vault_secret_key();
625            self.persist_to_vault();
626
627            Some(cert_hex)
628        } else {
629            None
630        };
631
632        // #712 / S5A: fresh bootstraps land in the strict posture.
633        // Persist explicitly to vault_kv so subsequent boots
634        // (rehydrate_iam) pick up `policy_only` instead of the
635        // existing-install default. Existing installs upgrading past
636        // this commit never traverse this branch — `bootstrap()` is
637        // sealed once and never re-runs.
638        self.set_enforcement_mode(PolicyEnforcementMode::default_fresh_bootstrap());
639
640        Ok(BootstrapResult {
641            user,
642            api_key: key,
643            certificate,
644        })
645    }
646
647    fn bootstrap_with_ownership_and_control_events(
648        &self,
649        username: &str,
650        password: &str,
651        system_owned: bool,
652        control: &UserLifecycleControl<'_>,
653    ) -> Result<BootstrapResult, AuthError> {
654        let id = UserId::from_parts(None, username);
655        match self.bootstrap_with_ownership_unaudited(username, password, system_owned) {
656            Ok(result) => {
657                let event_result = self.emit_user_lifecycle_allowed(
658                    control,
659                    EventKind::UserCreate,
660                    "user.create",
661                    &id,
662                    user_control_fields(&id, Some(Role::Admin), Some(true), true),
663                );
664                if let Err(err) = event_result {
665                    self.rollback_bootstrap(&id);
666                    return Err(err);
667                }
668                Ok(result)
669            }
670            Err(err) => {
671                self.emit_user_lifecycle_outcome(
672                    control,
673                    if user_error_is_denied(&err) {
674                        Outcome::Denied
675                    } else {
676                        Outcome::Error
677                    },
678                    EventKind::UserCreate,
679                    "user.create",
680                    &id,
681                    Some(err.to_string()),
682                    user_control_fields(&id, Some(Role::Admin), Some(true), true),
683                );
684                Err(err)
685            }
686        }
687    }
688
689    /// Auto-bootstrap from environment variables if no users exist.
690    ///
691    /// Checks `REDDB_USERNAME` and `REDDB_PASSWORD`. If both are set and
692    /// the user store is empty, creates the first admin user automatically.
693    /// This mirrors the Docker pattern (`MYSQL_ROOT_PASSWORD`, etc.).
694    ///
695    /// Returns `Some(BootstrapResult)` if bootstrapped, `None` if skipped.
696    pub fn bootstrap_from_env(&self) -> Option<BootstrapResult> {
697        if !self.needs_bootstrap() {
698            return None;
699        }
700
701        let username = std::env::var("REDDB_USERNAME").ok()?;
702        let password = std::env::var("REDDB_PASSWORD").ok()?;
703
704        if username.is_empty() || password.is_empty() {
705            return None;
706        }
707
708        match self.bootstrap(&username, &password) {
709            Ok(result) => {
710                // F-04: `username` is REDDB_USERNAME — operator-supplied
711                // (env), but still routed through the LogField escaper
712                // because env strings cross trust boundaries in some
713                // deployment models (k8s downward API, Vault sidecar,
714                // external secret operator). See ADR 0010.
715                tracing::info!(
716                    username = %reddb_wire::audit_safe_log_field(&username),
717                    "bootstrapped admin user from REDDB_USERNAME/REDDB_PASSWORD"
718                );
719                if let Some(ref cert) = result.certificate {
720                    // Certificate must be readable by the operator — keep it
721                    // in the log stream but print raw to stderr too so it
722                    // survives even if the log file gets rotated.
723                    eprintln!("[reddb] CERTIFICATE: {}", cert);
724                    tracing::warn!(
725                        "vault certificate issued — save it: ONLY way to unseal after restart"
726                    );
727                }
728                Some(result)
729            }
730            Err(e) => {
731                tracing::error!(err = %e, "env bootstrap failed");
732                None
733            }
734        }
735    }
736
737    // -----------------------------------------------------------------
738    // Vault persistence
739    // -----------------------------------------------------------------
740
741    /// Persist the current auth state to the vault pages (if configured).
742    fn persist_to_vault_result(&self) -> Result<(), AuthError> {
743        let vault_guard = self.vault.read().unwrap_or_else(|e| e.into_inner());
744        if let (Some(ref vault), Some(ref pager)) = (&*vault_guard, &self.pager) {
745            let state = self.snapshot();
746            vault.save(pager, &state)?;
747        }
748        Ok(())
749    }
750
751    /// Persist the current auth state to the vault pages (if configured).
752    ///
753    /// Legacy auth mutations still treat in-memory state as authoritative.
754    /// New secret-management paths use the `try_` methods below so callers
755    /// get a hard error if the vault write fails.
756    fn persist_to_vault(&self) {
757        if let Err(e) = self.persist_to_vault_result() {
758            tracing::error!(err = %e, "vault persist failed");
759            // Issue #205 — vault persist is the secret-rotation
760            // serialization point: when this fails, freshly rotated
761            // credentials live only in memory and a process restart
762            // loses them. Operator-grade event so the operator can
763            // intervene before the next restart.
764            crate::telemetry::operator_event::OperatorEvent::SecretRotationFailed {
765                secret_ref: "auth_vault".to_string(),
766                error: e.to_string(),
767            }
768            .emit_global();
769        }
770    }
771
772    /// True when this store has an encrypted vault and pager wired in.
773    pub fn is_vault_backed(&self) -> bool {
774        self.pager.is_some()
775            && self
776                .vault
777                .read()
778                .map(|guard| guard.is_some())
779                .unwrap_or(false)
780    }
781
782    // -----------------------------------------------------------------
783    // Vault KV — encrypted key-value store for arbitrary secrets
784    // -----------------------------------------------------------------
785
786    /// Read a value from the vault KV store. Returns `None` if not set.
787    pub fn vault_kv_get(&self, key: &str) -> Option<String> {
788        self.vault_kv
789            .read()
790            .ok()
791            .and_then(|kv| kv.get(key).cloned())
792    }
793
794    /// Snapshot vault KV values for statement-local secret resolution.
795    pub fn vault_kv_snapshot(&self) -> HashMap<String, String> {
796        self.vault_kv
797            .read()
798            .map(|kv| kv.clone())
799            .unwrap_or_default()
800    }
801
802    /// Export vault KV as an encrypted logical blob for JSONL dump/restore.
803    /// Returns `None` when the vault has no KV entries.
804    pub fn vault_kv_export_encrypted(&self) -> Result<Option<String>, AuthError> {
805        if !self.is_vault_backed() {
806            return Err(AuthError::Forbidden(
807                "vault KV export requires an enabled, unsealed vault".to_string(),
808            ));
809        }
810        let kv = self.vault_kv_snapshot();
811        if kv.is_empty() {
812            return Ok(None);
813        }
814
815        let vault_guard = self.vault.read().map_err(lock_err)?;
816        let vault = vault_guard.as_ref().ok_or_else(|| {
817            AuthError::Forbidden("vault KV export requires an enabled, unsealed vault".to_string())
818        })?;
819        let state = VaultState {
820            users: Vec::new(),
821            api_keys: Vec::new(),
822            bootstrapped: false,
823            master_secret: None,
824            kv,
825        };
826        Ok(Some(vault.seal_logical_export(&state)?))
827    }
828
829    /// Merge imported vault KV entries and fail if the encrypted vault
830    /// write cannot be made durable.
831    pub fn vault_kv_try_import(
832        &self,
833        entries: HashMap<String, String>,
834    ) -> Result<usize, AuthError> {
835        if !self.is_vault_backed() {
836            return Err(AuthError::Forbidden(
837                "vault KV import requires an enabled, unsealed vault".to_string(),
838            ));
839        }
840        if entries.is_empty() {
841            return Ok(0);
842        }
843
844        let mut previous = HashMap::new();
845        {
846            let mut kv = self.vault_kv.write().map_err(lock_err)?;
847            for (key, value) in &entries {
848                previous.insert(key.clone(), kv.insert(key.clone(), value.clone()));
849            }
850        }
851
852        if let Err(err) = self.persist_to_vault_result() {
853            if let Ok(mut kv) = self.vault_kv.write() {
854                for (key, old) in previous {
855                    match old {
856                        Some(value) => {
857                            kv.insert(key, value);
858                        }
859                        None => {
860                            kv.remove(&key);
861                        }
862                    }
863                }
864            }
865            return Err(err);
866        }
867
868        Ok(entries.len())
869    }
870
871    /// Import false placeholders for secrets whose encrypted payload
872    /// could not be decrypted during logical restore.
873    pub fn vault_kv_try_import_placeholders(&self, keys: &[String]) -> Result<usize, AuthError> {
874        let entries = keys
875            .iter()
876            .map(|key| (key.clone(), "false".to_string()))
877            .collect();
878        self.vault_kv_try_import(entries)
879    }
880
881    /// Write a value to the vault KV store, persisting to disk.
882    pub fn vault_kv_set(&self, key: String, value: String) {
883        if let Ok(mut kv) = self.vault_kv.write() {
884            kv.insert(key, value);
885        }
886        self.persist_to_vault();
887    }
888
889    /// Write a value to the vault KV store and fail if the vault write
890    /// cannot be made durable.
891    pub fn vault_kv_try_set(&self, key: String, value: String) -> Result<(), AuthError> {
892        if !self.is_vault_backed() {
893            return Err(AuthError::Forbidden(
894                "SET SECRET requires an enabled, unsealed vault".to_string(),
895            ));
896        }
897
898        let previous = {
899            let mut kv = self.vault_kv.write().map_err(lock_err)?;
900            kv.insert(key.clone(), value)
901        };
902
903        if let Err(err) = self.persist_to_vault_result() {
904            if let Ok(mut kv) = self.vault_kv.write() {
905                match previous {
906                    Some(value) => {
907                        kv.insert(key, value);
908                    }
909                    None => {
910                        kv.remove(&key);
911                    }
912                }
913            }
914            return Err(err);
915        }
916
917        Ok(())
918    }
919
920    /// Delete a value from the vault KV store. Returns true if it existed.
921    pub fn vault_kv_delete(&self, key: &str) -> bool {
922        let existed = self
923            .vault_kv
924            .write()
925            .map(|mut kv| kv.remove(key).is_some())
926            .unwrap_or(false);
927        if existed {
928            self.persist_to_vault();
929        }
930        existed
931    }
932
933    /// Delete a value from the vault KV store and fail if the vault write
934    /// cannot be made durable.
935    pub fn vault_kv_try_delete(&self, key: &str) -> Result<bool, AuthError> {
936        if !self.is_vault_backed() {
937            return Err(AuthError::Forbidden(
938                "DELETE SECRET requires an enabled, unsealed vault".to_string(),
939            ));
940        }
941
942        let removed = {
943            let mut kv = self.vault_kv.write().map_err(lock_err)?;
944            kv.remove(key)
945        };
946
947        if removed.is_none() {
948            return Ok(false);
949        }
950
951        if let Err(err) = self.persist_to_vault_result() {
952            if let Ok(mut kv) = self.vault_kv.write() {
953                if let Some(value) = removed {
954                    kv.insert(key.to_string(), value);
955                }
956            }
957            return Err(err);
958        }
959
960        Ok(true)
961    }
962
963    /// List all keys in the vault KV store.
964    pub fn vault_kv_keys(&self) -> Vec<String> {
965        self.vault_kv
966            .read()
967            .map(|kv| kv.keys().cloned().collect())
968            .unwrap_or_default()
969    }
970
971    /// Convenience: get the 32-byte secret key for Value::Secret encryption.
972    /// Generated on first boot and stored at `red.secret.aes_key`.
973    pub fn vault_secret_key(&self) -> Option<[u8; 32]> {
974        let hex_str = self.vault_kv_get("red.secret.aes_key")?;
975        let bytes = hex::decode(hex_str).ok()?;
976        if bytes.len() == 32 {
977            let mut key = [0u8; 32];
978            key.copy_from_slice(&bytes);
979            Some(key)
980        } else {
981            None
982        }
983    }
984
985    /// Generate and store the AES-256 secret key on first boot if not present.
986    pub fn ensure_vault_secret_key(&self) {
987        if self.vault_kv_get("red.secret.aes_key").is_none() {
988            let key = random_bytes(32);
989            self.vault_kv_set("red.secret.aes_key".to_string(), hex::encode(key));
990        }
991    }
992
993    /// Take a snapshot of the current auth state for vault serialization.
994    fn snapshot(&self) -> VaultState {
995        let users_guard = self.users.read().unwrap_or_else(|e| e.into_inner());
996        let users: Vec<User> = users_guard.values().cloned().collect();
997
998        // Collect (owner UserId, api_key) pairs from all users so a
999        // tenant-scoped owner can be reattached on restore.
1000        let mut api_keys = Vec::new();
1001        for user in &users {
1002            let owner = UserId::from_parts(user.tenant_id.as_deref(), &user.username);
1003            for key in &user.api_keys {
1004                api_keys.push((owner.clone(), key.clone()));
1005            }
1006        }
1007
1008        // Include the master secret if a keypair is loaded.
1009        let master_secret = self
1010            .keypair
1011            .read()
1012            .ok()
1013            .and_then(|guard| guard.as_ref().map(|kp| kp.master_secret.clone()));
1014
1015        let kv = self.vault_kv.read().map(|m| m.clone()).unwrap_or_default();
1016
1017        VaultState {
1018            users,
1019            api_keys,
1020            bootstrapped: self.bootstrapped.load(Ordering::Acquire),
1021            master_secret,
1022            kv,
1023        }
1024    }
1025
1026    /// Restore the in-memory auth state from a vault snapshot.
1027    fn restore_from_vault(&mut self, state: VaultState) {
1028        // Restore bootstrap seal.
1029        if state.bootstrapped {
1030            self.bootstrapped.store(true, Ordering::Release);
1031        }
1032
1033        // Restore keypair from master secret (if present).
1034        if let Some(secret) = state.master_secret {
1035            let kp = KeyPair::from_master_secret(secret);
1036            if let Ok(mut guard) = self.keypair.write() {
1037                *guard = Some(kp);
1038            }
1039        }
1040
1041        // Restore KV store.
1042        if let Ok(mut kv) = self.vault_kv.write() {
1043            *kv = state.kv;
1044        }
1045
1046        // Restore users.
1047        let mut users = self.users.write().unwrap_or_else(|e| e.into_inner());
1048        let mut idx = self
1049            .api_key_index
1050            .write()
1051            .unwrap_or_else(|e| e.into_inner());
1052
1053        for user in state.users {
1054            let id = UserId::from_parts(user.tenant_id.as_deref(), &user.username);
1055            // Register API keys in the index.
1056            for key in &user.api_keys {
1057                idx.insert(key.key.clone(), (id.clone(), key.role));
1058            }
1059            users.insert(id, user);
1060        }
1061        drop(idx);
1062        drop(users);
1063
1064        self.rehydrate_acl();
1065        self.rehydrate_iam();
1066    }
1067
1068    // -----------------------------------------------------------------
1069    // User management
1070    // -----------------------------------------------------------------
1071
1072    /// Create a new platform-scoped user (`tenant_id = None`).
1073    ///
1074    /// For tenant-scoped creation, use [`Self::create_user_in_tenant`].
1075    pub fn create_user(
1076        &self,
1077        username: &str,
1078        password: &str,
1079        role: Role,
1080    ) -> Result<User, AuthError> {
1081        self.create_user_in_tenant(None, username, password, role)
1082    }
1083
1084    pub fn create_user_with_control_events(
1085        &self,
1086        username: &str,
1087        password: &str,
1088        role: Role,
1089        ctx: &ControlEventCtx<'_>,
1090        ledger: &dyn ControlEventLedger,
1091        config: ControlEventConfig,
1092    ) -> Result<User, AuthError> {
1093        self.create_user_in_tenant_with_control_events(
1094            None, username, password, role, ctx, ledger, config,
1095        )
1096    }
1097
1098    /// Create a user under the given tenant scope. `tenant_id == None`
1099    /// produces a platform-wide user. `(tenant, username)` is the
1100    /// uniqueness key — the same `username` may exist independently
1101    /// under multiple tenants.
1102    pub fn create_user_in_tenant(
1103        &self,
1104        tenant_id: Option<&str>,
1105        username: &str,
1106        password: &str,
1107        role: Role,
1108    ) -> Result<User, AuthError> {
1109        if let Some(configured) = self.configured_control_events() {
1110            let ctx = default_user_lifecycle_ctx();
1111            let control = UserLifecycleControl {
1112                ctx: &ctx,
1113                ledger: configured.ledger.as_ref(),
1114                config: configured.config,
1115            };
1116            self.create_user_in_tenant_with_ownership_controlled(
1117                tenant_id, username, password, role, false, &control,
1118            )
1119        } else {
1120            self.create_user_in_tenant_with_ownership_unaudited(
1121                tenant_id, username, password, role, false,
1122            )
1123        }
1124    }
1125
1126    #[allow(clippy::too_many_arguments)]
1127    pub fn create_user_in_tenant_with_control_events(
1128        &self,
1129        tenant_id: Option<&str>,
1130        username: &str,
1131        password: &str,
1132        role: Role,
1133        ctx: &ControlEventCtx<'_>,
1134        ledger: &dyn ControlEventLedger,
1135        config: ControlEventConfig,
1136    ) -> Result<User, AuthError> {
1137        let control = UserLifecycleControl {
1138            ctx,
1139            ledger,
1140            config,
1141        };
1142        self.create_user_in_tenant_with_ownership_controlled(
1143            tenant_id, username, password, role, false, &control,
1144        )
1145    }
1146
1147    pub fn create_system_user(
1148        &self,
1149        username: &str,
1150        password: &str,
1151        role: Role,
1152        tenant_id: Option<&str>,
1153    ) -> Result<User, AuthError> {
1154        if let Some(configured) = self.configured_control_events() {
1155            let ctx = default_user_lifecycle_ctx();
1156            let control = UserLifecycleControl {
1157                ctx: &ctx,
1158                ledger: configured.ledger.as_ref(),
1159                config: configured.config,
1160            };
1161            self.create_user_in_tenant_with_ownership_controlled(
1162                tenant_id, username, password, role, true, &control,
1163            )
1164        } else {
1165            self.create_user_in_tenant_with_ownership_unaudited(
1166                tenant_id, username, password, role, true,
1167            )
1168        }
1169    }
1170
1171    fn create_user_in_tenant_with_ownership_unaudited(
1172        &self,
1173        tenant_id: Option<&str>,
1174        username: &str,
1175        password: &str,
1176        role: Role,
1177        system_owned: bool,
1178    ) -> Result<User, AuthError> {
1179        let id = UserId::from_parts(tenant_id, username);
1180        let mut users = self.users.write().map_err(lock_err)?;
1181        if users.contains_key(&id) {
1182            return Err(AuthError::UserExists(id.to_string()));
1183        }
1184
1185        let now = now_ms();
1186        let user = User {
1187            username: username.to_string(),
1188            tenant_id: tenant_id.map(|s| s.to_string()),
1189            password_hash: hash_password(password),
1190            scram_verifier: Some(make_scram_verifier(password)),
1191            role,
1192            api_keys: Vec::new(),
1193            created_at: now,
1194            updated_at: now,
1195            enabled: true,
1196            system_owned,
1197        };
1198        users.insert(id, user.clone());
1199        drop(users); // release lock before vault I/O
1200        self.persist_to_vault();
1201        Ok(user)
1202    }
1203
1204    fn create_user_in_tenant_with_ownership_controlled(
1205        &self,
1206        tenant_id: Option<&str>,
1207        username: &str,
1208        password: &str,
1209        role: Role,
1210        system_owned: bool,
1211        control: &UserLifecycleControl<'_>,
1212    ) -> Result<User, AuthError> {
1213        let id = UserId::from_parts(tenant_id, username);
1214        match self.create_user_in_tenant_with_ownership_unaudited(
1215            tenant_id,
1216            username,
1217            password,
1218            role,
1219            system_owned,
1220        ) {
1221            Ok(user) => {
1222                if let Err(err) = self.emit_user_lifecycle_allowed(
1223                    control,
1224                    EventKind::UserCreate,
1225                    "user.create",
1226                    &id,
1227                    user_control_fields(&id, Some(role), Some(true), true),
1228                ) {
1229                    self.rollback_create_user(&id);
1230                    return Err(err);
1231                }
1232                Ok(user)
1233            }
1234            Err(err) => {
1235                self.emit_user_lifecycle_outcome(
1236                    control,
1237                    if user_error_is_denied(&err) {
1238                        Outcome::Denied
1239                    } else {
1240                        Outcome::Error
1241                    },
1242                    EventKind::UserCreate,
1243                    "user.create",
1244                    &id,
1245                    Some(err.to_string()),
1246                    user_control_fields(&id, Some(role), Some(true), true),
1247                );
1248                Err(err)
1249            }
1250        }
1251    }
1252
1253    /// Look up a user's SCRAM verifier by full `UserId`.
1254    ///
1255    /// The wire handshake passes the tenant resolved from the session
1256    /// (or `None` for the bootstrap admin) so cross-tenant collisions
1257    /// never authenticate the wrong identity.
1258    pub fn lookup_scram_verifier(&self, id: &UserId) -> Option<crate::auth::scram::ScramVerifier> {
1259        // Synthetic platform-owner principal must never authenticate.
1260        if crate::auth::self_lock_guard::is_synthetic_principal(&id.username) {
1261            return None;
1262        }
1263        let users = self.users.read().ok()?;
1264        users.get(id).and_then(|u| u.scram_verifier.clone())
1265    }
1266
1267    /// Backwards-compatible shim for the v2 wire bootstrap path: looks
1268    /// up a user by username assuming the platform (`tenant=None`)
1269    /// scope. Use this only where the handshake hasn't yet learned the
1270    /// caller's tenant.
1271    pub fn lookup_scram_verifier_global(
1272        &self,
1273        username: &str,
1274    ) -> Option<crate::auth::scram::ScramVerifier> {
1275        self.lookup_scram_verifier(&UserId::platform(username))
1276    }
1277
1278    /// Return all users (password hashes redacted).
1279    ///
1280    /// The synthetic [`crate::auth::self_lock_guard::PLATFORM_OWNER_USERNAME`]
1281    /// principal is filtered out — it is a policy-graph anchor, not an
1282    /// operator-visible account.
1283    pub fn list_users(&self) -> Vec<User> {
1284        let users = match self.users.read() {
1285            Ok(g) => g,
1286            Err(_) => return Vec::new(),
1287        };
1288        users
1289            .values()
1290            .filter(|u| !crate::auth::self_lock_guard::is_synthetic_principal(&u.username))
1291            .map(|u| User {
1292                password_hash: String::new(), // redacted
1293                ..u.clone()
1294            })
1295            .collect()
1296    }
1297
1298    /// Return users restricted to a tenant scope.
1299    ///
1300    /// `tenant_filter`:
1301    ///   - `None` listing in `Some(None)` — only platform users
1302    ///   - `Some(Some("acme"))` — only users in tenant `acme`
1303    ///   - `None` argument — all users (admin-only callers)
1304    pub fn list_users_scoped(&self, tenant_filter: Option<Option<&str>>) -> Vec<User> {
1305        let users = match self.users.read() {
1306            Ok(g) => g,
1307            Err(_) => return Vec::new(),
1308        };
1309        users
1310            .values()
1311            .filter(|u| match tenant_filter {
1312                None => true,
1313                Some(t) => u.tenant_id.as_deref() == t,
1314            })
1315            .filter(|u| !crate::auth::self_lock_guard::is_synthetic_principal(&u.username))
1316            .map(|u| User {
1317                password_hash: String::new(), // redacted
1318                ..u.clone()
1319            })
1320            .collect()
1321    }
1322
1323    /// Look up a single user by `(tenant, username)`. Password hash
1324    /// is redacted.
1325    /// Whether the given principal's user record is system-owned. Returns
1326    /// `false` for unknown principals — an absent record is never treated as
1327    /// operator-owned. Used to populate `EvalContext::principal_is_system_owned`
1328    /// on the runtime authorization hot path.
1329    pub fn principal_is_system_owned(&self, principal: &UserId) -> bool {
1330        self.get_user_cloned(principal)
1331            .map(|u| u.system_owned)
1332            .unwrap_or(false)
1333    }
1334
1335    pub fn get_user(&self, tenant_id: Option<&str>, username: &str) -> Option<User> {
1336        let id = UserId::from_parts(tenant_id, username);
1337        self.get_user_cloned(&id).map(|u| User {
1338            password_hash: String::new(),
1339            ..u
1340        })
1341    }
1342
1343    /// Delete a platform-scoped user (`tenant_id = None`) and revoke
1344    /// all of their API keys + sessions.
1345    ///
1346    /// For tenant-scoped deletes, use [`Self::delete_user_in_tenant`].
1347    pub fn delete_user(&self, username: &str) -> Result<(), AuthError> {
1348        self.delete_user_in_tenant(None, username)
1349    }
1350
1351    pub fn delete_user_with_control_events(
1352        &self,
1353        username: &str,
1354        ctx: &ControlEventCtx<'_>,
1355        ledger: &dyn ControlEventLedger,
1356        config: ControlEventConfig,
1357    ) -> Result<(), AuthError> {
1358        self.delete_user_in_tenant_with_control_events(None, username, ctx, ledger, config)
1359    }
1360
1361    /// Delete a user identified by `(tenant_id, username)` and revoke
1362    /// all of their API keys + sessions.
1363    pub fn delete_user_in_tenant(
1364        &self,
1365        tenant_id: Option<&str>,
1366        username: &str,
1367    ) -> Result<(), AuthError> {
1368        if let Some(configured) = self.configured_control_events() {
1369            let ctx = default_user_lifecycle_ctx();
1370            let control = UserLifecycleControl {
1371                ctx: &ctx,
1372                ledger: configured.ledger.as_ref(),
1373                config: configured.config,
1374            };
1375            self.delete_user_in_tenant_controlled(tenant_id, username, &control)
1376        } else {
1377            self.delete_user_in_tenant_unaudited(tenant_id, username)
1378        }
1379    }
1380
1381    pub fn delete_user_in_tenant_with_control_events(
1382        &self,
1383        tenant_id: Option<&str>,
1384        username: &str,
1385        ctx: &ControlEventCtx<'_>,
1386        ledger: &dyn ControlEventLedger,
1387        config: ControlEventConfig,
1388    ) -> Result<(), AuthError> {
1389        let control = UserLifecycleControl {
1390            ctx,
1391            ledger,
1392            config,
1393        };
1394        self.delete_user_in_tenant_controlled(tenant_id, username, &control)
1395    }
1396
1397    fn delete_user_in_tenant_unaudited(
1398        &self,
1399        tenant_id: Option<&str>,
1400        username: &str,
1401    ) -> Result<(), AuthError> {
1402        let id = UserId::from_parts(tenant_id, username);
1403        let mut users = self.users.write().map_err(lock_err)?;
1404        let user = users
1405            .get(&id)
1406            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
1407        reject_system_owned(&id, user)?;
1408        let user = users
1409            .remove(&id)
1410            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
1411
1412        // Remove API key index entries.
1413        if let Ok(mut idx) = self.api_key_index.write() {
1414            for api_key in &user.api_keys {
1415                idx.remove(&api_key.key);
1416            }
1417        }
1418
1419        // Remove sessions belonging to this user (match on tenant+username
1420        // so we don't tear down a same-named user in another tenant).
1421        if let Ok(mut sessions) = self.sessions.write() {
1422            sessions
1423                .retain(|_, s| !(s.username == username && s.tenant_id.as_deref() == tenant_id));
1424        }
1425
1426        self.persist_to_vault();
1427        Ok(())
1428    }
1429
1430    fn delete_user_in_tenant_controlled(
1431        &self,
1432        tenant_id: Option<&str>,
1433        username: &str,
1434        control: &UserLifecycleControl<'_>,
1435    ) -> Result<(), AuthError> {
1436        let id = UserId::from_parts(tenant_id, username);
1437        let rollback = self.user_delete_rollback_snapshot(&id, tenant_id, username);
1438        match self.delete_user_in_tenant_unaudited(tenant_id, username) {
1439            Ok(()) => {
1440                if let Err(err) = self.emit_user_lifecycle_allowed(
1441                    control,
1442                    EventKind::UserDelete,
1443                    "user.delete",
1444                    &id,
1445                    user_control_fields(&id, rollback.as_ref().map(|r| r.0.role), None, false),
1446                ) {
1447                    if let Some((user, sessions)) = rollback {
1448                        self.restore_deleted_user(&id, user, sessions);
1449                    }
1450                    return Err(err);
1451                }
1452                Ok(())
1453            }
1454            Err(err) => {
1455                self.emit_user_lifecycle_outcome(
1456                    control,
1457                    if user_error_is_denied(&err) {
1458                        Outcome::Denied
1459                    } else {
1460                        Outcome::Error
1461                    },
1462                    EventKind::UserDelete,
1463                    "user.delete",
1464                    &id,
1465                    Some(err.to_string()),
1466                    user_control_fields(&id, rollback.as_ref().map(|r| r.0.role), None, false),
1467                );
1468                Err(err)
1469            }
1470        }
1471    }
1472
1473    /// Change password (requires the old password). Defaults to
1474    /// platform tenant; use [`Self::change_password_in_tenant`] for
1475    /// scoped users.
1476    pub fn change_password(
1477        &self,
1478        username: &str,
1479        old_password: &str,
1480        new_password: &str,
1481    ) -> Result<(), AuthError> {
1482        self.change_password_in_tenant(None, username, old_password, new_password)
1483    }
1484
1485    pub fn change_password_with_control_events(
1486        &self,
1487        username: &str,
1488        old_password: &str,
1489        new_password: &str,
1490        ctx: &ControlEventCtx<'_>,
1491        ledger: &dyn ControlEventLedger,
1492        config: ControlEventConfig,
1493    ) -> Result<(), AuthError> {
1494        self.change_password_in_tenant_with_control_events(
1495            None,
1496            username,
1497            old_password,
1498            new_password,
1499            ctx,
1500            ledger,
1501            config,
1502        )
1503    }
1504
1505    #[allow(clippy::too_many_arguments)]
1506    pub fn change_password_in_tenant(
1507        &self,
1508        tenant_id: Option<&str>,
1509        username: &str,
1510        old_password: &str,
1511        new_password: &str,
1512    ) -> Result<(), AuthError> {
1513        if let Some(configured) = self.configured_control_events() {
1514            let ctx = default_user_lifecycle_ctx();
1515            let control = UserLifecycleControl {
1516                ctx: &ctx,
1517                ledger: configured.ledger.as_ref(),
1518                config: configured.config,
1519            };
1520            self.change_password_in_tenant_controlled(
1521                tenant_id,
1522                username,
1523                old_password,
1524                new_password,
1525                &control,
1526            )
1527        } else {
1528            self.change_password_in_tenant_unaudited(
1529                tenant_id,
1530                username,
1531                old_password,
1532                new_password,
1533            )
1534        }
1535    }
1536
1537    #[allow(clippy::too_many_arguments)]
1538    pub fn change_password_in_tenant_with_control_events(
1539        &self,
1540        tenant_id: Option<&str>,
1541        username: &str,
1542        old_password: &str,
1543        new_password: &str,
1544        ctx: &ControlEventCtx<'_>,
1545        ledger: &dyn ControlEventLedger,
1546        config: ControlEventConfig,
1547    ) -> Result<(), AuthError> {
1548        let control = UserLifecycleControl {
1549            ctx,
1550            ledger,
1551            config,
1552        };
1553        self.change_password_in_tenant_controlled(
1554            tenant_id,
1555            username,
1556            old_password,
1557            new_password,
1558            &control,
1559        )
1560    }
1561
1562    fn change_password_in_tenant_unaudited(
1563        &self,
1564        tenant_id: Option<&str>,
1565        username: &str,
1566        old_password: &str,
1567        new_password: &str,
1568    ) -> Result<(), AuthError> {
1569        let id = UserId::from_parts(tenant_id, username);
1570        let mut users = self.users.write().map_err(lock_err)?;
1571        let user = users
1572            .get_mut(&id)
1573            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
1574        reject_system_owned(&id, user)?;
1575
1576        if !verify_password(old_password, &user.password_hash) {
1577            return Err(AuthError::InvalidCredentials);
1578        }
1579
1580        user.password_hash = hash_password(new_password);
1581        user.scram_verifier = Some(make_scram_verifier(new_password));
1582        user.updated_at = now_ms();
1583        drop(users); // release lock before vault I/O
1584        self.persist_to_vault();
1585        Ok(())
1586    }
1587
1588    fn change_password_in_tenant_controlled(
1589        &self,
1590        tenant_id: Option<&str>,
1591        username: &str,
1592        old_password: &str,
1593        new_password: &str,
1594        control: &UserLifecycleControl<'_>,
1595    ) -> Result<(), AuthError> {
1596        let id = UserId::from_parts(tenant_id, username);
1597        let previous = self.get_user_cloned(&id);
1598        match self.change_password_in_tenant_unaudited(
1599            tenant_id,
1600            username,
1601            old_password,
1602            new_password,
1603        ) {
1604            Ok(()) => {
1605                if let Err(err) = self.emit_user_lifecycle_allowed(
1606                    control,
1607                    EventKind::UserUpdate,
1608                    "user.update",
1609                    &id,
1610                    user_control_fields(
1611                        &id,
1612                        previous.as_ref().map(|u| u.role),
1613                        previous.as_ref().map(|u| u.enabled),
1614                        true,
1615                    ),
1616                ) {
1617                    self.restore_user_snapshot(&id, previous);
1618                    return Err(err);
1619                }
1620                Ok(())
1621            }
1622            Err(err) => {
1623                self.emit_user_lifecycle_outcome(
1624                    control,
1625                    if user_error_is_denied(&err) {
1626                        Outcome::Denied
1627                    } else {
1628                        Outcome::Error
1629                    },
1630                    EventKind::UserUpdate,
1631                    "user.update",
1632                    &id,
1633                    Some(err.to_string()),
1634                    user_control_fields(
1635                        &id,
1636                        previous.as_ref().map(|u| u.role),
1637                        previous.as_ref().map(|u| u.enabled),
1638                        true,
1639                    ),
1640                );
1641                Err(err)
1642            }
1643        }
1644    }
1645
1646    /// Change a user's role (admin-only operation). Defaults to platform
1647    /// tenant; use [`Self::change_role_in_tenant`] for scoped users.
1648    pub fn change_role(&self, username: &str, new_role: Role) -> Result<(), AuthError> {
1649        self.change_role_in_tenant(None, username, new_role)
1650    }
1651
1652    pub fn change_role_in_tenant(
1653        &self,
1654        tenant_id: Option<&str>,
1655        username: &str,
1656        new_role: Role,
1657    ) -> Result<(), AuthError> {
1658        if let Some(configured) = self.configured_control_events() {
1659            let ctx = default_user_lifecycle_ctx();
1660            let control = UserLifecycleControl {
1661                ctx: &ctx,
1662                ledger: configured.ledger.as_ref(),
1663                config: configured.config,
1664            };
1665            self.change_role_in_tenant_controlled(tenant_id, username, new_role, &control)
1666        } else {
1667            self.change_role_in_tenant_unaudited(tenant_id, username, new_role)
1668        }
1669    }
1670
1671    pub fn change_role_in_tenant_with_control_events(
1672        &self,
1673        tenant_id: Option<&str>,
1674        username: &str,
1675        new_role: Role,
1676        ctx: &ControlEventCtx<'_>,
1677        ledger: &dyn ControlEventLedger,
1678        config: ControlEventConfig,
1679    ) -> Result<(), AuthError> {
1680        let control = UserLifecycleControl {
1681            ctx,
1682            ledger,
1683            config,
1684        };
1685        self.change_role_in_tenant_controlled(tenant_id, username, new_role, &control)
1686    }
1687
1688    fn change_role_in_tenant_unaudited(
1689        &self,
1690        tenant_id: Option<&str>,
1691        username: &str,
1692        new_role: Role,
1693    ) -> Result<(), AuthError> {
1694        let id = UserId::from_parts(tenant_id, username);
1695        let mut users = self.users.write().map_err(lock_err)?;
1696        let user = users
1697            .get_mut(&id)
1698            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
1699        reject_system_owned(&id, user)?;
1700
1701        let prior_role = user.role;
1702        user.role = new_role;
1703        user.updated_at = now_ms();
1704
1705        // Issue #205 — promotion to Admin is an operator-grade event:
1706        // the new role grants destructive capabilities (DROP, ALTER,
1707        // GRANT) that an operator must observe out-of-band even when
1708        // the auth path itself is healthy.
1709        if new_role == Role::Admin && prior_role != Role::Admin {
1710            crate::telemetry::operator_event::OperatorEvent::AdminCapabilityGranted {
1711                granted_to: id.to_string(),
1712                capability: "Role::Admin".to_string(),
1713                granted_by: "auth_store::change_role".to_string(),
1714            }
1715            .emit_global();
1716        }
1717
1718        // Downgrade any API keys that now exceed the user's role.
1719        for key in &mut user.api_keys {
1720            if key.role > new_role {
1721                key.role = new_role;
1722            }
1723        }
1724
1725        // Update the api_key_index as well.
1726        if let Ok(mut idx) = self.api_key_index.write() {
1727            for key in &user.api_keys {
1728                if let Some(entry) = idx.get_mut(&key.key) {
1729                    entry.1 = key.role;
1730                }
1731            }
1732        }
1733
1734        self.persist_to_vault();
1735        Ok(())
1736    }
1737
1738    fn change_role_in_tenant_controlled(
1739        &self,
1740        tenant_id: Option<&str>,
1741        username: &str,
1742        new_role: Role,
1743        control: &UserLifecycleControl<'_>,
1744    ) -> Result<(), AuthError> {
1745        let id = UserId::from_parts(tenant_id, username);
1746        let previous = self.get_user_cloned(&id);
1747        match self.change_role_in_tenant_unaudited(tenant_id, username, new_role) {
1748            Ok(()) => {
1749                if let Err(err) = self.emit_user_lifecycle_allowed(
1750                    control,
1751                    EventKind::UserUpdate,
1752                    "user.update",
1753                    &id,
1754                    user_control_fields(
1755                        &id,
1756                        Some(new_role),
1757                        previous.as_ref().map(|u| u.enabled),
1758                        false,
1759                    ),
1760                ) {
1761                    self.restore_user_snapshot(&id, previous);
1762                    return Err(err);
1763                }
1764                Ok(())
1765            }
1766            Err(err) => {
1767                self.emit_user_lifecycle_outcome(
1768                    control,
1769                    if user_error_is_denied(&err) {
1770                        Outcome::Denied
1771                    } else {
1772                        Outcome::Error
1773                    },
1774                    EventKind::UserUpdate,
1775                    "user.update",
1776                    &id,
1777                    Some(err.to_string()),
1778                    user_control_fields(
1779                        &id,
1780                        Some(new_role),
1781                        previous.as_ref().map(|u| u.enabled),
1782                        false,
1783                    ),
1784                );
1785                Err(err)
1786            }
1787        }
1788    }
1789
1790    // -----------------------------------------------------------------
1791    // Authentication (login)
1792    // -----------------------------------------------------------------
1793
1794    /// Verify credentials for a platform-tenant user (`tenant_id = None`)
1795    /// and create a session. For tenant-scoped login use
1796    /// [`Self::authenticate_in_tenant`].
1797    ///
1798    /// When a keypair is available (certificate-based seal), session tokens
1799    /// are signed with the master secret so the server can verify they were
1800    /// genuinely issued by this vault instance.
1801    pub fn authenticate(&self, username: &str, password: &str) -> Result<Session, AuthError> {
1802        self.authenticate_in_tenant(None, username, password)
1803    }
1804
1805    /// Verify credentials for `(tenant_id, username, password)` and
1806    /// create a session. Tenant-aware: `alice@acme` and `alice@globex`
1807    /// authenticate independently.
1808    pub fn authenticate_in_tenant(
1809        &self,
1810        tenant_id: Option<&str>,
1811        username: &str,
1812        password: &str,
1813    ) -> Result<Session, AuthError> {
1814        // The synthetic platform-owner principal exists only as a
1815        // policy-graph anchor (see `crate::auth::self_lock_guard`); it
1816        // must never authenticate via any transport.
1817        if crate::auth::self_lock_guard::is_synthetic_principal(username) {
1818            return Err(AuthError::InvalidCredentials);
1819        }
1820        let id = UserId::from_parts(tenant_id, username);
1821        let users = self.users.read().map_err(lock_err)?;
1822        let user = users.get(&id).ok_or(AuthError::InvalidCredentials)?;
1823
1824        if !user.enabled {
1825            return Err(AuthError::InvalidCredentials);
1826        }
1827
1828        if !verify_password(password, &user.password_hash) {
1829            return Err(AuthError::InvalidCredentials);
1830        }
1831
1832        // Generate token: signed if keypair is available, random otherwise.
1833        let token = match self.keypair.read().ok().and_then(|g| {
1834            g.as_ref().map(|kp| {
1835                let token_id = random_hex(16);
1836                let sig = kp.sign(format!("session:{}", token_id).as_bytes());
1837                // Take first 16 bytes of signature for compact token.
1838                format!("rs_{}{}", token_id, hex::encode(&sig[..16]))
1839            })
1840        }) {
1841            Some(signed_token) => signed_token,
1842            None => generate_session_token(),
1843        };
1844
1845        let now = now_ms();
1846        let session = Session {
1847            token,
1848            username: username.to_string(),
1849            tenant_id: user.tenant_id.clone(),
1850            role: user.role,
1851            created_at: now,
1852            expires_at: now + (self.config.session_ttl_secs as u128 * 1000),
1853        };
1854
1855        drop(users); // release read lock before acquiring write
1856
1857        let mut sessions = self.sessions.write().map_err(lock_err)?;
1858        sessions.insert(session.token.clone(), session.clone());
1859        Ok(session)
1860    }
1861
1862    // -----------------------------------------------------------------
1863    // Token validation
1864    // -----------------------------------------------------------------
1865
1866    /// Validate a token (session or API key).
1867    ///
1868    /// Returns `(username, role)` if valid, `None` otherwise. Tenant
1869    /// scope is dropped here for compatibility with the bulk of the
1870    /// existing caller surface (routing, gRPC control, redwire). Use
1871    /// [`Self::validate_token_full`] when the caller needs the
1872    /// resolved `UserId` (e.g. to pin `CURRENT_TENANT()`).
1873    pub fn validate_token(&self, token: &str) -> Option<(String, Role)> {
1874        self.validate_token_full(token)
1875            .map(|(id, role)| (id.username, role))
1876    }
1877
1878    /// Tenant-aware token validation. Returns the resolved `UserId`
1879    /// (which carries the tenant) and the granted `Role`.
1880    pub fn validate_token_full(&self, token: &str) -> Option<(UserId, Role)> {
1881        // Try session tokens first.
1882        if token.starts_with("rs_") {
1883            if let Ok(sessions) = self.sessions.read() {
1884                if let Some(session) = sessions.get(token) {
1885                    let now = now_ms();
1886                    if now < session.expires_at {
1887                        return Some((
1888                            UserId::from_parts(session.tenant_id.as_deref(), &session.username),
1889                            session.role,
1890                        ));
1891                    }
1892                }
1893            }
1894            return None;
1895        }
1896
1897        // Try API keys.
1898        if token.starts_with("rk_") {
1899            if let Ok(idx) = self.api_key_index.read() {
1900                return idx.get(token).cloned();
1901            }
1902            return None;
1903        }
1904
1905        None
1906    }
1907
1908    // -----------------------------------------------------------------
1909    // API Key management
1910    // -----------------------------------------------------------------
1911
1912    /// Create a persistent API key for a platform-tenant user.
1913    ///
1914    /// For tenant-scoped users use [`Self::create_api_key_in_tenant`].
1915    pub fn create_api_key(
1916        &self,
1917        username: &str,
1918        name: &str,
1919        role: Role,
1920    ) -> Result<ApiKey, AuthError> {
1921        self.create_api_key_in_tenant(None, username, name, role)
1922    }
1923
1924    pub fn create_api_key_with_control_events(
1925        &self,
1926        username: &str,
1927        name: &str,
1928        role: Role,
1929        ctx: &ControlEventCtx<'_>,
1930        ledger: &dyn ControlEventLedger,
1931        config: ControlEventConfig,
1932    ) -> Result<ApiKey, AuthError> {
1933        self.create_api_key_in_tenant_with_control_events(
1934            None, username, name, role, ctx, ledger, config,
1935        )
1936    }
1937
1938    pub fn create_api_key_in_tenant(
1939        &self,
1940        tenant_id: Option<&str>,
1941        username: &str,
1942        name: &str,
1943        role: Role,
1944    ) -> Result<ApiKey, AuthError> {
1945        if let Some(configured) = self.configured_control_events() {
1946            let ctx = default_user_lifecycle_ctx();
1947            let control = UserLifecycleControl {
1948                ctx: &ctx,
1949                ledger: configured.ledger.as_ref(),
1950                config: configured.config,
1951            };
1952            self.create_api_key_in_tenant_controlled(tenant_id, username, name, role, &control)
1953        } else {
1954            self.create_api_key_in_tenant_unaudited(tenant_id, username, name, role)
1955        }
1956    }
1957
1958    #[allow(clippy::too_many_arguments)]
1959    pub fn create_api_key_in_tenant_with_control_events(
1960        &self,
1961        tenant_id: Option<&str>,
1962        username: &str,
1963        name: &str,
1964        role: Role,
1965        ctx: &ControlEventCtx<'_>,
1966        ledger: &dyn ControlEventLedger,
1967        config: ControlEventConfig,
1968    ) -> Result<ApiKey, AuthError> {
1969        let control = UserLifecycleControl {
1970            ctx,
1971            ledger,
1972            config,
1973        };
1974        self.create_api_key_in_tenant_controlled(tenant_id, username, name, role, &control)
1975    }
1976
1977    fn create_api_key_in_tenant_unaudited(
1978        &self,
1979        tenant_id: Option<&str>,
1980        username: &str,
1981        name: &str,
1982        role: Role,
1983    ) -> Result<ApiKey, AuthError> {
1984        let id = UserId::from_parts(tenant_id, username);
1985        let mut users = self.users.write().map_err(lock_err)?;
1986        let user = users
1987            .get_mut(&id)
1988            .ok_or_else(|| AuthError::UserNotFound(id.to_string()))?;
1989
1990        // The key's role cannot exceed the user's role.
1991        if role > user.role {
1992            return Err(AuthError::RoleExceeded {
1993                requested: role,
1994                ceiling: user.role,
1995            });
1996        }
1997
1998        let api_key = ApiKey {
1999            key: generate_api_key(),
2000            name: name.to_string(),
2001            role,
2002            created_at: now_ms(),
2003        };
2004
2005        user.api_keys.push(api_key.clone());
2006        user.updated_at = now_ms();
2007
2008        // Update the index.
2009        if let Ok(mut idx) = self.api_key_index.write() {
2010            idx.insert(api_key.key.clone(), (id.clone(), api_key.role));
2011        }
2012
2013        drop(users); // release lock before vault I/O
2014        self.persist_to_vault();
2015        Ok(api_key)
2016    }
2017
2018    fn create_api_key_in_tenant_controlled(
2019        &self,
2020        tenant_id: Option<&str>,
2021        username: &str,
2022        name: &str,
2023        role: Role,
2024        control: &UserLifecycleControl<'_>,
2025    ) -> Result<ApiKey, AuthError> {
2026        let id = UserId::from_parts(tenant_id, username);
2027        match self.create_api_key_in_tenant_unaudited(tenant_id, username, name, role) {
2028            Ok(api_key) => {
2029                let key_id = api_key_id(&api_key.key);
2030                if let Err(err) = self.emit_api_key_allowed(
2031                    control,
2032                    EventKind::ApiKeyCreate,
2033                    "apikey.create",
2034                    &key_id,
2035                    api_key_control_fields(&id, role, &key_id),
2036                ) {
2037                    self.rollback_create_api_key(&id, &api_key.key);
2038                    return Err(err);
2039                }
2040                Ok(api_key)
2041            }
2042            Err(err) => {
2043                self.emit_user_lifecycle_outcome(
2044                    control,
2045                    if user_error_is_denied(&err) {
2046                        Outcome::Denied
2047                    } else {
2048                        Outcome::Error
2049                    },
2050                    EventKind::ApiKeyCreate,
2051                    "apikey.create",
2052                    &id,
2053                    Some(err.to_string()),
2054                    user_control_fields(&id, Some(role), None, false),
2055                );
2056                Err(err)
2057            }
2058        }
2059    }
2060
2061    /// Revoke (delete) an API key.
2062    pub fn revoke_api_key(&self, key: &str) -> Result<(), AuthError> {
2063        if let Some(configured) = self.configured_control_events() {
2064            let ctx = default_user_lifecycle_ctx();
2065            let control = UserLifecycleControl {
2066                ctx: &ctx,
2067                ledger: configured.ledger.as_ref(),
2068                config: configured.config,
2069            };
2070            self.revoke_api_key_controlled(key, &control)
2071        } else {
2072            self.revoke_api_key_unaudited(key)
2073        }
2074    }
2075
2076    pub fn revoke_api_key_with_control_events(
2077        &self,
2078        key: &str,
2079        ctx: &ControlEventCtx<'_>,
2080        ledger: &dyn ControlEventLedger,
2081        config: ControlEventConfig,
2082    ) -> Result<(), AuthError> {
2083        let control = UserLifecycleControl {
2084            ctx,
2085            ledger,
2086            config,
2087        };
2088        self.revoke_api_key_controlled(key, &control)
2089    }
2090
2091    fn revoke_api_key_unaudited(&self, key: &str) -> Result<(), AuthError> {
2092        let mut users = self.users.write().map_err(lock_err)?;
2093
2094        // Find which user owns this key (look up by the api_key_index
2095        // first; fall back to a scan for legacy state restored before
2096        // the index was reseeded).
2097        let owner_id: UserId = {
2098            if let Ok(idx) = self.api_key_index.read() {
2099                if let Some((id, _)) = idx.get(key) {
2100                    id.clone()
2101                } else {
2102                    return Err(AuthError::KeyNotFound(key.to_string()));
2103                }
2104            } else {
2105                let owner = users
2106                    .iter()
2107                    .find(|(_, u)| u.api_keys.iter().any(|k| k.key == key));
2108                match owner {
2109                    Some((id, _)) => id.clone(),
2110                    None => return Err(AuthError::KeyNotFound(key.to_string())),
2111                }
2112            }
2113        };
2114
2115        let user = users
2116            .get_mut(&owner_id)
2117            .ok_or_else(|| AuthError::KeyNotFound(key.to_string()))?;
2118        user.api_keys.retain(|k| k.key != key);
2119        user.updated_at = now_ms();
2120
2121        // Remove from index.
2122        if let Ok(mut idx) = self.api_key_index.write() {
2123            idx.remove(key);
2124        }
2125
2126        self.persist_to_vault();
2127        Ok(())
2128    }
2129
2130    fn revoke_api_key_controlled(
2131        &self,
2132        key: &str,
2133        control: &UserLifecycleControl<'_>,
2134    ) -> Result<(), AuthError> {
2135        let key_id = api_key_id(key);
2136        let rollback = self.api_key_rollback_snapshot(key);
2137        let id = rollback
2138            .as_ref()
2139            .map(|r| r.0.clone())
2140            .unwrap_or_else(|| UserId::from_parts(None, ""));
2141        let role = rollback.as_ref().map(|r| r.1.role).unwrap_or(Role::Read);
2142        match self.revoke_api_key_unaudited(key) {
2143            Ok(()) => {
2144                if let Err(err) = self.emit_api_key_allowed(
2145                    control,
2146                    EventKind::ApiKeyRevoke,
2147                    "apikey.revoke",
2148                    &key_id,
2149                    api_key_control_fields(&id, role, &key_id),
2150                ) {
2151                    if let Some((owner, api_key)) = rollback {
2152                        self.restore_api_key(&owner, api_key);
2153                    }
2154                    return Err(err);
2155                }
2156                Ok(())
2157            }
2158            Err(err) => {
2159                self.emit_user_lifecycle_outcome(
2160                    control,
2161                    if user_error_is_denied(&err) {
2162                        Outcome::Denied
2163                    } else {
2164                        Outcome::Error
2165                    },
2166                    EventKind::ApiKeyRevoke,
2167                    "apikey.revoke",
2168                    &id,
2169                    Some(err.to_string()),
2170                    api_key_control_fields(&id, role, &key_id),
2171                );
2172                Err(err)
2173            }
2174        }
2175    }
2176
2177    // -----------------------------------------------------------------
2178    // Session management
2179    // -----------------------------------------------------------------
2180
2181    /// Revoke a session token.
2182    pub fn revoke_session(&self, token: &str) {
2183        if let Ok(mut sessions) = self.sessions.write() {
2184            sessions.remove(token);
2185        }
2186    }
2187
2188    /// Purge expired sessions (housekeeping).
2189    pub fn purge_expired_sessions(&self) -> usize {
2190        let now = now_ms();
2191        if let Ok(mut sessions) = self.sessions.write() {
2192            let before = sessions.len();
2193            sessions.retain(|_, s| s.expires_at > now);
2194            return before - sessions.len();
2195        }
2196        0
2197    }
2198
2199    // -----------------------------------------------------------------
2200    // Granular RBAC — GRANT / REVOKE
2201    //
2202    // The privilege engine lives in `super::privileges`. These helpers
2203    // are the AuthStore facade: they keep an in-memory map of grants per
2204    // user (plus a `public_grants` list), persist additions/removals to
2205    // the existing `vault_kv` store, and rebuild the per-user
2206    // `PermissionCache` so the hot path stays O(1).
2207    //
2208    // Persistence design: rather than extend the snapshot/restore
2209    // pipeline (Agent #2's territory) we serialise grants and account
2210    // attributes to the vault KV store. That gives us atomic write +
2211    // encrypted-at-rest semantics for free without touching the
2212    // existing USER/KEY/KV serializer paths. On restart `rehydrate_acl`
2213    // reads these KV entries back into the in-memory maps.
2214    // -----------------------------------------------------------------
2215
2216    /// Persist a grant. Returns `Forbidden` when the granting user is
2217    /// not Admin or attempts a cross-tenant grant.
2218    pub fn grant(
2219        &self,
2220        granter: &UserId,
2221        granter_role: Role,
2222        principal: GrantPrincipal,
2223        resource: Resource,
2224        actions: Vec<Action>,
2225        with_grant_option: bool,
2226        tenant: Option<String>,
2227    ) -> Result<(), AuthError> {
2228        if granter_role != Role::Admin {
2229            return Err(AuthError::Forbidden(format!(
2230                "GRANT requires Admin role; granter `{}` has `{:?}`",
2231                granter, granter_role
2232            )));
2233        }
2234
2235        // Cross-tenant guard: a tenant-scoped admin cannot mint grants
2236        // outside their tenant. Platform admin (tenant=None) may grant
2237        // anywhere.
2238        if granter.tenant.is_some() && granter.tenant != tenant {
2239            return Err(AuthError::Forbidden(format!(
2240                "cross-tenant GRANT denied: granter tenant `{:?}` != grant tenant `{:?}`",
2241                granter.tenant, tenant
2242            )));
2243        }
2244
2245        let mut actions_set = std::collections::BTreeSet::new();
2246        for a in actions {
2247            actions_set.insert(a);
2248        }
2249        let g = Grant {
2250            principal: principal.clone(),
2251            resource,
2252            actions: actions_set,
2253            with_grant_option,
2254            granted_by: granter.to_string(),
2255            granted_at: now_ms(),
2256            tenant,
2257            columns: None,
2258        };
2259
2260        match &principal {
2261            GrantPrincipal::User(uid) => {
2262                self.grants
2263                    .write()
2264                    .unwrap_or_else(|e| e.into_inner())
2265                    .entry(uid.clone())
2266                    .or_default()
2267                    .push(g.clone());
2268                self.invalidate_permission_cache(Some(uid));
2269            }
2270            GrantPrincipal::Public => {
2271                self.public_grants
2272                    .write()
2273                    .unwrap_or_else(|e| e.into_inner())
2274                    .push(g.clone());
2275                self.invalidate_permission_cache(None);
2276            }
2277            GrantPrincipal::Group(_) => {
2278                return Err(AuthError::Forbidden(
2279                    "GROUP principals are not yet supported; use a USER or PUBLIC".to_string(),
2280                ));
2281            }
2282        }
2283
2284        // Issue #119: a fresh grant changes the visible-collections set
2285        // for `(tenant, role)` callers under the same tenant. Drop those
2286        // cache entries so the next AI command sees the new SELECT
2287        // privilege immediately.
2288        self.invalidate_visible_collections_for_tenant(g.tenant.as_deref());
2289
2290        self.persist_acl_to_kv();
2291        Ok(())
2292    }
2293
2294    /// Drop matching grants from a principal. Returns the number of
2295    /// grants removed.
2296    pub fn revoke(
2297        &self,
2298        granter_role: Role,
2299        principal: &GrantPrincipal,
2300        resource: &Resource,
2301        actions: &[Action],
2302    ) -> Result<usize, AuthError> {
2303        if granter_role != Role::Admin {
2304            return Err(AuthError::Forbidden(format!(
2305                "REVOKE requires Admin role; granter has `{:?}`",
2306                granter_role
2307            )));
2308        }
2309
2310        let removed = match principal {
2311            GrantPrincipal::User(uid) => {
2312                let mut g = self.grants.write().unwrap_or_else(|e| e.into_inner());
2313                let before = g.get(uid).map(|v| v.len()).unwrap_or(0);
2314                if let Some(list) = g.get_mut(uid) {
2315                    list.retain(|gr| {
2316                        !(gr.resource == *resource
2317                            && (actions.iter().any(|a| gr.actions.contains(a))
2318                                || (gr.actions.contains(&Action::All) && !actions.is_empty())))
2319                    });
2320                }
2321                let after = g.get(uid).map(|v| v.len()).unwrap_or(0);
2322                drop(g);
2323                self.invalidate_permission_cache(Some(uid));
2324                before - after
2325            }
2326            GrantPrincipal::Public => {
2327                let mut p = self
2328                    .public_grants
2329                    .write()
2330                    .unwrap_or_else(|e| e.into_inner());
2331                let before = p.len();
2332                p.retain(|gr| {
2333                    !(gr.resource == *resource
2334                        && (actions.iter().any(|a| gr.actions.contains(a))
2335                            || (gr.actions.contains(&Action::All) && !actions.is_empty())))
2336                });
2337                let after = p.len();
2338                drop(p);
2339                self.invalidate_permission_cache(None);
2340                before - after
2341            }
2342            GrantPrincipal::Group(_) => 0,
2343        };
2344
2345        if removed > 0 {
2346            // Issue #119: REVOKE may shrink the visible-collections set
2347            // for any `(tenant, role)` slot. We don't know the exact
2348            // tenant when the principal is `Public`, so a `Public`
2349            // revoke clears the whole cache; user revokes scope to the
2350            // user's tenant.
2351            match principal {
2352                GrantPrincipal::User(uid) => {
2353                    self.invalidate_visible_collections_for_tenant(uid.tenant.as_deref());
2354                }
2355                GrantPrincipal::Public | GrantPrincipal::Group(_) => {
2356                    self.invalidate_visible_collections_cache();
2357                }
2358            }
2359            self.persist_acl_to_kv();
2360        }
2361        Ok(removed)
2362    }
2363
2364    /// Compute the set of collection ids a given `(tenant, role)`
2365    /// scope can read, consulting the explicit grant table. The result
2366    /// is cached for `super::scope_cache::DEFAULT_TTL` (60s) and
2367    /// invalidated on every GRANT/REVOKE/policy/collection mutation
2368    /// that could change the answer.
2369    ///
2370    /// `all_collections` is the full list of collection ids known to
2371    /// the storage layer. The runtime hands it in so this module stays
2372    /// decoupled from the storage crate. Each collection passes through
2373    /// `check_grant(SELECT)` under a synthetic `(principal, role,
2374    /// tenant)` view. The cache key includes principal because direct
2375    /// grants can differ between users that share the same tenant and
2376    /// role.
2377    pub fn visible_collections_for_scope(
2378        &self,
2379        tenant: Option<&str>,
2380        role: Role,
2381        principal: &str,
2382        all_collections: &[String],
2383    ) -> std::collections::HashSet<String> {
2384        let key = super::scope_cache::ScopeKey::new(tenant, principal, role);
2385        if let Some(hit) = self.visible_collections_cache.get(&key) {
2386            return hit;
2387        }
2388        // Slow path: walk every collection through `check_grant`. We
2389        // build the AuthzContext once, then reuse it per resource.
2390        let ctx = AuthzContext {
2391            principal,
2392            effective_role: role,
2393            tenant,
2394        };
2395        let mut visible = std::collections::HashSet::new();
2396        for collection in all_collections {
2397            let resource = Resource::table_from_name(collection);
2398            if self.check_grant(&ctx, Action::Select, &resource).is_ok() {
2399                visible.insert(collection.clone());
2400            }
2401        }
2402        self.visible_collections_cache.insert(key, visible.clone());
2403        visible
2404    }
2405
2406    /// Stats probe required by issue #119 — exposes hit/miss counts and
2407    /// invalidations for the visible-collections cache so metrics
2408    /// pipelines can compute a hit rate.
2409    pub fn auth_cache_stats(&self) -> super::scope_cache::AuthCacheStats {
2410        self.visible_collections_cache.stats()
2411    }
2412
2413    /// Drop every cached `(tenant, role)` entry. Called from CREATE
2414    /// POLICY / DROP POLICY / DROP COLLECTION paths where the affected
2415    /// tenant set is unknown.
2416    pub fn invalidate_visible_collections_cache(&self) {
2417        self.visible_collections_cache.invalidate_all();
2418    }
2419
2420    /// Drop cached entries for one tenant. Called from GRANT / REVOKE
2421    /// where the principal's tenant is known.
2422    pub fn invalidate_visible_collections_for_tenant(&self, tenant: Option<&str>) {
2423        self.visible_collections_cache.invalidate_tenant(tenant);
2424    }
2425
2426    /// Snapshot of every grant the principal effectively has, including
2427    /// `Public` grants. Audit / introspection helper.
2428    pub fn effective_grants(&self, uid: &UserId) -> Vec<Grant> {
2429        let mut out = Vec::new();
2430        if let Ok(g) = self.grants.read() {
2431            if let Some(list) = g.get(uid) {
2432                out.extend(list.iter().cloned());
2433            }
2434        }
2435        if let Ok(p) = self.public_grants.read() {
2436            out.extend(p.iter().cloned());
2437        }
2438        out
2439    }
2440
2441    /// Run a privilege check using the in-memory grant tables. Returns
2442    /// `Ok(())` on allow, `Err(AuthzError)` on deny.
2443    pub fn check_grant(
2444        &self,
2445        ctx: &AuthzContext<'_>,
2446        action: Action,
2447        resource: &Resource,
2448    ) -> Result<(), AuthzError> {
2449        if ctx.effective_role == Role::Admin {
2450            return Ok(());
2451        }
2452
2453        let uid = UserId::from_parts(ctx.tenant, ctx.principal);
2454
2455        // Fast path: per-user pre-resolved cache.
2456        if let Ok(cache) = self.permission_cache.read() {
2457            if let Some(pc) = cache.get(&uid) {
2458                if pc.allows(resource, action) {
2459                    return Ok(());
2460                }
2461            }
2462        }
2463
2464        // Slow path: linear scan + rebuild cache as a side-effect.
2465        let user_grants = self
2466            .grants
2467            .read()
2468            .ok()
2469            .and_then(|g| g.get(&uid).cloned())
2470            .unwrap_or_default();
2471        let any_user_grants = self
2472            .grants
2473            .read()
2474            .ok()
2475            .map(|g| g.values().any(|list| !list.is_empty()))
2476            .unwrap_or(false);
2477        let public_grants = self
2478            .public_grants
2479            .read()
2480            .ok()
2481            .map(|p| p.clone())
2482            .unwrap_or_default();
2483        if user_grants.is_empty() && public_grants.is_empty() && any_user_grants {
2484            return Err(AuthzError::PermissionDenied {
2485                action,
2486                resource: resource.clone(),
2487                principal: ctx.principal.to_string(),
2488            });
2489        }
2490        let view = GrantsView {
2491            user_grants: &user_grants,
2492            public_grants: &public_grants,
2493        };
2494        let result = check_grant(ctx, action, resource, &view);
2495
2496        if result.is_ok() {
2497            let pc = PermissionCache::build(&user_grants, &public_grants);
2498            if let Ok(mut cache) = self.permission_cache.write() {
2499                cache.insert(uid, pc);
2500            }
2501        }
2502        result
2503    }
2504
2505    // -----------------------------------------------------------------
2506    // ALTER USER attributes (VALID UNTIL, CONNECTION LIMIT, etc.)
2507    // -----------------------------------------------------------------
2508
2509    /// Replace the attribute record for `uid`.
2510    pub fn set_user_attributes(
2511        &self,
2512        uid: &UserId,
2513        attrs: UserAttributes,
2514    ) -> Result<(), AuthError> {
2515        let users = self.users.read().map_err(lock_err)?;
2516        if !users.contains_key(uid) {
2517            return Err(AuthError::UserNotFound(uid.to_string()));
2518        }
2519        drop(users);
2520
2521        self.user_attributes
2522            .write()
2523            .unwrap_or_else(|e| e.into_inner())
2524            .insert(uid.clone(), attrs);
2525        self.invalidate_iam_cache(Some(uid));
2526        self.persist_acl_to_kv();
2527        Ok(())
2528    }
2529
2530    /// Read the attributes for `uid`. Returns `Default::default()` for
2531    /// users that have never been altered.
2532    pub fn user_attributes(&self, uid: &UserId) -> UserAttributes {
2533        self.user_attributes
2534            .read()
2535            .ok()
2536            .and_then(|m| m.get(uid).cloned())
2537            .unwrap_or_default()
2538    }
2539
2540    pub fn add_user_to_group(&self, uid: &UserId, group: &str) -> Result<(), AuthError> {
2541        if group.trim().is_empty() {
2542            return Err(AuthError::Forbidden("group name cannot be empty".into()));
2543        }
2544        let mut attrs = self.user_attributes(uid);
2545        if !attrs.groups.iter().any(|g| g == group) {
2546            attrs.groups.push(group.to_string());
2547            attrs.groups.sort();
2548        }
2549        self.set_user_attributes(uid, attrs)
2550    }
2551
2552    pub fn remove_user_from_group(&self, uid: &UserId, group: &str) -> Result<(), AuthError> {
2553        let mut attrs = self.user_attributes(uid);
2554        attrs.groups.retain(|g| g != group);
2555        self.set_user_attributes(uid, attrs)
2556    }
2557
2558    /// Toggle `User.enabled` without rotating credentials.
2559    pub fn set_user_enabled(&self, uid: &UserId, enabled: bool) -> Result<(), AuthError> {
2560        if let Some(configured) = self.configured_control_events() {
2561            let ctx = default_user_lifecycle_ctx();
2562            let control = UserLifecycleControl {
2563                ctx: &ctx,
2564                ledger: configured.ledger.as_ref(),
2565                config: configured.config,
2566            };
2567            self.set_user_enabled_controlled(uid, enabled, &control)
2568        } else {
2569            self.set_user_enabled_unaudited(uid, enabled)
2570        }
2571    }
2572
2573    pub fn disable_user(
2574        &self,
2575        username: &str,
2576        ctx: &ControlEventCtx<'_>,
2577        ledger: &dyn ControlEventLedger,
2578        config: ControlEventConfig,
2579    ) -> Result<(), AuthError> {
2580        self.disable_user_in_tenant(None, username, ctx, ledger, config)
2581    }
2582
2583    pub fn disable_user_in_tenant(
2584        &self,
2585        tenant_id: Option<&str>,
2586        username: &str,
2587        ctx: &ControlEventCtx<'_>,
2588        ledger: &dyn ControlEventLedger,
2589        config: ControlEventConfig,
2590    ) -> Result<(), AuthError> {
2591        let uid = UserId::from_parts(tenant_id, username);
2592        let control = UserLifecycleControl {
2593            ctx,
2594            ledger,
2595            config,
2596        };
2597        self.set_user_enabled_controlled(&uid, false, &control)
2598    }
2599
2600    fn set_user_enabled_unaudited(&self, uid: &UserId, enabled: bool) -> Result<(), AuthError> {
2601        let mut users = self.users.write().map_err(lock_err)?;
2602        let user = users
2603            .get_mut(uid)
2604            .ok_or_else(|| AuthError::UserNotFound(uid.to_string()))?;
2605        reject_system_owned(uid, user)?;
2606        user.enabled = enabled;
2607        user.updated_at = now_ms();
2608        drop(users);
2609        self.persist_to_vault();
2610        Ok(())
2611    }
2612
2613    fn set_user_enabled_controlled(
2614        &self,
2615        uid: &UserId,
2616        enabled: bool,
2617        control: &UserLifecycleControl<'_>,
2618    ) -> Result<(), AuthError> {
2619        let previous = self.get_user_cloned(uid);
2620        let kind = if enabled {
2621            EventKind::UserUpdate
2622        } else {
2623            EventKind::UserDisable
2624        };
2625        let action = if enabled {
2626            "user.update"
2627        } else {
2628            "user.disable"
2629        };
2630        match self.set_user_enabled_unaudited(uid, enabled) {
2631            Ok(()) => {
2632                if let Err(err) = self.emit_user_lifecycle_allowed(
2633                    control,
2634                    kind,
2635                    action,
2636                    uid,
2637                    user_control_fields(
2638                        uid,
2639                        previous.as_ref().map(|u| u.role),
2640                        Some(enabled),
2641                        false,
2642                    ),
2643                ) {
2644                    self.restore_user_snapshot(uid, previous);
2645                    return Err(err);
2646                }
2647                Ok(())
2648            }
2649            Err(err) => {
2650                self.emit_user_lifecycle_outcome(
2651                    control,
2652                    if user_error_is_denied(&err) {
2653                        Outcome::Denied
2654                    } else {
2655                        Outcome::Error
2656                    },
2657                    kind,
2658                    action,
2659                    uid,
2660                    Some(err.to_string()),
2661                    user_control_fields(
2662                        uid,
2663                        previous.as_ref().map(|u| u.role),
2664                        Some(enabled),
2665                        false,
2666                    ),
2667                );
2668                Err(err)
2669            }
2670        }
2671    }
2672
2673    fn emit_user_lifecycle_allowed(
2674        &self,
2675        control: &UserLifecycleControl<'_>,
2676        kind: EventKind,
2677        action: &'static str,
2678        id: &UserId,
2679        fields: HashMap<String, Sensitivity>,
2680    ) -> Result<(), AuthError> {
2681        self.emit_user_lifecycle_event(control, Outcome::Allowed, kind, action, id, None, fields)
2682    }
2683
2684    fn emit_api_key_allowed(
2685        &self,
2686        control: &UserLifecycleControl<'_>,
2687        kind: EventKind,
2688        action: &'static str,
2689        api_key_id: &str,
2690        fields: HashMap<String, Sensitivity>,
2691    ) -> Result<(), AuthError> {
2692        self.emit_control_event(
2693            control,
2694            Outcome::Allowed,
2695            kind,
2696            action,
2697            Some(api_key_resource(api_key_id)),
2698            None,
2699            fields,
2700        )
2701    }
2702
2703    #[allow(clippy::too_many_arguments)]
2704    fn emit_user_lifecycle_outcome(
2705        &self,
2706        control: &UserLifecycleControl<'_>,
2707        outcome: Outcome,
2708        kind: EventKind,
2709        action: &'static str,
2710        id: &UserId,
2711        reason: Option<String>,
2712        fields: HashMap<String, Sensitivity>,
2713    ) {
2714        let _ = self.emit_user_lifecycle_event(control, outcome, kind, action, id, reason, fields);
2715    }
2716
2717    #[allow(clippy::too_many_arguments)]
2718    fn emit_user_lifecycle_event(
2719        &self,
2720        control: &UserLifecycleControl<'_>,
2721        outcome: Outcome,
2722        kind: EventKind,
2723        action: &'static str,
2724        id: &UserId,
2725        reason: Option<String>,
2726        fields: HashMap<String, Sensitivity>,
2727    ) -> Result<(), AuthError> {
2728        self.emit_control_event(
2729            control,
2730            outcome,
2731            kind,
2732            action,
2733            Some(user_resource(id)),
2734            reason,
2735            fields,
2736        )
2737    }
2738
2739    #[allow(clippy::too_many_arguments)]
2740    fn emit_control_event(
2741        &self,
2742        control: &UserLifecycleControl<'_>,
2743        outcome: Outcome,
2744        kind: EventKind,
2745        action: &'static str,
2746        resource: Option<String>,
2747        reason: Option<String>,
2748        fields: HashMap<String, Sensitivity>,
2749    ) -> Result<(), AuthError> {
2750        let event = ControlEvent {
2751            kind,
2752            outcome,
2753            action: Cow::Borrowed(action),
2754            resource,
2755            reason,
2756            matched_policy_id: None,
2757            fields,
2758        };
2759        match control.ledger.emit(control.ctx, event) {
2760            Ok(_) => Ok(()),
2761            Err(err) if control.config.require_persistence() => {
2762                Err(AuthError::Internal(err.to_string()))
2763            }
2764            Err(_) => Ok(()),
2765        }
2766    }
2767
2768    fn rollback_create_user(&self, id: &UserId) {
2769        let removed = self
2770            .users
2771            .write()
2772            .unwrap_or_else(|e| e.into_inner())
2773            .remove(id);
2774        if let Some(user) = removed {
2775            self.remove_api_key_index_entries(&user);
2776        }
2777        self.persist_to_vault();
2778    }
2779
2780    fn rollback_bootstrap(&self, id: &UserId) {
2781        self.bootstrapped.store(false, Ordering::Release);
2782        self.rollback_create_user(id);
2783    }
2784
2785    fn restore_user_snapshot(&self, id: &UserId, previous: Option<User>) {
2786        match previous {
2787            Some(user) => {
2788                self.users
2789                    .write()
2790                    .unwrap_or_else(|e| e.into_inner())
2791                    .insert(id.clone(), user.clone());
2792                self.index_user_api_keys(id, &user);
2793            }
2794            None => {
2795                self.rollback_create_user(id);
2796                return;
2797            }
2798        }
2799        self.persist_to_vault();
2800    }
2801
2802    fn user_delete_rollback_snapshot(
2803        &self,
2804        id: &UserId,
2805        tenant_id: Option<&str>,
2806        username: &str,
2807    ) -> Option<(User, Vec<(String, Session)>)> {
2808        let user = self.get_user_cloned(id)?;
2809        let sessions = self
2810            .sessions
2811            .read()
2812            .map(|sessions| {
2813                sessions
2814                    .iter()
2815                    .filter(|(_, session)| {
2816                        session.username == username && session.tenant_id.as_deref() == tenant_id
2817                    })
2818                    .map(|(token, session)| (token.clone(), session.clone()))
2819                    .collect()
2820            })
2821            .unwrap_or_default();
2822        Some((user, sessions))
2823    }
2824
2825    fn restore_deleted_user(&self, id: &UserId, user: User, sessions: Vec<(String, Session)>) {
2826        self.users
2827            .write()
2828            .unwrap_or_else(|e| e.into_inner())
2829            .insert(id.clone(), user.clone());
2830        self.index_user_api_keys(id, &user);
2831        if !sessions.is_empty() {
2832            let mut guard = self.sessions.write().unwrap_or_else(|e| e.into_inner());
2833            for (token, session) in sessions {
2834                guard.insert(token, session);
2835            }
2836        }
2837        self.persist_to_vault();
2838    }
2839
2840    fn rollback_create_api_key(&self, id: &UserId, key: &str) {
2841        if let Ok(mut users) = self.users.write() {
2842            if let Some(user) = users.get_mut(id) {
2843                user.api_keys.retain(|api_key| api_key.key != key);
2844                user.updated_at = now_ms();
2845            }
2846        }
2847        if let Ok(mut idx) = self.api_key_index.write() {
2848            idx.remove(key);
2849        }
2850        self.persist_to_vault();
2851    }
2852
2853    fn api_key_rollback_snapshot(&self, key: &str) -> Option<(UserId, ApiKey)> {
2854        let users = self.users.read().ok()?;
2855        users.iter().find_map(|(id, user)| {
2856            user.api_keys
2857                .iter()
2858                .find(|api_key| api_key.key == key)
2859                .cloned()
2860                .map(|api_key| (id.clone(), api_key))
2861        })
2862    }
2863
2864    fn restore_api_key(&self, id: &UserId, api_key: ApiKey) {
2865        if let Ok(mut users) = self.users.write() {
2866            if let Some(user) = users.get_mut(id) {
2867                if !user
2868                    .api_keys
2869                    .iter()
2870                    .any(|candidate| candidate.key == api_key.key)
2871                {
2872                    user.api_keys.push(api_key.clone());
2873                    user.updated_at = now_ms();
2874                }
2875            }
2876        }
2877        if let Ok(mut idx) = self.api_key_index.write() {
2878            idx.insert(api_key.key.clone(), (id.clone(), api_key.role));
2879        }
2880        self.persist_to_vault();
2881    }
2882
2883    fn remove_api_key_index_entries(&self, user: &User) {
2884        if let Ok(mut idx) = self.api_key_index.write() {
2885            for api_key in &user.api_keys {
2886                idx.remove(&api_key.key);
2887            }
2888        }
2889    }
2890
2891    fn index_user_api_keys(&self, id: &UserId, user: &User) {
2892        if let Ok(mut idx) = self.api_key_index.write() {
2893            for api_key in &user.api_keys {
2894                idx.insert(api_key.key.clone(), (id.clone(), api_key.role));
2895            }
2896        }
2897    }
2898
2899    // -----------------------------------------------------------------
2900    // Login-side enforcement (HTTP path)
2901    // -----------------------------------------------------------------
2902
2903    /// Authenticate with VALID UNTIL / CONNECTION LIMIT enforcement.
2904    /// Wraps `authenticate_in_tenant` and additionally:
2905    ///   * rejects logins after `valid_until`,
2906    ///   * rejects logins when the live session count would exceed the
2907    ///     `connection_limit` attribute.
2908    pub fn authenticate_with_attrs(
2909        &self,
2910        tenant_id: Option<&str>,
2911        username: &str,
2912        password: &str,
2913    ) -> Result<Session, AuthError> {
2914        let uid = UserId::from_parts(tenant_id, username);
2915        let attrs = self.user_attributes(&uid);
2916
2917        if let Some(deadline) = attrs.valid_until {
2918            if now_ms() >= deadline {
2919                return Err(AuthError::Forbidden(format!(
2920                    "account `{}` expired (VALID UNTIL exceeded)",
2921                    uid
2922                )));
2923            }
2924        }
2925
2926        if let Some(limit) = attrs.connection_limit {
2927            let current = self
2928                .session_count_by_user
2929                .read()
2930                .ok()
2931                .and_then(|m| m.get(&uid).copied())
2932                .unwrap_or(0);
2933            if current >= limit {
2934                return Err(AuthError::Forbidden(format!(
2935                    "account `{}` exceeded CONNECTION LIMIT ({})",
2936                    uid, limit
2937                )));
2938            }
2939        }
2940
2941        let session = self.authenticate_in_tenant(tenant_id, username, password)?;
2942
2943        if let Ok(mut counts) = self.session_count_by_user.write() {
2944            *counts.entry(uid).or_insert(0) += 1;
2945        }
2946        Ok(session)
2947    }
2948
2949    /// Decrement the live-session count for `uid`. Call from session
2950    /// revoke / expiry paths so CONNECTION LIMIT stays accurate.
2951    pub fn decrement_session_count(&self, uid: &UserId) {
2952        if let Ok(mut counts) = self.session_count_by_user.write() {
2953            if let Some(c) = counts.get_mut(uid) {
2954                *c = c.saturating_sub(1);
2955            }
2956        }
2957    }
2958
2959    // -----------------------------------------------------------------
2960    // ACL persistence — vault_kv backed
2961    // -----------------------------------------------------------------
2962
2963    /// Re-read the ACL state from `vault_kv`. Call after vault load /
2964    /// restore so the in-memory maps reflect the persisted data.
2965    pub fn rehydrate_acl(&self) {
2966        let kv_snapshot: Vec<(String, String)> = self
2967            .vault_kv
2968            .read()
2969            .map(|kv| {
2970                kv.iter()
2971                    .filter(|(k, _)| {
2972                        k.starts_with("red.acl.grants.")
2973                            || k.starts_with("red.acl.attrs.")
2974                            || k == &"red.acl.public_grants"
2975                    })
2976                    .map(|(k, v)| (k.clone(), v.clone()))
2977                    .collect()
2978            })
2979            .unwrap_or_default();
2980
2981        for (k, v) in kv_snapshot {
2982            if k == "red.acl.public_grants" {
2983                if let Some(parsed) = decode_grants_blob(&v) {
2984                    *self
2985                        .public_grants
2986                        .write()
2987                        .unwrap_or_else(|e| e.into_inner()) = parsed;
2988                }
2989            } else if let Some(suffix) = k.strip_prefix("red.acl.grants.") {
2990                if let Some(uid) = decode_uid(suffix) {
2991                    if let Some(mut parsed) = decode_grants_blob(&v) {
2992                        // Restore the principal field — the on-disk
2993                        // line stores only resource+action shape.
2994                        for g in parsed.iter_mut() {
2995                            g.principal = GrantPrincipal::User(uid.clone());
2996                        }
2997                        self.grants
2998                            .write()
2999                            .unwrap_or_else(|e| e.into_inner())
3000                            .insert(uid, parsed);
3001                    }
3002                }
3003            } else if let Some(suffix) = k.strip_prefix("red.acl.attrs.") {
3004                if let Some(uid) = decode_uid(suffix) {
3005                    if let Some(parsed) = decode_attrs_blob(&v) {
3006                        self.user_attributes
3007                            .write()
3008                            .unwrap_or_else(|e| e.into_inner())
3009                            .insert(uid, parsed);
3010                    }
3011                }
3012            }
3013        }
3014
3015        self.permission_cache
3016            .write()
3017            .unwrap_or_else(|e| e.into_inner())
3018            .clear();
3019    }
3020
3021    /// Snapshot every ACL change back into the vault KV store.
3022    fn persist_acl_to_kv(&self) {
3023        let public = self
3024            .public_grants
3025            .read()
3026            .ok()
3027            .map(|p| encode_grants_blob(&p))
3028            .unwrap_or_default();
3029        self.vault_kv_set("red.acl.public_grants".to_string(), public);
3030
3031        let snapshot: Vec<(UserId, Vec<Grant>)> = self
3032            .grants
3033            .read()
3034            .ok()
3035            .map(|g| g.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3036            .unwrap_or_default();
3037        for (uid, list) in snapshot {
3038            let key = format!("red.acl.grants.{}", encode_uid(&uid));
3039            let val = encode_grants_blob(&list);
3040            self.vault_kv_set(key, val);
3041        }
3042
3043        let attrs_snapshot: Vec<(UserId, UserAttributes)> = self
3044            .user_attributes
3045            .read()
3046            .ok()
3047            .map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3048            .unwrap_or_default();
3049        for (uid, attrs) in attrs_snapshot {
3050            let key = format!("red.acl.attrs.{}", encode_uid(&uid));
3051            let val = encode_attrs_blob(&attrs);
3052            self.vault_kv_set(key, val);
3053        }
3054    }
3055
3056    fn invalidate_permission_cache(&self, uid: Option<&UserId>) {
3057        if let Ok(mut cache) = self.permission_cache.write() {
3058            match uid {
3059                Some(u) => {
3060                    cache.remove(u);
3061                }
3062                None => cache.clear(),
3063            }
3064        }
3065    }
3066
3067    // -----------------------------------------------------------------
3068    // IAM policies — put / delete / attach / detach / simulate
3069    //
3070    // The kernel in `super::policies` owns the Policy type and the
3071    // evaluator. AuthStore handles persistence + per-user cache + the
3072    // GRANT translation layer (synthetic `_grant_*` policies).
3073    // -----------------------------------------------------------------
3074
3075    pub fn put_policy_with_control_events(
3076        &self,
3077        p: Policy,
3078        control: &PolicyMutationControl<'_>,
3079    ) -> Result<(), AuthError> {
3080        let policy_id = p.id.clone();
3081        let kind = if self.get_policy(&policy_id).is_some() {
3082            EventKind::PolicyUpdate
3083        } else {
3084            EventKind::PolicyCreate
3085        };
3086
3087        if p.id.starts_with("_grant_") || p.id.starts_with("_default_") {
3088            let err = AuthError::Forbidden(format!("policy id `{}` is reserved", p.id));
3089            self.emit_policy_error(
3090                control,
3091                kind,
3092                "policy:put",
3093                &policy_id,
3094                Some(&p),
3095                None,
3096                &err,
3097            );
3098            return Err(err);
3099        }
3100
3101        if let Some(ManagedPolicyDecision::Deny {
3102            entry_id,
3103            matched_action,
3104            matched_resource,
3105            reason,
3106            ..
3107        }) = self.managed_policy_decision(&policy_id, PolicyOp::Put, control)
3108        {
3109            self.emit_policy_denied(
3110                control,
3111                kind,
3112                "policy:put",
3113                &policy_id,
3114                Some(&p),
3115                None,
3116                &reason.to_string(),
3117                Some(entry_id),
3118                Some((matched_action, matched_resource)),
3119            );
3120            return Err(Self::managed_policy_error(&policy_id, &reason.to_string()));
3121        }
3122
3123        let previous = self.get_policy(&policy_id);
3124        let was_enabled = self.iam_authorization_enabled();
3125        match self.put_policy_internal(p.clone()) {
3126            Ok(()) => match self.emit_policy_allowed(
3127                control,
3128                kind,
3129                "policy:put",
3130                &policy_id,
3131                Some(&p),
3132                None,
3133                None,
3134            ) {
3135                Ok(()) => Ok(()),
3136                Err(err) => {
3137                    self.restore_policy_put(&policy_id, previous, was_enabled);
3138                    Err(err)
3139                }
3140            },
3141            Err(err) => {
3142                self.emit_policy_error(
3143                    control,
3144                    kind,
3145                    "policy:put",
3146                    &policy_id,
3147                    Some(&p),
3148                    None,
3149                    &err,
3150                );
3151                Err(err)
3152            }
3153        }
3154    }
3155
3156    pub fn delete_policy_with_control_events(
3157        &self,
3158        id: &str,
3159        control: &PolicyMutationControl<'_>,
3160    ) -> Result<(), AuthError> {
3161        let existing = self.get_policy(id);
3162        if let Some(ManagedPolicyDecision::Deny {
3163            entry_id,
3164            matched_action,
3165            matched_resource,
3166            reason,
3167            ..
3168        }) = self.managed_policy_decision(id, PolicyOp::Drop, control)
3169        {
3170            self.emit_policy_denied(
3171                control,
3172                EventKind::PolicyDelete,
3173                "policy:drop",
3174                id,
3175                existing.as_deref(),
3176                None,
3177                &reason.to_string(),
3178                Some(entry_id),
3179                Some((matched_action, matched_resource)),
3180            );
3181            return Err(Self::managed_policy_error(id, &reason.to_string()));
3182        }
3183
3184        let user_attachments = self
3185            .user_attachments
3186            .read()
3187            .map(|m| m.clone())
3188            .unwrap_or_default();
3189        let group_attachments = self
3190            .group_attachments
3191            .read()
3192            .map(|m| m.clone())
3193            .unwrap_or_default();
3194        let was_enabled = self.iam_authorization_enabled();
3195        match self.delete_policy(id) {
3196            Ok(()) => match self.emit_policy_allowed(
3197                control,
3198                EventKind::PolicyDelete,
3199                "policy:drop",
3200                id,
3201                existing.as_deref(),
3202                None,
3203                None,
3204            ) {
3205                Ok(()) => Ok(()),
3206                Err(err) => {
3207                    if let Some(policy) = existing {
3208                        self.restore_policy_delete(
3209                            id,
3210                            policy,
3211                            user_attachments,
3212                            group_attachments,
3213                            was_enabled,
3214                        );
3215                    }
3216                    Err(err)
3217                }
3218            },
3219            Err(err) => {
3220                self.emit_policy_error(
3221                    control,
3222                    EventKind::PolicyDelete,
3223                    "policy:drop",
3224                    id,
3225                    existing.as_deref(),
3226                    None,
3227                    &err,
3228                );
3229                Err(err)
3230            }
3231        }
3232    }
3233
3234    pub fn attach_policy_with_control_events(
3235        &self,
3236        principal: PrincipalRef,
3237        policy_id: &str,
3238        control: &PolicyMutationControl<'_>,
3239    ) -> Result<(), AuthError> {
3240        let existing = self.get_policy(policy_id);
3241        if let Some(ManagedPolicyDecision::Deny {
3242            entry_id,
3243            matched_action,
3244            matched_resource,
3245            reason,
3246            ..
3247        }) = self.managed_policy_decision(policy_id, PolicyOp::Attach, control)
3248        {
3249            self.emit_policy_denied(
3250                control,
3251                EventKind::PolicyAttach,
3252                "policy:attach",
3253                policy_id,
3254                existing.as_deref(),
3255                Some(&principal),
3256                &reason.to_string(),
3257                Some(entry_id),
3258                Some((matched_action, matched_resource)),
3259            );
3260            return Err(Self::managed_policy_error(policy_id, &reason.to_string()));
3261        }
3262
3263        if let Some(err_msg) = self.check_self_lock_invariant_for_attach(policy_id) {
3264            self.emit_policy_denied(
3265                control,
3266                EventKind::PolicyAttach,
3267                "policy:attach",
3268                policy_id,
3269                existing.as_deref(),
3270                Some(&principal),
3271                &err_msg,
3272                None,
3273                None,
3274            );
3275            return Err(AuthError::Forbidden(err_msg));
3276        }
3277
3278        let user_attachments = self
3279            .user_attachments
3280            .read()
3281            .map(|m| m.clone())
3282            .unwrap_or_default();
3283        let group_attachments = self
3284            .group_attachments
3285            .read()
3286            .map(|m| m.clone())
3287            .unwrap_or_default();
3288        match self.attach_policy(principal.clone(), policy_id) {
3289            Ok(()) => match self.emit_policy_allowed(
3290                control,
3291                EventKind::PolicyAttach,
3292                "policy:attach",
3293                policy_id,
3294                existing.as_deref(),
3295                Some(&principal),
3296                None,
3297            ) {
3298                Ok(()) => Ok(()),
3299                Err(err) => {
3300                    self.restore_policy_attachments(user_attachments, group_attachments);
3301                    Err(err)
3302                }
3303            },
3304            Err(err) => {
3305                self.emit_policy_error(
3306                    control,
3307                    EventKind::PolicyAttach,
3308                    "policy:attach",
3309                    policy_id,
3310                    existing.as_deref(),
3311                    Some(&principal),
3312                    &err,
3313                );
3314                Err(err)
3315            }
3316        }
3317    }
3318
3319    pub fn detach_policy_with_control_events(
3320        &self,
3321        principal: PrincipalRef,
3322        policy_id: &str,
3323        control: &PolicyMutationControl<'_>,
3324    ) -> Result<(), AuthError> {
3325        let existing = self.get_policy(policy_id);
3326        if let Some(ManagedPolicyDecision::Deny {
3327            entry_id,
3328            matched_action,
3329            matched_resource,
3330            reason,
3331            ..
3332        }) = self.managed_policy_decision(policy_id, PolicyOp::Detach, control)
3333        {
3334            self.emit_policy_denied(
3335                control,
3336                EventKind::PolicyDetach,
3337                "policy:detach",
3338                policy_id,
3339                existing.as_deref(),
3340                Some(&principal),
3341                &reason.to_string(),
3342                Some(entry_id),
3343                Some((matched_action, matched_resource)),
3344            );
3345            return Err(Self::managed_policy_error(policy_id, &reason.to_string()));
3346        }
3347        let Some(existing_policy) = existing else {
3348            let err = AuthError::Forbidden(format!("policy `{policy_id}` not found"));
3349            self.emit_policy_error(
3350                control,
3351                EventKind::PolicyDetach,
3352                "policy:detach",
3353                policy_id,
3354                None,
3355                Some(&principal),
3356                &err,
3357            );
3358            return Err(err);
3359        };
3360
3361        let user_attachments = self
3362            .user_attachments
3363            .read()
3364            .map(|m| m.clone())
3365            .unwrap_or_default();
3366        let group_attachments = self
3367            .group_attachments
3368            .read()
3369            .map(|m| m.clone())
3370            .unwrap_or_default();
3371        match self.detach_policy(principal.clone(), policy_id) {
3372            Ok(()) => match self.emit_policy_allowed(
3373                control,
3374                EventKind::PolicyDetach,
3375                "policy:detach",
3376                policy_id,
3377                Some(existing_policy.as_ref()),
3378                Some(&principal),
3379                None,
3380            ) {
3381                Ok(()) => Ok(()),
3382                Err(err) => {
3383                    self.restore_policy_attachments(user_attachments, group_attachments);
3384                    Err(err)
3385                }
3386            },
3387            Err(err) => {
3388                self.emit_policy_error(
3389                    control,
3390                    EventKind::PolicyDetach,
3391                    "policy:detach",
3392                    policy_id,
3393                    Some(existing_policy.as_ref()),
3394                    Some(&principal),
3395                    &err,
3396                );
3397                Err(err)
3398            }
3399        }
3400    }
3401
3402    fn managed_policy_decision(
3403        &self,
3404        policy_id: &str,
3405        op: PolicyOp,
3406        control: &PolicyMutationControl<'_>,
3407    ) -> Option<ManagedPolicyDecision> {
3408        control.registry.map(|registry| {
3409            ManagedPolicyGate::new(registry).check_mutation(
3410                self,
3411                control.actor,
3412                control.eval_ctx,
3413                policy_id,
3414                op,
3415            )
3416        })
3417    }
3418
3419    /// Run the [`crate::auth::self_lock_guard`] invariant against the
3420    /// current policy set. Returns `Some(error_message)` when an attach
3421    /// must be refused, `None` when the system would remain unlockable.
3422    ///
3423    /// `_policy_id` is informational — the invariant is on the full
3424    /// graph and doesn't currently special-case the new entry.
3425    fn check_self_lock_invariant_for_attach(&self, _policy_id: &str) -> Option<String> {
3426        use crate::auth::self_lock_guard::{
3427            check_self_lock_invariant, format_block_error, InvariantOutcome,
3428        };
3429        let policies: Vec<Arc<Policy>> = match self.policies.read() {
3430            Ok(map) => map.values().cloned().collect(),
3431            Err(_) => return None,
3432        };
3433        let outcome = check_self_lock_invariant(&policies);
3434        match &outcome {
3435            InvariantOutcome::Ok => None,
3436            InvariantOutcome::Blocked { .. } => format_block_error(&outcome),
3437        }
3438    }
3439
3440    fn managed_policy_error(policy_id: &str, reason: &str) -> AuthError {
3441        AuthError::Forbidden(format!(
3442            "managed policy mutation blocked for `{policy_id}`: {reason}"
3443        ))
3444    }
3445
3446    fn emit_policy_allowed(
3447        &self,
3448        control: &PolicyMutationControl<'_>,
3449        kind: EventKind,
3450        action: &'static str,
3451        policy_id: &str,
3452        policy: Option<&Policy>,
3453        principal: Option<&PrincipalRef>,
3454        matched_policy_id: Option<String>,
3455    ) -> Result<(), AuthError> {
3456        let event = ControlEvent {
3457            kind,
3458            outcome: Outcome::Allowed,
3459            action: std::borrow::Cow::Borrowed(action),
3460            resource: Some(format!("policy:{policy_id}")),
3461            reason: None,
3462            matched_policy_id,
3463            fields: policy_control_fields(policy_id, policy, principal),
3464        };
3465        match control.ledger.emit(control.ctx, event) {
3466            Ok(_) => Ok(()),
3467            Err(err) if control.config.require_persistence() => {
3468                Err(AuthError::Internal(err.to_string()))
3469            }
3470            Err(_) => Ok(()),
3471        }
3472    }
3473
3474    #[allow(clippy::too_many_arguments)]
3475    fn emit_policy_denied(
3476        &self,
3477        control: &PolicyMutationControl<'_>,
3478        kind: EventKind,
3479        action: &'static str,
3480        policy_id: &str,
3481        policy: Option<&Policy>,
3482        principal: Option<&PrincipalRef>,
3483        reason: &str,
3484        matched_policy_id: Option<String>,
3485        matched: Option<(String, String)>,
3486    ) {
3487        let mut fields = policy_control_fields(policy_id, policy, principal);
3488        if let Some((matched_action, matched_resource)) = matched {
3489            fields.insert(
3490                "matched_action".to_string(),
3491                Sensitivity::raw(matched_action),
3492            );
3493            fields.insert(
3494                "matched_resource".to_string(),
3495                Sensitivity::raw(matched_resource),
3496            );
3497        }
3498        let event = ControlEvent {
3499            kind,
3500            outcome: Outcome::Denied,
3501            action: std::borrow::Cow::Borrowed(action),
3502            resource: Some(format!("policy:{policy_id}")),
3503            reason: Some(reason.to_string()),
3504            matched_policy_id,
3505            fields,
3506        };
3507        let _ = control.ledger.emit(control.ctx, event);
3508    }
3509
3510    fn emit_policy_error(
3511        &self,
3512        control: &PolicyMutationControl<'_>,
3513        kind: EventKind,
3514        action: &'static str,
3515        policy_id: &str,
3516        policy: Option<&Policy>,
3517        principal: Option<&PrincipalRef>,
3518        err: &AuthError,
3519    ) {
3520        let event = ControlEvent {
3521            kind,
3522            outcome: Outcome::Error,
3523            action: std::borrow::Cow::Borrowed(action),
3524            resource: Some(format!("policy:{policy_id}")),
3525            reason: Some(err.to_string()),
3526            matched_policy_id: None,
3527            fields: policy_control_fields(policy_id, policy, principal),
3528        };
3529        let _ = control.ledger.emit(control.ctx, event);
3530    }
3531
3532    fn restore_policy_put(
3533        &self,
3534        policy_id: &str,
3535        previous: Option<Arc<Policy>>,
3536        was_enabled: bool,
3537    ) {
3538        let mut policies = self.policies.write().unwrap_or_else(|e| e.into_inner());
3539        match previous {
3540            Some(policy) => {
3541                policies.insert(policy_id.to_string(), policy);
3542            }
3543            None => {
3544                policies.remove(policy_id);
3545            }
3546        }
3547        drop(policies);
3548        self.iam_authorization_enabled
3549            .store(was_enabled, Ordering::Release);
3550        self.iam_effective_cache
3551            .write()
3552            .unwrap_or_else(|e| e.into_inner())
3553            .clear();
3554        self.invalidate_visible_collections_cache();
3555        self.persist_iam_to_kv();
3556    }
3557
3558    fn restore_policy_delete(
3559        &self,
3560        policy_id: &str,
3561        policy: Arc<Policy>,
3562        user_attachments: HashMap<UserId, Vec<String>>,
3563        group_attachments: HashMap<String, Vec<String>>,
3564        was_enabled: bool,
3565    ) {
3566        self.policies
3567            .write()
3568            .unwrap_or_else(|e| e.into_inner())
3569            .insert(policy_id.to_string(), policy);
3570        self.restore_policy_attachments(user_attachments, group_attachments);
3571        self.iam_authorization_enabled
3572            .store(was_enabled, Ordering::Release);
3573        self.persist_iam_to_kv();
3574    }
3575
3576    fn restore_policy_attachments(
3577        &self,
3578        user_attachments: HashMap<UserId, Vec<String>>,
3579        group_attachments: HashMap<String, Vec<String>>,
3580    ) {
3581        *self
3582            .user_attachments
3583            .write()
3584            .unwrap_or_else(|e| e.into_inner()) = user_attachments;
3585        *self
3586            .group_attachments
3587            .write()
3588            .unwrap_or_else(|e| e.into_inner()) = group_attachments;
3589        self.iam_effective_cache
3590            .write()
3591            .unwrap_or_else(|e| e.into_inner())
3592            .clear();
3593        self.invalidate_visible_collections_cache();
3594        self.persist_iam_to_kv();
3595    }
3596
3597    /// Insert or replace a policy by id. Rejects synthetic ids
3598    /// (`_grant_*` / `_default_*`) so callers can't hand-write them
3599    /// from the public API. Use `put_policy_internal` for synthetic
3600    /// inserts.
3601    pub fn put_policy(&self, p: Policy) -> Result<(), AuthError> {
3602        if p.id.starts_with("_grant_") || p.id.starts_with("_default_") {
3603            return Err(AuthError::Forbidden(format!(
3604                "policy id `{}` is reserved",
3605                p.id
3606            )));
3607        }
3608        self.put_policy_internal(p)
3609    }
3610
3611    /// Internal put bypassing the synthetic-namespace guard. Used by
3612    /// the GRANT translation layer; exposed publicly so integration
3613    /// tests can register synthetic `_grant_*` policies without going
3614    /// through the SQL frontend.
3615    pub fn put_policy_internal(&self, p: Policy) -> Result<(), AuthError> {
3616        p.validate()
3617            .map_err(|e| AuthError::Forbidden(format!("invalid policy `{}`: {e}", p.id)))?;
3618        let id = p.id.clone();
3619        self.policies
3620            .write()
3621            .unwrap_or_else(|e| e.into_inner())
3622            .insert(id, Arc::new(p));
3623        self.iam_authorization_enabled
3624            .store(true, Ordering::Release);
3625        self.iam_effective_cache
3626            .write()
3627            .unwrap_or_else(|e| e.into_inner())
3628            .clear();
3629        // Issue #119: a policy mutation can change the visible-
3630        // collections answer for any (tenant, role); we don't know
3631        // which up-front, so blow the whole cache.
3632        self.invalidate_visible_collections_cache();
3633        self.persist_iam_to_kv();
3634        Ok(())
3635    }
3636
3637    /// Whether the IAM evaluator should be authoritative for runtime
3638    /// authorization. This flips on the first policy write and remains
3639    /// on after deletes so dropping all policies leaves the instance in
3640    /// default-deny rather than silently returning to role fallback.
3641    pub fn iam_authorization_enabled(&self) -> bool {
3642        self.iam_authorization_enabled.load(Ordering::Acquire)
3643    }
3644
3645    /// Remove a policy and any attachments referencing it.
3646    pub fn delete_policy(&self, id: &str) -> Result<(), AuthError> {
3647        let removed = self
3648            .policies
3649            .write()
3650            .unwrap_or_else(|e| e.into_inner())
3651            .remove(id)
3652            .is_some();
3653        if !removed {
3654            return Err(AuthError::Forbidden(format!("policy `{id}` not found")));
3655        }
3656        // Detach from every user / group.
3657        if let Ok(mut ua) = self.user_attachments.write() {
3658            for ids in ua.values_mut() {
3659                ids.retain(|p| p != id);
3660            }
3661            ua.retain(|_, v| !v.is_empty());
3662        }
3663        if let Ok(mut ga) = self.group_attachments.write() {
3664            for ids in ga.values_mut() {
3665                ids.retain(|p| p != id);
3666            }
3667            ga.retain(|_, v| !v.is_empty());
3668        }
3669        self.iam_effective_cache
3670            .write()
3671            .unwrap_or_else(|e| e.into_inner())
3672            .clear();
3673        // Issue #119: dropping a policy can shrink any caller's visible
3674        // set; clear the (tenant, role) cache so AI commands re-resolve.
3675        self.invalidate_visible_collections_cache();
3676        self.persist_iam_to_kv();
3677        Ok(())
3678    }
3679
3680    /// List all policies (id-sorted for deterministic output).
3681    pub fn list_policies(&self) -> Vec<Arc<Policy>> {
3682        let map = match self.policies.read() {
3683            Ok(g) => g,
3684            Err(_) => return Vec::new(),
3685        };
3686        let mut out: Vec<Arc<Policy>> = map.values().cloned().collect();
3687        out.sort_by(|a, b| a.id.cmp(&b.id));
3688        out
3689    }
3690
3691    /// Fetch a single policy by id.
3692    pub fn get_policy(&self, id: &str) -> Option<Arc<Policy>> {
3693        self.policies.read().ok().and_then(|m| m.get(id).cloned())
3694    }
3695
3696    /// List policies directly attached to a group.
3697    pub fn group_policies(&self, group: &str) -> Vec<Arc<Policy>> {
3698        let policies = self.policies.read();
3699        let attachments = self.group_attachments.read();
3700        let mut out = Vec::new();
3701        if let (Ok(p_map), Ok(ga_map)) = (policies, attachments) {
3702            if let Some(ids) = ga_map.get(group) {
3703                for id in ids {
3704                    if let Some(p) = p_map.get(id) {
3705                        out.push(p.clone());
3706                    }
3707                }
3708            }
3709        }
3710        out.sort_by(|a, b| a.id.cmp(&b.id));
3711        out
3712    }
3713
3714    /// Delete synthetic policies produced by SQL GRANT translation.
3715    /// REVOKE uses this to keep the IAM lane and the legacy grant table
3716    /// in lock-step.
3717    pub fn delete_synthetic_grant_policies(
3718        &self,
3719        principal: &GrantPrincipal,
3720        resource: &Resource,
3721        actions: &[Action],
3722    ) -> usize {
3723        let attached = match principal {
3724            GrantPrincipal::User(uid) => self
3725                .user_attachments
3726                .read()
3727                .ok()
3728                .and_then(|m| m.get(uid).cloned())
3729                .unwrap_or_default(),
3730            GrantPrincipal::Group(group) => self
3731                .group_attachments
3732                .read()
3733                .ok()
3734                .and_then(|m| m.get(group).cloned())
3735                .unwrap_or_default(),
3736            GrantPrincipal::Public => self
3737                .group_attachments
3738                .read()
3739                .ok()
3740                .and_then(|m| m.get(PUBLIC_IAM_GROUP).cloned())
3741                .unwrap_or_default(),
3742        };
3743        if attached.is_empty() {
3744            return 0;
3745        }
3746
3747        let mut delete_ids = Vec::new();
3748        if let Ok(policies) = self.policies.read() {
3749            for id in attached {
3750                let Some(policy) = policies.get(&id) else {
3751                    continue;
3752                };
3753                if !policy.id.starts_with("_grant_") {
3754                    continue;
3755                }
3756                if synthetic_grant_matches(policy, resource, actions) {
3757                    delete_ids.push(policy.id.clone());
3758                }
3759            }
3760        }
3761
3762        let mut deleted = 0usize;
3763        for id in delete_ids {
3764            if self.delete_policy(&id).is_ok() {
3765                deleted += 1;
3766            }
3767        }
3768        deleted
3769    }
3770
3771    /// Apply the `REDDB_POLICY_BREAK_GLASS` recovery path: install or
3772    /// refresh the unlock policy, ensure the synthetic platform-owner
3773    /// user record exists, rebind the unlock policy to it, and emit a
3774    /// `policy.break_glass` audit event. Idempotent across reboots.
3775    ///
3776    /// `boot_ts_ms` is recorded in the audit event so operators can
3777    /// correlate the recovery with the boot transcript.
3778    pub fn apply_policy_break_glass(&self, boot_ts_ms: u128) -> Result<(), AuthError> {
3779        use crate::auth::self_lock_guard::{
3780            break_glass_audit_fields, unlock_policy, PLATFORM_OWNER_UNLOCK_POLICY_ID,
3781            PLATFORM_OWNER_USERNAME,
3782        };
3783
3784        // (1) Install or refresh the unlock policy. `put_policy_internal`
3785        // bypasses the synthetic-namespace guard, lets us replace any
3786        // tampered persisted copy, and persists to vault on success.
3787        let policy = unlock_policy();
3788        self.put_policy_internal(policy)?;
3789
3790        // (2) Ensure the synthetic platform-owner user record exists.
3791        // We bypass the public user-creation API to keep this user
3792        // immutable, disabled, and absent from operator listings even
3793        // if our filter ever regresses.
3794        let owner_id = UserId::platform(PLATFORM_OWNER_USERNAME);
3795        {
3796            let mut users = self.users.write().unwrap_or_else(|e| e.into_inner());
3797            users.entry(owner_id.clone()).or_insert_with(|| User {
3798                username: PLATFORM_OWNER_USERNAME.to_string(),
3799                tenant_id: None,
3800                // No usable password; even if `authenticate` were
3801                // wired to accept the principal it would fail
3802                // verification. Defence in depth: `enabled = false`
3803                // and the username filter both block it.
3804                password_hash: String::new(),
3805                scram_verifier: None,
3806                role: Role::Admin,
3807                api_keys: Vec::new(),
3808                created_at: boot_ts_ms,
3809                updated_at: boot_ts_ms,
3810                enabled: false,
3811                system_owned: true,
3812            });
3813        }
3814
3815        // (3) Rebind the unlock policy to the synthetic principal.
3816        // `attach_policy` is idempotent (the helper checks for an
3817        // existing binding before appending).
3818        self.attach_policy(
3819            PrincipalRef::User(owner_id.clone()),
3820            PLATFORM_OWNER_UNLOCK_POLICY_ID,
3821        )?;
3822
3823        // (4) Emit a loud audit event. If the control-event ledger
3824        // isn't configured (e.g. unit tests without runtime wiring),
3825        // log a warn and proceed — recovery must not depend on audit
3826        // wiring being live.
3827        let fields_raw = break_glass_audit_fields(boot_ts_ms);
3828        let mut fields: HashMap<String, Sensitivity> = HashMap::new();
3829        for (k, v) in fields_raw {
3830            fields.insert(k, Sensitivity::raw(v));
3831        }
3832        if let Some(configured) = self.configured_control_events() {
3833            let ctx = ControlEventCtx {
3834                actor: crate::runtime::control_events::ActorRef::System("break_glass"),
3835                scope: None,
3836                request_id: None,
3837                trace_id: None,
3838            };
3839            let event = ControlEvent {
3840                kind: EventKind::PolicyBreakGlass,
3841                outcome: Outcome::Allowed,
3842                action: std::borrow::Cow::Borrowed("policy:break_glass"),
3843                resource: Some(format!("policy:{PLATFORM_OWNER_UNLOCK_POLICY_ID}")),
3844                reason: Some("REDDB_POLICY_BREAK_GLASS=1 at boot".into()),
3845                matched_policy_id: None,
3846                fields,
3847            };
3848            let _ = configured.ledger.emit(&ctx, event);
3849        }
3850        tracing::warn!(
3851            policy_id = PLATFORM_OWNER_UNLOCK_POLICY_ID,
3852            principal = PLATFORM_OWNER_USERNAME,
3853            boot_ts_ms = %boot_ts_ms,
3854            "REDDB_POLICY_BREAK_GLASS=1 — installed/refreshed platform-owner unlock policy and rebound to synthetic principal"
3855        );
3856        Ok(())
3857    }
3858
3859    /// Attach a policy to a user or group. Returns an error if the
3860    /// policy id doesn't exist.
3861    pub fn attach_policy(&self, principal: PrincipalRef, policy_id: &str) -> Result<(), AuthError> {
3862        if !self
3863            .policies
3864            .read()
3865            .map(|m| m.contains_key(policy_id))
3866            .unwrap_or(false)
3867        {
3868            return Err(AuthError::Forbidden(format!(
3869                "policy `{policy_id}` not found"
3870            )));
3871        }
3872        match &principal {
3873            PrincipalRef::User(uid) => {
3874                let mut ua = self
3875                    .user_attachments
3876                    .write()
3877                    .unwrap_or_else(|e| e.into_inner());
3878                let list = ua.entry(uid.clone()).or_default();
3879                if !list.iter().any(|p| p == policy_id) {
3880                    list.push(policy_id.to_string());
3881                }
3882                drop(ua);
3883                self.invalidate_iam_cache(Some(uid));
3884            }
3885            PrincipalRef::Group(g) => {
3886                let mut ga = self
3887                    .group_attachments
3888                    .write()
3889                    .unwrap_or_else(|e| e.into_inner());
3890                let list = ga.entry(g.clone()).or_default();
3891                if !list.iter().any(|p| p == policy_id) {
3892                    list.push(policy_id.to_string());
3893                }
3894                drop(ga);
3895                self.invalidate_iam_cache(None);
3896            }
3897        }
3898        self.persist_iam_to_kv();
3899        Ok(())
3900    }
3901
3902    /// Remove a policy attachment from a user or group.
3903    pub fn detach_policy(&self, principal: PrincipalRef, policy_id: &str) -> Result<(), AuthError> {
3904        match &principal {
3905            PrincipalRef::User(uid) => {
3906                if let Ok(mut ua) = self.user_attachments.write() {
3907                    if let Some(list) = ua.get_mut(uid) {
3908                        list.retain(|p| p != policy_id);
3909                        if list.is_empty() {
3910                            ua.remove(uid);
3911                        }
3912                    }
3913                }
3914                self.invalidate_iam_cache(Some(uid));
3915            }
3916            PrincipalRef::Group(g) => {
3917                if let Ok(mut ga) = self.group_attachments.write() {
3918                    if let Some(list) = ga.get_mut(g) {
3919                        list.retain(|p| p != policy_id);
3920                        if list.is_empty() {
3921                            ga.remove(g);
3922                        }
3923                    }
3924                }
3925                self.invalidate_iam_cache(None);
3926            }
3927        }
3928        self.persist_iam_to_kv();
3929        Ok(())
3930    }
3931
3932    /// Resolve the ordered list of effective policies for a user:
3933    /// group attachments first (least specific), then user
3934    /// attachments (most specific). Cached per user.
3935    pub fn effective_policies(&self, user: &UserId) -> Vec<Arc<Policy>> {
3936        if let Ok(cache) = self.iam_effective_cache.read() {
3937            if let Some(hit) = cache.get(user) {
3938                return hit.clone();
3939            }
3940        }
3941        let policies = self.policies.read();
3942        let user_attachments = self.user_attachments.read();
3943        let group_attachments = self.group_attachments.read();
3944        let mut groups = self
3945            .user_attributes
3946            .read()
3947            .ok()
3948            .and_then(|m| m.get(user).map(|attrs| attrs.groups.clone()))
3949            .unwrap_or_default();
3950        groups.insert(0, PUBLIC_IAM_GROUP.to_string());
3951        let mut out: Vec<Arc<Policy>> = Vec::new();
3952        if let (Ok(p_map), Ok(ua_map), Ok(ga_map)) = (policies, user_attachments, group_attachments)
3953        {
3954            for group in groups {
3955                if let Some(ids) = ga_map.get(&group) {
3956                    for id in ids {
3957                        if let Some(p) = p_map.get(id) {
3958                            out.push(p.clone());
3959                        }
3960                    }
3961                }
3962            }
3963            if let Some(ids) = ua_map.get(user) {
3964                for id in ids {
3965                    if let Some(p) = p_map.get(id) {
3966                        out.push(p.clone());
3967                    }
3968                }
3969            }
3970        }
3971        if let Ok(mut cache) = self.iam_effective_cache.write() {
3972            cache.insert(user.clone(), out.clone());
3973        }
3974        out
3975    }
3976
3977    /// Run the policy simulator for a principal. Synthesises an
3978    /// `EvalContext` from the user record + caller-supplied extras.
3979    pub fn simulate(
3980        &self,
3981        principal: &UserId,
3982        action: &str,
3983        resource: &ResourceRef,
3984        ctx_extras: SimCtx,
3985    ) -> SimulationOutcome {
3986        let (user_role, user_system_owned) = self
3987            .users
3988            .read()
3989            .ok()
3990            .and_then(|u| u.get(principal).map(|u| (Some(u.role), u.system_owned)))
3991            .unwrap_or((None, false));
3992        let principal_is_admin_role = user_role == Some(Role::Admin);
3993        let now = ctx_extras.now_ms.unwrap_or_else(now_ms);
3994        let ctx = EvalContext {
3995            principal_tenant: principal.tenant.clone(),
3996            current_tenant: ctx_extras.current_tenant,
3997            peer_ip: ctx_extras.peer_ip,
3998            mfa_present: ctx_extras.mfa_present,
3999            now_ms: now,
4000            principal_is_admin_role,
4001            principal_is_system_owned: user_system_owned,
4002            principal_is_platform_scoped: principal.tenant.is_none(),
4003        };
4004        let pols = self.effective_policies(principal);
4005        let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
4006        iam_policies::simulate(&refs, action, resource, &ctx)
4007    }
4008
4009    /// Production hot-path policy evaluation. Returns `true` on Allow
4010    /// / AdminBypass, `false` on Deny / DefaultDeny.
4011    ///
4012    /// This entry point is **strict**: it never consults the
4013    /// [`PolicyEnforcementMode`] fallback. Governance APIs that gate
4014    /// admin-tier mutations (managed-config writes, registry
4015    /// supersedes, managed-policy lifecycle) call this so they cannot
4016    /// accidentally pick up the lenient `LegacyRbac` posture. Runtime
4017    /// hot-path callers that should respect the mode call
4018    /// [`AuthStore::check_policy_authz_with_role`] instead.
4019    pub fn check_policy_authz(
4020        &self,
4021        principal: &UserId,
4022        action: &str,
4023        resource: &ResourceRef,
4024        ctx: &EvalContext,
4025    ) -> bool {
4026        let pols = self.effective_policies(principal);
4027        let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
4028        let decision = iam_policies::evaluate(&refs, action, resource, ctx);
4029        matches!(
4030            decision,
4031            iam_policies::Decision::Allow { .. } | iam_policies::Decision::AdminBypass
4032        )
4033    }
4034
4035    /// Mode-aware policy evaluation for runtime SQL/HTTP/wire surfaces.
4036    ///
4037    /// Returns `true` on `Allow`/`AdminBypass`, `false` on an explicit
4038    /// `Deny`. On `DefaultDeny` (no statement matched) the result
4039    /// depends on the active [`PolicyEnforcementMode`]:
4040    ///
4041    /// * `LegacyRbac` — defer to
4042    ///   [`legacy_rbac_decision`][super::enforcement_mode::legacy_rbac_decision]
4043    ///   using the caller's `role`. This preserves the pre-#712
4044    ///   behaviour where a principal with no attached policy is gated
4045    ///   only by their role.
4046    /// * `PolicyOnly` — return `false`. A principal needs an explicit
4047    ///   matching `Allow` to be authorized.
4048    ///
4049    /// An explicit `Deny` always wins, irrespective of mode and role.
4050    pub fn check_policy_authz_with_role(
4051        &self,
4052        principal: &UserId,
4053        action: &str,
4054        resource: &ResourceRef,
4055        ctx: &EvalContext,
4056        role: Role,
4057    ) -> bool {
4058        let pols = self.effective_policies(principal);
4059        let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
4060        let decision = iam_policies::evaluate(&refs, action, resource, ctx);
4061        match decision {
4062            iam_policies::Decision::Allow { .. } | iam_policies::Decision::AdminBypass => true,
4063            iam_policies::Decision::Deny { .. } => false,
4064            iam_policies::Decision::DefaultDeny => match self.enforcement_mode() {
4065                PolicyEnforcementMode::LegacyRbac => legacy_rbac_decision(role, action),
4066                PolicyEnforcementMode::PolicyOnly => false,
4067            },
4068        }
4069    }
4070
4071    // -----------------------------------------------------------------
4072    // PolicyEnforcementMode (#712)
4073    // -----------------------------------------------------------------
4074
4075    /// Read the active enforcement mode. Cheap (a single `RwLock` read);
4076    /// safe to call on the hot path.
4077    pub fn enforcement_mode(&self) -> PolicyEnforcementMode {
4078        *self
4079            .enforcement_mode
4080            .read()
4081            .unwrap_or_else(|e| e.into_inner())
4082    }
4083
4084    /// Overwrite the active enforcement mode and persist it to the
4085    /// vault KV so the new value survives a restart. Returns the
4086    /// previous value so callers logging a transition (e.g. the
4087    /// boot-time loader, the S5B migration command) can record the
4088    /// before/after.
4089    pub fn set_enforcement_mode(&self, mode: PolicyEnforcementMode) -> PolicyEnforcementMode {
4090        let prev = {
4091            let mut guard = self
4092                .enforcement_mode
4093                .write()
4094                .unwrap_or_else(|e| e.into_inner());
4095            let prev = *guard;
4096            *guard = mode;
4097            prev
4098        };
4099        self.vault_kv_set(
4100            "red.config.policy.enforcement_mode".to_string(),
4101            mode.as_str().to_string(),
4102        );
4103        prev
4104    }
4105
4106    /// Claim the "emit the one-time legacy-RBAC boot warning" token.
4107    /// Returns `true` on the first call after construction (or after a
4108    /// process restart) **iff** the active mode is `LegacyRbac`;
4109    /// returns `false` on every subsequent call so the boot path can
4110    /// guarantee the warning is logged at most once per boot.
4111    pub fn take_legacy_rbac_warn_once(&self) -> bool {
4112        if self.enforcement_mode() != PolicyEnforcementMode::LegacyRbac {
4113            return false;
4114        }
4115        !self
4116            .legacy_rbac_boot_warn_emitted
4117            .swap(true, Ordering::AcqRel)
4118    }
4119
4120    /// Evaluate a resolved table projection through the column policy
4121    /// gate. Query paths should pass already-resolved column names; this
4122    /// helper intentionally does not parse SQL projection syntax.
4123    pub fn check_column_projection_authz(
4124        &self,
4125        principal: &UserId,
4126        request: &ColumnAccessRequest,
4127        ctx: &EvalContext,
4128    ) -> ColumnPolicyOutcome {
4129        let pols = self.effective_policies(principal);
4130        let refs: Vec<&Policy> = pols.iter().map(|p| p.as_ref()).collect();
4131        ColumnPolicyGate::new(&refs).evaluate(request, ctx)
4132    }
4133
4134    fn invalidate_iam_cache(&self, uid: Option<&UserId>) {
4135        if let Ok(mut cache) = self.iam_effective_cache.write() {
4136            match uid {
4137                Some(u) => {
4138                    cache.remove(u);
4139                }
4140                None => cache.clear(),
4141            }
4142        }
4143    }
4144
4145    /// Drop every effective-policy cache entry. Called from execution
4146    /// paths that mutate policies/attachments without knowing which
4147    /// users will be affected.
4148    pub fn invalidate_all_iam_cache(&self) {
4149        self.invalidate_iam_cache(None);
4150    }
4151
4152    // -----------------------------------------------------------------
4153    // IAM persistence — vault_kv backed under `red.iam.*` keys
4154    // -----------------------------------------------------------------
4155
4156    /// Reload IAM state (policies + attachments) from the vault KV.
4157    /// Replaces the legacy `rehydrate_acl` reader — pre-1.0 we drop
4158    /// the old `red.acl.*` blob format entirely.
4159    pub fn rehydrate_iam(&self) {
4160        let mut enabled = self
4161            .vault_kv_get("red.iam.enabled")
4162            .map(|v| v == "true")
4163            .unwrap_or(false);
4164        // Policies — single JSON object keyed by id.
4165        if let Some(blob) = self.vault_kv_get("red.iam.policies") {
4166            if let Ok(val) = crate::serde_json::from_str::<crate::serde_json::Value>(&blob) {
4167                if let Some(obj) = val.as_object() {
4168                    let mut map = HashMap::new();
4169                    for (id, body) in obj.iter() {
4170                        let s = body.to_string_compact();
4171                        if let Ok(p) = Policy::from_json_str(&s) {
4172                            map.insert(id.clone(), Arc::new(p));
4173                        }
4174                    }
4175                    if !map.is_empty() {
4176                        enabled = true;
4177                    }
4178                    *self.policies.write().unwrap_or_else(|e| e.into_inner()) = map;
4179                }
4180            }
4181        }
4182        // User attachments.
4183        if let Some(blob) = self.vault_kv_get("red.iam.attachments.users") {
4184            if let Ok(val) = crate::serde_json::from_str::<crate::serde_json::Value>(&blob) {
4185                if let Some(obj) = val.as_object() {
4186                    let mut map: HashMap<UserId, Vec<String>> = HashMap::new();
4187                    for (encoded_uid, ids_v) in obj.iter() {
4188                        let Some(uid) = decode_uid(encoded_uid) else {
4189                            continue;
4190                        };
4191                        if let Some(arr) = ids_v.as_array() {
4192                            let ids: Vec<String> = arr
4193                                .iter()
4194                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
4195                                .collect();
4196                            map.insert(uid, ids);
4197                        }
4198                    }
4199                    *self
4200                        .user_attachments
4201                        .write()
4202                        .unwrap_or_else(|e| e.into_inner()) = map;
4203                }
4204            }
4205        }
4206        // Group attachments.
4207        if let Some(blob) = self.vault_kv_get("red.iam.attachments.groups") {
4208            if let Ok(val) = crate::serde_json::from_str::<crate::serde_json::Value>(&blob) {
4209                if let Some(obj) = val.as_object() {
4210                    let mut map: HashMap<String, Vec<String>> = HashMap::new();
4211                    for (g, ids_v) in obj.iter() {
4212                        if let Some(arr) = ids_v.as_array() {
4213                            let ids: Vec<String> = arr
4214                                .iter()
4215                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
4216                                .collect();
4217                            map.insert(g.clone(), ids);
4218                        }
4219                    }
4220                    *self
4221                        .group_attachments
4222                        .write()
4223                        .unwrap_or_else(|e| e.into_inner()) = map;
4224                }
4225            }
4226        }
4227        self.iam_authorization_enabled
4228            .store(enabled, Ordering::Release);
4229        self.invalidate_iam_cache(None);
4230
4231        // #712 / S5A: load persisted enforcement mode, if any. Absence
4232        // of the key means an existing install upgrading past #712 —
4233        // stay on the lenient default written at construction time.
4234        // Fresh installs receive `PolicyOnly` via the bootstrap path
4235        // (`mark_bootstrap_complete`) before the first restart, so this
4236        // hydrate path picks it up on every subsequent boot.
4237        if let Some(stored) = self.vault_kv_get("red.config.policy.enforcement_mode") {
4238            if let Some(mode) = PolicyEnforcementMode::parse(&stored) {
4239                *self
4240                    .enforcement_mode
4241                    .write()
4242                    .unwrap_or_else(|e| e.into_inner()) = mode;
4243            }
4244        }
4245
4246        // #712 / S5A: legacy_rbac is a transitional posture; emit one
4247        // boot-time warning so operators see the migration nudge in
4248        // their logs. The flag inside take_legacy_rbac_warn_once
4249        // guarantees we don't spam reload cycles.
4250        if self.take_legacy_rbac_warn_once() {
4251            tracing::warn!(
4252                target: "reddb::auth::enforcement_mode",
4253                mode = "legacy_rbac",
4254                hard_version = crate::auth::enforcement_mode::POLICY_ONLY_HARD_VERSION,
4255                "policy enforcement_mode=legacy_rbac (transitional). Run \
4256                 `MIGRATE POLICY MODE TO 'policy_only'` (next slice S5B) \
4257                 before {} to flip this install over to the strict posture.",
4258                crate::auth::enforcement_mode::POLICY_ONLY_HARD_VERSION,
4259            );
4260        }
4261    }
4262
4263    /// Snapshot policies + attachments into the vault KV. Called
4264    /// after every mutation.
4265    fn persist_iam_to_kv(&self) {
4266        let enabled = if self.iam_authorization_enabled() {
4267            "true"
4268        } else {
4269            "false"
4270        };
4271        self.vault_kv_set("red.iam.enabled".to_string(), enabled.to_string());
4272
4273        // Policies: `{ "<id>": <policy_json>, ... }`
4274        let policies_obj = {
4275            let map = self.policies.read().unwrap_or_else(|e| e.into_inner());
4276            let mut obj = crate::serde_json::Map::new();
4277            for (id, p) in map.iter() {
4278                let s = p.to_json_string();
4279                if let Ok(v) = crate::serde_json::from_str::<crate::serde_json::Value>(&s) {
4280                    obj.insert(id.clone(), v);
4281                }
4282            }
4283            crate::serde_json::Value::Object(obj).to_string_compact()
4284        };
4285        self.vault_kv_set("red.iam.policies".to_string(), policies_obj);
4286
4287        // User attachments: `{ "<encoded_uid>": [ "<policy_id>", ... ], ... }`
4288        let users_obj = {
4289            let map = self
4290                .user_attachments
4291                .read()
4292                .unwrap_or_else(|e| e.into_inner());
4293            let mut obj = crate::serde_json::Map::new();
4294            for (uid, ids) in map.iter() {
4295                let arr = crate::serde_json::Value::Array(
4296                    ids.iter()
4297                        .map(|s| crate::serde_json::Value::String(s.clone()))
4298                        .collect(),
4299                );
4300                obj.insert(encode_uid(uid), arr);
4301            }
4302            crate::serde_json::Value::Object(obj).to_string_compact()
4303        };
4304        self.vault_kv_set("red.iam.attachments.users".to_string(), users_obj);
4305
4306        // Group attachments.
4307        let groups_obj = {
4308            let map = self
4309                .group_attachments
4310                .read()
4311                .unwrap_or_else(|e| e.into_inner());
4312            let mut obj = crate::serde_json::Map::new();
4313            for (g, ids) in map.iter() {
4314                let arr = crate::serde_json::Value::Array(
4315                    ids.iter()
4316                        .map(|s| crate::serde_json::Value::String(s.clone()))
4317                        .collect(),
4318                );
4319                obj.insert(g.clone(), arr);
4320            }
4321            crate::serde_json::Value::Object(obj).to_string_compact()
4322        };
4323        self.vault_kv_set("red.iam.attachments.groups".to_string(), groups_obj);
4324    }
4325}
4326
4327fn synthetic_grant_matches(policy: &Policy, resource: &Resource, actions: &[Action]) -> bool {
4328    policy.statements.iter().any(|st| {
4329        st.effect == crate::auth::policies::Effect::Allow
4330            && st.condition.is_none()
4331            && grant_actions_overlap(&st.actions, actions)
4332            && grant_resource_matches(&st.resources, resource)
4333    })
4334}
4335
4336fn grant_actions_overlap(
4337    patterns: &[crate::auth::policies::ActionPattern],
4338    actions: &[Action],
4339) -> bool {
4340    if actions.contains(&Action::All) {
4341        return true;
4342    }
4343    patterns.iter().any(|pat| match pat {
4344        crate::auth::policies::ActionPattern::Wildcard => true,
4345        crate::auth::policies::ActionPattern::Exact(s) => {
4346            actions.iter().any(|a| s.eq_ignore_ascii_case(a.as_str()))
4347        }
4348        crate::auth::policies::ActionPattern::Prefix(_) => false,
4349    })
4350}
4351
4352fn grant_resource_matches(
4353    patterns: &[crate::auth::policies::ResourcePattern],
4354    resource: &Resource,
4355) -> bool {
4356    let expected = grant_resource_pattern(resource);
4357    patterns.iter().any(|pat| pat == &expected)
4358}
4359
4360fn grant_resource_pattern(resource: &Resource) -> crate::auth::policies::ResourcePattern {
4361    use crate::auth::policies::ResourcePattern;
4362
4363    match resource {
4364        Resource::Database => ResourcePattern::Glob("table:*".to_string()),
4365        Resource::Schema(s) => ResourcePattern::Glob(format!("table:{s}.*")),
4366        Resource::Table { schema, table } => ResourcePattern::Exact {
4367            kind: "table".to_string(),
4368            name: match schema {
4369                Some(s) => format!("{s}.{table}"),
4370                None => table.clone(),
4371            },
4372        },
4373        Resource::Function { schema, name } => ResourcePattern::Exact {
4374            kind: "function".to_string(),
4375            name: match schema {
4376                Some(s) => format!("{s}.{name}"),
4377                None => name.clone(),
4378            },
4379        },
4380    }
4381}
4382
4383// ===========================================================================
4384// ACL serialization helpers — line-oriented, human-readable so an
4385// operator inspecting the vault dump can spot misconfigurations.
4386//
4387// Format (one record per line):
4388//   GRANT|<resource>|<actions_csv>|<with_grant_option>|<tenant_or_*>|<granted_by>|<granted_at>
4389//   ATTR|<valid_until>|<connection_limit>|<search_path>
4390//
4391// Resources are encoded as:
4392//   db                          → Database
4393//   schema:<name>               → Schema(name)
4394//   table:<schema_or_*>:<name>  → Table { schema, table }
4395//   func:<schema_or_*>:<name>   → Function { schema, name }
4396// ===========================================================================
4397
4398fn encode_uid(uid: &UserId) -> String {
4399    match &uid.tenant {
4400        Some(t) => format!("{}/{}", t, uid.username),
4401        None => format!("*/{}", uid.username),
4402    }
4403}
4404
4405fn decode_uid(s: &str) -> Option<UserId> {
4406    let (tenant, username) = s.split_once('/')?;
4407    Some(if tenant == "*" {
4408        UserId::platform(username)
4409    } else {
4410        UserId::scoped(tenant, username)
4411    })
4412}
4413
4414fn encode_resource(r: &Resource) -> String {
4415    match r {
4416        Resource::Database => "db".into(),
4417        Resource::Schema(s) => format!("schema:{}", s),
4418        Resource::Table { schema, table } => {
4419            format!("table:{}:{}", schema.as_deref().unwrap_or("*"), table)
4420        }
4421        Resource::Function { schema, name } => {
4422            format!("func:{}:{}", schema.as_deref().unwrap_or("*"), name)
4423        }
4424    }
4425}
4426
4427fn decode_resource(s: &str) -> Option<Resource> {
4428    if s == "db" {
4429        return Some(Resource::Database);
4430    }
4431    if let Some(rest) = s.strip_prefix("schema:") {
4432        return Some(Resource::Schema(rest.to_string()));
4433    }
4434    if let Some(rest) = s.strip_prefix("table:") {
4435        let (schema, table) = rest.split_once(':')?;
4436        return Some(Resource::Table {
4437            schema: if schema == "*" {
4438                None
4439            } else {
4440                Some(schema.to_string())
4441            },
4442            table: table.to_string(),
4443        });
4444    }
4445    if let Some(rest) = s.strip_prefix("func:") {
4446        let (schema, name) = rest.split_once(':')?;
4447        return Some(Resource::Function {
4448            schema: if schema == "*" {
4449                None
4450            } else {
4451                Some(schema.to_string())
4452            },
4453            name: name.to_string(),
4454        });
4455    }
4456    None
4457}
4458
4459fn encode_grants_blob(grants: &[Grant]) -> String {
4460    let mut out = String::new();
4461    for g in grants {
4462        let actions: Vec<&str> = g.actions.iter().map(|a| a.as_str()).collect();
4463        out.push_str(&format!(
4464            "GRANT|{}|{}|{}|{}|{}|{}\n",
4465            encode_resource(&g.resource),
4466            actions.join(","),
4467            g.with_grant_option,
4468            g.tenant.as_deref().unwrap_or("*"),
4469            g.granted_by,
4470            g.granted_at,
4471        ));
4472    }
4473    out
4474}
4475
4476fn decode_grants_blob(s: &str) -> Option<Vec<Grant>> {
4477    let mut out = Vec::new();
4478    for line in s.lines() {
4479        if line.is_empty() {
4480            continue;
4481        }
4482        let parts: Vec<&str> = line.split('|').collect();
4483        if parts.len() != 7 || parts[0] != "GRANT" {
4484            return None;
4485        }
4486        let resource = decode_resource(parts[1])?;
4487        let mut actions = std::collections::BTreeSet::new();
4488        for token in parts[2].split(',') {
4489            if let Some(a) = Action::from_keyword(token) {
4490                actions.insert(a);
4491            }
4492        }
4493        let with_grant_option = parts[3] == "true";
4494        let tenant = if parts[4] == "*" {
4495            None
4496        } else {
4497            Some(parts[4].to_string())
4498        };
4499        let granted_by = parts[5].to_string();
4500        let granted_at: u128 = parts[6].parse().unwrap_or(0);
4501        out.push(Grant {
4502            // Principal field is reconstructed by the loader from the
4503            // storage-key prefix; default to `Public` here.
4504            principal: GrantPrincipal::Public,
4505            resource,
4506            actions,
4507            with_grant_option,
4508            granted_by,
4509            granted_at,
4510            tenant,
4511            columns: None,
4512        });
4513    }
4514    Some(out)
4515}
4516
4517fn encode_attrs_blob(a: &UserAttributes) -> String {
4518    let valid = a
4519        .valid_until
4520        .map(|t| t.to_string())
4521        .unwrap_or_else(|| "*".into());
4522    let limit = a
4523        .connection_limit
4524        .map(|l| l.to_string())
4525        .unwrap_or_else(|| "*".into());
4526    let path = a.search_path.clone().unwrap_or_else(|| "*".into());
4527    let groups = if a.groups.is_empty() {
4528        "*".to_string()
4529    } else {
4530        a.groups.join(",")
4531    };
4532    format!("ATTR|{}|{}|{}|{}\n", valid, limit, path, groups)
4533}
4534
4535fn decode_attrs_blob(s: &str) -> Option<UserAttributes> {
4536    let line = s.lines().next()?;
4537    let parts: Vec<&str> = line.split('|').collect();
4538    if !(parts.len() == 4 || parts.len() == 5) || parts[0] != "ATTR" {
4539        return None;
4540    }
4541    let groups = if parts.get(4).copied().unwrap_or("*") == "*" {
4542        Vec::new()
4543    } else {
4544        parts[4]
4545            .split(',')
4546            .filter(|g| !g.is_empty())
4547            .map(|g| g.to_string())
4548            .collect()
4549    };
4550    Some(UserAttributes {
4551        valid_until: if parts[1] == "*" {
4552            None
4553        } else {
4554            parts[1].parse().ok()
4555        },
4556        connection_limit: if parts[2] == "*" {
4557            None
4558        } else {
4559            parts[2].parse().ok()
4560        },
4561        search_path: if parts[3] == "*" {
4562            None
4563        } else {
4564            Some(parts[3].to_string())
4565        },
4566        groups,
4567    })
4568}
4569
4570// ===========================================================================
4571// Password hashing
4572// ===========================================================================
4573
4574/// Derive a SCRAM-SHA-256 verifier for a fresh user / password
4575/// rotation. Salt is 16 random bytes; iter is the engine default
4576/// (`scram::DEFAULT_ITER`). Stored alongside the Argon2 password
4577/// hash so HTTP login + v2 SCRAM can both authenticate the same
4578/// user.
4579fn make_scram_verifier(password: &str) -> crate::auth::scram::ScramVerifier {
4580    let salt = random_bytes(16);
4581    crate::auth::scram::ScramVerifier::from_password(
4582        password,
4583        salt,
4584        crate::auth::scram::DEFAULT_ITER,
4585    )
4586}
4587
4588/// Hash a password using Argon2id.
4589///
4590/// Format: `argon2id$<salt_hex>$<hash_hex>`
4591pub(crate) fn hash_password(password: &str) -> String {
4592    let salt = random_bytes(16);
4593    let params = auth_argon2_params();
4594    let hash = derive_key(password.as_bytes(), &salt, &params);
4595    format!("argon2id${}${}", hex::encode(&salt), hex::encode(&hash))
4596}
4597
4598/// Verify a password against a stored `argon2id$<salt>$<hash>` string.
4599pub(crate) fn verify_password(password: &str, stored_hash: &str) -> bool {
4600    let parts: Vec<&str> = stored_hash.splitn(3, '$').collect();
4601    if parts.len() != 3 || parts[0] != "argon2id" {
4602        return false;
4603    }
4604
4605    let salt = match hex::decode(parts[1]) {
4606        Ok(s) => s,
4607        Err(_) => return false,
4608    };
4609
4610    let expected_hash = match hex::decode(parts[2]) {
4611        Ok(h) => h,
4612        Err(_) => return false,
4613    };
4614
4615    let params = auth_argon2_params();
4616    let computed = derive_key(password.as_bytes(), &salt, &params);
4617    constant_time_eq(&computed, &expected_hash)
4618}
4619
4620/// Constant-time byte comparison to avoid timing side-channels.
4621fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
4622    if a.len() != b.len() {
4623        return false;
4624    }
4625    let mut diff: u8 = 0;
4626    for (x, y) in a.iter().zip(b.iter()) {
4627        diff |= x ^ y;
4628    }
4629    diff == 0
4630}
4631
4632// ===========================================================================
4633// Token generation
4634// ===========================================================================
4635
4636fn generate_session_token() -> String {
4637    format!("rs_{}", hex::encode(random_bytes(32)))
4638}
4639
4640fn generate_api_key() -> String {
4641    format!("rk_{}", hex::encode(random_bytes(32)))
4642}
4643
4644/// Generate `n` random bytes and return as a hex string.
4645fn random_hex(n: usize) -> String {
4646    hex::encode(random_bytes(n))
4647}
4648
4649/// Generate `n` cryptographically random bytes using the OS CSPRNG,
4650/// then mix with SHA-256 for domain separation.
4651pub(crate) fn random_bytes(n: usize) -> Vec<u8> {
4652    let mut buf = vec![0u8; n.max(32)];
4653    if os_random::fill_bytes(&mut buf).is_err() {
4654        // Fallback: use system time and pointers as entropy (best-effort).
4655        let seed = now_ms().to_le_bytes();
4656        for (i, byte) in buf.iter_mut().enumerate() {
4657            *byte = seed[i % seed.len()] ^ (i as u8);
4658        }
4659    }
4660    // SHA-256 mix to ensure uniform distribution.
4661    let digest = sha256(&buf);
4662    if n <= 32 {
4663        digest[..n].to_vec()
4664    } else {
4665        // Chain SHA-256 for longer outputs (unusual but supported).
4666        let mut out = Vec::with_capacity(n);
4667        let mut prev = digest;
4668        while out.len() < n {
4669            out.extend_from_slice(&prev[..std::cmp::min(32, n - out.len())]);
4670            prev = sha256(&prev);
4671        }
4672        out
4673    }
4674}
4675
4676// ===========================================================================
4677// Helpers
4678// ===========================================================================
4679
4680fn lock_err<T>(_: T) -> AuthError {
4681    AuthError::Internal("lock poisoned".to_string())
4682}
4683
4684fn reject_system_owned(uid: &UserId, user: &User) -> Result<(), AuthError> {
4685    if user.system_owned {
4686        return Err(AuthError::SystemUserImmutable {
4687            username: uid.to_string(),
4688        });
4689    }
4690    Ok(())
4691}
4692
4693// ===========================================================================
4694// Tests
4695// ===========================================================================
4696
4697#[cfg(test)]
4698mod tests {
4699    use super::*;
4700
4701    fn test_config() -> AuthConfig {
4702        AuthConfig {
4703            enabled: true,
4704            session_ttl_secs: 60,
4705            require_auth: true,
4706            auto_encrypt_storage: false,
4707            vault_enabled: false,
4708            cert: Default::default(),
4709            oauth: Default::default(),
4710        }
4711    }
4712
4713    #[test]
4714    fn test_create_and_list_users() {
4715        let store = AuthStore::new(test_config());
4716        store.create_user("alice", "pass1", Role::Admin).unwrap();
4717        store.create_user("bob", "pass2", Role::Read).unwrap();
4718
4719        let users = store.list_users();
4720        assert_eq!(users.len(), 2);
4721        // Password hashes should be redacted.
4722        for u in &users {
4723            assert!(u.password_hash.is_empty());
4724        }
4725    }
4726
4727    #[test]
4728    fn test_create_duplicate_user() {
4729        let store = AuthStore::new(test_config());
4730        store.create_user("alice", "pass", Role::Admin).unwrap();
4731        let err = store.create_user("alice", "pass2", Role::Read).unwrap_err();
4732        assert!(matches!(err, AuthError::UserExists(_)));
4733    }
4734
4735    #[test]
4736    fn test_authenticate_and_validate() {
4737        let store = AuthStore::new(test_config());
4738        store.create_user("alice", "secret", Role::Write).unwrap();
4739
4740        let session = store.authenticate("alice", "secret").unwrap();
4741        assert!(session.token.starts_with("rs_"));
4742
4743        let (username, role) = store.validate_token(&session.token).unwrap();
4744        assert_eq!(username, "alice");
4745        assert_eq!(role, Role::Write);
4746    }
4747
4748    #[test]
4749    fn test_authenticate_wrong_password() {
4750        let store = AuthStore::new(test_config());
4751        store.create_user("alice", "secret", Role::Read).unwrap();
4752
4753        let err = store.authenticate("alice", "wrong").unwrap_err();
4754        assert!(matches!(err, AuthError::InvalidCredentials));
4755    }
4756
4757    #[test]
4758    fn test_api_key_lifecycle() {
4759        let store = AuthStore::new(test_config());
4760        store.create_user("alice", "pass", Role::Admin).unwrap();
4761
4762        let key = store
4763            .create_api_key("alice", "ci-token", Role::Write)
4764            .unwrap();
4765        assert!(key.key.starts_with("rk_"));
4766
4767        let (username, role) = store.validate_token(&key.key).unwrap();
4768        assert_eq!(username, "alice");
4769        assert_eq!(role, Role::Write);
4770
4771        store.revoke_api_key(&key.key).unwrap();
4772        assert!(store.validate_token(&key.key).is_none());
4773    }
4774
4775    #[test]
4776    fn test_api_key_role_exceeded() {
4777        let store = AuthStore::new(test_config());
4778        store.create_user("bob", "pass", Role::Read).unwrap();
4779
4780        let err = store
4781            .create_api_key("bob", "escalate", Role::Admin)
4782            .unwrap_err();
4783        assert!(matches!(err, AuthError::RoleExceeded { .. }));
4784    }
4785
4786    #[test]
4787    fn test_change_password() {
4788        let store = AuthStore::new(test_config());
4789        store.create_user("alice", "old", Role::Write).unwrap();
4790
4791        store.change_password("alice", "old", "new").unwrap();
4792
4793        // Old password should fail.
4794        assert!(store.authenticate("alice", "old").is_err());
4795        // New password should succeed.
4796        assert!(store.authenticate("alice", "new").is_ok());
4797    }
4798
4799    #[test]
4800    fn test_change_role() {
4801        let store = AuthStore::new(test_config());
4802        store.create_user("alice", "pass", Role::Admin).unwrap();
4803        store.create_api_key("alice", "key1", Role::Admin).unwrap();
4804
4805        store.change_role("alice", Role::Read).unwrap();
4806
4807        // User's role should be Read now.
4808        let users = store.list_users();
4809        let alice = users.iter().find(|u| u.username == "alice").unwrap();
4810        assert_eq!(alice.role, Role::Read);
4811
4812        // API keys should have been downgraded.
4813        assert_eq!(alice.api_keys[0].role, Role::Read);
4814    }
4815
4816    #[test]
4817    fn test_system_owned_user_blocks_destructive_mutations() {
4818        let store = AuthStore::new(test_config());
4819        store
4820            .create_system_user("system", "pass", Role::Admin, None)
4821            .unwrap();
4822
4823        let uid = UserId::platform("system");
4824        let err = store.delete_user("system").unwrap_err();
4825        assert!(matches!(err, AuthError::SystemUserImmutable { .. }));
4826
4827        let err = store.change_password("system", "pass", "new").unwrap_err();
4828        assert!(matches!(err, AuthError::SystemUserImmutable { .. }));
4829
4830        let err = store.change_role("system", Role::Read).unwrap_err();
4831        assert!(matches!(err, AuthError::SystemUserImmutable { .. }));
4832
4833        let err = store.set_user_enabled(&uid, false).unwrap_err();
4834        assert!(matches!(err, AuthError::SystemUserImmutable { .. }));
4835
4836        let key = store
4837            .create_api_key("system", "rotation", Role::Admin)
4838            .unwrap();
4839        assert!(store.validate_token(&key.key).is_some());
4840        store.revoke_api_key(&key.key).unwrap();
4841        assert!(store.validate_token(&key.key).is_none());
4842    }
4843
4844    #[test]
4845    fn test_tenant_scoped_system_owned_user_guardrail_integration() {
4846        // Acceptance #1 + #2 + #4 (tenant-scoped half) for issue #647.
4847        //
4848        // The destructive-mutation guard (`reject_system_owned`) is on
4849        // the tenant-aware code paths (`*_in_tenant`), but the existing
4850        // `test_system_owned_user_blocks_destructive_mutations` only
4851        // exercises the platform shim. Pin the tenant-scoped path so a
4852        // future refactor that loses the guard on `*_in_tenant` is
4853        // caught.
4854        let store = AuthStore::new(test_config());
4855        store
4856            .create_system_user("ops", "pass", Role::Admin, Some("acme"))
4857            .unwrap();
4858
4859        let uid = UserId::scoped("acme", "ops");
4860
4861        // delete / disable / password-change / role-change — all four
4862        // destructive paths block on a tenant-scoped system-owned user.
4863        let err = store
4864            .delete_user_in_tenant(Some("acme"), "ops")
4865            .unwrap_err();
4866        assert!(matches!(err, AuthError::SystemUserImmutable { .. }));
4867
4868        let err = store
4869            .change_password_in_tenant(Some("acme"), "ops", "pass", "new")
4870            .unwrap_err();
4871        assert!(matches!(err, AuthError::SystemUserImmutable { .. }));
4872
4873        let err = store
4874            .change_role_in_tenant(Some("acme"), "ops", Role::Read)
4875            .unwrap_err();
4876        assert!(matches!(err, AuthError::SystemUserImmutable { .. }));
4877
4878        let err = store.set_user_enabled(&uid, false).unwrap_err();
4879        assert!(matches!(err, AuthError::SystemUserImmutable { .. }));
4880
4881        // Tenant-isolation sanity: a same-named non-system user under a
4882        // different tenant remains fully mutable. Without this the test
4883        // above could be passing because the guard refuses all `ops`
4884        // users by name rather than by id.
4885        store
4886            .create_user_in_tenant(Some("globex"), "ops", "pass", Role::Admin)
4887            .unwrap();
4888        store
4889            .change_role_in_tenant(Some("globex"), "ops", Role::Read)
4890            .unwrap();
4891        store.delete_user_in_tenant(Some("globex"), "ops").unwrap();
4892
4893        // API-key rotation path still works for the system-owned user
4894        // (intentionally bypasses `reject_system_owned`).
4895        let key = store
4896            .create_api_key_in_tenant(Some("acme"), "ops", "rotation", Role::Admin)
4897            .unwrap();
4898        assert!(store.validate_token(&key.key).is_some());
4899        store.revoke_api_key(&key.key).unwrap();
4900        assert!(store.validate_token(&key.key).is_none());
4901    }
4902
4903    #[test]
4904    fn test_regular_user_mutations_still_work() {
4905        let store = AuthStore::new(test_config());
4906        store.create_user("alice", "old", Role::Admin).unwrap();
4907
4908        let uid = UserId::platform("alice");
4909        store.set_user_enabled(&uid, false).unwrap();
4910        assert!(matches!(
4911            store.authenticate("alice", "old"),
4912            Err(AuthError::InvalidCredentials)
4913        ));
4914
4915        store.set_user_enabled(&uid, true).unwrap();
4916        store.change_password("alice", "old", "new").unwrap();
4917        store.change_role("alice", Role::Read).unwrap();
4918        store.delete_user("alice").unwrap();
4919        assert!(matches!(
4920            store.authenticate("alice", "new"),
4921            Err(AuthError::InvalidCredentials)
4922        ));
4923    }
4924
4925    #[test]
4926    fn test_delete_user() {
4927        let store = AuthStore::new(test_config());
4928        store.create_user("alice", "pass", Role::Admin).unwrap();
4929        let key = store.create_api_key("alice", "key1", Role::Read).unwrap();
4930        let session = store.authenticate("alice", "pass").unwrap();
4931
4932        store.delete_user("alice").unwrap();
4933
4934        assert!(store.validate_token(&key.key).is_none());
4935        assert!(store.validate_token(&session.token).is_none());
4936        assert!(store.list_users().is_empty());
4937    }
4938
4939    #[test]
4940    fn test_revoke_session() {
4941        let store = AuthStore::new(test_config());
4942        store.create_user("alice", "pass", Role::Read).unwrap();
4943        let session = store.authenticate("alice", "pass").unwrap();
4944
4945        store.revoke_session(&session.token);
4946        assert!(store.validate_token(&session.token).is_none());
4947    }
4948
4949    #[test]
4950    fn test_password_hash_format() {
4951        let hash = hash_password("test");
4952        assert!(hash.starts_with("argon2id$"));
4953        let parts: Vec<&str> = hash.splitn(3, '$').collect();
4954        assert_eq!(parts.len(), 3);
4955        // Salt is 16 bytes = 32 hex chars.
4956        assert_eq!(parts[1].len(), 32);
4957        // Hash is 32 bytes = 64 hex chars.
4958        assert_eq!(parts[2].len(), 64);
4959    }
4960
4961    #[test]
4962    fn test_constant_time_eq() {
4963        assert!(constant_time_eq(b"hello", b"hello"));
4964        assert!(!constant_time_eq(b"hello", b"world"));
4965        assert!(!constant_time_eq(b"short", b"longer"));
4966    }
4967
4968    #[test]
4969    fn test_bootstrap_seals_permanently() {
4970        let store = AuthStore::new(test_config());
4971
4972        assert!(store.needs_bootstrap());
4973        assert!(!store.is_bootstrapped());
4974
4975        // First bootstrap succeeds
4976        let result = store.bootstrap("admin", "secret");
4977        assert!(result.is_ok());
4978        let br = result.unwrap();
4979        assert_eq!(br.user.username, "admin");
4980        assert_eq!(br.user.role, Role::Admin);
4981        assert!(br.api_key.key.starts_with("rk_"));
4982        // No vault configured, so no certificate.
4983        assert!(br.certificate.is_none());
4984
4985        // Sealed now
4986        assert!(!store.needs_bootstrap());
4987        assert!(store.is_bootstrapped());
4988
4989        // Second bootstrap fails -- sealed permanently
4990        let result = store.bootstrap("admin2", "secret2");
4991        assert!(result.is_err());
4992        let err = result.unwrap_err();
4993        assert!(err.to_string().contains("sealed permanently"));
4994
4995        // Only 1 user exists (the first one)
4996        assert_eq!(store.list_users().len(), 1);
4997        assert_eq!(store.list_users()[0].username, "admin");
4998    }
4999
5000    #[test]
5001    fn test_bootstrap_after_manual_user_creation() {
5002        let store = AuthStore::new(test_config());
5003
5004        // Create a user manually first
5005        store.create_user("existing", "pass", Role::Read).unwrap();
5006
5007        // Bootstrap sees the seal hasn't been set but users exist
5008        // The atomic seal fires first, then the users check catches it
5009        assert!(!store.needs_bootstrap()); // users exist → false
5010    }
5011
5012    // ---------------------------------------------------------------
5013    // Tenant scoping
5014    // ---------------------------------------------------------------
5015
5016    #[test]
5017    fn test_same_username_two_tenants_distinct() {
5018        let store = AuthStore::new(test_config());
5019        store
5020            .create_user_in_tenant(Some("acme"), "alice", "pw-acme", Role::Write)
5021            .unwrap();
5022        store
5023            .create_user_in_tenant(Some("globex"), "alice", "pw-globex", Role::Read)
5024            .unwrap();
5025
5026        // Two distinct users.
5027        let users = store.list_users();
5028        assert_eq!(users.len(), 2);
5029
5030        // Each verifies its own password under its own tenant.
5031        assert!(store
5032            .authenticate_in_tenant(Some("acme"), "alice", "pw-acme")
5033            .is_ok());
5034        assert!(store
5035            .authenticate_in_tenant(Some("globex"), "alice", "pw-globex")
5036            .is_ok());
5037
5038        // Cross-tenant credentials are rejected.
5039        assert!(store
5040            .authenticate_in_tenant(Some("acme"), "alice", "pw-globex")
5041            .is_err());
5042        assert!(store
5043            .authenticate_in_tenant(Some("globex"), "alice", "pw-acme")
5044            .is_err());
5045    }
5046
5047    #[test]
5048    fn test_session_carries_tenant() {
5049        let store = AuthStore::new(test_config());
5050        store
5051            .create_user_in_tenant(Some("acme"), "alice", "pw", Role::Admin)
5052            .unwrap();
5053        let session = store
5054            .authenticate_in_tenant(Some("acme"), "alice", "pw")
5055            .unwrap();
5056        assert_eq!(session.tenant_id.as_deref(), Some("acme"));
5057
5058        let (id, role) = store.validate_token_full(&session.token).unwrap();
5059        assert_eq!(id.tenant.as_deref(), Some("acme"));
5060        assert_eq!(id.username, "alice");
5061        assert_eq!(role, Role::Admin);
5062    }
5063
5064    #[test]
5065    fn test_platform_user_has_no_tenant() {
5066        let store = AuthStore::new(test_config());
5067        store.create_user("admin", "pw", Role::Admin).unwrap();
5068        let session = store.authenticate("admin", "pw").unwrap();
5069        assert!(session.tenant_id.is_none());
5070
5071        let (id, _) = store.validate_token_full(&session.token).unwrap();
5072        assert!(id.tenant.is_none());
5073    }
5074
5075    #[test]
5076    fn test_lookup_scram_verifier_global_resolves_platform() {
5077        let store = AuthStore::new(test_config());
5078        store.create_user("admin", "pw", Role::Admin).unwrap();
5079        store
5080            .create_user_in_tenant(Some("acme"), "admin", "pw", Role::Admin)
5081            .unwrap();
5082
5083        // The global helper picks the platform-tenant user only.
5084        let v = store.lookup_scram_verifier_global("admin");
5085        assert!(v.is_some());
5086
5087        // The tenant-scoped user has its own verifier.
5088        let v_acme = store.lookup_scram_verifier(&UserId::scoped("acme", "admin"));
5089        assert!(v_acme.is_some());
5090
5091        // The two verifiers carry independent salts.
5092        assert_ne!(v.unwrap().salt, v_acme.unwrap().salt);
5093    }
5094
5095    #[test]
5096    fn test_delete_in_tenant_does_not_touch_other_tenant() {
5097        let store = AuthStore::new(test_config());
5098        store
5099            .create_user_in_tenant(Some("acme"), "alice", "pw", Role::Admin)
5100            .unwrap();
5101        store
5102            .create_user_in_tenant(Some("globex"), "alice", "pw", Role::Admin)
5103            .unwrap();
5104
5105        store.delete_user_in_tenant(Some("acme"), "alice").unwrap();
5106
5107        // Globex still alive.
5108        assert!(store
5109            .authenticate_in_tenant(Some("globex"), "alice", "pw")
5110            .is_ok());
5111        // Acme gone.
5112        assert!(store
5113            .authenticate_in_tenant(Some("acme"), "alice", "pw")
5114            .is_err());
5115    }
5116
5117    #[test]
5118    fn test_user_id_display() {
5119        assert_eq!(UserId::platform("admin").to_string(), "admin");
5120        assert_eq!(UserId::scoped("acme", "alice").to_string(), "acme/alice");
5121    }
5122
5123    // -----------------------------------------------------------------
5124    // PolicyEnforcementMode wiring (#712 / S5A)
5125    // -----------------------------------------------------------------
5126
5127    fn enforcement_eval_ctx(role: Role) -> EvalContext {
5128        EvalContext {
5129            principal_tenant: None,
5130            current_tenant: None,
5131            peer_ip: None,
5132            mfa_present: false,
5133            now_ms: 1_700_000_000_000,
5134            principal_is_admin_role: role == Role::Admin,
5135            principal_is_system_owned: false,
5136            principal_is_platform_scoped: true,
5137        }
5138    }
5139
5140    /// Helper: install a single unrelated allow policy so the IAM
5141    /// path is the authoritative one — without any installed policy
5142    /// the auth flow short-circuits before we ever consult the mode.
5143    fn install_unrelated_policy(store: &AuthStore) {
5144        store
5145            .put_policy(
5146                Policy::from_json_str(
5147                    r#"{"id":"p-unrelated","version":1,"statements":[{"effect":"allow","actions":["select"],"resources":["table:public.other"]}]}"#,
5148                )
5149                .unwrap(),
5150            )
5151            .unwrap();
5152    }
5153
5154    #[test]
5155    fn enforcement_mode_default_for_new_store_is_legacy_rbac() {
5156        // Pre-bootstrap construction = "existing install" path.
5157        // Defaulting to LegacyRbac preserves pre-#712 behaviour on
5158        // upgrade so an operator that has not yet attached IAM
5159        // policies does not lose access after the upgrade.
5160        let store = AuthStore::new(test_config());
5161        assert_eq!(store.enforcement_mode(), PolicyEnforcementMode::LegacyRbac);
5162    }
5163
5164    #[test]
5165    fn enforcement_mode_set_round_trips_and_returns_previous() {
5166        let store = AuthStore::new(test_config());
5167        let prev = store.set_enforcement_mode(PolicyEnforcementMode::PolicyOnly);
5168        assert_eq!(prev, PolicyEnforcementMode::LegacyRbac);
5169        assert_eq!(store.enforcement_mode(), PolicyEnforcementMode::PolicyOnly);
5170        let prev = store.set_enforcement_mode(PolicyEnforcementMode::LegacyRbac);
5171        assert_eq!(prev, PolicyEnforcementMode::PolicyOnly);
5172    }
5173
5174    #[test]
5175    fn policy_only_mode_treats_no_matching_policy_as_deny() {
5176        // Acceptance #2: in `policy_only` mode a principal with no
5177        // matching policy is denied even when the action's role
5178        // floor would otherwise have allowed it.
5179        let store = AuthStore::new(test_config());
5180        store.create_user("alice", "p", Role::Admin).unwrap();
5181        let uid = UserId::platform("alice");
5182        install_unrelated_policy(&store);
5183        store.set_enforcement_mode(PolicyEnforcementMode::PolicyOnly);
5184
5185        let resource = ResourceRef::new("table", "public.t");
5186        // Even Role::Admin is denied — DefaultDeny is the deciding
5187        // signal in `policy_only`, regardless of role.
5188        assert!(!store.check_policy_authz_with_role(
5189            &uid,
5190            "select",
5191            &resource,
5192            &enforcement_eval_ctx(Role::Admin),
5193            Role::Admin,
5194        ));
5195    }
5196
5197    #[test]
5198    fn legacy_rbac_mode_falls_back_to_role_decision() {
5199        // Acceptance #3: in `legacy_rbac` mode the same principal
5200        // falls through to the role-based decision. `select` requires
5201        // only `Read`, so all three roles satisfy it; a write verb
5202        // requires `Write`, so `Read` does not.
5203        let store = AuthStore::new(test_config());
5204        store.create_user("alice", "p", Role::Read).unwrap();
5205        let uid = UserId::platform("alice");
5206        install_unrelated_policy(&store);
5207        store.set_enforcement_mode(PolicyEnforcementMode::LegacyRbac);
5208
5209        let table = ResourceRef::new("table", "public.t");
5210        // Read role + select action: allowed via the legacy fallback.
5211        assert!(store.check_policy_authz_with_role(
5212            &uid,
5213            "select",
5214            &table,
5215            &enforcement_eval_ctx(Role::Read),
5216            Role::Read,
5217        ));
5218        // Read role + insert action: legacy decision says no.
5219        assert!(!store.check_policy_authz_with_role(
5220            &uid,
5221            "insert",
5222            &table,
5223            &enforcement_eval_ctx(Role::Read),
5224            Role::Read,
5225        ));
5226        // Admin role + insert action: legacy decision says yes.
5227        assert!(store.check_policy_authz_with_role(
5228            &uid,
5229            "insert",
5230            &table,
5231            &enforcement_eval_ctx(Role::Admin),
5232            Role::Admin,
5233        ));
5234    }
5235
5236    #[test]
5237    fn explicit_deny_wins_irrespective_of_mode_or_role() {
5238        // Even in `legacy_rbac` with an Admin role, an explicit
5239        // matching Deny statement always denies. The mode only
5240        // influences the `DefaultDeny` arm, never an explicit one.
5241        let store = AuthStore::new(test_config());
5242        store.create_user("alice", "p", Role::Admin).unwrap();
5243        let uid = UserId::platform("alice");
5244        store
5245            .put_policy(
5246                Policy::from_json_str(
5247                    r#"{"id":"p-deny-select","version":1,"statements":[{"effect":"deny","actions":["select"],"resources":["table:public.t"]}]}"#,
5248                )
5249                .unwrap(),
5250            )
5251            .unwrap();
5252        store
5253            .attach_policy(PrincipalRef::User(uid.clone()), "p-deny-select")
5254            .unwrap();
5255
5256        let resource = ResourceRef::new("table", "public.t");
5257        for mode in [
5258            PolicyEnforcementMode::LegacyRbac,
5259            PolicyEnforcementMode::PolicyOnly,
5260        ] {
5261            store.set_enforcement_mode(mode);
5262            assert!(
5263                !store.check_policy_authz_with_role(
5264                    &uid,
5265                    "select",
5266                    &resource,
5267                    &enforcement_eval_ctx(Role::Admin),
5268                    Role::Admin,
5269                ),
5270                "explicit deny must win under mode {mode:?}"
5271            );
5272        }
5273    }
5274
5275    #[test]
5276    fn explicit_allow_wins_irrespective_of_mode_or_role() {
5277        // Symmetric guarantee: an explicit Allow is honoured even in
5278        // `policy_only` mode for a role that the legacy fallback
5279        // would have rejected (Read role + admin-tier action).
5280        let store = AuthStore::new(test_config());
5281        store.create_user("alice", "p", Role::Read).unwrap();
5282        let uid = UserId::platform("alice");
5283        store
5284            .put_policy(
5285                Policy::from_json_str(
5286                    r#"{"id":"p-allow-create","version":1,"statements":[{"effect":"allow","actions":["create"],"resources":["table:public.t"]}]}"#,
5287                )
5288                .unwrap(),
5289            )
5290            .unwrap();
5291        store
5292            .attach_policy(PrincipalRef::User(uid.clone()), "p-allow-create")
5293            .unwrap();
5294        store.set_enforcement_mode(PolicyEnforcementMode::PolicyOnly);
5295
5296        let resource = ResourceRef::new("table", "public.t");
5297        assert!(store.check_policy_authz_with_role(
5298            &uid,
5299            "create",
5300            &resource,
5301            &enforcement_eval_ctx(Role::Read),
5302            Role::Read,
5303        ));
5304    }
5305
5306    #[test]
5307    fn take_legacy_rbac_warn_once_is_at_most_once_per_boot() {
5308        let store = AuthStore::new(test_config());
5309        // Default mode = LegacyRbac, so the first call wins the token.
5310        assert!(store.take_legacy_rbac_warn_once());
5311        // Every subsequent call is a no-op for the lifetime of this
5312        // process / AuthStore — that is the "once per boot" guarantee.
5313        for _ in 0..3 {
5314            assert!(!store.take_legacy_rbac_warn_once());
5315        }
5316    }
5317
5318    #[test]
5319    fn take_legacy_rbac_warn_once_is_silent_under_policy_only() {
5320        // Acceptance #5 phrases the warning as "Boot in legacy_rbac
5321        // mode emits exactly one warn entry per boot" — implicitly,
5322        // `policy_only` boots emit zero. The token must not be
5323        // claimable when the mode is strict so the boot path stays
5324        // quiet.
5325        let store = AuthStore::new(test_config());
5326        store.set_enforcement_mode(PolicyEnforcementMode::PolicyOnly);
5327        assert!(!store.take_legacy_rbac_warn_once());
5328        // And, importantly, switching back to LegacyRbac later does
5329        // not "owe" us a warning — the token was never claimed under
5330        // policy_only, so it is still available the first time the
5331        // mode is legacy_rbac.
5332        store.set_enforcement_mode(PolicyEnforcementMode::LegacyRbac);
5333        assert!(store.take_legacy_rbac_warn_once());
5334        assert!(!store.take_legacy_rbac_warn_once());
5335    }
5336
5337    #[test]
5338    fn strict_check_policy_authz_ignores_enforcement_mode() {
5339        // Governance APIs (managed_config, registry, managed_policy)
5340        // call the strict `check_policy_authz` — it must return
5341        // `false` on DefaultDeny regardless of mode so the lenient
5342        // posture cannot accidentally elevate an admin-tier mutation.
5343        let store = AuthStore::new(test_config());
5344        store.create_user("alice", "p", Role::Admin).unwrap();
5345        let uid = UserId::platform("alice");
5346        install_unrelated_policy(&store);
5347        store.set_enforcement_mode(PolicyEnforcementMode::LegacyRbac);
5348
5349        let resource = ResourceRef::new("table", "public.t");
5350        assert!(!store.check_policy_authz(
5351            &uid,
5352            "select",
5353            &resource,
5354            &enforcement_eval_ctx(Role::Admin),
5355        ));
5356    }
5357}