Skip to main content

allsource_core/infrastructure/security/
auth.rs

1use crate::error::{AllSourceError, Result};
2use argon2::{
3    Argon2,
4    password_hash::{PasswordHash, PasswordHasher, PasswordVerifier, SaltString, rand_core::OsRng},
5};
6use chrono::{Duration, Utc};
7use dashmap::DashMap;
8use jsonwebtoken::{Algorithm, DecodingKey, EncodingKey, Header, Validation, decode, encode};
9use serde::{Deserialize, Serialize};
10use std::sync::Arc;
11use uuid::Uuid;
12
13/// User role for RBAC
14#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
15#[serde(rename_all = "lowercase")]
16pub enum Role {
17    Admin,          // Full system access
18    Developer,      // Read/write events, manage schemas
19    ReadOnly,       // Read-only access to events
20    ServiceAccount, // Programmatic access for services
21}
22
23impl Role {
24    /// Check if role has specific permission
25    pub fn has_permission(&self, permission: Permission) -> bool {
26        match self {
27            Role::Admin => true, // Admin has all permissions
28            Role::Developer => matches!(
29                permission,
30                Permission::Read
31                    | Permission::Write
32                    | Permission::Metrics
33                    | Permission::ManageSchemas
34                    | Permission::ManagePipelines
35            ),
36            Role::ReadOnly => matches!(permission, Permission::Read | Permission::Metrics),
37            Role::ServiceAccount => {
38                matches!(permission, Permission::Read | Permission::Write)
39            }
40        }
41    }
42}
43
44/// Permission types
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub enum Permission {
47    Read,
48    Write,
49    Admin,
50    Metrics,
51    ManageSchemas,
52    ManagePipelines,
53    ManageTenants,
54}
55
56/// JWT Claims
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct Claims {
59    /// Subject (user ID or API key ID)
60    pub sub: String,
61    /// Tenant ID
62    pub tenant_id: String,
63    /// User role
64    pub role: Role,
65    /// Expiration time (UNIX timestamp)
66    pub exp: i64,
67    /// Issued at time (UNIX timestamp)
68    pub iat: i64,
69    /// Issuer
70    pub iss: String,
71}
72
73impl Claims {
74    /// Create new claims for a user
75    pub fn new(user_id: String, tenant_id: String, role: Role, expires_in: Duration) -> Self {
76        let now = Utc::now();
77        Self {
78            sub: user_id,
79            tenant_id,
80            role,
81            iat: now.timestamp(),
82            exp: (now + expires_in).timestamp(),
83            iss: "allsource".to_string(),
84        }
85    }
86
87    /// Check if claims are expired
88    pub fn is_expired(&self) -> bool {
89        Utc::now().timestamp() > self.exp
90    }
91
92    /// Check if user has permission
93    pub fn has_permission(&self, permission: Permission) -> bool {
94        self.role.has_permission(permission)
95    }
96}
97
98/// User account
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct User {
101    pub id: Uuid,
102    pub username: String,
103    pub email: String,
104    #[serde(skip_serializing)]
105    pub password_hash: String,
106    pub role: Role,
107    pub tenant_id: String,
108    pub created_at: chrono::DateTime<Utc>,
109    pub active: bool,
110}
111
112impl User {
113    /// Create a new user with hashed password
114    pub fn new(
115        username: String,
116        email: String,
117        password: &str,
118        role: Role,
119        tenant_id: String,
120    ) -> Result<Self> {
121        let password_hash = hash_password(password)?;
122
123        Ok(Self {
124            id: Uuid::new_v4(),
125            username,
126            email,
127            password_hash,
128            role,
129            tenant_id,
130            created_at: Utc::now(),
131            active: true,
132        })
133    }
134
135    /// Verify password
136    pub fn verify_password(&self, password: &str) -> Result<bool> {
137        verify_password(password, &self.password_hash)
138    }
139}
140
141/// API Key for programmatic access
142#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct ApiKey {
144    pub id: Uuid,
145    pub name: String,
146    pub tenant_id: String,
147    pub role: Role,
148    #[serde(skip_serializing)]
149    pub key_hash: String,
150    pub created_at: chrono::DateTime<Utc>,
151    pub expires_at: Option<chrono::DateTime<Utc>>,
152    pub active: bool,
153    pub last_used: Option<chrono::DateTime<Utc>>,
154}
155
156impl ApiKey {
157    /// Create new API key
158    pub fn new(
159        name: String,
160        tenant_id: String,
161        role: Role,
162        expires_at: Option<chrono::DateTime<Utc>>,
163    ) -> (Self, String) {
164        let key = generate_api_key();
165        let key_hash = hash_api_key(&key);
166
167        let api_key = Self {
168            id: Uuid::new_v4(),
169            name,
170            tenant_id,
171            role,
172            key_hash,
173            created_at: Utc::now(),
174            expires_at,
175            active: true,
176            last_used: None,
177        };
178
179        (api_key, key)
180    }
181
182    /// Verify API key
183    pub fn verify(&self, key: &str) -> bool {
184        if !self.active {
185            return false;
186        }
187
188        if let Some(expires_at) = self.expires_at
189            && Utc::now() > expires_at
190        {
191            return false;
192        }
193
194        hash_api_key(key) == self.key_hash
195    }
196
197    /// Check if expired
198    pub fn is_expired(&self) -> bool {
199        if let Some(expires_at) = self.expires_at {
200            Utc::now() > expires_at
201        } else {
202            false
203        }
204    }
205}
206
207/// Authentication manager
208pub struct AuthManager {
209    /// JWT encoding key
210    encoding_key: EncodingKey,
211    /// JWT decoding key
212    decoding_key: DecodingKey,
213    /// JWT validation rules
214    validation: Validation,
215    /// Users storage (in production, use database)
216    users: Arc<DashMap<Uuid, User>>,
217    /// API keys storage (in production, use database)
218    api_keys: Arc<DashMap<Uuid, ApiKey>>,
219    /// Username to user ID mapping
220    username_index: Arc<DashMap<String, Uuid>>,
221    /// Optional event-sourced auth repository for durable key storage.
222    /// When set, API key operations are persisted to the system WAL.
223    auth_repo: Option<Arc<crate::infrastructure::repositories::EventSourcedAuthRepository>>,
224}
225
226impl AuthManager {
227    /// Create new authentication manager
228    pub fn new(jwt_secret: &str) -> Self {
229        let encoding_key = EncodingKey::from_secret(jwt_secret.as_bytes());
230        let decoding_key = DecodingKey::from_secret(jwt_secret.as_bytes());
231
232        let mut validation = Validation::new(Algorithm::HS256);
233        validation.set_issuer(&["allsource"]);
234
235        Self {
236            encoding_key,
237            decoding_key,
238            validation,
239            users: Arc::new(DashMap::new()),
240            api_keys: Arc::new(DashMap::new()),
241            username_index: Arc::new(DashMap::new()),
242            auth_repo: None,
243        }
244    }
245
246    /// Attach an event-sourced auth repository for durable API key storage.
247    ///
248    /// Loads all previously persisted keys into the in-memory cache.
249    /// Subsequent `create_api_key` and `revoke_api_key` calls will also
250    /// write events to the system WAL.
251    pub fn with_auth_repository(
252        mut self,
253        repo: Arc<crate::infrastructure::repositories::EventSourcedAuthRepository>,
254    ) -> Self {
255        // Load persisted keys into the in-memory DashMap
256        for key in repo.all_keys() {
257            self.api_keys.insert(key.id, key);
258        }
259        let loaded = self.api_keys.len();
260        if loaded > 0 {
261            tracing::info!("Loaded {} API keys from durable storage", loaded);
262        }
263        self.auth_repo = Some(repo);
264        self
265    }
266
267    /// Register a new user
268    pub fn register_user(
269        &self,
270        username: String,
271        email: String,
272        password: &str,
273        role: Role,
274        tenant_id: String,
275    ) -> Result<User> {
276        // Check if username already exists
277        if self.username_index.contains_key(&username) {
278            return Err(AllSourceError::ValidationError(
279                "Username already exists".to_string(),
280            ));
281        }
282
283        let user = User::new(username.clone(), email, password, role, tenant_id)?;
284
285        self.users.insert(user.id, user.clone());
286        self.username_index.insert(username, user.id);
287
288        Ok(user)
289    }
290
291    /// Authenticate user with username and password
292    pub fn authenticate(&self, username: &str, password: &str) -> Result<String> {
293        let user_id = self
294            .username_index
295            .get(username)
296            .ok_or_else(|| AllSourceError::ValidationError("Invalid credentials".to_string()))?;
297
298        let user = self
299            .users
300            .get(&user_id)
301            .ok_or_else(|| AllSourceError::ValidationError("User not found".to_string()))?;
302
303        if !user.active {
304            return Err(AllSourceError::ValidationError(
305                "User account is inactive".to_string(),
306            ));
307        }
308
309        if !user.verify_password(password)? {
310            return Err(AllSourceError::ValidationError(
311                "Invalid credentials".to_string(),
312            ));
313        }
314
315        // Generate JWT token
316        let claims = Claims::new(
317            user.id.to_string(),
318            user.tenant_id.clone(),
319            user.role.clone(),
320            Duration::hours(24), // Token expires in 24 hours
321        );
322
323        let token = encode(&Header::default(), &claims, &self.encoding_key)
324            .map_err(|e| AllSourceError::ValidationError(format!("Failed to create token: {e}")))?;
325
326        Ok(token)
327    }
328
329    /// Validate JWT token
330    pub fn validate_token(&self, token: &str) -> Result<Claims> {
331        let token_data = decode::<Claims>(token, &self.decoding_key, &self.validation)
332            .map_err(|e| AllSourceError::ValidationError(format!("Invalid token: {e}")))?;
333
334        if token_data.claims.is_expired() {
335            return Err(AllSourceError::ValidationError("Token expired".to_string()));
336        }
337
338        Ok(token_data.claims)
339    }
340
341    /// Create API key
342    ///
343    /// When an auth repository is attached, the key is persisted to the
344    /// system WAL and survives restarts.
345    pub fn create_api_key(
346        &self,
347        name: String,
348        tenant_id: String,
349        role: Role,
350        expires_at: Option<chrono::DateTime<Utc>>,
351    ) -> (ApiKey, String) {
352        let (api_key, key) = ApiKey::new(name, tenant_id, role, expires_at);
353        self.api_keys.insert(api_key.id, api_key.clone());
354
355        // Persist to durable storage if available
356        if let Some(ref repo) = self.auth_repo
357            && let Err(e) = repo.persist_api_key(&api_key)
358        {
359            tracing::error!("Failed to persist API key to system store: {e}");
360        }
361
362        (api_key, key)
363    }
364
365    /// Validate API key
366    pub fn validate_api_key(&self, key: &str) -> Result<Claims> {
367        // First, find the matching key and extract the data we need
368        // We must complete iteration before calling get_mut to avoid deadlock
369        let found_key = self.api_keys.iter().find_map(|entry| {
370            let api_key = entry.value();
371            if api_key.verify(key) {
372                Some((api_key.id, api_key.tenant_id.clone(), api_key.role.clone()))
373            } else {
374                None
375            }
376        });
377
378        if let Some((key_id, tenant_id, role)) = found_key {
379            // Now safe to acquire write lock since iteration is complete
380            if let Some(mut key_mut) = self.api_keys.get_mut(&key_id) {
381                key_mut.last_used = Some(Utc::now());
382            }
383
384            let claims = Claims::new(key_id.to_string(), tenant_id, role, Duration::hours(24));
385
386            return Ok(claims);
387        }
388
389        Err(AllSourceError::ValidationError(
390            "Invalid API key".to_string(),
391        ))
392    }
393
394    /// Get user by ID
395    pub fn get_user(&self, user_id: &Uuid) -> Option<User> {
396        self.users.get(user_id).map(|u| u.clone())
397    }
398
399    /// List all users (admin only)
400    pub fn list_users(&self) -> Vec<User> {
401        self.users
402            .iter()
403            .map(|entry| entry.value().clone())
404            .collect()
405    }
406
407    /// Delete user
408    pub fn delete_user(&self, user_id: &Uuid) -> Result<()> {
409        if let Some((_, user)) = self.users.remove(user_id) {
410            self.username_index.remove(&user.username);
411            Ok(())
412        } else {
413            Err(AllSourceError::ValidationError(
414                "User not found".to_string(),
415            ))
416        }
417    }
418
419    /// Revoke API key
420    ///
421    /// When an auth repository is attached, the revocation is persisted
422    /// to the system WAL and survives restarts.
423    pub fn revoke_api_key(&self, key_id: &Uuid) -> Result<()> {
424        if let Some(mut api_key) = self.api_keys.get_mut(key_id) {
425            api_key.active = false;
426
427            // Persist revocation to durable storage
428            if let Some(ref repo) = self.auth_repo
429                && let Err(e) = repo.persist_key_revocation(key_id)
430            {
431                tracing::error!("Failed to persist key revocation to system store: {e}");
432            }
433
434            Ok(())
435        } else {
436            Err(AllSourceError::ValidationError(
437                "API key not found".to_string(),
438            ))
439        }
440    }
441
442    /// List API keys for a tenant
443    pub fn list_api_keys(&self, tenant_id: &str) -> Vec<ApiKey> {
444        self.api_keys
445            .iter()
446            .filter(|entry| entry.value().tenant_id == tenant_id)
447            .map(|entry| entry.value().clone())
448            .collect()
449    }
450
451    /// Register a bootstrap API key with a specific key value.
452    ///
453    /// This is used on startup to create a pre-configured API key from
454    /// the `ALLSOURCE_BOOTSTRAP_API_KEY` environment variable.
455    ///
456    /// If an auth repository is attached, the key is persisted to the
457    /// system WAL. On subsequent restarts, the key is loaded from storage
458    /// automatically — the bootstrap only creates the key if it doesn't
459    /// already exist (idempotent).
460    pub fn register_bootstrap_api_key(&self, key: &str, tenant_id: &str) -> ApiKey {
461        let key_hash = hash_api_key(key);
462
463        // Check if this exact key already exists (idempotent bootstrap)
464        let existing = self.api_keys.iter().find_map(|entry| {
465            if entry.value().key_hash == key_hash && entry.value().active {
466                Some(entry.value().clone())
467            } else {
468                None
469            }
470        });
471
472        if let Some(existing_key) = existing {
473            tracing::debug!("Bootstrap API key already exists (idempotent)");
474            return existing_key;
475        }
476
477        let api_key = ApiKey {
478            id: Uuid::new_v4(),
479            name: "bootstrap".to_string(),
480            tenant_id: tenant_id.to_string(),
481            role: Role::Admin,
482            key_hash,
483            created_at: Utc::now(),
484            expires_at: None,
485            active: true,
486            last_used: None,
487        };
488
489        self.api_keys.insert(api_key.id, api_key.clone());
490
491        // Persist to durable storage if available
492        if let Some(ref repo) = self.auth_repo
493            && let Err(e) = repo.persist_api_key(&api_key)
494        {
495            tracing::error!("Failed to persist bootstrap API key: {e}");
496        }
497
498        api_key
499    }
500}
501
502impl Default for AuthManager {
503    fn default() -> Self {
504        use base64::{Engine as _, engine::general_purpose};
505        let secret = general_purpose::STANDARD.encode(rand::random::<[u8; 32]>());
506        Self::new(&secret)
507    }
508}
509
510/// Hash password using Argon2
511fn hash_password(password: &str) -> Result<String> {
512    let salt = SaltString::generate(&mut OsRng);
513    let argon2 = Argon2::default();
514
515    let hash = argon2
516        .hash_password(password.as_bytes(), &salt)
517        .map_err(|e| AllSourceError::ValidationError(format!("Password hashing failed: {e}")))?;
518
519    Ok(hash.to_string())
520}
521
522/// Verify password against hash
523fn verify_password(password: &str, hash: &str) -> Result<bool> {
524    let parsed_hash = PasswordHash::new(hash)
525        .map_err(|e| AllSourceError::ValidationError(format!("Invalid password hash: {e}")))?;
526
527    let argon2 = Argon2::default();
528    Ok(argon2
529        .verify_password(password.as_bytes(), &parsed_hash)
530        .is_ok())
531}
532
533/// Generate API key
534fn generate_api_key() -> String {
535    use base64::{Engine as _, engine::general_purpose};
536    let random_bytes: [u8; 32] = rand::random();
537    format!(
538        "ask_{}",
539        general_purpose::URL_SAFE_NO_PAD.encode(random_bytes)
540    )
541}
542
543/// Hash API key for storage
544fn hash_api_key(key: &str) -> String {
545    use std::{
546        collections::hash_map::DefaultHasher,
547        hash::{Hash, Hasher},
548    };
549
550    let mut hasher = DefaultHasher::new();
551    key.hash(&mut hasher);
552    format!("{:x}", hasher.finish())
553}
554
555#[cfg(test)]
556mod tests {
557    use super::*;
558
559    #[test]
560    fn test_user_creation_and_verification() {
561        let user = User::new(
562            "testuser".to_string(),
563            "test@example.com".to_string(),
564            "password123",
565            Role::Developer,
566            "tenant1".to_string(),
567        )
568        .unwrap();
569
570        assert!(user.verify_password("password123").unwrap());
571        assert!(!user.verify_password("wrongpassword").unwrap());
572    }
573
574    #[test]
575    fn test_role_permissions() {
576        assert!(Role::Admin.has_permission(Permission::Admin));
577        assert!(Role::Developer.has_permission(Permission::Write));
578        assert!(!Role::ReadOnly.has_permission(Permission::Write));
579        assert!(Role::ReadOnly.has_permission(Permission::Read));
580    }
581
582    #[test]
583    fn test_auth_manager() {
584        let auth = AuthManager::new("test_secret");
585
586        // Register user
587        let user = auth
588            .register_user(
589                "alice".to_string(),
590                "alice@example.com".to_string(),
591                "password123",
592                Role::Developer,
593                "tenant1".to_string(),
594            )
595            .unwrap();
596
597        // Authenticate
598        let token = auth.authenticate("alice", "password123").unwrap();
599        assert!(!token.is_empty());
600
601        // Validate token
602        let claims = auth.validate_token(&token).unwrap();
603        assert_eq!(claims.tenant_id, "tenant1");
604        assert_eq!(claims.role, Role::Developer);
605    }
606
607    #[test]
608    fn test_api_key() {
609        let auth = AuthManager::new("test_secret");
610
611        let (api_key, key) = auth.create_api_key(
612            "test-key".to_string(),
613            "tenant1".to_string(),
614            Role::ServiceAccount,
615            None,
616        );
617
618        // Validate key
619        let claims = auth.validate_api_key(&key).unwrap();
620        assert_eq!(claims.tenant_id, "tenant1");
621        assert_eq!(claims.role, Role::ServiceAccount);
622
623        // Revoke key
624        auth.revoke_api_key(&api_key.id).unwrap();
625
626        // Should fail after revocation
627        assert!(auth.validate_api_key(&key).is_err());
628    }
629
630    #[test]
631    fn test_claims_expiration() {
632        let claims = Claims::new(
633            "user1".to_string(),
634            "tenant1".to_string(),
635            Role::Developer,
636            Duration::seconds(-1), // Expired 1 second ago
637        );
638
639        assert!(claims.is_expired());
640    }
641}