rs2_stream/
schema_validation.rs1use async_trait::async_trait;
4use jsonschema::{validator_for, Validator};
5use serde_json::Value;
6use std::sync::Arc;
7
8#[derive(Debug, thiserror::Error)]
9pub enum SchemaError {
10 #[error("Validation failed: {0}")]
11 ValidationFailed(String),
12 #[error("Missing schema: {0}")]
13 MissingSchema(String),
14 #[error("Parse error: {0}")]
15 ParseError(String),
16}
17
18#[async_trait]
19pub trait SchemaValidator: Send + Sync {
20 async fn validate(&self, data: &[u8]) -> Result<(), SchemaError>;
21 fn get_schema_id(&self) -> String;
22}
23
24#[derive(Clone)]
26pub struct JsonSchemaValidator {
27 schema_id: String,
28 compiled: Arc<Validator>,
29}
30
31impl JsonSchemaValidator {
32 pub fn new(schema_id: &str, schema: Value) -> Self {
34 let compiled = validator_for(&schema).expect("Invalid JSON schema");
35 Self {
36 schema_id: schema_id.to_string(),
37 compiled: Arc::new(compiled),
38 }
39 }
40}
41
42#[async_trait]
43impl SchemaValidator for JsonSchemaValidator {
44 async fn validate(&self, data: &[u8]) -> Result<(), SchemaError> {
45 let value: Value =
46 serde_json::from_slice(data).map_err(|e| SchemaError::ParseError(e.to_string()))?;
47 if let Err(error) = self.compiled.validate(&value) {
48 return Err(SchemaError::ValidationFailed(error.to_string()));
49 }
50 Ok(())
51 }
52 fn get_schema_id(&self) -> String {
53 self.schema_id.clone()
54 }
55}