Skip to main content

rivven_schema/
server.rs

1//! HTTP Server for Schema Registry
2//!
3//! Provides a Confluent-compatible REST API with optional authentication.
4//!
5//! ## Authentication
6//!
7//! When the `auth` feature is enabled, the server supports:
8//! - HTTP Basic Authentication
9//! - Bearer Token authentication (OIDC/service accounts)
10//!
11//! See [`crate::auth`] for configuration options.
12
13use crate::compatibility::CompatibilityLevel;
14use crate::error::SchemaError;
15use crate::registry::SchemaRegistry;
16use crate::types::{
17    SchemaContext, SchemaId, SchemaReference, SchemaType, SchemaVersion, Subject, ValidationLevel,
18    ValidationRule, ValidationRuleType, VersionState,
19};
20use axum::{
21    extract::{DefaultBodyLimit, Path, Query, State},
22    http::StatusCode,
23    routing::{delete, get, post, put},
24    Json, Router,
25};
26use parking_lot::RwLock;
27use serde::{Deserialize, Serialize};
28use std::net::SocketAddr;
29use std::sync::Arc;
30use tower_http::cors::CorsLayer;
31use tower_http::trace::TraceLayer;
32use tracing::info;
33
34#[cfg(feature = "auth")]
35use crate::auth::{auth_middleware, AuthConfig, ServerAuthState};
36#[cfg(feature = "auth")]
37use axum::middleware;
38#[cfg(feature = "auth")]
39use rivven_core::AuthManager;
40
41/// Server configuration
42#[derive(Debug, Clone)]
43pub struct ServerConfig {
44    pub host: String,
45    pub port: u16,
46    /// Authentication configuration (requires `auth` feature)
47    #[cfg(feature = "auth")]
48    pub auth: Option<AuthConfig>,
49}
50
51impl Default for ServerConfig {
52    fn default() -> Self {
53        Self {
54            host: "0.0.0.0".to_string(),
55            port: 8081,
56            #[cfg(feature = "auth")]
57            auth: None,
58        }
59    }
60}
61
62impl ServerConfig {
63    /// Enable authentication with default config
64    #[cfg(feature = "auth")]
65    pub fn with_auth(mut self, auth_config: AuthConfig) -> Self {
66        self.auth = Some(auth_config);
67        self
68    }
69}
70
71/// Registry mode
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
73pub enum RegistryMode {
74    #[default]
75    ReadWrite,
76    ReadOnly,
77    Import,
78}
79
80impl std::fmt::Display for RegistryMode {
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        match self {
83            RegistryMode::ReadWrite => write!(f, "READWRITE"),
84            RegistryMode::ReadOnly => write!(f, "READONLY"),
85            RegistryMode::Import => write!(f, "IMPORT"),
86        }
87    }
88}
89
90/// Shared server state
91pub struct ServerState {
92    pub registry: SchemaRegistry,
93    pub mode: RwLock<RegistryMode>,
94    pub default_compatibility: RwLock<CompatibilityLevel>,
95}
96
97/// Schema Registry HTTP Server
98pub struct SchemaServer {
99    state: Arc<ServerState>,
100    #[allow(dead_code)] // Used for auth config when feature is enabled
101    config: ServerConfig,
102    #[cfg(all(feature = "auth", not(feature = "cedar")))]
103    auth_manager: Option<Arc<AuthManager>>,
104    #[cfg(feature = "cedar")]
105    auth_manager: Option<Arc<AuthManager>>,
106    #[cfg(feature = "cedar")]
107    cedar_authorizer: Option<Arc<rivven_core::CedarAuthorizer>>,
108}
109
110impl SchemaServer {
111    /// Create a new schema server
112    pub fn new(registry: SchemaRegistry, config: ServerConfig) -> Self {
113        let default_compat = registry.get_default_compatibility();
114        Self {
115            state: Arc::new(ServerState {
116                registry,
117                mode: RwLock::new(RegistryMode::default()),
118                default_compatibility: RwLock::new(default_compat),
119            }),
120            config,
121            #[cfg(feature = "auth")]
122            auth_manager: None,
123            #[cfg(feature = "cedar")]
124            cedar_authorizer: None,
125        }
126    }
127
128    /// Create a new schema server with authentication
129    #[cfg(all(feature = "auth", not(feature = "cedar")))]
130    pub fn with_auth(
131        registry: SchemaRegistry,
132        config: ServerConfig,
133        auth_manager: Arc<AuthManager>,
134    ) -> Self {
135        let default_compat = registry.get_default_compatibility();
136        Self {
137            state: Arc::new(ServerState {
138                registry,
139                mode: RwLock::new(RegistryMode::default()),
140                default_compatibility: RwLock::new(default_compat),
141            }),
142            config,
143            auth_manager: Some(auth_manager),
144        }
145    }
146
147    /// Create a new schema server with authentication (simple RBAC)
148    #[cfg(feature = "cedar")]
149    pub fn with_auth(
150        registry: SchemaRegistry,
151        config: ServerConfig,
152        auth_manager: Arc<AuthManager>,
153    ) -> Self {
154        let default_compat = registry.get_default_compatibility();
155        Self {
156            state: Arc::new(ServerState {
157                registry,
158                mode: RwLock::new(RegistryMode::default()),
159                default_compatibility: RwLock::new(default_compat),
160            }),
161            config,
162            auth_manager: Some(auth_manager),
163            cedar_authorizer: None,
164        }
165    }
166
167    /// Create a new schema server with Cedar policy-based authorization
168    #[cfg(feature = "cedar")]
169    pub fn with_cedar(
170        registry: SchemaRegistry,
171        config: ServerConfig,
172        auth_manager: Arc<AuthManager>,
173        cedar_authorizer: Arc<rivven_core::CedarAuthorizer>,
174    ) -> Self {
175        let default_compat = registry.get_default_compatibility();
176        Self {
177            state: Arc::new(ServerState {
178                registry,
179                mode: RwLock::new(RegistryMode::default()),
180                default_compatibility: RwLock::new(default_compat),
181            }),
182            config,
183            auth_manager: Some(auth_manager),
184            cedar_authorizer: Some(cedar_authorizer),
185        }
186    }
187
188    /// Build the Axum router
189    pub fn router(&self) -> Router {
190        // Restrict CORS to disallow arbitrary cross-origin requests.
191        // `allow_origin(Any)` + `allow_methods(Any)` + `allow_headers(Any)` allows
192        // any website to make authenticated requests to the schema registry.
193        // Instead, allow only safe methods and standard headers. Operators can
194        // configure specific allowed origins via reverse proxy if needed.
195        let cors = CorsLayer::new()
196            .allow_methods([
197                axum::http::Method::GET,
198                axum::http::Method::POST,
199                axum::http::Method::PUT,
200                axum::http::Method::DELETE,
201            ])
202            .allow_headers([
203                axum::http::header::CONTENT_TYPE,
204                axum::http::header::AUTHORIZATION,
205                axum::http::header::ACCEPT,
206            ]);
207
208        let base_router = Router::new()
209            // Root / health
210            .route("/", get(root_handler))
211            .route("/health", get(health_handler))
212            .route("/health/live", get(liveness_handler))
213            .route("/health/ready", get(readiness_handler))
214            // Schemas
215            .route("/schemas/ids/:id", get(get_schema_by_id))
216            .route("/schemas/ids/:id/versions", get(get_schema_versions))
217            // Subjects
218            .route("/subjects", get(list_subjects))
219            .route("/subjects/:subject", delete(delete_subject))
220            .route("/subjects/:subject/versions", get(list_subject_versions))
221            .route("/subjects/:subject/versions", post(register_schema))
222            .route(
223                "/subjects/:subject/versions/:version",
224                get(get_subject_version),
225            )
226            .route(
227                "/subjects/:subject/versions/:version",
228                delete(delete_version),
229            )
230            // Subject recovery (undelete)
231            .route("/subjects/deleted", get(list_deleted_subjects))
232            .route("/subjects/:subject/undelete", post(undelete_subject))
233            // Version state management
234            .route(
235                "/subjects/:subject/versions/:version/state",
236                get(get_version_state),
237            )
238            .route(
239                "/subjects/:subject/versions/:version/state",
240                put(set_version_state),
241            )
242            .route(
243                "/subjects/:subject/versions/:version/deprecate",
244                post(deprecate_version),
245            )
246            .route(
247                "/subjects/:subject/versions/:version/disable",
248                post(disable_version),
249            )
250            .route(
251                "/subjects/:subject/versions/:version/enable",
252                post(enable_version),
253            )
254            // Schema references
255            .route(
256                "/subjects/:subject/versions/:version/referencedby",
257                get(get_referenced_by),
258            )
259            // Schema validation (without registering)
260            .route("/subjects/:subject/validate", post(validate_schema))
261            // Compatibility
262            .route(
263                "/compatibility/subjects/:subject/versions/:version",
264                post(check_compatibility),
265            )
266            // Config
267            .route("/config", get(get_global_config))
268            .route("/config", put(update_global_config))
269            .route("/config/:subject", get(get_subject_config))
270            .route("/config/:subject", put(update_subject_config))
271            // Validation rules
272            .route("/config/validation/rules", get(list_validation_rules))
273            .route("/config/validation/rules", post(add_validation_rule))
274            .route(
275                "/config/validation/rules/:name",
276                delete(delete_validation_rule),
277            )
278            // Mode
279            .route("/mode", get(get_mode))
280            .route("/mode", put(update_mode))
281            // Contexts (multi-tenant)
282            .route("/contexts", get(list_contexts))
283            .route("/contexts", post(create_context))
284            .route("/contexts/:context", get(get_context))
285            .route("/contexts/:context", delete(delete_context))
286            .route("/contexts/:context/subjects", get(list_context_subjects))
287            // Statistics
288            .route("/stats", get(get_stats))
289            .with_state(self.state.clone())
290            .layer(DefaultBodyLimit::max(10 * 1024 * 1024)) // 10 MiB
291            .layer(cors)
292            .layer(TraceLayer::new_for_http());
293
294        // Add authentication middleware if configured (simple RBAC)
295        #[cfg(all(feature = "auth", not(feature = "cedar")))]
296        let base_router = if let Some(auth_manager) = &self.auth_manager {
297            let auth_config = self.config.auth.clone().unwrap_or_default();
298            let auth_state = Arc::new(ServerAuthState {
299                auth_manager: auth_manager.clone(),
300                config: auth_config,
301            });
302            base_router.layer(middleware::from_fn_with_state(auth_state, auth_middleware))
303        } else {
304            base_router
305        };
306
307        // Add authentication middleware if configured (with Cedar support)
308        #[cfg(feature = "cedar")]
309        let base_router = if let Some(auth_manager) = &self.auth_manager {
310            let auth_config = self.config.auth.clone().unwrap_or_default();
311            let auth_state = Arc::new(ServerAuthState {
312                auth_manager: auth_manager.clone(),
313                cedar_authorizer: self.cedar_authorizer.clone(),
314                config: auth_config,
315            });
316            base_router.layer(middleware::from_fn_with_state(auth_state, auth_middleware))
317        } else {
318            base_router
319        };
320
321        base_router
322    }
323
324    /// Run the server
325    pub async fn run(self, addr: SocketAddr) -> anyhow::Result<()> {
326        let router = self.router();
327        let listener = tokio::net::TcpListener::bind(addr).await?;
328        info!("Schema Registry server listening on {}", addr);
329        axum::serve(listener, router).await?;
330        Ok(())
331    }
332}
333
334// ============================================================================
335// Helper for converting SchemaError to HTTP response
336// ============================================================================
337
338fn schema_error_response(e: SchemaError) -> (StatusCode, Json<ErrorResponse>) {
339    let status = match e.http_status() {
340        404 => StatusCode::NOT_FOUND,
341        409 => StatusCode::CONFLICT,
342        422 => StatusCode::UNPROCESSABLE_ENTITY,
343        _ => StatusCode::INTERNAL_SERVER_ERROR,
344    };
345    (
346        status,
347        Json(ErrorResponse {
348            error_code: e.error_code(),
349            message: e.to_string(),
350        }),
351    )
352}
353
354// ============================================================================
355// Request/Response Types
356// ============================================================================
357
358#[derive(Serialize)]
359struct RootResponse {
360    version: &'static str,
361    commit: &'static str,
362}
363
364#[derive(Deserialize)]
365struct RegisterSchemaRequest {
366    schema: String,
367    #[serde(rename = "schemaType", default)]
368    schema_type: Option<String>,
369    /// Schema references for complex types (Avro named types, Protobuf imports, JSON Schema $refs)
370    #[serde(default)]
371    references: Vec<SchemaReference>,
372}
373
374#[derive(Serialize)]
375struct RegisterSchemaResponse {
376    id: u32,
377}
378
379#[derive(Serialize)]
380struct SchemaResponse {
381    schema: String,
382    #[serde(rename = "schemaType")]
383    schema_type: String,
384    #[serde(default, skip_serializing_if = "Vec::is_empty")]
385    references: Vec<SchemaReference>,
386}
387
388#[derive(Serialize)]
389struct SubjectVersionResponse {
390    subject: String,
391    version: u32,
392    id: u32,
393    schema: String,
394    #[serde(rename = "schemaType")]
395    schema_type: String,
396}
397
398#[derive(Deserialize)]
399struct CompatibilityRequest {
400    schema: String,
401    #[serde(rename = "schemaType", default)]
402    schema_type: Option<String>,
403}
404
405#[derive(Serialize)]
406struct CompatibilityResponse {
407    is_compatible: bool,
408    #[serde(skip_serializing_if = "Vec::is_empty")]
409    messages: Vec<String>,
410}
411
412#[derive(Serialize)]
413struct ConfigResponse {
414    #[serde(rename = "compatibilityLevel")]
415    compatibility_level: String,
416}
417
418#[derive(Deserialize)]
419struct ConfigRequest {
420    compatibility: String,
421}
422
423#[derive(Serialize)]
424struct ModeResponse {
425    mode: String,
426}
427
428#[derive(Deserialize)]
429struct ModeRequest {
430    mode: String,
431}
432
433#[derive(Serialize)]
434struct ErrorResponse {
435    error_code: u32,
436    message: String,
437}
438
439#[derive(Deserialize)]
440struct QueryParams {
441    /// When `true`, perform a hard (permanent) delete that cannot be undone.
442    /// When `false` (default), perform a soft delete; the subject/version can
443    /// be recovered via the `/subjects/{subject}/undelete` endpoint.
444    ///
445    /// **Note:** There is currently no CLI sub-command for delete operations;
446    /// deletions are only available through the HTTP API. A future CLI
447    /// `delete --permanent` flag is planned.
448    #[serde(default)]
449    permanent: bool,
450}
451
452// ============================================================================
453// Handlers
454// ============================================================================
455
456async fn root_handler() -> Json<RootResponse> {
457    Json(RootResponse {
458        version: env!("CARGO_PKG_VERSION"),
459        commit: option_env!("GIT_HASH").unwrap_or("dev"),
460    })
461}
462
463/// Health check endpoint - returns basic health status
464#[derive(Serialize)]
465struct HealthResponse {
466    status: &'static str,
467    version: &'static str,
468}
469
470async fn health_handler() -> Json<HealthResponse> {
471    Json(HealthResponse {
472        status: "healthy",
473        version: env!("CARGO_PKG_VERSION"),
474    })
475}
476
477/// Kubernetes liveness probe - is the service alive?
478async fn liveness_handler() -> StatusCode {
479    StatusCode::OK
480}
481
482/// Kubernetes readiness probe - can the service handle requests?
483async fn readiness_handler(
484    State(state): State<Arc<ServerState>>,
485) -> Result<StatusCode, StatusCode> {
486    // Check if we can access the registry (basic health check)
487    match state.registry.list_subjects().await {
488        Ok(_) => Ok(StatusCode::OK),
489        Err(_) => Err(StatusCode::SERVICE_UNAVAILABLE),
490    }
491}
492
493async fn get_schema_by_id(
494    State(state): State<Arc<ServerState>>,
495    Path(id): Path<u32>,
496) -> Result<Json<SchemaResponse>, (StatusCode, Json<ErrorResponse>)> {
497    state
498        .registry
499        .get_by_id(SchemaId::new(id))
500        .await
501        .map(|schema| {
502            Json(SchemaResponse {
503                schema: schema.schema,
504                schema_type: schema.schema_type.to_string(),
505                references: schema.references,
506            })
507        })
508        .map_err(schema_error_response)
509}
510
511async fn get_schema_versions(
512    State(state): State<Arc<ServerState>>,
513    Path(id): Path<u32>,
514) -> Result<Json<Vec<SubjectVersionResponse>>, (StatusCode, Json<ErrorResponse>)> {
515    // Find all subjects that use this schema ID
516    let subjects = state
517        .registry
518        .list_subjects()
519        .await
520        .map_err(schema_error_response)?;
521    let mut results = Vec::new();
522
523    for subject in subjects {
524        let versions = state
525            .registry
526            .list_versions(subject.as_str())
527            .await
528            .map_err(schema_error_response)?;
529        for ver in versions {
530            if let Ok(sv) = state
531                .registry
532                .get_by_version(subject.as_str(), SchemaVersion::new(ver))
533                .await
534            {
535                if sv.id.0 == id {
536                    results.push(SubjectVersionResponse {
537                        subject: sv.subject.0,
538                        version: sv.version.0,
539                        id: sv.id.0,
540                        schema: sv.schema,
541                        schema_type: sv.schema_type.to_string(),
542                    });
543                }
544            }
545        }
546    }
547
548    Ok(Json(results))
549}
550
551async fn list_subjects(
552    State(state): State<Arc<ServerState>>,
553) -> Result<Json<Vec<String>>, (StatusCode, Json<ErrorResponse>)> {
554    state
555        .registry
556        .list_subjects()
557        .await
558        .map(|subjects| Json(subjects.into_iter().map(|s| s.0).collect()))
559        .map_err(schema_error_response)
560}
561
562async fn delete_subject(
563    State(state): State<Arc<ServerState>>,
564    Path(subject): Path<String>,
565    Query(params): Query<QueryParams>,
566) -> Result<Json<Vec<u32>>, (StatusCode, Json<ErrorResponse>)> {
567    // Check mode
568    if *state.mode.read() == RegistryMode::ReadOnly {
569        return Err((
570            StatusCode::FORBIDDEN,
571            Json(ErrorResponse {
572                error_code: 40301,
573                message: "Registry is in READONLY mode".to_string(),
574            }),
575        ));
576    }
577
578    state
579        .registry
580        .delete_subject(subject.as_str(), params.permanent)
581        .await
582        .map(Json)
583        .map_err(schema_error_response)
584}
585
586/// List soft-deleted subjects that can be recovered
587async fn list_deleted_subjects(
588    State(state): State<Arc<ServerState>>,
589) -> Result<Json<Vec<String>>, (StatusCode, Json<ErrorResponse>)> {
590    state
591        .registry
592        .list_deleted_subjects()
593        .await
594        .map(|subjects| Json(subjects.into_iter().map(|s| s.0).collect()))
595        .map_err(schema_error_response)
596}
597
598/// Undelete a soft-deleted subject
599async fn undelete_subject(
600    State(state): State<Arc<ServerState>>,
601    Path(subject): Path<String>,
602) -> Result<Json<Vec<u32>>, (StatusCode, Json<ErrorResponse>)> {
603    // Check mode
604    if *state.mode.read() == RegistryMode::ReadOnly {
605        return Err((
606            StatusCode::FORBIDDEN,
607            Json(ErrorResponse {
608                error_code: 40301,
609                message: "Registry is in READONLY mode".to_string(),
610            }),
611        ));
612    }
613
614    state
615        .registry
616        .undelete_subject(subject.as_str())
617        .await
618        .map(Json)
619        .map_err(schema_error_response)
620}
621
622async fn list_subject_versions(
623    State(state): State<Arc<ServerState>>,
624    Path(subject): Path<String>,
625) -> Result<Json<Vec<u32>>, (StatusCode, Json<ErrorResponse>)> {
626    state
627        .registry
628        .list_versions(subject.as_str())
629        .await
630        .map(Json)
631        .map_err(schema_error_response)
632}
633
634async fn register_schema(
635    State(state): State<Arc<ServerState>>,
636    Path(subject): Path<String>,
637    Json(req): Json<RegisterSchemaRequest>,
638) -> Result<Json<RegisterSchemaResponse>, (StatusCode, Json<ErrorResponse>)> {
639    // Check mode
640    if *state.mode.read() == RegistryMode::ReadOnly {
641        return Err((
642            StatusCode::FORBIDDEN,
643            Json(ErrorResponse {
644                error_code: 40301,
645                message: "Registry is in READONLY mode".to_string(),
646            }),
647        ));
648    }
649
650    let schema_type = req
651        .schema_type
652        .as_deref()
653        .unwrap_or("AVRO")
654        .parse::<SchemaType>()
655        .map_err(|_| {
656            schema_error_response(crate::SchemaError::InvalidInput(format!(
657                "Invalid schema type: {}",
658                req.schema_type.as_deref().unwrap_or("AVRO")
659            )))
660        })?;
661
662    state
663        .registry
664        .register_with_references(subject.as_str(), schema_type, &req.schema, req.references)
665        .await
666        .map(|id| Json(RegisterSchemaResponse { id: id.0 }))
667        .map_err(schema_error_response)
668}
669
670async fn get_subject_version(
671    State(state): State<Arc<ServerState>>,
672    Path((subject, version)): Path<(String, String)>,
673) -> Result<Json<SubjectVersionResponse>, (StatusCode, Json<ErrorResponse>)> {
674    let version = if version == "latest" {
675        state
676            .registry
677            .get_latest(subject.as_str())
678            .await
679            .map_err(schema_error_response)?
680            .version
681    } else {
682        let v: u32 = version.parse().map_err(|_| {
683            (
684                StatusCode::UNPROCESSABLE_ENTITY,
685                Json(ErrorResponse {
686                    error_code: 42202,
687                    message: format!("Invalid version: '{}'", version),
688                }),
689            )
690        })?;
691        SchemaVersion::new(v)
692    };
693
694    state
695        .registry
696        .get_by_version(subject.as_str(), version)
697        .await
698        .map(|sv| {
699            Json(SubjectVersionResponse {
700                subject: sv.subject.0,
701                version: sv.version.0,
702                id: sv.id.0,
703                schema: sv.schema,
704                schema_type: sv.schema_type.to_string(),
705            })
706        })
707        .map_err(schema_error_response)
708}
709
710async fn delete_version(
711    State(state): State<Arc<ServerState>>,
712    Path((subject, version)): Path<(String, u32)>,
713    Query(params): Query<QueryParams>,
714) -> Result<Json<u32>, (StatusCode, Json<ErrorResponse>)> {
715    // Check mode
716    if *state.mode.read() == RegistryMode::ReadOnly {
717        return Err((
718            StatusCode::FORBIDDEN,
719            Json(ErrorResponse {
720                error_code: 40301,
721                message: "Registry is in READONLY mode".to_string(),
722            }),
723        ));
724    }
725
726    state
727        .registry
728        .delete_version(
729            subject.as_str(),
730            SchemaVersion::new(version),
731            params.permanent,
732        )
733        .await
734        .map(|_| Json(version))
735        .map_err(schema_error_response)
736}
737
738/// Get all schema IDs that reference a given subject/version
739async fn get_referenced_by(
740    State(state): State<Arc<ServerState>>,
741    Path((subject, version)): Path<(String, String)>,
742) -> Result<Json<Vec<u32>>, (StatusCode, Json<ErrorResponse>)> {
743    let version = if version == "latest" {
744        // Get latest version number
745        let versions = state
746            .registry
747            .list_versions(subject.as_str())
748            .await
749            .map_err(schema_error_response)?;
750        versions.into_iter().max().unwrap_or(1)
751    } else {
752        version.parse().map_err(|_| {
753            schema_error_response(crate::SchemaError::InvalidInput(format!(
754                "Invalid version: {}",
755                version
756            )))
757        })?
758    };
759
760    state
761        .registry
762        .get_schemas_referencing(subject.as_str(), SchemaVersion::new(version))
763        .await
764        .map(|ids| Json(ids.into_iter().map(|id| id.0).collect()))
765        .map_err(schema_error_response)
766}
767
768async fn check_compatibility(
769    State(state): State<Arc<ServerState>>,
770    Path((subject, version)): Path<(String, String)>,
771    Json(req): Json<CompatibilityRequest>,
772) -> Result<Json<CompatibilityResponse>, (StatusCode, Json<ErrorResponse>)> {
773    let schema_type = req
774        .schema_type
775        .as_deref()
776        .unwrap_or("AVRO")
777        .parse::<SchemaType>()
778        .map_err(|_| {
779            schema_error_response(crate::SchemaError::InvalidInput(format!(
780                "Invalid schema type: {}",
781                req.schema_type.as_deref().unwrap_or("AVRO")
782            )))
783        })?;
784
785    let version = if version == "latest" {
786        None
787    } else {
788        Some(SchemaVersion::new(version.parse().map_err(|_| {
789            schema_error_response(crate::SchemaError::InvalidInput(format!(
790                "Invalid version: {}",
791                version
792            )))
793        })?))
794    };
795
796    state
797        .registry
798        .check_compatibility(subject.as_str(), schema_type, &req.schema, version)
799        .await
800        .map(|result| {
801            Json(CompatibilityResponse {
802                is_compatible: result.is_compatible,
803                messages: result.messages,
804            })
805        })
806        .map_err(schema_error_response)
807}
808
809async fn get_global_config(State(state): State<Arc<ServerState>>) -> Json<ConfigResponse> {
810    Json(ConfigResponse {
811        compatibility_level: state.default_compatibility.read().to_string(),
812    })
813}
814
815async fn update_global_config(
816    State(state): State<Arc<ServerState>>,
817    Json(req): Json<ConfigRequest>,
818) -> Result<Json<ConfigResponse>, (StatusCode, Json<ErrorResponse>)> {
819    let level: CompatibilityLevel = req.compatibility.parse().map_err(|_| {
820        (
821            StatusCode::UNPROCESSABLE_ENTITY,
822            Json(ErrorResponse {
823                error_code: 42203,
824                message: format!("Invalid compatibility level: {}", req.compatibility),
825            }),
826        )
827    })?;
828
829    *state.default_compatibility.write() = level;
830
831    Ok(Json(ConfigResponse {
832        compatibility_level: level.to_string(),
833    }))
834}
835
836async fn get_subject_config(
837    State(state): State<Arc<ServerState>>,
838    Path(subject): Path<String>,
839) -> Json<ConfigResponse> {
840    let level = state
841        .registry
842        .get_subject_compatibility(&Subject::new(subject));
843    Json(ConfigResponse {
844        compatibility_level: level.to_string(),
845    })
846}
847
848async fn update_subject_config(
849    State(state): State<Arc<ServerState>>,
850    Path(subject): Path<String>,
851    Json(req): Json<ConfigRequest>,
852) -> Result<Json<ConfigResponse>, (StatusCode, Json<ErrorResponse>)> {
853    let level: CompatibilityLevel = req.compatibility.parse().map_err(|_| {
854        (
855            StatusCode::UNPROCESSABLE_ENTITY,
856            Json(ErrorResponse {
857                error_code: 42203,
858                message: format!("Invalid compatibility level: {}", req.compatibility),
859            }),
860        )
861    })?;
862
863    state
864        .registry
865        .set_subject_compatibility(subject.as_str(), level);
866
867    Ok(Json(ConfigResponse {
868        compatibility_level: level.to_string(),
869    }))
870}
871
872async fn get_mode(State(state): State<Arc<ServerState>>) -> Json<ModeResponse> {
873    Json(ModeResponse {
874        mode: state.mode.read().to_string(),
875    })
876}
877
878async fn update_mode(
879    State(state): State<Arc<ServerState>>,
880    Json(req): Json<ModeRequest>,
881) -> Result<Json<ModeResponse>, (StatusCode, Json<ErrorResponse>)> {
882    let mode = match req.mode.to_uppercase().as_str() {
883        "READWRITE" => RegistryMode::ReadWrite,
884        "READONLY" => RegistryMode::ReadOnly,
885        "IMPORT" => RegistryMode::Import,
886        _ => {
887            return Err((
888                StatusCode::UNPROCESSABLE_ENTITY,
889                Json(ErrorResponse {
890                    error_code: 42204,
891                    message: format!("Invalid mode: {}", req.mode),
892                }),
893            ))
894        }
895    };
896
897    *state.mode.write() = mode;
898
899    Ok(Json(ModeResponse {
900        mode: mode.to_string(),
901    }))
902}
903
904// ============================================================================
905// Version State Handlers
906// ============================================================================
907
908#[derive(Serialize)]
909struct VersionStateResponse {
910    state: String,
911}
912
913#[derive(Deserialize)]
914struct VersionStateRequest {
915    state: String,
916}
917
918async fn get_version_state(
919    State(state): State<Arc<ServerState>>,
920    Path((subject, version)): Path<(String, u32)>,
921) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
922    state
923        .registry
924        .get_version_state(subject.as_str(), SchemaVersion::new(version))
925        .await
926        .map(|s| {
927            Json(VersionStateResponse {
928                state: s.to_string(),
929            })
930        })
931        .map_err(schema_error_response)
932}
933
934async fn set_version_state(
935    State(state): State<Arc<ServerState>>,
936    Path((subject, version)): Path<(String, u32)>,
937    Json(req): Json<VersionStateRequest>,
938) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
939    let version_state: VersionState = req.state.parse().map_err(|_| {
940        (
941            StatusCode::UNPROCESSABLE_ENTITY,
942            Json(ErrorResponse {
943                error_code: 42205,
944                message: format!("Invalid version state: {}", req.state),
945            }),
946        )
947    })?;
948
949    state
950        .registry
951        .set_version_state(subject.as_str(), SchemaVersion::new(version), version_state)
952        .await
953        .map_err(schema_error_response)?;
954
955    Ok(Json(VersionStateResponse {
956        state: version_state.to_string(),
957    }))
958}
959
960async fn deprecate_version(
961    State(state): State<Arc<ServerState>>,
962    Path((subject, version)): Path<(String, u32)>,
963) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
964    state
965        .registry
966        .deprecate_version(subject.as_str(), SchemaVersion::new(version))
967        .await
968        .map_err(schema_error_response)?;
969
970    Ok(Json(VersionStateResponse {
971        state: "DEPRECATED".to_string(),
972    }))
973}
974
975async fn disable_version(
976    State(state): State<Arc<ServerState>>,
977    Path((subject, version)): Path<(String, u32)>,
978) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
979    state
980        .registry
981        .disable_version(subject.as_str(), SchemaVersion::new(version))
982        .await
983        .map_err(schema_error_response)?;
984
985    Ok(Json(VersionStateResponse {
986        state: "DISABLED".to_string(),
987    }))
988}
989
990async fn enable_version(
991    State(state): State<Arc<ServerState>>,
992    Path((subject, version)): Path<(String, u32)>,
993) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
994    state
995        .registry
996        .enable_version(subject.as_str(), SchemaVersion::new(version))
997        .await
998        .map_err(schema_error_response)?;
999
1000    Ok(Json(VersionStateResponse {
1001        state: "ENABLED".to_string(),
1002    }))
1003}
1004
1005// ============================================================================
1006// Validation Handlers
1007// ============================================================================
1008
1009#[derive(Deserialize)]
1010struct ValidateSchemaRequest {
1011    schema: String,
1012    #[serde(rename = "schemaType", default)]
1013    schema_type: Option<String>,
1014}
1015
1016#[derive(Serialize)]
1017struct ValidationResponse {
1018    is_valid: bool,
1019    errors: Vec<String>,
1020    warnings: Vec<String>,
1021}
1022
1023async fn validate_schema(
1024    State(state): State<Arc<ServerState>>,
1025    Path(subject): Path<String>,
1026    Json(req): Json<ValidateSchemaRequest>,
1027) -> Result<Json<ValidationResponse>, (StatusCode, Json<ErrorResponse>)> {
1028    let schema_type = req
1029        .schema_type
1030        .as_deref()
1031        .unwrap_or("AVRO")
1032        .parse::<SchemaType>()
1033        .map_err(|_| {
1034            schema_error_response(crate::SchemaError::InvalidInput(format!(
1035                "Invalid schema type: {}",
1036                req.schema_type.as_deref().unwrap_or("AVRO")
1037            )))
1038        })?;
1039
1040    state
1041        .registry
1042        .validate_schema(schema_type, &subject, &req.schema)
1043        .map(|report| {
1044            Json(ValidationResponse {
1045                is_valid: report.is_valid(),
1046                errors: report.error_messages(),
1047                warnings: report.warning_messages(),
1048            })
1049        })
1050        .map_err(schema_error_response)
1051}
1052
1053#[derive(Serialize)]
1054struct ValidationRuleResponse {
1055    name: String,
1056    rule_type: String,
1057    config: String,
1058    level: String,
1059    #[serde(skip_serializing_if = "Option::is_none")]
1060    description: Option<String>,
1061    active: bool,
1062}
1063
1064#[derive(Deserialize)]
1065struct AddValidationRuleRequest {
1066    name: String,
1067    rule_type: String,
1068    config: String,
1069    #[serde(default = "default_level")]
1070    level: String,
1071    description: Option<String>,
1072    #[serde(default)]
1073    subjects: Vec<String>,
1074    #[serde(default)]
1075    schema_types: Vec<String>,
1076}
1077
1078fn default_level() -> String {
1079    "ERROR".to_string()
1080}
1081
1082async fn list_validation_rules(
1083    State(state): State<Arc<ServerState>>,
1084) -> Json<Vec<ValidationRuleResponse>> {
1085    let rules = state.registry.validation_engine().read().list_rules();
1086    Json(
1087        rules
1088            .into_iter()
1089            .map(|r| ValidationRuleResponse {
1090                name: r.name().to_string(),
1091                rule_type: format!("{:?}", r.rule_type()),
1092                config: r.config().to_string(),
1093                level: format!("{:?}", r.level()),
1094                description: r.description().map(|s| s.to_string()),
1095                active: r.is_active(),
1096            })
1097            .collect(),
1098    )
1099}
1100
1101async fn add_validation_rule(
1102    State(state): State<Arc<ServerState>>,
1103    Json(req): Json<AddValidationRuleRequest>,
1104) -> Result<Json<ValidationRuleResponse>, (StatusCode, Json<ErrorResponse>)> {
1105    let rule_type: ValidationRuleType = req.rule_type.parse().map_err(|_| {
1106        (
1107            StatusCode::UNPROCESSABLE_ENTITY,
1108            Json(ErrorResponse {
1109                error_code: 42206,
1110                message: format!("Invalid rule type: {}", req.rule_type),
1111            }),
1112        )
1113    })?;
1114
1115    let level: ValidationLevel = req.level.parse().map_err(|_| {
1116        (
1117            StatusCode::UNPROCESSABLE_ENTITY,
1118            Json(ErrorResponse {
1119                error_code: 42207,
1120                message: format!("Invalid validation level: {}", req.level),
1121            }),
1122        )
1123    })?;
1124
1125    let mut rule = ValidationRule::new(&req.name, rule_type, &req.config).with_level(level);
1126
1127    if let Some(desc) = &req.description {
1128        rule = rule.with_description(desc);
1129    }
1130
1131    if !req.subjects.is_empty() {
1132        rule = rule.for_subjects(req.subjects);
1133    }
1134
1135    if !req.schema_types.is_empty() {
1136        let schema_types: Vec<SchemaType> = req
1137            .schema_types
1138            .iter()
1139            .filter_map(|s| s.parse().ok())
1140            .collect();
1141        rule = rule.for_schema_types(schema_types);
1142    }
1143
1144    state.registry.add_validation_rule(rule.clone());
1145
1146    Ok(Json(ValidationRuleResponse {
1147        name: rule.name().to_string(),
1148        rule_type: format!("{:?}", rule.rule_type()),
1149        config: rule.config().to_string(),
1150        level: format!("{:?}", rule.level()),
1151        description: rule.description().map(|s| s.to_string()),
1152        active: rule.is_active(),
1153    }))
1154}
1155
1156async fn delete_validation_rule(
1157    State(state): State<Arc<ServerState>>,
1158    Path(name): Path<String>,
1159) -> Result<StatusCode, (StatusCode, Json<ErrorResponse>)> {
1160    let removed = state
1161        .registry
1162        .validation_engine()
1163        .write()
1164        .remove_rule(&name);
1165    if removed {
1166        Ok(StatusCode::NO_CONTENT)
1167    } else {
1168        Err((
1169            StatusCode::NOT_FOUND,
1170            Json(ErrorResponse {
1171                error_code: 40404,
1172                message: format!("Validation rule not found: {}", name),
1173            }),
1174        ))
1175    }
1176}
1177
1178// ============================================================================
1179// Context Handlers (Multi-tenant)
1180// ============================================================================
1181
1182#[derive(Serialize)]
1183struct ContextResponse {
1184    name: String,
1185    #[serde(skip_serializing_if = "Option::is_none")]
1186    description: Option<String>,
1187    active: bool,
1188}
1189
1190#[derive(Deserialize)]
1191struct CreateContextRequest {
1192    name: String,
1193    description: Option<String>,
1194}
1195
1196async fn list_contexts(State(state): State<Arc<ServerState>>) -> Json<Vec<ContextResponse>> {
1197    let contexts = state.registry.list_contexts();
1198    Json(
1199        contexts
1200            .into_iter()
1201            .map(|c| ContextResponse {
1202                name: c.name().to_string(),
1203                description: c.description().map(|s| s.to_string()),
1204                active: c.is_active(),
1205            })
1206            .collect(),
1207    )
1208}
1209
1210async fn create_context(
1211    State(state): State<Arc<ServerState>>,
1212    Json(req): Json<CreateContextRequest>,
1213) -> Result<Json<ContextResponse>, (StatusCode, Json<ErrorResponse>)> {
1214    let mut context = SchemaContext::new(&req.name);
1215    if let Some(desc) = &req.description {
1216        context = context.with_description(desc);
1217    }
1218
1219    state
1220        .registry
1221        .create_context(context.clone())
1222        .map_err(schema_error_response)?;
1223
1224    Ok(Json(ContextResponse {
1225        name: context.name().to_string(),
1226        description: context.description().map(|s| s.to_string()),
1227        active: context.is_active(),
1228    }))
1229}
1230
1231async fn get_context(
1232    State(state): State<Arc<ServerState>>,
1233    Path(context_name): Path<String>,
1234) -> Result<Json<ContextResponse>, (StatusCode, Json<ErrorResponse>)> {
1235    state
1236        .registry
1237        .get_context(&context_name)
1238        .map(|c| {
1239            Json(ContextResponse {
1240                name: c.name().to_string(),
1241                description: c.description().map(|s| s.to_string()),
1242                active: c.is_active(),
1243            })
1244        })
1245        .ok_or_else(|| {
1246            (
1247                StatusCode::NOT_FOUND,
1248                Json(ErrorResponse {
1249                    error_code: 40405,
1250                    message: format!("Context not found: {}", context_name),
1251                }),
1252            )
1253        })
1254}
1255
1256async fn delete_context(
1257    State(state): State<Arc<ServerState>>,
1258    Path(context_name): Path<String>,
1259) -> Result<StatusCode, (StatusCode, Json<ErrorResponse>)> {
1260    state
1261        .registry
1262        .delete_context(&context_name)
1263        .map(|_| StatusCode::NO_CONTENT)
1264        .map_err(schema_error_response)
1265}
1266
1267async fn list_context_subjects(
1268    State(state): State<Arc<ServerState>>,
1269    Path(context_name): Path<String>,
1270) -> Json<Vec<String>> {
1271    Json(state.registry.list_subjects_in_context(&context_name))
1272}
1273
1274// ============================================================================
1275// Statistics Handler
1276// ============================================================================
1277
1278#[derive(Serialize)]
1279struct StatsResponse {
1280    schema_count: usize,
1281    subject_count: usize,
1282    context_count: usize,
1283    cache_size: usize,
1284}
1285
1286async fn get_stats(State(state): State<Arc<ServerState>>) -> Json<StatsResponse> {
1287    let stats = state.registry.stats().await;
1288    Json(StatsResponse {
1289        schema_count: stats.schema_count,
1290        subject_count: stats.subject_count,
1291        context_count: stats.context_count,
1292        cache_size: stats.cache_size,
1293    })
1294}
1295
1296#[cfg(test)]
1297mod tests {
1298    use super::*;
1299    use crate::config::RegistryConfig;
1300    use axum::body::Body;
1301    use axum::http::Request;
1302    use tower::util::ServiceExt;
1303
1304    async fn create_test_app() -> Router {
1305        let config = RegistryConfig::memory();
1306        let registry = SchemaRegistry::new(config).await.unwrap();
1307        let server = SchemaServer::new(registry, ServerConfig::default());
1308        server.router()
1309    }
1310
1311    #[tokio::test]
1312    async fn test_root_endpoint() {
1313        let app = create_test_app().await;
1314
1315        let response = app
1316            .oneshot(Request::builder().uri("/").body(Body::empty()).unwrap())
1317            .await
1318            .unwrap();
1319
1320        assert_eq!(response.status(), StatusCode::OK);
1321    }
1322
1323    #[tokio::test]
1324    async fn test_list_subjects_empty() {
1325        let app = create_test_app().await;
1326
1327        let response = app
1328            .oneshot(
1329                Request::builder()
1330                    .uri("/subjects")
1331                    .body(Body::empty())
1332                    .unwrap(),
1333            )
1334            .await
1335            .unwrap();
1336
1337        assert_eq!(response.status(), StatusCode::OK);
1338    }
1339
1340    #[tokio::test]
1341    async fn test_get_config() {
1342        let app = create_test_app().await;
1343
1344        let response = app
1345            .oneshot(
1346                Request::builder()
1347                    .uri("/config")
1348                    .body(Body::empty())
1349                    .unwrap(),
1350            )
1351            .await
1352            .unwrap();
1353
1354        assert_eq!(response.status(), StatusCode::OK);
1355    }
1356
1357    #[tokio::test]
1358    async fn test_get_mode() {
1359        let app = create_test_app().await;
1360
1361        let response = app
1362            .oneshot(Request::builder().uri("/mode").body(Body::empty()).unwrap())
1363            .await
1364            .unwrap();
1365
1366        assert_eq!(response.status(), StatusCode::OK);
1367    }
1368
1369    #[tokio::test]
1370    async fn test_health_endpoint() {
1371        let app = create_test_app().await;
1372
1373        let response = app
1374            .oneshot(
1375                Request::builder()
1376                    .uri("/health")
1377                    .body(Body::empty())
1378                    .unwrap(),
1379            )
1380            .await
1381            .unwrap();
1382
1383        assert_eq!(response.status(), StatusCode::OK);
1384    }
1385
1386    #[tokio::test]
1387    async fn test_liveness_endpoint() {
1388        let app = create_test_app().await;
1389
1390        let response = app
1391            .oneshot(
1392                Request::builder()
1393                    .uri("/health/live")
1394                    .body(Body::empty())
1395                    .unwrap(),
1396            )
1397            .await
1398            .unwrap();
1399
1400        assert_eq!(response.status(), StatusCode::OK);
1401    }
1402
1403    #[tokio::test]
1404    async fn test_readiness_endpoint() {
1405        let app = create_test_app().await;
1406
1407        let response = app
1408            .oneshot(
1409                Request::builder()
1410                    .uri("/health/ready")
1411                    .body(Body::empty())
1412                    .unwrap(),
1413            )
1414            .await
1415            .unwrap();
1416
1417        assert_eq!(response.status(), StatusCode::OK);
1418    }
1419
1420    #[tokio::test]
1421    async fn test_stats_endpoint() {
1422        let app = create_test_app().await;
1423
1424        let response = app
1425            .oneshot(
1426                Request::builder()
1427                    .uri("/stats")
1428                    .body(Body::empty())
1429                    .unwrap(),
1430            )
1431            .await
1432            .unwrap();
1433
1434        assert_eq!(response.status(), StatusCode::OK);
1435    }
1436
1437    #[tokio::test]
1438    async fn test_list_contexts_empty() {
1439        let app = create_test_app().await;
1440
1441        let response = app
1442            .oneshot(
1443                Request::builder()
1444                    .uri("/contexts")
1445                    .body(Body::empty())
1446                    .unwrap(),
1447            )
1448            .await
1449            .unwrap();
1450
1451        assert_eq!(response.status(), StatusCode::OK);
1452    }
1453
1454    #[tokio::test]
1455    async fn test_list_validation_rules_empty() {
1456        let app = create_test_app().await;
1457
1458        let response = app
1459            .oneshot(
1460                Request::builder()
1461                    .uri("/config/validation/rules")
1462                    .body(Body::empty())
1463                    .unwrap(),
1464            )
1465            .await
1466            .unwrap();
1467
1468        assert_eq!(response.status(), StatusCode::OK);
1469    }
1470}