// Source file: allsource_core/infrastructure/security/auth.rs

1use crate::error::{AllSourceError, Result};
2use argon2::{
3    Argon2,
4    password_hash::{PasswordHash, PasswordHasher, PasswordVerifier, SaltString, rand_core::OsRng},
5};
6use chrono::{Duration, Utc};
7use dashmap::DashMap;
8use jsonwebtoken::{Algorithm, DecodingKey, EncodingKey, Header, Validation, decode, encode};
9use serde::{Deserialize, Serialize};
10use std::sync::Arc;
11use uuid::Uuid;
12
13/// User role for RBAC
14#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
15#[serde(rename_all = "lowercase")]
16pub enum Role {
17    Admin,          // Full system access
18    Developer,      // Read/write events, manage schemas
19    ReadOnly,       // Read-only access to events
20    ServiceAccount, // Programmatic access for services
21}
22
23impl Role {
24    /// Check if role has specific permission
25    pub fn has_permission(&self, permission: Permission) -> bool {
26        match self {
27            Role::Admin => true, // Admin has all permissions
28            Role::Developer => matches!(
29                permission,
30                Permission::Read
31                    | Permission::Write
32                    | Permission::Metrics
33                    | Permission::ManageSchemas
34                    | Permission::ManagePipelines
35            ),
36            Role::ReadOnly => matches!(permission, Permission::Read | Permission::Metrics),
37            Role::ServiceAccount => {
38                matches!(permission, Permission::Read | Permission::Write)
39            }
40        }
41    }
42
43    /// Canonical role preset for MCP tokens issued to Pro-tier subscribers.
44    /// Pro unlocks MCP server access at a read-only scope — consumers can
45    /// query events, sample streams, and read metrics, but cannot mutate
46    /// state. Call this from the API-key provisioning path when a Pro
47    /// tenant requests an MCP key so the mapping stays centralized here
48    /// rather than hardcoded at call sites.
49    pub fn mcp_readonly_preset() -> Self {
50        Role::ReadOnly
51    }
52}
53
/// Individual capabilities that a [`Role`] may or may not be granted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Permission {
    /// Granted to every role.
    Read,
    /// Granted to `Admin`, `Developer` and `ServiceAccount`.
    Write,
    /// Granted to the `Admin` role only.
    Admin,
    /// Granted to `Admin`, `Developer` and `ReadOnly`.
    Metrics,
    /// Granted to `Admin` and `Developer`.
    ManageSchemas,
    /// Granted to `Admin` and `Developer`.
    ManagePipelines,
    /// Granted to the `Admin` role only.
    ManageTenants,
}
65
66/// JWT Claims
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct Claims {
69    /// Subject (user ID or API key ID)
70    pub sub: String,
71    /// Tenant ID
72    pub tenant_id: String,
73    /// User role
74    pub role: Role,
75    /// Expiration time (UNIX timestamp)
76    pub exp: i64,
77    /// Issued at time (UNIX timestamp)
78    pub iat: i64,
79    /// Issuer
80    pub iss: String,
81}
82
83impl Claims {
84    /// Create new claims for a user
85    pub fn new(user_id: String, tenant_id: String, role: Role, expires_in: Duration) -> Self {
86        let now = Utc::now();
87        Self {
88            sub: user_id,
89            tenant_id,
90            role,
91            iat: now.timestamp(),
92            exp: (now + expires_in).timestamp(),
93            iss: "allsource".to_string(),
94        }
95    }
96
97    /// Check if claims are expired
98    pub fn is_expired(&self) -> bool {
99        Utc::now().timestamp() > self.exp
100    }
101
102    /// Check if user has permission
103    pub fn has_permission(&self, permission: Permission) -> bool {
104        self.role.has_permission(permission)
105    }
106}
107
108/// User account
109#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct User {
111    pub id: Uuid,
112    pub username: String,
113    pub email: String,
114    #[serde(skip_serializing)]
115    pub password_hash: String,
116    pub role: Role,
117    pub tenant_id: String,
118    pub created_at: chrono::DateTime<Utc>,
119    pub active: bool,
120}
121
122impl User {
123    /// Create a new user with hashed password
124    pub fn new(
125        username: String,
126        email: String,
127        password: &str,
128        role: Role,
129        tenant_id: String,
130    ) -> Result<Self> {
131        let password_hash = hash_password(password)?;
132
133        Ok(Self {
134            id: Uuid::new_v4(),
135            username,
136            email,
137            password_hash,
138            role,
139            tenant_id,
140            created_at: Utc::now(),
141            active: true,
142        })
143    }
144
145    /// Verify password
146    pub fn verify_password(&self, password: &str) -> Result<bool> {
147        verify_password(password, &self.password_hash)
148    }
149}
150
151/// API Key for programmatic access
152#[derive(Debug, Clone, Serialize, Deserialize)]
153pub struct ApiKey {
154    pub id: Uuid,
155    pub name: String,
156    pub tenant_id: String,
157    pub role: Role,
158    #[serde(skip_serializing)]
159    pub key_hash: String,
160    pub created_at: chrono::DateTime<Utc>,
161    pub expires_at: Option<chrono::DateTime<Utc>>,
162    pub active: bool,
163    pub last_used: Option<chrono::DateTime<Utc>>,
164}
165
166impl ApiKey {
167    /// Create new API key
168    pub fn new(
169        name: String,
170        tenant_id: String,
171        role: Role,
172        expires_at: Option<chrono::DateTime<Utc>>,
173    ) -> (Self, String) {
174        let key = generate_api_key();
175        let key_hash = hash_api_key(&key);
176
177        let api_key = Self {
178            id: Uuid::new_v4(),
179            name,
180            tenant_id,
181            role,
182            key_hash,
183            created_at: Utc::now(),
184            expires_at,
185            active: true,
186            last_used: None,
187        };
188
189        (api_key, key)
190    }
191
192    /// Verify API key
193    pub fn verify(&self, key: &str) -> bool {
194        if !self.active {
195            return false;
196        }
197
198        if let Some(expires_at) = self.expires_at
199            && Utc::now() > expires_at
200        {
201            return false;
202        }
203
204        hash_api_key(key) == self.key_hash
205    }
206
207    /// Check if expired
208    pub fn is_expired(&self) -> bool {
209        if let Some(expires_at) = self.expires_at {
210            Utc::now() > expires_at
211        } else {
212            false
213        }
214    }
215}
216
217/// Authentication manager
218pub struct AuthManager {
219    /// JWT encoding key
220    encoding_key: EncodingKey,
221    /// JWT decoding key
222    decoding_key: DecodingKey,
223    /// JWT validation rules
224    validation: Validation,
225    /// Users storage (in production, use database)
226    users: Arc<DashMap<Uuid, User>>,
227    /// API keys storage (in production, use database)
228    api_keys: Arc<DashMap<Uuid, ApiKey>>,
229    /// Username to user ID mapping
230    username_index: Arc<DashMap<String, Uuid>>,
231    /// Optional event-sourced auth repository for durable key storage.
232    /// When set, API key operations are persisted to the system WAL.
233    auth_repo: Option<Arc<crate::infrastructure::repositories::EventSourcedAuthRepository>>,
234}
235
236impl AuthManager {
237    /// Create new authentication manager
238    pub fn new(jwt_secret: &str) -> Self {
239        let encoding_key = EncodingKey::from_secret(jwt_secret.as_bytes());
240        let decoding_key = DecodingKey::from_secret(jwt_secret.as_bytes());
241
242        let mut validation = Validation::new(Algorithm::HS256);
243        validation.set_issuer(&["allsource"]);
244
245        Self {
246            encoding_key,
247            decoding_key,
248            validation,
249            users: Arc::new(DashMap::new()),
250            api_keys: Arc::new(DashMap::new()),
251            username_index: Arc::new(DashMap::new()),
252            auth_repo: None,
253        }
254    }
255
256    /// Attach an event-sourced auth repository for durable API key storage.
257    ///
258    /// Loads all previously persisted keys into the in-memory cache.
259    /// Subsequent `create_api_key` and `revoke_api_key` calls will also
260    /// write events to the system WAL.
261    pub fn with_auth_repository(
262        mut self,
263        repo: Arc<crate::infrastructure::repositories::EventSourcedAuthRepository>,
264    ) -> Self {
265        // Load persisted keys into the in-memory DashMap
266        for key in repo.all_keys() {
267            self.api_keys.insert(key.id, key);
268        }
269        let loaded = self.api_keys.len();
270        if loaded > 0 {
271            tracing::info!("Loaded {} API keys from durable storage", loaded);
272        }
273        self.auth_repo = Some(repo);
274        self
275    }
276
277    /// Register a new user
278    pub fn register_user(
279        &self,
280        username: String,
281        email: String,
282        password: &str,
283        role: Role,
284        tenant_id: String,
285    ) -> Result<User> {
286        // Check if username already exists
287        if self.username_index.contains_key(&username) {
288            return Err(AllSourceError::ValidationError(
289                "Username already exists".to_string(),
290            ));
291        }
292
293        let user = User::new(username.clone(), email, password, role, tenant_id)?;
294
295        self.users.insert(user.id, user.clone());
296        self.username_index.insert(username, user.id);
297
298        Ok(user)
299    }
300
301    /// Authenticate user with username and password
302    pub fn authenticate(&self, username: &str, password: &str) -> Result<String> {
303        let user_id = self
304            .username_index
305            .get(username)
306            .ok_or_else(|| AllSourceError::ValidationError("Invalid credentials".to_string()))?;
307
308        let user = self
309            .users
310            .get(&user_id)
311            .ok_or_else(|| AllSourceError::ValidationError("User not found".to_string()))?;
312
313        if !user.active {
314            return Err(AllSourceError::ValidationError(
315                "User account is inactive".to_string(),
316            ));
317        }
318
319        if !user.verify_password(password)? {
320            return Err(AllSourceError::ValidationError(
321                "Invalid credentials".to_string(),
322            ));
323        }
324
325        // Generate JWT token
326        let claims = Claims::new(
327            user.id.to_string(),
328            user.tenant_id.clone(),
329            user.role.clone(),
330            Duration::hours(24), // Token expires in 24 hours
331        );
332
333        let token = encode(&Header::default(), &claims, &self.encoding_key)
334            .map_err(|e| AllSourceError::ValidationError(format!("Failed to create token: {e}")))?;
335
336        Ok(token)
337    }
338
339    /// Validate JWT token
340    pub fn validate_token(&self, token: &str) -> Result<Claims> {
341        let token_data = decode::<Claims>(token, &self.decoding_key, &self.validation)
342            .map_err(|e| AllSourceError::ValidationError(format!("Invalid token: {e}")))?;
343
344        if token_data.claims.is_expired() {
345            return Err(AllSourceError::ValidationError("Token expired".to_string()));
346        }
347
348        Ok(token_data.claims)
349    }
350
351    /// Create API key
352    ///
353    /// When an auth repository is attached, the key is persisted to the
354    /// system WAL and survives restarts.
355    pub fn create_api_key(
356        &self,
357        name: String,
358        tenant_id: String,
359        role: Role,
360        expires_at: Option<chrono::DateTime<Utc>>,
361    ) -> (ApiKey, String) {
362        let (api_key, key) = ApiKey::new(name, tenant_id, role, expires_at);
363        self.api_keys.insert(api_key.id, api_key.clone());
364
365        // Persist to durable storage if available
366        if let Some(ref repo) = self.auth_repo
367            && let Err(e) = repo.persist_api_key(&api_key)
368        {
369            tracing::error!("Failed to persist API key to system store: {e}");
370        }
371
372        (api_key, key)
373    }
374
375    /// Validate API key
376    pub fn validate_api_key(&self, key: &str) -> Result<Claims> {
377        // First, find the matching key and extract the data we need
378        // We must complete iteration before calling get_mut to avoid deadlock
379        let found_key = self.api_keys.iter().find_map(|entry| {
380            let api_key = entry.value();
381            if api_key.verify(key) {
382                Some((api_key.id, api_key.tenant_id.clone(), api_key.role.clone()))
383            } else {
384                None
385            }
386        });
387
388        if let Some((key_id, tenant_id, role)) = found_key {
389            // Now safe to acquire write lock since iteration is complete
390            if let Some(mut key_mut) = self.api_keys.get_mut(&key_id) {
391                key_mut.last_used = Some(Utc::now());
392            }
393
394            let claims = Claims::new(key_id.to_string(), tenant_id, role, Duration::hours(24));
395
396            return Ok(claims);
397        }
398
399        Err(AllSourceError::ValidationError(
400            "Invalid API key".to_string(),
401        ))
402    }
403
404    /// Get user by ID
405    pub fn get_user(&self, user_id: &Uuid) -> Option<User> {
406        self.users.get(user_id).map(|u| u.clone())
407    }
408
409    /// List all users (admin only)
410    pub fn list_users(&self) -> Vec<User> {
411        self.users
412            .iter()
413            .map(|entry| entry.value().clone())
414            .collect()
415    }
416
417    /// Delete user
418    pub fn delete_user(&self, user_id: &Uuid) -> Result<()> {
419        if let Some((_, user)) = self.users.remove(user_id) {
420            self.username_index.remove(&user.username);
421            Ok(())
422        } else {
423            Err(AllSourceError::ValidationError(
424                "User not found".to_string(),
425            ))
426        }
427    }
428
429    /// Revoke API key
430    ///
431    /// When an auth repository is attached, the revocation is persisted
432    /// to the system WAL and survives restarts.
433    pub fn revoke_api_key(&self, key_id: &Uuid) -> Result<()> {
434        if let Some(mut api_key) = self.api_keys.get_mut(key_id) {
435            api_key.active = false;
436
437            // Persist revocation to durable storage
438            if let Some(ref repo) = self.auth_repo
439                && let Err(e) = repo.persist_key_revocation(key_id)
440            {
441                tracing::error!("Failed to persist key revocation to system store: {e}");
442            }
443
444            Ok(())
445        } else {
446            Err(AllSourceError::ValidationError(
447                "API key not found".to_string(),
448            ))
449        }
450    }
451
452    /// List API keys for a tenant
453    pub fn list_api_keys(&self, tenant_id: &str) -> Vec<ApiKey> {
454        self.api_keys
455            .iter()
456            .filter(|entry| entry.value().tenant_id == tenant_id)
457            .map(|entry| entry.value().clone())
458            .collect()
459    }
460
461    /// Register a bootstrap API key with a specific key value.
462    ///
463    /// This is used on startup to create a pre-configured API key from
464    /// the `ALLSOURCE_BOOTSTRAP_API_KEY` environment variable.
465    ///
466    /// If an auth repository is attached, the key is persisted to the
467    /// system WAL. On subsequent restarts, the key is loaded from storage
468    /// automatically — the bootstrap only creates the key if it doesn't
469    /// already exist (idempotent).
470    pub fn register_bootstrap_api_key(&self, key: &str, tenant_id: &str) -> ApiKey {
471        let key_hash = hash_api_key(key);
472
473        // Check if this exact key already exists (idempotent bootstrap)
474        let existing = self.api_keys.iter().find_map(|entry| {
475            if entry.value().key_hash == key_hash && entry.value().active {
476                Some(entry.value().clone())
477            } else {
478                None
479            }
480        });
481
482        if let Some(existing_key) = existing {
483            tracing::debug!("Bootstrap API key already exists (idempotent)");
484            return existing_key;
485        }
486
487        let api_key = ApiKey {
488            id: Uuid::new_v4(),
489            name: "bootstrap".to_string(),
490            tenant_id: tenant_id.to_string(),
491            role: Role::Admin,
492            key_hash,
493            created_at: Utc::now(),
494            expires_at: None,
495            active: true,
496            last_used: None,
497        };
498
499        self.api_keys.insert(api_key.id, api_key.clone());
500
501        // Persist to durable storage if available
502        if let Some(ref repo) = self.auth_repo
503            && let Err(e) = repo.persist_api_key(&api_key)
504        {
505            tracing::error!("Failed to persist bootstrap API key: {e}");
506        }
507
508        api_key
509    }
510}
511
512impl Default for AuthManager {
513    fn default() -> Self {
514        use base64::{Engine as _, engine::general_purpose};
515        let secret = general_purpose::STANDARD.encode(rand::random::<[u8; 32]>());
516        Self::new(&secret)
517    }
518}
519
520/// Hash password using Argon2
521fn hash_password(password: &str) -> Result<String> {
522    let salt = SaltString::generate(&mut OsRng);
523    let argon2 = Argon2::default();
524
525    let hash = argon2
526        .hash_password(password.as_bytes(), &salt)
527        .map_err(|e| AllSourceError::ValidationError(format!("Password hashing failed: {e}")))?;
528
529    Ok(hash.to_string())
530}
531
532/// Verify password against hash
533fn verify_password(password: &str, hash: &str) -> Result<bool> {
534    let parsed_hash = PasswordHash::new(hash)
535        .map_err(|e| AllSourceError::ValidationError(format!("Invalid password hash: {e}")))?;
536
537    let argon2 = Argon2::default();
538    Ok(argon2
539        .verify_password(password.as_bytes(), &parsed_hash)
540        .is_ok())
541}
542
543/// Generate API key
544fn generate_api_key() -> String {
545    use base64::{Engine as _, engine::general_purpose};
546    let random_bytes: [u8; 32] = rand::random();
547    format!(
548        "ask_{}",
549        general_purpose::URL_SAFE_NO_PAD.encode(random_bytes)
550    )
551}
552
/// Hashes an API key for storage, returning the 64-bit `DefaultHasher`
/// digest as lowercase hexadecimal (no zero padding).
///
/// NOTE(review): `DefaultHasher` is not a cryptographic hash, and the
/// standard library documents its algorithm as unspecified and not to be
/// relied upon across releases. Because these hashes are stored and later
/// compared, a toolchain change could invalidate stored keys. Switching to
/// a cryptographic digest needs a new dependency and a migration of stored
/// hashes, so the output is intentionally left unchanged here.
fn hash_api_key(key: &str) -> String {
    use std::hash::{DefaultHasher, Hash, Hasher};

    let mut state = DefaultHasher::new();
    key.hash(&mut state);
    let digest = state.finish();
    format!("{:x}", digest)
}
564
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created user accepts its own password and rejects others.
    #[test]
    fn test_user_creation_and_verification() {
        let user = User::new(
            "testuser".to_string(),
            "test@example.com".to_string(),
            "password123",
            Role::Developer,
            "tenant1".to_string(),
        )
        .unwrap();

        assert!(user.verify_password("password123").unwrap());
        assert!(!user.verify_password("wrongpassword").unwrap());
    }

    /// Spot-checks the role → permission table.
    #[test]
    fn test_role_permissions() {
        assert!(Role::Admin.has_permission(Permission::Admin));
        assert!(Role::Developer.has_permission(Permission::Write));
        assert!(!Role::ReadOnly.has_permission(Permission::Write));
        assert!(Role::ReadOnly.has_permission(Permission::Read));
    }

    /// Register → authenticate → validate round trip.
    #[test]
    fn test_auth_manager() {
        let auth = AuthManager::new("test_secret");

        // Register user
        let user = auth
            .register_user(
                "alice".to_string(),
                "alice@example.com".to_string(),
                "password123",
                Role::Developer,
                "tenant1".to_string(),
            )
            .unwrap();
        assert_eq!(user.username, "alice");
        assert!(user.active);

        // Authenticate
        let token = auth.authenticate("alice", "password123").unwrap();
        assert!(!token.is_empty());

        // Validate token
        let claims = auth.validate_token(&token).unwrap();
        assert_eq!(claims.sub, user.id.to_string());
        assert_eq!(claims.tenant_id, "tenant1");
        assert_eq!(claims.role, Role::Developer);
    }

    /// An API key validates until it is revoked.
    #[test]
    fn test_api_key() {
        let auth = AuthManager::new("test_secret");

        let (api_key, key) = auth.create_api_key(
            "test-key".to_string(),
            "tenant1".to_string(),
            Role::ServiceAccount,
            None,
        );

        // Validate key
        let claims = auth.validate_api_key(&key).unwrap();
        assert_eq!(claims.tenant_id, "tenant1");
        assert_eq!(claims.role, Role::ServiceAccount);

        // Revoke key
        auth.revoke_api_key(&api_key.id).unwrap();

        // Should fail after revocation
        assert!(auth.validate_api_key(&key).is_err());
    }

    /// Claims built with a negative lifetime are already expired.
    #[test]
    fn test_claims_expiration() {
        let claims = Claims::new(
            "user1".to_string(),
            "tenant1".to_string(),
            Role::Developer,
            Duration::seconds(-1), // Expired 1 second ago
        );

        assert!(claims.is_expired());
    }
}
651}