tansu_schema/
lib.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Schema
16//!
17//! Schema includes the following:
18//! - Validation of Kafka messages with an AVRO, JSON or Protobuf schema
19
20use std::{
21    collections::BTreeMap,
22    env::{self},
23    io,
24    num::TryFromIntError,
25    result,
26    string::FromUtf8Error,
27    sync::{Arc, LazyLock, Mutex, PoisonError},
28    time::{Duration, SystemTime},
29};
30
31use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
32use bytes::Bytes;
33use datafusion::error::DataFusionError;
34use deltalake::DeltaTableError;
35use governor::InsufficientCapacity;
36use iceberg::spec::DataFileBuilderError;
37use jsonschema::ValidationError;
38use object_store::{
39    DynObjectStore, ObjectStore, aws::AmazonS3Builder, local::LocalFileSystem, memory::InMemory,
40    path::Path,
41};
42use opentelemetry::{
43    InstrumentationScope, KeyValue, global,
44    metrics::{Counter, Histogram, Meter},
45};
46use opentelemetry_semantic_conventions::SCHEMA_URL;
47use parquet::errors::ParquetError;
48use rhai::EvalAltResult;
49use serde_json::Value;
50use tansu_sans_io::{ErrorCode, record::inflated::Batch};
51use tracing::{debug, error};
52use tracing_subscriber::filter::ParseError;
53use url::Url;
54
55use crate::lake::LakeHouseType;
56
/// AVRO schema validation, generation and Arrow conversion.
pub mod avro;
/// JSON schema validation, generation and Arrow conversion.
pub mod json;
/// Lake house integration (see [`crate::lake::LakeHouseType`]).
pub mod lake;
/// Protocol buffer schema validation, generation and Arrow conversion.
pub mod proto;
// Internal SQL support shared by the schema implementations.
pub(crate) mod sql;

// Field name used for list elements when building Arrow schemas.
pub(crate) const ARROW_LIST_FIELD_NAME: &str = "element";
64
/// Error raised while loading, validating or converting schemas.
///
/// Most variants wrap an underlying library error via `#[from]` and render
/// their `Debug` representation as the `Display` output
/// (`#[error("{:?}", self)]`); only `JsonToAvroFieldNotFound` carries a
/// custom message.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("{:?}", self)]
    Anyhow(#[from] anyhow::Error),

    /// A Kafka protocol error code, e.g. `ErrorCode::InvalidRecord` when
    /// validation fails.
    #[error("{:?}", self)]
    Api(ErrorCode),

    #[error("{:?}", self)]
    Arrow(#[from] ArrowError),

    /// Boxed so the large `apache_avro::Error` does not bloat every
    /// `Result` (converted via the manual `From` impl below).
    #[error("{:?}", self)]
    Avro(Box<apache_avro::Error>),

    #[error("{:?}", self)]
    AvroToJson(apache_avro::types::Value),

    #[error("{:?}", self)]
    BadDowncast { field: String },

    // NOTE(review): out of alphabetical order with the surrounding variants.
    #[error("{:?}", self)]
    EvalAlt(#[from] Box<EvalAltResult>),

    #[error("{:?}", self)]
    BuilderExhausted,

    #[error("{:?}", self)]
    ChronoParse(#[from] chrono::ParseError),

    #[error("{:?}", self)]
    DataFileBuilder(#[from] DataFileBuilderError),

    #[error("{:?}", self)]
    DataFusion(#[from] DataFusionError),

    #[error("{:?}", self)]
    DeltaTable(#[from] DeltaTableError),

    #[error("{:?}", self)]
    Downcast,

    #[error("{:?}", self)]
    FromUtf8(#[from] FromUtf8Error),

    #[error("{:?}", self)]
    Iceberg(#[from] ::iceberg::Error),

    #[error("{:?}", self)]
    InvalidValue(apache_avro::types::Value),

    #[error("{:?}", self)]
    InsufficientCapacity(#[from] InsufficientCapacity),

    #[error("{:?}", self)]
    Io(#[from] io::Error),

    #[error("{:?}", self)]
    JsonToAvro(Box<apache_avro::Schema>, Box<Value>),

    #[error("field: {field}, not found in: {value} with schema: {schema}")]
    JsonToAvroFieldNotFound {
        schema: Box<apache_avro::Schema>,
        value: Box<Value>,
        field: String,
    },

    #[error("{:?}", self)]
    KafkaSansIo(#[from] tansu_sans_io::Error),

    /// Free-form error message.
    #[error("{:?}", self)]
    Message(String),

    #[error("{:?}", self)]
    NoCommonType(Vec<DataType>),

    #[error("{:?}", self)]
    ObjectStore(#[from] object_store::Error),

    #[error("{:?}", self)]
    Parquet(#[from] ParquetError),

    #[error("{:?}", self)]
    ParseFilter(#[from] ParseError),

    #[error("{:?}", self)]
    ParseUrl(#[from] url::ParseError),

    /// A mutex was poisoned; the original `PoisonError` detail is dropped
    /// (see the `From<PoisonError<T>>` impl below).
    #[error("{:?}", self)]
    Poison,

    #[error("{:?}", self)]
    ProtobufJsonMapping(#[from] protobuf_json_mapping::ParseError),

    #[error("{:?}", self)]
    ProtobufJsonMappingPrint(#[from] protobuf_json_mapping::PrintError),

    #[error("{:?}", self)]
    Protobuf(#[from] protobuf::Error),

    #[error("{:?}", self)]
    ProtobufFileDescriptorMissing(Bytes),

    /// A message failed JSON schema validation; the underlying
    /// `jsonschema::ValidationError` detail is dropped (see `From` below).
    #[error("{:?}", self)]
    SchemaValidation,

    #[error("{:?}", self)]
    SerdeJson(#[from] serde_json::Error),

    #[error("{:?}", self)]
    SqlParser(#[from] datafusion::logical_expr::sqlparser::parser::ParserError),

    #[error("{:?}", self)]
    TryFromInt(#[from] TryFromIntError),

    #[error("{:?}", self)]
    UnsupportedIcebergCatalogUrl(Url),

    #[error("{:?}", self)]
    UnsupportedLakeHouseUrl(Url),

    /// The registry URL scheme is not one of `s3`, `file` or `memory`.
    #[error("{:?}", self)]
    UnsupportedSchemaRegistryUrl(Url),

    #[error("{:?}", self)]
    UnsupportedSchemaRuntimeValue(DataType, Value),

    #[error("{:?}", self)]
    Uuid(#[from] uuid::Error),
}
195
196impl From<apache_avro::Error> for Error {
197    fn from(value: apache_avro::Error) -> Self {
198        Self::Avro(Box::new(value))
199    }
200}
201
202impl<T> From<PoisonError<T>> for Error {
203    fn from(_value: PoisonError<T>) -> Self {
204        Self::Poison
205    }
206}
207
208impl From<ValidationError<'_>> for Error {
209    fn from(_value: ValidationError<'_>) -> Self {
210        Self::SchemaValidation
211    }
212}
213
214pub type Result<T, E = Error> = result::Result<T, E>;
215
/// Validate a Batch with a Schema
pub trait Validator {
    /// Check every record in `batch` against the schema, returning an error
    /// when any record does not conform.
    fn validate(&self, batch: &Batch) -> Result<()>;
}
220
/// Represent a Batch in the Arrow columnar data format
pub trait AsArrow {
    /// Convert `batch` from `partition` into an Arrow [`RecordBatch`],
    /// shaped for the given lake house type.
    fn as_arrow(
        &self,
        partition: i32,
        batch: &Batch,
        lake_type: LakeHouseType,
    ) -> Result<RecordBatch>;
}
230
/// Convert a JSON message into a Kafka record
pub trait AsKafkaRecord {
    /// Build a Kafka record from the JSON `value`.
    fn as_kafka_record(&self, value: &Value) -> Result<tansu_sans_io::record::Builder>;
}
235
/// Convert a Batch into a JSON value
pub trait AsJsonValue {
    /// Render `batch` as a JSON value.
    fn as_json_value(&self, batch: &Batch) -> Result<Value>;
}
240
/// Generate a record
pub trait Generator {
    /// Produce a schema-conformant Kafka record builder.
    fn generate(&self) -> Result<tansu_sans_io::record::Builder>;
}
245
/// Schema
///
/// This is a wrapper enumeration of the supported schema types.
#[derive(Clone, Debug)]
pub enum Schema {
    /// AVRO schema (boxed to keep the enum small).
    Avro(Box<avro::Schema>),
    /// JSON schema, shared via `Arc`.
    Json(Arc<json::Schema>),
    /// Protocol buffer schema (boxed to keep the enum small).
    Proto(Box<proto::Schema>),
}
255
// A schema together with the instant it was loaded, used for cache expiry.
#[derive(Clone, Debug)]
struct CachedSchema {
    // When this entry was created, compared against `cache_expiry_after`.
    loaded_at: SystemTime,
    schema: Schema,
}
261
262impl CachedSchema {
263    fn new(schema: Schema) -> Self {
264        Self {
265            schema,
266            loaded_at: SystemTime::now(),
267        }
268    }
269}
270
271impl AsKafkaRecord for Schema {
272    fn as_kafka_record(&self, value: &Value) -> Result<tansu_sans_io::record::Builder> {
273        debug!(?value);
274
275        match self {
276            Self::Avro(schema) => schema.as_kafka_record(value),
277            Self::Json(schema) => schema.as_kafka_record(value),
278            Self::Proto(schema) => schema.as_kafka_record(value),
279        }
280    }
281}
282
283impl Validator for Schema {
284    fn validate(&self, batch: &Batch) -> Result<()> {
285        debug!(?batch);
286
287        match self {
288            Self::Avro(schema) => schema.validate(batch),
289            Self::Json(schema) => schema.validate(batch),
290            Self::Proto(schema) => schema.validate(batch),
291        }
292    }
293}
294
295impl AsArrow for Schema {
296    fn as_arrow(
297        &self,
298        partition: i32,
299        batch: &Batch,
300        lake_type: LakeHouseType,
301    ) -> Result<RecordBatch> {
302        debug!(?batch);
303
304        match self {
305            Self::Avro(schema) => schema.as_arrow(partition, batch, lake_type),
306            Self::Json(schema) => schema.as_arrow(partition, batch, lake_type),
307            Self::Proto(schema) => schema.as_arrow(partition, batch, lake_type),
308        }
309    }
310}
311
312impl AsJsonValue for Schema {
313    fn as_json_value(&self, batch: &Batch) -> Result<Value> {
314        debug!(?batch);
315
316        match self {
317            Self::Avro(schema) => schema.as_json_value(batch),
318            Self::Json(schema) => schema.as_json_value(batch),
319            Self::Proto(schema) => schema.as_json_value(batch),
320        }
321    }
322}
323
324impl Generator for Schema {
325    fn generate(&self) -> Result<tansu_sans_io::record::Builder> {
326        match self {
327            Schema::Avro(schema) => schema.generate(),
328            Schema::Json(schema) => schema.generate(),
329            Schema::Proto(schema) => schema.generate(),
330        }
331    }
332}
333
334type SchemaCache = Arc<Mutex<BTreeMap<String, CachedSchema>>>;
335
/// Schema Registry
///
/// Resolves per-topic schemas from an object store and keeps them in an
/// in-memory cache (see [`Registry::schema`]).
#[derive(Clone, Debug)]
pub struct Registry {
    // Backing store holding the schema artefacts.
    object_store: Arc<DynObjectStore>,
    // In-memory cache of schemas keyed by topic name.
    schemas: SchemaCache,
    // Cache expiry window, configured via `Builder::with_cache_expiry_after`.
    cache_expiry_after: Option<Duration>,
}
343
/// Schema Registry builder
#[derive(Clone, Debug)]
pub struct Builder {
    // Backing store the built registry will read schemas from.
    object_store: Arc<DynObjectStore>,
    // Optional cache expiry window carried into the registry.
    cache_expiry_after: Option<Duration>,
}
350
351impl TryFrom<&Url> for Builder {
352    type Error = Error;
353
354    fn try_from(storage: &Url) -> Result<Self, Self::Error> {
355        debug!(%storage);
356
357        match storage.scheme() {
358            "s3" => {
359                let bucket_name = storage.host_str().unwrap_or("schema");
360
361                AmazonS3Builder::from_env()
362                    .with_bucket_name(bucket_name)
363                    .build()
364                    .map_err(Into::into)
365                    .map(Self::new)
366            }
367
368            "file" => {
369                let mut path = env::current_dir().inspect(|current_dir| debug!(?current_dir))?;
370
371                if let Some(domain) = storage.domain() {
372                    path.push(domain);
373                }
374
375                if let Some(relative) = storage.path().strip_prefix("/") {
376                    path.push(relative);
377                } else {
378                    path.push(storage.path());
379                }
380
381                debug!(?path);
382
383                LocalFileSystem::new_with_prefix(path)
384                    .map_err(Into::into)
385                    .map(Self::new)
386            }
387
388            "memory" => Ok(Self::new(InMemory::new())),
389
390            _unsupported => Err(Error::UnsupportedSchemaRegistryUrl(storage.to_owned())),
391        }
392    }
393}
394
395impl From<Builder> for Registry {
396    fn from(builder: Builder) -> Self {
397        Self {
398            object_store: builder.object_store,
399            schemas: Arc::new(Mutex::new(BTreeMap::new())),
400            cache_expiry_after: builder.cache_expiry_after,
401        }
402    }
403}
404
405impl Builder {
406    pub fn new(object_store: impl ObjectStore) -> Self {
407        Self {
408            object_store: Arc::new(object_store),
409            cache_expiry_after: None,
410        }
411    }
412
413    pub fn with_cache_expiry_after(self, cache_expiry_after: Option<Duration>) -> Self {
414        Self {
415            cache_expiry_after,
416            ..self
417        }
418    }
419
420    pub fn build(self) -> Registry {
421        Registry::from(self)
422    }
423}
424
// OpenTelemetry meter scoped to this crate's name/version.
pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
    global::meter_with_scope(
        InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
            .with_version(env!("CARGO_PKG_VERSION"))
            .with_schema_url(SCHEMA_URL)
            .build(),
    )
});

// Histogram of successful validation latency, recorded in `Registry::validate`.
static VALIDATION_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
    METER
        .u64_histogram("registry_validation_duration")
        .with_unit("ms")
        .with_description("The registry validation request latencies in milliseconds")
        .build()
});

// Counter of validation failures, labelled by topic and error reason.
static VALIDATION_ERROR: LazyLock<Counter<u64>> = LazyLock::new(|| {
    METER
        .u64_counter("registry_validation_error")
        .with_description("The registry validation error count")
        .build()
});

// Histogram of Arrow conversion latency, recorded in `Registry::as_arrow`.
static AS_ARROW_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
    METER
        .u64_histogram("registry_as_arrow_duration")
        .with_unit("ms")
        .with_description("The registry as Apache Arrow latencies in milliseconds")
        .build()
});
456
457impl Registry {
458    pub fn new(object_store: impl ObjectStore) -> Self {
459        Builder::new(object_store).build()
460    }
461
462    pub fn builder(object_store: impl ObjectStore) -> Builder {
463        Builder::new(object_store)
464    }
465
466    pub fn builder_try_from_url(url: &Url) -> Result<Builder> {
467        Builder::try_from(url)
468    }
469
470    pub fn as_arrow(
471        &self,
472        topic: &str,
473        partition: i32,
474        batch: &Batch,
475        lake_type: LakeHouseType,
476    ) -> Result<Option<RecordBatch>> {
477        debug!(topic, partition, ?batch);
478
479        let start = SystemTime::now();
480
481        self.schemas
482            .lock()
483            .map_err(Into::into)
484            .and_then(|guard| {
485                guard
486                    .get(topic)
487                    .map(|cached| cached.schema.as_arrow(partition, batch, lake_type))
488                    .transpose()
489            })
490            .inspect(|record_batch| {
491                debug!(
492                    rows = record_batch
493                        .as_ref()
494                        .map(|record_batch| record_batch.num_rows())
495                );
496                AS_ARROW_DURATION.record(
497                    start
498                        .elapsed()
499                        .map_or(0, |duration| duration.as_millis() as u64),
500                    &[KeyValue::new("topic", topic.to_owned())],
501                )
502            })
503            .inspect_err(|err| debug!(?err))
504    }
505
506    pub async fn schema(&self, topic: &str) -> Result<Option<Schema>> {
507        debug!(?topic);
508
509        let proto = Path::from(format!("{topic}.proto"));
510        let json = Path::from(format!("{topic}.json"));
511        let avro = Path::from(format!("{topic}.avsc"));
512
513        if let Some(cached) = self.schemas.lock().map(|guard| guard.get(topic).cloned())? {
514            if self.cache_expiry_after.is_some_and(|cache_expiry_after| {
515                SystemTime::now()
516                    .duration_since(cached.loaded_at)
517                    .unwrap_or_default()
518                    > cache_expiry_after
519            }) {
520                return Ok(Some(cached.schema));
521            } else {
522                debug!(cache_expiry = topic);
523            }
524        }
525
526        if let Ok(get_result) = self
527            .object_store
528            .get(&proto)
529            .await
530            .inspect(|get_result| debug!(?get_result))
531            .inspect_err(|err| debug!(?err))
532        {
533            get_result
534                .bytes()
535                .await
536                .map_err(Into::into)
537                .and_then(proto::Schema::try_from)
538                .map(Box::new)
539                .map(Schema::Proto)
540                .and_then(|schema| {
541                    self.schemas
542                        .lock()
543                        .map_err(Into::into)
544                        .map(|mut guard| {
545                            guard.insert(topic.to_owned(), CachedSchema::new(schema.clone()))
546                        })
547                        .and(Ok(Some(schema)))
548                })
549        } else if let Ok(get_result) = self.object_store.get(&json).await {
550            get_result
551                .bytes()
552                .await
553                .map_err(Into::into)
554                .and_then(json::Schema::try_from)
555                .map(Arc::new)
556                .map(Schema::Json)
557                .and_then(|schema| {
558                    self.schemas
559                        .lock()
560                        .map_err(Into::into)
561                        .map(|mut guard| {
562                            guard.insert(topic.to_owned(), CachedSchema::new(schema.clone()))
563                        })
564                        .and(Ok(Some(schema)))
565                })
566        } else if let Ok(get_result) = self.object_store.get(&avro).await {
567            get_result
568                .bytes()
569                .await
570                .map_err(Into::into)
571                .and_then(avro::Schema::try_from)
572                .map(Box::new)
573                .map(Schema::Avro)
574                .and_then(|schema| {
575                    self.schemas
576                        .lock()
577                        .map_err(Into::into)
578                        .map(|mut guard| {
579                            guard.insert(topic.to_owned(), CachedSchema::new(schema.clone()))
580                        })
581                        .and(Ok(Some(schema)))
582                })
583        } else {
584            Ok(None)
585        }
586    }
587
588    pub async fn validate(&self, topic: &str, batch: &Batch) -> Result<()> {
589        debug!(%topic, ?batch);
590
591        let validation_start = SystemTime::now();
592
593        let Some(schema) = self.schema(topic).await? else {
594            debug!(no_schema_for_topic = %topic);
595            return Ok(());
596        };
597
598        schema
599            .validate(batch)
600            .inspect(|_| {
601                VALIDATION_DURATION.record(
602                    validation_start
603                        .elapsed()
604                        .map_or(0, |duration| duration.as_millis() as u64),
605                    &[KeyValue::new("topic", topic.to_owned())],
606                )
607            })
608            .inspect_err(|err| {
609                VALIDATION_ERROR.add(
610                    1,
611                    &[
612                        KeyValue::new("topic", topic.to_owned()),
613                        KeyValue::new("reason", err.to_string()),
614                    ],
615                )
616            })
617    }
618}
619
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Result;
    use bytes::Bytes;
    use object_store::PutPayload;
    use serde_json::json;
    use std::{fs::File, sync::Arc, thread};
    use tansu_sans_io::record::Record;
    use tracing::{error, subscriber::DefaultGuard};
    use tracing_subscriber::EnvFilter;

    // Route this test thread's tracing output to ../logs/<crate>/<thread>.log;
    // requires a named thread (tokio test threads are named).
    fn init_tracing() -> Result<DefaultGuard> {
        Ok(tracing::subscriber::set_default(
            tracing_subscriber::fmt()
                .with_level(true)
                .with_line_number(true)
                .with_thread_names(false)
                .with_env_filter(
                    EnvFilter::from_default_env()
                        .add_directive(format!("{}=debug", env!("CARGO_CRATE_NAME")).parse()?),
                )
                .with_writer(
                    thread::current()
                        .name()
                        .ok_or(Error::Message(String::from("unnamed thread")))
                        .and_then(|name| {
                            File::create(format!("../logs/{}/{name}.log", env!("CARGO_PKG_NAME"),))
                                .map_err(Into::into)
                        })
                        .map(Arc::new)?,
                )
                .finish(),
        ))
    }

    // Protobuf schema stored as def.proto.
    const DEF_PROTO: &[u8] = br#"
      syntax = 'proto3';

      message Key {
        int32 id = 1;
      }

      message Value {
        string name = 1;
        string email = 2;
      }
    "#;

    // AVRO schema stored as pqr.avsc.
    const PQR_AVRO: &[u8] = br#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "long", "default": 42},
                {"name": "b", "type": "string"},
                {"name": "c", "type": "long", "default": 43}
            ]
        }
    "#;

    // Build an in-memory registry holding a JSON schema for topic "abc"
    // (key must be a multiple of 10), a protobuf schema for "def" and an
    // AVRO schema for "pqr".
    async fn populate() -> Result<Registry> {
        let _guard = init_tracing()?;

        let object_store = InMemory::new();

        let location = Path::from("abc.json");
        let payload = serde_json::to_vec(&json!({
            "type": "object",
            "properties": {
                "key": {
                    "type": "number",
                    "multipleOf": 10
                }
            }
        }))
        .map(Bytes::from)
        .map(PutPayload::from)?;

        _ = object_store.put(&location, payload).await?;

        let location = Path::from("def.proto");
        let payload = PutPayload::from(Bytes::from_static(DEF_PROTO));
        _ = object_store.put(&location, payload).await?;

        let location = Path::from("pqr.avsc");
        let payload = PutPayload::from(Bytes::from_static(PQR_AVRO));
        _ = object_store.put(&location, payload).await?;

        Ok(Registry::new(object_store))
    }

    // 5450 is a multiple of 10, so it satisfies the "abc" JSON schema.
    #[tokio::test]
    async fn abc_valid() -> Result<()> {
        let _guard = init_tracing()?;

        let registry = populate().await?;

        let key = Bytes::from_static(b"5450");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        registry.validate("abc", &batch).await?;

        Ok(())
    }

    // 545 is not a multiple of 10: validation must fail with InvalidRecord.
    #[tokio::test]
    async fn abc_invalid() -> Result<()> {
        let _guard = init_tracing()?;
        let registry = populate().await?;

        let key = Bytes::from_static(b"545");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        assert!(matches!(
            registry
                .validate("abc", &batch)
                .await
                .inspect_err(|err| error!(?err)),
            Err(Error::Api(ErrorCode::InvalidRecord))
        ));

        Ok(())
    }

    // A record accepted by the AVRO schema for topic "pqr".
    #[tokio::test]
    async fn pqr_valid() -> Result<()> {
        let _guard = init_tracing()?;
        let registry = populate().await?;

        let key = Bytes::from_static(b"5450");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        registry.validate("pqr", &batch).await?;

        Ok(())
    }
}
766}