1use crate::error::{AllSourceError, Result};
2use argon2::{
3 Argon2,
4 password_hash::{PasswordHash, PasswordHasher, PasswordVerifier, SaltString, rand_core::OsRng},
5};
6use chrono::{Duration, Utc};
7use dashmap::DashMap;
8use jsonwebtoken::{Algorithm, DecodingKey, EncodingKey, Header, Validation, decode, encode};
9use serde::{Deserialize, Serialize};
10use std::sync::Arc;
11use uuid::Uuid;
12
/// Role attached to a user or API key; it decides which [`Permission`]s are granted
/// (see [`Role::has_permission`]).
///
/// serde renders the variant names in lowercase (`rename_all = "lowercase"`), e.g.
/// `ReadOnly` becomes `"readonly"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum Role {
    /// Granted every permission.
    Admin,
    /// Granted read, write, metrics, schema-management and pipeline-management permissions.
    Developer,
    /// Granted read and metrics permissions only.
    ReadOnly,
    /// Granted read and write permissions only.
    ServiceAccount,
}
22
23impl Role {
24 pub fn has_permission(&self, permission: Permission) -> bool {
26 match self {
27 Role::Admin => true, Role::Developer => matches!(
29 permission,
30 Permission::Read
31 | Permission::Write
32 | Permission::Metrics
33 | Permission::ManageSchemas
34 | Permission::ManagePipelines
35 ),
36 Role::ReadOnly => matches!(permission, Permission::Read | Permission::Metrics),
37 Role::ServiceAccount => {
38 matches!(permission, Permission::Read | Permission::Write)
39 }
40 }
41 }
42
43 pub fn mcp_readonly_preset() -> Self {
50 Role::ReadOnly
51 }
52}
53
/// Individual capability that a [`Role`] may or may not grant.
///
/// The notes on each variant summarize the grants implemented in [`Role::has_permission`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Permission {
    /// Granted to every role.
    Read,
    /// Granted to `Admin`, `Developer` and `ServiceAccount`.
    Write,
    /// Granted to `Admin` only.
    Admin,
    /// Granted to `Admin`, `Developer` and `ReadOnly`.
    Metrics,
    /// Granted to `Admin` and `Developer`.
    ManageSchemas,
    /// Granted to `Admin` and `Developer`.
    ManagePipelines,
    /// Granted to `Admin` only.
    ManageTenants,
}
65
/// JWT claim set produced on login and on API-key validation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Claims {
    /// Subject: the user id or API key id, as a string.
    pub sub: String,
    /// Tenant the subject belongs to.
    pub tenant_id: String,
    /// Role used for permission checks (see [`Claims::has_permission`]).
    pub role: Role,
    /// Expiration time as a Unix timestamp in seconds.
    pub exp: i64,
    /// Issued-at time as a Unix timestamp in seconds.
    pub iat: i64,
    /// Issuer; [`Claims::new`] always sets this to `"allsource"`.
    pub iss: String,
}
82
83impl Claims {
84 pub fn new(user_id: String, tenant_id: String, role: Role, expires_in: Duration) -> Self {
86 let now = Utc::now();
87 Self {
88 sub: user_id,
89 tenant_id,
90 role,
91 iat: now.timestamp(),
92 exp: (now + expires_in).timestamp(),
93 iss: "allsource".to_string(),
94 }
95 }
96
97 pub fn is_expired(&self) -> bool {
99 Utc::now().timestamp() > self.exp
100 }
101
102 pub fn has_permission(&self, permission: Permission) -> bool {
104 self.role.has_permission(permission)
105 }
106}
107
/// A registered user account.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct User {
    /// Unique identifier, generated as a random (v4) UUID in [`User::new`].
    pub id: Uuid,
    /// Login name.
    pub username: String,
    /// Email address supplied at registration.
    pub email: String,
    /// Encoded password hash produced by `hash_password`.
    ///
    /// Omitted from serialized output (`skip_serializing`). Only serialization is skipped;
    /// the attribute does not provide a default for deserialization.
    #[serde(skip_serializing)]
    pub password_hash: String,
    /// Role used for permission checks.
    pub role: Role,
    /// Tenant the user belongs to.
    pub tenant_id: String,
    /// Creation time (UTC).
    pub created_at: chrono::DateTime<Utc>,
    /// Whether the account is enabled; [`User::new`] creates users as active.
    pub active: bool,
}
121
122impl User {
123 pub fn new(
125 username: String,
126 email: String,
127 password: &str,
128 role: Role,
129 tenant_id: String,
130 ) -> Result<Self> {
131 let password_hash = hash_password(password)?;
132
133 Ok(Self {
134 id: Uuid::new_v4(),
135 username,
136 email,
137 password_hash,
138 role,
139 tenant_id,
140 created_at: Utc::now(),
141 active: true,
142 })
143 }
144
145 pub fn verify_password(&self, password: &str) -> Result<bool> {
147 verify_password(password, &self.password_hash)
148 }
149}
150
/// Metadata for an API key. Only a hash of the key is kept, never the key itself.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApiKey {
    /// Unique identifier of this key record.
    pub id: Uuid,
    /// Human-readable label.
    pub name: String,
    /// Tenant the key belongs to.
    pub tenant_id: String,
    /// Role granted to callers presenting this key.
    pub role: Role,
    /// Output of `hash_api_key` for the raw key; omitted from serialized output.
    #[serde(skip_serializing)]
    pub key_hash: String,
    /// Creation time (UTC).
    pub created_at: chrono::DateTime<Utc>,
    /// Optional expiry; `None` means the key never expires.
    pub expires_at: Option<chrono::DateTime<Utc>>,
    /// Whether the key is usable; inactive keys never verify.
    pub active: bool,
    /// Time the key was last used, if it has been used.
    pub last_used: Option<chrono::DateTime<Utc>>,
}
165
166impl ApiKey {
167 pub fn new(
169 name: String,
170 tenant_id: String,
171 role: Role,
172 expires_at: Option<chrono::DateTime<Utc>>,
173 ) -> (Self, String) {
174 let key = generate_api_key();
175 let key_hash = hash_api_key(&key);
176
177 let api_key = Self {
178 id: Uuid::new_v4(),
179 name,
180 tenant_id,
181 role,
182 key_hash,
183 created_at: Utc::now(),
184 expires_at,
185 active: true,
186 last_used: None,
187 };
188
189 (api_key, key)
190 }
191
192 pub fn verify(&self, key: &str) -> bool {
194 if !self.active {
195 return false;
196 }
197
198 if let Some(expires_at) = self.expires_at
199 && Utc::now() > expires_at
200 {
201 return false;
202 }
203
204 hash_api_key(key) == self.key_hash
205 }
206
207 pub fn is_expired(&self) -> bool {
209 if let Some(expires_at) = self.expires_at {
210 Utc::now() > expires_at
211 } else {
212 false
213 }
214 }
215}
216
/// Authentication service: issues and validates JWTs and manages users and API keys held in
/// concurrent in-memory maps, optionally backed by a durable repository for API keys.
pub struct AuthManager {
    /// Key used to sign JWTs in `authenticate`.
    encoding_key: EncodingKey,
    /// Key used to verify JWTs in `validate_token`.
    decoding_key: DecodingKey,
    /// JWT validation rules (HS256, issuer `"allsource"`) configured in `new`.
    validation: Validation,
    /// Users keyed by user id.
    users: Arc<DashMap<Uuid, User>>,
    /// API keys keyed by key id.
    api_keys: Arc<DashMap<Uuid, ApiKey>>,
    /// Maps username to user id for login lookups.
    username_index: Arc<DashMap<String, Uuid>>,
    /// Optional durable store for API keys; `None` means keys are kept in memory only.
    auth_repo: Option<Arc<crate::infrastructure::repositories::EventSourcedAuthRepository>>,
}
235
236impl AuthManager {
237 pub fn new(jwt_secret: &str) -> Self {
239 let encoding_key = EncodingKey::from_secret(jwt_secret.as_bytes());
240 let decoding_key = DecodingKey::from_secret(jwt_secret.as_bytes());
241
242 let mut validation = Validation::new(Algorithm::HS256);
243 validation.set_issuer(&["allsource"]);
244
245 Self {
246 encoding_key,
247 decoding_key,
248 validation,
249 users: Arc::new(DashMap::new()),
250 api_keys: Arc::new(DashMap::new()),
251 username_index: Arc::new(DashMap::new()),
252 auth_repo: None,
253 }
254 }
255
256 pub fn with_auth_repository(
262 mut self,
263 repo: Arc<crate::infrastructure::repositories::EventSourcedAuthRepository>,
264 ) -> Self {
265 for key in repo.all_keys() {
267 self.api_keys.insert(key.id, key);
268 }
269 let loaded = self.api_keys.len();
270 if loaded > 0 {
271 tracing::info!("Loaded {} API keys from durable storage", loaded);
272 }
273 self.auth_repo = Some(repo);
274 self
275 }
276
277 pub fn register_user(
279 &self,
280 username: String,
281 email: String,
282 password: &str,
283 role: Role,
284 tenant_id: String,
285 ) -> Result<User> {
286 if self.username_index.contains_key(&username) {
288 return Err(AllSourceError::ValidationError(
289 "Username already exists".to_string(),
290 ));
291 }
292
293 let user = User::new(username.clone(), email, password, role, tenant_id)?;
294
295 self.users.insert(user.id, user.clone());
296 self.username_index.insert(username, user.id);
297
298 Ok(user)
299 }
300
301 pub fn authenticate(&self, username: &str, password: &str) -> Result<String> {
303 let user_id = self
304 .username_index
305 .get(username)
306 .ok_or_else(|| AllSourceError::ValidationError("Invalid credentials".to_string()))?;
307
308 let user = self
309 .users
310 .get(&user_id)
311 .ok_or_else(|| AllSourceError::ValidationError("User not found".to_string()))?;
312
313 if !user.active {
314 return Err(AllSourceError::ValidationError(
315 "User account is inactive".to_string(),
316 ));
317 }
318
319 if !user.verify_password(password)? {
320 return Err(AllSourceError::ValidationError(
321 "Invalid credentials".to_string(),
322 ));
323 }
324
325 let claims = Claims::new(
327 user.id.to_string(),
328 user.tenant_id.clone(),
329 user.role.clone(),
330 Duration::hours(24), );
332
333 let token = encode(&Header::default(), &claims, &self.encoding_key)
334 .map_err(|e| AllSourceError::ValidationError(format!("Failed to create token: {e}")))?;
335
336 Ok(token)
337 }
338
339 pub fn validate_token(&self, token: &str) -> Result<Claims> {
341 let token_data = decode::<Claims>(token, &self.decoding_key, &self.validation)
342 .map_err(|e| AllSourceError::ValidationError(format!("Invalid token: {e}")))?;
343
344 if token_data.claims.is_expired() {
345 return Err(AllSourceError::ValidationError("Token expired".to_string()));
346 }
347
348 Ok(token_data.claims)
349 }
350
351 pub fn create_api_key(
356 &self,
357 name: String,
358 tenant_id: String,
359 role: Role,
360 expires_at: Option<chrono::DateTime<Utc>>,
361 ) -> (ApiKey, String) {
362 let (api_key, key) = ApiKey::new(name, tenant_id, role, expires_at);
363 self.api_keys.insert(api_key.id, api_key.clone());
364
365 if let Some(ref repo) = self.auth_repo
367 && let Err(e) = repo.persist_api_key(&api_key)
368 {
369 tracing::error!("Failed to persist API key to system store: {e}");
370 }
371
372 (api_key, key)
373 }
374
375 pub fn validate_api_key(&self, key: &str) -> Result<Claims> {
377 let found_key = self.api_keys.iter().find_map(|entry| {
380 let api_key = entry.value();
381 if api_key.verify(key) {
382 Some((api_key.id, api_key.tenant_id.clone(), api_key.role.clone()))
383 } else {
384 None
385 }
386 });
387
388 if let Some((key_id, tenant_id, role)) = found_key {
389 if let Some(mut key_mut) = self.api_keys.get_mut(&key_id) {
391 key_mut.last_used = Some(Utc::now());
392 }
393
394 let claims = Claims::new(key_id.to_string(), tenant_id, role, Duration::hours(24));
395
396 return Ok(claims);
397 }
398
399 Err(AllSourceError::ValidationError(
400 "Invalid API key".to_string(),
401 ))
402 }
403
404 pub fn get_user(&self, user_id: &Uuid) -> Option<User> {
406 self.users.get(user_id).map(|u| u.clone())
407 }
408
409 pub fn list_users(&self) -> Vec<User> {
411 self.users
412 .iter()
413 .map(|entry| entry.value().clone())
414 .collect()
415 }
416
417 pub fn delete_user(&self, user_id: &Uuid) -> Result<()> {
419 if let Some((_, user)) = self.users.remove(user_id) {
420 self.username_index.remove(&user.username);
421 Ok(())
422 } else {
423 Err(AllSourceError::ValidationError(
424 "User not found".to_string(),
425 ))
426 }
427 }
428
429 pub fn revoke_api_key(&self, key_id: &Uuid) -> Result<()> {
434 if let Some(mut api_key) = self.api_keys.get_mut(key_id) {
435 api_key.active = false;
436
437 if let Some(ref repo) = self.auth_repo
439 && let Err(e) = repo.persist_key_revocation(key_id)
440 {
441 tracing::error!("Failed to persist key revocation to system store: {e}");
442 }
443
444 Ok(())
445 } else {
446 Err(AllSourceError::ValidationError(
447 "API key not found".to_string(),
448 ))
449 }
450 }
451
452 pub fn list_api_keys(&self, tenant_id: &str) -> Vec<ApiKey> {
454 self.api_keys
455 .iter()
456 .filter(|entry| entry.value().tenant_id == tenant_id)
457 .map(|entry| entry.value().clone())
458 .collect()
459 }
460
461 pub fn register_bootstrap_api_key(&self, key: &str, tenant_id: &str) -> ApiKey {
471 let key_hash = hash_api_key(key);
472
473 let existing = self.api_keys.iter().find_map(|entry| {
475 if entry.value().key_hash == key_hash && entry.value().active {
476 Some(entry.value().clone())
477 } else {
478 None
479 }
480 });
481
482 if let Some(existing_key) = existing {
483 tracing::debug!("Bootstrap API key already exists (idempotent)");
484 return existing_key;
485 }
486
487 let api_key = ApiKey {
488 id: Uuid::new_v4(),
489 name: "bootstrap".to_string(),
490 tenant_id: tenant_id.to_string(),
491 role: Role::Admin,
492 key_hash,
493 created_at: Utc::now(),
494 expires_at: None,
495 active: true,
496 last_used: None,
497 };
498
499 self.api_keys.insert(api_key.id, api_key.clone());
500
501 if let Some(ref repo) = self.auth_repo
503 && let Err(e) = repo.persist_api_key(&api_key)
504 {
505 tracing::error!("Failed to persist bootstrap API key: {e}");
506 }
507
508 api_key
509 }
510}
511
512impl Default for AuthManager {
513 fn default() -> Self {
514 use base64::{Engine as _, engine::general_purpose};
515 let secret = general_purpose::STANDARD.encode(rand::random::<[u8; 32]>());
516 Self::new(&secret)
517 }
518}
519
520fn hash_password(password: &str) -> Result<String> {
522 let salt = SaltString::generate(&mut OsRng);
523 let argon2 = Argon2::default();
524
525 let hash = argon2
526 .hash_password(password.as_bytes(), &salt)
527 .map_err(|e| AllSourceError::ValidationError(format!("Password hashing failed: {e}")))?;
528
529 Ok(hash.to_string())
530}
531
532fn verify_password(password: &str, hash: &str) -> Result<bool> {
534 let parsed_hash = PasswordHash::new(hash)
535 .map_err(|e| AllSourceError::ValidationError(format!("Invalid password hash: {e}")))?;
536
537 let argon2 = Argon2::default();
538 Ok(argon2
539 .verify_password(password.as_bytes(), &parsed_hash)
540 .is_ok())
541}
542
543fn generate_api_key() -> String {
545 use base64::{Engine as _, engine::general_purpose};
546 let random_bytes: [u8; 32] = rand::random();
547 format!(
548 "ask_{}",
549 general_purpose::URL_SAFE_NO_PAD.encode(random_bytes)
550 )
551}
552
/// Hashes a raw API key into the lowercase hexadecimal string stored in `ApiKey::key_hash`.
///
/// The same input always produces the same output within one build, which is what lets
/// keys be looked up by comparing hashes.
///
/// NOTE(review): `DefaultHasher` is a 64-bit, non-cryptographic hasher, and the standard
/// library does not guarantee its algorithm across Rust releases. Key hashes appear to be
/// written to durable storage, so a toolchain upgrade could invalidate stored keys, and a
/// 64-bit digest is weak for a credential. Consider a fixed cryptographic digest such as
/// SHA-256; that needs a new dependency and a migration of stored hashes, so it is flagged
/// here rather than changed.
fn hash_api_key(key: &str) -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::new();
    key.hash(&mut state);
    let digest: u64 = state.finish();
    format!("{:x}", digest)
}
564
#[cfg(test)]
mod tests {
    use super::*;

    /// A user verifies with the password it was created with and rejects any other.
    #[test]
    fn test_user_creation_and_verification() {
        let user = User::new(
            "testuser".to_string(),
            "test@example.com".to_string(),
            "password123",
            Role::Developer,
            "tenant1".to_string(),
        )
        .unwrap();

        assert!(user.verify_password("password123").unwrap());
        assert!(!user.verify_password("wrongpassword").unwrap());
    }

    /// Spot-checks the role/permission matrix.
    #[test]
    fn test_role_permissions() {
        assert!(Role::Admin.has_permission(Permission::Admin));
        assert!(Role::Developer.has_permission(Permission::Write));
        assert!(!Role::ReadOnly.has_permission(Permission::Write));
        assert!(Role::ReadOnly.has_permission(Permission::Read));
    }

    /// Register, log in, and validate the issued token end to end.
    #[test]
    fn test_auth_manager() {
        let auth = AuthManager::new("test_secret");

        let user = auth
            .register_user(
                "alice".to_string(),
                "alice@example.com".to_string(),
                "password123",
                Role::Developer,
                "tenant1".to_string(),
            )
            .unwrap();

        let token = auth.authenticate("alice", "password123").unwrap();
        assert!(!token.is_empty());

        let claims = auth.validate_token(&token).unwrap();
        // The token's subject must be the id of the user that was registered.
        assert_eq!(claims.sub, user.id.to_string());
        assert_eq!(claims.tenant_id, "tenant1");
        assert_eq!(claims.role, Role::Developer);
    }

    /// A second registration under the same username is rejected.
    #[test]
    fn test_duplicate_username_rejected() {
        let auth = AuthManager::new("test_secret");

        auth.register_user(
            "bob".to_string(),
            "bob@example.com".to_string(),
            "password123",
            Role::ReadOnly,
            "tenant1".to_string(),
        )
        .unwrap();

        let second = auth.register_user(
            "bob".to_string(),
            "other@example.com".to_string(),
            "password456",
            Role::ReadOnly,
            "tenant1".to_string(),
        );

        assert!(second.is_err());
        assert_eq!(auth.list_users().len(), 1);
    }

    /// Wrong passwords and unknown usernames do not authenticate.
    #[test]
    fn test_invalid_credentials_rejected() {
        let auth = AuthManager::new("test_secret");

        auth.register_user(
            "carol".to_string(),
            "carol@example.com".to_string(),
            "password123",
            Role::Developer,
            "tenant1".to_string(),
        )
        .unwrap();

        assert!(auth.authenticate("carol", "wrongpassword").is_err());
        assert!(auth.authenticate("nobody", "password123").is_err());
    }

    /// A created key validates until it is revoked.
    #[test]
    fn test_api_key() {
        let auth = AuthManager::new("test_secret");

        let (api_key, key) = auth.create_api_key(
            "test-key".to_string(),
            "tenant1".to_string(),
            Role::ServiceAccount,
            None,
        );

        let claims = auth.validate_api_key(&key).unwrap();
        assert_eq!(claims.tenant_id, "tenant1");
        assert_eq!(claims.role, Role::ServiceAccount);

        auth.revoke_api_key(&api_key.id).unwrap();

        assert!(auth.validate_api_key(&key).is_err());
    }

    /// Registering the same bootstrap key twice returns the same record.
    #[test]
    fn test_bootstrap_api_key_is_idempotent() {
        let auth = AuthManager::new("test_secret");

        let first = auth.register_bootstrap_api_key("ask_bootstrap", "tenant1");
        let second = auth.register_bootstrap_api_key("ask_bootstrap", "tenant1");

        assert_eq!(first.id, second.id);
        assert_eq!(auth.list_api_keys("tenant1").len(), 1);

        let claims = auth.validate_api_key("ask_bootstrap").unwrap();
        assert_eq!(claims.role, Role::Admin);
    }

    /// Claims built with a negative lifetime are already expired.
    #[test]
    fn test_claims_expiration() {
        let claims = Claims::new(
            "user1".to_string(),
            "tenant1".to_string(),
            Role::Developer,
            Duration::seconds(-1),
        );

        assert!(claims.is_expired());
    }
}