Skip to main content

rivven_schema/
server.rs

1//! HTTP Server for Schema Registry
2//!
3//! Provides a Confluent-compatible REST API with optional authentication.
4//!
5//! ## Authentication
6//!
7//! When the `auth` feature is enabled, the server supports:
8//! - HTTP Basic Authentication
9//! - Bearer Token authentication (OIDC/service accounts)
10//!
11//! See [`crate::auth`] for configuration options.
12
13use crate::compatibility::CompatibilityLevel;
14use crate::error::SchemaError;
15use crate::registry::SchemaRegistry;
16use crate::types::{
17    SchemaContext, SchemaId, SchemaReference, SchemaType, SchemaVersion, Subject, ValidationLevel,
18    ValidationRule, ValidationRuleType, VersionState,
19};
20use axum::{
21    extract::{DefaultBodyLimit, Extension, Path, Query, State},
22    http::StatusCode,
23    routing::{delete, get, post, put},
24    Json, Router,
25};
26use parking_lot::RwLock;
27use serde::{Deserialize, Serialize};
28use std::net::SocketAddr;
29use std::sync::Arc;
30use tower_http::cors::CorsLayer;
31use tower_http::trace::TraceLayer;
32use tracing::info;
33
34use crate::auth::AuthState;
35use crate::auth::{auth_middleware, AuthConfig, SchemaPermission, ServerAuthState};
36use axum::middleware;
37use rivven_core::AuthManager;
38
39/// Server configuration
40#[derive(Debug, Clone)]
41pub struct ServerConfig {
42    pub host: String,
43    pub port: u16,
44    /// Authentication configuration
45    pub auth: Option<AuthConfig>,
46}
47
48impl Default for ServerConfig {
49    fn default() -> Self {
50        Self {
51            host: "0.0.0.0".to_string(),
52            port: 8081,
53            auth: None,
54        }
55    }
56}
57
58impl ServerConfig {
59    /// Enable authentication with the given config
60    pub fn with_auth(mut self, auth_config: AuthConfig) -> Self {
61        self.auth = Some(auth_config);
62        self
63    }
64}
65
/// Operating mode of the registry.
///
/// The `Display` impl renders each variant as an upper-case word
/// (`READWRITE`, `READONLY`, `IMPORT`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RegistryMode {
    /// The default mode.
    #[default]
    ReadWrite,
    /// Rendered as `READONLY`.
    ReadOnly,
    /// Rendered as `IMPORT`.
    Import,
}

impl std::fmt::Display for RegistryMode {
    /// Writes the upper-case name of the mode; formatter width/padding flags
    /// are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::ReadWrite => "READWRITE",
            Self::ReadOnly => "READONLY",
            Self::Import => "IMPORT",
        };
        f.write_str(label)
    }
}
84
85/// Shared server state
86pub struct ServerState {
87    pub registry: SchemaRegistry,
88    pub mode: RwLock<RegistryMode>,
89    #[cfg(feature = "cedar")]
90    pub cedar_authorizer: Option<Arc<rivven_core::CedarAuthorizer>>,
91}
92
93/// Schema Registry HTTP Server
94pub struct SchemaServer {
95    state: Arc<ServerState>,
96    config: ServerConfig,
97    #[cfg(not(feature = "cedar"))]
98    auth_manager: Option<Arc<AuthManager>>,
99    #[cfg(feature = "cedar")]
100    auth_manager: Option<Arc<AuthManager>>,
101    #[cfg(feature = "cedar")]
102    cedar_authorizer: Option<Arc<rivven_core::CedarAuthorizer>>,
103}
104
105impl SchemaServer {
106    /// Create a new schema server
107    pub fn new(registry: SchemaRegistry, config: ServerConfig) -> Self {
108        Self {
109            state: Arc::new(ServerState {
110                registry,
111                mode: RwLock::new(RegistryMode::default()),
112                #[cfg(feature = "cedar")]
113                cedar_authorizer: None,
114            }),
115            config,
116            auth_manager: None,
117            #[cfg(feature = "cedar")]
118            cedar_authorizer: None,
119        }
120    }
121
122    /// Create a new schema server with authentication
123    #[cfg(not(feature = "cedar"))]
124    pub fn with_auth(
125        registry: SchemaRegistry,
126        config: ServerConfig,
127        auth_manager: Arc<AuthManager>,
128    ) -> Self {
129        Self {
130            state: Arc::new(ServerState {
131                registry,
132                mode: RwLock::new(RegistryMode::default()),
133            }),
134            config,
135            auth_manager: Some(auth_manager),
136        }
137    }
138
139    /// Create a new schema server with authentication (simple RBAC)
140    #[cfg(feature = "cedar")]
141    pub fn with_auth(
142        registry: SchemaRegistry,
143        config: ServerConfig,
144        auth_manager: Arc<AuthManager>,
145    ) -> Self {
146        Self {
147            state: Arc::new(ServerState {
148                registry,
149                mode: RwLock::new(RegistryMode::default()),
150                cedar_authorizer: None,
151            }),
152            config,
153            auth_manager: Some(auth_manager),
154            cedar_authorizer: None,
155        }
156    }
157
158    /// Create a new schema server with Cedar policy-based authorization
159    #[cfg(feature = "cedar")]
160    pub fn with_cedar(
161        registry: SchemaRegistry,
162        config: ServerConfig,
163        auth_manager: Arc<AuthManager>,
164        cedar_authorizer: Arc<rivven_core::CedarAuthorizer>,
165    ) -> Self {
166        Self {
167            state: Arc::new(ServerState {
168                registry,
169                mode: RwLock::new(RegistryMode::default()),
170                cedar_authorizer: Some(cedar_authorizer.clone()),
171            }),
172            config,
173            auth_manager: Some(auth_manager),
174            cedar_authorizer: Some(cedar_authorizer),
175        }
176    }
177
178    /// Build the Axum router
179    pub fn router(&self) -> Router {
180        // Restrict CORS to disallow arbitrary cross-origin requests.
181        // `allow_origin(Any)` + `allow_methods(Any)` + `allow_headers(Any)` allows
182        // any website to make authenticated requests to the schema registry.
183        // Instead, allow only safe methods and standard headers. Operators can
184        // configure specific allowed origins via reverse proxy if needed.
185        let cors = CorsLayer::new()
186            .allow_methods([
187                axum::http::Method::GET,
188                axum::http::Method::POST,
189                axum::http::Method::PUT,
190                axum::http::Method::DELETE,
191            ])
192            .allow_headers([
193                axum::http::header::CONTENT_TYPE,
194                axum::http::header::AUTHORIZATION,
195                axum::http::header::ACCEPT,
196            ]);
197
198        let base_router = Router::new()
199            // Root / health
200            .route("/", get(root_handler))
201            .route("/health", get(health_handler))
202            .route("/health/live", get(liveness_handler))
203            .route("/health/ready", get(readiness_handler))
204            // Schemas
205            .route("/schemas/ids/:id", get(get_schema_by_id))
206            .route("/schemas/ids/:id/versions", get(get_schema_versions))
207            // Subjects
208            .route("/subjects", get(list_subjects))
209            .route("/subjects/:subject", delete(delete_subject))
210            .route("/subjects/:subject/versions", get(list_subject_versions))
211            .route("/subjects/:subject/versions", post(register_schema))
212            .route(
213                "/subjects/:subject/versions/:version",
214                get(get_subject_version),
215            )
216            .route(
217                "/subjects/:subject/versions/:version",
218                delete(delete_version),
219            )
220            // Subject recovery (undelete)
221            .route("/subjects/deleted", get(list_deleted_subjects))
222            .route("/subjects/:subject/undelete", post(undelete_subject))
223            // Version state management
224            .route(
225                "/subjects/:subject/versions/:version/state",
226                get(get_version_state),
227            )
228            .route(
229                "/subjects/:subject/versions/:version/state",
230                put(set_version_state),
231            )
232            .route(
233                "/subjects/:subject/versions/:version/deprecate",
234                post(deprecate_version),
235            )
236            .route(
237                "/subjects/:subject/versions/:version/disable",
238                post(disable_version),
239            )
240            .route(
241                "/subjects/:subject/versions/:version/enable",
242                post(enable_version),
243            )
244            // Schema references
245            .route(
246                "/subjects/:subject/versions/:version/referencedby",
247                get(get_referenced_by),
248            )
249            // Schema validation (without registering)
250            .route("/subjects/:subject/validate", post(validate_schema))
251            // Compatibility
252            .route(
253                "/compatibility/subjects/:subject/versions/:version",
254                post(check_compatibility),
255            )
256            // Config
257            .route("/config", get(get_global_config))
258            .route("/config", put(update_global_config))
259            .route("/config/:subject", get(get_subject_config))
260            .route("/config/:subject", put(update_subject_config))
261            // Validation rules
262            .route("/config/validation/rules", get(list_validation_rules))
263            .route("/config/validation/rules", post(add_validation_rule))
264            .route(
265                "/config/validation/rules/:name",
266                delete(delete_validation_rule),
267            )
268            // Mode
269            .route("/mode", get(get_mode))
270            .route("/mode", put(update_mode))
271            // Contexts (multi-tenant)
272            .route("/contexts", get(list_contexts))
273            .route("/contexts", post(create_context))
274            .route("/contexts/:context", get(get_context))
275            .route("/contexts/:context", delete(delete_context))
276            .route("/contexts/:context/subjects", get(list_context_subjects))
277            // Statistics
278            .route("/stats", get(get_stats))
279            .with_state(self.state.clone())
280            .layer(DefaultBodyLimit::max(10 * 1024 * 1024)) // 10 MiB
281            .layer(cors)
282            .layer(TraceLayer::new_for_http());
283
284        // Add authentication middleware if configured (simple RBAC)
285        #[cfg(not(feature = "cedar"))]
286        let base_router = if let Some(auth_manager) = &self.auth_manager {
287            let auth_config = self.config.auth.clone().unwrap_or_default();
288            let auth_state = Arc::new(ServerAuthState {
289                auth_manager: auth_manager.clone(),
290                config: auth_config,
291            });
292            base_router.layer(middleware::from_fn_with_state(auth_state, auth_middleware))
293        } else {
294            base_router
295        };
296
297        // Add authentication middleware if configured (with Cedar support)
298        #[cfg(feature = "cedar")]
299        let base_router = if let Some(auth_manager) = &self.auth_manager {
300            let auth_config = self.config.auth.clone().unwrap_or_default();
301            let auth_state = Arc::new(ServerAuthState {
302                auth_manager: auth_manager.clone(),
303                cedar_authorizer: self.cedar_authorizer.clone(),
304                config: auth_config,
305            });
306            base_router.layer(middleware::from_fn_with_state(auth_state, auth_middleware))
307        } else {
308            base_router
309        };
310
311        base_router
312    }
313
314    /// Run the server
315    pub async fn run(self, addr: SocketAddr) -> anyhow::Result<()> {
316        let router = self.router();
317        let listener = tokio::net::TcpListener::bind(addr).await?;
318        info!("Schema Registry server listening on {}", addr);
319        axum::serve(listener, router).await?;
320        Ok(())
321    }
322}
323
324// ============================================================================
325// Helper for converting SchemaError to HTTP response
326// ============================================================================
327
328fn schema_error_response(e: SchemaError) -> (StatusCode, Json<ErrorResponse>) {
329    let status = match e.http_status() {
330        404 => StatusCode::NOT_FOUND,
331        409 => StatusCode::CONFLICT,
332        422 => StatusCode::UNPROCESSABLE_ENTITY,
333        _ => StatusCode::INTERNAL_SERVER_ERROR,
334    };
335    (
336        status,
337        Json(ErrorResponse {
338            error_code: e.error_code(),
339            message: e.to_string(),
340        }),
341    )
342}
343
344/// Enforce subject-level permission.
345///
346/// When Cedar is enabled and a `CedarAuthorizer` is configured in `ServerState`,
347/// policy evaluation is delegated to Cedar. Otherwise, simple session-based RBAC
348/// is used. If no auth middleware is active (auth extension absent), the request
349/// is allowed — the server was started without authentication.
350fn enforce_permission(
351    auth: &Option<Extension<AuthState>>,
352    subject: &str,
353    permission: SchemaPermission,
354    server_state: &ServerState,
355) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
356    let _ = server_state; // used only when cedar feature is enabled
357
358    if let Some(Extension(ref state)) = auth {
359        let map_err = |e: crate::auth::AuthErrorResponse| {
360            (
361                StatusCode::FORBIDDEN,
362                Json(ErrorResponse {
363                    error_code: e.error_code as u32,
364                    message: e.message,
365                }),
366            )
367        };
368
369        #[cfg(not(feature = "cedar"))]
370        crate::auth::check_subject_permission(state, subject, permission).map_err(map_err)?;
371
372        #[cfg(feature = "cedar")]
373        {
374            if let Some(ref authorizer) = server_state.cedar_authorizer {
375                crate::auth::check_subject_permission_cedar(
376                    state, subject, permission, authorizer, None,
377                )
378                .map_err(map_err)?;
379            } else {
380                // Cedar feature enabled but no authorizer configured — fall back to simple RBAC
381                crate::auth::check_subject_permission(state, subject, permission)
382                    .map_err(map_err)?;
383            }
384        }
385    }
386    // No auth middleware configured (auth extension absent) → pass through.
387    // The server was started without authentication.
388    Ok(())
389}
390
391// ============================================================================
392// Request/Response Types
393// ============================================================================
394
395#[derive(Serialize)]
396struct RootResponse {
397    version: &'static str,
398    commit: &'static str,
399}
400
401#[derive(Deserialize)]
402struct RegisterSchemaRequest {
403    schema: String,
404    #[serde(rename = "schemaType", default)]
405    schema_type: Option<String>,
406    /// Schema references for complex types (Avro named types, Protobuf imports, JSON Schema $refs)
407    #[serde(default)]
408    references: Vec<SchemaReference>,
409}
410
411#[derive(Serialize)]
412struct RegisterSchemaResponse {
413    id: u32,
414}
415
416#[derive(Serialize)]
417struct SchemaResponse {
418    schema: String,
419    #[serde(rename = "schemaType")]
420    schema_type: String,
421    #[serde(default, skip_serializing_if = "Vec::is_empty")]
422    references: Vec<SchemaReference>,
423}
424
425#[derive(Serialize)]
426struct SubjectVersionResponse {
427    subject: String,
428    version: u32,
429    id: u32,
430    schema: String,
431    #[serde(rename = "schemaType")]
432    schema_type: String,
433}
434
435#[derive(Deserialize)]
436struct CompatibilityRequest {
437    schema: String,
438    #[serde(rename = "schemaType", default)]
439    schema_type: Option<String>,
440}
441
442#[derive(Serialize)]
443struct CompatibilityResponse {
444    is_compatible: bool,
445    #[serde(skip_serializing_if = "Vec::is_empty")]
446    messages: Vec<String>,
447}
448
449#[derive(Serialize)]
450struct ConfigResponse {
451    #[serde(rename = "compatibilityLevel")]
452    compatibility_level: String,
453}
454
455#[derive(Deserialize)]
456struct ConfigRequest {
457    compatibility: String,
458}
459
460#[derive(Serialize)]
461struct ModeResponse {
462    mode: String,
463}
464
465#[derive(Deserialize)]
466struct ModeRequest {
467    mode: String,
468}
469
470#[derive(Serialize)]
471struct ErrorResponse {
472    error_code: u32,
473    message: String,
474}
475
476#[derive(Deserialize)]
477struct QueryParams {
478    /// When `true`, perform a hard (permanent) delete that cannot be undone.
479    /// When `false` (default), perform a soft delete; the subject/version can
480    /// be recovered via the `/subjects/{subject}/undelete` endpoint.
481    ///
482    /// **Note:** There is currently no CLI sub-command for delete operations;
483    /// deletions are only available through the HTTP API. A future CLI
484    /// `delete --permanent` flag is planned.
485    #[serde(default)]
486    permanent: bool,
487}
488
489// ============================================================================
490// Handlers
491// ============================================================================
492
493async fn root_handler() -> Json<RootResponse> {
494    Json(RootResponse {
495        version: env!("CARGO_PKG_VERSION"),
496        commit: option_env!("GIT_HASH").unwrap_or("dev"),
497    })
498}
499
500/// Health check endpoint - returns basic health status
501#[derive(Serialize)]
502struct HealthResponse {
503    status: &'static str,
504    version: &'static str,
505}
506
507async fn health_handler() -> Json<HealthResponse> {
508    Json(HealthResponse {
509        status: "healthy",
510        version: env!("CARGO_PKG_VERSION"),
511    })
512}
513
514/// Kubernetes liveness probe - is the service alive?
515async fn liveness_handler() -> StatusCode {
516    StatusCode::OK
517}
518
519/// Kubernetes readiness probe - can the service handle requests?
520async fn readiness_handler(
521    State(state): State<Arc<ServerState>>,
522) -> Result<StatusCode, StatusCode> {
523    // Check if we can access the registry (basic health check)
524    match state.registry.list_subjects().await {
525        Ok(_) => Ok(StatusCode::OK),
526        Err(_) => Err(StatusCode::SERVICE_UNAVAILABLE),
527    }
528}
529
530async fn get_schema_by_id(
531    State(state): State<Arc<ServerState>>,
532    Path(id): Path<u32>,
533) -> Result<Json<SchemaResponse>, (StatusCode, Json<ErrorResponse>)> {
534    state
535        .registry
536        .get_by_id(SchemaId::new(id))
537        .await
538        .map(|schema| {
539            Json(SchemaResponse {
540                schema: schema.schema,
541                schema_type: schema.schema_type.to_string(),
542                references: schema.references,
543            })
544        })
545        .map_err(schema_error_response)
546}
547
548async fn get_schema_versions(
549    State(state): State<Arc<ServerState>>,
550    Path(id): Path<u32>,
551) -> Result<Json<Vec<SubjectVersionResponse>>, (StatusCode, Json<ErrorResponse>)> {
552    // Find all subjects that use this schema ID
553    let subjects = state
554        .registry
555        .list_subjects()
556        .await
557        .map_err(schema_error_response)?;
558    let mut results = Vec::new();
559
560    for subject in subjects {
561        let versions = state
562            .registry
563            .list_versions(subject.as_str())
564            .await
565            .map_err(schema_error_response)?;
566        for ver in versions {
567            if let Ok(sv) = state
568                .registry
569                .get_by_version(subject.as_str(), SchemaVersion::new(ver))
570                .await
571            {
572                if sv.id.0 == id {
573                    results.push(SubjectVersionResponse {
574                        subject: sv.subject.0,
575                        version: sv.version.0,
576                        id: sv.id.0,
577                        schema: sv.schema,
578                        schema_type: sv.schema_type.to_string(),
579                    });
580                }
581            }
582        }
583    }
584
585    Ok(Json(results))
586}
587
588async fn list_subjects(
589    State(state): State<Arc<ServerState>>,
590) -> Result<Json<Vec<String>>, (StatusCode, Json<ErrorResponse>)> {
591    state
592        .registry
593        .list_subjects()
594        .await
595        .map(|subjects| Json(subjects.into_iter().map(|s| s.0).collect()))
596        .map_err(schema_error_response)
597}
598
599async fn delete_subject(
600    State(state): State<Arc<ServerState>>,
601    Path(subject): Path<String>,
602    Query(params): Query<QueryParams>,
603    auth: Option<Extension<AuthState>>,
604) -> Result<Json<Vec<u32>>, (StatusCode, Json<ErrorResponse>)> {
605    enforce_permission(&auth, &subject, SchemaPermission::Delete, &state)?;
606
607    // Check mode
608    if *state.mode.read() == RegistryMode::ReadOnly {
609        return Err((
610            StatusCode::FORBIDDEN,
611            Json(ErrorResponse {
612                error_code: 40301,
613                message: "Registry is in READONLY mode".to_string(),
614            }),
615        ));
616    }
617
618    state
619        .registry
620        .delete_subject(subject.as_str(), params.permanent)
621        .await
622        .map(Json)
623        .map_err(schema_error_response)
624}
625
626/// List soft-deleted subjects that can be recovered
627async fn list_deleted_subjects(
628    State(state): State<Arc<ServerState>>,
629) -> Result<Json<Vec<String>>, (StatusCode, Json<ErrorResponse>)> {
630    state
631        .registry
632        .list_deleted_subjects()
633        .await
634        .map(|subjects| Json(subjects.into_iter().map(|s| s.0).collect()))
635        .map_err(schema_error_response)
636}
637
638/// Undelete a soft-deleted subject
639async fn undelete_subject(
640    State(state): State<Arc<ServerState>>,
641    Path(subject): Path<String>,
642    auth: Option<Extension<AuthState>>,
643) -> Result<Json<Vec<u32>>, (StatusCode, Json<ErrorResponse>)> {
644    enforce_permission(&auth, &subject, SchemaPermission::Create, &state)?;
645
646    // Check mode
647    if *state.mode.read() == RegistryMode::ReadOnly {
648        return Err((
649            StatusCode::FORBIDDEN,
650            Json(ErrorResponse {
651                error_code: 40301,
652                message: "Registry is in READONLY mode".to_string(),
653            }),
654        ));
655    }
656
657    state
658        .registry
659        .undelete_subject(subject.as_str())
660        .await
661        .map(Json)
662        .map_err(schema_error_response)
663}
664
665async fn list_subject_versions(
666    State(state): State<Arc<ServerState>>,
667    Path(subject): Path<String>,
668) -> Result<Json<Vec<u32>>, (StatusCode, Json<ErrorResponse>)> {
669    state
670        .registry
671        .list_versions(subject.as_str())
672        .await
673        .map(Json)
674        .map_err(schema_error_response)
675}
676
677async fn register_schema(
678    State(state): State<Arc<ServerState>>,
679    Path(subject): Path<String>,
680    auth: Option<Extension<AuthState>>,
681    Json(req): Json<RegisterSchemaRequest>,
682) -> Result<Json<RegisterSchemaResponse>, (StatusCode, Json<ErrorResponse>)> {
683    enforce_permission(&auth, &subject, SchemaPermission::Create, &state)?;
684
685    // Check mode
686    if *state.mode.read() == RegistryMode::ReadOnly {
687        return Err((
688            StatusCode::FORBIDDEN,
689            Json(ErrorResponse {
690                error_code: 40301,
691                message: "Registry is in READONLY mode".to_string(),
692            }),
693        ));
694    }
695
696    let schema_type = req
697        .schema_type
698        .as_deref()
699        .unwrap_or("AVRO")
700        .parse::<SchemaType>()
701        .map_err(|_| {
702            schema_error_response(crate::SchemaError::InvalidInput(format!(
703                "Invalid schema type: {}",
704                req.schema_type.as_deref().unwrap_or("AVRO")
705            )))
706        })?;
707
708    state
709        .registry
710        .register_with_references(subject.as_str(), schema_type, &req.schema, req.references)
711        .await
712        .map(|id| Json(RegisterSchemaResponse { id: id.0 }))
713        .map_err(schema_error_response)
714}
715
716async fn get_subject_version(
717    State(state): State<Arc<ServerState>>,
718    Path((subject, version)): Path<(String, String)>,
719) -> Result<Json<SubjectVersionResponse>, (StatusCode, Json<ErrorResponse>)> {
720    let version = if version == "latest" {
721        state
722            .registry
723            .get_latest(subject.as_str())
724            .await
725            .map_err(schema_error_response)?
726            .version
727    } else {
728        let v: u32 = version.parse().map_err(|_| {
729            (
730                StatusCode::UNPROCESSABLE_ENTITY,
731                Json(ErrorResponse {
732                    error_code: 42202,
733                    message: format!("Invalid version: '{}'", version),
734                }),
735            )
736        })?;
737        SchemaVersion::new(v)
738    };
739
740    state
741        .registry
742        .get_by_version(subject.as_str(), version)
743        .await
744        .map(|sv| {
745            Json(SubjectVersionResponse {
746                subject: sv.subject.0,
747                version: sv.version.0,
748                id: sv.id.0,
749                schema: sv.schema,
750                schema_type: sv.schema_type.to_string(),
751            })
752        })
753        .map_err(schema_error_response)
754}
755
756async fn delete_version(
757    State(state): State<Arc<ServerState>>,
758    Path((subject, version)): Path<(String, u32)>,
759    Query(params): Query<QueryParams>,
760    auth: Option<Extension<AuthState>>,
761) -> Result<Json<u32>, (StatusCode, Json<ErrorResponse>)> {
762    enforce_permission(&auth, &subject, SchemaPermission::Delete, &state)?;
763
764    // Check mode
765    if *state.mode.read() == RegistryMode::ReadOnly {
766        return Err((
767            StatusCode::FORBIDDEN,
768            Json(ErrorResponse {
769                error_code: 40301,
770                message: "Registry is in READONLY mode".to_string(),
771            }),
772        ));
773    }
774
775    state
776        .registry
777        .delete_version(
778            subject.as_str(),
779            SchemaVersion::new(version),
780            params.permanent,
781        )
782        .await
783        .map(|_| Json(version))
784        .map_err(schema_error_response)
785}
786
787/// Get all schema IDs that reference a given subject/version
788async fn get_referenced_by(
789    State(state): State<Arc<ServerState>>,
790    Path((subject, version)): Path<(String, String)>,
791) -> Result<Json<Vec<u32>>, (StatusCode, Json<ErrorResponse>)> {
792    let version = if version == "latest" {
793        // Get latest version number
794        let versions = state
795            .registry
796            .list_versions(subject.as_str())
797            .await
798            .map_err(schema_error_response)?;
799        versions.into_iter().max().unwrap_or(1)
800    } else {
801        version.parse().map_err(|_| {
802            schema_error_response(crate::SchemaError::InvalidInput(format!(
803                "Invalid version: {}",
804                version
805            )))
806        })?
807    };
808
809    state
810        .registry
811        .get_schemas_referencing(subject.as_str(), SchemaVersion::new(version))
812        .await
813        .map(|ids| Json(ids.into_iter().map(|id| id.0).collect()))
814        .map_err(schema_error_response)
815}
816
817async fn check_compatibility(
818    State(state): State<Arc<ServerState>>,
819    Path((subject, version)): Path<(String, String)>,
820    Json(req): Json<CompatibilityRequest>,
821) -> Result<Json<CompatibilityResponse>, (StatusCode, Json<ErrorResponse>)> {
822    let schema_type = req
823        .schema_type
824        .as_deref()
825        .unwrap_or("AVRO")
826        .parse::<SchemaType>()
827        .map_err(|_| {
828            schema_error_response(crate::SchemaError::InvalidInput(format!(
829                "Invalid schema type: {}",
830                req.schema_type.as_deref().unwrap_or("AVRO")
831            )))
832        })?;
833
834    let version = if version == "latest" {
835        None
836    } else {
837        Some(SchemaVersion::new(version.parse().map_err(|_| {
838            schema_error_response(crate::SchemaError::InvalidInput(format!(
839                "Invalid version: {}",
840                version
841            )))
842        })?))
843    };
844
845    state
846        .registry
847        .check_compatibility(subject.as_str(), schema_type, &req.schema, version)
848        .await
849        .map(|result| {
850            Json(CompatibilityResponse {
851                is_compatible: result.is_compatible,
852                messages: result.messages,
853            })
854        })
855        .map_err(schema_error_response)
856}
857
858async fn get_global_config(State(state): State<Arc<ServerState>>) -> Json<ConfigResponse> {
859    Json(ConfigResponse {
860        compatibility_level: state.registry.get_default_compatibility().to_string(),
861    })
862}
863
864async fn update_global_config(
865    State(state): State<Arc<ServerState>>,
866    auth: Option<Extension<AuthState>>,
867    Json(req): Json<ConfigRequest>,
868) -> Result<Json<ConfigResponse>, (StatusCode, Json<ErrorResponse>)> {
869    enforce_permission(&auth, "_global", SchemaPermission::Alter, &state)?;
870
871    let level: CompatibilityLevel = req.compatibility.parse().map_err(|_| {
872        (
873            StatusCode::UNPROCESSABLE_ENTITY,
874            Json(ErrorResponse {
875                error_code: 42203,
876                message: format!("Invalid compatibility level: {}", req.compatibility),
877            }),
878        )
879    })?;
880
881    state.registry.set_default_compatibility(level);
882
883    Ok(Json(ConfigResponse {
884        compatibility_level: level.to_string(),
885    }))
886}
887
888async fn get_subject_config(
889    State(state): State<Arc<ServerState>>,
890    Path(subject): Path<String>,
891) -> Json<ConfigResponse> {
892    let level = state
893        .registry
894        .get_subject_compatibility(&Subject::new(subject));
895    Json(ConfigResponse {
896        compatibility_level: level.to_string(),
897    })
898}
899
900async fn update_subject_config(
901    State(state): State<Arc<ServerState>>,
902    Path(subject): Path<String>,
903    auth: Option<Extension<AuthState>>,
904    Json(req): Json<ConfigRequest>,
905) -> Result<Json<ConfigResponse>, (StatusCode, Json<ErrorResponse>)> {
906    enforce_permission(&auth, &subject, SchemaPermission::Alter, &state)?;
907
908    let level: CompatibilityLevel = req.compatibility.parse().map_err(|_| {
909        (
910            StatusCode::UNPROCESSABLE_ENTITY,
911            Json(ErrorResponse {
912                error_code: 42203,
913                message: format!("Invalid compatibility level: {}", req.compatibility),
914            }),
915        )
916    })?;
917
918    state
919        .registry
920        .set_subject_compatibility(subject.as_str(), level);
921
922    Ok(Json(ConfigResponse {
923        compatibility_level: level.to_string(),
924    }))
925}
926
927async fn get_mode(State(state): State<Arc<ServerState>>) -> Json<ModeResponse> {
928    Json(ModeResponse {
929        mode: state.mode.read().to_string(),
930    })
931}
932
933async fn update_mode(
934    State(state): State<Arc<ServerState>>,
935    auth: Option<Extension<AuthState>>,
936    Json(req): Json<ModeRequest>,
937) -> Result<Json<ModeResponse>, (StatusCode, Json<ErrorResponse>)> {
938    enforce_permission(&auth, "_global", SchemaPermission::Alter, &state)?;
939
940    let mode = match req.mode.to_uppercase().as_str() {
941        "READWRITE" => RegistryMode::ReadWrite,
942        "READONLY" => RegistryMode::ReadOnly,
943        "IMPORT" => RegistryMode::Import,
944        _ => {
945            return Err((
946                StatusCode::UNPROCESSABLE_ENTITY,
947                Json(ErrorResponse {
948                    error_code: 42204,
949                    message: format!("Invalid mode: {}", req.mode),
950                }),
951            ))
952        }
953    };
954
955    *state.mode.write() = mode;
956
957    Ok(Json(ModeResponse {
958        mode: mode.to_string(),
959    }))
960}
961
// ============================================================================
// Version State Handlers
// ============================================================================
965
966#[derive(Serialize)]
967struct VersionStateResponse {
968    state: String,
969}
970
971#[derive(Deserialize)]
972struct VersionStateRequest {
973    state: String,
974}
975
976async fn get_version_state(
977    State(state): State<Arc<ServerState>>,
978    Path((subject, version)): Path<(String, u32)>,
979) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
980    state
981        .registry
982        .get_version_state(subject.as_str(), SchemaVersion::new(version))
983        .await
984        .map(|s| {
985            Json(VersionStateResponse {
986                state: s.to_string(),
987            })
988        })
989        .map_err(schema_error_response)
990}
991
992async fn set_version_state(
993    State(state): State<Arc<ServerState>>,
994    Path((subject, version)): Path<(String, u32)>,
995    auth: Option<Extension<AuthState>>,
996    Json(req): Json<VersionStateRequest>,
997) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
998    enforce_permission(&auth, &subject, SchemaPermission::Alter, &state)?;
999
1000    let version_state: VersionState = req.state.parse().map_err(|_| {
1001        (
1002            StatusCode::UNPROCESSABLE_ENTITY,
1003            Json(ErrorResponse {
1004                error_code: 42205,
1005                message: format!("Invalid version state: {}", req.state),
1006            }),
1007        )
1008    })?;
1009
1010    state
1011        .registry
1012        .set_version_state(subject.as_str(), SchemaVersion::new(version), version_state)
1013        .await
1014        .map_err(schema_error_response)?;
1015
1016    Ok(Json(VersionStateResponse {
1017        state: version_state.to_string(),
1018    }))
1019}
1020
1021async fn deprecate_version(
1022    State(state): State<Arc<ServerState>>,
1023    Path((subject, version)): Path<(String, u32)>,
1024    auth: Option<Extension<AuthState>>,
1025) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
1026    enforce_permission(&auth, &subject, SchemaPermission::Alter, &state)?;
1027
1028    state
1029        .registry
1030        .deprecate_version(subject.as_str(), SchemaVersion::new(version))
1031        .await
1032        .map_err(schema_error_response)?;
1033
1034    Ok(Json(VersionStateResponse {
1035        state: "DEPRECATED".to_string(),
1036    }))
1037}
1038
1039async fn disable_version(
1040    State(state): State<Arc<ServerState>>,
1041    Path((subject, version)): Path<(String, u32)>,
1042    auth: Option<Extension<AuthState>>,
1043) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
1044    enforce_permission(&auth, &subject, SchemaPermission::Alter, &state)?;
1045
1046    state
1047        .registry
1048        .disable_version(subject.as_str(), SchemaVersion::new(version))
1049        .await
1050        .map_err(schema_error_response)?;
1051
1052    Ok(Json(VersionStateResponse {
1053        state: "DISABLED".to_string(),
1054    }))
1055}
1056
1057async fn enable_version(
1058    State(state): State<Arc<ServerState>>,
1059    Path((subject, version)): Path<(String, u32)>,
1060    auth: Option<Extension<AuthState>>,
1061) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
1062    enforce_permission(&auth, &subject, SchemaPermission::Alter, &state)?;
1063
1064    state
1065        .registry
1066        .enable_version(subject.as_str(), SchemaVersion::new(version))
1067        .await
1068        .map_err(schema_error_response)?;
1069
1070    Ok(Json(VersionStateResponse {
1071        state: "ENABLED".to_string(),
1072    }))
1073}
1074
// ============================================================================
// Validation Handlers
// ============================================================================
1078
1079#[derive(Deserialize)]
1080struct ValidateSchemaRequest {
1081    schema: String,
1082    #[serde(rename = "schemaType", default)]
1083    schema_type: Option<String>,
1084}
1085
1086#[derive(Serialize)]
1087struct ValidationResponse {
1088    is_valid: bool,
1089    errors: Vec<String>,
1090    warnings: Vec<String>,
1091}
1092
1093async fn validate_schema(
1094    State(state): State<Arc<ServerState>>,
1095    Path(subject): Path<String>,
1096    Json(req): Json<ValidateSchemaRequest>,
1097) -> Result<Json<ValidationResponse>, (StatusCode, Json<ErrorResponse>)> {
1098    let schema_type = req
1099        .schema_type
1100        .as_deref()
1101        .unwrap_or("AVRO")
1102        .parse::<SchemaType>()
1103        .map_err(|_| {
1104            schema_error_response(crate::SchemaError::InvalidInput(format!(
1105                "Invalid schema type: {}",
1106                req.schema_type.as_deref().unwrap_or("AVRO")
1107            )))
1108        })?;
1109
1110    state
1111        .registry
1112        .validate_schema(schema_type, &subject, &req.schema)
1113        .map(|report| {
1114            Json(ValidationResponse {
1115                is_valid: report.is_valid(),
1116                errors: report.error_messages(),
1117                warnings: report.warning_messages(),
1118            })
1119        })
1120        .map_err(schema_error_response)
1121}
1122
1123#[derive(Serialize)]
1124struct ValidationRuleResponse {
1125    name: String,
1126    rule_type: String,
1127    config: String,
1128    level: String,
1129    #[serde(skip_serializing_if = "Option::is_none")]
1130    description: Option<String>,
1131    active: bool,
1132}
1133
1134#[derive(Deserialize)]
1135struct AddValidationRuleRequest {
1136    name: String,
1137    rule_type: String,
1138    config: String,
1139    #[serde(default = "default_level")]
1140    level: String,
1141    description: Option<String>,
1142    #[serde(default)]
1143    subjects: Vec<String>,
1144    #[serde(default)]
1145    schema_types: Vec<String>,
1146}
1147
/// Serde default for `AddValidationRuleRequest::level`: the string `"ERROR"`.
fn default_level() -> String {
    String::from("ERROR")
}
1151
1152async fn list_validation_rules(
1153    State(state): State<Arc<ServerState>>,
1154) -> Json<Vec<ValidationRuleResponse>> {
1155    let rules = state.registry.validation_engine().read().list_rules();
1156    Json(
1157        rules
1158            .into_iter()
1159            .map(|r| ValidationRuleResponse {
1160                name: r.name().to_string(),
1161                rule_type: format!("{:?}", r.rule_type()),
1162                config: r.config().to_string(),
1163                level: format!("{:?}", r.level()),
1164                description: r.description().map(|s| s.to_string()),
1165                active: r.is_active(),
1166            })
1167            .collect(),
1168    )
1169}
1170
1171async fn add_validation_rule(
1172    State(state): State<Arc<ServerState>>,
1173    auth: Option<Extension<AuthState>>,
1174    Json(req): Json<AddValidationRuleRequest>,
1175) -> Result<Json<ValidationRuleResponse>, (StatusCode, Json<ErrorResponse>)> {
1176    enforce_permission(&auth, "_global", SchemaPermission::Create, &state)?;
1177
1178    let rule_type: ValidationRuleType = req.rule_type.parse().map_err(|_| {
1179        (
1180            StatusCode::UNPROCESSABLE_ENTITY,
1181            Json(ErrorResponse {
1182                error_code: 42206,
1183                message: format!("Invalid rule type: {}", req.rule_type),
1184            }),
1185        )
1186    })?;
1187
1188    let level: ValidationLevel = req.level.parse().map_err(|_| {
1189        (
1190            StatusCode::UNPROCESSABLE_ENTITY,
1191            Json(ErrorResponse {
1192                error_code: 42207,
1193                message: format!("Invalid validation level: {}", req.level),
1194            }),
1195        )
1196    })?;
1197
1198    let mut rule = ValidationRule::new(&req.name, rule_type, &req.config).with_level(level);
1199
1200    if let Some(desc) = &req.description {
1201        rule = rule.with_description(desc);
1202    }
1203
1204    if !req.subjects.is_empty() {
1205        rule = rule.for_subjects(req.subjects);
1206    }
1207
1208    if !req.schema_types.is_empty() {
1209        let schema_types: Vec<SchemaType> = req
1210            .schema_types
1211            .iter()
1212            .map(|s| {
1213                s.parse::<SchemaType>().map_err(|_| {
1214                    (
1215                        StatusCode::UNPROCESSABLE_ENTITY,
1216                        Json(ErrorResponse {
1217                            error_code: 42202,
1218                            message: format!("Unknown schema type: '{}'", s),
1219                        }),
1220                    )
1221                })
1222            })
1223            .collect::<Result<Vec<_>, _>>()?;
1224        rule = rule.for_schema_types(schema_types);
1225    }
1226
1227    state.registry.add_validation_rule(rule.clone());
1228
1229    Ok(Json(ValidationRuleResponse {
1230        name: rule.name().to_string(),
1231        rule_type: format!("{:?}", rule.rule_type()),
1232        config: rule.config().to_string(),
1233        level: format!("{:?}", rule.level()),
1234        description: rule.description().map(|s| s.to_string()),
1235        active: rule.is_active(),
1236    }))
1237}
1238
1239async fn delete_validation_rule(
1240    State(state): State<Arc<ServerState>>,
1241    Path(name): Path<String>,
1242    auth: Option<Extension<AuthState>>,
1243) -> Result<StatusCode, (StatusCode, Json<ErrorResponse>)> {
1244    enforce_permission(&auth, "_global", SchemaPermission::Delete, &state)?;
1245
1246    let removed = state
1247        .registry
1248        .validation_engine()
1249        .write()
1250        .remove_rule(&name);
1251    if removed {
1252        Ok(StatusCode::NO_CONTENT)
1253    } else {
1254        Err((
1255            StatusCode::NOT_FOUND,
1256            Json(ErrorResponse {
1257                error_code: 40404,
1258                message: format!("Validation rule not found: {}", name),
1259            }),
1260        ))
1261    }
1262}
1263
// ============================================================================
// Context Handlers (Multi-tenant)
// ============================================================================
1267
1268#[derive(Serialize)]
1269struct ContextResponse {
1270    name: String,
1271    #[serde(skip_serializing_if = "Option::is_none")]
1272    description: Option<String>,
1273    active: bool,
1274}
1275
1276#[derive(Deserialize)]
1277struct CreateContextRequest {
1278    name: String,
1279    description: Option<String>,
1280}
1281
1282async fn list_contexts(State(state): State<Arc<ServerState>>) -> Json<Vec<ContextResponse>> {
1283    let contexts = state.registry.list_contexts();
1284    Json(
1285        contexts
1286            .into_iter()
1287            .map(|c| ContextResponse {
1288                name: c.name().to_string(),
1289                description: c.description().map(|s| s.to_string()),
1290                active: c.is_active(),
1291            })
1292            .collect(),
1293    )
1294}
1295
1296async fn create_context(
1297    State(state): State<Arc<ServerState>>,
1298    auth: Option<Extension<AuthState>>,
1299    Json(req): Json<CreateContextRequest>,
1300) -> Result<Json<ContextResponse>, (StatusCode, Json<ErrorResponse>)> {
1301    enforce_permission(&auth, "_global", SchemaPermission::Create, &state)?;
1302
1303    let mut context = SchemaContext::new(&req.name);
1304    if let Some(desc) = &req.description {
1305        context = context.with_description(desc);
1306    }
1307
1308    state
1309        .registry
1310        .create_context(context.clone())
1311        .map_err(schema_error_response)?;
1312
1313    Ok(Json(ContextResponse {
1314        name: context.name().to_string(),
1315        description: context.description().map(|s| s.to_string()),
1316        active: context.is_active(),
1317    }))
1318}
1319
1320async fn get_context(
1321    State(state): State<Arc<ServerState>>,
1322    Path(context_name): Path<String>,
1323) -> Result<Json<ContextResponse>, (StatusCode, Json<ErrorResponse>)> {
1324    state
1325        .registry
1326        .get_context(&context_name)
1327        .map(|c| {
1328            Json(ContextResponse {
1329                name: c.name().to_string(),
1330                description: c.description().map(|s| s.to_string()),
1331                active: c.is_active(),
1332            })
1333        })
1334        .ok_or_else(|| {
1335            (
1336                StatusCode::NOT_FOUND,
1337                Json(ErrorResponse {
1338                    error_code: 40405,
1339                    message: format!("Context not found: {}", context_name),
1340                }),
1341            )
1342        })
1343}
1344
1345async fn delete_context(
1346    State(state): State<Arc<ServerState>>,
1347    Path(context_name): Path<String>,
1348    auth: Option<Extension<AuthState>>,
1349) -> Result<StatusCode, (StatusCode, Json<ErrorResponse>)> {
1350    enforce_permission(&auth, "_global", SchemaPermission::Delete, &state)?;
1351
1352    state
1353        .registry
1354        .delete_context(&context_name)
1355        .map(|_| StatusCode::NO_CONTENT)
1356        .map_err(schema_error_response)
1357}
1358
1359async fn list_context_subjects(
1360    State(state): State<Arc<ServerState>>,
1361    Path(context_name): Path<String>,
1362) -> Json<Vec<String>> {
1363    Json(state.registry.list_subjects_in_context(&context_name))
1364}
1365
// ============================================================================
// Statistics Handler
// ============================================================================
1369
1370#[derive(Serialize)]
1371struct StatsResponse {
1372    schema_count: usize,
1373    subject_count: usize,
1374    context_count: usize,
1375    cache_size: usize,
1376}
1377
1378async fn get_stats(State(state): State<Arc<ServerState>>) -> Json<StatsResponse> {
1379    let stats = state.registry.stats().await;
1380    Json(StatsResponse {
1381        schema_count: stats.schema_count,
1382        subject_count: stats.subject_count,
1383        context_count: stats.context_count,
1384        cache_size: stats.cache_size,
1385    })
1386}
1387
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::RegistryConfig;
    use axum::body::Body;
    use axum::http::Request;
    use tower::util::ServiceExt;

    /// Builds a router over a registry created from `RegistryConfig::memory()`
    /// and the default server configuration.
    async fn create_test_app() -> Router {
        let registry = SchemaRegistry::new(RegistryConfig::memory())
            .await
            .unwrap();
        SchemaServer::new(registry, ServerConfig::default()).router()
    }

    /// Sends one body-less request for `uri` to a fresh test app and asserts
    /// that the response status is `200 OK`.
    async fn assert_ok(uri: &str) {
        let app = create_test_app().await;
        let request = Request::builder().uri(uri).body(Body::empty()).unwrap();

        let response = app.oneshot(request).await.unwrap();

        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_root_endpoint() {
        assert_ok("/").await;
    }

    #[tokio::test]
    async fn test_list_subjects_empty() {
        assert_ok("/subjects").await;
    }

    #[tokio::test]
    async fn test_get_config() {
        assert_ok("/config").await;
    }

    #[tokio::test]
    async fn test_get_mode() {
        assert_ok("/mode").await;
    }

    #[tokio::test]
    async fn test_health_endpoint() {
        assert_ok("/health").await;
    }

    #[tokio::test]
    async fn test_liveness_endpoint() {
        assert_ok("/health/live").await;
    }

    #[tokio::test]
    async fn test_readiness_endpoint() {
        assert_ok("/health/ready").await;
    }

    #[tokio::test]
    async fn test_stats_endpoint() {
        assert_ok("/stats").await;
    }

    #[tokio::test]
    async fn test_list_contexts_empty() {
        assert_ok("/contexts").await;
    }

    #[tokio::test]
    async fn test_list_validation_rules_empty() {
        assert_ok("/config/validation/rules").await;
    }
}