1use parking_lot::RwLock;
27use ring::rand::{SecureRandom, SystemRandom};
28use serde::{Deserialize, Serialize};
29use sha2::{Digest, Sha256};
30use std::collections::{HashMap, HashSet};
31use std::sync::Arc;
32use std::time::{Duration, Instant};
33use thiserror::Error;
34use tracing::{debug, warn};
35
/// Errors reported by the authentication and authorization layer.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Error, Debug, Clone)]
pub enum AuthError {
    /// Credentials were rejected (also used for unknown or disabled principals).
    #[error("Authentication failed")]
    AuthenticationFailed,

    /// The credential payload could not be parsed.
    #[error("Invalid credentials")]
    InvalidCredentials,

    /// No principal is registered under the given name.
    #[error("Principal not found: {0}")]
    PrincipalNotFound(String),

    /// A principal with the given name is already registered.
    #[error("Principal already exists: {0}")]
    PrincipalAlreadyExists(String),

    /// Access was refused; the payload carries a free-form reason.
    #[error("Access denied: {0}")]
    AccessDenied(String),

    /// The principal lacks a specific permission on a specific resource.
    #[error("Permission denied: {principal} lacks {permission} on {resource}")]
    PermissionDenied {
        /// Name of the principal that was refused.
        principal: String,
        /// Rendering of the permission that was requested.
        permission: String,
        /// Rendering of the resource that was targeted.
        resource: String,
    },

    /// No role is registered under the given name.
    #[error("Role not found: {0}")]
    RoleNotFound(String),

    /// The presented token is not valid.
    #[error("Invalid token")]
    InvalidToken,

    /// The presented token or session has expired.
    #[error("Token expired")]
    TokenExpired,

    /// Too many failed attempts; the caller is temporarily locked out.
    #[error("Rate limited: too many authentication failures")]
    RateLimited,

    /// Catch-all for validation and internal failures, with a message.
    #[error("Internal error: {0}")]
    Internal(String),
}
79
80pub type AuthResult<T> = std::result::Result<T, AuthError>;
81
82#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
88pub enum ResourceType {
89 Cluster,
91 Topic(String),
93 TopicPattern(String),
95 ConsumerGroup(String),
97 Schema(String),
99 TransactionalId(String),
101}
102
103impl ResourceType {
104 pub fn matches(&self, other: &ResourceType) -> bool {
106 match (self, other) {
107 (a, b) if a == b => true,
109
110 (ResourceType::TopicPattern(pattern), ResourceType::Topic(name)) => {
112 Self::glob_match(pattern, name)
113 }
114 (ResourceType::Topic(name), ResourceType::TopicPattern(pattern)) => {
115 Self::glob_match(pattern, name)
116 }
117
118 _ => false,
119 }
120 }
121
122 fn glob_match(pattern: &str, text: &str) -> bool {
124 if pattern == "*" {
125 return true;
126 }
127
128 if let Some(prefix) = pattern.strip_suffix('*') {
129 return text.starts_with(prefix);
130 }
131
132 if let Some(suffix) = pattern.strip_prefix('*') {
133 return text.ends_with(suffix);
134 }
135
136 if let Some(idx) = pattern.find('*') {
138 let prefix = &pattern[..idx];
139 let suffix = &pattern[idx + 1..];
140 return text.starts_with(prefix)
141 && text.ends_with(suffix)
142 && text.len() >= prefix.len() + suffix.len();
143 }
144
145 pattern == text
146 }
147}
148
149#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
155pub enum Permission {
156 Read, Write, Create, Delete, Alter, Describe, GroupRead, GroupDelete, ClusterAction, IdempotentWrite, AlterConfigs, DescribeConfigs, All, }
179
180impl Permission {
181 pub fn implies(&self, other: &Permission) -> bool {
184 if self == other {
186 return true;
187 }
188
189 match self {
190 Permission::All => true, Permission::Alter | Permission::Write | Permission::Read => {
193 matches!(other, Permission::Describe)
194 }
195 _ => false,
196 }
197 }
198}
199
200#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
206pub enum PrincipalType {
207 User,
208 ServiceAccount,
209 Anonymous,
210}
211
212#[derive(Clone, Serialize, Deserialize)]
219pub struct Principal {
220 pub name: String,
222
223 pub principal_type: PrincipalType,
225
226 pub password_hash: PasswordHash,
228
229 pub roles: HashSet<String>,
231
232 pub enabled: bool,
234
235 pub metadata: HashMap<String, String>,
237
238 pub created_at: u64,
240}
241
242impl std::fmt::Debug for Principal {
243 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244 f.debug_struct("Principal")
245 .field("name", &self.name)
246 .field("principal_type", &self.principal_type)
247 .field("password_hash", &"[REDACTED]")
248 .field("roles", &self.roles)
249 .field("enabled", &self.enabled)
250 .field("metadata", &self.metadata)
251 .field("created_at", &self.created_at)
252 .finish()
253 }
254}
255
256#[derive(Clone, Serialize, Deserialize)]
263pub struct PasswordHash {
264 pub salt: Vec<u8>,
266 pub iterations: u32,
268 pub server_key: Vec<u8>,
270 pub stored_key: Vec<u8>,
272}
273
274impl std::fmt::Debug for PasswordHash {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 f.debug_struct("PasswordHash")
277 .field("salt", &"[REDACTED]")
278 .field("iterations", &self.iterations)
279 .field("server_key", &"[REDACTED]")
280 .field("stored_key", &"[REDACTED]")
281 .finish()
282 }
283}
284
285impl PasswordHash {
286 pub fn new(password: &str) -> Self {
288 let rng = SystemRandom::new();
289 let mut salt = vec![0u8; 32];
290 rng.fill(&mut salt).expect("Failed to generate salt");
291
292 Self::with_salt(password, &salt, 600_000)
293 }
294
295 pub fn with_salt(password: &str, salt: &[u8], iterations: u32) -> Self {
297 let salted_password = Self::pbkdf2_sha256(password.as_bytes(), salt, iterations);
299
300 let client_key = Self::hmac_sha256(&salted_password, b"Client Key");
302 let server_key = Self::hmac_sha256(&salted_password, b"Server Key");
303
304 let stored_key = Sha256::digest(&client_key).to_vec();
306
307 PasswordHash {
308 salt: salt.to_vec(),
309 iterations,
310 server_key,
311 stored_key,
312 }
313 }
314
315 pub fn verify(&self, password: &str) -> bool {
317 let salted_password = Self::pbkdf2_sha256(password.as_bytes(), &self.salt, self.iterations);
318 let client_key = Self::hmac_sha256(&salted_password, b"Client Key");
319 let stored_key = Sha256::digest(&client_key);
320
321 Self::constant_time_compare(&stored_key, &self.stored_key)
323 }
324
325 pub fn constant_time_compare(a: &[u8], b: &[u8]) -> bool {
327 if a.len() != b.len() {
328 return false;
329 }
330
331 let mut result = 0u8;
333 for (x, y) in a.iter().zip(b.iter()) {
334 result |= x ^ y;
335 }
336 result == 0
337 }
338
339 fn pbkdf2_sha256(password: &[u8], salt: &[u8], iterations: u32) -> Vec<u8> {
341 use hmac::{Hmac, Mac};
342 type HmacSha256 = Hmac<Sha256>;
343
344 let mut result = vec![0u8; 32];
345
346 let mut mac = HmacSha256::new_from_slice(password).expect("HMAC accepts any key length");
348 mac.update(salt);
349 mac.update(&1u32.to_be_bytes());
350 let mut u = mac.finalize().into_bytes();
351 result.copy_from_slice(&u);
352
353 for _ in 1..iterations {
355 let mut mac =
356 HmacSha256::new_from_slice(password).expect("HMAC accepts any key length");
357 mac.update(&u);
358 u = mac.finalize().into_bytes();
359
360 for (r, ui) in result.iter_mut().zip(u.iter()) {
361 *r ^= ui;
362 }
363 }
364
365 result
366 }
367
368 pub fn hmac_sha256(key: &[u8], data: &[u8]) -> Vec<u8> {
370 use hmac::{Hmac, Mac};
371 type HmacSha256 = Hmac<Sha256>;
372
373 let mut mac = HmacSha256::new_from_slice(key).expect("HMAC accepts any key length");
374 mac.update(data);
375 mac.finalize().into_bytes().to_vec()
376 }
377}
378
379#[derive(Debug, Clone, Serialize, Deserialize)]
385pub struct Role {
386 pub name: String,
388
389 pub description: String,
391
392 pub permissions: HashSet<(ResourceType, Permission)>,
394
395 pub builtin: bool,
397}
398
399impl Role {
400 pub fn admin() -> Self {
402 let mut permissions = HashSet::new();
403 permissions.insert((ResourceType::Cluster, Permission::All));
404
405 Role {
406 name: "admin".to_string(),
407 description: "Full administrative access to all resources".to_string(),
408 permissions,
409 builtin: true,
410 }
411 }
412
413 pub fn producer() -> Self {
415 let mut permissions = HashSet::new();
416 permissions.insert((
417 ResourceType::TopicPattern("*".to_string()),
418 Permission::Write,
419 ));
420 permissions.insert((
421 ResourceType::TopicPattern("*".to_string()),
422 Permission::Describe,
423 ));
424 permissions.insert((ResourceType::Cluster, Permission::IdempotentWrite));
425
426 Role {
427 name: "producer".to_string(),
428 description: "Can produce to all topics".to_string(),
429 permissions,
430 builtin: true,
431 }
432 }
433
434 pub fn consumer() -> Self {
436 let mut permissions = HashSet::new();
437 permissions.insert((
438 ResourceType::TopicPattern("*".to_string()),
439 Permission::Read,
440 ));
441 permissions.insert((
442 ResourceType::TopicPattern("*".to_string()),
443 Permission::Describe,
444 ));
445 permissions.insert((
446 ResourceType::ConsumerGroup("*".to_string()),
447 Permission::GroupRead,
448 ));
449
450 Role {
451 name: "consumer".to_string(),
452 description: "Can consume from all topics".to_string(),
453 permissions,
454 builtin: true,
455 }
456 }
457
458 pub fn read_only() -> Self {
460 let mut permissions = HashSet::new();
461 permissions.insert((
462 ResourceType::TopicPattern("*".to_string()),
463 Permission::Read,
464 ));
465 permissions.insert((
466 ResourceType::TopicPattern("*".to_string()),
467 Permission::Describe,
468 ));
469
470 Role {
471 name: "read-only".to_string(),
472 description: "Read-only access to all topics".to_string(),
473 permissions,
474 builtin: true,
475 }
476 }
477}
478
479#[derive(Debug, Clone, Serialize, Deserialize)]
485pub struct AclEntry {
486 pub principal: String,
488
489 pub resource: ResourceType,
491
492 pub permission: Permission,
494
495 pub allow: bool,
497
498 pub host: String,
500}
501
502#[derive(Debug, Default)]
509struct AclIndex {
510 by_principal: HashMap<String, Vec<AclEntry>>,
512 wildcard: Vec<AclEntry>,
514}
515
516impl AclIndex {
517 fn new() -> Self {
518 Self::default()
519 }
520
521 fn push(&mut self, entry: AclEntry) {
522 if entry.principal == "*" {
523 self.wildcard.push(entry);
524 } else {
525 self.by_principal
526 .entry(entry.principal.clone())
527 .or_default()
528 .push(entry);
529 }
530 }
531
532 fn retain<F: Fn(&AclEntry) -> bool>(&mut self, predicate: F) {
533 self.wildcard.retain(&predicate);
534 self.by_principal.retain(|_, entries| {
535 entries.retain(&predicate);
536 !entries.is_empty()
537 });
538 }
539
540 fn lookup(&self, principal: &str) -> impl Iterator<Item = &AclEntry> {
542 let specific = self
543 .by_principal
544 .get(principal)
545 .map(|v| v.as_slice())
546 .unwrap_or(&[]);
547 specific.iter().chain(self.wildcard.iter())
548 }
549
550 fn to_vec(&self) -> Vec<AclEntry> {
551 self.wildcard
552 .iter()
553 .chain(self.by_principal.values().flatten())
554 .cloned()
555 .collect()
556 }
557}
558
559#[derive(Debug, Clone)]
565pub struct AuthSession {
566 pub id: String,
568
569 pub principal_name: String,
571
572 pub principal_type: PrincipalType,
574
575 pub permissions: HashSet<(ResourceType, Permission)>,
577
578 pub created_at: Instant,
580
581 pub expires_at: Instant,
583
584 pub client_ip: String,
586}
587
588impl AuthSession {
589 pub fn is_expired(&self) -> bool {
591 Instant::now() >= self.expires_at
592 }
593
594 pub fn has_permission(&self, resource: &ResourceType, permission: &Permission) -> bool {
596 if self
598 .permissions
599 .contains(&(ResourceType::Cluster, Permission::All))
600 {
601 return true;
602 }
603
604 if self.permissions.contains(&(resource.clone(), *permission)) {
606 return true;
607 }
608
609 for (res, perm) in &self.permissions {
611 let resource_matches = res.matches(resource);
614 let permission_implies = perm.implies(permission);
615 if resource_matches && permission_implies {
616 return true;
617 }
618 }
619
620 false
621 }
622}
623
/// Tunables for authentication and authorization.
#[derive(Debug, Clone)]
pub struct AuthConfig {
    /// Lifetime of a newly created session.
    pub session_timeout: Duration,

    /// Number of failed attempts that triggers a lockout.
    pub max_failed_attempts: u32,

    /// How long a lockout lasts; also the window in which failures are counted.
    pub lockout_duration: Duration,

    /// Whether callers must authenticate.
    pub require_authentication: bool,

    /// Whether ACL entries are consulted during authorization.
    pub enable_acls: bool,

    /// Whether a request with no matching grant is refused.
    pub default_deny: bool,
}
649
650impl Default for AuthConfig {
651 fn default() -> Self {
652 AuthConfig {
653 session_timeout: Duration::from_secs(3600), max_failed_attempts: 5,
655 lockout_duration: Duration::from_secs(300), require_authentication: false, enable_acls: false,
658 default_deny: true,
659 }
660 }
661}
662
/// Book-keeping for failed authentication attempts and the resulting lockouts.
struct FailedAttemptTracker {
    /// Timestamps of recent failures, per identifier.
    attempts: HashMap<String, Vec<Instant>>,
    /// Moment each currently tracked lockout started, per identifier.
    lockouts: HashMap<String, Instant>,
}
668
669impl FailedAttemptTracker {
670 fn new() -> Self {
671 Self {
672 attempts: HashMap::new(),
673 lockouts: HashMap::new(),
674 }
675 }
676
677 fn is_locked_out(&self, identifier: &str, lockout_duration: Duration) -> bool {
679 if let Some(lockout_time) = self.lockouts.get(identifier) {
680 if lockout_time.elapsed() < lockout_duration {
681 return true;
682 }
683 }
684 false
685 }
686
687 fn record_failure(
689 &mut self,
690 identifier: &str,
691 max_attempts: u32,
692 lockout_duration: Duration,
693 ) -> bool {
694 let now = Instant::now();
695
696 self.lockouts.retain(|_, t| t.elapsed() < lockout_duration);
698
699 let attempts = self.attempts.entry(identifier.to_string()).or_default();
701
702 attempts.retain(|t| t.elapsed() < lockout_duration);
704
705 attempts.push(now);
707
708 let exceeded = attempts.len() >= max_attempts as usize;
710 if exceeded {
711 warn!(
712 "Principal '{}' locked out after {} failed attempts",
713 identifier, max_attempts
714 );
715 self.lockouts.insert(identifier.to_string(), now);
716 }
717
718 if self.attempts.len() > 10_000 {
720 self.attempts.retain(|_, v| !v.is_empty());
721 }
722
723 exceeded
724 }
725
726 fn clear_failures(&mut self, identifier: &str) {
728 self.attempts.remove(identifier);
729 self.lockouts.remove(identifier);
730 }
731}
732
733fn validate_password_strength(password: &str) -> AuthResult<()> {
735 if password.len() < 8 {
736 return Err(AuthError::Internal(
737 "Password must be at least 8 characters".to_string(),
738 ));
739 }
740
741 let has_uppercase = password.chars().any(|c| c.is_ascii_uppercase());
742 let has_lowercase = password.chars().any(|c| c.is_ascii_lowercase());
743 let has_digit = password.chars().any(|c| c.is_ascii_digit());
744 let has_special = password.chars().any(|c| !c.is_alphanumeric());
745
746 if !has_uppercase {
747 return Err(AuthError::Internal(
748 "Password must contain at least one uppercase letter".to_string(),
749 ));
750 }
751 if !has_lowercase {
752 return Err(AuthError::Internal(
753 "Password must contain at least one lowercase letter".to_string(),
754 ));
755 }
756 if !has_digit {
757 return Err(AuthError::Internal(
758 "Password must contain at least one digit".to_string(),
759 ));
760 }
761 if !has_special {
762 return Err(AuthError::Internal(
763 "Password must contain at least one special character".to_string(),
764 ));
765 }
766
767 Ok(())
768}
769
770pub struct AuthManager {
772 config: AuthConfig,
773
774 principals: RwLock<HashMap<String, Principal>>,
776
777 roles: RwLock<HashMap<String, Role>>,
779
780 acls: RwLock<AclIndex>,
782
783 sessions: RwLock<HashMap<String, AuthSession>>,
785
786 failed_attempts: RwLock<FailedAttemptTracker>,
788
789 rng: SystemRandom,
791}
792
793impl AuthManager {
794 pub fn new(config: AuthConfig) -> Self {
796 let manager = Self {
797 config,
798 principals: RwLock::new(HashMap::new()),
799 roles: RwLock::new(HashMap::new()),
800 acls: RwLock::new(AclIndex::new()),
801 sessions: RwLock::new(HashMap::new()),
802 failed_attempts: RwLock::new(FailedAttemptTracker::new()),
803 rng: SystemRandom::new(),
804 };
805
806 manager.init_builtin_roles();
808
809 manager
810 }
811
812 pub fn new_default() -> Self {
814 Self::new(AuthConfig::default())
815 }
816
817 pub fn with_auth_enabled() -> Self {
819 Self::new(AuthConfig {
820 require_authentication: true,
821 enable_acls: true,
822 ..Default::default()
823 })
824 }
825
826 fn init_builtin_roles(&self) {
828 let mut roles = self.roles.write();
829 roles.insert("admin".to_string(), Role::admin());
830 roles.insert("producer".to_string(), Role::producer());
831 roles.insert("consumer".to_string(), Role::consumer());
832 roles.insert("read-only".to_string(), Role::read_only());
833 }
834
835 pub fn create_principal(
841 &self,
842 name: &str,
843 password: &str,
844 principal_type: PrincipalType,
845 roles: HashSet<String>,
846 ) -> AuthResult<()> {
847 if name.is_empty() || name.len() > 255 {
849 return Err(AuthError::Internal("Invalid principal name".to_string()));
850 }
851
852 validate_password_strength(password)?;
854
855 {
857 let role_map = self.roles.read();
858 for role in &roles {
859 if !role_map.contains_key(role) {
860 return Err(AuthError::RoleNotFound(role.clone()));
861 }
862 }
863 }
864
865 let mut principals = self.principals.write();
866
867 if principals.contains_key(name) {
868 return Err(AuthError::PrincipalAlreadyExists(name.to_string()));
869 }
870
871 let principal = Principal {
872 name: name.to_string(),
873 principal_type,
874 password_hash: PasswordHash::new(password),
875 roles,
876 enabled: true,
877 metadata: HashMap::new(),
878 created_at: std::time::SystemTime::now()
879 .duration_since(std::time::UNIX_EPOCH)
880 .unwrap_or_default()
881 .as_secs(),
882 };
883
884 principals.insert(name.to_string(), principal);
885 debug!("Created principal: {}", name);
886
887 Ok(())
888 }
889
890 pub fn delete_principal(&self, name: &str) -> AuthResult<()> {
892 let mut principals = self.principals.write();
893
894 if principals.remove(name).is_none() {
895 return Err(AuthError::PrincipalNotFound(name.to_string()));
896 }
897
898 let mut sessions = self.sessions.write();
900 sessions.retain(|_, s| s.principal_name != name);
901
902 debug!("Deleted principal: {}", name);
903 Ok(())
904 }
905
906 pub fn get_principal(&self, name: &str) -> Option<Principal> {
908 self.principals.read().get(name).cloned()
909 }
910
911 pub fn list_principals(&self) -> Vec<String> {
913 self.principals.read().keys().cloned().collect()
914 }
915
916 pub fn update_password(&self, name: &str, new_password: &str) -> AuthResult<()> {
918 validate_password_strength(new_password)?;
920
921 let mut principals = self.principals.write();
922
923 let principal = principals
924 .get_mut(name)
925 .ok_or_else(|| AuthError::PrincipalNotFound(name.to_string()))?;
926
927 principal.password_hash = PasswordHash::new(new_password);
928
929 let mut sessions = self.sessions.write();
931 sessions.retain(|_, s| s.principal_name != name);
932
933 debug!("Updated password for principal: {}", name);
934 Ok(())
935 }
936
937 pub fn add_role_to_principal(&self, principal_name: &str, role_name: &str) -> AuthResult<()> {
939 if !self.roles.read().contains_key(role_name) {
941 return Err(AuthError::RoleNotFound(role_name.to_string()));
942 }
943
944 let mut principals = self.principals.write();
945
946 let principal = principals
947 .get_mut(principal_name)
948 .ok_or_else(|| AuthError::PrincipalNotFound(principal_name.to_string()))?;
949
950 principal.roles.insert(role_name.to_string());
951
952 debug!(
953 "Added role '{}' to principal '{}'",
954 role_name, principal_name
955 );
956 Ok(())
957 }
958
959 pub fn remove_role_from_principal(
961 &self,
962 principal_name: &str,
963 role_name: &str,
964 ) -> AuthResult<()> {
965 let mut principals = self.principals.write();
966
967 let principal = principals
968 .get_mut(principal_name)
969 .ok_or_else(|| AuthError::PrincipalNotFound(principal_name.to_string()))?;
970
971 principal.roles.remove(role_name);
972
973 debug!(
974 "Removed role '{}' from principal '{}'",
975 role_name, principal_name
976 );
977 Ok(())
978 }
979
980 pub fn create_role(&self, role: Role) -> AuthResult<()> {
986 let mut roles = self.roles.write();
987
988 if roles.contains_key(&role.name) {
989 return Err(AuthError::Internal(format!(
990 "Role '{}' already exists",
991 role.name
992 )));
993 }
994
995 debug!("Created role: {}", role.name);
996 roles.insert(role.name.clone(), role);
997 Ok(())
998 }
999
1000 pub fn delete_role(&self, name: &str) -> AuthResult<()> {
1002 let mut roles = self.roles.write();
1003
1004 if let Some(role) = roles.get(name) {
1005 if role.builtin {
1006 return Err(AuthError::Internal(
1007 "Cannot delete built-in role".to_string(),
1008 ));
1009 }
1010 } else {
1011 return Err(AuthError::RoleNotFound(name.to_string()));
1012 }
1013
1014 roles.remove(name);
1015 debug!("Deleted role: {}", name);
1016 Ok(())
1017 }
1018
1019 pub fn get_role(&self, name: &str) -> Option<Role> {
1021 self.roles.read().get(name).cloned()
1022 }
1023
1024 pub fn list_roles(&self) -> Vec<String> {
1026 self.roles.read().keys().cloned().collect()
1027 }
1028
1029 pub fn add_acl(&self, entry: AclEntry) {
1035 let mut acls = self.acls.write();
1036 acls.push(entry);
1037 }
1038
1039 pub fn remove_acls(&self, principal: Option<&str>, resource: Option<&ResourceType>) {
1041 let mut acls = self.acls.write();
1042 acls.retain(|acl| {
1043 let principal_match =
1044 principal.is_none_or(|p| acl.principal == p || acl.principal == "*");
1045 let resource_match = resource.is_none_or(|r| &acl.resource == r);
1046 !(principal_match && resource_match)
1047 });
1048 }
1049
1050 pub fn list_acls(&self) -> Vec<AclEntry> {
1052 self.acls.read().to_vec()
1053 }
1054
1055 pub fn authenticate(
1061 &self,
1062 username: &str,
1063 password: &str,
1064 client_ip: &str,
1065 ) -> AuthResult<AuthSession> {
1066 {
1068 let tracker = self.failed_attempts.read();
1069 if tracker.is_locked_out(username, self.config.lockout_duration) {
1070 warn!(
1071 "Authentication attempt for locked-out principal: {}",
1072 username
1073 );
1074 return Err(AuthError::RateLimited);
1075 }
1076 if tracker.is_locked_out(client_ip, self.config.lockout_duration) {
1077 warn!("Authentication attempt from locked-out IP: {}", client_ip);
1078 return Err(AuthError::RateLimited);
1079 }
1080 }
1081
1082 let principal = {
1084 let principals = self.principals.read();
1085 principals.get(username).cloned()
1086 };
1087
1088 let principal = match principal {
1089 Some(p) if p.enabled => p,
1090 Some(_) => {
1091 self.record_auth_failure(username, client_ip);
1093 return Err(AuthError::AuthenticationFailed);
1094 }
1095 None => {
1096 let dummy = PasswordHash::new("dummy");
1099 let _ = dummy.verify(password);
1100 self.record_auth_failure(username, client_ip);
1101 return Err(AuthError::AuthenticationFailed);
1102 }
1103 };
1104
1105 if !principal.password_hash.verify(password) {
1107 self.record_auth_failure(username, client_ip);
1108 return Err(AuthError::AuthenticationFailed);
1109 }
1110
1111 self.failed_attempts.write().clear_failures(username);
1113 self.failed_attempts.write().clear_failures(client_ip);
1114
1115 let permissions = self.resolve_permissions(&principal);
1117
1118 let mut session_id = vec![0u8; 32];
1120 self.rng
1121 .fill(&mut session_id)
1122 .map_err(|_| AuthError::Internal("RNG failed".to_string()))?;
1123 let session_id = hex::encode(&session_id);
1124
1125 let now = Instant::now();
1126 let session = AuthSession {
1127 id: session_id.clone(),
1128 principal_name: principal.name.clone(),
1129 principal_type: principal.principal_type.clone(),
1130 permissions,
1131 created_at: now,
1132 expires_at: now + self.config.session_timeout,
1133 client_ip: client_ip.to_string(),
1134 };
1135
1136 self.sessions.write().insert(session_id, session.clone());
1138
1139 debug!("Authenticated principal '{}' from {}", username, client_ip);
1140 Ok(session)
1141 }
1142
1143 fn record_auth_failure(&self, username: &str, client_ip: &str) {
1145 let mut tracker = self.failed_attempts.write();
1146 tracker.record_failure(
1147 username,
1148 self.config.max_failed_attempts,
1149 self.config.lockout_duration,
1150 );
1151 tracker.record_failure(
1152 client_ip,
1153 self.config.max_failed_attempts * 2,
1154 self.config.lockout_duration,
1155 );
1156 }
1157
1158 pub fn get_session(&self, session_id: &str) -> Option<AuthSession> {
1160 let sessions = self.sessions.read();
1161 sessions.get(session_id).and_then(|s| {
1162 if s.is_expired() {
1163 None
1164 } else {
1165 Some(s.clone())
1166 }
1167 })
1168 }
1169
1170 pub fn invalidate_session(&self, session_id: &str) {
1172 self.sessions.write().remove(session_id);
1173 }
1174
1175 pub fn invalidate_all_sessions(&self, principal_name: &str) {
1177 self.sessions
1178 .write()
1179 .retain(|_, s| s.principal_name != principal_name);
1180 }
1181
1182 pub fn cleanup_expired_sessions(&self) {
1184 self.sessions.write().retain(|_, s| !s.is_expired());
1185 }
1186
1187 pub fn create_session(&self, principal: &Principal) -> AuthSession {
1189 let permissions = self.resolve_permissions(principal);
1190
1191 let mut session_id = vec![0u8; 32];
1192 self.rng.fill(&mut session_id).expect("RNG failed");
1193 let session_id = hex::encode(&session_id);
1194
1195 let now = Instant::now();
1196 let session = AuthSession {
1197 id: session_id.clone(),
1198 principal_name: principal.name.clone(),
1199 principal_type: principal.principal_type.clone(),
1200 permissions,
1201 created_at: now,
1202 expires_at: now + self.config.session_timeout,
1203 client_ip: "scram".to_string(),
1204 };
1205
1206 self.sessions.write().insert(session_id, session.clone());
1207 session
1208 }
1209
1210 pub fn create_api_key_session(&self, principal_name: &str, roles: &[String]) -> AuthSession {
1215 let mut permissions = HashSet::new();
1217 {
1218 let roles_map = self.roles.read();
1219 for role_name in roles {
1220 if let Some(role) = roles_map.get(role_name) {
1221 permissions.extend(role.permissions.iter().cloned());
1222 }
1223 }
1224 }
1225
1226 let mut session_id = vec![0u8; 32];
1227 self.rng.fill(&mut session_id).expect("RNG failed");
1228 let session_id = hex::encode(&session_id);
1229
1230 let now = Instant::now();
1231 let session = AuthSession {
1232 id: session_id.clone(),
1233 principal_name: principal_name.to_string(),
1234 principal_type: PrincipalType::ServiceAccount,
1235 permissions,
1236 created_at: now,
1237 expires_at: now + self.config.session_timeout,
1238 client_ip: "api-key".to_string(),
1239 };
1240
1241 self.sessions.write().insert(session_id, session.clone());
1242 debug!(principal = %principal_name, "Created API key session");
1243 session
1244 }
1245
1246 pub fn create_jwt_session(&self, principal_name: &str, groups: &[String]) -> AuthSession {
1251 let mut permissions = HashSet::new();
1253 {
1254 let roles_map = self.roles.read();
1255 for group in groups {
1256 if let Some(role) = roles_map.get(group) {
1257 permissions.extend(role.permissions.iter().cloned());
1258 }
1259 }
1260 }
1261
1262 let mut session_id = vec![0u8; 32];
1263 self.rng.fill(&mut session_id).expect("RNG failed");
1264 let session_id = hex::encode(&session_id);
1265
1266 let now = Instant::now();
1267 let session = AuthSession {
1268 id: session_id.clone(),
1269 principal_name: principal_name.to_string(),
1270 principal_type: PrincipalType::User,
1271 permissions,
1272 created_at: now,
1273 expires_at: now + self.config.session_timeout,
1274 client_ip: "jwt".to_string(),
1275 };
1276
1277 self.sessions.write().insert(session_id, session.clone());
1278 debug!(principal = %principal_name, groups = ?groups, "Created JWT session");
1279 session
1280 }
1281
1282 pub fn get_session_by_principal(&self, principal_name: &str) -> Option<AuthSession> {
1284 let sessions = self.sessions.read();
1285 sessions
1286 .values()
1287 .find(|s| s.principal_name == principal_name && !s.is_expired())
1288 .cloned()
1289 }
1290
1291 fn resolve_permissions(&self, principal: &Principal) -> HashSet<(ResourceType, Permission)> {
1297 let mut permissions = HashSet::new();
1298
1299 let roles = self.roles.read();
1300
1301 for role_name in &principal.roles {
1303 if let Some(role) = roles.get(role_name) {
1304 permissions.extend(role.permissions.iter().cloned());
1305 }
1306 }
1307
1308 permissions
1309 }
1310
1311 pub fn authorize(
1313 &self,
1314 session: &AuthSession,
1315 resource: &ResourceType,
1316 permission: Permission,
1317 client_ip: &str,
1318 ) -> AuthResult<()> {
1319 if !self.config.require_authentication && !self.config.enable_acls {
1321 return Ok(());
1322 }
1323
1324 if session.is_expired() {
1326 return Err(AuthError::TokenExpired);
1327 }
1328
1329 if session.has_permission(resource, &permission) {
1331 return Ok(());
1332 }
1333
1334 if self.config.enable_acls
1336 && self.check_acls(&session.principal_name, resource, permission, client_ip)
1337 {
1338 return Ok(());
1339 }
1340
1341 if self.config.default_deny {
1343 warn!(
1344 "Access denied: {} attempted {} on {:?} from {}",
1345 session.principal_name,
1346 format!("{:?}", permission),
1347 resource,
1348 client_ip
1349 );
1350 return Err(AuthError::PermissionDenied {
1351 principal: session.principal_name.clone(),
1352 permission: format!("{:?}", permission),
1353 resource: format!("{:?}", resource),
1354 });
1355 }
1356
1357 Ok(())
1358 }
1359
1360 fn check_acls(
1362 &self,
1363 principal: &str,
1364 resource: &ResourceType,
1365 permission: Permission,
1366 client_ip: &str,
1367 ) -> bool {
1368 let acls = self.acls.read();
1369
1370 for acl in acls.lookup(principal) {
1373 if !acl.allow
1374 && (acl.host == client_ip || acl.host == "*")
1375 && acl.resource.matches(resource)
1376 && (acl.permission == permission || acl.permission == Permission::All)
1377 {
1378 return false; }
1380 }
1381
1382 for acl in acls.lookup(principal) {
1384 if acl.allow
1385 && (acl.host == client_ip || acl.host == "*")
1386 && acl.resource.matches(resource)
1387 && (acl.permission == permission || acl.permission == Permission::All)
1388 {
1389 return true;
1390 }
1391 }
1392
1393 false
1394 }
1395
1396 #[allow(unused_variables)]
1398 pub fn authorize_anonymous(
1399 &self,
1400 resource: &ResourceType,
1401 permission: Permission,
1402 ) -> AuthResult<()> {
1403 if !self.config.require_authentication {
1404 return Ok(());
1405 }
1406
1407 Err(AuthError::AuthenticationFailed)
1408 }
1409}
1410
1411pub struct SaslPlainAuth {
1417 auth_manager: Arc<AuthManager>,
1418}
1419
1420impl SaslPlainAuth {
1421 pub fn new(auth_manager: Arc<AuthManager>) -> Self {
1422 Self { auth_manager }
1423 }
1424
1425 pub fn authenticate(&self, sasl_bytes: &[u8], client_ip: &str) -> AuthResult<AuthSession> {
1428 let parts: Vec<&[u8]> = sasl_bytes.split(|&b| b == 0).collect();
1430
1431 if parts.len() < 2 {
1432 return Err(AuthError::InvalidCredentials);
1433 }
1434
1435 let (username, password) = if parts.len() == 2 {
1437 (
1438 std::str::from_utf8(parts[0]).map_err(|_| AuthError::InvalidCredentials)?,
1439 std::str::from_utf8(parts[1]).map_err(|_| AuthError::InvalidCredentials)?,
1440 )
1441 } else {
1442 (
1444 std::str::from_utf8(parts[1]).map_err(|_| AuthError::InvalidCredentials)?,
1445 std::str::from_utf8(parts[2]).map_err(|_| AuthError::InvalidCredentials)?,
1446 )
1447 };
1448
1449 self.auth_manager
1450 .authenticate(username, password, client_ip)
1451 }
1452}
1453
/// Progress of one SCRAM exchange.
#[derive(Debug, Clone)]
pub enum ScramState {
    /// No message has been processed yet.
    Initial,
    /// The server-first message has been produced; everything needed to check
    /// the client-final message is kept here.
    ServerFirstSent {
        /// Username taken from the client-first message.
        username: String,
        /// Nonce supplied by the client.
        client_nonce: String,
        /// Nonce generated by the server.
        server_nonce: String,
        /// Salt announced to the client.
        salt: Vec<u8>,
        /// Iteration count announced to the client.
        iterations: u32,
        /// The auth message assembled from the exchange so far.
        auth_message: String,
    },
    /// The exchange has finished.
    Complete,
}
1482
1483pub struct SaslScramAuth {
1485 auth_manager: Arc<AuthManager>,
1486}
1487
1488impl SaslScramAuth {
1489 pub fn new(auth_manager: Arc<AuthManager>) -> Self {
1490 Self { auth_manager }
1491 }
1492
1493 pub fn process_client_first(
1498 &self,
1499 client_first: &[u8],
1500 client_ip: &str,
1501 ) -> AuthResult<(ScramState, Vec<u8>)> {
1502 let client_first_str =
1503 std::str::from_utf8(client_first).map_err(|_| AuthError::InvalidCredentials)?;
1504
1505 let parts: Vec<&str> = client_first_str.splitn(3, ',').collect();
1511 if parts.len() < 3 {
1512 return Err(AuthError::InvalidCredentials);
1513 }
1514
1515 let client_first_bare = if parts[0] == "n" || parts[0] == "y" || parts[0] == "p" {
1517 &client_first_str[parts[0].len() + 1 + parts[1].len() + 1..]
1519 } else {
1520 client_first_str
1522 };
1523
1524 let mut username = None;
1526 let mut client_nonce = None;
1527
1528 for attr in client_first_bare.split(',') {
1529 if let Some(value) = attr.strip_prefix("n=") {
1530 username = Some(Self::unescape_username(value));
1531 } else if let Some(value) = attr.strip_prefix("r=") {
1532 client_nonce = Some(value.to_string());
1533 }
1534 }
1535
1536 let username = username.ok_or(AuthError::InvalidCredentials)?;
1537 let client_nonce = client_nonce.ok_or(AuthError::InvalidCredentials)?;
1538
1539 let (salt, iterations) = match self.auth_manager.get_principal(&username) {
1541 Some(principal) => (
1542 principal.password_hash.salt.clone(),
1543 principal.password_hash.iterations,
1544 ),
1545 None => {
1546 warn!(
1549 "SCRAM auth for unknown user '{}' from {}",
1550 username, client_ip
1551 );
1552 let rng = SystemRandom::new();
1553 let mut fake_salt = vec![0u8; 32];
1554 rng.fill(&mut fake_salt).expect("Failed to generate salt");
1555 (fake_salt, 4096)
1556 }
1557 };
1558
1559 let rng = SystemRandom::new();
1561 let mut server_nonce_bytes = vec![0u8; 24];
1562 rng.fill(&mut server_nonce_bytes)
1563 .expect("Failed to generate nonce");
1564 let server_nonce = base64_encode(&server_nonce_bytes);
1565 let combined_nonce = format!("{}{}", client_nonce, server_nonce);
1566
1567 let salt_b64 = base64_encode(&salt);
1569 let server_first = format!("r={},s={},i={}", combined_nonce, salt_b64, iterations);
1570
1571 let auth_message = format!(
1573 "{},{},c=biws,r={}",
1574 client_first_bare, server_first, combined_nonce
1575 );
1576
1577 let state = ScramState::ServerFirstSent {
1578 username,
1579 client_nonce,
1580 server_nonce,
1581 salt,
1582 iterations,
1583 auth_message,
1584 };
1585
1586 Ok((state, server_first.into_bytes()))
1587 }
1588
1589 pub fn process_client_final(
1594 &self,
1595 state: &ScramState,
1596 client_final: &[u8],
1597 client_ip: &str,
1598 ) -> AuthResult<(AuthSession, Vec<u8>)> {
1599 let ScramState::ServerFirstSent {
1600 username,
1601 client_nonce,
1602 server_nonce,
1603 salt: _, iterations: _, auth_message,
1606 } = state
1607 else {
1608 return Err(AuthError::Internal("Invalid SCRAM state".to_string()));
1609 };
1610
1611 let client_final_str =
1612 std::str::from_utf8(client_final).map_err(|_| AuthError::InvalidCredentials)?;
1613
1614 let mut channel_binding = None;
1616 let mut nonce = None;
1617 let mut proof = None;
1618
1619 for attr in client_final_str.split(',') {
1620 if let Some(value) = attr.strip_prefix("c=") {
1621 channel_binding = Some(value.to_string());
1622 } else if let Some(value) = attr.strip_prefix("r=") {
1623 nonce = Some(value.to_string());
1624 } else if let Some(value) = attr.strip_prefix("p=") {
1625 proof = Some(value.to_string());
1626 }
1627 }
1628
1629 let _channel_binding = channel_binding.ok_or(AuthError::InvalidCredentials)?;
1630 let nonce = nonce.ok_or(AuthError::InvalidCredentials)?;
1631 let proof_b64 = proof.ok_or(AuthError::InvalidCredentials)?;
1632
1633 let expected_nonce = format!("{}{}", client_nonce, server_nonce);
1635 if nonce != expected_nonce {
1636 warn!("SCRAM nonce mismatch for '{}' from {}", username, client_ip);
1637 return Err(AuthError::InvalidCredentials);
1638 }
1639
1640 let principal = self
1642 .auth_manager
1643 .get_principal(username)
1644 .ok_or(AuthError::AuthenticationFailed)?;
1645
1646 let client_proof = base64_decode(&proof_b64).map_err(|_| AuthError::InvalidCredentials)?;
1652
1653 let client_signature =
1655 PasswordHash::hmac_sha256(&principal.password_hash.stored_key, auth_message.as_bytes());
1656
1657 if client_proof.len() != client_signature.len() {
1659 return Err(AuthError::InvalidCredentials);
1660 }
1661
1662 let client_key: Vec<u8> = client_proof
1663 .iter()
1664 .zip(client_signature.iter())
1665 .map(|(p, s)| p ^ s)
1666 .collect();
1667
1668 let computed_stored_key = Sha256::digest(&client_key);
1670 if !PasswordHash::constant_time_compare(
1671 &computed_stored_key,
1672 &principal.password_hash.stored_key,
1673 ) {
1674 warn!(
1675 "SCRAM authentication failed for '{}' from {}",
1676 username, client_ip
1677 );
1678 return Err(AuthError::AuthenticationFailed);
1679 }
1680
1681 let server_signature =
1683 PasswordHash::hmac_sha256(&principal.password_hash.server_key, auth_message.as_bytes());
1684 let server_final = format!("v={}", base64_encode(&server_signature));
1685
1686 let session = self.auth_manager.create_session(&principal);
1688 debug!(
1689 "SCRAM authentication successful for '{}' from {}",
1690 username, client_ip
1691 );
1692
1693 Ok((session, server_final.into_bytes()))
1694 }
1695
1696 fn unescape_username(s: &str) -> String {
1698 s.replace("=2C", ",").replace("=3D", "=")
1699 }
1700}
1701
1702fn base64_encode(data: &[u8]) -> String {
1704 use base64::{engine::general_purpose::STANDARD, Engine as _};
1705 STANDARD.encode(data)
1706}
1707
1708fn base64_decode(s: &str) -> Result<Vec<u8>, base64::DecodeError> {
1710 use base64::{engine::general_purpose::STANDARD, Engine as _};
1711 STANDARD.decode(s)
1712}
1713
/// Unit tests for password hashing, principal management, authentication,
/// authorization, SASL PLAIN and the SCRAM handshake.
#[cfg(test)]
mod tests {
    use super::*;

    /// The original password verifies; a different, empty, or truncated
    /// password does not.
    #[test]
    fn test_password_hash_verify() {
        let hash = PasswordHash::new("Test@Pass123");
        assert!(hash.verify("Test@Pass123"));
        assert!(!hash.verify("Wrong@Pass1"));
        assert!(!hash.verify(""));
        // Differs from the real password only by the missing last character.
        assert!(!hash.verify("Test@Pass12"));
    }

    /// Wrong passwords of different lengths are both rejected.
    // NOTE(review): despite the name, this test asserts only the results of
    // `verify`; it does not measure timing.
    #[test]
    fn test_password_hash_timing_attack_resistant() {
        let hash = PasswordHash::new("Correct@Pass1");

        assert!(!hash.verify("Wrong@Pass1"));

        assert!(!hash.verify("x"));
    }

    /// A principal can be created once; a second creation under the same name
    /// fails, and the stored principal keeps its name and roles.
    #[test]
    fn test_create_principal() {
        let auth = AuthManager::new_default();

        let mut roles = HashSet::new();
        roles.insert("producer".to_string());

        auth.create_principal(
            "alice",
            "Secure@Pass123",
            PrincipalType::User,
            roles.clone(),
        )
        .expect("Failed to create principal");

        // Same name again must be rejected.
        assert!(auth
            .create_principal("alice", "Other@Pass1", PrincipalType::User, roles.clone())
            .is_err());

        let principal = auth.get_principal("alice").expect("Principal not found");
        assert_eq!(principal.name, "alice");
        assert!(principal.roles.contains("producer"));
    }

    /// Correct credentials yield a non-expired session for that principal.
    #[test]
    fn test_authentication_success() {
        let auth = AuthManager::new_default();

        let mut roles = HashSet::new();
        roles.insert("producer".to_string());

        auth.create_principal("bob", "Bob@Pass123", PrincipalType::User, roles)
            .unwrap();

        let session = auth
            .authenticate("bob", "Bob@Pass123", "127.0.0.1")
            .expect("Authentication should succeed");

        assert_eq!(session.principal_name, "bob");
        assert!(!session.is_expired());
    }

    /// A wrong password and an unknown user both produce
    /// `AuthError::AuthenticationFailed`.
    #[test]
    fn test_authentication_failure() {
        let auth = AuthManager::new_default();

        let mut roles = HashSet::new();
        roles.insert("producer".to_string());

        auth.create_principal("charlie", "Correct@Pass1", PrincipalType::User, roles)
            .unwrap();

        // Existing user, wrong password.
        let result = auth.authenticate("charlie", "Wrong@Pass1", "127.0.0.1");
        assert!(matches!(result, Err(AuthError::AuthenticationFailed)));

        // User that was never created.
        let result = auth.authenticate("unknown", "password", "127.0.0.1");
        assert!(matches!(result, Err(AuthError::AuthenticationFailed)));
    }

    /// After `max_failed_attempts` failures even the correct password is
    /// answered with `RateLimited`; clearing the recorded failures restores access.
    #[test]
    fn test_rate_limiting() {
        let config = AuthConfig {
            max_failed_attempts: 3,
            lockout_duration: Duration::from_secs(120),
            ..Default::default()
        };
        let auth = AuthManager::new(config);

        let mut roles = HashSet::new();
        roles.insert("consumer".to_string());
        auth.create_principal("eve", "Eve@Pass123", PrincipalType::User, roles)
            .unwrap();

        // Three failed attempts, matching the configured limit.
        for _ in 0..3 {
            let _ = auth.authenticate("eve", "wrong", "192.168.1.1");
        }

        let result = auth.authenticate("eve", "Eve@Pass123", "192.168.1.1");
        assert!(matches!(result, Err(AuthError::RateLimited)));

        {
            // Clear failures for both the username and the client address;
            // the write guard is dropped at the end of this scope.
            let mut tracker = auth.failed_attempts.write();
            tracker.clear_failures("eve");
            tracker.clear_failures("192.168.1.1");
        }

        let result = auth.authenticate("eve", "Eve@Pass123", "192.168.1.1");
        assert!(result.is_ok());
    }

    /// A session for a principal with the "producer" role has `Write` but not
    /// `Delete` permission on a topic.
    #[test]
    fn test_role_permissions() {
        let auth = AuthManager::with_auth_enabled();

        let mut roles = HashSet::new();
        roles.insert("producer".to_string());
        auth.create_principal("producer_user", "Prod@Pass123", PrincipalType::User, roles)
            .unwrap();

        let session = auth
            .authenticate("producer_user", "Prod@Pass123", "127.0.0.1")
            .unwrap();

        assert!(session.has_permission(
            &ResourceType::Topic("orders".to_string()),
            &Permission::Write
        ));

        assert!(!session.has_permission(
            &ResourceType::Topic("orders".to_string()),
            &Permission::Delete
        ));
    }

    /// A session for a principal with the "admin" role has `All` on the
    /// cluster and `Delete` on an arbitrary topic.
    #[test]
    fn test_admin_has_all_permissions() {
        let auth = AuthManager::with_auth_enabled();

        let mut roles = HashSet::new();
        roles.insert("admin".to_string());
        auth.create_principal("admin_user", "Admin@Pass1", PrincipalType::User, roles)
            .unwrap();

        let session = auth
            .authenticate("admin_user", "Admin@Pass1", "127.0.0.1")
            .unwrap();

        assert!(session.has_permission(&ResourceType::Cluster, &Permission::All));
        assert!(session.has_permission(
            &ResourceType::Topic("any_topic".to_string()),
            &Permission::Delete
        ));
    }

    /// `TopicPattern` matching: `*` matches any topic and `orders-*` matches
    /// only topics starting with `orders-`.
    #[test]
    fn test_resource_pattern_matching() {
        assert!(ResourceType::TopicPattern("*".to_string())
            .matches(&ResourceType::Topic("anything".to_string())));

        assert!(ResourceType::TopicPattern("orders-*".to_string())
            .matches(&ResourceType::Topic("orders-us".to_string())));

        assert!(ResourceType::TopicPattern("orders-*".to_string())
            .matches(&ResourceType::Topic("orders-eu".to_string())));

        assert!(!ResourceType::TopicPattern("orders-*".to_string())
            .matches(&ResourceType::Topic("events-us".to_string())));
    }

    /// With ACLs enabled and default-deny, an allow ACL grants `Write` on the
    /// named topic only; the same request on another topic is refused.
    #[test]
    fn test_acl_enforcement() {
        let auth = AuthManager::new(AuthConfig {
            require_authentication: true,
            enable_acls: true,
            default_deny: true,
            ..Default::default()
        });

        let mut roles = HashSet::new();
        roles.insert("read-only".to_string());
        auth.create_principal("reader", "Read@Pass123", PrincipalType::User, roles)
            .unwrap();

        // Explicit allow for "reader" to write to "special-topic" from any host.
        auth.add_acl(AclEntry {
            principal: "reader".to_string(),
            resource: ResourceType::Topic("special-topic".to_string()),
            permission: Permission::Write,
            allow: true,
            host: "*".to_string(),
        });

        let session = auth
            .authenticate("reader", "Read@Pass123", "127.0.0.1")
            .unwrap();

        // Covered by the ACL.
        let result = auth.authorize(
            &session,
            &ResourceType::Topic("special-topic".to_string()),
            Permission::Write,
            "127.0.0.1",
        );
        assert!(result.is_ok());

        // Not covered by any ACL.
        let result = auth.authorize(
            &session,
            &ResourceType::Topic("other-topic".to_string()),
            Permission::Write,
            "127.0.0.1",
        );
        assert!(result.is_err());
    }

    /// SASL PLAIN accepts both the two-part (`user\0pass`) and the three-part
    /// (`\0user\0pass`, empty first field) message forms.
    #[test]
    fn test_sasl_plain_authentication() {
        let auth = Arc::new(AuthManager::new_default());

        let mut roles = HashSet::new();
        roles.insert("producer".to_string());
        auth.create_principal("sasl_user", "Sasl@Pass123", PrincipalType::User, roles)
            .unwrap();

        let sasl = SaslPlainAuth::new(auth);

        let two_part = b"sasl_user\0Sasl@Pass123";
        let result = sasl.authenticate(two_part, "127.0.0.1");
        assert!(result.is_ok());

        let three_part = b"\0sasl_user\0Sasl@Pass123";
        let result = sasl.authenticate(three_part, "127.0.0.1");
        assert!(result.is_ok());
    }

    /// A session created with a 100 ms timeout is live at first and reports
    /// expired after sleeping 150 ms.
    #[test]
    fn test_session_expiration() {
        let config = AuthConfig {
            session_timeout: Duration::from_millis(100),
            ..Default::default()
        };
        let auth = AuthManager::new(config);

        let mut roles = HashSet::new();
        roles.insert("producer".to_string());
        auth.create_principal("expiring", "Expiry@Pass1", PrincipalType::User, roles)
            .unwrap();

        let session = auth
            .authenticate("expiring", "Expiry@Pass1", "127.0.0.1")
            .unwrap();
        assert!(!session.is_expired());

        std::thread::sleep(Duration::from_millis(150));

        // NOTE(review): this rebuild copies `expires_at` from the same session
        // and takes every other field from it too, so the value is unchanged.
        let session = AuthSession {
            expires_at: session.expires_at,
            ..session
        };
        assert!(session.is_expired());
    }

    /// Deleting a principal removes its existing session from the manager.
    #[test]
    fn test_delete_principal_invalidates_sessions() {
        let auth = AuthManager::new_default();

        let mut roles = HashSet::new();
        roles.insert("producer".to_string());
        auth.create_principal("deleteme", "Delete@Pass1", PrincipalType::User, roles)
            .unwrap();

        let session = auth
            .authenticate("deleteme", "Delete@Pass1", "127.0.0.1")
            .unwrap();

        assert!(auth.get_session(&session.id).is_some());

        auth.delete_principal("deleteme").unwrap();

        assert!(auth.get_session(&session.id).is_none());
    }

    /// A principal whose `enabled` flag is cleared cannot authenticate, even
    /// with the correct password.
    #[test]
    fn test_disabled_principal_cannot_authenticate() {
        let auth = AuthManager::new_default();

        let mut roles = HashSet::new();
        roles.insert("producer".to_string());
        auth.create_principal("disabled_user", "Disable@Pass1", PrincipalType::User, roles)
            .unwrap();

        {
            // Flip the flag directly in the principal map; the write guard is
            // released at the end of this scope.
            let mut principals = auth.principals.write();
            if let Some(p) = principals.get_mut("disabled_user") {
                p.enabled = false;
            }
        }

        let result = auth.authenticate("disabled_user", "Disable@Pass1", "127.0.0.1");
        assert!(matches!(result, Err(AuthError::AuthenticationFailed)));
    }

    /// `Debug` output of a `PasswordHash` contains `[REDACTED]` and the
    /// `iterations` field name, and never the plaintext password.
    #[test]
    fn test_password_hash_debug_redacts_sensitive_data() {
        let hash = PasswordHash::new("super_secret_password");
        let debug_output = format!("{:?}", hash);

        assert!(
            debug_output.contains("[REDACTED]"),
            "Debug output should contain [REDACTED]"
        );

        assert!(
            !debug_output.contains("super_secret_password"),
            "Debug output should not contain password"
        );

        assert!(
            debug_output.contains("iterations"),
            "Debug output should show iterations field"
        );
    }

    /// `Debug` output of a `Principal` contains `[REDACTED]` while still
    /// showing the name and roles.
    #[test]
    fn test_principal_debug_redacts_password_hash() {
        let principal = Principal {
            name: "test_user".to_string(),
            principal_type: PrincipalType::User,
            password_hash: PasswordHash::new("secret_password"),
            roles: HashSet::from(["admin".to_string()]),
            enabled: true,
            metadata: HashMap::new(),
            created_at: 1234567890,
        };

        let debug_output = format!("{:?}", principal);

        assert!(
            debug_output.contains("[REDACTED]"),
            "Debug output should contain [REDACTED]: {}",
            debug_output
        );

        assert!(
            debug_output.contains("test_user"),
            "Debug output should show name"
        );
        assert!(
            debug_output.contains("admin"),
            "Debug output should show roles"
        );
    }

    /// Runs a complete SCRAM exchange, playing the client side by hand, and
    /// expects a live session plus a `v=` server-final message.
    #[test]
    fn test_scram_full_handshake() {
        use sha2::{Digest, Sha256};

        let auth = Arc::new(AuthManager::new_default());

        let mut roles = HashSet::new();
        roles.insert("producer".to_string());
        auth.create_principal("scram_user", "Scram@Pass123", PrincipalType::User, roles)
            .expect("Failed to create principal");

        let scram = SaslScramAuth::new(auth.clone());

        // Step 1: client-first with the "n,," GS2 header.
        let client_nonce = "rOprNGfwEbeRWgbNEkqO";
        let client_first = format!("n,,n=scram_user,r={}", client_nonce);

        let (state, server_first) = scram
            .process_client_first(client_first.as_bytes(), "127.0.0.1")
            .expect("client-first processing should succeed");

        // The server nonce is appended to the client nonce.
        let server_first_str = std::str::from_utf8(&server_first).expect("valid UTF-8");
        assert!(server_first_str.starts_with(&format!("r={}", client_nonce)));
        assert!(server_first_str.contains(",s="));
        assert!(server_first_str.contains(",i="));

        // Take salt and iteration count from the server state instead of
        // parsing them out of the server-first message.
        let ScramState::ServerFirstSent {
            username: _,
            client_nonce: _,
            server_nonce: _,
            salt,
            iterations,
            auth_message: _,
        } = &state
        else {
            panic!("Expected ServerFirstSent state");
        };

        // Client-side key derivation.
        let salted_password = compute_salted_password("Scram@Pass123", salt, *iterations);
        let client_key = PasswordHash::hmac_sha256(&salted_password, b"Client Key");
        let stored_key = Sha256::digest(&client_key);

        let client_first_bare = format!("n=scram_user,r={}", client_nonce);
        let combined_nonce: String = server_first_str
            .split(',')
            .find(|s| s.starts_with("r="))
            .map(|s| &s[2..])
            .unwrap()
            .to_string();

        // Must be built exactly as the server builds its own auth message.
        let auth_message = format!(
            "{},{},c=biws,r={}",
            client_first_bare, server_first_str, combined_nonce
        );

        // ClientProof = ClientKey XOR HMAC(StoredKey, AuthMessage)
        let client_signature = PasswordHash::hmac_sha256(&stored_key, auth_message.as_bytes());
        let client_proof: Vec<u8> = client_key
            .iter()
            .zip(client_signature.iter())
            .map(|(k, s)| k ^ s)
            .collect();

        // Step 2: client-final.
        let client_final = format!(
            "c=biws,r={},p={}",
            combined_nonce,
            base64_encode(&client_proof)
        );

        let (session, server_final) = scram
            .process_client_final(&state, client_final.as_bytes(), "127.0.0.1")
            .expect("client-final processing should succeed");

        assert_eq!(session.principal_name, "scram_user");
        assert!(!session.is_expired());

        let server_final_str = std::str::from_utf8(&server_final).expect("valid UTF-8");
        assert!(server_final_str.starts_with("v="));
    }

    /// A well-formed proof derived from the wrong password is rejected with
    /// `AuthenticationFailed`.
    #[test]
    fn test_scram_wrong_password() {
        let auth = Arc::new(AuthManager::new_default());

        let mut roles = HashSet::new();
        roles.insert("producer".to_string());
        auth.create_principal("scram_user2", "Correct@Pass1", PrincipalType::User, roles)
            .expect("Failed to create principal");

        let scram = SaslScramAuth::new(auth.clone());

        let client_nonce = "test_nonce_12345";
        let client_first = format!("n,,n=scram_user2,r={}", client_nonce);

        let (state, server_first) = scram
            .process_client_first(client_first.as_bytes(), "127.0.0.1")
            .expect("client-first processing should succeed");

        let server_first_str = std::str::from_utf8(&server_first).expect("valid UTF-8");
        let combined_nonce: String = server_first_str
            .split(',')
            .find(|s| s.starts_with("r="))
            .map(|s| &s[2..])
            .unwrap()
            .to_string();

        let ScramState::ServerFirstSent {
            salt, iterations, ..
        } = &state
        else {
            panic!("Expected ServerFirstSent state");
        };

        // Same derivation as a real client, but starting from the wrong password.
        let salted_password = compute_salted_password("Wrong@Pass1", salt, *iterations);
        let client_key = PasswordHash::hmac_sha256(&salted_password, b"Client Key");
        let stored_key = sha2::Sha256::digest(&client_key);

        let client_first_bare = format!("n=scram_user2,r={}", client_nonce);
        let auth_message = format!(
            "{},{},c=biws,r={}",
            client_first_bare, server_first_str, combined_nonce
        );

        let client_signature = PasswordHash::hmac_sha256(&stored_key, auth_message.as_bytes());
        let client_proof: Vec<u8> = client_key
            .iter()
            .zip(client_signature.iter())
            .map(|(k, s)| k ^ s)
            .collect();

        let client_final = format!(
            "c=biws,r={},p={}",
            combined_nonce,
            base64_encode(&client_proof)
        );

        let result = scram.process_client_final(&state, client_final.as_bytes(), "127.0.0.1");
        assert!(result.is_err());
        assert!(matches!(result, Err(AuthError::AuthenticationFailed)));
    }

    /// For an unknown user the first step still returns a server-first
    /// message; the exchange fails only at the final step.
    #[test]
    fn test_scram_nonexistent_user() {
        let auth = Arc::new(AuthManager::new_default());
        let scram = SaslScramAuth::new(auth.clone());

        let client_first = "n,,n=nonexistent_user,r=test_nonce";

        let result = scram.process_client_first(client_first.as_bytes(), "127.0.0.1");
        assert!(
            result.is_ok(),
            "Should return fake server-first to prevent enumeration"
        );

        let (state, server_first) = result.unwrap();
        let server_first_str = std::str::from_utf8(&server_first).expect("valid UTF-8");

        // The reply has the same attributes as one for a real user.
        assert!(server_first_str.contains("r=test_nonce"));
        assert!(server_first_str.contains(",s="));
        assert!(server_first_str.contains(",i="));

        let combined_nonce: String = server_first_str
            .split(',')
            .find(|s| s.starts_with("r="))
            .map(|s| &s[2..])
            .unwrap()
            .to_string();

        // Correct nonce but an arbitrary proof value.
        let client_final = format!("c=biws,r={},p=dW5rbm93bg==", combined_nonce);
        let result = scram.process_client_final(&state, client_final.as_bytes(), "127.0.0.1");
        assert!(result.is_err());
    }

    /// A client-final message whose nonce is not client nonce + server nonce
    /// is rejected with `InvalidCredentials`.
    #[test]
    fn test_scram_nonce_mismatch() {
        let auth = Arc::new(AuthManager::new_default());

        let mut roles = HashSet::new();
        roles.insert("producer".to_string());
        auth.create_principal("scram_user3", "Scram3@Pass1", PrincipalType::User, roles)
            .expect("Failed to create principal");

        let scram = SaslScramAuth::new(auth.clone());

        let client_first = "n,,n=scram_user3,r=original_nonce";
        let (state, _server_first) = scram
            .process_client_first(client_first.as_bytes(), "127.0.0.1")
            .expect("client-first should succeed");

        // Nonce does not start with the nonce sent in client-first.
        let client_final = "c=biws,r=tampered_nonce_plus_server,p=dW5rbm93bg==";
        let result = scram.process_client_final(&state, client_final.as_bytes(), "127.0.0.1");
        assert!(result.is_err());
        assert!(matches!(result, Err(AuthError::InvalidCredentials)));
    }

    /// Test helper that derives the SCRAM salted password on the client side.
    ///
    /// Computes `U1 = HMAC(password, salt || 1u32 big-endian)`, then
    /// `Ui = HMAC(password, U(i-1))` for the remaining iterations, and returns
    /// the XOR of all `Ui` (32 bytes).
    fn compute_salted_password(password: &str, salt: &[u8], iterations: u32) -> Vec<u8> {
        use hmac::{Hmac, Mac};
        type HmacSha256 = Hmac<sha2::Sha256>;

        let mut result = vec![0u8; 32];

        // First block: keyed with the password, over salt followed by the
        // big-endian block index 1.
        let mut mac =
            HmacSha256::new_from_slice(password.as_bytes()).expect("HMAC accepts any key length");
        mac.update(salt);
        mac.update(&1u32.to_be_bytes());
        let mut u = mac.finalize().into_bytes();
        result.copy_from_slice(&u);

        // Remaining iterations: chain the HMAC and fold each output into the
        // result with XOR.
        for _ in 1..iterations {
            let mut mac = HmacSha256::new_from_slice(password.as_bytes())
                .expect("HMAC accepts any key length");
            mac.update(&u);
            u = mac.finalize().into_bytes();

            for (r, ui) in result.iter_mut().zip(u.iter()) {
                *r ^= ui;
            }
        }

        result
    }
}