Skip to main content

rivven_core/
auth.rs

1//! Authentication and Authorization (RBAC/ACL) for Rivven
2//!
3//! This module provides production-grade security for Rivven including:
4//! - SASL/PLAIN authentication (compatible with Kafka clients)
5//! - SCRAM-SHA-256 authentication (more secure, salted)
6//! - Role-Based Access Control (RBAC)
7//! - Topic/Schema-level Access Control Lists (ACLs)
8//! - Principal management (users, service accounts)
9//!
10//! ## Security Model
11//!
12//! Rivven uses a principal-based security model:
13//! - **Principal**: An authenticated identity (user, service account)
14//! - **Role**: A set of permissions (admin, producer, consumer)
15//! - **ACL**: Fine-grained access rules for specific resources
16//!
17//! ## Threat Model
18//!
19//! This implementation defends against:
20//! - Credential stuffing (rate limiting on auth failures)
21//! - Timing attacks (constant-time password comparison)
22//! - Replay attacks (nonce-based challenge-response)
23//! - Privilege escalation (strict role hierarchy)
24//! - Resource enumeration (deny by default)
25
26use parking_lot::RwLock;
27use ring::rand::{SecureRandom, SystemRandom};
28use serde::{Deserialize, Serialize};
29use sha2::{Digest, Sha256};
30use std::collections::{HashMap, HashSet};
31use std::sync::Arc;
32use std::time::{Duration, Instant};
33use thiserror::Error;
34use tracing::{debug, warn};
35
36// ============================================================================
37// Error Types
38// ============================================================================
39
/// Errors reported by the authentication and authorization layer.
#[derive(Error, Debug, Clone)]
pub enum AuthError {
    /// Authentication did not succeed.
    #[error("Authentication failed")]
    AuthenticationFailed,

    /// The supplied credentials were rejected.
    #[error("Invalid credentials")]
    InvalidCredentials,

    /// No principal is registered under the carried name.
    #[error("Principal not found: {0}")]
    PrincipalNotFound(String),

    /// A principal is already registered under the carried name.
    #[error("Principal already exists: {0}")]
    PrincipalAlreadyExists(String),

    /// Access was refused; the payload is inserted verbatim into the message.
    #[error("Access denied: {0}")]
    AccessDenied(String),

    /// A principal lacks a permission on a resource (all three rendered as text).
    #[error("Permission denied: {principal} lacks {permission} on {resource}")]
    PermissionDenied {
        principal: String,
        permission: String,
        resource: String,
    },

    /// No role is registered under the carried name.
    #[error("Role not found: {0}")]
    RoleNotFound(String),

    /// A token was presented that is not valid.
    #[error("Invalid token")]
    InvalidToken,

    /// A token was presented after its expiry.
    #[error("Token expired")]
    TokenExpired,

    /// Too many authentication failures were recorded.
    #[error("Rate limited: too many authentication failures")]
    RateLimited,

    /// Catch-all carrying a free-form message. This module also uses it for
    /// input-validation failures (bad principal name, weak password, duplicate role).
    #[error("Internal error: {0}")]
    Internal(String),
}

/// Result alias used by every fallible operation in this module.
pub type AuthResult<T> = std::result::Result<T, AuthError>;
81
82// ============================================================================
83// Resource Types for ACLs
84// ============================================================================
85
86/// Types of resources that can have ACLs
87#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
88pub enum ResourceType {
89    /// All cluster operations
90    Cluster,
91    /// Specific topic
92    Topic(String),
93    /// Pattern-matched topics (e.g., "orders-*")
94    TopicPattern(String),
95    /// Consumer group
96    ConsumerGroup(String),
97    /// Schema subject
98    Schema(String),
99    /// Transactional ID
100    TransactionalId(String),
101}
102
103impl ResourceType {
104    /// Check if this resource matches another (for pattern matching)
105    pub fn matches(&self, other: &ResourceType) -> bool {
106        match (self, other) {
107            // Exact match
108            (a, b) if a == b => true,
109
110            // Topic pattern matching
111            (ResourceType::TopicPattern(pattern), ResourceType::Topic(name)) => {
112                Self::glob_match(pattern, name)
113            }
114            (ResourceType::Topic(name), ResourceType::TopicPattern(pattern)) => {
115                Self::glob_match(pattern, name)
116            }
117
118            _ => false,
119        }
120    }
121
122    /// Simple glob matching for patterns like "orders-*"
123    fn glob_match(pattern: &str, text: &str) -> bool {
124        if pattern == "*" {
125            return true;
126        }
127
128        if let Some(prefix) = pattern.strip_suffix('*') {
129            return text.starts_with(prefix);
130        }
131
132        if let Some(suffix) = pattern.strip_prefix('*') {
133            return text.ends_with(suffix);
134        }
135
136        // Check for middle wildcard (e.g., "pre*suf")
137        if let Some(idx) = pattern.find('*') {
138            let prefix = &pattern[..idx];
139            let suffix = &pattern[idx + 1..];
140            return text.starts_with(prefix)
141                && text.ends_with(suffix)
142                && text.len() >= prefix.len() + suffix.len();
143        }
144
145        pattern == text
146    }
147}
148
149// ============================================================================
150// Permissions
151// ============================================================================
152
153/// Operations that can be performed on resources
154#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
155pub enum Permission {
156    // Topic operations
157    Read,     // Consume from topic
158    Write,    // Produce to topic
159    Create,   // Create topic
160    Delete,   // Delete topic
161    Alter,    // Modify topic config
162    Describe, // View topic metadata
163
164    // Consumer group operations
165    GroupRead,   // Read group state
166    GroupDelete, // Delete consumer group
167
168    // Cluster operations
169    ClusterAction,   // Cluster-wide actions (rebalance, etc.)
170    IdempotentWrite, // Idempotent producer
171
172    // Admin operations
173    AlterConfigs,    // Modify broker configs
174    DescribeConfigs, // View broker configs
175
176    // Full access
177    All, // All permissions (super admin)
178}
179
180impl Permission {
181    /// Check if this permission implies another permission
182    /// A permission implies itself + any subordinate permissions
183    pub fn implies(&self, other: &Permission) -> bool {
184        // Same permission always implies itself
185        if self == other {
186            return true;
187        }
188
189        match self {
190            Permission::All => true, // All implies everything
191            // These permissions imply Describe
192            Permission::Alter | Permission::Write | Permission::Read => {
193                matches!(other, Permission::Describe)
194            }
195            _ => false,
196        }
197    }
198}
199
200// ============================================================================
201// Principal Types
202// ============================================================================
203
204/// Type of principal (for audit logging and quotas)
205#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
206pub enum PrincipalType {
207    User,
208    ServiceAccount,
209    Anonymous,
210}
211
212/// A security principal (identity)
213///
214/// # Security Note
215///
216/// This struct implements a custom Debug that redacts the password_hash field
217/// to prevent accidental leakage to logs.
218#[derive(Clone, Serialize, Deserialize)]
219pub struct Principal {
220    /// Principal name (unique identifier)
221    pub name: String,
222
223    /// Type of principal
224    pub principal_type: PrincipalType,
225
226    /// Hashed password (SCRAM-SHA-256 format)
227    pub password_hash: PasswordHash,
228
229    /// Roles assigned to this principal
230    pub roles: HashSet<String>,
231
232    /// Whether the principal is enabled
233    pub enabled: bool,
234
235    /// Optional metadata (tags, labels)
236    pub metadata: HashMap<String, String>,
237
238    /// Creation timestamp
239    pub created_at: u64,
240}
241
242impl std::fmt::Debug for Principal {
243    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244        f.debug_struct("Principal")
245            .field("name", &self.name)
246            .field("principal_type", &self.principal_type)
247            .field("password_hash", &"[REDACTED]")
248            .field("roles", &self.roles)
249            .field("enabled", &self.enabled)
250            .field("metadata", &self.metadata)
251            .field("created_at", &self.created_at)
252            .finish()
253    }
254}
255
256/// SCRAM-SHA-256 password hash with salt and iterations
257///
258/// # Security Note
259///
260/// This struct implements a custom Debug that redacts sensitive key material
261/// to prevent accidental leakage to logs.
262#[derive(Clone, Serialize, Deserialize)]
263pub struct PasswordHash {
264    /// Salt (32 bytes, base64 encoded for storage)
265    pub salt: Vec<u8>,
266    /// Number of iterations
267    pub iterations: u32,
268    /// Server key
269    pub server_key: Vec<u8>,
270    /// Stored key
271    pub stored_key: Vec<u8>,
272}
273
274impl std::fmt::Debug for PasswordHash {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        f.debug_struct("PasswordHash")
277            .field("salt", &"[REDACTED]")
278            .field("iterations", &self.iterations)
279            .field("server_key", &"[REDACTED]")
280            .field("stored_key", &"[REDACTED]")
281            .finish()
282    }
283}
284
285impl PasswordHash {
286    /// Create a new password hash from plaintext
287    pub fn new(password: &str) -> Self {
288        let rng = SystemRandom::new();
289        let mut salt = vec![0u8; 32];
290        rng.fill(&mut salt).expect("Failed to generate salt");
291
292        Self::with_salt(password, &salt, 600_000)
293    }
294
295    /// Create a password hash with a specific salt (for testing/migration)
296    pub fn with_salt(password: &str, salt: &[u8], iterations: u32) -> Self {
297        // PBKDF2-HMAC-SHA256 derivation
298        let salted_password = Self::pbkdf2_sha256(password.as_bytes(), salt, iterations);
299
300        // Derive client and server keys
301        let client_key = Self::hmac_sha256(&salted_password, b"Client Key");
302        let server_key = Self::hmac_sha256(&salted_password, b"Server Key");
303
304        // Stored key = H(client_key)
305        let stored_key = Sha256::digest(&client_key).to_vec();
306
307        PasswordHash {
308            salt: salt.to_vec(),
309            iterations,
310            server_key,
311            stored_key,
312        }
313    }
314
315    /// Verify a password against this hash (constant-time comparison)
316    pub fn verify(&self, password: &str) -> bool {
317        let salted_password = Self::pbkdf2_sha256(password.as_bytes(), &self.salt, self.iterations);
318        let client_key = Self::hmac_sha256(&salted_password, b"Client Key");
319        let stored_key = Sha256::digest(&client_key);
320
321        // Constant-time comparison to prevent timing attacks
322        Self::constant_time_compare(&stored_key, &self.stored_key)
323    }
324
325    /// Constant-time comparison to prevent timing attacks
326    pub fn constant_time_compare(a: &[u8], b: &[u8]) -> bool {
327        if a.len() != b.len() {
328            return false;
329        }
330
331        // XOR all bytes and accumulate - timing is constant regardless of where mismatch occurs
332        let mut result = 0u8;
333        for (x, y) in a.iter().zip(b.iter()) {
334            result |= x ^ y;
335        }
336        result == 0
337    }
338
339    /// PBKDF2-HMAC-SHA256 key derivation
340    fn pbkdf2_sha256(password: &[u8], salt: &[u8], iterations: u32) -> Vec<u8> {
341        use hmac::{Hmac, Mac};
342        type HmacSha256 = Hmac<Sha256>;
343
344        let mut result = vec![0u8; 32];
345
346        // U1 = PRF(Password, Salt || INT(1))
347        let mut mac = HmacSha256::new_from_slice(password).expect("HMAC accepts any key length");
348        mac.update(salt);
349        mac.update(&1u32.to_be_bytes());
350        let mut u = mac.finalize().into_bytes();
351        result.copy_from_slice(&u);
352
353        // Ui = PRF(Password, Ui-1)
354        for _ in 1..iterations {
355            let mut mac =
356                HmacSha256::new_from_slice(password).expect("HMAC accepts any key length");
357            mac.update(&u);
358            u = mac.finalize().into_bytes();
359
360            for (r, ui) in result.iter_mut().zip(u.iter()) {
361                *r ^= ui;
362            }
363        }
364
365        result
366    }
367
368    /// HMAC-SHA256
369    pub fn hmac_sha256(key: &[u8], data: &[u8]) -> Vec<u8> {
370        use hmac::{Hmac, Mac};
371        type HmacSha256 = Hmac<Sha256>;
372
373        let mut mac = HmacSha256::new_from_slice(key).expect("HMAC accepts any key length");
374        mac.update(data);
375        mac.finalize().into_bytes().to_vec()
376    }
377}
378
379// ============================================================================
380// Roles
381// ============================================================================
382
383/// A role with a set of permissions
384#[derive(Debug, Clone, Serialize, Deserialize)]
385pub struct Role {
386    /// Role name
387    pub name: String,
388
389    /// Description
390    pub description: String,
391
392    /// Permissions granted by this role
393    pub permissions: HashSet<(ResourceType, Permission)>,
394
395    /// Whether this is a built-in role
396    pub builtin: bool,
397}
398
399impl Role {
400    /// Create a built-in admin role
401    pub fn admin() -> Self {
402        let mut permissions = HashSet::new();
403        permissions.insert((ResourceType::Cluster, Permission::All));
404
405        Role {
406            name: "admin".to_string(),
407            description: "Full administrative access to all resources".to_string(),
408            permissions,
409            builtin: true,
410        }
411    }
412
413    /// Create a built-in producer role
414    pub fn producer() -> Self {
415        let mut permissions = HashSet::new();
416        permissions.insert((
417            ResourceType::TopicPattern("*".to_string()),
418            Permission::Write,
419        ));
420        permissions.insert((
421            ResourceType::TopicPattern("*".to_string()),
422            Permission::Describe,
423        ));
424        permissions.insert((ResourceType::Cluster, Permission::IdempotentWrite));
425
426        Role {
427            name: "producer".to_string(),
428            description: "Can produce to all topics".to_string(),
429            permissions,
430            builtin: true,
431        }
432    }
433
434    /// Create a built-in consumer role
435    pub fn consumer() -> Self {
436        let mut permissions = HashSet::new();
437        permissions.insert((
438            ResourceType::TopicPattern("*".to_string()),
439            Permission::Read,
440        ));
441        permissions.insert((
442            ResourceType::TopicPattern("*".to_string()),
443            Permission::Describe,
444        ));
445        permissions.insert((
446            ResourceType::ConsumerGroup("*".to_string()),
447            Permission::GroupRead,
448        ));
449
450        Role {
451            name: "consumer".to_string(),
452            description: "Can consume from all topics".to_string(),
453            permissions,
454            builtin: true,
455        }
456    }
457
458    /// Create a built-in read-only role
459    pub fn read_only() -> Self {
460        let mut permissions = HashSet::new();
461        permissions.insert((
462            ResourceType::TopicPattern("*".to_string()),
463            Permission::Read,
464        ));
465        permissions.insert((
466            ResourceType::TopicPattern("*".to_string()),
467            Permission::Describe,
468        ));
469
470        Role {
471            name: "read-only".to_string(),
472            description: "Read-only access to all topics".to_string(),
473            permissions,
474            builtin: true,
475        }
476    }
477}
478
479// ============================================================================
480// Access Control List (ACL)
481// ============================================================================
482
483/// An ACL entry
484#[derive(Debug, Clone, Serialize, Deserialize)]
485pub struct AclEntry {
486    /// Principal this ACL applies to (use "*" for all)
487    pub principal: String,
488
489    /// Resource this ACL applies to
490    pub resource: ResourceType,
491
492    /// Permission granted or denied
493    pub permission: Permission,
494
495    /// Whether this is an allow or deny rule
496    pub allow: bool,
497
498    /// Host pattern (IP or hostname, "*" for all)
499    pub host: String,
500}
501
502/// Indexed ACL store for O(1) average-case lookups.
503///
504/// ACL entries are indexed by principal name. Wildcard (`*`) entries are
505/// stored separately and consulted on every lookup to preserve deny-takes-
506/// precedence semantics. For N total ACLs but only W wildcard rules, a
507/// lookup scans at most `entries_for_principal + W` entries instead of N.
508#[derive(Debug, Default)]
509struct AclIndex {
510    /// ACLs keyed by exact principal name.
511    by_principal: HashMap<String, Vec<AclEntry>>,
512    /// Wildcard ACLs (principal == "*"). Checked on every lookup.
513    wildcard: Vec<AclEntry>,
514}
515
516impl AclIndex {
517    fn new() -> Self {
518        Self::default()
519    }
520
521    fn push(&mut self, entry: AclEntry) {
522        if entry.principal == "*" {
523            self.wildcard.push(entry);
524        } else {
525            self.by_principal
526                .entry(entry.principal.clone())
527                .or_default()
528                .push(entry);
529        }
530    }
531
532    fn retain<F: Fn(&AclEntry) -> bool>(&mut self, predicate: F) {
533        self.wildcard.retain(&predicate);
534        self.by_principal.retain(|_, entries| {
535            entries.retain(&predicate);
536            !entries.is_empty()
537        });
538    }
539
540    /// Return all entries that could apply to the given principal.
541    fn lookup(&self, principal: &str) -> impl Iterator<Item = &AclEntry> {
542        let specific = self
543            .by_principal
544            .get(principal)
545            .map(|v| v.as_slice())
546            .unwrap_or(&[]);
547        specific.iter().chain(self.wildcard.iter())
548    }
549
550    fn to_vec(&self) -> Vec<AclEntry> {
551        self.wildcard
552            .iter()
553            .chain(self.by_principal.values().flatten())
554            .cloned()
555            .collect()
556    }
557}
558
559// ============================================================================
560// Session and Token
561// ============================================================================
562
563/// An authenticated session
564#[derive(Debug, Clone)]
565pub struct AuthSession {
566    /// Session ID
567    pub id: String,
568
569    /// Authenticated principal name
570    pub principal_name: String,
571
572    /// Principal type
573    pub principal_type: PrincipalType,
574
575    /// Resolved permissions (cached for performance)
576    pub permissions: HashSet<(ResourceType, Permission)>,
577
578    /// Session creation time
579    pub created_at: Instant,
580
581    /// Session expiration time
582    pub expires_at: Instant,
583
584    /// Client IP address
585    pub client_ip: String,
586}
587
588impl AuthSession {
589    /// Check if the session has expired
590    pub fn is_expired(&self) -> bool {
591        Instant::now() >= self.expires_at
592    }
593
594    /// Check if this session has a specific permission on a resource
595    pub fn has_permission(&self, resource: &ResourceType, permission: &Permission) -> bool {
596        // Admin has all permissions
597        if self
598            .permissions
599            .contains(&(ResourceType::Cluster, Permission::All))
600        {
601            return true;
602        }
603
604        // Check direct permission
605        if self.permissions.contains(&(resource.clone(), *permission)) {
606            return true;
607        }
608
609        // Check pattern matches and permission implies
610        for (res, perm) in &self.permissions {
611            // Check if the granted resource (res) matches the requested resource
612            // AND the granted permission (perm) implies the requested permission
613            let resource_matches = res.matches(resource);
614            let permission_implies = perm.implies(permission);
615            if resource_matches && permission_implies {
616                return true;
617            }
618        }
619
620        false
621    }
622}
623
624// ============================================================================
625// Auth Manager
626// ============================================================================
627
/// Settings that control the authentication manager.
#[derive(Debug, Clone)]
pub struct AuthConfig {
    /// Session timeout.
    pub session_timeout: Duration,

    /// Number of failed authentication attempts that triggers a lockout.
    pub max_failed_attempts: u32,

    /// How long a lockout lasts.
    pub lockout_duration: Duration,

    /// Require authentication; `false` allows anonymous access.
    pub require_authentication: bool,

    /// Enforce ACLs.
    pub enable_acls: bool,

    /// Deny anything that is not explicitly allowed.
    pub default_deny: bool,
}
649
650impl Default for AuthConfig {
651    fn default() -> Self {
652        AuthConfig {
653            session_timeout: Duration::from_secs(3600), // 1 hour
654            max_failed_attempts: 5,
655            lockout_duration: Duration::from_secs(300), // 5 minutes
656            require_authentication: false,              // Default to open for dev
657            enable_acls: false,
658            default_deny: true,
659        }
660    }
661}
662
/// Bookkeeping of failed authentication attempts, used for rate limiting.
struct FailedAttemptTracker {
    /// Timestamps of recent failures, per identifier.
    attempts: HashMap<String, Vec<Instant>>,
    /// Time at which each currently tracked lockout began, per identifier.
    lockouts: HashMap<String, Instant>,
}
668
669impl FailedAttemptTracker {
670    fn new() -> Self {
671        Self {
672            attempts: HashMap::new(),
673            lockouts: HashMap::new(),
674        }
675    }
676
677    /// Check if an identifier is currently locked out
678    fn is_locked_out(&self, identifier: &str, lockout_duration: Duration) -> bool {
679        if let Some(lockout_time) = self.lockouts.get(identifier) {
680            if lockout_time.elapsed() < lockout_duration {
681                return true;
682            }
683        }
684        false
685    }
686
687    /// Record a failed attempt
688    fn record_failure(
689        &mut self,
690        identifier: &str,
691        max_attempts: u32,
692        lockout_duration: Duration,
693    ) -> bool {
694        let now = Instant::now();
695
696        // Clean up old lockouts
697        self.lockouts.retain(|_, t| t.elapsed() < lockout_duration);
698
699        // Get or create attempt list
700        let attempts = self.attempts.entry(identifier.to_string()).or_default();
701
702        // Remove attempts older than lockout duration
703        attempts.retain(|t| t.elapsed() < lockout_duration);
704
705        // Add this attempt
706        attempts.push(now);
707
708        // Check if we've exceeded max attempts
709        let exceeded = attempts.len() >= max_attempts as usize;
710        if exceeded {
711            warn!(
712                "Principal '{}' locked out after {} failed attempts",
713                identifier, max_attempts
714            );
715            self.lockouts.insert(identifier.to_string(), now);
716        }
717
718        // Periodic cleanup: remove entries with no recent attempts to bound memory
719        if self.attempts.len() > 10_000 {
720            self.attempts.retain(|_, v| !v.is_empty());
721        }
722
723        exceeded
724    }
725
726    /// Clear failures for an identifier (on successful auth)
727    fn clear_failures(&mut self, identifier: &str) {
728        self.attempts.remove(identifier);
729        self.lockouts.remove(identifier);
730    }
731}
732
733/// Validate password strength with complexity requirements
734fn validate_password_strength(password: &str) -> AuthResult<()> {
735    if password.len() < 8 {
736        return Err(AuthError::Internal(
737            "Password must be at least 8 characters".to_string(),
738        ));
739    }
740
741    let has_uppercase = password.chars().any(|c| c.is_ascii_uppercase());
742    let has_lowercase = password.chars().any(|c| c.is_ascii_lowercase());
743    let has_digit = password.chars().any(|c| c.is_ascii_digit());
744    let has_special = password.chars().any(|c| !c.is_alphanumeric());
745
746    if !has_uppercase {
747        return Err(AuthError::Internal(
748            "Password must contain at least one uppercase letter".to_string(),
749        ));
750    }
751    if !has_lowercase {
752        return Err(AuthError::Internal(
753            "Password must contain at least one lowercase letter".to_string(),
754        ));
755    }
756    if !has_digit {
757        return Err(AuthError::Internal(
758            "Password must contain at least one digit".to_string(),
759        ));
760    }
761    if !has_special {
762        return Err(AuthError::Internal(
763            "Password must contain at least one special character".to_string(),
764        ));
765    }
766
767    Ok(())
768}
769
770/// The main authentication and authorization manager
771pub struct AuthManager {
772    config: AuthConfig,
773
774    /// Principals (users/service accounts)
775    principals: RwLock<HashMap<String, Principal>>,
776
777    /// Roles
778    roles: RwLock<HashMap<String, Role>>,
779
780    /// ACL entries (indexed by principal for fast lookup)
781    acls: RwLock<AclIndex>,
782
783    /// Active sessions
784    sessions: RwLock<HashMap<String, AuthSession>>,
785
786    /// Failed attempt tracking
787    failed_attempts: RwLock<FailedAttemptTracker>,
788
789    /// Random number generator for session IDs
790    rng: SystemRandom,
791}
792
793impl AuthManager {
794    /// Create a new authentication manager
795    pub fn new(config: AuthConfig) -> Self {
796        let manager = Self {
797            config,
798            principals: RwLock::new(HashMap::new()),
799            roles: RwLock::new(HashMap::new()),
800            acls: RwLock::new(AclIndex::new()),
801            sessions: RwLock::new(HashMap::new()),
802            failed_attempts: RwLock::new(FailedAttemptTracker::new()),
803            rng: SystemRandom::new(),
804        };
805
806        // Initialize built-in roles
807        manager.init_builtin_roles();
808
809        manager
810    }
811
812    /// Create with default config
813    pub fn new_default() -> Self {
814        Self::new(AuthConfig::default())
815    }
816
817    /// Create an auth manager with authentication enabled
818    pub fn with_auth_enabled() -> Self {
819        Self::new(AuthConfig {
820            require_authentication: true,
821            enable_acls: true,
822            ..Default::default()
823        })
824    }
825
826    /// Initialize built-in roles
827    fn init_builtin_roles(&self) {
828        let mut roles = self.roles.write();
829        roles.insert("admin".to_string(), Role::admin());
830        roles.insert("producer".to_string(), Role::producer());
831        roles.insert("consumer".to_string(), Role::consumer());
832        roles.insert("read-only".to_string(), Role::read_only());
833    }
834
835    // ========================================================================
836    // Principal Management
837    // ========================================================================
838
839    /// Create a new principal (user or service account)
840    pub fn create_principal(
841        &self,
842        name: &str,
843        password: &str,
844        principal_type: PrincipalType,
845        roles: HashSet<String>,
846    ) -> AuthResult<()> {
847        // Validate principal name
848        if name.is_empty() || name.len() > 255 {
849            return Err(AuthError::Internal("Invalid principal name".to_string()));
850        }
851
852        // Validate password strength (minimum requirements + complexity)
853        validate_password_strength(password)?;
854
855        // Validate roles exist
856        {
857            let role_map = self.roles.read();
858            for role in &roles {
859                if !role_map.contains_key(role) {
860                    return Err(AuthError::RoleNotFound(role.clone()));
861                }
862            }
863        }
864
865        let mut principals = self.principals.write();
866
867        if principals.contains_key(name) {
868            return Err(AuthError::PrincipalAlreadyExists(name.to_string()));
869        }
870
871        let principal = Principal {
872            name: name.to_string(),
873            principal_type,
874            password_hash: PasswordHash::new(password),
875            roles,
876            enabled: true,
877            metadata: HashMap::new(),
878            created_at: std::time::SystemTime::now()
879                .duration_since(std::time::UNIX_EPOCH)
880                .unwrap_or_default()
881                .as_secs(),
882        };
883
884        principals.insert(name.to_string(), principal);
885        debug!("Created principal: {}", name);
886
887        Ok(())
888    }
889
890    /// Delete a principal
891    pub fn delete_principal(&self, name: &str) -> AuthResult<()> {
892        let mut principals = self.principals.write();
893
894        if principals.remove(name).is_none() {
895            return Err(AuthError::PrincipalNotFound(name.to_string()));
896        }
897
898        // Also invalidate any active sessions for this principal
899        let mut sessions = self.sessions.write();
900        sessions.retain(|_, s| s.principal_name != name);
901
902        debug!("Deleted principal: {}", name);
903        Ok(())
904    }
905
906    /// Get a principal by name
907    pub fn get_principal(&self, name: &str) -> Option<Principal> {
908        self.principals.read().get(name).cloned()
909    }
910
911    /// List all principals
912    pub fn list_principals(&self) -> Vec<String> {
913        self.principals.read().keys().cloned().collect()
914    }
915
916    /// Update principal password
917    pub fn update_password(&self, name: &str, new_password: &str) -> AuthResult<()> {
918        // Validate password strength (minimum requirements + complexity)
919        validate_password_strength(new_password)?;
920
921        let mut principals = self.principals.write();
922
923        let principal = principals
924            .get_mut(name)
925            .ok_or_else(|| AuthError::PrincipalNotFound(name.to_string()))?;
926
927        principal.password_hash = PasswordHash::new(new_password);
928
929        // Invalidate sessions
930        let mut sessions = self.sessions.write();
931        sessions.retain(|_, s| s.principal_name != name);
932
933        debug!("Updated password for principal: {}", name);
934        Ok(())
935    }
936
937    /// Add a role to a principal
938    pub fn add_role_to_principal(&self, principal_name: &str, role_name: &str) -> AuthResult<()> {
939        // Validate role exists
940        if !self.roles.read().contains_key(role_name) {
941            return Err(AuthError::RoleNotFound(role_name.to_string()));
942        }
943
944        let mut principals = self.principals.write();
945
946        let principal = principals
947            .get_mut(principal_name)
948            .ok_or_else(|| AuthError::PrincipalNotFound(principal_name.to_string()))?;
949
950        principal.roles.insert(role_name.to_string());
951
952        debug!(
953            "Added role '{}' to principal '{}'",
954            role_name, principal_name
955        );
956        Ok(())
957    }
958
959    /// Remove a role from a principal
960    pub fn remove_role_from_principal(
961        &self,
962        principal_name: &str,
963        role_name: &str,
964    ) -> AuthResult<()> {
965        let mut principals = self.principals.write();
966
967        let principal = principals
968            .get_mut(principal_name)
969            .ok_or_else(|| AuthError::PrincipalNotFound(principal_name.to_string()))?;
970
971        principal.roles.remove(role_name);
972
973        debug!(
974            "Removed role '{}' from principal '{}'",
975            role_name, principal_name
976        );
977        Ok(())
978    }
979
980    // ========================================================================
981    // Role Management
982    // ========================================================================
983
984    /// Create a custom role
985    pub fn create_role(&self, role: Role) -> AuthResult<()> {
986        let mut roles = self.roles.write();
987
988        if roles.contains_key(&role.name) {
989            return Err(AuthError::Internal(format!(
990                "Role '{}' already exists",
991                role.name
992            )));
993        }
994
995        debug!("Created role: {}", role.name);
996        roles.insert(role.name.clone(), role);
997        Ok(())
998    }
999
1000    /// Delete a custom role
1001    pub fn delete_role(&self, name: &str) -> AuthResult<()> {
1002        let mut roles = self.roles.write();
1003
1004        if let Some(role) = roles.get(name) {
1005            if role.builtin {
1006                return Err(AuthError::Internal(
1007                    "Cannot delete built-in role".to_string(),
1008                ));
1009            }
1010        } else {
1011            return Err(AuthError::RoleNotFound(name.to_string()));
1012        }
1013
1014        roles.remove(name);
1015        debug!("Deleted role: {}", name);
1016        Ok(())
1017    }
1018
1019    /// Get a role by name
1020    pub fn get_role(&self, name: &str) -> Option<Role> {
1021        self.roles.read().get(name).cloned()
1022    }
1023
1024    /// List all roles
1025    pub fn list_roles(&self) -> Vec<String> {
1026        self.roles.read().keys().cloned().collect()
1027    }
1028
1029    // ========================================================================
1030    // ACL Management
1031    // ========================================================================
1032
1033    /// Add an ACL entry
1034    pub fn add_acl(&self, entry: AclEntry) {
1035        let mut acls = self.acls.write();
1036        acls.push(entry);
1037    }
1038
1039    /// Remove ACL entries matching criteria
1040    pub fn remove_acls(&self, principal: Option<&str>, resource: Option<&ResourceType>) {
1041        let mut acls = self.acls.write();
1042        acls.retain(|acl| {
1043            let principal_match =
1044                principal.is_none_or(|p| acl.principal == p || acl.principal == "*");
1045            let resource_match = resource.is_none_or(|r| &acl.resource == r);
1046            !(principal_match && resource_match)
1047        });
1048    }
1049
1050    /// List ACL entries
1051    pub fn list_acls(&self) -> Vec<AclEntry> {
1052        self.acls.read().to_vec()
1053    }
1054
1055    // ========================================================================
1056    // Authentication
1057    // ========================================================================
1058
1059    /// Authenticate a principal and create a session
1060    pub fn authenticate(
1061        &self,
1062        username: &str,
1063        password: &str,
1064        client_ip: &str,
1065    ) -> AuthResult<AuthSession> {
1066        // Check rate limiting
1067        {
1068            let tracker = self.failed_attempts.read();
1069            if tracker.is_locked_out(username, self.config.lockout_duration) {
1070                warn!(
1071                    "Authentication attempt for locked-out principal: {}",
1072                    username
1073                );
1074                return Err(AuthError::RateLimited);
1075            }
1076            if tracker.is_locked_out(client_ip, self.config.lockout_duration) {
1077                warn!("Authentication attempt from locked-out IP: {}", client_ip);
1078                return Err(AuthError::RateLimited);
1079            }
1080        }
1081
1082        // Look up principal
1083        let principal = {
1084            let principals = self.principals.read();
1085            principals.get(username).cloned()
1086        };
1087
1088        let principal = match principal {
1089            Some(p) if p.enabled => p,
1090            Some(_) => {
1091                // Disabled account - don't leak this info
1092                self.record_auth_failure(username, client_ip);
1093                return Err(AuthError::AuthenticationFailed);
1094            }
1095            None => {
1096                // Unknown principal - still do constant-time password check
1097                // to prevent timing attacks that enumerate users
1098                let dummy = PasswordHash::new("dummy");
1099                let _ = dummy.verify(password);
1100                self.record_auth_failure(username, client_ip);
1101                return Err(AuthError::AuthenticationFailed);
1102            }
1103        };
1104
1105        // Verify password (constant-time comparison)
1106        if !principal.password_hash.verify(password) {
1107            self.record_auth_failure(username, client_ip);
1108            return Err(AuthError::AuthenticationFailed);
1109        }
1110
1111        // Clear any failed attempt tracking
1112        self.failed_attempts.write().clear_failures(username);
1113        self.failed_attempts.write().clear_failures(client_ip);
1114
1115        // Build session with resolved permissions
1116        let permissions = self.resolve_permissions(&principal);
1117
1118        // Generate session ID
1119        let mut session_id = vec![0u8; 32];
1120        self.rng
1121            .fill(&mut session_id)
1122            .map_err(|_| AuthError::Internal("RNG failed".to_string()))?;
1123        let session_id = hex::encode(&session_id);
1124
1125        let now = Instant::now();
1126        let session = AuthSession {
1127            id: session_id.clone(),
1128            principal_name: principal.name.clone(),
1129            principal_type: principal.principal_type.clone(),
1130            permissions,
1131            created_at: now,
1132            expires_at: now + self.config.session_timeout,
1133            client_ip: client_ip.to_string(),
1134        };
1135
1136        // Store session
1137        self.sessions.write().insert(session_id, session.clone());
1138
1139        debug!("Authenticated principal '{}' from {}", username, client_ip);
1140        Ok(session)
1141    }
1142
1143    /// Record a failed authentication attempt
1144    fn record_auth_failure(&self, username: &str, client_ip: &str) {
1145        let mut tracker = self.failed_attempts.write();
1146        tracker.record_failure(
1147            username,
1148            self.config.max_failed_attempts,
1149            self.config.lockout_duration,
1150        );
1151        tracker.record_failure(
1152            client_ip,
1153            self.config.max_failed_attempts * 2,
1154            self.config.lockout_duration,
1155        );
1156    }
1157
1158    /// Get an active session by ID
1159    pub fn get_session(&self, session_id: &str) -> Option<AuthSession> {
1160        let sessions = self.sessions.read();
1161        sessions.get(session_id).and_then(|s| {
1162            if s.is_expired() {
1163                None
1164            } else {
1165                Some(s.clone())
1166            }
1167        })
1168    }
1169
1170    /// Invalidate a session (logout)
1171    pub fn invalidate_session(&self, session_id: &str) {
1172        self.sessions.write().remove(session_id);
1173    }
1174
1175    /// Invalidate all sessions for a principal
1176    pub fn invalidate_all_sessions(&self, principal_name: &str) {
1177        self.sessions
1178            .write()
1179            .retain(|_, s| s.principal_name != principal_name);
1180    }
1181
1182    /// Clean up expired sessions
1183    pub fn cleanup_expired_sessions(&self) {
1184        self.sessions.write().retain(|_, s| !s.is_expired());
1185    }
1186
1187    /// Create a session for a principal (used by SCRAM after successful auth)
1188    pub fn create_session(&self, principal: &Principal) -> AuthSession {
1189        let permissions = self.resolve_permissions(principal);
1190
1191        let mut session_id = vec![0u8; 32];
1192        self.rng.fill(&mut session_id).expect("RNG failed");
1193        let session_id = hex::encode(&session_id);
1194
1195        let now = Instant::now();
1196        let session = AuthSession {
1197            id: session_id.clone(),
1198            principal_name: principal.name.clone(),
1199            principal_type: principal.principal_type.clone(),
1200            permissions,
1201            created_at: now,
1202            expires_at: now + self.config.session_timeout,
1203            client_ip: "scram".to_string(),
1204        };
1205
1206        self.sessions.write().insert(session_id, session.clone());
1207        session
1208    }
1209
1210    /// Create a session for an API key authentication
1211    ///
1212    /// Creates a synthetic session for API key-based authentication.
1213    /// The session inherits permissions from the specified roles.
1214    pub fn create_api_key_session(&self, principal_name: &str, roles: &[String]) -> AuthSession {
1215        // Resolve permissions from roles
1216        let mut permissions = HashSet::new();
1217        {
1218            let roles_map = self.roles.read();
1219            for role_name in roles {
1220                if let Some(role) = roles_map.get(role_name) {
1221                    permissions.extend(role.permissions.iter().cloned());
1222                }
1223            }
1224        }
1225
1226        let mut session_id = vec![0u8; 32];
1227        self.rng.fill(&mut session_id).expect("RNG failed");
1228        let session_id = hex::encode(&session_id);
1229
1230        let now = Instant::now();
1231        let session = AuthSession {
1232            id: session_id.clone(),
1233            principal_name: principal_name.to_string(),
1234            principal_type: PrincipalType::ServiceAccount,
1235            permissions,
1236            created_at: now,
1237            expires_at: now + self.config.session_timeout,
1238            client_ip: "api-key".to_string(),
1239        };
1240
1241        self.sessions.write().insert(session_id, session.clone());
1242        debug!(principal = %principal_name, "Created API key session");
1243        session
1244    }
1245
1246    /// Create a session for JWT/OIDC authentication
1247    ///
1248    /// Creates a synthetic session for JWT-authenticated users.
1249    /// The session inherits permissions from the specified groups/roles.
1250    pub fn create_jwt_session(&self, principal_name: &str, groups: &[String]) -> AuthSession {
1251        // Resolve permissions from groups (treated as roles)
1252        let mut permissions = HashSet::new();
1253        {
1254            let roles_map = self.roles.read();
1255            for group in groups {
1256                if let Some(role) = roles_map.get(group) {
1257                    permissions.extend(role.permissions.iter().cloned());
1258                }
1259            }
1260        }
1261
1262        let mut session_id = vec![0u8; 32];
1263        self.rng.fill(&mut session_id).expect("RNG failed");
1264        let session_id = hex::encode(&session_id);
1265
1266        let now = Instant::now();
1267        let session = AuthSession {
1268            id: session_id.clone(),
1269            principal_name: principal_name.to_string(),
1270            principal_type: PrincipalType::User,
1271            permissions,
1272            created_at: now,
1273            expires_at: now + self.config.session_timeout,
1274            client_ip: "jwt".to_string(),
1275        };
1276
1277        self.sessions.write().insert(session_id, session.clone());
1278        debug!(principal = %principal_name, groups = ?groups, "Created JWT session");
1279        session
1280    }
1281
1282    /// Get a session by principal name (for API key validation)
1283    pub fn get_session_by_principal(&self, principal_name: &str) -> Option<AuthSession> {
1284        let sessions = self.sessions.read();
1285        sessions
1286            .values()
1287            .find(|s| s.principal_name == principal_name && !s.is_expired())
1288            .cloned()
1289    }
1290
1291    // ========================================================================
1292    // Authorization
1293    // ========================================================================
1294
1295    /// Resolve all permissions for a principal
1296    fn resolve_permissions(&self, principal: &Principal) -> HashSet<(ResourceType, Permission)> {
1297        let mut permissions = HashSet::new();
1298
1299        let roles = self.roles.read();
1300
1301        // Collect permissions from all roles
1302        for role_name in &principal.roles {
1303            if let Some(role) = roles.get(role_name) {
1304                permissions.extend(role.permissions.iter().cloned());
1305            }
1306        }
1307
1308        permissions
1309    }
1310
1311    /// Check if a session/principal has permission on a resource
1312    pub fn authorize(
1313        &self,
1314        session: &AuthSession,
1315        resource: &ResourceType,
1316        permission: Permission,
1317        client_ip: &str,
1318    ) -> AuthResult<()> {
1319        // If auth is not required and no ACLs, allow everything
1320        if !self.config.require_authentication && !self.config.enable_acls {
1321            return Ok(());
1322        }
1323
1324        // Check session expiration
1325        if session.is_expired() {
1326            return Err(AuthError::TokenExpired);
1327        }
1328
1329        // Check role-based permissions
1330        if session.has_permission(resource, &permission) {
1331            return Ok(());
1332        }
1333
1334        // Check ACL entries
1335        if self.config.enable_acls
1336            && self.check_acls(&session.principal_name, resource, permission, client_ip)
1337        {
1338            return Ok(());
1339        }
1340
1341        // Default deny
1342        if self.config.default_deny {
1343            warn!(
1344                "Access denied: {} attempted {} on {:?} from {}",
1345                session.principal_name,
1346                format!("{:?}", permission),
1347                resource,
1348                client_ip
1349            );
1350            return Err(AuthError::PermissionDenied {
1351                principal: session.principal_name.clone(),
1352                permission: format!("{:?}", permission),
1353                resource: format!("{:?}", resource),
1354            });
1355        }
1356
1357        Ok(())
1358    }
1359
1360    /// Check ACL entries for authorization (indexed lookup — O(W + P) instead of O(N))
1361    fn check_acls(
1362        &self,
1363        principal: &str,
1364        resource: &ResourceType,
1365        permission: Permission,
1366        client_ip: &str,
1367    ) -> bool {
1368        let acls = self.acls.read();
1369
1370        // Only iterate entries for this principal + wildcard entries
1371        // Check deny rules first (deny takes precedence)
1372        for acl in acls.lookup(principal) {
1373            if !acl.allow
1374                && (acl.host == client_ip || acl.host == "*")
1375                && acl.resource.matches(resource)
1376                && (acl.permission == permission || acl.permission == Permission::All)
1377            {
1378                return false; // Explicit deny
1379            }
1380        }
1381
1382        // Check allow rules
1383        for acl in acls.lookup(principal) {
1384            if acl.allow
1385                && (acl.host == client_ip || acl.host == "*")
1386                && acl.resource.matches(resource)
1387                && (acl.permission == permission || acl.permission == Permission::All)
1388            {
1389                return true;
1390            }
1391        }
1392
1393        false
1394    }
1395
1396    /// Simple authorization check without session (for internal use)
1397    #[allow(unused_variables)]
1398    pub fn authorize_anonymous(
1399        &self,
1400        resource: &ResourceType,
1401        permission: Permission,
1402    ) -> AuthResult<()> {
1403        if !self.config.require_authentication {
1404            return Ok(());
1405        }
1406
1407        Err(AuthError::AuthenticationFailed)
1408    }
1409}
1410
1411// ============================================================================
1412// SASL/PLAIN Support (Kafka-compatible)
1413// ============================================================================
1414
1415/// SASL/PLAIN authentication handler
1416pub struct SaslPlainAuth {
1417    auth_manager: Arc<AuthManager>,
1418}
1419
1420impl SaslPlainAuth {
1421    pub fn new(auth_manager: Arc<AuthManager>) -> Self {
1422        Self { auth_manager }
1423    }
1424
1425    /// Parse and authenticate a SASL/PLAIN request
1426    /// Format: \[authzid\] NUL authcid NUL passwd
1427    pub fn authenticate(&self, sasl_bytes: &[u8], client_ip: &str) -> AuthResult<AuthSession> {
1428        // Parse SASL/PLAIN format: [authzid] \0 authcid \0 password
1429        let parts: Vec<&[u8]> = sasl_bytes.split(|&b| b == 0).collect();
1430
1431        if parts.len() < 2 {
1432            return Err(AuthError::InvalidCredentials);
1433        }
1434
1435        // Handle both 2-part (authcid, passwd) and 3-part (authzid, authcid, passwd)
1436        let (username, password) = if parts.len() == 2 {
1437            (
1438                std::str::from_utf8(parts[0]).map_err(|_| AuthError::InvalidCredentials)?,
1439                std::str::from_utf8(parts[1]).map_err(|_| AuthError::InvalidCredentials)?,
1440            )
1441        } else {
1442            // 3-part format - authzid is ignored, use authcid
1443            (
1444                std::str::from_utf8(parts[1]).map_err(|_| AuthError::InvalidCredentials)?,
1445                std::str::from_utf8(parts[2]).map_err(|_| AuthError::InvalidCredentials)?,
1446            )
1447        };
1448
1449        self.auth_manager
1450            .authenticate(username, password, client_ip)
1451    }
1452}
1453
1454// ============================================================================
1455// SCRAM-SHA-256 Support (RFC 5802 / RFC 7677)
1456// ============================================================================
1457
/// State carried between the steps of a SCRAM-SHA-256 exchange.
///
/// SCRAM is a challenge-response mechanism and is considerably stronger than
/// PLAIN:
/// 1. The password itself never travels over the wire (even encrypted)
/// 2. The server keeps derived keys rather than the password
/// 3. Authentication is mutual: the server also proves knowledge of the
///    password
/// 4. Nonces protect against replay attacks
#[derive(Clone, Debug)]
pub enum ScramState {
    /// No message received yet; the client-first-message is expected next.
    Initial,
    /// The server-first-message has been sent; the client-final-message is
    /// expected next.
    ServerFirstSent {
        /// User name taken from the client-first-message.
        username: String,
        /// Nonce supplied by the client.
        client_nonce: String,
        /// Nonce generated by the server for this exchange.
        server_nonce: String,
        /// Salt announced to the client.
        salt: Vec<u8>,
        /// Iteration count announced to the client.
        iterations: u32,
        /// Message text over which the client and server signatures are computed.
        auth_message: String,
    },
    /// The exchange has finished (success or failure pending verification).
    Complete,
}
1482
1483/// SCRAM-SHA-256 authentication handler
1484pub struct SaslScramAuth {
1485    auth_manager: Arc<AuthManager>,
1486}
1487
1488impl SaslScramAuth {
1489    pub fn new(auth_manager: Arc<AuthManager>) -> Self {
1490        Self { auth_manager }
1491    }
1492
1493    /// Process client-first-message and return server-first-message
1494    ///
1495    /// Client-first-message format: `n,,n=<username>,r=<client-nonce>`
1496    /// Server-first-message format: `r=<combined-nonce>,s=<salt>,i=<iterations>`
1497    pub fn process_client_first(
1498        &self,
1499        client_first: &[u8],
1500        client_ip: &str,
1501    ) -> AuthResult<(ScramState, Vec<u8>)> {
1502        let client_first_str =
1503            std::str::from_utf8(client_first).map_err(|_| AuthError::InvalidCredentials)?;
1504
1505        // Parse client-first-message
1506        // Format: gs2-header,client-first-message-bare
1507        // gs2-header: n,, (no channel binding)
1508        // client-first-message-bare: n=<user>,r=<nonce>
1509
1510        let parts: Vec<&str> = client_first_str.splitn(3, ',').collect();
1511        if parts.len() < 3 {
1512            return Err(AuthError::InvalidCredentials);
1513        }
1514
1515        // Skip gs2-header (parts[0] and parts[1])
1516        let client_first_bare = if parts[0] == "n" || parts[0] == "y" || parts[0] == "p" {
1517            // gs2-header present, skip first two parts
1518            &client_first_str[parts[0].len() + 1 + parts[1].len() + 1..]
1519        } else {
1520            // No gs2-header, message is just client-first-message-bare
1521            client_first_str
1522        };
1523
1524        // Parse client-first-message-bare
1525        let mut username = None;
1526        let mut client_nonce = None;
1527
1528        for attr in client_first_bare.split(',') {
1529            if let Some(value) = attr.strip_prefix("n=") {
1530                username = Some(Self::unescape_username(value));
1531            } else if let Some(value) = attr.strip_prefix("r=") {
1532                client_nonce = Some(value.to_string());
1533            }
1534        }
1535
1536        let username = username.ok_or(AuthError::InvalidCredentials)?;
1537        let client_nonce = client_nonce.ok_or(AuthError::InvalidCredentials)?;
1538
1539        // Look up principal to get salt and iterations
1540        let (salt, iterations) = match self.auth_manager.get_principal(&username) {
1541            Some(principal) => (
1542                principal.password_hash.salt.clone(),
1543                principal.password_hash.iterations,
1544            ),
1545            None => {
1546                // User not found - generate fake salt to prevent enumeration
1547                // Still continue with the protocol to not leak timing info
1548                warn!(
1549                    "SCRAM auth for unknown user '{}' from {}",
1550                    username, client_ip
1551                );
1552                let rng = SystemRandom::new();
1553                let mut fake_salt = vec![0u8; 32];
1554                rng.fill(&mut fake_salt).expect("Failed to generate salt");
1555                (fake_salt, 4096)
1556            }
1557        };
1558
1559        // Generate server nonce (random bytes, base64 encoded)
1560        let rng = SystemRandom::new();
1561        let mut server_nonce_bytes = vec![0u8; 24];
1562        rng.fill(&mut server_nonce_bytes)
1563            .expect("Failed to generate nonce");
1564        let server_nonce = base64_encode(&server_nonce_bytes);
1565        let combined_nonce = format!("{}{}", client_nonce, server_nonce);
1566
1567        // Build server-first-message
1568        let salt_b64 = base64_encode(&salt);
1569        let server_first = format!("r={},s={},i={}", combined_nonce, salt_b64, iterations);
1570
1571        // Store auth message for later verification
1572        let auth_message = format!(
1573            "{},{},c=biws,r={}",
1574            client_first_bare, server_first, combined_nonce
1575        );
1576
1577        let state = ScramState::ServerFirstSent {
1578            username,
1579            client_nonce,
1580            server_nonce,
1581            salt,
1582            iterations,
1583            auth_message,
1584        };
1585
1586        Ok((state, server_first.into_bytes()))
1587    }
1588
1589    /// Process client-final-message and return server-final-message
1590    ///
1591    /// Client-final-message format: `c=<channel-binding>,r=<nonce>,p=<proof>`
1592    /// Server-final-message format: `v=<verifier>` (on success) or `e=<error>`
1593    pub fn process_client_final(
1594        &self,
1595        state: &ScramState,
1596        client_final: &[u8],
1597        client_ip: &str,
1598    ) -> AuthResult<(AuthSession, Vec<u8>)> {
1599        let ScramState::ServerFirstSent {
1600            username,
1601            client_nonce,
1602            server_nonce,
1603            salt: _,       // Not needed for verification, stored in principal
1604            iterations: _, // Not needed for verification, stored in principal
1605            auth_message,
1606        } = state
1607        else {
1608            return Err(AuthError::Internal("Invalid SCRAM state".to_string()));
1609        };
1610
1611        let client_final_str =
1612            std::str::from_utf8(client_final).map_err(|_| AuthError::InvalidCredentials)?;
1613
1614        // Parse client-final-message
1615        let mut channel_binding = None;
1616        let mut nonce = None;
1617        let mut proof = None;
1618
1619        for attr in client_final_str.split(',') {
1620            if let Some(value) = attr.strip_prefix("c=") {
1621                channel_binding = Some(value.to_string());
1622            } else if let Some(value) = attr.strip_prefix("r=") {
1623                nonce = Some(value.to_string());
1624            } else if let Some(value) = attr.strip_prefix("p=") {
1625                proof = Some(value.to_string());
1626            }
1627        }
1628
1629        let _channel_binding = channel_binding.ok_or(AuthError::InvalidCredentials)?;
1630        let nonce = nonce.ok_or(AuthError::InvalidCredentials)?;
1631        let proof_b64 = proof.ok_or(AuthError::InvalidCredentials)?;
1632
1633        // Verify nonce
1634        let expected_nonce = format!("{}{}", client_nonce, server_nonce);
1635        if nonce != expected_nonce {
1636            warn!("SCRAM nonce mismatch for '{}' from {}", username, client_ip);
1637            return Err(AuthError::InvalidCredentials);
1638        }
1639
1640        // Get principal
1641        let principal = self
1642            .auth_manager
1643            .get_principal(username)
1644            .ok_or(AuthError::AuthenticationFailed)?;
1645
1646        // Verify client proof
1647        // ClientProof = ClientKey XOR ClientSignature
1648        // ClientSignature = HMAC(StoredKey, AuthMessage)
1649        // We need to verify: H(ClientKey) == StoredKey
1650
1651        let client_proof = base64_decode(&proof_b64).map_err(|_| AuthError::InvalidCredentials)?;
1652
1653        // Compute expected client signature
1654        let client_signature =
1655            PasswordHash::hmac_sha256(&principal.password_hash.stored_key, auth_message.as_bytes());
1656
1657        // Recover ClientKey = ClientProof XOR ClientSignature
1658        if client_proof.len() != client_signature.len() {
1659            return Err(AuthError::InvalidCredentials);
1660        }
1661
1662        let client_key: Vec<u8> = client_proof
1663            .iter()
1664            .zip(client_signature.iter())
1665            .map(|(p, s)| p ^ s)
1666            .collect();
1667
1668        // Verify: H(ClientKey) == StoredKey (constant-time comparison)
1669        let computed_stored_key = Sha256::digest(&client_key);
1670        if !PasswordHash::constant_time_compare(
1671            &computed_stored_key,
1672            &principal.password_hash.stored_key,
1673        ) {
1674            warn!(
1675                "SCRAM authentication failed for '{}' from {}",
1676                username, client_ip
1677            );
1678            return Err(AuthError::AuthenticationFailed);
1679        }
1680
1681        // Compute server signature for mutual authentication
1682        let server_signature =
1683            PasswordHash::hmac_sha256(&principal.password_hash.server_key, auth_message.as_bytes());
1684        let server_final = format!("v={}", base64_encode(&server_signature));
1685
1686        // Create session
1687        let session = self.auth_manager.create_session(&principal);
1688        debug!(
1689            "SCRAM authentication successful for '{}' from {}",
1690            username, client_ip
1691        );
1692
1693        Ok((session, server_final.into_bytes()))
1694    }
1695
1696    /// Unescape SCRAM username (=2C -> , and =3D -> =)
1697    fn unescape_username(s: &str) -> String {
1698        s.replace("=2C", ",").replace("=3D", "=")
1699    }
1700}
1701
1702/// Base64 encode (standard alphabet)
1703fn base64_encode(data: &[u8]) -> String {
1704    use base64::{engine::general_purpose::STANDARD, Engine as _};
1705    STANDARD.encode(data)
1706}
1707
1708/// Base64 decode (standard alphabet)
1709fn base64_decode(s: &str) -> Result<Vec<u8>, base64::DecodeError> {
1710    use base64::{engine::general_purpose::STANDARD, Engine as _};
1711    STANDARD.decode(s)
1712}
1713
1714// ============================================================================
1715// Tests
1716// ============================================================================
1717
1718#[cfg(test)]
1719mod tests {
1720    use super::*;
1721
#[test]
fn test_password_hash_verify() {
    let hash = PasswordHash::new("Test@Pass123");

    // The original password is accepted.
    assert!(hash.verify("Test@Pass123"));

    // A different password, the empty string, and the original with its last
    // character missing are all rejected.
    for wrong in ["Wrong@Pass1", "", "Test@Pass12"] {
        assert!(!hash.verify(wrong));
    }
}
1730
#[test]
fn test_password_hash_timing_attack_resistant() {
    // This does not measure time. It only checks that wrong passwords of very
    // different lengths are both rejected, as a design assertion.
    let hash = PasswordHash::new("Correct@Pass1");

    let similar_length_rejected = !hash.verify("Wrong@Pass1");
    let single_char_rejected = !hash.verify("x");

    assert!(similar_length_rejected);
    assert!(single_char_rejected);
}
1745
#[test]
fn test_create_principal() {
    let auth = AuthManager::new_default();
    let roles: HashSet<String> = HashSet::from(["producer".to_string()]);

    // The first registration succeeds.
    auth.create_principal(
        "alice",
        "Secure@Pass123",
        PrincipalType::User,
        roles.clone(),
    )
    .expect("Failed to create principal");

    // Registering the same name a second time is rejected.
    let duplicate =
        auth.create_principal("alice", "Other@Pass1", PrincipalType::User, roles.clone());
    assert!(duplicate.is_err());

    // The stored principal carries the name and role it was created with.
    let principal = auth.get_principal("alice").expect("Principal not found");
    assert_eq!(principal.name, "alice");
    assert!(principal.roles.contains("producer"));
}
1771
#[test]
fn test_authentication_success() {
    let auth = AuthManager::new_default();
    let roles: HashSet<String> = HashSet::from(["producer".to_string()]);

    auth.create_principal("bob", "Bob@Pass123", PrincipalType::User, roles)
        .unwrap();

    // Correct credentials yield a live session for that principal.
    let session = auth
        .authenticate("bob", "Bob@Pass123", "127.0.0.1")
        .expect("Authentication should succeed");

    assert_eq!(session.principal_name, "bob");
    assert!(!session.is_expired());
}
1789
1790    #[test]
1791    fn test_authentication_failure() {
1792        let auth = AuthManager::new_default();
1793
1794        let mut roles = HashSet::new();
1795        roles.insert("producer".to_string());
1796
1797        auth.create_principal("charlie", "Correct@Pass1", PrincipalType::User, roles)
1798            .unwrap();
1799
1800        // Wrong password
1801        let result = auth.authenticate("charlie", "Wrong@Pass1", "127.0.0.1");
1802        assert!(matches!(result, Err(AuthError::AuthenticationFailed)));
1803
1804        // Unknown user
1805        let result = auth.authenticate("unknown", "password", "127.0.0.1");
1806        assert!(matches!(result, Err(AuthError::AuthenticationFailed)));
1807    }
1808
1809    #[test]
1810    fn test_rate_limiting() {
1811        let config = AuthConfig {
1812            max_failed_attempts: 3,
1813            // Use a long lockout so it doesn't expire during slow debug-mode PBKDF2 ops
1814            lockout_duration: Duration::from_secs(120),
1815            ..Default::default()
1816        };
1817        let auth = AuthManager::new(config);
1818
1819        let mut roles = HashSet::new();
1820        roles.insert("consumer".to_string());
1821        auth.create_principal("eve", "Eve@Pass123", PrincipalType::User, roles)
1822            .unwrap();
1823
1824        // Fail 3 times
1825        for _ in 0..3 {
1826            let _ = auth.authenticate("eve", "wrong", "192.168.1.1");
1827        }
1828
1829        // Now should be rate limited
1830        let result = auth.authenticate("eve", "Eve@Pass123", "192.168.1.1");
1831        assert!(matches!(result, Err(AuthError::RateLimited)));
1832
1833        // Clear lockout manually (instead of sleeping, which is unreliable in debug builds
1834        // where PBKDF2 at 600k iterations can exceed short lockout durations)
1835        {
1836            let mut tracker = auth.failed_attempts.write();
1837            tracker.clear_failures("eve");
1838            tracker.clear_failures("192.168.1.1");
1839        }
1840
1841        // Should work now
1842        let result = auth.authenticate("eve", "Eve@Pass123", "192.168.1.1");
1843        assert!(result.is_ok());
1844    }
1845
1846    #[test]
1847    fn test_role_permissions() {
1848        let auth = AuthManager::with_auth_enabled();
1849
1850        let mut roles = HashSet::new();
1851        roles.insert("producer".to_string());
1852        auth.create_principal("producer_user", "Prod@Pass123", PrincipalType::User, roles)
1853            .unwrap();
1854
1855        let session = auth
1856            .authenticate("producer_user", "Prod@Pass123", "127.0.0.1")
1857            .unwrap();
1858
1859        // Producer should have write permission on topics
1860        assert!(session.has_permission(
1861            &ResourceType::Topic("orders".to_string()),
1862            &Permission::Write
1863        ));
1864
1865        // Producer should not have delete permission
1866        assert!(!session.has_permission(
1867            &ResourceType::Topic("orders".to_string()),
1868            &Permission::Delete
1869        ));
1870    }
1871
1872    #[test]
1873    fn test_admin_has_all_permissions() {
1874        let auth = AuthManager::with_auth_enabled();
1875
1876        let mut roles = HashSet::new();
1877        roles.insert("admin".to_string());
1878        auth.create_principal("admin_user", "Admin@Pass1", PrincipalType::User, roles)
1879            .unwrap();
1880
1881        let session = auth
1882            .authenticate("admin_user", "Admin@Pass1", "127.0.0.1")
1883            .unwrap();
1884
1885        // Admin should have all permissions
1886        assert!(session.has_permission(&ResourceType::Cluster, &Permission::All));
1887        assert!(session.has_permission(
1888            &ResourceType::Topic("any_topic".to_string()),
1889            &Permission::Delete
1890        ));
1891    }
1892
1893    #[test]
1894    fn test_resource_pattern_matching() {
1895        assert!(ResourceType::TopicPattern("*".to_string())
1896            .matches(&ResourceType::Topic("anything".to_string())));
1897
1898        assert!(ResourceType::TopicPattern("orders-*".to_string())
1899            .matches(&ResourceType::Topic("orders-us".to_string())));
1900
1901        assert!(ResourceType::TopicPattern("orders-*".to_string())
1902            .matches(&ResourceType::Topic("orders-eu".to_string())));
1903
1904        assert!(!ResourceType::TopicPattern("orders-*".to_string())
1905            .matches(&ResourceType::Topic("events-us".to_string())));
1906    }
1907
1908    #[test]
1909    fn test_acl_enforcement() {
1910        let auth = AuthManager::new(AuthConfig {
1911            require_authentication: true,
1912            enable_acls: true,
1913            default_deny: true,
1914            ..Default::default()
1915        });
1916
1917        let mut roles = HashSet::new();
1918        roles.insert("read-only".to_string());
1919        auth.create_principal("reader", "Read@Pass123", PrincipalType::User, roles)
1920            .unwrap();
1921
1922        // Add ACL allowing write to specific topic
1923        auth.add_acl(AclEntry {
1924            principal: "reader".to_string(),
1925            resource: ResourceType::Topic("special-topic".to_string()),
1926            permission: Permission::Write,
1927            allow: true,
1928            host: "*".to_string(),
1929        });
1930
1931        let session = auth
1932            .authenticate("reader", "Read@Pass123", "127.0.0.1")
1933            .unwrap();
1934
1935        // Should be able to write to special-topic via ACL
1936        let result = auth.authorize(
1937            &session,
1938            &ResourceType::Topic("special-topic".to_string()),
1939            Permission::Write,
1940            "127.0.0.1",
1941        );
1942        assert!(result.is_ok());
1943
1944        // Should NOT be able to write to other topics
1945        let result = auth.authorize(
1946            &session,
1947            &ResourceType::Topic("other-topic".to_string()),
1948            Permission::Write,
1949            "127.0.0.1",
1950        );
1951        assert!(result.is_err());
1952    }
1953
1954    #[test]
1955    fn test_sasl_plain_authentication() {
1956        let auth = Arc::new(AuthManager::new_default());
1957
1958        let mut roles = HashSet::new();
1959        roles.insert("producer".to_string());
1960        auth.create_principal("sasl_user", "Sasl@Pass123", PrincipalType::User, roles)
1961            .unwrap();
1962
1963        let sasl = SaslPlainAuth::new(auth);
1964
1965        // Test 2-part format: username\0password
1966        let two_part = b"sasl_user\0Sasl@Pass123";
1967        let result = sasl.authenticate(two_part, "127.0.0.1");
1968        assert!(result.is_ok());
1969
1970        // Test 3-part format: authzid\0username\0password
1971        let three_part = b"\0sasl_user\0Sasl@Pass123";
1972        let result = sasl.authenticate(three_part, "127.0.0.1");
1973        assert!(result.is_ok());
1974    }
1975
1976    #[test]
1977    fn test_session_expiration() {
1978        let config = AuthConfig {
1979            session_timeout: Duration::from_millis(100),
1980            ..Default::default()
1981        };
1982        let auth = AuthManager::new(config);
1983
1984        let mut roles = HashSet::new();
1985        roles.insert("producer".to_string());
1986        auth.create_principal("expiring", "Expiry@Pass1", PrincipalType::User, roles)
1987            .unwrap();
1988
1989        let session = auth
1990            .authenticate("expiring", "Expiry@Pass1", "127.0.0.1")
1991            .unwrap();
1992        assert!(!session.is_expired());
1993
1994        // Wait for session to expire
1995        std::thread::sleep(Duration::from_millis(150));
1996
1997        // Session should be expired
1998        let session = AuthSession {
1999            expires_at: session.expires_at,
2000            ..session
2001        };
2002        assert!(session.is_expired());
2003    }
2004
2005    #[test]
2006    fn test_delete_principal_invalidates_sessions() {
2007        let auth = AuthManager::new_default();
2008
2009        let mut roles = HashSet::new();
2010        roles.insert("producer".to_string());
2011        auth.create_principal("deleteme", "Delete@Pass1", PrincipalType::User, roles)
2012            .unwrap();
2013
2014        let session = auth
2015            .authenticate("deleteme", "Delete@Pass1", "127.0.0.1")
2016            .unwrap();
2017
2018        // Session should exist
2019        assert!(auth.get_session(&session.id).is_some());
2020
2021        // Delete principal
2022        auth.delete_principal("deleteme").unwrap();
2023
2024        // Session should be gone
2025        assert!(auth.get_session(&session.id).is_none());
2026    }
2027
2028    #[test]
2029    fn test_disabled_principal_cannot_authenticate() {
2030        let auth = AuthManager::new_default();
2031
2032        let mut roles = HashSet::new();
2033        roles.insert("producer".to_string());
2034        auth.create_principal("disabled_user", "Disable@Pass1", PrincipalType::User, roles)
2035            .unwrap();
2036
2037        // Disable the principal
2038        {
2039            let mut principals = auth.principals.write();
2040            if let Some(p) = principals.get_mut("disabled_user") {
2041                p.enabled = false;
2042            }
2043        }
2044
2045        // Should fail to authenticate
2046        let result = auth.authenticate("disabled_user", "Disable@Pass1", "127.0.0.1");
2047        assert!(matches!(result, Err(AuthError::AuthenticationFailed)));
2048    }
2049
2050    #[test]
2051    fn test_password_hash_debug_redacts_sensitive_data() {
2052        let hash = PasswordHash::new("super_secret_password");
2053        let debug_output = format!("{:?}", hash);
2054
2055        // Should contain REDACTED markers
2056        assert!(
2057            debug_output.contains("[REDACTED]"),
2058            "Debug output should contain [REDACTED]"
2059        );
2060
2061        // Should NOT contain actual salt or key material
2062        // Salt and keys are binary data, but let's ensure no suspicious patterns
2063        assert!(
2064            !debug_output.contains("super_secret_password"),
2065            "Debug output should not contain password"
2066        );
2067
2068        // Should show iterations (not sensitive)
2069        assert!(
2070            debug_output.contains("iterations"),
2071            "Debug output should show iterations field"
2072        );
2073    }
2074
2075    #[test]
2076    fn test_principal_debug_redacts_password_hash() {
2077        let principal = Principal {
2078            name: "test_user".to_string(),
2079            principal_type: PrincipalType::User,
2080            password_hash: PasswordHash::new("secret_password"),
2081            roles: HashSet::from(["admin".to_string()]),
2082            enabled: true,
2083            metadata: HashMap::new(),
2084            created_at: 1234567890,
2085        };
2086
2087        let debug_output = format!("{:?}", principal);
2088
2089        // Should contain REDACTED for password_hash
2090        assert!(
2091            debug_output.contains("[REDACTED]"),
2092            "Debug output should contain [REDACTED]: {}",
2093            debug_output
2094        );
2095
2096        // Should still show non-sensitive fields
2097        assert!(
2098            debug_output.contains("test_user"),
2099            "Debug output should show name"
2100        );
2101        assert!(
2102            debug_output.contains("admin"),
2103            "Debug output should show roles"
2104        );
2105    }
2106
2107    // ========================================================================
2108    // SCRAM-SHA-256 Tests
2109    // ========================================================================
2110
2111    #[test]
2112    fn test_scram_full_handshake() {
2113        use sha2::{Digest, Sha256};
2114
2115        let auth = Arc::new(AuthManager::new_default());
2116
2117        // Create a user
2118        let mut roles = HashSet::new();
2119        roles.insert("producer".to_string());
2120        auth.create_principal("scram_user", "Scram@Pass123", PrincipalType::User, roles)
2121            .expect("Failed to create principal");
2122
2123        let scram = SaslScramAuth::new(auth.clone());
2124
2125        // Step 1: Client sends client-first-message
2126        let client_nonce = "rOprNGfwEbeRWgbNEkqO";
2127        let client_first = format!("n,,n=scram_user,r={}", client_nonce);
2128
2129        let (state, server_first) = scram
2130            .process_client_first(client_first.as_bytes(), "127.0.0.1")
2131            .expect("client-first processing should succeed");
2132
2133        // Verify server-first-message format
2134        let server_first_str = std::str::from_utf8(&server_first).expect("valid UTF-8");
2135        assert!(server_first_str.starts_with(&format!("r={}", client_nonce)));
2136        assert!(server_first_str.contains(",s="));
2137        assert!(server_first_str.contains(",i="));
2138
2139        // Parse server-first-message to build client-final
2140        let ScramState::ServerFirstSent {
2141            username: _,
2142            client_nonce: _,
2143            server_nonce: _,
2144            salt,
2145            iterations,
2146            auth_message: _,
2147        } = &state
2148        else {
2149            panic!("Expected ServerFirstSent state");
2150        };
2151
2152        // Step 2: Client computes proof and sends client-final-message
2153        // ClientProof = ClientKey XOR ClientSignature
2154        let salted_password = compute_salted_password("Scram@Pass123", salt, *iterations);
2155        let client_key = PasswordHash::hmac_sha256(&salted_password, b"Client Key");
2156        let stored_key = Sha256::digest(&client_key);
2157
2158        // Build auth message
2159        let client_first_bare = format!("n=scram_user,r={}", client_nonce);
2160        let combined_nonce: String = server_first_str
2161            .split(',')
2162            .find(|s| s.starts_with("r="))
2163            .map(|s| &s[2..])
2164            .unwrap()
2165            .to_string();
2166
2167        let auth_message = format!(
2168            "{},{},c=biws,r={}",
2169            client_first_bare, server_first_str, combined_nonce
2170        );
2171
2172        let client_signature = PasswordHash::hmac_sha256(&stored_key, auth_message.as_bytes());
2173        let client_proof: Vec<u8> = client_key
2174            .iter()
2175            .zip(client_signature.iter())
2176            .map(|(k, s)| k ^ s)
2177            .collect();
2178
2179        let client_final = format!(
2180            "c=biws,r={},p={}",
2181            combined_nonce,
2182            base64_encode(&client_proof)
2183        );
2184
2185        // Step 3: Server verifies and responds
2186        let (session, server_final) = scram
2187            .process_client_final(&state, client_final.as_bytes(), "127.0.0.1")
2188            .expect("client-final processing should succeed");
2189
2190        // Verify session was created
2191        assert_eq!(session.principal_name, "scram_user");
2192        assert!(!session.is_expired());
2193
2194        // Verify server-final-message (mutual authentication)
2195        let server_final_str = std::str::from_utf8(&server_final).expect("valid UTF-8");
2196        assert!(server_final_str.starts_with("v="));
2197    }
2198
2199    #[test]
2200    fn test_scram_wrong_password() {
2201        let auth = Arc::new(AuthManager::new_default());
2202
2203        let mut roles = HashSet::new();
2204        roles.insert("producer".to_string());
2205        auth.create_principal("scram_user2", "Correct@Pass1", PrincipalType::User, roles)
2206            .expect("Failed to create principal");
2207
2208        let scram = SaslScramAuth::new(auth.clone());
2209
2210        // Client-first with correct username
2211        let client_nonce = "test_nonce_12345";
2212        let client_first = format!("n,,n=scram_user2,r={}", client_nonce);
2213
2214        let (state, server_first) = scram
2215            .process_client_first(client_first.as_bytes(), "127.0.0.1")
2216            .expect("client-first processing should succeed");
2217
2218        // Parse server response
2219        let server_first_str = std::str::from_utf8(&server_first).expect("valid UTF-8");
2220        let combined_nonce: String = server_first_str
2221            .split(',')
2222            .find(|s| s.starts_with("r="))
2223            .map(|s| &s[2..])
2224            .unwrap()
2225            .to_string();
2226
2227        // Compute proof with WRONG password
2228        let ScramState::ServerFirstSent {
2229            salt, iterations, ..
2230        } = &state
2231        else {
2232            panic!("Expected ServerFirstSent state");
2233        };
2234
2235        let salted_password = compute_salted_password("Wrong@Pass1", salt, *iterations);
2236        let client_key = PasswordHash::hmac_sha256(&salted_password, b"Client Key");
2237        let stored_key = sha2::Sha256::digest(&client_key);
2238
2239        let client_first_bare = format!("n=scram_user2,r={}", client_nonce);
2240        let auth_message = format!(
2241            "{},{},c=biws,r={}",
2242            client_first_bare, server_first_str, combined_nonce
2243        );
2244
2245        let client_signature = PasswordHash::hmac_sha256(&stored_key, auth_message.as_bytes());
2246        let client_proof: Vec<u8> = client_key
2247            .iter()
2248            .zip(client_signature.iter())
2249            .map(|(k, s)| k ^ s)
2250            .collect();
2251
2252        let client_final = format!(
2253            "c=biws,r={},p={}",
2254            combined_nonce,
2255            base64_encode(&client_proof)
2256        );
2257
2258        // Should fail
2259        let result = scram.process_client_final(&state, client_final.as_bytes(), "127.0.0.1");
2260        assert!(result.is_err());
2261        assert!(matches!(result, Err(AuthError::AuthenticationFailed)));
2262    }
2263
2264    #[test]
2265    fn test_scram_nonexistent_user() {
2266        let auth = Arc::new(AuthManager::new_default());
2267        let scram = SaslScramAuth::new(auth.clone());
2268
2269        // Client-first for nonexistent user
2270        let client_first = "n,,n=nonexistent_user,r=test_nonce";
2271
2272        // Should still return a server-first (to prevent enumeration)
2273        let result = scram.process_client_first(client_first.as_bytes(), "127.0.0.1");
2274        assert!(
2275            result.is_ok(),
2276            "Should return fake server-first to prevent enumeration"
2277        );
2278
2279        let (state, server_first) = result.unwrap();
2280        let server_first_str = std::str::from_utf8(&server_first).expect("valid UTF-8");
2281
2282        // Should have valid format (fake salt/iterations)
2283        assert!(server_first_str.contains("r=test_nonce"));
2284        assert!(server_first_str.contains(",s="));
2285        assert!(server_first_str.contains(",i="));
2286
2287        // Final step should fail
2288        let combined_nonce: String = server_first_str
2289            .split(',')
2290            .find(|s| s.starts_with("r="))
2291            .map(|s| &s[2..])
2292            .unwrap()
2293            .to_string();
2294
2295        let client_final = format!("c=biws,r={},p=dW5rbm93bg==", combined_nonce);
2296        let result = scram.process_client_final(&state, client_final.as_bytes(), "127.0.0.1");
2297        assert!(result.is_err());
2298    }
2299
2300    #[test]
2301    fn test_scram_nonce_mismatch() {
2302        let auth = Arc::new(AuthManager::new_default());
2303
2304        let mut roles = HashSet::new();
2305        roles.insert("producer".to_string());
2306        auth.create_principal("scram_user3", "Scram3@Pass1", PrincipalType::User, roles)
2307            .expect("Failed to create principal");
2308
2309        let scram = SaslScramAuth::new(auth.clone());
2310
2311        let client_first = "n,,n=scram_user3,r=original_nonce";
2312        let (state, _server_first) = scram
2313            .process_client_first(client_first.as_bytes(), "127.0.0.1")
2314            .expect("client-first should succeed");
2315
2316        // Client-final with different nonce prefix (attack attempt)
2317        let client_final = "c=biws,r=tampered_nonce_plus_server,p=dW5rbm93bg==";
2318        let result = scram.process_client_final(&state, client_final.as_bytes(), "127.0.0.1");
2319        assert!(result.is_err());
2320        assert!(matches!(result, Err(AuthError::InvalidCredentials)));
2321    }
2322
2323    /// Helper: Compute salted password (PBKDF2)
2324    fn compute_salted_password(password: &str, salt: &[u8], iterations: u32) -> Vec<u8> {
2325        use hmac::{Hmac, Mac};
2326        type HmacSha256 = Hmac<sha2::Sha256>;
2327
2328        let mut result = vec![0u8; 32];
2329
2330        let mut mac =
2331            HmacSha256::new_from_slice(password.as_bytes()).expect("HMAC accepts any key length");
2332        mac.update(salt);
2333        mac.update(&1u32.to_be_bytes());
2334        let mut u = mac.finalize().into_bytes();
2335        result.copy_from_slice(&u);
2336
2337        for _ in 1..iterations {
2338            let mut mac = HmacSha256::new_from_slice(password.as_bytes())
2339                .expect("HMAC accepts any key length");
2340            mac.update(&u);
2341            u = mac.finalize().into_bytes();
2342
2343            for (r, ui) in result.iter_mut().zip(u.iter()) {
2344                *r ^= ui;
2345            }
2346        }
2347
2348        result
2349    }
2350}