Skip to main content

rivven_schema/
auth.rs

1//! Authentication & Authorization middleware for Schema Registry
2//!
3//! Provides enterprise-grade authentication for the Confluent-compatible REST API.
4//! Supports multiple authentication methods and authorization modes.
5//!
6//! ## Authentication Methods
7//!
8//! 1. **OAuth2/JWT** (jwt feature) - OIDC tokens with claim-based identity
9//! 2. **API Key** - Header-based authentication (X-API-Key or custom)
10//! 3. **Bearer Token** - Session ID or static token
11//! 4. **Basic Auth** - Username/password (legacy fallback)
12//!
13//! ## Authorization Modes
14//!
15//! 1. **Simple RBAC** (auth feature) - Permission-based using rivven-core AuthManager
16//! 2. **Cedar Policies** (cedar feature) - Fine-grained policy-as-code authorization
17//!
18//! ## Permissions
19//!
20//! Permissions are checked at the subject level:
21//! - `describe` - Get schemas, list subjects, check compatibility
22//! - `create` - Register schemas
23//! - `alter` - Update config
24//! - `delete` - Delete versions, delete subjects
25//!
26//! ## Configuration Examples
27//!
28//! ```rust,ignore
29//! use rivven_schema::auth::{AuthConfig, AuthMethod};
30//!
31//! // OAuth2/OIDC with JWT validation
32//! let config = AuthConfig::required()
33//!     .with_jwt_validation(JwtConfig {
34//!         issuer: "https://auth.example.com".into(),
35//!         audience: Some("rivven-schema".into()),
36//!         jwks_url: Some("https://auth.example.com/.well-known/jwks.json".into()),
37//!         ..Default::default()
38//!     });
39//!
40//! // API Key authentication
41//! let config = AuthConfig::required()
42//!     .with_api_keys(vec![("admin-key", "admin"), ("readonly-key", "viewer")]);
43//!
44//! // Multiple methods (evaluated in order)
45//! let config = AuthConfig::required()
46//!     .with_jwt_validation(jwt_config)
47//!     .with_api_keys(api_keys)
48//!     .with_basic_auth(true); // Fallback
49//! ```
50//!
51//! ## Cedar Policy Example
52//!
53//! ```cedar,ignore
54//! // Allow schema admins full access to schemas
55//! permit(
56//!   principal in Rivven::Group::"schema-admins",
57//!   action,
58//!   resource is Rivven::Schema
59//! );
60//!
61//! // Allow producers to register schemas matching their topic patterns
62//! permit(
63//!   principal,
64//!   action == Rivven::Action::"create",
65//!   resource is Rivven::Schema
66//! ) when {
67//!   resource.name.startsWith(principal.team + "-")
68//! };
69//! ```
70
71use axum::{body::Body, extract::State, http::Request, middleware::Next, response::Response};
72use serde::{Deserialize, Serialize};
73use std::collections::HashMap;
74use std::sync::Arc;
75use subtle::ConstantTimeEq;
76
77use axum::{
78    http::{header::AUTHORIZATION, StatusCode},
79    response::IntoResponse,
80    Json,
81};
82use base64::Engine;
83use tracing::{debug, warn};
84
85#[cfg(not(feature = "cedar"))]
86use rivven_core::{AuthManager, AuthSession, Permission, ResourceType};
87
88#[cfg(feature = "cedar")]
89use rivven_core::{AuthManager, AuthSession, Permission, ResourceType};
90
91// Cedar authorization support
92#[cfg(feature = "cedar")]
93use rivven_core::{AuthzContext, CedarAuthorizer, RivvenAction, RivvenResource};
94
95// JWT validation support
96#[cfg(feature = "jwt")]
97use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation};
98
99/// JWT/OIDC Configuration
100#[derive(Clone, Default)]
101pub struct JwtConfig {
102    /// Expected token issuer (iss claim)
103    pub issuer: Option<String>,
104    /// Expected audience (aud claim)
105    pub audience: Option<String>,
106    /// JWKS URL for key discovery (not implemented yet - uses secret)
107    pub jwks_url: Option<String>,
108    /// Secret key for HS256 validation (for testing/simple setups)
109    pub secret: Option<String>,
110    /// RSA public key in PEM format for RS256 validation
111    pub rsa_public_key: Option<String>,
112    /// Claim to use as principal name (default: "sub")
113    pub principal_claim: String,
114    /// Claim to use for roles/groups (default: "groups")
115    pub roles_claim: String,
116}
117
118impl std::fmt::Debug for JwtConfig {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        f.debug_struct("JwtConfig")
121            .field("issuer", &self.issuer)
122            .field("audience", &self.audience)
123            .field("jwks_url", &self.jwks_url)
124            .field("secret", &self.secret.as_ref().map(|_| "[REDACTED]"))
125            .field(
126                "rsa_public_key",
127                &self.rsa_public_key.as_ref().map(|_| "[REDACTED]"),
128            )
129            .field("principal_claim", &self.principal_claim)
130            .field("roles_claim", &self.roles_claim)
131            .finish()
132    }
133}
134
135impl JwtConfig {
136    /// Create config with issuer validation
137    pub fn with_issuer(issuer: impl Into<String>) -> Self {
138        Self {
139            issuer: Some(issuer.into()),
140            principal_claim: "sub".to_string(),
141            roles_claim: "groups".to_string(),
142            ..Default::default()
143        }
144    }
145
146    /// Set expected audience
147    pub fn with_audience(mut self, audience: impl Into<String>) -> Self {
148        self.audience = Some(audience.into());
149        self
150    }
151
152    /// Set HS256 secret for validation
153    pub fn with_secret(mut self, secret: impl Into<String>) -> Self {
154        self.secret = Some(secret.into());
155        self
156    }
157
158    /// Set RSA public key for RS256 validation
159    pub fn with_rsa_public_key(mut self, key: impl Into<String>) -> Self {
160        self.rsa_public_key = Some(key.into());
161        self
162    }
163}
164
165/// API Key entry
166#[derive(Debug, Clone)]
167pub struct ApiKeyEntry {
168    /// The API key value
169    pub key: String,
170    /// Principal/user name this key maps to
171    pub principal: String,
172    /// Roles/groups for this key
173    pub roles: Vec<String>,
174}
175
176impl ApiKeyEntry {
177    pub fn new(key: impl Into<String>, principal: impl Into<String>) -> Self {
178        Self {
179            key: key.into(),
180            principal: principal.into(),
181            roles: Vec::new(),
182        }
183    }
184
185    pub fn with_roles(mut self, roles: Vec<String>) -> Self {
186        self.roles = roles;
187        self
188    }
189}
190
191/// Authentication configuration
192#[derive(Debug, Clone)]
193pub struct AuthConfig {
194    /// Whether authentication is required
195    pub require_auth: bool,
196    /// Realm for Basic Auth challenge
197    pub realm: String,
198    /// Allow anonymous read access
199    pub allow_anonymous_read: bool,
200    /// Enable Basic Auth (username:password)
201    pub enable_basic_auth: bool,
202    /// Enable Bearer Token (session ID lookup)
203    pub enable_bearer_token: bool,
204    /// JWT/OIDC configuration (requires jwt feature)
205    #[cfg(feature = "jwt")]
206    pub jwt_config: Option<JwtConfig>,
207    /// API Keys (key -> ApiKeyEntry mapping)
208    pub api_keys: HashMap<String, ApiKeyEntry>,
209    /// Custom API key header name (default: X-API-Key)
210    pub api_key_header: String,
211    /// Use Cedar for authorization (when cedar feature enabled)
212    #[cfg(feature = "cedar")]
213    pub use_cedar: bool,
214}
215
216impl Default for AuthConfig {
217    fn default() -> Self {
218        Self {
219            require_auth: false,
220            realm: "Rivven Schema Registry".to_string(),
221            allow_anonymous_read: true,
222            enable_basic_auth: true,
223            enable_bearer_token: true,
224            #[cfg(feature = "jwt")]
225            jwt_config: None,
226            api_keys: HashMap::new(),
227            api_key_header: "X-API-Key".to_string(),
228            #[cfg(feature = "cedar")]
229            use_cedar: false,
230        }
231    }
232}
233
234impl AuthConfig {
235    /// Create config requiring authentication
236    pub fn required() -> Self {
237        Self {
238            require_auth: true,
239            realm: "Rivven Schema Registry".to_string(),
240            allow_anonymous_read: false,
241            enable_basic_auth: true,
242            enable_bearer_token: true,
243            #[cfg(feature = "jwt")]
244            jwt_config: None,
245            api_keys: HashMap::new(),
246            api_key_header: "X-API-Key".to_string(),
247            #[cfg(feature = "cedar")]
248            use_cedar: false,
249        }
250    }
251
252    /// Create config with anonymous read access
253    pub fn with_anonymous_read(mut self, allow: bool) -> Self {
254        self.allow_anonymous_read = allow;
255        self
256    }
257
258    /// Enable/disable Basic Auth
259    pub fn with_basic_auth(mut self, enable: bool) -> Self {
260        self.enable_basic_auth = enable;
261        self
262    }
263
264    /// Enable/disable Bearer Token (session lookup)
265    pub fn with_bearer_token(mut self, enable: bool) -> Self {
266        self.enable_bearer_token = enable;
267        self
268    }
269
270    /// Configure JWT/OIDC validation (requires jwt feature)
271    #[cfg(feature = "jwt")]
272    pub fn with_jwt(mut self, config: JwtConfig) -> Self {
273        self.jwt_config = Some(config);
274        self
275    }
276
277    /// Add API keys for authentication
278    pub fn with_api_keys(mut self, keys: Vec<ApiKeyEntry>) -> Self {
279        for entry in keys {
280            self.api_keys.insert(entry.key.clone(), entry);
281        }
282        self
283    }
284
285    /// Add a single API key
286    pub fn add_api_key(mut self, key: impl Into<String>, principal: impl Into<String>) -> Self {
287        let entry = ApiKeyEntry::new(key, principal);
288        self.api_keys.insert(entry.key.clone(), entry);
289        self
290    }
291
292    /// Set custom API key header name
293    pub fn with_api_key_header(mut self, header: impl Into<String>) -> Self {
294        self.api_key_header = header.into();
295        self
296    }
297
298    /// Enable Cedar-based authorization
299    #[cfg(feature = "cedar")]
300    pub fn with_cedar(mut self) -> Self {
301        self.use_cedar = true;
302        self
303    }
304}
305
306/// Authentication state for extracting in handlers
307#[derive(Debug, Clone)]
308pub struct AuthState {
309    pub session: Option<AuthSession>,
310    pub authenticated: bool,
311}
312
313impl AuthState {
314    /// Anonymous/unauthenticated state
315    pub fn anonymous() -> Self {
316        Self {
317            session: None,
318            authenticated: false,
319        }
320    }
321
322    /// Authenticated state with session
323    pub fn authenticated(session: AuthSession) -> Self {
324        Self {
325            session: Some(session),
326            authenticated: true,
327        }
328    }
329
330    /// Get the principal name if authenticated
331    pub fn principal(&self) -> Option<&str> {
332        self.session.as_ref().map(|s| s.principal_name.as_str())
333    }
334}
335
336/// Shared authentication state for the server
337#[cfg(not(feature = "cedar"))]
338pub struct ServerAuthState {
339    pub auth_manager: Arc<AuthManager>,
340    pub config: AuthConfig,
341}
342
343/// Shared authentication state for the server (with Cedar support)
344#[cfg(feature = "cedar")]
345pub struct ServerAuthState {
346    pub auth_manager: Arc<AuthManager>,
347    pub cedar_authorizer: Option<Arc<CedarAuthorizer>>,
348    pub config: AuthConfig,
349}
350
351#[cfg(feature = "cedar")]
352impl ServerAuthState {
353    /// Create server auth state with Cedar authorizer
354    pub fn with_cedar(
355        auth_manager: Arc<AuthManager>,
356        cedar_authorizer: Arc<CedarAuthorizer>,
357        config: AuthConfig,
358    ) -> Self {
359        Self {
360            auth_manager,
361            cedar_authorizer: Some(cedar_authorizer),
362            config,
363        }
364    }
365}
366
367/// Error response for auth failures
368#[derive(Debug, Serialize, Deserialize)]
369pub struct AuthErrorResponse {
370    pub error_code: i32,
371    pub message: String,
372}
373
374/// Authentication middleware
375///
376/// Evaluates authentication methods in order:
377/// 1. API Key (X-API-Key header or custom)
378/// 2. JWT/OIDC (Bearer token with JWT validation)
379/// 3. Bearer Token (session ID lookup)
380/// 4. Basic Auth (username:password)
381pub async fn auth_middleware(
382    State(auth_state): State<Arc<ServerAuthState>>,
383    mut request: Request<Body>,
384    next: Next,
385) -> Response {
386    let config = &auth_state.config;
387
388    // Try API Key first (custom header)
389    if !config.api_keys.is_empty() {
390        if let Some(api_key) = request
391            .headers()
392            .get(&config.api_key_header)
393            .and_then(|v| v.to_str().ok())
394        {
395            match validate_api_key(api_key, config, &auth_state.auth_manager) {
396                Ok(auth) => {
397                    debug!("API Key authentication successful");
398                    request.extensions_mut().insert(auth);
399                    return next.run(request).await;
400                }
401                Err(e) => {
402                    warn!("API Key authentication failed: {}", e.message);
403                    return unauthorized_response(&config.realm, e);
404                }
405            }
406        }
407    }
408
409    // Try Authorization header
410    let auth_header = request
411        .headers()
412        .get(AUTHORIZATION)
413        .and_then(|v| v.to_str().ok());
414
415    let result = match auth_header {
416        Some(header) if header.starts_with("Bearer ") => {
417            // Try JWT validation first (if configured)
418            #[cfg(feature = "jwt")]
419            if let Some(jwt_config) = &config.jwt_config {
420                match validate_jwt_token(header, jwt_config, &auth_state.auth_manager) {
421                    Ok(auth) => {
422                        return {
423                            request.extensions_mut().insert(auth);
424                            next.run(request).await
425                        }
426                    }
427                    Err(e) => {
428                        debug!(
429                            "JWT validation failed, trying session lookup: {}",
430                            e.message
431                        );
432                        // Fall through to session lookup
433                    }
434                }
435            }
436
437            // Try session ID lookup
438            if config.enable_bearer_token {
439                parse_bearer_token(header, &auth_state.auth_manager).await
440            } else {
441                Err(AuthErrorResponse {
442                    error_code: 40101,
443                    message: "Bearer token authentication is disabled".to_string(),
444                })
445            }
446        }
447        Some(header) if header.starts_with("Basic ") => {
448            if config.enable_basic_auth {
449                parse_basic_auth(header, &auth_state.auth_manager).await
450            } else {
451                Err(AuthErrorResponse {
452                    error_code: 40101,
453                    message: "Basic authentication is disabled".to_string(),
454                })
455            }
456        }
457        Some(_) => Err(AuthErrorResponse {
458            error_code: 40101,
459            message: "Invalid Authorization header format. Supported: Basic, Bearer".to_string(),
460        }),
461        None => {
462            if config.require_auth {
463                if config.allow_anonymous_read && is_read_request(&request) {
464                    Ok(AuthState::anonymous())
465                } else {
466                    Err(AuthErrorResponse {
467                        error_code: 40101,
468                        message: "Authentication required".to_string(),
469                    })
470                }
471            } else {
472                Ok(AuthState::anonymous())
473            }
474        }
475    };
476
477    match result {
478        Ok(auth) => {
479            request.extensions_mut().insert(auth);
480            next.run(request).await
481        }
482        Err(e) => unauthorized_response(&config.realm, e),
483    }
484}
485
486/// Build unauthorized response with WWW-Authenticate header
487fn unauthorized_response(realm: &str, error: AuthErrorResponse) -> Response {
488    (
489        StatusCode::UNAUTHORIZED,
490        [(
491            "WWW-Authenticate",
492            format!("Basic realm=\"{}\", Bearer", realm).as_str(),
493        )],
494        Json(error),
495    )
496        .into_response()
497}
498
499/// Validate API Key (timing-safe).
500///
501/// Standard HashMap::get short-circuits on key mismatch, creating
502/// a timing side-channel that enables brute-force key enumeration. Instead,
503/// we iterate all stored keys and use constant-time byte comparison so that
504/// the response time does not leak which keys exist.
505fn validate_api_key(
506    api_key: &str,
507    config: &AuthConfig,
508    auth_manager: &AuthManager,
509) -> Result<AuthState, AuthErrorResponse> {
510    // Constant-time scan: check every key even after a match is found
511    let api_key_bytes = api_key.as_bytes();
512    let mut matched_entry = None;
513
514    for (stored_key, entry) in &config.api_keys {
515        let stored_bytes = stored_key.as_bytes();
516        // Use subtle::ConstantTimeEq for immune-to-optimization comparison
517        if stored_bytes.len() == api_key_bytes.len()
518            && bool::from(stored_bytes.ct_eq(api_key_bytes))
519        {
520            matched_entry = Some(entry);
521        }
522        // Continue scanning even after match to prevent timing leak
523    }
524
525    if let Some(entry) = matched_entry {
526        debug!("API Key validated for principal: {}", entry.principal);
527
528        if let Some(session) = auth_manager.get_session_by_principal(&entry.principal) {
529            Ok(AuthState::authenticated(session))
530        } else {
531            let session = auth_manager
532                .create_api_key_session(&entry.principal, &entry.roles)
533                .map_err(|e| AuthErrorResponse {
534                    error_code: 50001,
535                    message: format!("Internal error: {e}"),
536                })?;
537            Ok(AuthState::authenticated(session))
538        }
539    } else {
540        Err(AuthErrorResponse {
541            error_code: 40101,
542            message: "Invalid API key".to_string(),
543        })
544    }
545}
546
547/// JWT Claims structure
548///
549/// Note: Some fields (iss, aud, exp) are validated internally by jsonwebtoken
550/// during token verification but not accessed directly in our code.
551#[cfg(feature = "jwt")]
552#[derive(Debug, Deserialize)]
553#[allow(dead_code)]
554struct JwtClaims {
555    /// Subject (user identifier)
556    sub: Option<String>,
557    /// Issuer (validated by jsonwebtoken)
558    iss: Option<String>,
559    /// Audience (validated by jsonwebtoken)
560    aud: Option<serde_json::Value>,
561    /// Expiration time (validated by jsonwebtoken)
562    exp: Option<u64>,
563    /// Groups/roles claim
564    groups: Option<Vec<String>>,
565    /// Alternative roles claim
566    roles: Option<Vec<String>>,
567    /// Email (alternative principal)
568    email: Option<String>,
569    /// Preferred username
570    preferred_username: Option<String>,
571}
572
573/// Validate JWT/OIDC token
574#[cfg(feature = "jwt")]
575fn validate_jwt_token(
576    header: &str,
577    jwt_config: &JwtConfig,
578    auth_manager: &AuthManager,
579) -> Result<AuthState, AuthErrorResponse> {
580    let token = header.trim_start_matches("Bearer ");
581
582    // Determine decoding key
583    let decoding_key = if let Some(secret) = &jwt_config.secret {
584        DecodingKey::from_secret(secret.as_bytes())
585    } else if let Some(rsa_key) = &jwt_config.rsa_public_key {
586        DecodingKey::from_rsa_pem(rsa_key.as_bytes()).map_err(|e| AuthErrorResponse {
587            error_code: 40101,
588            message: format!("Invalid RSA public key: {}", e),
589        })?
590    } else {
591        return Err(AuthErrorResponse {
592            error_code: 50001,
593            message: "JWT validation not configured (no secret or public key)".to_string(),
594        });
595    };
596
597    // Configure validation
598    let mut validation = Validation::default();
599
600    // Set algorithm based on key type
601    if jwt_config.rsa_public_key.is_some() {
602        validation.algorithms = vec![Algorithm::RS256, Algorithm::RS384, Algorithm::RS512];
603    } else {
604        validation.algorithms = vec![Algorithm::HS256, Algorithm::HS384, Algorithm::HS512];
605    }
606
607    // Set issuer validation
608    if let Some(issuer) = &jwt_config.issuer {
609        validation.set_issuer(&[issuer]);
610    }
611
612    // Set audience validation
613    if let Some(audience) = &jwt_config.audience {
614        validation.set_audience(&[audience]);
615    }
616
617    // Decode and validate token
618    let token_data =
619        decode::<JwtClaims>(token, &decoding_key, &validation).map_err(|e| AuthErrorResponse {
620            error_code: 40101,
621            message: format!("JWT validation failed: {}", e),
622        })?;
623
624    let claims = token_data.claims;
625
626    // Extract principal from configured claim
627    let principal = match jwt_config.principal_claim.as_str() {
628        "sub" => claims.sub.clone(),
629        "email" => claims.email.clone(),
630        "preferred_username" => claims.preferred_username.clone(),
631        _ => claims.sub.clone(),
632    }
633    .ok_or_else(|| AuthErrorResponse {
634        error_code: 40101,
635        message: format!("JWT missing required claim: {}", jwt_config.principal_claim),
636    })?;
637
638    // Extract roles from configured claim
639    let roles = match jwt_config.roles_claim.as_str() {
640        "groups" => claims.groups.unwrap_or_default(),
641        "roles" => claims.roles.unwrap_or_default(),
642        _ => claims.groups.or(claims.roles).unwrap_or_default(),
643    };
644
645    debug!(
646        "JWT validated for principal: {} with roles: {:?}",
647        principal, roles
648    );
649
650    // Create session for this JWT user
651    let session = auth_manager
652        .create_jwt_session(&principal, &roles)
653        .map_err(|e| AuthErrorResponse {
654            error_code: 50001,
655            message: format!("Internal error: {e}"),
656        })?;
657    Ok(AuthState::authenticated(session))
658}
659
660/// Parse Basic Auth header
661async fn parse_basic_auth(
662    header: &str,
663    auth_manager: &AuthManager,
664) -> Result<AuthState, AuthErrorResponse> {
665    let encoded = header.trim_start_matches("Basic ");
666    let decoded = base64::engine::general_purpose::STANDARD
667        .decode(encoded)
668        .map_err(|_| AuthErrorResponse {
669            error_code: 40101,
670            message: "Invalid Basic auth encoding".to_string(),
671        })?;
672
673    let credentials = String::from_utf8(decoded).map_err(|_| AuthErrorResponse {
674        error_code: 40101,
675        message: "Invalid credentials encoding".to_string(),
676    })?;
677
678    let (username, password) = credentials.split_once(':').ok_or(AuthErrorResponse {
679        error_code: 40101,
680        message: "Invalid Basic auth format".to_string(),
681    })?;
682
683    debug!("Authenticating user: {}", username);
684
685    // Note: We pass "http" as client_ip since we don't have access to it here
686    // In production, extract from X-Forwarded-For or connection info
687    match auth_manager.authenticate(username, password, "http") {
688        Ok(session) => {
689            debug!("Authentication successful for: {}", username);
690            Ok(AuthState::authenticated(session))
691        }
692        Err(e) => {
693            warn!("Authentication failed for {}: {:?}", username, e);
694            Err(AuthErrorResponse {
695                error_code: 40101,
696                message: "Invalid credentials".to_string(),
697            })
698        }
699    }
700}
701
702/// Parse Bearer token header
703/// Bearer tokens are treated as session IDs
704async fn parse_bearer_token(
705    header: &str,
706    auth_manager: &AuthManager,
707) -> Result<AuthState, AuthErrorResponse> {
708    let token = header.trim_start_matches("Bearer ");
709
710    // Bearer token is treated as session ID
711    match auth_manager.get_session(token) {
712        Some(session) => {
713            debug!(
714                "Token validation successful for: {}",
715                session.principal_name
716            );
717            Ok(AuthState::authenticated(session))
718        }
719        None => {
720            warn!("Token validation failed: invalid or expired session");
721            Err(AuthErrorResponse {
722                error_code: 40101,
723                message: "Invalid or expired token".to_string(),
724            })
725        }
726    }
727}
728
729/// Check if request is a read-only operation
730fn is_read_request(request: &Request<Body>) -> bool {
731    matches!(request.method().as_str(), "GET" | "HEAD" | "OPTIONS")
732}
733
734/// Permission required for schema operations
735#[derive(Debug, Clone, Copy, PartialEq, Eq)]
736pub enum SchemaPermission {
737    /// Read schemas, list subjects, check compatibility
738    Describe,
739    /// Register new schemas
740    Create,
741    /// Update configurations
742    Alter,
743    /// Delete schemas or subjects
744    Delete,
745}
746
747impl SchemaPermission {
748    /// Get the corresponding rivven-core Permission
749    pub fn to_core_permission(self) -> Permission {
750        match self {
751            SchemaPermission::Describe => Permission::Describe,
752            SchemaPermission::Create => Permission::Create,
753            SchemaPermission::Alter => Permission::Alter,
754            SchemaPermission::Delete => Permission::Delete,
755        }
756    }
757
758    /// Get the corresponding Cedar action
759    #[cfg(feature = "cedar")]
760    pub fn to_cedar_action(self) -> RivvenAction {
761        match self {
762            SchemaPermission::Describe => RivvenAction::Describe,
763            SchemaPermission::Create => RivvenAction::Create,
764            SchemaPermission::Alter => RivvenAction::Alter,
765            SchemaPermission::Delete => RivvenAction::Delete,
766        }
767    }
768}
769
770/// Check if the current session has permission on a subject (Simple RBAC)
771pub fn check_subject_permission(
772    auth_state: &AuthState,
773    subject: &str,
774    permission: SchemaPermission,
775) -> Result<(), AuthErrorResponse> {
776    // Anonymous access for read-only when configured
777    if !auth_state.authenticated {
778        if matches!(permission, SchemaPermission::Describe) {
779            return Ok(());
780        }
781        return Err(AuthErrorResponse {
782            error_code: 40301,
783            message: "Authentication required for write operations".to_string(),
784        });
785    }
786
787    if let Some(session) = &auth_state.session {
788        let resource = ResourceType::Schema(subject.to_string());
789        let perm = permission.to_core_permission();
790
791        // Use session's has_permission method instead of AuthManager
792        if session.has_permission(&resource, &perm) {
793            Ok(())
794        } else {
795            Err(AuthErrorResponse {
796                error_code: 40301,
797                message: format!(
798                    "Access denied: {} lacks {:?} permission on subject '{}'",
799                    session.principal_name, permission, subject
800                ),
801            })
802        }
803    } else {
804        Err(AuthErrorResponse {
805            error_code: 40101,
806            message: "No valid session".to_string(),
807        })
808    }
809}
810
811/// Check if the current session has permission on a subject (Cedar policy-based)
812#[cfg(feature = "cedar")]
813pub fn check_subject_permission_cedar(
814    auth_state: &AuthState,
815    subject: &str,
816    permission: SchemaPermission,
817    authorizer: &CedarAuthorizer,
818    context: Option<AuthzContext>,
819) -> Result<(), AuthErrorResponse> {
820    // Anonymous access for read-only when configured
821    if !auth_state.authenticated {
822        if matches!(permission, SchemaPermission::Describe) {
823            return Ok(());
824        }
825        return Err(AuthErrorResponse {
826            error_code: 40301,
827            message: "Authentication required for write operations".to_string(),
828        });
829    }
830
831    let principal = auth_state.principal().ok_or_else(|| AuthErrorResponse {
832        error_code: 40101,
833        message: "No valid session".to_string(),
834    })?;
835
836    let action = permission.to_cedar_action();
837    let resource = RivvenResource::Schema(subject.to_string());
838    let ctx = context.unwrap_or_default();
839
840    match authorizer.authorize(principal, action, &resource, &ctx) {
841        Ok(()) => Ok(()),
842        Err(e) => {
843            warn!("Cedar authorization denied: {:?}", e);
844            Err(AuthErrorResponse {
845                error_code: 40301,
846                message: format!(
847                    "Access denied: {} cannot {:?} on subject '{}'",
848                    principal, permission, subject
849                ),
850            })
851        }
852    }
853}
854
855#[cfg(test)]
856mod tests {
857    use super::*;
858
859    #[test]
860    fn test_auth_config_default() {
861        let config = AuthConfig::default();
862        assert!(!config.require_auth);
863        assert!(config.allow_anonymous_read);
864    }
865
866    #[test]
867    fn test_auth_config_required() {
868        let config = AuthConfig::required();
869        assert!(config.require_auth);
870        assert!(!config.allow_anonymous_read);
871    }
872
873    #[test]
874    fn test_auth_state_anonymous() {
875        let state = AuthState::anonymous();
876        assert!(!state.authenticated);
877        assert!(state.principal().is_none());
878    }
879
880    #[test]
881    fn test_schema_permission_to_core() {
882        assert!(matches!(
883            SchemaPermission::Describe.to_core_permission(),
884            Permission::Describe
885        ));
886        assert!(matches!(
887            SchemaPermission::Create.to_core_permission(),
888            Permission::Create
889        ));
890        assert!(matches!(
891            SchemaPermission::Alter.to_core_permission(),
892            Permission::Alter
893        ));
894        assert!(matches!(
895            SchemaPermission::Delete.to_core_permission(),
896            Permission::Delete
897        ));
898    }
899}
900
901#[cfg(all(test, feature = "cedar"))]
902mod cedar_tests {
903    use super::*;
904
905    #[test]
906    fn test_auth_config_with_cedar() {
907        let config = AuthConfig::required().with_cedar();
908        assert!(config.require_auth);
909        assert!(config.use_cedar);
910    }
911
912    #[test]
913    fn test_schema_permission_to_cedar_action() {
914        assert_eq!(
915            SchemaPermission::Describe.to_cedar_action(),
916            RivvenAction::Describe
917        );
918        assert_eq!(
919            SchemaPermission::Create.to_cedar_action(),
920            RivvenAction::Create
921        );
922        assert_eq!(
923            SchemaPermission::Alter.to_cedar_action(),
924            RivvenAction::Alter
925        );
926        assert_eq!(
927            SchemaPermission::Delete.to_cedar_action(),
928            RivvenAction::Delete
929        );
930    }
931
932    #[test]
933    fn test_cedar_authorization() {
934        // Create authorizer without schema validation for testing
935        let authorizer = CedarAuthorizer::new_without_schema();
936
937        // Add admin policy for schemas
938        authorizer
939            .add_policy(
940                "schema-admin",
941                r#"
942permit(
943  principal in Rivven::Group::"schema-admins",
944  action,
945  resource is Rivven::Schema
946);
947"#,
948            )
949            .unwrap();
950
951        // Add group and user
952        authorizer.add_group("schema-admins", &[]).unwrap();
953        authorizer
954            .add_user(
955                "alice",
956                Some("alice@example.com"),
957                &["admin"],
958                &["schema-admins"],
959                false,
960            )
961            .unwrap();
962
963        // Add schema entity
964        authorizer.add_schema("user-events-value", 1).unwrap();
965
966        // Create authenticated state
967        let _auth_state = AuthState {
968            session: None, // Cedar doesn't need the session, uses principal directly
969            authenticated: true,
970        };
971
972        // Test authorization with manual principal extraction
973        // In real code, auth_state.principal() would return the authenticated user
974        let ctx = AuthzContext::new().with_ip("127.0.0.1");
975        let result = authorizer.authorize(
976            "alice",
977            RivvenAction::Create,
978            &RivvenResource::Schema("user-events-value".to_string()),
979            &ctx,
980        );
981
982        assert!(result.is_ok());
983    }
984
985    #[test]
986    fn test_cedar_authorization_denied() {
987        let authorizer = CedarAuthorizer::new_without_schema();
988
989        // Only admins can alter schemas
990        authorizer
991            .add_policy(
992                "only-admins-alter",
993                r#"
994permit(
995  principal in Rivven::Group::"admins",
996  action == Rivven::Action::"alter",
997  resource is Rivven::Schema
998);
999"#,
1000            )
1001            .unwrap();
1002
1003        // Add a non-admin user
1004        authorizer
1005            .add_user("bob", Some("bob@example.com"), &["user"], &[], false)
1006            .unwrap();
1007
1008        authorizer.add_schema("config-value", 1).unwrap();
1009
1010        let ctx = AuthzContext::new();
1011        let result = authorizer.authorize(
1012            "bob",
1013            RivvenAction::Alter,
1014            &RivvenResource::Schema("config-value".to_string()),
1015            &ctx,
1016        );
1017
1018        assert!(result.is_err());
1019    }
1020}