1use crate::compatibility::CompatibilityLevel;
14use crate::error::SchemaError;
15use crate::registry::SchemaRegistry;
16use crate::types::{
17 SchemaContext, SchemaId, SchemaReference, SchemaType, SchemaVersion, Subject, ValidationLevel,
18 ValidationRule, ValidationRuleType, VersionState,
19};
20use axum::{
21 extract::{DefaultBodyLimit, Extension, Path, Query, State},
22 http::StatusCode,
23 routing::{delete, get, post, put},
24 Json, Router,
25};
26use parking_lot::RwLock;
27use serde::{Deserialize, Serialize};
28use std::net::SocketAddr;
29use std::sync::Arc;
30use tower_http::cors::CorsLayer;
31use tower_http::trace::TraceLayer;
32use tracing::info;
33
34use crate::auth::AuthState;
35use crate::auth::{auth_middleware, AuthConfig, SchemaPermission, ServerAuthState};
36use axum::middleware;
37use rivven_core::AuthManager;
38
/// Configuration for the schema registry HTTP server.
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Host or interface address; `Default` sets it to `"0.0.0.0"`.
    pub host: String,
    /// TCP port; `Default` sets it to `8081`.
    pub port: u16,
    /// Optional authentication settings; `None` unless set (see `with_auth`).
    pub auth: Option<AuthConfig>,
}
47
48impl Default for ServerConfig {
49 fn default() -> Self {
50 Self {
51 host: "0.0.0.0".to_string(),
52 port: 8081,
53 auth: None,
54 }
55 }
56}
57
58impl ServerConfig {
59 pub fn with_auth(mut self, auth_config: AuthConfig) -> Self {
61 self.auth = Some(auth_config);
62 self
63 }
64}
65
/// Operating mode of the registry as exposed through the `/mode` endpoints.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RegistryMode {
    /// Default mode.
    #[default]
    ReadWrite,
    /// The register/delete/undelete handlers in this file reject requests with
    /// HTTP 403 while this mode is active.
    ReadOnly,
    /// Accepted and reported by the mode endpoints.
    /// NOTE(review): no handler visible in this file special-cases it — confirm intended semantics.
    Import,
}

impl std::fmt::Display for RegistryMode {
    /// Writes the upper-case wire name: `READWRITE`, `READONLY` or `IMPORT`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::ReadWrite => "READWRITE",
            Self::ReadOnly => "READONLY",
            Self::Import => "IMPORT",
        };
        f.write_str(label)
    }
}
84
/// State shared with every request handler (wrapped in an `Arc` by `SchemaServer`).
pub struct ServerState {
    /// The schema registry all handlers delegate to.
    pub registry: SchemaRegistry,
    /// Current registry mode; read by the write handlers and replaced by `update_mode`.
    pub mode: RwLock<RegistryMode>,
    /// Optional Cedar authorizer consulted by `enforce_permission` (only with the `cedar` feature).
    #[cfg(feature = "cedar")]
    pub cedar_authorizer: Option<Arc<rivven_core::CedarAuthorizer>>,
}
92
93pub struct SchemaServer {
95 state: Arc<ServerState>,
96 config: ServerConfig,
97 #[cfg(not(feature = "cedar"))]
98 auth_manager: Option<Arc<AuthManager>>,
99 #[cfg(feature = "cedar")]
100 auth_manager: Option<Arc<AuthManager>>,
101 #[cfg(feature = "cedar")]
102 cedar_authorizer: Option<Arc<rivven_core::CedarAuthorizer>>,
103}
104
105impl SchemaServer {
106 pub fn new(registry: SchemaRegistry, config: ServerConfig) -> Self {
108 Self {
109 state: Arc::new(ServerState {
110 registry,
111 mode: RwLock::new(RegistryMode::default()),
112 #[cfg(feature = "cedar")]
113 cedar_authorizer: None,
114 }),
115 config,
116 auth_manager: None,
117 #[cfg(feature = "cedar")]
118 cedar_authorizer: None,
119 }
120 }
121
122 #[cfg(not(feature = "cedar"))]
124 pub fn with_auth(
125 registry: SchemaRegistry,
126 config: ServerConfig,
127 auth_manager: Arc<AuthManager>,
128 ) -> Self {
129 Self {
130 state: Arc::new(ServerState {
131 registry,
132 mode: RwLock::new(RegistryMode::default()),
133 }),
134 config,
135 auth_manager: Some(auth_manager),
136 }
137 }
138
139 #[cfg(feature = "cedar")]
141 pub fn with_auth(
142 registry: SchemaRegistry,
143 config: ServerConfig,
144 auth_manager: Arc<AuthManager>,
145 ) -> Self {
146 Self {
147 state: Arc::new(ServerState {
148 registry,
149 mode: RwLock::new(RegistryMode::default()),
150 cedar_authorizer: None,
151 }),
152 config,
153 auth_manager: Some(auth_manager),
154 cedar_authorizer: None,
155 }
156 }
157
158 #[cfg(feature = "cedar")]
160 pub fn with_cedar(
161 registry: SchemaRegistry,
162 config: ServerConfig,
163 auth_manager: Arc<AuthManager>,
164 cedar_authorizer: Arc<rivven_core::CedarAuthorizer>,
165 ) -> Self {
166 Self {
167 state: Arc::new(ServerState {
168 registry,
169 mode: RwLock::new(RegistryMode::default()),
170 cedar_authorizer: Some(cedar_authorizer.clone()),
171 }),
172 config,
173 auth_manager: Some(auth_manager),
174 cedar_authorizer: Some(cedar_authorizer),
175 }
176 }
177
178 pub fn router(&self) -> Router {
180 let cors = CorsLayer::new()
186 .allow_methods([
187 axum::http::Method::GET,
188 axum::http::Method::POST,
189 axum::http::Method::PUT,
190 axum::http::Method::DELETE,
191 ])
192 .allow_headers([
193 axum::http::header::CONTENT_TYPE,
194 axum::http::header::AUTHORIZATION,
195 axum::http::header::ACCEPT,
196 ]);
197
198 let base_router = Router::new()
199 .route("/", get(root_handler))
201 .route("/health", get(health_handler))
202 .route("/health/live", get(liveness_handler))
203 .route("/health/ready", get(readiness_handler))
204 .route("/schemas/ids/:id", get(get_schema_by_id))
206 .route("/schemas/ids/:id/versions", get(get_schema_versions))
207 .route("/subjects", get(list_subjects))
209 .route("/subjects/:subject", delete(delete_subject))
210 .route("/subjects/:subject/versions", get(list_subject_versions))
211 .route("/subjects/:subject/versions", post(register_schema))
212 .route(
213 "/subjects/:subject/versions/:version",
214 get(get_subject_version),
215 )
216 .route(
217 "/subjects/:subject/versions/:version",
218 delete(delete_version),
219 )
220 .route("/subjects/deleted", get(list_deleted_subjects))
222 .route("/subjects/:subject/undelete", post(undelete_subject))
223 .route(
225 "/subjects/:subject/versions/:version/state",
226 get(get_version_state),
227 )
228 .route(
229 "/subjects/:subject/versions/:version/state",
230 put(set_version_state),
231 )
232 .route(
233 "/subjects/:subject/versions/:version/deprecate",
234 post(deprecate_version),
235 )
236 .route(
237 "/subjects/:subject/versions/:version/disable",
238 post(disable_version),
239 )
240 .route(
241 "/subjects/:subject/versions/:version/enable",
242 post(enable_version),
243 )
244 .route(
246 "/subjects/:subject/versions/:version/referencedby",
247 get(get_referenced_by),
248 )
249 .route("/subjects/:subject/validate", post(validate_schema))
251 .route(
253 "/compatibility/subjects/:subject/versions/:version",
254 post(check_compatibility),
255 )
256 .route("/config", get(get_global_config))
258 .route("/config", put(update_global_config))
259 .route("/config/:subject", get(get_subject_config))
260 .route("/config/:subject", put(update_subject_config))
261 .route("/config/validation/rules", get(list_validation_rules))
263 .route("/config/validation/rules", post(add_validation_rule))
264 .route(
265 "/config/validation/rules/:name",
266 delete(delete_validation_rule),
267 )
268 .route("/mode", get(get_mode))
270 .route("/mode", put(update_mode))
271 .route("/contexts", get(list_contexts))
273 .route("/contexts", post(create_context))
274 .route("/contexts/:context", get(get_context))
275 .route("/contexts/:context", delete(delete_context))
276 .route("/contexts/:context/subjects", get(list_context_subjects))
277 .route("/stats", get(get_stats))
279 .with_state(self.state.clone())
280 .layer(DefaultBodyLimit::max(10 * 1024 * 1024)) .layer(cors)
282 .layer(TraceLayer::new_for_http());
283
284 #[cfg(not(feature = "cedar"))]
286 let base_router = if let Some(auth_manager) = &self.auth_manager {
287 let auth_config = self.config.auth.clone().unwrap_or_default();
288 let auth_state = Arc::new(ServerAuthState {
289 auth_manager: auth_manager.clone(),
290 config: auth_config,
291 });
292 base_router.layer(middleware::from_fn_with_state(auth_state, auth_middleware))
293 } else {
294 base_router
295 };
296
297 #[cfg(feature = "cedar")]
299 let base_router = if let Some(auth_manager) = &self.auth_manager {
300 let auth_config = self.config.auth.clone().unwrap_or_default();
301 let auth_state = Arc::new(ServerAuthState {
302 auth_manager: auth_manager.clone(),
303 cedar_authorizer: self.cedar_authorizer.clone(),
304 config: auth_config,
305 });
306 base_router.layer(middleware::from_fn_with_state(auth_state, auth_middleware))
307 } else {
308 base_router
309 };
310
311 base_router
312 }
313
314 pub async fn run(self, addr: SocketAddr) -> anyhow::Result<()> {
316 let router = self.router();
317 let listener = tokio::net::TcpListener::bind(addr).await?;
318 info!("Schema Registry server listening on {}", addr);
319 axum::serve(listener, router).await?;
320 Ok(())
321 }
322}
323
324fn schema_error_response(e: SchemaError) -> (StatusCode, Json<ErrorResponse>) {
329 let status = match e.http_status() {
330 404 => StatusCode::NOT_FOUND,
331 409 => StatusCode::CONFLICT,
332 422 => StatusCode::UNPROCESSABLE_ENTITY,
333 _ => StatusCode::INTERNAL_SERVER_ERROR,
334 };
335 (
336 status,
337 Json(ErrorResponse {
338 error_code: e.error_code(),
339 message: e.to_string(),
340 }),
341 )
342}
343
344fn enforce_permission(
351 auth: &Option<Extension<AuthState>>,
352 subject: &str,
353 permission: SchemaPermission,
354 server_state: &ServerState,
355) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
356 let _ = server_state; if let Some(Extension(ref state)) = auth {
359 let map_err = |e: crate::auth::AuthErrorResponse| {
360 (
361 StatusCode::FORBIDDEN,
362 Json(ErrorResponse {
363 error_code: e.error_code as u32,
364 message: e.message,
365 }),
366 )
367 };
368
369 #[cfg(not(feature = "cedar"))]
370 crate::auth::check_subject_permission(state, subject, permission).map_err(map_err)?;
371
372 #[cfg(feature = "cedar")]
373 {
374 if let Some(ref authorizer) = server_state.cedar_authorizer {
375 crate::auth::check_subject_permission_cedar(
376 state, subject, permission, authorizer, None,
377 )
378 .map_err(map_err)?;
379 } else {
380 crate::auth::check_subject_permission(state, subject, permission)
382 .map_err(map_err)?;
383 }
384 }
385 }
386 Ok(())
389}
390
/// JSON body returned by `root_handler`.
#[derive(Serialize)]
struct RootResponse {
    /// Crate version (`CARGO_PKG_VERSION` at build time).
    version: &'static str,
    /// Build-time `GIT_HASH`, or `"dev"` when it was not set.
    commit: &'static str,
}
400
/// Request body accepted by `register_schema`.
#[derive(Deserialize)]
struct RegisterSchemaRequest {
    /// Schema definition text.
    schema: String,
    /// Optional schema type name (JSON key `schemaType`); the handler falls
    /// back to `"AVRO"` when it is absent.
    #[serde(rename = "schemaType", default)]
    schema_type: Option<String>,
    /// References to other schemas; defaults to an empty list.
    #[serde(default)]
    references: Vec<SchemaReference>,
}
410
/// Response body of `register_schema`.
#[derive(Serialize)]
struct RegisterSchemaResponse {
    /// Numeric id returned by the registry for the registered schema.
    id: u32,
}
415
/// Response body of `get_schema_by_id`.
#[derive(Serialize)]
struct SchemaResponse {
    /// Schema definition text.
    schema: String,
    /// Schema type rendered as a string (JSON key `schemaType`).
    #[serde(rename = "schemaType")]
    schema_type: String,
    /// Schema references; omitted from the JSON output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    references: Vec<SchemaReference>,
}
424
/// One subject/version entry, as returned by `get_subject_version` and
/// `get_schema_versions`.
#[derive(Serialize)]
struct SubjectVersionResponse {
    /// Subject name.
    subject: String,
    /// Version number within the subject.
    version: u32,
    /// Schema id.
    id: u32,
    /// Schema definition text.
    schema: String,
    /// Schema type rendered as a string (JSON key `schemaType`).
    #[serde(rename = "schemaType")]
    schema_type: String,
}
434
/// Request body accepted by `check_compatibility`.
#[derive(Deserialize)]
struct CompatibilityRequest {
    /// Candidate schema definition text.
    schema: String,
    /// Optional schema type name (JSON key `schemaType`); the handler falls
    /// back to `"AVRO"` when it is absent.
    #[serde(rename = "schemaType", default)]
    schema_type: Option<String>,
}
441
/// Response body of `check_compatibility`.
#[derive(Serialize)]
struct CompatibilityResponse {
    /// Compatibility verdict copied from the registry's result.
    is_compatible: bool,
    /// Messages copied from the registry's result; omitted from the JSON output when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    messages: Vec<String>,
}
448
/// Response body of the compatibility configuration endpoints.
#[derive(Serialize)]
struct ConfigResponse {
    /// Compatibility level rendered as a string (JSON key `compatibilityLevel`).
    #[serde(rename = "compatibilityLevel")]
    compatibility_level: String,
}
454
/// Request body of the compatibility configuration update endpoints.
#[derive(Deserialize)]
struct ConfigRequest {
    /// Compatibility level name; the handlers parse it into a `CompatibilityLevel`.
    compatibility: String,
}
459
/// Response body of the `/mode` endpoints.
#[derive(Serialize)]
struct ModeResponse {
    /// Registry mode rendered through `RegistryMode`'s `Display` impl.
    mode: String,
}
464
/// Request body of `update_mode`.
#[derive(Deserialize)]
struct ModeRequest {
    /// Requested mode name; `update_mode` upper-cases it before matching.
    mode: String,
}
469
/// JSON error body shared by all handlers.
#[derive(Serialize)]
struct ErrorResponse {
    /// Application-level numeric error code (e.g. `40301`, `42202`).
    error_code: u32,
    /// Human-readable description of the failure.
    message: String,
}
475
/// Query-string parameters accepted by the delete handlers.
#[derive(Deserialize)]
struct QueryParams {
    /// Forwarded as the `permanent` argument of the registry's delete calls;
    /// defaults to `false` when absent.
    #[serde(default)]
    permanent: bool,
}
488
489async fn root_handler() -> Json<RootResponse> {
494 Json(RootResponse {
495 version: env!("CARGO_PKG_VERSION"),
496 commit: option_env!("GIT_HASH").unwrap_or("dev"),
497 })
498}
499
/// JSON body returned by `health_handler`.
#[derive(Serialize)]
struct HealthResponse {
    /// Health status label (`"healthy"`).
    status: &'static str,
    /// Crate version (`CARGO_PKG_VERSION` at build time).
    version: &'static str,
}
506
507async fn health_handler() -> Json<HealthResponse> {
508 Json(HealthResponse {
509 status: "healthy",
510 version: env!("CARGO_PKG_VERSION"),
511 })
512}
513
/// Handler for `GET /health/live`: unconditionally answers `200 OK`.
async fn liveness_handler() -> StatusCode {
    StatusCode::OK
}
518
519async fn readiness_handler(
521 State(state): State<Arc<ServerState>>,
522) -> Result<StatusCode, StatusCode> {
523 match state.registry.list_subjects().await {
525 Ok(_) => Ok(StatusCode::OK),
526 Err(_) => Err(StatusCode::SERVICE_UNAVAILABLE),
527 }
528}
529
530async fn get_schema_by_id(
531 State(state): State<Arc<ServerState>>,
532 Path(id): Path<u32>,
533) -> Result<Json<SchemaResponse>, (StatusCode, Json<ErrorResponse>)> {
534 state
535 .registry
536 .get_by_id(SchemaId::new(id))
537 .await
538 .map(|schema| {
539 Json(SchemaResponse {
540 schema: schema.schema,
541 schema_type: schema.schema_type.to_string(),
542 references: schema.references,
543 })
544 })
545 .map_err(schema_error_response)
546}
547
548async fn get_schema_versions(
549 State(state): State<Arc<ServerState>>,
550 Path(id): Path<u32>,
551) -> Result<Json<Vec<SubjectVersionResponse>>, (StatusCode, Json<ErrorResponse>)> {
552 let subjects = state
554 .registry
555 .list_subjects()
556 .await
557 .map_err(schema_error_response)?;
558 let mut results = Vec::new();
559
560 for subject in subjects {
561 let versions = state
562 .registry
563 .list_versions(subject.as_str())
564 .await
565 .map_err(schema_error_response)?;
566 for ver in versions {
567 if let Ok(sv) = state
568 .registry
569 .get_by_version(subject.as_str(), SchemaVersion::new(ver))
570 .await
571 {
572 if sv.id.0 == id {
573 results.push(SubjectVersionResponse {
574 subject: sv.subject.0,
575 version: sv.version.0,
576 id: sv.id.0,
577 schema: sv.schema,
578 schema_type: sv.schema_type.to_string(),
579 });
580 }
581 }
582 }
583 }
584
585 Ok(Json(results))
586}
587
588async fn list_subjects(
589 State(state): State<Arc<ServerState>>,
590) -> Result<Json<Vec<String>>, (StatusCode, Json<ErrorResponse>)> {
591 state
592 .registry
593 .list_subjects()
594 .await
595 .map(|subjects| Json(subjects.into_iter().map(|s| s.0).collect()))
596 .map_err(schema_error_response)
597}
598
599async fn delete_subject(
600 State(state): State<Arc<ServerState>>,
601 Path(subject): Path<String>,
602 Query(params): Query<QueryParams>,
603 auth: Option<Extension<AuthState>>,
604) -> Result<Json<Vec<u32>>, (StatusCode, Json<ErrorResponse>)> {
605 enforce_permission(&auth, &subject, SchemaPermission::Delete, &state)?;
606
607 if *state.mode.read() == RegistryMode::ReadOnly {
609 return Err((
610 StatusCode::FORBIDDEN,
611 Json(ErrorResponse {
612 error_code: 40301,
613 message: "Registry is in READONLY mode".to_string(),
614 }),
615 ));
616 }
617
618 state
619 .registry
620 .delete_subject(subject.as_str(), params.permanent)
621 .await
622 .map(Json)
623 .map_err(schema_error_response)
624}
625
626async fn list_deleted_subjects(
628 State(state): State<Arc<ServerState>>,
629) -> Result<Json<Vec<String>>, (StatusCode, Json<ErrorResponse>)> {
630 state
631 .registry
632 .list_deleted_subjects()
633 .await
634 .map(|subjects| Json(subjects.into_iter().map(|s| s.0).collect()))
635 .map_err(schema_error_response)
636}
637
638async fn undelete_subject(
640 State(state): State<Arc<ServerState>>,
641 Path(subject): Path<String>,
642 auth: Option<Extension<AuthState>>,
643) -> Result<Json<Vec<u32>>, (StatusCode, Json<ErrorResponse>)> {
644 enforce_permission(&auth, &subject, SchemaPermission::Create, &state)?;
645
646 if *state.mode.read() == RegistryMode::ReadOnly {
648 return Err((
649 StatusCode::FORBIDDEN,
650 Json(ErrorResponse {
651 error_code: 40301,
652 message: "Registry is in READONLY mode".to_string(),
653 }),
654 ));
655 }
656
657 state
658 .registry
659 .undelete_subject(subject.as_str())
660 .await
661 .map(Json)
662 .map_err(schema_error_response)
663}
664
665async fn list_subject_versions(
666 State(state): State<Arc<ServerState>>,
667 Path(subject): Path<String>,
668) -> Result<Json<Vec<u32>>, (StatusCode, Json<ErrorResponse>)> {
669 state
670 .registry
671 .list_versions(subject.as_str())
672 .await
673 .map(Json)
674 .map_err(schema_error_response)
675}
676
677async fn register_schema(
678 State(state): State<Arc<ServerState>>,
679 Path(subject): Path<String>,
680 auth: Option<Extension<AuthState>>,
681 Json(req): Json<RegisterSchemaRequest>,
682) -> Result<Json<RegisterSchemaResponse>, (StatusCode, Json<ErrorResponse>)> {
683 enforce_permission(&auth, &subject, SchemaPermission::Create, &state)?;
684
685 if *state.mode.read() == RegistryMode::ReadOnly {
687 return Err((
688 StatusCode::FORBIDDEN,
689 Json(ErrorResponse {
690 error_code: 40301,
691 message: "Registry is in READONLY mode".to_string(),
692 }),
693 ));
694 }
695
696 let schema_type = req
697 .schema_type
698 .as_deref()
699 .unwrap_or("AVRO")
700 .parse::<SchemaType>()
701 .map_err(|_| {
702 schema_error_response(crate::SchemaError::InvalidInput(format!(
703 "Invalid schema type: {}",
704 req.schema_type.as_deref().unwrap_or("AVRO")
705 )))
706 })?;
707
708 state
709 .registry
710 .register_with_references(subject.as_str(), schema_type, &req.schema, req.references)
711 .await
712 .map(|id| Json(RegisterSchemaResponse { id: id.0 }))
713 .map_err(schema_error_response)
714}
715
716async fn get_subject_version(
717 State(state): State<Arc<ServerState>>,
718 Path((subject, version)): Path<(String, String)>,
719) -> Result<Json<SubjectVersionResponse>, (StatusCode, Json<ErrorResponse>)> {
720 let version = if version == "latest" {
721 state
722 .registry
723 .get_latest(subject.as_str())
724 .await
725 .map_err(schema_error_response)?
726 .version
727 } else {
728 let v: u32 = version.parse().map_err(|_| {
729 (
730 StatusCode::UNPROCESSABLE_ENTITY,
731 Json(ErrorResponse {
732 error_code: 42202,
733 message: format!("Invalid version: '{}'", version),
734 }),
735 )
736 })?;
737 SchemaVersion::new(v)
738 };
739
740 state
741 .registry
742 .get_by_version(subject.as_str(), version)
743 .await
744 .map(|sv| {
745 Json(SubjectVersionResponse {
746 subject: sv.subject.0,
747 version: sv.version.0,
748 id: sv.id.0,
749 schema: sv.schema,
750 schema_type: sv.schema_type.to_string(),
751 })
752 })
753 .map_err(schema_error_response)
754}
755
756async fn delete_version(
757 State(state): State<Arc<ServerState>>,
758 Path((subject, version)): Path<(String, u32)>,
759 Query(params): Query<QueryParams>,
760 auth: Option<Extension<AuthState>>,
761) -> Result<Json<u32>, (StatusCode, Json<ErrorResponse>)> {
762 enforce_permission(&auth, &subject, SchemaPermission::Delete, &state)?;
763
764 if *state.mode.read() == RegistryMode::ReadOnly {
766 return Err((
767 StatusCode::FORBIDDEN,
768 Json(ErrorResponse {
769 error_code: 40301,
770 message: "Registry is in READONLY mode".to_string(),
771 }),
772 ));
773 }
774
775 state
776 .registry
777 .delete_version(
778 subject.as_str(),
779 SchemaVersion::new(version),
780 params.permanent,
781 )
782 .await
783 .map(|_| Json(version))
784 .map_err(schema_error_response)
785}
786
787async fn get_referenced_by(
789 State(state): State<Arc<ServerState>>,
790 Path((subject, version)): Path<(String, String)>,
791) -> Result<Json<Vec<u32>>, (StatusCode, Json<ErrorResponse>)> {
792 let version = if version == "latest" {
793 let versions = state
795 .registry
796 .list_versions(subject.as_str())
797 .await
798 .map_err(schema_error_response)?;
799 versions.into_iter().max().unwrap_or(1)
800 } else {
801 version.parse().map_err(|_| {
802 schema_error_response(crate::SchemaError::InvalidInput(format!(
803 "Invalid version: {}",
804 version
805 )))
806 })?
807 };
808
809 state
810 .registry
811 .get_schemas_referencing(subject.as_str(), SchemaVersion::new(version))
812 .await
813 .map(|ids| Json(ids.into_iter().map(|id| id.0).collect()))
814 .map_err(schema_error_response)
815}
816
817async fn check_compatibility(
818 State(state): State<Arc<ServerState>>,
819 Path((subject, version)): Path<(String, String)>,
820 Json(req): Json<CompatibilityRequest>,
821) -> Result<Json<CompatibilityResponse>, (StatusCode, Json<ErrorResponse>)> {
822 let schema_type = req
823 .schema_type
824 .as_deref()
825 .unwrap_or("AVRO")
826 .parse::<SchemaType>()
827 .map_err(|_| {
828 schema_error_response(crate::SchemaError::InvalidInput(format!(
829 "Invalid schema type: {}",
830 req.schema_type.as_deref().unwrap_or("AVRO")
831 )))
832 })?;
833
834 let version = if version == "latest" {
835 None
836 } else {
837 Some(SchemaVersion::new(version.parse().map_err(|_| {
838 schema_error_response(crate::SchemaError::InvalidInput(format!(
839 "Invalid version: {}",
840 version
841 )))
842 })?))
843 };
844
845 state
846 .registry
847 .check_compatibility(subject.as_str(), schema_type, &req.schema, version)
848 .await
849 .map(|result| {
850 Json(CompatibilityResponse {
851 is_compatible: result.is_compatible,
852 messages: result.messages,
853 })
854 })
855 .map_err(schema_error_response)
856}
857
858async fn get_global_config(State(state): State<Arc<ServerState>>) -> Json<ConfigResponse> {
859 Json(ConfigResponse {
860 compatibility_level: state.registry.get_default_compatibility().to_string(),
861 })
862}
863
864async fn update_global_config(
865 State(state): State<Arc<ServerState>>,
866 auth: Option<Extension<AuthState>>,
867 Json(req): Json<ConfigRequest>,
868) -> Result<Json<ConfigResponse>, (StatusCode, Json<ErrorResponse>)> {
869 enforce_permission(&auth, "_global", SchemaPermission::Alter, &state)?;
870
871 let level: CompatibilityLevel = req.compatibility.parse().map_err(|_| {
872 (
873 StatusCode::UNPROCESSABLE_ENTITY,
874 Json(ErrorResponse {
875 error_code: 42203,
876 message: format!("Invalid compatibility level: {}", req.compatibility),
877 }),
878 )
879 })?;
880
881 state.registry.set_default_compatibility(level);
882
883 Ok(Json(ConfigResponse {
884 compatibility_level: level.to_string(),
885 }))
886}
887
888async fn get_subject_config(
889 State(state): State<Arc<ServerState>>,
890 Path(subject): Path<String>,
891) -> Json<ConfigResponse> {
892 let level = state
893 .registry
894 .get_subject_compatibility(&Subject::new(subject));
895 Json(ConfigResponse {
896 compatibility_level: level.to_string(),
897 })
898}
899
900async fn update_subject_config(
901 State(state): State<Arc<ServerState>>,
902 Path(subject): Path<String>,
903 auth: Option<Extension<AuthState>>,
904 Json(req): Json<ConfigRequest>,
905) -> Result<Json<ConfigResponse>, (StatusCode, Json<ErrorResponse>)> {
906 enforce_permission(&auth, &subject, SchemaPermission::Alter, &state)?;
907
908 let level: CompatibilityLevel = req.compatibility.parse().map_err(|_| {
909 (
910 StatusCode::UNPROCESSABLE_ENTITY,
911 Json(ErrorResponse {
912 error_code: 42203,
913 message: format!("Invalid compatibility level: {}", req.compatibility),
914 }),
915 )
916 })?;
917
918 state
919 .registry
920 .set_subject_compatibility(subject.as_str(), level);
921
922 Ok(Json(ConfigResponse {
923 compatibility_level: level.to_string(),
924 }))
925}
926
927async fn get_mode(State(state): State<Arc<ServerState>>) -> Json<ModeResponse> {
928 Json(ModeResponse {
929 mode: state.mode.read().to_string(),
930 })
931}
932
933async fn update_mode(
934 State(state): State<Arc<ServerState>>,
935 auth: Option<Extension<AuthState>>,
936 Json(req): Json<ModeRequest>,
937) -> Result<Json<ModeResponse>, (StatusCode, Json<ErrorResponse>)> {
938 enforce_permission(&auth, "_global", SchemaPermission::Alter, &state)?;
939
940 let mode = match req.mode.to_uppercase().as_str() {
941 "READWRITE" => RegistryMode::ReadWrite,
942 "READONLY" => RegistryMode::ReadOnly,
943 "IMPORT" => RegistryMode::Import,
944 _ => {
945 return Err((
946 StatusCode::UNPROCESSABLE_ENTITY,
947 Json(ErrorResponse {
948 error_code: 42204,
949 message: format!("Invalid mode: {}", req.mode),
950 }),
951 ))
952 }
953 };
954
955 *state.mode.write() = mode;
956
957 Ok(Json(ModeResponse {
958 mode: mode.to_string(),
959 }))
960}
961
/// Response body of the version-state endpoints.
#[derive(Serialize)]
struct VersionStateResponse {
    /// Version state rendered as a string (e.g. `"DEPRECATED"`, `"DISABLED"`, `"ENABLED"`).
    state: String,
}
970
/// Request body of `set_version_state`.
#[derive(Deserialize)]
struct VersionStateRequest {
    /// Requested state name; the handler parses it into a `VersionState`.
    state: String,
}
975
976async fn get_version_state(
977 State(state): State<Arc<ServerState>>,
978 Path((subject, version)): Path<(String, u32)>,
979) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
980 state
981 .registry
982 .get_version_state(subject.as_str(), SchemaVersion::new(version))
983 .await
984 .map(|s| {
985 Json(VersionStateResponse {
986 state: s.to_string(),
987 })
988 })
989 .map_err(schema_error_response)
990}
991
992async fn set_version_state(
993 State(state): State<Arc<ServerState>>,
994 Path((subject, version)): Path<(String, u32)>,
995 auth: Option<Extension<AuthState>>,
996 Json(req): Json<VersionStateRequest>,
997) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
998 enforce_permission(&auth, &subject, SchemaPermission::Alter, &state)?;
999
1000 let version_state: VersionState = req.state.parse().map_err(|_| {
1001 (
1002 StatusCode::UNPROCESSABLE_ENTITY,
1003 Json(ErrorResponse {
1004 error_code: 42205,
1005 message: format!("Invalid version state: {}", req.state),
1006 }),
1007 )
1008 })?;
1009
1010 state
1011 .registry
1012 .set_version_state(subject.as_str(), SchemaVersion::new(version), version_state)
1013 .await
1014 .map_err(schema_error_response)?;
1015
1016 Ok(Json(VersionStateResponse {
1017 state: version_state.to_string(),
1018 }))
1019}
1020
1021async fn deprecate_version(
1022 State(state): State<Arc<ServerState>>,
1023 Path((subject, version)): Path<(String, u32)>,
1024 auth: Option<Extension<AuthState>>,
1025) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
1026 enforce_permission(&auth, &subject, SchemaPermission::Alter, &state)?;
1027
1028 state
1029 .registry
1030 .deprecate_version(subject.as_str(), SchemaVersion::new(version))
1031 .await
1032 .map_err(schema_error_response)?;
1033
1034 Ok(Json(VersionStateResponse {
1035 state: "DEPRECATED".to_string(),
1036 }))
1037}
1038
1039async fn disable_version(
1040 State(state): State<Arc<ServerState>>,
1041 Path((subject, version)): Path<(String, u32)>,
1042 auth: Option<Extension<AuthState>>,
1043) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
1044 enforce_permission(&auth, &subject, SchemaPermission::Alter, &state)?;
1045
1046 state
1047 .registry
1048 .disable_version(subject.as_str(), SchemaVersion::new(version))
1049 .await
1050 .map_err(schema_error_response)?;
1051
1052 Ok(Json(VersionStateResponse {
1053 state: "DISABLED".to_string(),
1054 }))
1055}
1056
1057async fn enable_version(
1058 State(state): State<Arc<ServerState>>,
1059 Path((subject, version)): Path<(String, u32)>,
1060 auth: Option<Extension<AuthState>>,
1061) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
1062 enforce_permission(&auth, &subject, SchemaPermission::Alter, &state)?;
1063
1064 state
1065 .registry
1066 .enable_version(subject.as_str(), SchemaVersion::new(version))
1067 .await
1068 .map_err(schema_error_response)?;
1069
1070 Ok(Json(VersionStateResponse {
1071 state: "ENABLED".to_string(),
1072 }))
1073}
1074
/// Request body accepted by `validate_schema`.
#[derive(Deserialize)]
struct ValidateSchemaRequest {
    /// Schema definition text to validate.
    schema: String,
    /// Optional schema type name (JSON key `schemaType`); the handler falls
    /// back to `"AVRO"` when it is absent.
    #[serde(rename = "schemaType", default)]
    schema_type: Option<String>,
}
1085
/// Response body of `validate_schema`, built from the registry's validation report.
#[derive(Serialize)]
struct ValidationResponse {
    /// Result of the report's `is_valid()`.
    is_valid: bool,
    /// Result of the report's `error_messages()`.
    errors: Vec<String>,
    /// Result of the report's `warning_messages()`.
    warnings: Vec<String>,
}
1092
1093async fn validate_schema(
1094 State(state): State<Arc<ServerState>>,
1095 Path(subject): Path<String>,
1096 Json(req): Json<ValidateSchemaRequest>,
1097) -> Result<Json<ValidationResponse>, (StatusCode, Json<ErrorResponse>)> {
1098 let schema_type = req
1099 .schema_type
1100 .as_deref()
1101 .unwrap_or("AVRO")
1102 .parse::<SchemaType>()
1103 .map_err(|_| {
1104 schema_error_response(crate::SchemaError::InvalidInput(format!(
1105 "Invalid schema type: {}",
1106 req.schema_type.as_deref().unwrap_or("AVRO")
1107 )))
1108 })?;
1109
1110 state
1111 .registry
1112 .validate_schema(schema_type, &subject, &req.schema)
1113 .map(|report| {
1114 Json(ValidationResponse {
1115 is_valid: report.is_valid(),
1116 errors: report.error_messages(),
1117 warnings: report.warning_messages(),
1118 })
1119 })
1120 .map_err(schema_error_response)
1121}
1122
/// JSON representation of a validation rule returned by the rule endpoints.
#[derive(Serialize)]
struct ValidationRuleResponse {
    /// Rule name.
    name: String,
    /// Rule type, rendered with its `Debug` formatting by the handlers.
    rule_type: String,
    /// Rule configuration string.
    config: String,
    /// Validation level, rendered with its `Debug` formatting by the handlers.
    level: String,
    /// Optional description; omitted from the JSON output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    description: Option<String>,
    /// Value of the rule's `is_active()`.
    active: bool,
}
1133
/// Request body accepted by `add_validation_rule`.
#[derive(Deserialize)]
struct AddValidationRuleRequest {
    /// Rule name.
    name: String,
    /// Rule type name; parsed into a `ValidationRuleType` by the handler.
    rule_type: String,
    /// Rule configuration string, passed through to `ValidationRule::new`.
    config: String,
    /// Validation level name; defaults to `"ERROR"` (see `default_level`).
    #[serde(default = "default_level")]
    level: String,
    /// Optional human-readable description.
    description: Option<String>,
    /// Subjects passed to the rule's `for_subjects`; ignored when empty.
    #[serde(default)]
    subjects: Vec<String>,
    /// Schema type names passed to the rule's `for_schema_types`; ignored when empty.
    #[serde(default)]
    schema_types: Vec<String>,
}
1147
/// Serde default for `AddValidationRuleRequest::level`: the string `"ERROR"`.
fn default_level() -> String {
    String::from("ERROR")
}
1151
1152async fn list_validation_rules(
1153 State(state): State<Arc<ServerState>>,
1154) -> Json<Vec<ValidationRuleResponse>> {
1155 let rules = state.registry.validation_engine().read().list_rules();
1156 Json(
1157 rules
1158 .into_iter()
1159 .map(|r| ValidationRuleResponse {
1160 name: r.name().to_string(),
1161 rule_type: format!("{:?}", r.rule_type()),
1162 config: r.config().to_string(),
1163 level: format!("{:?}", r.level()),
1164 description: r.description().map(|s| s.to_string()),
1165 active: r.is_active(),
1166 })
1167 .collect(),
1168 )
1169}
1170
1171async fn add_validation_rule(
1172 State(state): State<Arc<ServerState>>,
1173 auth: Option<Extension<AuthState>>,
1174 Json(req): Json<AddValidationRuleRequest>,
1175) -> Result<Json<ValidationRuleResponse>, (StatusCode, Json<ErrorResponse>)> {
1176 enforce_permission(&auth, "_global", SchemaPermission::Create, &state)?;
1177
1178 let rule_type: ValidationRuleType = req.rule_type.parse().map_err(|_| {
1179 (
1180 StatusCode::UNPROCESSABLE_ENTITY,
1181 Json(ErrorResponse {
1182 error_code: 42206,
1183 message: format!("Invalid rule type: {}", req.rule_type),
1184 }),
1185 )
1186 })?;
1187
1188 let level: ValidationLevel = req.level.parse().map_err(|_| {
1189 (
1190 StatusCode::UNPROCESSABLE_ENTITY,
1191 Json(ErrorResponse {
1192 error_code: 42207,
1193 message: format!("Invalid validation level: {}", req.level),
1194 }),
1195 )
1196 })?;
1197
1198 let mut rule = ValidationRule::new(&req.name, rule_type, &req.config).with_level(level);
1199
1200 if let Some(desc) = &req.description {
1201 rule = rule.with_description(desc);
1202 }
1203
1204 if !req.subjects.is_empty() {
1205 rule = rule.for_subjects(req.subjects);
1206 }
1207
1208 if !req.schema_types.is_empty() {
1209 let schema_types: Vec<SchemaType> = req
1210 .schema_types
1211 .iter()
1212 .map(|s| {
1213 s.parse::<SchemaType>().map_err(|_| {
1214 (
1215 StatusCode::UNPROCESSABLE_ENTITY,
1216 Json(ErrorResponse {
1217 error_code: 42202,
1218 message: format!("Unknown schema type: '{}'", s),
1219 }),
1220 )
1221 })
1222 })
1223 .collect::<Result<Vec<_>, _>>()?;
1224 rule = rule.for_schema_types(schema_types);
1225 }
1226
1227 state.registry.add_validation_rule(rule.clone());
1228
1229 Ok(Json(ValidationRuleResponse {
1230 name: rule.name().to_string(),
1231 rule_type: format!("{:?}", rule.rule_type()),
1232 config: rule.config().to_string(),
1233 level: format!("{:?}", rule.level()),
1234 description: rule.description().map(|s| s.to_string()),
1235 active: rule.is_active(),
1236 }))
1237}
1238
1239async fn delete_validation_rule(
1240 State(state): State<Arc<ServerState>>,
1241 Path(name): Path<String>,
1242 auth: Option<Extension<AuthState>>,
1243) -> Result<StatusCode, (StatusCode, Json<ErrorResponse>)> {
1244 enforce_permission(&auth, "_global", SchemaPermission::Delete, &state)?;
1245
1246 let removed = state
1247 .registry
1248 .validation_engine()
1249 .write()
1250 .remove_rule(&name);
1251 if removed {
1252 Ok(StatusCode::NO_CONTENT)
1253 } else {
1254 Err((
1255 StatusCode::NOT_FOUND,
1256 Json(ErrorResponse {
1257 error_code: 40404,
1258 message: format!("Validation rule not found: {}", name),
1259 }),
1260 ))
1261 }
1262}
1263
/// JSON representation of a schema context returned by the context endpoints.
#[derive(Serialize)]
struct ContextResponse {
    /// Context name.
    name: String,
    /// Optional description; omitted from the JSON output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    description: Option<String>,
    /// Value of the context's `is_active()`.
    active: bool,
}
1275
/// Request body accepted by `create_context`.
#[derive(Deserialize)]
struct CreateContextRequest {
    /// Name of the context to create.
    name: String,
    /// Optional human-readable description.
    description: Option<String>,
}
1281
1282async fn list_contexts(State(state): State<Arc<ServerState>>) -> Json<Vec<ContextResponse>> {
1283 let contexts = state.registry.list_contexts();
1284 Json(
1285 contexts
1286 .into_iter()
1287 .map(|c| ContextResponse {
1288 name: c.name().to_string(),
1289 description: c.description().map(|s| s.to_string()),
1290 active: c.is_active(),
1291 })
1292 .collect(),
1293 )
1294}
1295
1296async fn create_context(
1297 State(state): State<Arc<ServerState>>,
1298 auth: Option<Extension<AuthState>>,
1299 Json(req): Json<CreateContextRequest>,
1300) -> Result<Json<ContextResponse>, (StatusCode, Json<ErrorResponse>)> {
1301 enforce_permission(&auth, "_global", SchemaPermission::Create, &state)?;
1302
1303 let mut context = SchemaContext::new(&req.name);
1304 if let Some(desc) = &req.description {
1305 context = context.with_description(desc);
1306 }
1307
1308 state
1309 .registry
1310 .create_context(context.clone())
1311 .map_err(schema_error_response)?;
1312
1313 Ok(Json(ContextResponse {
1314 name: context.name().to_string(),
1315 description: context.description().map(|s| s.to_string()),
1316 active: context.is_active(),
1317 }))
1318}
1319
1320async fn get_context(
1321 State(state): State<Arc<ServerState>>,
1322 Path(context_name): Path<String>,
1323) -> Result<Json<ContextResponse>, (StatusCode, Json<ErrorResponse>)> {
1324 state
1325 .registry
1326 .get_context(&context_name)
1327 .map(|c| {
1328 Json(ContextResponse {
1329 name: c.name().to_string(),
1330 description: c.description().map(|s| s.to_string()),
1331 active: c.is_active(),
1332 })
1333 })
1334 .ok_or_else(|| {
1335 (
1336 StatusCode::NOT_FOUND,
1337 Json(ErrorResponse {
1338 error_code: 40405,
1339 message: format!("Context not found: {}", context_name),
1340 }),
1341 )
1342 })
1343}
1344
1345async fn delete_context(
1346 State(state): State<Arc<ServerState>>,
1347 Path(context_name): Path<String>,
1348 auth: Option<Extension<AuthState>>,
1349) -> Result<StatusCode, (StatusCode, Json<ErrorResponse>)> {
1350 enforce_permission(&auth, "_global", SchemaPermission::Delete, &state)?;
1351
1352 state
1353 .registry
1354 .delete_context(&context_name)
1355 .map(|_| StatusCode::NO_CONTENT)
1356 .map_err(schema_error_response)
1357}
1358
1359async fn list_context_subjects(
1360 State(state): State<Arc<ServerState>>,
1361 Path(context_name): Path<String>,
1362) -> Json<Vec<String>> {
1363 Json(state.registry.list_subjects_in_context(&context_name))
1364}
1365
1366#[derive(Serialize)]
1371struct StatsResponse {
1372 schema_count: usize,
1373 subject_count: usize,
1374 context_count: usize,
1375 cache_size: usize,
1376}
1377
1378async fn get_stats(State(state): State<Arc<ServerState>>) -> Json<StatsResponse> {
1379 let stats = state.registry.stats().await;
1380 Json(StatsResponse {
1381 schema_count: stats.schema_count,
1382 subject_count: stats.subject_count,
1383 context_count: stats.context_count,
1384 cache_size: stats.cache_size,
1385 })
1386}
1387
// Smoke tests: each one issues a single body-less request against a freshly built
// router and checks that the handler answers `200 OK`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::RegistryConfig;
    use axum::body::Body;
    use axum::http::Request;
    use tower::util::ServiceExt;

    /// Builds a router from a registry configured with `RegistryConfig::memory()`
    /// and the default [`ServerConfig`].
    async fn create_test_app() -> Router {
        let registry = SchemaRegistry::new(RegistryConfig::memory())
            .await
            .unwrap();
        SchemaServer::new(registry, ServerConfig::default()).router()
    }

    /// Sends one request with an empty body to `uri` on a fresh test app and
    /// returns the status code of the response.
    async fn status_for(uri: &str) -> StatusCode {
        let app = create_test_app().await;
        let request = Request::builder().uri(uri).body(Body::empty()).unwrap();
        app.oneshot(request).await.unwrap().status()
    }

    #[tokio::test]
    async fn test_root_endpoint() {
        assert_eq!(status_for("/").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_list_subjects_empty() {
        assert_eq!(status_for("/subjects").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_get_config() {
        assert_eq!(status_for("/config").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_get_mode() {
        assert_eq!(status_for("/mode").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_endpoint() {
        assert_eq!(status_for("/health").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_liveness_endpoint() {
        assert_eq!(status_for("/health/live").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_readiness_endpoint() {
        assert_eq!(status_for("/health/ready").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_stats_endpoint() {
        assert_eq!(status_for("/stats").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_list_contexts_empty() {
        assert_eq!(status_for("/contexts").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_list_validation_rules_empty() {
        assert_eq!(status_for("/config/validation/rules").await, StatusCode::OK);
    }
}