Skip to main content

allsource_core/infrastructure/security/
auth.rs

1use crate::error::{AllSourceError, Result};
2use argon2::{
3    password_hash::{rand_core::OsRng, PasswordHash, PasswordHasher, PasswordVerifier, SaltString},
4    Argon2,
5};
6use chrono::{Duration, Utc};
7use dashmap::DashMap;
8use jsonwebtoken::{decode, encode, Algorithm, DecodingKey, EncodingKey, Header, Validation};
9use serde::{Deserialize, Serialize};
10use std::sync::Arc;
11use uuid::Uuid;
12
/// User role for RBAC.
///
/// Serialized by serde using the lowercase variant name
/// (`rename_all = "lowercase"`), e.g. `ReadOnly` becomes `"readonly"`.
/// The permissions granted to each role are decided by `Role::has_permission`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum Role {
    /// Full system access.
    Admin,
    /// Read/write events, manage schemas.
    Developer,
    /// Read-only access to events.
    ReadOnly,
    /// Programmatic access for services.
    ServiceAccount,
}
22
23impl Role {
24    /// Check if role has specific permission
25    pub fn has_permission(&self, permission: Permission) -> bool {
26        match self {
27            Role::Admin => true, // Admin has all permissions
28            Role::Developer => matches!(
29                permission,
30                Permission::Read
31                    | Permission::Write
32                    | Permission::Metrics
33                    | Permission::ManageSchemas
34                    | Permission::ManagePipelines
35            ),
36            Role::ReadOnly => matches!(permission, Permission::Read | Permission::Metrics),
37            Role::ServiceAccount => {
38                matches!(permission, Permission::Read | Permission::Write)
39            }
40        }
41    }
42}
43
/// Permission types.
///
/// These are the individual capabilities checked by `Role::has_permission`.
/// In that mapping, `Admin` and `ManageTenants` are granted only to
/// `Role::Admin`; the remaining variants are shared among the other roles.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Permission {
    Read,
    Write,
    Admin,
    Metrics,
    ManageSchemas,
    ManagePipelines,
    ManageTenants,
}
55
/// JWT Claims.
///
/// This is the payload encoded into tokens by `AuthManager::authenticate` and
/// returned by `AuthManager::validate_token` / `AuthManager::validate_api_key`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Claims {
    /// Subject (user ID or API key ID)
    pub sub: String,
    /// Tenant ID
    pub tenant_id: String,
    /// User role
    pub role: Role,
    /// Expiration time (UNIX timestamp, in seconds)
    pub exp: i64,
    /// Issued at time (UNIX timestamp, in seconds)
    pub iat: i64,
    /// Issuer (`Claims::new` always sets this to `"allsource"`)
    pub iss: String,
}
72
73impl Claims {
74    /// Create new claims for a user
75    pub fn new(user_id: String, tenant_id: String, role: Role, expires_in: Duration) -> Self {
76        let now = Utc::now();
77        Self {
78            sub: user_id,
79            tenant_id,
80            role,
81            iat: now.timestamp(),
82            exp: (now + expires_in).timestamp(),
83            iss: "allsource".to_string(),
84        }
85    }
86
87    /// Check if claims are expired
88    pub fn is_expired(&self) -> bool {
89        Utc::now().timestamp() > self.exp
90    }
91
92    /// Check if user has permission
93    pub fn has_permission(&self, permission: Permission) -> bool {
94        self.role.has_permission(permission)
95    }
96}
97
/// User account.
///
/// Instances are created through `User::new`, which hashes the password and
/// assigns a fresh random id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct User {
    /// Unique identifier (random v4 UUID assigned by `User::new`).
    pub id: Uuid,
    pub username: String,
    pub email: String,
    /// Password hash produced by `hash_password`.
    ///
    /// Omitted from serialized output. Only serialization is skipped, so
    /// deserialization still expects this field to be present.
    #[serde(skip_serializing)]
    pub password_hash: String,
    pub role: Role,
    pub tenant_id: String,
    /// Creation time (UTC).
    pub created_at: chrono::DateTime<Utc>,
    /// Inactive users are rejected by `AuthManager::authenticate`.
    pub active: bool,
}
111
112impl User {
113    /// Create a new user with hashed password
114    pub fn new(
115        username: String,
116        email: String,
117        password: &str,
118        role: Role,
119        tenant_id: String,
120    ) -> Result<Self> {
121        let password_hash = hash_password(password)?;
122
123        Ok(Self {
124            id: Uuid::new_v4(),
125            username,
126            email,
127            password_hash,
128            role,
129            tenant_id,
130            created_at: Utc::now(),
131            active: true,
132        })
133    }
134
135    /// Verify password
136    pub fn verify_password(&self, password: &str) -> Result<bool> {
137        verify_password(password, &self.password_hash)
138    }
139}
140
/// API Key for programmatic access.
///
/// Only a hash of the secret is kept here; the plaintext key is returned once
/// by `ApiKey::new` alongside the record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApiKey {
    /// Unique identifier (random v4 UUID assigned by `ApiKey::new`).
    pub id: Uuid,
    pub name: String,
    pub tenant_id: String,
    pub role: Role,
    /// Output of `hash_api_key` for the secret; omitted from serialized output.
    ///
    /// Only serialization is skipped, so deserialization still expects this
    /// field to be present.
    #[serde(skip_serializing)]
    pub key_hash: String,
    /// Creation time (UTC).
    pub created_at: chrono::DateTime<Utc>,
    /// Optional expiry; `None` means the key never expires.
    pub expires_at: Option<chrono::DateTime<Utc>>,
    /// Set to `false` by `AuthManager::revoke_api_key`; inactive keys never verify.
    pub active: bool,
    /// Updated by `AuthManager::validate_api_key` on each successful validation.
    pub last_used: Option<chrono::DateTime<Utc>>,
}
155
156impl ApiKey {
157    /// Create new API key
158    pub fn new(
159        name: String,
160        tenant_id: String,
161        role: Role,
162        expires_at: Option<chrono::DateTime<Utc>>,
163    ) -> (Self, String) {
164        let key = generate_api_key();
165        let key_hash = hash_api_key(&key);
166
167        let api_key = Self {
168            id: Uuid::new_v4(),
169            name,
170            tenant_id,
171            role,
172            key_hash,
173            created_at: Utc::now(),
174            expires_at,
175            active: true,
176            last_used: None,
177        };
178
179        (api_key, key)
180    }
181
182    /// Verify API key
183    pub fn verify(&self, key: &str) -> bool {
184        if !self.active {
185            return false;
186        }
187
188        if let Some(expires_at) = self.expires_at {
189            if Utc::now() > expires_at {
190                return false;
191            }
192        }
193
194        hash_api_key(key) == self.key_hash
195    }
196
197    /// Check if expired
198    pub fn is_expired(&self) -> bool {
199        if let Some(expires_at) = self.expires_at {
200            Utc::now() > expires_at
201        } else {
202            false
203        }
204    }
205}
206
/// Authentication manager.
///
/// Holds the JWT signing/verification keys together with in-memory stores for
/// users and API keys. All stores are concurrent maps behind `Arc`.
pub struct AuthManager {
    /// JWT encoding key
    encoding_key: EncodingKey,
    /// JWT decoding key
    decoding_key: DecodingKey,
    /// JWT validation rules
    validation: Validation,
    /// Users storage (in production, use database)
    users: Arc<DashMap<Uuid, User>>,
    /// API keys storage (in production, use database)
    api_keys: Arc<DashMap<Uuid, ApiKey>>,
    /// Username to user ID mapping
    username_index: Arc<DashMap<String, Uuid>>,
}
222
223impl AuthManager {
224    /// Create new authentication manager
225    pub fn new(jwt_secret: &str) -> Self {
226        let encoding_key = EncodingKey::from_secret(jwt_secret.as_bytes());
227        let decoding_key = DecodingKey::from_secret(jwt_secret.as_bytes());
228
229        let mut validation = Validation::new(Algorithm::HS256);
230        validation.set_issuer(&["allsource"]);
231
232        Self {
233            encoding_key,
234            decoding_key,
235            validation,
236            users: Arc::new(DashMap::new()),
237            api_keys: Arc::new(DashMap::new()),
238            username_index: Arc::new(DashMap::new()),
239        }
240    }
241
242    /// Register a new user
243    pub fn register_user(
244        &self,
245        username: String,
246        email: String,
247        password: &str,
248        role: Role,
249        tenant_id: String,
250    ) -> Result<User> {
251        // Check if username already exists
252        if self.username_index.contains_key(&username) {
253            return Err(AllSourceError::ValidationError(
254                "Username already exists".to_string(),
255            ));
256        }
257
258        let user = User::new(username.clone(), email, password, role, tenant_id)?;
259
260        self.users.insert(user.id, user.clone());
261        self.username_index.insert(username, user.id);
262
263        Ok(user)
264    }
265
266    /// Authenticate user with username and password
267    pub fn authenticate(&self, username: &str, password: &str) -> Result<String> {
268        let user_id = self
269            .username_index
270            .get(username)
271            .ok_or_else(|| AllSourceError::ValidationError("Invalid credentials".to_string()))?;
272
273        let user = self
274            .users
275            .get(&user_id)
276            .ok_or_else(|| AllSourceError::ValidationError("User not found".to_string()))?;
277
278        if !user.active {
279            return Err(AllSourceError::ValidationError(
280                "User account is inactive".to_string(),
281            ));
282        }
283
284        if !user.verify_password(password)? {
285            return Err(AllSourceError::ValidationError(
286                "Invalid credentials".to_string(),
287            ));
288        }
289
290        // Generate JWT token
291        let claims = Claims::new(
292            user.id.to_string(),
293            user.tenant_id.clone(),
294            user.role.clone(),
295            Duration::hours(24), // Token expires in 24 hours
296        );
297
298        let token = encode(&Header::default(), &claims, &self.encoding_key)
299            .map_err(|e| AllSourceError::ValidationError(format!("Failed to create token: {e}")))?;
300
301        Ok(token)
302    }
303
304    /// Validate JWT token
305    pub fn validate_token(&self, token: &str) -> Result<Claims> {
306        let token_data = decode::<Claims>(token, &self.decoding_key, &self.validation)
307            .map_err(|e| AllSourceError::ValidationError(format!("Invalid token: {e}")))?;
308
309        if token_data.claims.is_expired() {
310            return Err(AllSourceError::ValidationError("Token expired".to_string()));
311        }
312
313        Ok(token_data.claims)
314    }
315
316    /// Create API key
317    pub fn create_api_key(
318        &self,
319        name: String,
320        tenant_id: String,
321        role: Role,
322        expires_at: Option<chrono::DateTime<Utc>>,
323    ) -> (ApiKey, String) {
324        let (api_key, key) = ApiKey::new(name, tenant_id, role, expires_at);
325        self.api_keys.insert(api_key.id, api_key.clone());
326        (api_key, key)
327    }
328
329    /// Validate API key
330    pub fn validate_api_key(&self, key: &str) -> Result<Claims> {
331        // First, find the matching key and extract the data we need
332        // We must complete iteration before calling get_mut to avoid deadlock
333        let found_key = self.api_keys.iter().find_map(|entry| {
334            let api_key = entry.value();
335            if api_key.verify(key) {
336                Some((api_key.id, api_key.tenant_id.clone(), api_key.role.clone()))
337            } else {
338                None
339            }
340        });
341
342        if let Some((key_id, tenant_id, role)) = found_key {
343            // Now safe to acquire write lock since iteration is complete
344            if let Some(mut key_mut) = self.api_keys.get_mut(&key_id) {
345                key_mut.last_used = Some(Utc::now());
346            }
347
348            let claims = Claims::new(key_id.to_string(), tenant_id, role, Duration::hours(24));
349
350            return Ok(claims);
351        }
352
353        Err(AllSourceError::ValidationError(
354            "Invalid API key".to_string(),
355        ))
356    }
357
358    /// Get user by ID
359    pub fn get_user(&self, user_id: &Uuid) -> Option<User> {
360        self.users.get(user_id).map(|u| u.clone())
361    }
362
363    /// List all users (admin only)
364    pub fn list_users(&self) -> Vec<User> {
365        self.users
366            .iter()
367            .map(|entry| entry.value().clone())
368            .collect()
369    }
370
371    /// Delete user
372    pub fn delete_user(&self, user_id: &Uuid) -> Result<()> {
373        if let Some((_, user)) = self.users.remove(user_id) {
374            self.username_index.remove(&user.username);
375            Ok(())
376        } else {
377            Err(AllSourceError::ValidationError(
378                "User not found".to_string(),
379            ))
380        }
381    }
382
383    /// Revoke API key
384    pub fn revoke_api_key(&self, key_id: &Uuid) -> Result<()> {
385        if let Some(mut api_key) = self.api_keys.get_mut(key_id) {
386            api_key.active = false;
387            Ok(())
388        } else {
389            Err(AllSourceError::ValidationError(
390                "API key not found".to_string(),
391            ))
392        }
393    }
394
395    /// List API keys for a tenant
396    pub fn list_api_keys(&self, tenant_id: &str) -> Vec<ApiKey> {
397        self.api_keys
398            .iter()
399            .filter(|entry| entry.value().tenant_id == tenant_id)
400            .map(|entry| entry.value().clone())
401            .collect()
402    }
403}
404
405impl Default for AuthManager {
406    fn default() -> Self {
407        // Generate a random secret for development
408        // In production, this should come from configuration
409        use base64::{engine::general_purpose, Engine as _};
410        let secret = general_purpose::STANDARD.encode(rand::random::<[u8; 32]>());
411        Self::new(&secret)
412    }
413}
414
415/// Hash password using Argon2
416fn hash_password(password: &str) -> Result<String> {
417    let salt = SaltString::generate(&mut OsRng);
418    let argon2 = Argon2::default();
419
420    let hash = argon2
421        .hash_password(password.as_bytes(), &salt)
422        .map_err(|e| AllSourceError::ValidationError(format!("Password hashing failed: {e}")))?;
423
424    Ok(hash.to_string())
425}
426
427/// Verify password against hash
428fn verify_password(password: &str, hash: &str) -> Result<bool> {
429    let parsed_hash = PasswordHash::new(hash)
430        .map_err(|e| AllSourceError::ValidationError(format!("Invalid password hash: {e}")))?;
431
432    let argon2 = Argon2::default();
433    Ok(argon2
434        .verify_password(password.as_bytes(), &parsed_hash)
435        .is_ok())
436}
437
438/// Generate API key
439fn generate_api_key() -> String {
440    use base64::{engine::general_purpose, Engine as _};
441    let random_bytes: [u8; 32] = rand::random();
442    format!(
443        "ask_{}",
444        general_purpose::URL_SAFE_NO_PAD.encode(random_bytes)
445    )
446}
447
/// Derive the stored form of an API key.
///
/// Feeds `key` through the standard library's `DefaultHasher` and returns the
/// resulting 64-bit value as lowercase hexadecimal (no zero padding).
///
/// NOTE(review): `DefaultHasher` is not a cryptographic hash, and the standard
/// library documents its algorithm as unspecified and subject to change
/// between releases. Consider moving to a cryptographic digest before these
/// hashes are persisted anywhere.
fn hash_api_key(key: &str) -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::new();
    Hash::hash(key, &mut state);
    let digest: u64 = state.finish();
    format!("{:x}", digest)
}
457
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created user accepts its own password and rejects another.
    #[test]
    fn test_user_creation_and_verification() {
        let user = User::new(
            "testuser".to_string(),
            "test@example.com".to_string(),
            "password123",
            Role::Developer,
            "tenant1".to_string(),
        )
        .unwrap();

        assert!(user.verify_password("password123").unwrap());
        assert!(!user.verify_password("wrongpassword").unwrap());
    }

    /// Spot-checks of the role-to-permission mapping.
    #[test]
    fn test_role_permissions() {
        assert!(Role::Admin.has_permission(Permission::Admin));
        assert!(Role::Developer.has_permission(Permission::Write));
        assert!(!Role::ReadOnly.has_permission(Permission::Write));
        assert!(Role::ReadOnly.has_permission(Permission::Read));
    }

    /// Register, authenticate, and validate the resulting token, including
    /// the failure paths for bad credentials and duplicate usernames.
    #[test]
    fn test_auth_manager() {
        let auth = AuthManager::new("test_secret");

        // Register user
        let user = auth
            .register_user(
                "alice".to_string(),
                "alice@example.com".to_string(),
                "password123",
                Role::Developer,
                "tenant1".to_string(),
            )
            .unwrap();

        // Authenticate
        let token = auth.authenticate("alice", "password123").unwrap();
        assert!(!token.is_empty());

        // Validate token
        let claims = auth.validate_token(&token).unwrap();
        assert_eq!(claims.sub, user.id.to_string());
        assert_eq!(claims.tenant_id, "tenant1");
        assert_eq!(claims.role, Role::Developer);

        // Wrong password and unknown username are both rejected
        assert!(auth.authenticate("alice", "wrongpassword").is_err());
        assert!(auth.authenticate("nobody", "password123").is_err());

        // A second registration under the same username is rejected
        assert!(auth
            .register_user(
                "alice".to_string(),
                "alice2@example.com".to_string(),
                "password456",
                Role::ReadOnly,
                "tenant1".to_string(),
            )
            .is_err());
    }

    /// An API key validates until it is revoked.
    #[test]
    fn test_api_key() {
        let auth = AuthManager::new("test_secret");

        let (api_key, key) = auth.create_api_key(
            "test-key".to_string(),
            "tenant1".to_string(),
            Role::ServiceAccount,
            None,
        );

        // Validate key
        let claims = auth.validate_api_key(&key).unwrap();
        assert_eq!(claims.tenant_id, "tenant1");
        assert_eq!(claims.role, Role::ServiceAccount);

        // Revoke key
        auth.revoke_api_key(&api_key.id).unwrap();

        // Should fail after revocation
        assert!(auth.validate_api_key(&key).is_err());
    }

    /// Claims built with a negative lifetime are already expired.
    #[test]
    fn test_claims_expiration() {
        let claims = Claims::new(
            "user1".to_string(),
            "tenant1".to_string(),
            Role::Developer,
            Duration::seconds(-1), // Expired 1 second ago
        );

        assert!(claims.is_expired());
    }
}