1use crate::compatibility::CompatibilityLevel;
14use crate::error::SchemaError;
15use crate::registry::SchemaRegistry;
16use crate::types::{
17 SchemaContext, SchemaId, SchemaReference, SchemaType, SchemaVersion, Subject, ValidationLevel,
18 ValidationRule, ValidationRuleType, VersionState,
19};
20use axum::{
21 extract::{DefaultBodyLimit, Path, Query, State},
22 http::StatusCode,
23 routing::{delete, get, post, put},
24 Json, Router,
25};
26use parking_lot::RwLock;
27use serde::{Deserialize, Serialize};
28use std::net::SocketAddr;
29use std::sync::Arc;
30use tower_http::cors::CorsLayer;
31use tower_http::trace::TraceLayer;
32use tracing::info;
33
34#[cfg(feature = "auth")]
35use crate::auth::{auth_middleware, AuthConfig, ServerAuthState};
36#[cfg(feature = "auth")]
37use axum::middleware;
38#[cfg(feature = "auth")]
39use rivven_core::AuthManager;
40
/// Settings for the schema registry HTTP server.
#[derive(Clone, Debug)]
pub struct ServerConfig {
    /// Host name or IP address (the `Default` impl uses `0.0.0.0`).
    pub host: String,
    /// TCP port (the `Default` impl uses `8081`).
    pub port: u16,
    /// Optional authentication settings; only present with the `auth` feature.
    #[cfg(feature = "auth")]
    pub auth: Option<AuthConfig>,
}
50
51impl Default for ServerConfig {
52 fn default() -> Self {
53 Self {
54 host: "0.0.0.0".to_string(),
55 port: 8081,
56 #[cfg(feature = "auth")]
57 auth: None,
58 }
59 }
60}
61
62impl ServerConfig {
63 #[cfg(feature = "auth")]
65 pub fn with_auth(mut self, auth_config: AuthConfig) -> Self {
66 self.auth = Some(auth_config);
67 self
68 }
69}
70
/// Operating mode of the registry as exposed through the `/mode` endpoints.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum RegistryMode {
    /// The default mode.
    #[default]
    ReadWrite,
    /// Mode in which the guarded write handlers answer `403 Forbidden`.
    ReadOnly,
    /// Import mode. NOTE(review): no handler in this file treats it specially.
    Import,
}
79
80impl std::fmt::Display for RegistryMode {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 match self {
83 RegistryMode::ReadWrite => write!(f, "READWRITE"),
84 RegistryMode::ReadOnly => write!(f, "READONLY"),
85 RegistryMode::Import => write!(f, "IMPORT"),
86 }
87 }
88}
89
90pub struct ServerState {
92 pub registry: SchemaRegistry,
93 pub mode: RwLock<RegistryMode>,
94 pub default_compatibility: RwLock<CompatibilityLevel>,
95}
96
97pub struct SchemaServer {
99 state: Arc<ServerState>,
100 #[allow(dead_code)] config: ServerConfig,
102 #[cfg(all(feature = "auth", not(feature = "cedar")))]
103 auth_manager: Option<Arc<AuthManager>>,
104 #[cfg(feature = "cedar")]
105 auth_manager: Option<Arc<AuthManager>>,
106 #[cfg(feature = "cedar")]
107 cedar_authorizer: Option<Arc<rivven_core::CedarAuthorizer>>,
108}
109
110impl SchemaServer {
111 pub fn new(registry: SchemaRegistry, config: ServerConfig) -> Self {
113 let default_compat = registry.get_default_compatibility();
114 Self {
115 state: Arc::new(ServerState {
116 registry,
117 mode: RwLock::new(RegistryMode::default()),
118 default_compatibility: RwLock::new(default_compat),
119 }),
120 config,
121 #[cfg(feature = "auth")]
122 auth_manager: None,
123 #[cfg(feature = "cedar")]
124 cedar_authorizer: None,
125 }
126 }
127
128 #[cfg(all(feature = "auth", not(feature = "cedar")))]
130 pub fn with_auth(
131 registry: SchemaRegistry,
132 config: ServerConfig,
133 auth_manager: Arc<AuthManager>,
134 ) -> Self {
135 let default_compat = registry.get_default_compatibility();
136 Self {
137 state: Arc::new(ServerState {
138 registry,
139 mode: RwLock::new(RegistryMode::default()),
140 default_compatibility: RwLock::new(default_compat),
141 }),
142 config,
143 auth_manager: Some(auth_manager),
144 }
145 }
146
147 #[cfg(feature = "cedar")]
149 pub fn with_auth(
150 registry: SchemaRegistry,
151 config: ServerConfig,
152 auth_manager: Arc<AuthManager>,
153 ) -> Self {
154 let default_compat = registry.get_default_compatibility();
155 Self {
156 state: Arc::new(ServerState {
157 registry,
158 mode: RwLock::new(RegistryMode::default()),
159 default_compatibility: RwLock::new(default_compat),
160 }),
161 config,
162 auth_manager: Some(auth_manager),
163 cedar_authorizer: None,
164 }
165 }
166
167 #[cfg(feature = "cedar")]
169 pub fn with_cedar(
170 registry: SchemaRegistry,
171 config: ServerConfig,
172 auth_manager: Arc<AuthManager>,
173 cedar_authorizer: Arc<rivven_core::CedarAuthorizer>,
174 ) -> Self {
175 let default_compat = registry.get_default_compatibility();
176 Self {
177 state: Arc::new(ServerState {
178 registry,
179 mode: RwLock::new(RegistryMode::default()),
180 default_compatibility: RwLock::new(default_compat),
181 }),
182 config,
183 auth_manager: Some(auth_manager),
184 cedar_authorizer: Some(cedar_authorizer),
185 }
186 }
187
188 pub fn router(&self) -> Router {
190 let cors = CorsLayer::new()
196 .allow_methods([
197 axum::http::Method::GET,
198 axum::http::Method::POST,
199 axum::http::Method::PUT,
200 axum::http::Method::DELETE,
201 ])
202 .allow_headers([
203 axum::http::header::CONTENT_TYPE,
204 axum::http::header::AUTHORIZATION,
205 axum::http::header::ACCEPT,
206 ]);
207
208 let base_router = Router::new()
209 .route("/", get(root_handler))
211 .route("/health", get(health_handler))
212 .route("/health/live", get(liveness_handler))
213 .route("/health/ready", get(readiness_handler))
214 .route("/schemas/ids/:id", get(get_schema_by_id))
216 .route("/schemas/ids/:id/versions", get(get_schema_versions))
217 .route("/subjects", get(list_subjects))
219 .route("/subjects/:subject", delete(delete_subject))
220 .route("/subjects/:subject/versions", get(list_subject_versions))
221 .route("/subjects/:subject/versions", post(register_schema))
222 .route(
223 "/subjects/:subject/versions/:version",
224 get(get_subject_version),
225 )
226 .route(
227 "/subjects/:subject/versions/:version",
228 delete(delete_version),
229 )
230 .route("/subjects/deleted", get(list_deleted_subjects))
232 .route("/subjects/:subject/undelete", post(undelete_subject))
233 .route(
235 "/subjects/:subject/versions/:version/state",
236 get(get_version_state),
237 )
238 .route(
239 "/subjects/:subject/versions/:version/state",
240 put(set_version_state),
241 )
242 .route(
243 "/subjects/:subject/versions/:version/deprecate",
244 post(deprecate_version),
245 )
246 .route(
247 "/subjects/:subject/versions/:version/disable",
248 post(disable_version),
249 )
250 .route(
251 "/subjects/:subject/versions/:version/enable",
252 post(enable_version),
253 )
254 .route(
256 "/subjects/:subject/versions/:version/referencedby",
257 get(get_referenced_by),
258 )
259 .route("/subjects/:subject/validate", post(validate_schema))
261 .route(
263 "/compatibility/subjects/:subject/versions/:version",
264 post(check_compatibility),
265 )
266 .route("/config", get(get_global_config))
268 .route("/config", put(update_global_config))
269 .route("/config/:subject", get(get_subject_config))
270 .route("/config/:subject", put(update_subject_config))
271 .route("/config/validation/rules", get(list_validation_rules))
273 .route("/config/validation/rules", post(add_validation_rule))
274 .route(
275 "/config/validation/rules/:name",
276 delete(delete_validation_rule),
277 )
278 .route("/mode", get(get_mode))
280 .route("/mode", put(update_mode))
281 .route("/contexts", get(list_contexts))
283 .route("/contexts", post(create_context))
284 .route("/contexts/:context", get(get_context))
285 .route("/contexts/:context", delete(delete_context))
286 .route("/contexts/:context/subjects", get(list_context_subjects))
287 .route("/stats", get(get_stats))
289 .with_state(self.state.clone())
290 .layer(DefaultBodyLimit::max(10 * 1024 * 1024)) .layer(cors)
292 .layer(TraceLayer::new_for_http());
293
294 #[cfg(all(feature = "auth", not(feature = "cedar")))]
296 let base_router = if let Some(auth_manager) = &self.auth_manager {
297 let auth_config = self.config.auth.clone().unwrap_or_default();
298 let auth_state = Arc::new(ServerAuthState {
299 auth_manager: auth_manager.clone(),
300 config: auth_config,
301 });
302 base_router.layer(middleware::from_fn_with_state(auth_state, auth_middleware))
303 } else {
304 base_router
305 };
306
307 #[cfg(feature = "cedar")]
309 let base_router = if let Some(auth_manager) = &self.auth_manager {
310 let auth_config = self.config.auth.clone().unwrap_or_default();
311 let auth_state = Arc::new(ServerAuthState {
312 auth_manager: auth_manager.clone(),
313 cedar_authorizer: self.cedar_authorizer.clone(),
314 config: auth_config,
315 });
316 base_router.layer(middleware::from_fn_with_state(auth_state, auth_middleware))
317 } else {
318 base_router
319 };
320
321 base_router
322 }
323
324 pub async fn run(self, addr: SocketAddr) -> anyhow::Result<()> {
326 let router = self.router();
327 let listener = tokio::net::TcpListener::bind(addr).await?;
328 info!("Schema Registry server listening on {}", addr);
329 axum::serve(listener, router).await?;
330 Ok(())
331 }
332}
333
334fn schema_error_response(e: SchemaError) -> (StatusCode, Json<ErrorResponse>) {
339 let status = match e.http_status() {
340 404 => StatusCode::NOT_FOUND,
341 409 => StatusCode::CONFLICT,
342 422 => StatusCode::UNPROCESSABLE_ENTITY,
343 _ => StatusCode::INTERNAL_SERVER_ERROR,
344 };
345 (
346 status,
347 Json(ErrorResponse {
348 error_code: e.error_code(),
349 message: e.to_string(),
350 }),
351 )
352}
353
354#[derive(Serialize)]
359struct RootResponse {
360 version: &'static str,
361 commit: &'static str,
362}
363
364#[derive(Deserialize)]
365struct RegisterSchemaRequest {
366 schema: String,
367 #[serde(rename = "schemaType", default)]
368 schema_type: Option<String>,
369 #[serde(default)]
371 references: Vec<SchemaReference>,
372}
373
374#[derive(Serialize)]
375struct RegisterSchemaResponse {
376 id: u32,
377}
378
379#[derive(Serialize)]
380struct SchemaResponse {
381 schema: String,
382 #[serde(rename = "schemaType")]
383 schema_type: String,
384 #[serde(default, skip_serializing_if = "Vec::is_empty")]
385 references: Vec<SchemaReference>,
386}
387
388#[derive(Serialize)]
389struct SubjectVersionResponse {
390 subject: String,
391 version: u32,
392 id: u32,
393 schema: String,
394 #[serde(rename = "schemaType")]
395 schema_type: String,
396}
397
398#[derive(Deserialize)]
399struct CompatibilityRequest {
400 schema: String,
401 #[serde(rename = "schemaType", default)]
402 schema_type: Option<String>,
403}
404
405#[derive(Serialize)]
406struct CompatibilityResponse {
407 is_compatible: bool,
408 #[serde(skip_serializing_if = "Vec::is_empty")]
409 messages: Vec<String>,
410}
411
412#[derive(Serialize)]
413struct ConfigResponse {
414 #[serde(rename = "compatibilityLevel")]
415 compatibility_level: String,
416}
417
418#[derive(Deserialize)]
419struct ConfigRequest {
420 compatibility: String,
421}
422
423#[derive(Serialize)]
424struct ModeResponse {
425 mode: String,
426}
427
428#[derive(Deserialize)]
429struct ModeRequest {
430 mode: String,
431}
432
433#[derive(Serialize)]
434struct ErrorResponse {
435 error_code: u32,
436 message: String,
437}
438
439#[derive(Deserialize)]
440struct QueryParams {
441 #[serde(default)]
449 permanent: bool,
450}
451
452async fn root_handler() -> Json<RootResponse> {
457 Json(RootResponse {
458 version: env!("CARGO_PKG_VERSION"),
459 commit: option_env!("GIT_HASH").unwrap_or("dev"),
460 })
461}
462
463#[derive(Serialize)]
465struct HealthResponse {
466 status: &'static str,
467 version: &'static str,
468}
469
470async fn health_handler() -> Json<HealthResponse> {
471 Json(HealthResponse {
472 status: "healthy",
473 version: env!("CARGO_PKG_VERSION"),
474 })
475}
476
477async fn liveness_handler() -> StatusCode {
479 StatusCode::OK
480}
481
482async fn readiness_handler(
484 State(state): State<Arc<ServerState>>,
485) -> Result<StatusCode, StatusCode> {
486 match state.registry.list_subjects().await {
488 Ok(_) => Ok(StatusCode::OK),
489 Err(_) => Err(StatusCode::SERVICE_UNAVAILABLE),
490 }
491}
492
493async fn get_schema_by_id(
494 State(state): State<Arc<ServerState>>,
495 Path(id): Path<u32>,
496) -> Result<Json<SchemaResponse>, (StatusCode, Json<ErrorResponse>)> {
497 state
498 .registry
499 .get_by_id(SchemaId::new(id))
500 .await
501 .map(|schema| {
502 Json(SchemaResponse {
503 schema: schema.schema,
504 schema_type: schema.schema_type.to_string(),
505 references: schema.references,
506 })
507 })
508 .map_err(schema_error_response)
509}
510
511async fn get_schema_versions(
512 State(state): State<Arc<ServerState>>,
513 Path(id): Path<u32>,
514) -> Result<Json<Vec<SubjectVersionResponse>>, (StatusCode, Json<ErrorResponse>)> {
515 let subjects = state
517 .registry
518 .list_subjects()
519 .await
520 .map_err(schema_error_response)?;
521 let mut results = Vec::new();
522
523 for subject in subjects {
524 let versions = state
525 .registry
526 .list_versions(subject.as_str())
527 .await
528 .map_err(schema_error_response)?;
529 for ver in versions {
530 if let Ok(sv) = state
531 .registry
532 .get_by_version(subject.as_str(), SchemaVersion::new(ver))
533 .await
534 {
535 if sv.id.0 == id {
536 results.push(SubjectVersionResponse {
537 subject: sv.subject.0,
538 version: sv.version.0,
539 id: sv.id.0,
540 schema: sv.schema,
541 schema_type: sv.schema_type.to_string(),
542 });
543 }
544 }
545 }
546 }
547
548 Ok(Json(results))
549}
550
551async fn list_subjects(
552 State(state): State<Arc<ServerState>>,
553) -> Result<Json<Vec<String>>, (StatusCode, Json<ErrorResponse>)> {
554 state
555 .registry
556 .list_subjects()
557 .await
558 .map(|subjects| Json(subjects.into_iter().map(|s| s.0).collect()))
559 .map_err(schema_error_response)
560}
561
562async fn delete_subject(
563 State(state): State<Arc<ServerState>>,
564 Path(subject): Path<String>,
565 Query(params): Query<QueryParams>,
566) -> Result<Json<Vec<u32>>, (StatusCode, Json<ErrorResponse>)> {
567 if *state.mode.read() == RegistryMode::ReadOnly {
569 return Err((
570 StatusCode::FORBIDDEN,
571 Json(ErrorResponse {
572 error_code: 40301,
573 message: "Registry is in READONLY mode".to_string(),
574 }),
575 ));
576 }
577
578 state
579 .registry
580 .delete_subject(subject.as_str(), params.permanent)
581 .await
582 .map(Json)
583 .map_err(schema_error_response)
584}
585
586async fn list_deleted_subjects(
588 State(state): State<Arc<ServerState>>,
589) -> Result<Json<Vec<String>>, (StatusCode, Json<ErrorResponse>)> {
590 state
591 .registry
592 .list_deleted_subjects()
593 .await
594 .map(|subjects| Json(subjects.into_iter().map(|s| s.0).collect()))
595 .map_err(schema_error_response)
596}
597
598async fn undelete_subject(
600 State(state): State<Arc<ServerState>>,
601 Path(subject): Path<String>,
602) -> Result<Json<Vec<u32>>, (StatusCode, Json<ErrorResponse>)> {
603 if *state.mode.read() == RegistryMode::ReadOnly {
605 return Err((
606 StatusCode::FORBIDDEN,
607 Json(ErrorResponse {
608 error_code: 40301,
609 message: "Registry is in READONLY mode".to_string(),
610 }),
611 ));
612 }
613
614 state
615 .registry
616 .undelete_subject(subject.as_str())
617 .await
618 .map(Json)
619 .map_err(schema_error_response)
620}
621
622async fn list_subject_versions(
623 State(state): State<Arc<ServerState>>,
624 Path(subject): Path<String>,
625) -> Result<Json<Vec<u32>>, (StatusCode, Json<ErrorResponse>)> {
626 state
627 .registry
628 .list_versions(subject.as_str())
629 .await
630 .map(Json)
631 .map_err(schema_error_response)
632}
633
634async fn register_schema(
635 State(state): State<Arc<ServerState>>,
636 Path(subject): Path<String>,
637 Json(req): Json<RegisterSchemaRequest>,
638) -> Result<Json<RegisterSchemaResponse>, (StatusCode, Json<ErrorResponse>)> {
639 if *state.mode.read() == RegistryMode::ReadOnly {
641 return Err((
642 StatusCode::FORBIDDEN,
643 Json(ErrorResponse {
644 error_code: 40301,
645 message: "Registry is in READONLY mode".to_string(),
646 }),
647 ));
648 }
649
650 let schema_type = req
651 .schema_type
652 .as_deref()
653 .unwrap_or("AVRO")
654 .parse::<SchemaType>()
655 .map_err(|_| {
656 schema_error_response(crate::SchemaError::InvalidInput(format!(
657 "Invalid schema type: {}",
658 req.schema_type.as_deref().unwrap_or("AVRO")
659 )))
660 })?;
661
662 state
663 .registry
664 .register_with_references(subject.as_str(), schema_type, &req.schema, req.references)
665 .await
666 .map(|id| Json(RegisterSchemaResponse { id: id.0 }))
667 .map_err(schema_error_response)
668}
669
670async fn get_subject_version(
671 State(state): State<Arc<ServerState>>,
672 Path((subject, version)): Path<(String, String)>,
673) -> Result<Json<SubjectVersionResponse>, (StatusCode, Json<ErrorResponse>)> {
674 let version = if version == "latest" {
675 state
676 .registry
677 .get_latest(subject.as_str())
678 .await
679 .map_err(schema_error_response)?
680 .version
681 } else {
682 let v: u32 = version.parse().map_err(|_| {
683 (
684 StatusCode::UNPROCESSABLE_ENTITY,
685 Json(ErrorResponse {
686 error_code: 42202,
687 message: format!("Invalid version: '{}'", version),
688 }),
689 )
690 })?;
691 SchemaVersion::new(v)
692 };
693
694 state
695 .registry
696 .get_by_version(subject.as_str(), version)
697 .await
698 .map(|sv| {
699 Json(SubjectVersionResponse {
700 subject: sv.subject.0,
701 version: sv.version.0,
702 id: sv.id.0,
703 schema: sv.schema,
704 schema_type: sv.schema_type.to_string(),
705 })
706 })
707 .map_err(schema_error_response)
708}
709
710async fn delete_version(
711 State(state): State<Arc<ServerState>>,
712 Path((subject, version)): Path<(String, u32)>,
713 Query(params): Query<QueryParams>,
714) -> Result<Json<u32>, (StatusCode, Json<ErrorResponse>)> {
715 if *state.mode.read() == RegistryMode::ReadOnly {
717 return Err((
718 StatusCode::FORBIDDEN,
719 Json(ErrorResponse {
720 error_code: 40301,
721 message: "Registry is in READONLY mode".to_string(),
722 }),
723 ));
724 }
725
726 state
727 .registry
728 .delete_version(
729 subject.as_str(),
730 SchemaVersion::new(version),
731 params.permanent,
732 )
733 .await
734 .map(|_| Json(version))
735 .map_err(schema_error_response)
736}
737
738async fn get_referenced_by(
740 State(state): State<Arc<ServerState>>,
741 Path((subject, version)): Path<(String, String)>,
742) -> Result<Json<Vec<u32>>, (StatusCode, Json<ErrorResponse>)> {
743 let version = if version == "latest" {
744 let versions = state
746 .registry
747 .list_versions(subject.as_str())
748 .await
749 .map_err(schema_error_response)?;
750 versions.into_iter().max().unwrap_or(1)
751 } else {
752 version.parse().map_err(|_| {
753 schema_error_response(crate::SchemaError::InvalidInput(format!(
754 "Invalid version: {}",
755 version
756 )))
757 })?
758 };
759
760 state
761 .registry
762 .get_schemas_referencing(subject.as_str(), SchemaVersion::new(version))
763 .await
764 .map(|ids| Json(ids.into_iter().map(|id| id.0).collect()))
765 .map_err(schema_error_response)
766}
767
768async fn check_compatibility(
769 State(state): State<Arc<ServerState>>,
770 Path((subject, version)): Path<(String, String)>,
771 Json(req): Json<CompatibilityRequest>,
772) -> Result<Json<CompatibilityResponse>, (StatusCode, Json<ErrorResponse>)> {
773 let schema_type = req
774 .schema_type
775 .as_deref()
776 .unwrap_or("AVRO")
777 .parse::<SchemaType>()
778 .map_err(|_| {
779 schema_error_response(crate::SchemaError::InvalidInput(format!(
780 "Invalid schema type: {}",
781 req.schema_type.as_deref().unwrap_or("AVRO")
782 )))
783 })?;
784
785 let version = if version == "latest" {
786 None
787 } else {
788 Some(SchemaVersion::new(version.parse().map_err(|_| {
789 schema_error_response(crate::SchemaError::InvalidInput(format!(
790 "Invalid version: {}",
791 version
792 )))
793 })?))
794 };
795
796 state
797 .registry
798 .check_compatibility(subject.as_str(), schema_type, &req.schema, version)
799 .await
800 .map(|result| {
801 Json(CompatibilityResponse {
802 is_compatible: result.is_compatible,
803 messages: result.messages,
804 })
805 })
806 .map_err(schema_error_response)
807}
808
809async fn get_global_config(State(state): State<Arc<ServerState>>) -> Json<ConfigResponse> {
810 Json(ConfigResponse {
811 compatibility_level: state.default_compatibility.read().to_string(),
812 })
813}
814
815async fn update_global_config(
816 State(state): State<Arc<ServerState>>,
817 Json(req): Json<ConfigRequest>,
818) -> Result<Json<ConfigResponse>, (StatusCode, Json<ErrorResponse>)> {
819 let level: CompatibilityLevel = req.compatibility.parse().map_err(|_| {
820 (
821 StatusCode::UNPROCESSABLE_ENTITY,
822 Json(ErrorResponse {
823 error_code: 42203,
824 message: format!("Invalid compatibility level: {}", req.compatibility),
825 }),
826 )
827 })?;
828
829 *state.default_compatibility.write() = level;
830
831 Ok(Json(ConfigResponse {
832 compatibility_level: level.to_string(),
833 }))
834}
835
836async fn get_subject_config(
837 State(state): State<Arc<ServerState>>,
838 Path(subject): Path<String>,
839) -> Json<ConfigResponse> {
840 let level = state
841 .registry
842 .get_subject_compatibility(&Subject::new(subject));
843 Json(ConfigResponse {
844 compatibility_level: level.to_string(),
845 })
846}
847
848async fn update_subject_config(
849 State(state): State<Arc<ServerState>>,
850 Path(subject): Path<String>,
851 Json(req): Json<ConfigRequest>,
852) -> Result<Json<ConfigResponse>, (StatusCode, Json<ErrorResponse>)> {
853 let level: CompatibilityLevel = req.compatibility.parse().map_err(|_| {
854 (
855 StatusCode::UNPROCESSABLE_ENTITY,
856 Json(ErrorResponse {
857 error_code: 42203,
858 message: format!("Invalid compatibility level: {}", req.compatibility),
859 }),
860 )
861 })?;
862
863 state
864 .registry
865 .set_subject_compatibility(subject.as_str(), level);
866
867 Ok(Json(ConfigResponse {
868 compatibility_level: level.to_string(),
869 }))
870}
871
872async fn get_mode(State(state): State<Arc<ServerState>>) -> Json<ModeResponse> {
873 Json(ModeResponse {
874 mode: state.mode.read().to_string(),
875 })
876}
877
878async fn update_mode(
879 State(state): State<Arc<ServerState>>,
880 Json(req): Json<ModeRequest>,
881) -> Result<Json<ModeResponse>, (StatusCode, Json<ErrorResponse>)> {
882 let mode = match req.mode.to_uppercase().as_str() {
883 "READWRITE" => RegistryMode::ReadWrite,
884 "READONLY" => RegistryMode::ReadOnly,
885 "IMPORT" => RegistryMode::Import,
886 _ => {
887 return Err((
888 StatusCode::UNPROCESSABLE_ENTITY,
889 Json(ErrorResponse {
890 error_code: 42204,
891 message: format!("Invalid mode: {}", req.mode),
892 }),
893 ))
894 }
895 };
896
897 *state.mode.write() = mode;
898
899 Ok(Json(ModeResponse {
900 mode: mode.to_string(),
901 }))
902}
903
904#[derive(Serialize)]
909struct VersionStateResponse {
910 state: String,
911}
912
913#[derive(Deserialize)]
914struct VersionStateRequest {
915 state: String,
916}
917
918async fn get_version_state(
919 State(state): State<Arc<ServerState>>,
920 Path((subject, version)): Path<(String, u32)>,
921) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
922 state
923 .registry
924 .get_version_state(subject.as_str(), SchemaVersion::new(version))
925 .await
926 .map(|s| {
927 Json(VersionStateResponse {
928 state: s.to_string(),
929 })
930 })
931 .map_err(schema_error_response)
932}
933
934async fn set_version_state(
935 State(state): State<Arc<ServerState>>,
936 Path((subject, version)): Path<(String, u32)>,
937 Json(req): Json<VersionStateRequest>,
938) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
939 let version_state: VersionState = req.state.parse().map_err(|_| {
940 (
941 StatusCode::UNPROCESSABLE_ENTITY,
942 Json(ErrorResponse {
943 error_code: 42205,
944 message: format!("Invalid version state: {}", req.state),
945 }),
946 )
947 })?;
948
949 state
950 .registry
951 .set_version_state(subject.as_str(), SchemaVersion::new(version), version_state)
952 .await
953 .map_err(schema_error_response)?;
954
955 Ok(Json(VersionStateResponse {
956 state: version_state.to_string(),
957 }))
958}
959
960async fn deprecate_version(
961 State(state): State<Arc<ServerState>>,
962 Path((subject, version)): Path<(String, u32)>,
963) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
964 state
965 .registry
966 .deprecate_version(subject.as_str(), SchemaVersion::new(version))
967 .await
968 .map_err(schema_error_response)?;
969
970 Ok(Json(VersionStateResponse {
971 state: "DEPRECATED".to_string(),
972 }))
973}
974
975async fn disable_version(
976 State(state): State<Arc<ServerState>>,
977 Path((subject, version)): Path<(String, u32)>,
978) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
979 state
980 .registry
981 .disable_version(subject.as_str(), SchemaVersion::new(version))
982 .await
983 .map_err(schema_error_response)?;
984
985 Ok(Json(VersionStateResponse {
986 state: "DISABLED".to_string(),
987 }))
988}
989
990async fn enable_version(
991 State(state): State<Arc<ServerState>>,
992 Path((subject, version)): Path<(String, u32)>,
993) -> Result<Json<VersionStateResponse>, (StatusCode, Json<ErrorResponse>)> {
994 state
995 .registry
996 .enable_version(subject.as_str(), SchemaVersion::new(version))
997 .await
998 .map_err(schema_error_response)?;
999
1000 Ok(Json(VersionStateResponse {
1001 state: "ENABLED".to_string(),
1002 }))
1003}
1004
1005#[derive(Deserialize)]
1010struct ValidateSchemaRequest {
1011 schema: String,
1012 #[serde(rename = "schemaType", default)]
1013 schema_type: Option<String>,
1014}
1015
1016#[derive(Serialize)]
1017struct ValidationResponse {
1018 is_valid: bool,
1019 errors: Vec<String>,
1020 warnings: Vec<String>,
1021}
1022
1023async fn validate_schema(
1024 State(state): State<Arc<ServerState>>,
1025 Path(subject): Path<String>,
1026 Json(req): Json<ValidateSchemaRequest>,
1027) -> Result<Json<ValidationResponse>, (StatusCode, Json<ErrorResponse>)> {
1028 let schema_type = req
1029 .schema_type
1030 .as_deref()
1031 .unwrap_or("AVRO")
1032 .parse::<SchemaType>()
1033 .map_err(|_| {
1034 schema_error_response(crate::SchemaError::InvalidInput(format!(
1035 "Invalid schema type: {}",
1036 req.schema_type.as_deref().unwrap_or("AVRO")
1037 )))
1038 })?;
1039
1040 state
1041 .registry
1042 .validate_schema(schema_type, &subject, &req.schema)
1043 .map(|report| {
1044 Json(ValidationResponse {
1045 is_valid: report.is_valid(),
1046 errors: report.error_messages(),
1047 warnings: report.warning_messages(),
1048 })
1049 })
1050 .map_err(schema_error_response)
1051}
1052
1053#[derive(Serialize)]
1054struct ValidationRuleResponse {
1055 name: String,
1056 rule_type: String,
1057 config: String,
1058 level: String,
1059 #[serde(skip_serializing_if = "Option::is_none")]
1060 description: Option<String>,
1061 active: bool,
1062}
1063
1064#[derive(Deserialize)]
1065struct AddValidationRuleRequest {
1066 name: String,
1067 rule_type: String,
1068 config: String,
1069 #[serde(default = "default_level")]
1070 level: String,
1071 description: Option<String>,
1072 #[serde(default)]
1073 subjects: Vec<String>,
1074 #[serde(default)]
1075 schema_types: Vec<String>,
1076}
1077
/// Serde default for `AddValidationRuleRequest::level`: the string `"ERROR"`.
fn default_level() -> String {
    String::from("ERROR")
}
1081
1082async fn list_validation_rules(
1083 State(state): State<Arc<ServerState>>,
1084) -> Json<Vec<ValidationRuleResponse>> {
1085 let rules = state.registry.validation_engine().read().list_rules();
1086 Json(
1087 rules
1088 .into_iter()
1089 .map(|r| ValidationRuleResponse {
1090 name: r.name().to_string(),
1091 rule_type: format!("{:?}", r.rule_type()),
1092 config: r.config().to_string(),
1093 level: format!("{:?}", r.level()),
1094 description: r.description().map(|s| s.to_string()),
1095 active: r.is_active(),
1096 })
1097 .collect(),
1098 )
1099}
1100
1101async fn add_validation_rule(
1102 State(state): State<Arc<ServerState>>,
1103 Json(req): Json<AddValidationRuleRequest>,
1104) -> Result<Json<ValidationRuleResponse>, (StatusCode, Json<ErrorResponse>)> {
1105 let rule_type: ValidationRuleType = req.rule_type.parse().map_err(|_| {
1106 (
1107 StatusCode::UNPROCESSABLE_ENTITY,
1108 Json(ErrorResponse {
1109 error_code: 42206,
1110 message: format!("Invalid rule type: {}", req.rule_type),
1111 }),
1112 )
1113 })?;
1114
1115 let level: ValidationLevel = req.level.parse().map_err(|_| {
1116 (
1117 StatusCode::UNPROCESSABLE_ENTITY,
1118 Json(ErrorResponse {
1119 error_code: 42207,
1120 message: format!("Invalid validation level: {}", req.level),
1121 }),
1122 )
1123 })?;
1124
1125 let mut rule = ValidationRule::new(&req.name, rule_type, &req.config).with_level(level);
1126
1127 if let Some(desc) = &req.description {
1128 rule = rule.with_description(desc);
1129 }
1130
1131 if !req.subjects.is_empty() {
1132 rule = rule.for_subjects(req.subjects);
1133 }
1134
1135 if !req.schema_types.is_empty() {
1136 let schema_types: Vec<SchemaType> = req
1137 .schema_types
1138 .iter()
1139 .filter_map(|s| s.parse().ok())
1140 .collect();
1141 rule = rule.for_schema_types(schema_types);
1142 }
1143
1144 state.registry.add_validation_rule(rule.clone());
1145
1146 Ok(Json(ValidationRuleResponse {
1147 name: rule.name().to_string(),
1148 rule_type: format!("{:?}", rule.rule_type()),
1149 config: rule.config().to_string(),
1150 level: format!("{:?}", rule.level()),
1151 description: rule.description().map(|s| s.to_string()),
1152 active: rule.is_active(),
1153 }))
1154}
1155
1156async fn delete_validation_rule(
1157 State(state): State<Arc<ServerState>>,
1158 Path(name): Path<String>,
1159) -> Result<StatusCode, (StatusCode, Json<ErrorResponse>)> {
1160 let removed = state
1161 .registry
1162 .validation_engine()
1163 .write()
1164 .remove_rule(&name);
1165 if removed {
1166 Ok(StatusCode::NO_CONTENT)
1167 } else {
1168 Err((
1169 StatusCode::NOT_FOUND,
1170 Json(ErrorResponse {
1171 error_code: 40404,
1172 message: format!("Validation rule not found: {}", name),
1173 }),
1174 ))
1175 }
1176}
1177
1178#[derive(Serialize)]
1183struct ContextResponse {
1184 name: String,
1185 #[serde(skip_serializing_if = "Option::is_none")]
1186 description: Option<String>,
1187 active: bool,
1188}
1189
1190#[derive(Deserialize)]
1191struct CreateContextRequest {
1192 name: String,
1193 description: Option<String>,
1194}
1195
1196async fn list_contexts(State(state): State<Arc<ServerState>>) -> Json<Vec<ContextResponse>> {
1197 let contexts = state.registry.list_contexts();
1198 Json(
1199 contexts
1200 .into_iter()
1201 .map(|c| ContextResponse {
1202 name: c.name().to_string(),
1203 description: c.description().map(|s| s.to_string()),
1204 active: c.is_active(),
1205 })
1206 .collect(),
1207 )
1208}
1209
1210async fn create_context(
1211 State(state): State<Arc<ServerState>>,
1212 Json(req): Json<CreateContextRequest>,
1213) -> Result<Json<ContextResponse>, (StatusCode, Json<ErrorResponse>)> {
1214 let mut context = SchemaContext::new(&req.name);
1215 if let Some(desc) = &req.description {
1216 context = context.with_description(desc);
1217 }
1218
1219 state
1220 .registry
1221 .create_context(context.clone())
1222 .map_err(schema_error_response)?;
1223
1224 Ok(Json(ContextResponse {
1225 name: context.name().to_string(),
1226 description: context.description().map(|s| s.to_string()),
1227 active: context.is_active(),
1228 }))
1229}
1230
1231async fn get_context(
1232 State(state): State<Arc<ServerState>>,
1233 Path(context_name): Path<String>,
1234) -> Result<Json<ContextResponse>, (StatusCode, Json<ErrorResponse>)> {
1235 state
1236 .registry
1237 .get_context(&context_name)
1238 .map(|c| {
1239 Json(ContextResponse {
1240 name: c.name().to_string(),
1241 description: c.description().map(|s| s.to_string()),
1242 active: c.is_active(),
1243 })
1244 })
1245 .ok_or_else(|| {
1246 (
1247 StatusCode::NOT_FOUND,
1248 Json(ErrorResponse {
1249 error_code: 40405,
1250 message: format!("Context not found: {}", context_name),
1251 }),
1252 )
1253 })
1254}
1255
1256async fn delete_context(
1257 State(state): State<Arc<ServerState>>,
1258 Path(context_name): Path<String>,
1259) -> Result<StatusCode, (StatusCode, Json<ErrorResponse>)> {
1260 state
1261 .registry
1262 .delete_context(&context_name)
1263 .map(|_| StatusCode::NO_CONTENT)
1264 .map_err(schema_error_response)
1265}
1266
1267async fn list_context_subjects(
1268 State(state): State<Arc<ServerState>>,
1269 Path(context_name): Path<String>,
1270) -> Json<Vec<String>> {
1271 Json(state.registry.list_subjects_in_context(&context_name))
1272}
1273
/// Serializable payload produced by the `get_stats` handler.
///
/// Each field is copied from the identically named field of the value
/// returned by the registry's `stats()` call.
#[derive(Serialize)]
struct StatsResponse {
    /// Registry-reported `schema_count` (presumably the number of stored schemas).
    schema_count: usize,
    /// Registry-reported `subject_count` (presumably the number of subjects).
    subject_count: usize,
    /// Registry-reported `context_count` (presumably the number of contexts).
    context_count: usize,
    /// Registry-reported `cache_size`; unit not visible here — TODO confirm.
    cache_size: usize,
}
1285
1286async fn get_stats(State(state): State<Arc<ServerState>>) -> Json<StatsResponse> {
1287 let stats = state.registry.stats().await;
1288 Json(StatsResponse {
1289 schema_count: stats.schema_count,
1290 subject_count: stats.subject_count,
1291 context_count: stats.context_count,
1292 cache_size: stats.cache_size,
1293 })
1294}
1295
/// Smoke tests: each read-only endpoint must answer `200 OK` on a freshly
/// created, in-memory registry.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::RegistryConfig;
    use axum::body::Body;
    use axum::http::Request;
    use tower::util::ServiceExt;

    /// Builds a router backed by an in-memory registry and the default
    /// server configuration.
    async fn create_test_app() -> Router {
        let config = RegistryConfig::memory();
        let registry = SchemaRegistry::new(config).await.unwrap();
        let server = SchemaServer::new(registry, ServerConfig::default());
        server.router()
    }

    /// Sends a body-less request for `uri` (the request builder's default
    /// method) to a fresh test app and returns the response status.
    ///
    /// Centralizes the request/oneshot boilerplate so every test below only
    /// states the endpoint and the expected status.
    async fn get_status(uri: &str) -> StatusCode {
        let app = create_test_app().await;
        let request = Request::builder()
            .uri(uri)
            .body(Body::empty())
            .expect("test URI must form a valid request");

        app.oneshot(request)
            .await
            .expect("router call should not fail")
            .status()
    }

    #[tokio::test]
    async fn test_root_endpoint() {
        assert_eq!(get_status("/").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_list_subjects_empty() {
        assert_eq!(get_status("/subjects").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_get_config() {
        assert_eq!(get_status("/config").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_get_mode() {
        assert_eq!(get_status("/mode").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_endpoint() {
        assert_eq!(get_status("/health").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_liveness_endpoint() {
        assert_eq!(get_status("/health/live").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_readiness_endpoint() {
        assert_eq!(get_status("/health/ready").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_stats_endpoint() {
        assert_eq!(get_status("/stats").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_list_contexts_empty() {
        assert_eq!(get_status("/contexts").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_list_validation_rules_empty() {
        assert_eq!(get_status("/config/validation/rules").await, StatusCode::OK);
    }
}