// tansu_schema/lib.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Schema
16//!
17//! Schema includes the following:
18//! - Validation of Kafka messages with an AVRO, JSON or Protobuf schema
19
20use std::{
21    collections::BTreeMap,
22    env::{self},
23    io,
24    num::TryFromIntError,
25    result,
26    string::FromUtf8Error,
27    sync::{Arc, LazyLock, Mutex, PoisonError},
28    time::SystemTime,
29};
30
31use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
32use bytes::Bytes;
33use datafusion::error::DataFusionError;
34use deltalake::DeltaTableError;
35use governor::InsufficientCapacity;
36use iceberg::spec::DataFileBuilderError;
37use jsonschema::ValidationError;
38use object_store::{
39    DynObjectStore, ObjectStore, aws::AmazonS3Builder, local::LocalFileSystem, memory::InMemory,
40    path::Path,
41};
42use opentelemetry::{
43    InstrumentationScope, KeyValue, global,
44    metrics::{Counter, Histogram, Meter},
45};
46use opentelemetry_semantic_conventions::SCHEMA_URL;
47use parquet::errors::ParquetError;
48use rhai::EvalAltResult;
49use serde_json::Value;
50use tansu_sans_io::{ErrorCode, record::inflated::Batch};
51use tracing::{debug, error};
52use tracing_subscriber::filter::ParseError;
53use url::Url;
54
55use crate::lake::LakeHouseType;
56
57pub mod avro;
58pub mod json;
59pub mod lake;
60pub mod proto;
61pub(crate) mod sql;
62
/// Field name used for the element of an Arrow list type
/// (Arrow's conventional name is "element").
pub(crate) const ARROW_LIST_FIELD_NAME: &str = "element";
64
/// Error
///
/// Crate-wide error enumeration. Most variants wrap errors from
/// underlying libraries via `#[from]`; the rest carry crate-specific
/// failure detail. Variants use the `Debug` representation for their
/// `Display` output.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("{:?}", self)]
    Anyhow(#[from] anyhow::Error),

    /// A Kafka protocol error code.
    #[error("{:?}", self)]
    Api(ErrorCode),

    #[error("{:?}", self)]
    Arrow(#[from] ArrowError),

    /// Avro error, boxed (see the `From` impl below) to keep `Error` small.
    #[error("{:?}", self)]
    Avro(Box<apache_avro::Error>),

    /// An Avro value that could not be converted into JSON.
    #[error("{:?}", self)]
    AvroToJson(apache_avro::types::Value),

    /// A downcast of the named field to the expected Arrow array type failed.
    #[error("{:?}", self)]
    BadDowncast { field: String },

    /// Rhai script evaluation error.
    #[error("{:?}", self)]
    EvalAlt(#[from] Box<EvalAltResult>),

    /// A builder was consumed and cannot produce further values.
    #[error("{:?}", self)]
    BuilderExhausted,

    #[error("{:?}", self)]
    ChronoParse(#[from] chrono::ParseError),

    #[error("{:?}", self)]
    DataFileBuilder(#[from] DataFileBuilderError),

    #[error("{:?}", self)]
    DataFusion(#[from] DataFusionError),

    #[error("{:?}", self)]
    DeltaTable(#[from] DeltaTableError),

    /// Generic downcast failure without field context.
    #[error("{:?}", self)]
    Downcast,

    #[error("{:?}", self)]
    FromUtf8(#[from] FromUtf8Error),

    #[error("{:?}", self)]
    Iceberg(#[from] ::iceberg::Error),

    /// An Avro value that is invalid for the expected schema.
    #[error("{:?}", self)]
    InvalidValue(apache_avro::types::Value),

    #[error("{:?}", self)]
    InsufficientCapacity(#[from] InsufficientCapacity),

    #[error("{:?}", self)]
    Io(#[from] io::Error),

    /// A JSON value that could not be converted with the given Avro schema.
    #[error("{:?}", self)]
    JsonToAvro(Box<apache_avro::Schema>, Box<Value>),

    /// A required field was absent while converting JSON to Avro.
    #[error("field: {field}, not found in: {value} with schema: {schema}")]
    JsonToAvroFieldNotFound {
        schema: Box<apache_avro::Schema>,
        value: Box<Value>,
        field: String,
    },

    #[error("{:?}", self)]
    KafkaSansIo(#[from] tansu_sans_io::Error),

    /// Free-form error message.
    #[error("{:?}", self)]
    Message(String),

    /// No common Arrow data type could be found for the listed candidates.
    #[error("{:?}", self)]
    NoCommonType(Vec<DataType>),

    #[error("{:?}", self)]
    ObjectStore(#[from] object_store::Error),

    #[error("{:?}", self)]
    Parquet(#[from] ParquetError),

    #[error("{:?}", self)]
    ParseFilter(#[from] ParseError),

    #[error("{:?}", self)]
    ParseUrl(#[from] url::ParseError),

    /// A mutex was poisoned; the original `PoisonError` payload is dropped.
    #[error("{:?}", self)]
    Poison,

    #[error("{:?}", self)]
    ProtobufJsonMapping(#[from] protobuf_json_mapping::ParseError),

    #[error("{:?}", self)]
    ProtobufJsonMappingPrint(#[from] protobuf_json_mapping::PrintError),

    #[error("{:?}", self)]
    Protobuf(#[from] protobuf::Error),

    /// The protobuf payload did not contain the expected file descriptor.
    #[error("{:?}", self)]
    ProtobufFileDescriptorMissing(Bytes),

    /// JSON schema validation failed; detail is dropped (see `From` impl below).
    #[error("{:?}", self)]
    SchemaValidation,

    #[error("{:?}", self)]
    SerdeJson(#[from] serde_json::Error),

    #[error("{:?}", self)]
    SqlParser(#[from] datafusion::logical_expr::sqlparser::parser::ParserError),

    #[error("{:?}", self)]
    TryFromInt(#[from] TryFromIntError),

    /// URL scheme not recognised as an Iceberg catalog.
    #[error("{:?}", self)]
    UnsupportedIcebergCatalogUrl(Url),

    /// URL scheme not recognised as a lake house.
    #[error("{:?}", self)]
    UnsupportedLakeHouseUrl(Url),

    /// URL scheme not recognised as a schema registry (see `TryFrom<&Url> for Registry`).
    #[error("{:?}", self)]
    UnsupportedSchemaRegistryUrl(Url),

    /// A runtime JSON value that cannot be represented as the Arrow data type.
    #[error("{:?}", self)]
    UnsupportedSchemaRuntimeValue(DataType, Value),

    #[error("{:?}", self)]
    Uuid(#[from] uuid::Error),
}
195
196impl From<apache_avro::Error> for Error {
197    fn from(value: apache_avro::Error) -> Self {
198        Self::Avro(Box::new(value))
199    }
200}
201
202impl<T> From<PoisonError<T>> for Error {
203    fn from(_value: PoisonError<T>) -> Self {
204        Self::Poison
205    }
206}
207
208impl From<ValidationError<'_>> for Error {
209    fn from(_value: ValidationError<'_>) -> Self {
210        Self::SchemaValidation
211    }
212}
213
/// Crate-wide result type, defaulting the error to [`Error`].
pub type Result<T, E = Error> = result::Result<T, E>;
215
/// Validate a Batch with a Schema
pub trait Validator {
    /// Returns an error when `batch` does not conform to the schema.
    fn validate(&self, batch: &Batch) -> Result<()>;
}
220
/// Represent a Batch in the Arrow columnar data format
pub trait AsArrow {
    /// Convert `batch` (from the given `partition`) into an Arrow
    /// [`RecordBatch`] suitable for the given lake house type.
    fn as_arrow(
        &self,
        partition: i32,
        batch: &Batch,
        lake_type: LakeHouseType,
    ) -> Result<RecordBatch>;
}
230
/// Convert a JSON message into a Kafka record
pub trait AsKafkaRecord {
    /// Build a Kafka record builder from the given JSON `value`.
    fn as_kafka_record(&self, value: &Value) -> Result<tansu_sans_io::record::Builder>;
}
235
/// Convert a Batch into a JSON value
pub trait AsJsonValue {
    /// Render `batch` as a JSON [`Value`].
    fn as_json_value(&self, batch: &Batch) -> Result<Value>;
}
240
/// Generate a record
pub trait Generator {
    /// Produce a Kafka record builder (e.g. for synthetic data).
    fn generate(&self) -> Result<tansu_sans_io::record::Builder>;
}
245
/// Schema
///
/// Wrapper enumeration for the supported schema types; each variant
/// delegates the traits above to its inner schema implementation.
#[derive(Clone, Debug)]
pub enum Schema {
    /// Apache Avro schema (boxed).
    Avro(Box<avro::Schema>),
    /// JSON schema (shared via `Arc`).
    Json(Arc<json::Schema>),
    /// Protocol Buffers schema (boxed).
    Proto(Box<proto::Schema>),
}
255
256impl AsKafkaRecord for Schema {
257    fn as_kafka_record(&self, value: &Value) -> Result<tansu_sans_io::record::Builder> {
258        debug!(?value);
259
260        match self {
261            Self::Avro(schema) => schema.as_kafka_record(value),
262            Self::Json(schema) => schema.as_kafka_record(value),
263            Self::Proto(schema) => schema.as_kafka_record(value),
264        }
265    }
266}
267
268impl Validator for Schema {
269    fn validate(&self, batch: &Batch) -> Result<()> {
270        debug!(?batch);
271
272        match self {
273            Self::Avro(schema) => schema.validate(batch),
274            Self::Json(schema) => schema.validate(batch),
275            Self::Proto(schema) => schema.validate(batch),
276        }
277    }
278}
279
280impl AsArrow for Schema {
281    fn as_arrow(
282        &self,
283        partition: i32,
284        batch: &Batch,
285        lake_type: LakeHouseType,
286    ) -> Result<RecordBatch> {
287        debug!(?batch);
288
289        match self {
290            Self::Avro(schema) => schema.as_arrow(partition, batch, lake_type),
291            Self::Json(schema) => schema.as_arrow(partition, batch, lake_type),
292            Self::Proto(schema) => schema.as_arrow(partition, batch, lake_type),
293        }
294    }
295}
296
297impl AsJsonValue for Schema {
298    fn as_json_value(&self, batch: &Batch) -> Result<Value> {
299        debug!(?batch);
300
301        match self {
302            Self::Avro(schema) => schema.as_json_value(batch),
303            Self::Json(schema) => schema.as_json_value(batch),
304            Self::Proto(schema) => schema.as_json_value(batch),
305        }
306    }
307}
308
309impl Generator for Schema {
310    fn generate(&self) -> Result<tansu_sans_io::record::Builder> {
311        match self {
312            Schema::Avro(schema) => schema.generate(),
313            Schema::Json(schema) => schema.generate(),
314            Schema::Proto(schema) => schema.generate(),
315        }
316    }
317}
318
/// Schema Registry
///
/// Caches per-topic schemas fetched from an object store and exposes
/// validation and Arrow conversion over them, with OpenTelemetry metrics.
#[derive(Clone, Debug)]
pub struct Registry {
    // Backing store schemas are fetched from ({topic}.proto/.json/.avsc).
    object_store: Arc<DynObjectStore>,
    // Topic name -> parsed schema cache.
    schemas: Arc<Mutex<BTreeMap<String, Schema>>>,
    // Latency histogram for validate(), in milliseconds.
    validation_duration: Histogram<u64>,
    // Count of failed validations, labelled by topic and reason.
    validation_error: Counter<u64>,
    // Latency histogram for as_arrow(), in milliseconds.
    as_arrow_duration: Histogram<u64>,
}
328
/// Crate-wide OpenTelemetry meter, scoped to this crate's name and
/// version, created lazily on first use.
pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
    global::meter_with_scope(
        InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
            .with_version(env!("CARGO_PKG_VERSION"))
            .with_schema_url(SCHEMA_URL)
            .build(),
    )
});
337
impl Registry {
    /// Create a registry backed by `storage`, building the OpenTelemetry
    /// instruments used to observe validation and Arrow conversion.
    pub fn new(storage: impl ObjectStore) -> Self {
        Self {
            object_store: Arc::new(storage),
            schemas: Arc::new(Mutex::new(BTreeMap::new())),
            validation_duration: METER
                .u64_histogram("registry_validation_duration")
                .with_unit("ms")
                .with_description("The registry validation request latencies in milliseconds")
                .build(),
            validation_error: METER
                .u64_counter("registry_validation_error")
                .with_description("The registry validation error count")
                .build(),
            as_arrow_duration: METER
                .u64_histogram("registry_as_arrow_duration")
                .with_unit("ms")
                .with_description("The registry as Apache Arrow latencies in milliseconds")
                .build(),
        }
    }

    /// Convert `batch` into an Arrow [`RecordBatch`] using the schema
    /// cached for `topic`, recording the conversion latency.
    ///
    /// Returns `Ok(None)` when no schema is cached for the topic; unlike
    /// [`Registry::schema`] this does not consult the object store.
    pub fn as_arrow(
        &self,
        topic: &str,
        partition: i32,
        batch: &Batch,
        lake_type: LakeHouseType,
    ) -> Result<Option<RecordBatch>> {
        debug!(topic, partition, ?batch);

        let start = SystemTime::now();

        self.schemas
            .lock()
            .map_err(Into::into)
            .and_then(|guard| {
                guard
                    .get(topic)
                    .map(|schema| schema.as_arrow(partition, batch, lake_type))
                    .transpose()
            })
            .inspect(|record_batch| {
                debug!(
                    rows = record_batch
                        .as_ref()
                        .map(|record_batch| record_batch.num_rows())
                );
                // SystemTime::elapsed errors if the clock went backwards;
                // record a zero duration rather than failing the call.
                self.as_arrow_duration.record(
                    start
                        .elapsed()
                        .map_or(0, |duration| duration.as_millis() as u64),
                    &[KeyValue::new("topic", topic.to_owned())],
                )
            })
            .inspect_err(|err| debug!(?err))
    }

    /// Look up the schema for `topic`.
    ///
    /// Checks the in-memory cache first, then tries the object store for
    /// `{topic}.proto`, `{topic}.json` and `{topic}.avsc` in that order,
    /// parsing and caching the first object that fetches successfully.
    /// Returns `Ok(None)` when no schema is found.
    pub async fn schema(&self, topic: &str) -> Result<Option<Schema>> {
        debug!(?topic);

        let proto = Path::from(format!("{topic}.proto"));
        let json = Path::from(format!("{topic}.json"));
        let avro = Path::from(format!("{topic}.avsc"));

        // NOTE: each std Mutex guard below is dropped within its own
        // statement, before any subsequent await.
        if let Some(schema) = self.schemas.lock().map(|guard| guard.get(topic).cloned())? {
            Ok(Some(schema))
        } else if let Ok(get_result) = self
            .object_store
            .get(&proto)
            .await
            .inspect(|get_result| debug!(?get_result))
            .inspect_err(|err| debug!(?err))
        {
            // fetch errors fall through to the next candidate extension;
            // parse errors after a successful fetch are returned to the caller
            get_result
                .bytes()
                .await
                .map_err(Into::into)
                .and_then(proto::Schema::try_from)
                .map(Box::new)
                .map(Schema::Proto)
                .and_then(|schema| {
                    self.schemas
                        .lock()
                        .map_err(Into::into)
                        .map(|mut guard| guard.insert(topic.to_owned(), schema.clone()))
                        .and(Ok(Some(schema)))
                })
        } else if let Ok(get_result) = self.object_store.get(&json).await {
            get_result
                .bytes()
                .await
                .map_err(Into::into)
                .and_then(json::Schema::try_from)
                .map(Arc::new)
                .map(Schema::Json)
                .and_then(|schema| {
                    self.schemas
                        .lock()
                        .map_err(Into::into)
                        .map(|mut guard| guard.insert(topic.to_owned(), schema.clone()))
                        .and(Ok(Some(schema)))
                })
        } else if let Ok(get_result) = self.object_store.get(&avro).await {
            get_result
                .bytes()
                .await
                .map_err(Into::into)
                .and_then(avro::Schema::try_from)
                .map(Box::new)
                .map(Schema::Avro)
                .and_then(|schema| {
                    self.schemas
                        .lock()
                        .map_err(Into::into)
                        .map(|mut guard| guard.insert(topic.to_owned(), schema.clone()))
                        .and(Ok(Some(schema)))
                })
        } else {
            Ok(None)
        }
    }

    /// Validate `batch` against the schema for `topic`.
    ///
    /// A topic without a schema validates successfully. Successful
    /// validations record their latency; failures increment the error
    /// counter labelled with the topic and failure reason, and the
    /// validation error is propagated to the caller.
    pub async fn validate(&self, topic: &str, batch: &Batch) -> Result<()> {
        debug!(%topic, ?batch);

        let validation_start = SystemTime::now();

        let Some(schema) = self.schema(topic).await? else {
            debug!(no_schema_for_topic = %topic);
            return Ok(());
        };

        schema
            .validate(batch)
            .inspect(|_| {
                self.validation_duration.record(
                    validation_start
                        .elapsed()
                        .map_or(0, |duration| duration.as_millis() as u64),
                    &[KeyValue::new("topic", topic.to_owned())],
                )
            })
            .inspect_err(|err| {
                self.validation_error.add(
                    1,
                    &[
                        KeyValue::new("topic", topic.to_owned()),
                        KeyValue::new("reason", err.to_string()),
                    ],
                )
            })
    }
}
492
493impl TryFrom<Url> for Registry {
494    type Error = Error;
495
496    fn try_from(storage: Url) -> Result<Self, Self::Error> {
497        Self::try_from(&storage)
498    }
499}
500
501impl TryFrom<&Url> for Registry {
502    type Error = Error;
503
504    fn try_from(storage: &Url) -> Result<Self, Self::Error> {
505        debug!(%storage);
506
507        match storage.scheme() {
508            "s3" => {
509                let bucket_name = storage.host_str().unwrap_or("schema");
510
511                AmazonS3Builder::from_env()
512                    .with_bucket_name(bucket_name)
513                    .build()
514                    .map_err(Into::into)
515                    .map(Registry::new)
516            }
517
518            "file" => {
519                let mut path = env::current_dir().inspect(|current_dir| debug!(?current_dir))?;
520
521                if let Some(domain) = storage.domain() {
522                    path.push(domain);
523                }
524
525                if let Some(relative) = storage.path().strip_prefix("/") {
526                    path.push(relative);
527                } else {
528                    path.push(storage.path());
529                }
530
531                debug!(?path);
532
533                LocalFileSystem::new_with_prefix(path)
534                    .map_err(Into::into)
535                    .map(Registry::new)
536            }
537
538            "memory" => Ok(Registry::new(InMemory::new())),
539
540            _unsupported => Err(Error::UnsupportedSchemaRegistryUrl(storage.to_owned())),
541        }
542    }
543}
544
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Result;
    use bytes::Bytes;
    use object_store::PutPayload;
    use serde_json::json;
    use std::{fs::File, sync::Arc, thread};
    use tansu_sans_io::record::Record;
    use tracing::{error, subscriber::DefaultGuard};
    use tracing_subscriber::EnvFilter;

    /// Route this crate's debug-level traces to a per-thread log file
    /// for the lifetime of the returned guard.
    fn init_tracing() -> Result<DefaultGuard> {
        Ok(tracing::subscriber::set_default(
            tracing_subscriber::fmt()
                .with_level(true)
                .with_line_number(true)
                .with_thread_names(false)
                .with_env_filter(
                    EnvFilter::from_default_env()
                        .add_directive(format!("{}=debug", env!("CARGO_CRATE_NAME")).parse()?),
                )
                .with_writer(
                    // each test thread logs to ../logs/<crate>/<thread>.log;
                    // an unnamed thread is an error
                    thread::current()
                        .name()
                        .ok_or(Error::Message(String::from("unnamed thread")))
                        .and_then(|name| {
                            File::create(format!("../logs/{}/{name}.log", env!("CARGO_PKG_NAME"),))
                                .map_err(Into::into)
                        })
                        .map(Arc::new)?,
                )
                .finish(),
        ))
    }

    // Protocol Buffers schema stored for the "def" topic.
    const DEF_PROTO: &[u8] = br#"
      syntax = 'proto3';

      message Key {
        int32 id = 1;
      }

      message Value {
        string name = 1;
        string email = 2;
      }
    "#;

    // Avro record schema stored for the "pqr" topic.
    const PQR_AVRO: &[u8] = br#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "long", "default": 42},
                {"name": "b", "type": "string"},
                {"name": "c", "type": "long", "default": 43}
            ]
        }
    "#;

    /// Build an in-memory registry holding a JSON schema ("abc",
    /// key must be a multiple of 10), a Protobuf schema ("def") and
    /// an Avro schema ("pqr").
    async fn populate() -> Result<Registry> {
        let _guard = init_tracing()?;

        let object_store = InMemory::new();

        let location = Path::from("abc.json");
        let payload = serde_json::to_vec(&json!({
            "type": "object",
            "properties": {
                "key": {
                    "type": "number",
                    "multipleOf": 10
                }
            }
        }))
        .map(Bytes::from)
        .map(PutPayload::from)?;

        _ = object_store.put(&location, payload).await?;

        let location = Path::from("def.proto");
        let payload = PutPayload::from(Bytes::from_static(DEF_PROTO));
        _ = object_store.put(&location, payload).await?;

        let location = Path::from("pqr.avsc");
        let payload = PutPayload::from(Bytes::from_static(PQR_AVRO));
        _ = object_store.put(&location, payload).await?;

        Ok(Registry::new(object_store))
    }

    /// A key of 5450 is a multiple of 10, so the "abc" JSON schema accepts it.
    #[tokio::test]
    async fn abc_valid() -> Result<()> {
        let _guard = init_tracing()?;

        let registry = populate().await?;

        let key = Bytes::from_static(b"5450");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        registry.validate("abc", &batch).await?;

        Ok(())
    }

    /// A key of 545 is not a multiple of 10, so validation must fail
    /// with an invalid-record error.
    #[tokio::test]
    async fn abc_invalid() -> Result<()> {
        let _guard = init_tracing()?;
        let registry = populate().await?;

        let key = Bytes::from_static(b"545");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        assert!(matches!(
            registry
                .validate("abc", &batch)
                .await
                .inspect_err(|err| error!(?err)),
            Err(Error::Api(ErrorCode::InvalidRecord))
        ));

        Ok(())
    }

    /// The "pqr" topic resolves to the Avro schema and accepts this key.
    #[tokio::test]
    async fn pqr_valid() -> Result<()> {
        let _guard = init_tracing()?;
        let registry = populate().await?;

        let key = Bytes::from_static(b"5450");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        registry.validate("pqr", &batch).await?;

        Ok(())
    }
}