1use crate::error::{AllSourceError, Result};
2use argon2::{
3 Argon2,
4 password_hash::{PasswordHash, PasswordHasher, PasswordVerifier, SaltString, rand_core::OsRng},
5};
6use chrono::{Duration, Utc};
7use dashmap::DashMap;
8use jsonwebtoken::{Algorithm, DecodingKey, EncodingKey, Header, Validation, decode, encode};
9use serde::{Deserialize, Serialize};
10use std::sync::Arc;
11use uuid::Uuid;
12
13#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
15#[serde(rename_all = "lowercase")]
16pub enum Role {
17 Admin, Developer, ReadOnly, ServiceAccount, }
22
23impl Role {
24 pub fn has_permission(&self, permission: Permission) -> bool {
26 match self {
27 Role::Admin => true, Role::Developer => matches!(
29 permission,
30 Permission::Read
31 | Permission::Write
32 | Permission::Metrics
33 | Permission::ManageSchemas
34 | Permission::ManagePipelines
35 ),
36 Role::ReadOnly => matches!(permission, Permission::Read | Permission::Metrics),
37 Role::ServiceAccount => {
38 matches!(permission, Permission::Read | Permission::Write)
39 }
40 }
41 }
42}
43
/// A single capability that a [`Role`] may or may not grant.
///
/// Which roles hold which permission is decided by `Role::has_permission`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Permission {
    /// Read access; granted to every role.
    Read,
    /// Write access; granted to admins, developers and service accounts.
    Write,
    /// Administrative access; granted to the admin role only.
    Admin,
    /// Metrics access; granted to admins, developers and read-only users.
    Metrics,
    /// Schema management; granted to admins and developers.
    ManageSchemas,
    /// Pipeline management; granted to admins and developers.
    ManagePipelines,
    /// Tenant management; granted to the admin role only.
    ManageTenants,
}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct Claims {
59 pub sub: String,
61 pub tenant_id: String,
63 pub role: Role,
65 pub exp: i64,
67 pub iat: i64,
69 pub iss: String,
71}
72
73impl Claims {
74 pub fn new(user_id: String, tenant_id: String, role: Role, expires_in: Duration) -> Self {
76 let now = Utc::now();
77 Self {
78 sub: user_id,
79 tenant_id,
80 role,
81 iat: now.timestamp(),
82 exp: (now + expires_in).timestamp(),
83 iss: "allsource".to_string(),
84 }
85 }
86
87 pub fn is_expired(&self) -> bool {
89 Utc::now().timestamp() > self.exp
90 }
91
92 pub fn has_permission(&self, permission: Permission) -> bool {
94 self.role.has_permission(permission)
95 }
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct User {
101 pub id: Uuid,
102 pub username: String,
103 pub email: String,
104 #[serde(skip_serializing)]
105 pub password_hash: String,
106 pub role: Role,
107 pub tenant_id: String,
108 pub created_at: chrono::DateTime<Utc>,
109 pub active: bool,
110}
111
112impl User {
113 pub fn new(
115 username: String,
116 email: String,
117 password: &str,
118 role: Role,
119 tenant_id: String,
120 ) -> Result<Self> {
121 let password_hash = hash_password(password)?;
122
123 Ok(Self {
124 id: Uuid::new_v4(),
125 username,
126 email,
127 password_hash,
128 role,
129 tenant_id,
130 created_at: Utc::now(),
131 active: true,
132 })
133 }
134
135 pub fn verify_password(&self, password: &str) -> Result<bool> {
137 verify_password(password, &self.password_hash)
138 }
139}
140
141#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct ApiKey {
144 pub id: Uuid,
145 pub name: String,
146 pub tenant_id: String,
147 pub role: Role,
148 #[serde(skip_serializing)]
149 pub key_hash: String,
150 pub created_at: chrono::DateTime<Utc>,
151 pub expires_at: Option<chrono::DateTime<Utc>>,
152 pub active: bool,
153 pub last_used: Option<chrono::DateTime<Utc>>,
154}
155
156impl ApiKey {
157 pub fn new(
159 name: String,
160 tenant_id: String,
161 role: Role,
162 expires_at: Option<chrono::DateTime<Utc>>,
163 ) -> (Self, String) {
164 let key = generate_api_key();
165 let key_hash = hash_api_key(&key);
166
167 let api_key = Self {
168 id: Uuid::new_v4(),
169 name,
170 tenant_id,
171 role,
172 key_hash,
173 created_at: Utc::now(),
174 expires_at,
175 active: true,
176 last_used: None,
177 };
178
179 (api_key, key)
180 }
181
182 pub fn verify(&self, key: &str) -> bool {
184 if !self.active {
185 return false;
186 }
187
188 if let Some(expires_at) = self.expires_at
189 && Utc::now() > expires_at
190 {
191 return false;
192 }
193
194 hash_api_key(key) == self.key_hash
195 }
196
197 pub fn is_expired(&self) -> bool {
199 if let Some(expires_at) = self.expires_at {
200 Utc::now() > expires_at
201 } else {
202 false
203 }
204 }
205}
206
207pub struct AuthManager {
209 encoding_key: EncodingKey,
211 decoding_key: DecodingKey,
213 validation: Validation,
215 users: Arc<DashMap<Uuid, User>>,
217 api_keys: Arc<DashMap<Uuid, ApiKey>>,
219 username_index: Arc<DashMap<String, Uuid>>,
221 auth_repo: Option<Arc<crate::infrastructure::repositories::EventSourcedAuthRepository>>,
224}
225
226impl AuthManager {
227 pub fn new(jwt_secret: &str) -> Self {
229 let encoding_key = EncodingKey::from_secret(jwt_secret.as_bytes());
230 let decoding_key = DecodingKey::from_secret(jwt_secret.as_bytes());
231
232 let mut validation = Validation::new(Algorithm::HS256);
233 validation.set_issuer(&["allsource"]);
234
235 Self {
236 encoding_key,
237 decoding_key,
238 validation,
239 users: Arc::new(DashMap::new()),
240 api_keys: Arc::new(DashMap::new()),
241 username_index: Arc::new(DashMap::new()),
242 auth_repo: None,
243 }
244 }
245
246 pub fn with_auth_repository(
252 mut self,
253 repo: Arc<crate::infrastructure::repositories::EventSourcedAuthRepository>,
254 ) -> Self {
255 for key in repo.all_keys() {
257 self.api_keys.insert(key.id, key);
258 }
259 let loaded = self.api_keys.len();
260 if loaded > 0 {
261 tracing::info!("Loaded {} API keys from durable storage", loaded);
262 }
263 self.auth_repo = Some(repo);
264 self
265 }
266
267 pub fn register_user(
269 &self,
270 username: String,
271 email: String,
272 password: &str,
273 role: Role,
274 tenant_id: String,
275 ) -> Result<User> {
276 if self.username_index.contains_key(&username) {
278 return Err(AllSourceError::ValidationError(
279 "Username already exists".to_string(),
280 ));
281 }
282
283 let user = User::new(username.clone(), email, password, role, tenant_id)?;
284
285 self.users.insert(user.id, user.clone());
286 self.username_index.insert(username, user.id);
287
288 Ok(user)
289 }
290
291 pub fn authenticate(&self, username: &str, password: &str) -> Result<String> {
293 let user_id = self
294 .username_index
295 .get(username)
296 .ok_or_else(|| AllSourceError::ValidationError("Invalid credentials".to_string()))?;
297
298 let user = self
299 .users
300 .get(&user_id)
301 .ok_or_else(|| AllSourceError::ValidationError("User not found".to_string()))?;
302
303 if !user.active {
304 return Err(AllSourceError::ValidationError(
305 "User account is inactive".to_string(),
306 ));
307 }
308
309 if !user.verify_password(password)? {
310 return Err(AllSourceError::ValidationError(
311 "Invalid credentials".to_string(),
312 ));
313 }
314
315 let claims = Claims::new(
317 user.id.to_string(),
318 user.tenant_id.clone(),
319 user.role.clone(),
320 Duration::hours(24), );
322
323 let token = encode(&Header::default(), &claims, &self.encoding_key)
324 .map_err(|e| AllSourceError::ValidationError(format!("Failed to create token: {e}")))?;
325
326 Ok(token)
327 }
328
329 pub fn validate_token(&self, token: &str) -> Result<Claims> {
331 let token_data = decode::<Claims>(token, &self.decoding_key, &self.validation)
332 .map_err(|e| AllSourceError::ValidationError(format!("Invalid token: {e}")))?;
333
334 if token_data.claims.is_expired() {
335 return Err(AllSourceError::ValidationError("Token expired".to_string()));
336 }
337
338 Ok(token_data.claims)
339 }
340
341 pub fn create_api_key(
346 &self,
347 name: String,
348 tenant_id: String,
349 role: Role,
350 expires_at: Option<chrono::DateTime<Utc>>,
351 ) -> (ApiKey, String) {
352 let (api_key, key) = ApiKey::new(name, tenant_id, role, expires_at);
353 self.api_keys.insert(api_key.id, api_key.clone());
354
355 if let Some(ref repo) = self.auth_repo
357 && let Err(e) = repo.persist_api_key(&api_key)
358 {
359 tracing::error!("Failed to persist API key to system store: {e}");
360 }
361
362 (api_key, key)
363 }
364
365 pub fn validate_api_key(&self, key: &str) -> Result<Claims> {
367 let found_key = self.api_keys.iter().find_map(|entry| {
370 let api_key = entry.value();
371 if api_key.verify(key) {
372 Some((api_key.id, api_key.tenant_id.clone(), api_key.role.clone()))
373 } else {
374 None
375 }
376 });
377
378 if let Some((key_id, tenant_id, role)) = found_key {
379 if let Some(mut key_mut) = self.api_keys.get_mut(&key_id) {
381 key_mut.last_used = Some(Utc::now());
382 }
383
384 let claims = Claims::new(key_id.to_string(), tenant_id, role, Duration::hours(24));
385
386 return Ok(claims);
387 }
388
389 Err(AllSourceError::ValidationError(
390 "Invalid API key".to_string(),
391 ))
392 }
393
394 pub fn get_user(&self, user_id: &Uuid) -> Option<User> {
396 self.users.get(user_id).map(|u| u.clone())
397 }
398
399 pub fn list_users(&self) -> Vec<User> {
401 self.users
402 .iter()
403 .map(|entry| entry.value().clone())
404 .collect()
405 }
406
407 pub fn delete_user(&self, user_id: &Uuid) -> Result<()> {
409 if let Some((_, user)) = self.users.remove(user_id) {
410 self.username_index.remove(&user.username);
411 Ok(())
412 } else {
413 Err(AllSourceError::ValidationError(
414 "User not found".to_string(),
415 ))
416 }
417 }
418
419 pub fn revoke_api_key(&self, key_id: &Uuid) -> Result<()> {
424 if let Some(mut api_key) = self.api_keys.get_mut(key_id) {
425 api_key.active = false;
426
427 if let Some(ref repo) = self.auth_repo
429 && let Err(e) = repo.persist_key_revocation(key_id)
430 {
431 tracing::error!("Failed to persist key revocation to system store: {e}");
432 }
433
434 Ok(())
435 } else {
436 Err(AllSourceError::ValidationError(
437 "API key not found".to_string(),
438 ))
439 }
440 }
441
442 pub fn list_api_keys(&self, tenant_id: &str) -> Vec<ApiKey> {
444 self.api_keys
445 .iter()
446 .filter(|entry| entry.value().tenant_id == tenant_id)
447 .map(|entry| entry.value().clone())
448 .collect()
449 }
450
451 pub fn register_bootstrap_api_key(&self, key: &str, tenant_id: &str) -> ApiKey {
461 let key_hash = hash_api_key(key);
462
463 let existing = self.api_keys.iter().find_map(|entry| {
465 if entry.value().key_hash == key_hash && entry.value().active {
466 Some(entry.value().clone())
467 } else {
468 None
469 }
470 });
471
472 if let Some(existing_key) = existing {
473 tracing::debug!("Bootstrap API key already exists (idempotent)");
474 return existing_key;
475 }
476
477 let api_key = ApiKey {
478 id: Uuid::new_v4(),
479 name: "bootstrap".to_string(),
480 tenant_id: tenant_id.to_string(),
481 role: Role::Admin,
482 key_hash,
483 created_at: Utc::now(),
484 expires_at: None,
485 active: true,
486 last_used: None,
487 };
488
489 self.api_keys.insert(api_key.id, api_key.clone());
490
491 if let Some(ref repo) = self.auth_repo
493 && let Err(e) = repo.persist_api_key(&api_key)
494 {
495 tracing::error!("Failed to persist bootstrap API key: {e}");
496 }
497
498 api_key
499 }
500}
501
502impl Default for AuthManager {
503 fn default() -> Self {
504 use base64::{Engine as _, engine::general_purpose};
505 let secret = general_purpose::STANDARD.encode(rand::random::<[u8; 32]>());
506 Self::new(&secret)
507 }
508}
509
510fn hash_password(password: &str) -> Result<String> {
512 let salt = SaltString::generate(&mut OsRng);
513 let argon2 = Argon2::default();
514
515 let hash = argon2
516 .hash_password(password.as_bytes(), &salt)
517 .map_err(|e| AllSourceError::ValidationError(format!("Password hashing failed: {e}")))?;
518
519 Ok(hash.to_string())
520}
521
522fn verify_password(password: &str, hash: &str) -> Result<bool> {
524 let parsed_hash = PasswordHash::new(hash)
525 .map_err(|e| AllSourceError::ValidationError(format!("Invalid password hash: {e}")))?;
526
527 let argon2 = Argon2::default();
528 Ok(argon2
529 .verify_password(password.as_bytes(), &parsed_hash)
530 .is_ok())
531}
532
533fn generate_api_key() -> String {
535 use base64::{Engine as _, engine::general_purpose};
536 let random_bytes: [u8; 32] = rand::random();
537 format!(
538 "ask_{}",
539 general_purpose::URL_SAFE_NO_PAD.encode(random_bytes)
540 )
541}
542
/// Hashes a plaintext API key into the lowercase hexadecimal string stored in
/// `ApiKey::key_hash`.
// NOTE(review): `DefaultHasher` yields a 64-bit, non-cryptographic hash, and
// the standard library does not guarantee its algorithm stays the same across
// Rust releases. Because these hashes are stored and compared later, a
// toolchain upgrade could invalidate existing keys. Moving to a stable
// cryptographic digest would need a migration for already-stored hashes.
fn hash_api_key(key: &str) -> String {
    use std::{
        collections::hash_map::DefaultHasher,
        hash::{Hash, Hasher},
    };

    let mut state = DefaultHasher::new();
    key.hash(&mut state);
    let digest: u64 = state.finish();
    format!("{:x}", digest)
}
554
#[cfg(test)]
mod tests {
    use super::*;

    /// A new user verifies with the right password and not with a wrong one.
    #[test]
    fn test_user_creation_and_verification() {
        let user = User::new(
            "testuser".to_string(),
            "test@example.com".to_string(),
            "password123",
            Role::Developer,
            "tenant1".to_string(),
        )
        .unwrap();

        assert!(user.verify_password("password123").unwrap());
        assert!(!user.verify_password("wrongpassword").unwrap());
    }

    /// Spot-checks the role/permission matrix.
    #[test]
    fn test_role_permissions() {
        assert!(Role::Admin.has_permission(Permission::Admin));
        assert!(Role::Developer.has_permission(Permission::Write));
        assert!(!Role::ReadOnly.has_permission(Permission::Write));
        assert!(Role::ReadOnly.has_permission(Permission::Read));
    }

    /// Register, authenticate, then validate the issued token.
    #[test]
    fn test_auth_manager() {
        let auth = AuthManager::new("test_secret");

        let user = auth
            .register_user(
                "alice".to_string(),
                "alice@example.com".to_string(),
                "password123",
                Role::Developer,
                "tenant1".to_string(),
            )
            .unwrap();

        // A second registration under the same username must be rejected.
        assert!(
            auth.register_user(
                "alice".to_string(),
                "other@example.com".to_string(),
                "password456",
                Role::ReadOnly,
                "tenant1".to_string(),
            )
            .is_err()
        );

        let token = auth.authenticate("alice", "password123").unwrap();
        assert!(!token.is_empty());

        let claims = auth.validate_token(&token).unwrap();
        // The token subject is the registered user's id.
        assert_eq!(claims.sub, user.id.to_string());
        assert_eq!(claims.tenant_id, "tenant1");
        assert_eq!(claims.role, Role::Developer);

        assert!(auth.authenticate("alice", "wrongpassword").is_err());
    }

    /// A created key validates until it is revoked.
    #[test]
    fn test_api_key() {
        let auth = AuthManager::new("test_secret");

        let (api_key, key) = auth.create_api_key(
            "test-key".to_string(),
            "tenant1".to_string(),
            Role::ServiceAccount,
            None,
        );

        let claims = auth.validate_api_key(&key).unwrap();
        assert_eq!(claims.tenant_id, "tenant1");
        assert_eq!(claims.role, Role::ServiceAccount);

        auth.revoke_api_key(&api_key.id).unwrap();

        assert!(auth.validate_api_key(&key).is_err());
    }

    /// Claims created with a negative lifetime are already expired.
    #[test]
    fn test_claims_expiration() {
        let claims = Claims::new(
            "user1".to_string(),
            "tenant1".to_string(),
            Role::Developer,
            Duration::seconds(-1),
        );

        assert!(claims.is_expired());
    }
}