Skip to main content

rivven_schema/
auth.rs

1//! Authentication & Authorization middleware for Schema Registry
2//!
3//! Provides enterprise-grade authentication for the Confluent-compatible REST API.
4//! Supports multiple authentication methods and authorization modes.
5//!
6//! ## Authentication Methods
7//!
8//! 1. **OAuth2/JWT** (jwt feature) - OIDC tokens with claim-based identity
9//! 2. **API Key** - Header-based authentication (X-API-Key or custom)
10//! 3. **Bearer Token** - Session ID or static token
11//! 4. **Basic Auth** - Username/password (legacy fallback)
12//!
13//! ## Authorization Modes
14//!
15//! 1. **Simple RBAC** (auth feature) - Permission-based using rivven-core AuthManager
16//! 2. **Cedar Policies** (cedar feature) - Fine-grained policy-as-code authorization
17//!
18//! ## Permissions
19//!
20//! Permissions are checked at the subject level:
21//! - `describe` - Get schemas, list subjects, check compatibility
22//! - `create` - Register schemas
23//! - `alter` - Update config
24//! - `delete` - Delete versions, delete subjects
25//!
26//! ## Configuration Examples
27//!
28//! ```rust,ignore
//! use rivven_schema::auth::{ApiKeyEntry, AuthConfig, JwtConfig};
//!
//! // OAuth2/OIDC with JWT validation (requires the `jwt` feature).
//! // `rsa_public_key_pem` is the issuer's RSA public key in PEM format;
//! // JWKS discovery via `jwks_url` is not implemented yet.
//! let jwt_config = JwtConfig::with_issuer("https://auth.example.com")
//!     .with_audience("rivven-schema")
//!     .with_rsa_public_key(rsa_public_key_pem);
//! let config = AuthConfig::required().with_jwt(jwt_config.clone());
//!
//! // API Key authentication
//! let api_keys = vec![
//!     ApiKeyEntry::new("admin-key", "admin"),
//!     ApiKeyEntry::new("readonly-key", "viewer"),
//! ];
//! let config = AuthConfig::required().with_api_keys(api_keys.clone());
//!
//! // Multiple methods (evaluated in order)
//! let config = AuthConfig::required()
//!     .with_jwt(jwt_config)
//!     .with_api_keys(api_keys)
//!     .with_basic_auth(true); // Fallback
49//! ```
50//!
51//! ## Cedar Policy Example
52//!
53//! ```cedar,ignore
54//! // Allow schema admins full access to schemas
55//! permit(
56//!   principal in Rivven::Group::"schema-admins",
57//!   action,
58//!   resource is Rivven::Schema
59//! );
60//!
61//! // Allow producers to register schemas matching their topic patterns
62//! permit(
63//!   principal,
64//!   action == Rivven::Action::"create",
65//!   resource is Rivven::Schema
66//! ) when {
67//!   resource.name.startsWith(principal.team + "-")
68//! };
69//! ```
70
71use axum::{body::Body, extract::State, http::Request, middleware::Next, response::Response};
72use serde::{Deserialize, Serialize};
73use std::collections::HashMap;
74use std::sync::Arc;
75
76#[cfg(feature = "auth")]
77use axum::{
78    http::{header::AUTHORIZATION, StatusCode},
79    response::IntoResponse,
80    Json,
81};
82#[cfg(feature = "auth")]
83use base64::Engine;
84#[cfg(feature = "auth")]
85use tracing::{debug, warn};
86
87#[cfg(all(feature = "auth", not(feature = "cedar")))]
88use rivven_core::{AuthManager, AuthSession, Permission, ResourceType};
89
90#[cfg(feature = "cedar")]
91use rivven_core::{AuthManager, AuthSession, Permission};
92
93// Cedar authorization support
94#[cfg(feature = "cedar")]
95use rivven_core::{AuthzContext, CedarAuthorizer, RivvenAction, RivvenResource};
96
97// JWT validation support
98#[cfg(feature = "jwt")]
99use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation};
100
/// JWT/OIDC configuration used when validating `Bearer` tokens as JWTs.
///
/// `Default` yields a configuration with no issuer, audience or key material,
/// with `principal_claim` set to `"sub"` and `roles_claim` set to `"groups"`.
#[derive(Debug, Clone)]
pub struct JwtConfig {
    /// Expected token issuer (iss claim)
    pub issuer: Option<String>,
    /// Expected audience (aud claim)
    pub audience: Option<String>,
    /// JWKS URL for key discovery (not implemented yet - uses secret)
    pub jwks_url: Option<String>,
    /// Secret key for HS256 validation (for testing/simple setups)
    pub secret: Option<String>,
    /// RSA public key in PEM format for RS256 validation
    pub rsa_public_key: Option<String>,
    /// Claim to use as principal name (default: "sub")
    pub principal_claim: String,
    /// Claim to use for roles/groups (default: "groups")
    pub roles_claim: String,
}

impl Default for JwtConfig {
    /// Builds the documented defaults.
    ///
    /// A derived `Default` would leave both claim names as empty strings,
    /// contradicting the defaults documented on the fields, so it is written
    /// out by hand.
    fn default() -> Self {
        Self {
            issuer: None,
            audience: None,
            jwks_url: None,
            secret: None,
            rsa_public_key: None,
            principal_claim: "sub".to_string(),
            roles_claim: "groups".to_string(),
        }
    }
}

impl JwtConfig {
    /// Create config with issuer validation; every other field takes its default.
    pub fn with_issuer(issuer: impl Into<String>) -> Self {
        Self {
            issuer: Some(issuer.into()),
            ..Default::default()
        }
    }

    /// Set expected audience
    pub fn with_audience(mut self, audience: impl Into<String>) -> Self {
        self.audience = Some(audience.into());
        self
    }

    /// Set HS256 secret for validation
    pub fn with_secret(mut self, secret: impl Into<String>) -> Self {
        self.secret = Some(secret.into());
        self
    }

    /// Set RSA public key for RS256 validation
    pub fn with_rsa_public_key(mut self, key: impl Into<String>) -> Self {
        self.rsa_public_key = Some(key.into());
        self
    }
}
149
/// A single configured API key together with the identity it maps to.
#[derive(Debug, Clone)]
pub struct ApiKeyEntry {
    /// The API key value
    pub key: String,
    /// Principal/user name this key maps to
    pub principal: String,
    /// Roles/groups for this key
    pub roles: Vec<String>,
}

impl ApiKeyEntry {
    /// Creates an entry for `key` mapped to `principal`, with no roles.
    pub fn new(key: impl Into<String>, principal: impl Into<String>) -> Self {
        let key = key.into();
        let principal = principal.into();
        Self {
            key,
            principal,
            roles: vec![],
        }
    }

    /// Replaces the entry's roles with `roles` and returns the updated entry.
    pub fn with_roles(self, roles: Vec<String>) -> Self {
        Self { roles, ..self }
    }
}
175
176/// Authentication configuration
177#[derive(Debug, Clone)]
178pub struct AuthConfig {
179    /// Whether authentication is required
180    pub require_auth: bool,
181    /// Realm for Basic Auth challenge
182    pub realm: String,
183    /// Allow anonymous read access
184    pub allow_anonymous_read: bool,
185    /// Enable Basic Auth (username:password)
186    pub enable_basic_auth: bool,
187    /// Enable Bearer Token (session ID lookup)
188    pub enable_bearer_token: bool,
189    /// JWT/OIDC configuration (requires jwt feature)
190    #[cfg(feature = "jwt")]
191    pub jwt_config: Option<JwtConfig>,
192    /// API Keys (key -> ApiKeyEntry mapping)
193    pub api_keys: HashMap<String, ApiKeyEntry>,
194    /// Custom API key header name (default: X-API-Key)
195    pub api_key_header: String,
196    /// Use Cedar for authorization (when cedar feature enabled)
197    #[cfg(feature = "cedar")]
198    pub use_cedar: bool,
199}
200
201impl Default for AuthConfig {
202    fn default() -> Self {
203        Self {
204            require_auth: false,
205            realm: "Rivven Schema Registry".to_string(),
206            allow_anonymous_read: true,
207            enable_basic_auth: true,
208            enable_bearer_token: true,
209            #[cfg(feature = "jwt")]
210            jwt_config: None,
211            api_keys: HashMap::new(),
212            api_key_header: "X-API-Key".to_string(),
213            #[cfg(feature = "cedar")]
214            use_cedar: false,
215        }
216    }
217}
218
219impl AuthConfig {
220    /// Create config requiring authentication
221    pub fn required() -> Self {
222        Self {
223            require_auth: true,
224            realm: "Rivven Schema Registry".to_string(),
225            allow_anonymous_read: false,
226            enable_basic_auth: true,
227            enable_bearer_token: true,
228            #[cfg(feature = "jwt")]
229            jwt_config: None,
230            api_keys: HashMap::new(),
231            api_key_header: "X-API-Key".to_string(),
232            #[cfg(feature = "cedar")]
233            use_cedar: false,
234        }
235    }
236
237    /// Create config with anonymous read access
238    pub fn with_anonymous_read(mut self, allow: bool) -> Self {
239        self.allow_anonymous_read = allow;
240        self
241    }
242
243    /// Enable/disable Basic Auth
244    pub fn with_basic_auth(mut self, enable: bool) -> Self {
245        self.enable_basic_auth = enable;
246        self
247    }
248
249    /// Enable/disable Bearer Token (session lookup)
250    pub fn with_bearer_token(mut self, enable: bool) -> Self {
251        self.enable_bearer_token = enable;
252        self
253    }
254
255    /// Configure JWT/OIDC validation (requires jwt feature)
256    #[cfg(feature = "jwt")]
257    pub fn with_jwt(mut self, config: JwtConfig) -> Self {
258        self.jwt_config = Some(config);
259        self
260    }
261
262    /// Add API keys for authentication
263    pub fn with_api_keys(mut self, keys: Vec<ApiKeyEntry>) -> Self {
264        for entry in keys {
265            self.api_keys.insert(entry.key.clone(), entry);
266        }
267        self
268    }
269
270    /// Add a single API key
271    pub fn add_api_key(mut self, key: impl Into<String>, principal: impl Into<String>) -> Self {
272        let entry = ApiKeyEntry::new(key, principal);
273        self.api_keys.insert(entry.key.clone(), entry);
274        self
275    }
276
277    /// Set custom API key header name
278    pub fn with_api_key_header(mut self, header: impl Into<String>) -> Self {
279        self.api_key_header = header.into();
280        self
281    }
282
283    /// Enable Cedar-based authorization
284    #[cfg(feature = "cedar")]
285    pub fn with_cedar(mut self) -> Self {
286        self.use_cedar = true;
287        self
288    }
289}
290
/// Per-request authentication result, stored in the request extensions by
/// the auth middleware so handlers can read it.
#[derive(Debug, Clone)]
pub struct AuthState {
    /// Session of the authenticated caller, if any.
    #[cfg(feature = "auth")]
    pub session: Option<AuthSession>,
    /// Placeholder when the `auth` feature is disabled.
    #[cfg(not(feature = "auth"))]
    pub session: Option<()>,
    /// `true` when the request was authenticated.
    pub authenticated: bool,
}

impl AuthState {
    /// Anonymous/unauthenticated state: no session, `authenticated == false`.
    pub fn anonymous() -> Self {
        Self {
            authenticated: false,
            session: None,
        }
    }

    /// Authenticated state wrapping `session`.
    #[cfg(feature = "auth")]
    pub fn authenticated(session: AuthSession) -> Self {
        Self {
            authenticated: true,
            session: Some(session),
        }
    }

    /// Get the principal name if a session is present.
    ///
    /// Always `None` when the `auth` feature is disabled.
    pub fn principal(&self) -> Option<&str> {
        #[cfg(feature = "auth")]
        {
            match &self.session {
                Some(session) => Some(session.principal_name.as_str()),
                None => None,
            }
        }
        #[cfg(not(feature = "auth"))]
        {
            None
        }
    }
}
331
332/// Shared authentication state for the server
333#[cfg(all(feature = "auth", not(feature = "cedar")))]
334pub struct ServerAuthState {
335    pub auth_manager: Arc<AuthManager>,
336    pub config: AuthConfig,
337}
338
339/// Shared authentication state for the server (with Cedar support)
340#[cfg(feature = "cedar")]
341pub struct ServerAuthState {
342    pub auth_manager: Arc<AuthManager>,
343    pub cedar_authorizer: Option<Arc<CedarAuthorizer>>,
344    pub config: AuthConfig,
345}
346
347#[cfg(not(feature = "auth"))]
348pub struct ServerAuthState {
349    pub config: AuthConfig,
350}
351
352#[cfg(feature = "cedar")]
353impl ServerAuthState {
354    /// Create server auth state with Cedar authorizer
355    pub fn with_cedar(
356        auth_manager: Arc<AuthManager>,
357        cedar_authorizer: Arc<CedarAuthorizer>,
358        config: AuthConfig,
359    ) -> Self {
360        Self {
361            auth_manager,
362            cedar_authorizer: Some(cedar_authorizer),
363            config,
364        }
365    }
366}
367
368/// Error response for auth failures
369#[derive(Debug, Serialize, Deserialize)]
370pub struct AuthErrorResponse {
371    pub error_code: i32,
372    pub message: String,
373}
374
/// Authentication middleware
///
/// Credentials are tried in this order:
/// 1. API key header (only when at least one API key is configured). If the
///    header is present, the request is accepted or rejected on that key alone.
/// 2. `Authorization: Bearer …` validated as a JWT (jwt feature, when configured)
/// 3. `Authorization: Bearer …` treated as a session ID
/// 4. `Authorization: Basic …` (username:password)
///
/// With no `Authorization` header the request proceeds anonymously unless
/// authentication is required; when it is required, anonymous access is still
/// granted to read requests if `allow_anonymous_read` is set.
///
/// On success the resulting [`AuthState`] is stored in the request extensions
/// and the request is forwarded; on failure a 401 response is returned.
#[cfg(feature = "auth")]
pub async fn auth_middleware(
    State(auth_state): State<Arc<ServerAuthState>>,
    mut request: Request<Body>,
    next: Next,
) -> Response {
    /// Builds the 40101 error used for every rejection produced here.
    fn rejected(message: &str) -> AuthErrorResponse {
        AuthErrorResponse {
            error_code: 40101,
            message: message.to_string(),
        }
    }

    let config = &auth_state.config;
    let manager: &AuthManager = &auth_state.auth_manager;

    // Step 1: API key header, consulted only when keys are configured.
    if !config.api_keys.is_empty() {
        let presented = request
            .headers()
            .get(&config.api_key_header)
            .and_then(|value| value.to_str().ok());

        if let Some(api_key) = presented {
            return match validate_api_key(api_key, config, manager) {
                Ok(auth) => {
                    debug!("API Key authentication successful");
                    request.extensions_mut().insert(auth);
                    next.run(request).await
                }
                Err(e) => {
                    warn!("API Key authentication failed: {}", e.message);
                    unauthorized_response(&config.realm, e)
                }
            };
        }
    }

    // Step 2: Authorization header.
    let authorization = request
        .headers()
        .get(AUTHORIZATION)
        .and_then(|value| value.to_str().ok());

    let outcome = match authorization {
        Some(header) if header.starts_with("Bearer ") => {
            // A configured JWT validator gets the first attempt; a failure is
            // only logged so the token can still be tried as a session ID.
            #[cfg(feature = "jwt")]
            if let Some(jwt_config) = &config.jwt_config {
                match validate_jwt_token(header, jwt_config, manager) {
                    Ok(auth) => {
                        request.extensions_mut().insert(auth);
                        return next.run(request).await;
                    }
                    Err(e) => {
                        debug!(
                            "JWT validation failed, trying session lookup: {}",
                            e.message
                        );
                    }
                }
            }

            if !config.enable_bearer_token {
                Err(rejected("Bearer token authentication is disabled"))
            } else {
                parse_bearer_token(header, manager).await
            }
        }
        Some(header) if header.starts_with("Basic ") => {
            if !config.enable_basic_auth {
                Err(rejected("Basic authentication is disabled"))
            } else {
                parse_basic_auth(header, manager).await
            }
        }
        Some(_) => Err(rejected(
            "Invalid Authorization header format. Supported: Basic, Bearer",
        )),
        None => {
            // Anonymous access: always when auth is optional, otherwise only
            // for read requests with anonymous reads enabled.
            let anonymous_allowed = !config.require_auth
                || (config.allow_anonymous_read && is_read_request(&request));
            if anonymous_allowed {
                Ok(AuthState::anonymous())
            } else {
                Err(rejected("Authentication required"))
            }
        }
    };

    let auth = match outcome {
        Ok(auth) => auth,
        Err(e) => return unauthorized_response(&config.realm, e),
    };
    request.extensions_mut().insert(auth);
    next.run(request).await
}
487
/// Builds the 401 response: a `WWW-Authenticate` header advertising Basic
/// (with `realm`) and Bearer, plus `error` serialized as the JSON body.
#[cfg(feature = "auth")]
fn unauthorized_response(realm: &str, error: AuthErrorResponse) -> Response {
    let challenge = format!("Basic realm=\"{}\", Bearer", realm);
    let headers = [("WWW-Authenticate", challenge.as_str())];
    (StatusCode::UNAUTHORIZED, headers, Json(error)).into_response()
}
501
502/// Authentication middleware when auth feature is disabled (passthrough)
503#[cfg(not(feature = "auth"))]
504pub async fn auth_middleware(
505    State(_auth_state): State<Arc<ServerAuthState>>,
506    mut request: Request<Body>,
507    next: Next,
508) -> Response {
509    // Without auth feature, always allow anonymous
510    request.extensions_mut().insert(AuthState::anonymous());
511    next.run(request).await
512}
513
/// Validates `api_key` against the configured keys.
///
/// Returns an authenticated state for the key's principal, reusing a session
/// the auth manager already has for that principal or otherwise creating an
/// API-key session carrying the entry's roles. An unknown key yields a 40101
/// error.
#[cfg(feature = "auth")]
fn validate_api_key(
    api_key: &str,
    config: &AuthConfig,
    auth_manager: &AuthManager,
) -> Result<AuthState, AuthErrorResponse> {
    let entry = match config.api_keys.get(api_key) {
        Some(entry) => entry,
        None => {
            return Err(AuthErrorResponse {
                error_code: 40101,
                message: "Invalid API key".to_string(),
            })
        }
    };
    debug!("API Key validated for principal: {}", entry.principal);

    // Prefer a session the manager already knows; otherwise synthesize one
    // from the roles attached to the API key entry.
    let session = match auth_manager.get_session_by_principal(&entry.principal) {
        Some(existing) => existing,
        None => auth_manager.create_api_key_session(&entry.principal, &entry.roles),
    };
    Ok(AuthState::authenticated(session))
}
540
/// Claims deserialized from a validated JWT.
///
/// `iss`, `aud` and `exp` are never read directly in this file (hence the
/// `dead_code` allowance); per the original author's note they are checked by
/// `jsonwebtoken` during token verification.
#[cfg(feature = "jwt")]
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct JwtClaims {
    /// Subject (user identifier)
    sub: Option<String>,
    /// Issuer
    iss: Option<String>,
    /// Audience; kept as raw JSON (presumably because it may be a string or
    /// an array — confirm against the issuing provider)
    aud: Option<serde_json::Value>,
    /// Expiration time
    exp: Option<u64>,
    /// Groups/roles claim
    groups: Option<Vec<String>>,
    /// Alternative roles claim
    roles: Option<Vec<String>>,
    /// Email (alternative principal)
    email: Option<String>,
    /// Preferred username (alternative principal)
    preferred_username: Option<String>,
}
566
/// Validate a JWT/OIDC bearer token and build an authenticated state from it.
///
/// * `header` - raw `Authorization` header value (`Bearer <token>`).
/// * `jwt_config` - key material, expected issuer/audience and claim names.
/// * `auth_manager` - used to create the session for the token's principal.
///
/// Key selection: `secret` (HS256/384/512) takes precedence; otherwise
/// `rsa_public_key` (RS256/384/512) is used. The accepted algorithm family is
/// always the one matching the selected key.
///
/// # Errors
///
/// * `50001` when neither a secret nor an RSA public key is configured.
/// * `40101` when the RSA key cannot be parsed, the token fails validation,
///   or the configured principal claim is absent.
#[cfg(feature = "jwt")]
fn validate_jwt_token(
    header: &str,
    jwt_config: &JwtConfig,
    auth_manager: &AuthManager,
) -> Result<AuthState, AuthErrorResponse> {
    // Strip the scheme exactly once; `trim_start_matches` would also accept
    // malformed values such as "Bearer Bearer <token>".
    let token = header.strip_prefix("Bearer ").unwrap_or(header);

    // Pick the decoding key and its algorithm family together so they can
    // never disagree (e.g. an HMAC key paired with RSA algorithms when both a
    // secret and an RSA key are configured).
    let (decoding_key, algorithms) = if let Some(secret) = &jwt_config.secret {
        (
            DecodingKey::from_secret(secret.as_bytes()),
            vec![Algorithm::HS256, Algorithm::HS384, Algorithm::HS512],
        )
    } else if let Some(rsa_key) = &jwt_config.rsa_public_key {
        let key = DecodingKey::from_rsa_pem(rsa_key.as_bytes()).map_err(|e| AuthErrorResponse {
            error_code: 40101,
            message: format!("Invalid RSA public key: {}", e),
        })?;
        (
            key,
            vec![Algorithm::RS256, Algorithm::RS384, Algorithm::RS512],
        )
    } else {
        return Err(AuthErrorResponse {
            error_code: 50001,
            message: "JWT validation not configured (no secret or public key)".to_string(),
        });
    };

    // Configure validation
    let mut validation = Validation::default();
    validation.algorithms = algorithms;

    // Issuer and audience are only enforced when configured.
    if let Some(issuer) = &jwt_config.issuer {
        validation.set_issuer(&[issuer]);
    }
    if let Some(audience) = &jwt_config.audience {
        validation.set_audience(&[audience]);
    }

    // Decode and validate token
    let claims = decode::<JwtClaims>(token, &decoding_key, &validation)
        .map_err(|e| AuthErrorResponse {
            error_code: 40101,
            message: format!("JWT validation failed: {}", e),
        })?
        .claims;

    // Extract principal from the configured claim; unrecognized claim names
    // fall back to `sub`.
    let principal = match jwt_config.principal_claim.as_str() {
        "email" => claims.email,
        "preferred_username" => claims.preferred_username,
        _ => claims.sub,
    }
    .ok_or_else(|| AuthErrorResponse {
        error_code: 40101,
        message: format!("JWT missing required claim: {}", jwt_config.principal_claim),
    })?;

    // Extract roles from the configured claim; unrecognized claim names try
    // `groups` first, then `roles`.
    let roles = match jwt_config.roles_claim.as_str() {
        "groups" => claims.groups.unwrap_or_default(),
        "roles" => claims.roles.unwrap_or_default(),
        _ => claims.groups.or(claims.roles).unwrap_or_default(),
    };

    debug!(
        "JWT validated for principal: {} with roles: {:?}",
        principal, roles
    );

    // Create session for this JWT user
    let session = auth_manager.create_jwt_session(&principal, &roles);
    Ok(AuthState::authenticated(session))
}
648
/// Parse a `Basic` Authorization header and authenticate the credentials.
///
/// `header` is the raw header value (`Basic <base64(username:password)>`).
/// The decoded credentials are split at the first `:`, so the password may
/// itself contain colons.
///
/// # Errors
///
/// Returns a `40101` error when the payload is not valid base64, is not
/// UTF-8, lacks a `:` separator, or the auth manager rejects the credentials.
#[cfg(feature = "auth")]
async fn parse_basic_auth(
    header: &str,
    auth_manager: &AuthManager,
) -> Result<AuthState, AuthErrorResponse> {
    // Strip the scheme exactly once; `trim_start_matches` would also accept
    // malformed values such as "Basic Basic <payload>".
    let encoded = header.strip_prefix("Basic ").unwrap_or(header);
    let decoded = base64::engine::general_purpose::STANDARD
        .decode(encoded)
        .map_err(|_| AuthErrorResponse {
            error_code: 40101,
            message: "Invalid Basic auth encoding".to_string(),
        })?;

    let credentials = String::from_utf8(decoded).map_err(|_| AuthErrorResponse {
        error_code: 40101,
        message: "Invalid credentials encoding".to_string(),
    })?;

    // `ok_or_else` keeps the error (and its allocation) off the success path.
    let (username, password) = credentials
        .split_once(':')
        .ok_or_else(|| AuthErrorResponse {
            error_code: 40101,
            message: "Invalid Basic auth format".to_string(),
        })?;

    debug!("Authenticating user: {}", username);

    // Note: We pass "http" as client_ip since we don't have access to it here
    // In production, extract from X-Forwarded-For or connection info
    match auth_manager.authenticate(username, password, "http") {
        Ok(session) => {
            debug!("Authentication successful for: {}", username);
            Ok(AuthState::authenticated(session))
        }
        Err(e) => {
            warn!("Authentication failed for {}: {:?}", username, e);
            // The caller only ever sees a generic message; details stay in the log.
            Err(AuthErrorResponse {
                error_code: 40101,
                message: "Invalid credentials".to_string(),
            })
        }
    }
}
691
/// Parse a `Bearer` Authorization header, treating the token as a session ID.
///
/// `header` is the raw header value (`Bearer <session-id>`).
///
/// # Errors
///
/// Returns a `40101` error when the auth manager has no session for the token.
#[cfg(feature = "auth")]
async fn parse_bearer_token(
    header: &str,
    auth_manager: &AuthManager,
) -> Result<AuthState, AuthErrorResponse> {
    // Strip the scheme exactly once; `trim_start_matches` would also accept
    // malformed values such as "Bearer Bearer <session-id>".
    let token = header.strip_prefix("Bearer ").unwrap_or(header);

    // Bearer token is treated as session ID
    match auth_manager.get_session(token) {
        Some(session) => {
            debug!(
                "Token validation successful for: {}",
                session.principal_name
            );
            Ok(AuthState::authenticated(session))
        }
        None => {
            warn!("Token validation failed: invalid or expired session");
            Err(AuthErrorResponse {
                error_code: 40101,
                message: "Invalid or expired token".to_string(),
            })
        }
    }
}
719
/// Returns `true` when the request method is `GET`, `HEAD` or `OPTIONS`.
#[cfg(feature = "auth")]
fn is_read_request(request: &Request<Body>) -> bool {
    let method = request.method().as_str();
    ["GET", "HEAD", "OPTIONS"].contains(&method)
}
725
/// Permission required for schema operations
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaPermission {
    /// Read schemas, list subjects, check compatibility
    Describe,
    /// Register new schemas
    Create,
    /// Update configurations
    Alter,
    /// Delete schemas or subjects
    Delete,
}

impl SchemaPermission {
    /// Maps this permission onto the rivven-core `Permission` of the same name.
    #[cfg(feature = "auth")]
    pub fn to_core_permission(self) -> Permission {
        match self {
            Self::Describe => Permission::Describe,
            Self::Create => Permission::Create,
            Self::Alter => Permission::Alter,
            Self::Delete => Permission::Delete,
        }
    }

    /// Maps this permission onto the Cedar `RivvenAction` of the same name.
    #[cfg(feature = "cedar")]
    pub fn to_cedar_action(self) -> RivvenAction {
        match self {
            Self::Describe => RivvenAction::Describe,
            Self::Create => RivvenAction::Create,
            Self::Alter => RivvenAction::Alter,
            Self::Delete => RivvenAction::Delete,
        }
    }
}

/// Legacy alias of [`SchemaPermission`], kept for backward compatibility.
pub type SchemaAction = SchemaPermission;
765
/// Check whether the caller may perform `permission` on `subject` (Simple RBAC).
///
/// Unauthenticated callers are allowed `Describe` only; anything else is
/// rejected with `40301`. Authenticated callers are checked through their
/// session's `has_permission`; a denial yields `40301`, and an authenticated
/// state without a session yields `40101`.
#[cfg(all(feature = "auth", not(feature = "cedar")))]
pub fn check_subject_permission(
    auth_state: &AuthState,
    subject: &str,
    permission: SchemaPermission,
) -> Result<(), AuthErrorResponse> {
    // Anonymous callers: read-only access.
    if !auth_state.authenticated {
        return if permission == SchemaPermission::Describe {
            Ok(())
        } else {
            Err(AuthErrorResponse {
                error_code: 40301,
                message: "Authentication required for write operations".to_string(),
            })
        };
    }

    let session = match auth_state.session.as_ref() {
        Some(session) => session,
        None => {
            return Err(AuthErrorResponse {
                error_code: 40101,
                message: "No valid session".to_string(),
            })
        }
    };

    // The permission is evaluated on the session itself, not via AuthManager.
    let resource = ResourceType::Schema(subject.to_string());
    let required = permission.to_core_permission();
    if session.has_permission(&resource, &required) {
        return Ok(());
    }

    Err(AuthErrorResponse {
        error_code: 40301,
        message: format!(
            "Access denied: {} lacks {:?} permission on subject '{}'",
            session.principal_name, permission, subject
        ),
    })
}
807
/// Check whether the caller may perform `permission` on `subject` (Cedar policies).
///
/// Unauthenticated callers are allowed `Describe` only; anything else is
/// rejected with `40301`. Authenticated callers are authorized through
/// `authorizer` using their principal name and `context` (the default context
/// when `None`). A missing principal yields `40101`, a policy denial `40301`.
#[cfg(feature = "cedar")]
pub fn check_subject_permission_cedar(
    auth_state: &AuthState,
    subject: &str,
    permission: SchemaPermission,
    authorizer: &CedarAuthorizer,
    context: Option<AuthzContext>,
) -> Result<(), AuthErrorResponse> {
    // Anonymous callers: read-only access.
    if !auth_state.authenticated {
        return if permission == SchemaPermission::Describe {
            Ok(())
        } else {
            Err(AuthErrorResponse {
                error_code: 40301,
                message: "Authentication required for write operations".to_string(),
            })
        };
    }

    let principal = match auth_state.principal() {
        Some(name) => name,
        None => {
            return Err(AuthErrorResponse {
                error_code: 40101,
                message: "No valid session".to_string(),
            })
        }
    };

    let ctx = context.unwrap_or_default();
    let resource = RivvenResource::Schema(subject.to_string());
    let action = permission.to_cedar_action();

    authorizer
        .authorize(principal, action, &resource, &ctx)
        .map_err(|e| {
            warn!("Cedar authorization denied: {:?}", e);
            AuthErrorResponse {
                error_code: 40301,
                message: format!(
                    "Access denied: {} cannot {:?} on subject '{}'",
                    principal, permission, subject
                ),
            }
        })
}
851
852/// Check permission without auth feature (always succeeds)
853#[cfg(not(feature = "auth"))]
854pub fn check_subject_permission(
855    _auth_state: &AuthState,
856    _subject: &str,
857    _permission: SchemaPermission,
858) -> Result<(), AuthErrorResponse> {
859    Ok(())
860}
861
/// Unit tests for configuration defaults, anonymous state and permission mapping.
#[cfg(all(test, feature = "auth"))]
mod tests {
    use super::*;

    /// The default config leaves auth optional and allows anonymous reads.
    #[test]
    fn test_auth_config_default() {
        let AuthConfig {
            require_auth,
            allow_anonymous_read,
            ..
        } = AuthConfig::default();
        assert!(!require_auth);
        assert!(allow_anonymous_read);
    }

    /// `required()` demands auth and disallows anonymous reads.
    #[test]
    fn test_auth_config_required() {
        let AuthConfig {
            require_auth,
            allow_anonymous_read,
            ..
        } = AuthConfig::required();
        assert!(require_auth);
        assert!(!allow_anonymous_read);
    }

    /// The anonymous state is unauthenticated and has no principal.
    #[test]
    fn test_auth_state_anonymous() {
        let anonymous = AuthState::anonymous();
        assert!(!anonymous.authenticated);
        assert!(anonymous.principal().is_none());
    }

    /// Each schema permission maps to the core permission of the same name.
    #[test]
    fn test_schema_permission_to_core() {
        let describe = SchemaPermission::Describe.to_core_permission();
        assert!(matches!(describe, Permission::Describe));

        let create = SchemaPermission::Create.to_core_permission();
        assert!(matches!(create, Permission::Create));

        let alter = SchemaPermission::Alter.to_core_permission();
        assert!(matches!(alter, Permission::Alter));

        let delete = SchemaPermission::Delete.to_core_permission();
        assert!(matches!(delete, Permission::Delete));
    }
}
907
/// Unit tests for the Cedar-specific configuration, mapping and authorizer usage.
#[cfg(all(test, feature = "cedar"))]
mod cedar_tests {
    use super::*;

    /// `with_cedar()` turns on Cedar without touching `require_auth`.
    #[test]
    fn test_auth_config_with_cedar() {
        let cfg = AuthConfig::required().with_cedar();
        assert!(cfg.require_auth);
        assert!(cfg.use_cedar);
    }

    /// Each schema permission maps to the Cedar action of the same name.
    #[test]
    fn test_schema_permission_to_cedar_action() {
        let describe = SchemaPermission::Describe.to_cedar_action();
        assert_eq!(describe, RivvenAction::Describe);

        let create = SchemaPermission::Create.to_cedar_action();
        assert_eq!(create, RivvenAction::Create);

        let alter = SchemaPermission::Alter.to_cedar_action();
        assert_eq!(alter, RivvenAction::Alter);

        let delete = SchemaPermission::Delete.to_cedar_action();
        assert_eq!(delete, RivvenAction::Delete);
    }

    /// A member of `schema-admins` may create a schema under the admin policy.
    #[test]
    fn test_cedar_authorization() {
        // Schema validation is skipped to keep the test self-contained.
        let authorizer = CedarAuthorizer::new_without_schema();

        // Policy: schema admins may do anything to schemas.
        let admin_policy = r#"
permit(
  principal in Rivven::Group::"schema-admins",
  action,
  resource is Rivven::Schema
);
"#;
        authorizer.add_policy("schema-admin", admin_policy).unwrap();

        // Entities: the group, a user inside it, and the schema itself.
        authorizer.add_group("schema-admins", &[]).unwrap();
        authorizer
            .add_user(
                "alice",
                Some("alice@example.com"),
                &["admin"],
                &["schema-admins"],
                false,
            )
            .unwrap();
        authorizer.add_schema("user-events-value", 1).unwrap();

        // Authenticated state without a session; it is not passed to the
        // authorizer below, which receives the principal name directly.
        let _auth_state = AuthState {
            session: None,
            authenticated: true,
        };

        // In real code, auth_state.principal() would supply the user name.
        let ctx = AuthzContext::new().with_ip("127.0.0.1");
        let resource = RivvenResource::Schema("user-events-value".to_string());
        let result = authorizer.authorize("alice", RivvenAction::Create, &resource, &ctx);

        assert!(result.is_ok());
    }

    /// A user outside `admins` is denied `alter` when only admins are permitted.
    #[test]
    fn test_cedar_authorization_denied() {
        let authorizer = CedarAuthorizer::new_without_schema();

        // Policy: only admins can alter schemas.
        let alter_policy = r#"
permit(
  principal in Rivven::Group::"admins",
  action == Rivven::Action::"alter",
  resource is Rivven::Schema
);
"#;
        authorizer
            .add_policy("only-admins-alter", alter_policy)
            .unwrap();

        // A user who belongs to no group.
        authorizer
            .add_user("bob", Some("bob@example.com"), &["user"], &[], false)
            .unwrap();

        authorizer.add_schema("config-value", 1).unwrap();

        let ctx = AuthzContext::new();
        let resource = RivvenResource::Schema("config-value".to_string());
        let result = authorizer.authorize("bob", RivvenAction::Alter, &resource, &ctx);

        assert!(result.is_err());
    }
}