1use std::{
21 collections::BTreeMap,
22 env::{self},
23 fmt::{self, Display, Formatter},
24 io,
25 num::TryFromIntError,
26 result,
27 str::FromStr,
28 string::FromUtf8Error,
29 sync::{Arc, LazyLock, Mutex, PoisonError},
30 time::{Duration, SystemTime},
31};
32
33#[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
34use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
35
36use bytes::Bytes;
37
38#[cfg(any(feature = "iceberg", feature = "delta"))]
39use datafusion::error::DataFusionError;
40
41#[cfg(feature = "delta")]
42use deltalake::DeltaTableError;
43
44use governor::InsufficientCapacity;
45
46#[cfg(feature = "iceberg")]
47use iceberg::spec::DataFileBuilderError;
48
49use jsonschema::ValidationError;
50use object_store::{
51 DynObjectStore, ObjectStore, ObjectStoreExt, aws::AmazonS3Builder, local::LocalFileSystem,
52 memory::InMemory, path::Path,
53};
54use opentelemetry::{
55 InstrumentationScope, KeyValue, global,
56 metrics::{Counter, Histogram, Meter},
57};
58use opentelemetry_semantic_conventions::SCHEMA_URL;
59
60#[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
61use parquet::errors::ParquetError;
62
63use rhai::EvalAltResult;
64use serde_json::Value;
65use tansu_sans_io::{ErrorCode, record::inflated::Batch};
66use tracing::{debug, instrument};
67use tracing_subscriber::filter::ParseError;
68use url::Url;
69
70pub mod avro;
71pub mod json;
72pub mod lake;
73pub mod proto;
74
75#[cfg(feature = "delta")]
76pub(crate) mod sql;
77
// Field name used for list elements when constructing Arrow list types.
pub(crate) const ARROW_LIST_FIELD_NAME: &str = "element";
79
/// Aggregated error type for the schema registry: schema parsing and
/// validation failures, object store and I/O errors, and (feature gated)
/// lakehouse conversion errors.
///
/// `Display` is implemented manually below by delegating to `Debug`, so the
/// variants carry no `#[error(...)]` attributes.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    Anyhow(#[from] anyhow::Error),

    /// Kafka protocol error code, e.g. `InvalidRecord` on validation failure.
    Api(ErrorCode),

    #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
    Arrow(#[from] ArrowError),

    // Boxed (with a manual `From` impl below) to keep `Error` itself small;
    // see the `error_size_of` test.
    Avro(Box<apache_avro::Error>),

    /// An Avro value that could not be converted to JSON.
    AvroToJson(apache_avro::types::Value),

    /// Downcast of the named field to the expected concrete type failed.
    BadDowncast {
        field: String,
    },

    EvalAlt(#[from] Box<EvalAltResult>),

    BuilderExhausted,

    ChronoParse(#[from] chrono::ParseError),

    #[cfg(feature = "iceberg")]
    DataFileBuilder(#[from] DataFileBuilderError),

    // Boxed via a manual `From` impl below to keep `Error` small.
    #[cfg(any(feature = "iceberg", feature = "delta"))]
    DataFusion(Box<DataFusionError>),

    // Boxed via a manual `From` impl below to keep `Error` small.
    #[cfg(feature = "delta")]
    DeltaTable(Box<DeltaTableError>),

    Downcast,

    FromUtf8(#[from] FromUtf8Error),

    // Boxed via a manual `From` impl below to keep `Error` small.
    #[cfg(feature = "iceberg")]
    Iceberg(Box<::iceberg::Error>),

    InvalidValue(apache_avro::types::Value),

    InsufficientCapacity(#[from] InsufficientCapacity),

    Io(#[from] io::Error),

    /// A JSON value that does not fit the given Avro schema.
    JsonToAvro(Box<apache_avro::Schema>, Box<Value>),

    /// A field required by the Avro schema is absent from the JSON value.
    JsonToAvroFieldNotFound {
        schema: Box<apache_avro::Schema>,
        value: Box<Value>,
        field: String,
    },

    KafkaSansIo(#[from] tansu_sans_io::Error),

    /// Free-form error message.
    Message(String),

    #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
    NoCommonType(Vec<DataType>),

    ObjectStore(#[from] object_store::Error),

    #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
    Parquet(#[from] ParquetError),

    ParseFilter(#[from] ParseError),

    ParseUrl(#[from] url::ParseError),

    /// A lock was poisoned; the poison payload is discarded
    /// (see `From<PoisonError<T>>` below).
    Poison,

    ProtobufJsonMapping(#[from] protobuf_json_mapping::ParseError),

    ProtobufJsonMappingPrint(#[from] protobuf_json_mapping::PrintError),

    Protobuf(#[from] protobuf::Error),

    ProtobufFileDescriptorMissing(Bytes),

    /// JSON schema validation failed; details are discarded
    /// (see `From<ValidationError>` below).
    SchemaValidation,

    SerdeJson(#[from] serde_json::Error),

    #[cfg(any(feature = "iceberg", feature = "delta"))]
    SqlParser(#[from] datafusion::logical_expr::sqlparser::parser::ParserError),

    /// No schema is registered for the named topic.
    TopicWithoutSchema(String),

    TryFromInt(#[from] TryFromIntError),

    UnsupportedIcebergCatalogUrl(Url),

    UnsupportedLakeHouseUrl(Url),

    UnsupportedSchemaRegistryUrl(Url),

    #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
    UnsupportedSchemaRuntimeValue(DataType, Value),

    Uuid(#[from] uuid::Error),
}
182
183impl Display for Error {
184 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
185 write!(f, "{self:?}")
186 }
187}
188
#[cfg(any(feature = "iceberg", feature = "delta"))]
impl From<DataFusionError> for Error {
    /// Box the error so `Error` stays small (see the `error_size_of` test).
    fn from(error: DataFusionError) -> Self {
        Error::DataFusion(error.into())
    }
}
195
#[cfg(feature = "iceberg")]
impl From<::iceberg::Error> for Error {
    /// Box the error so `Error` stays small (see the `error_size_of` test).
    fn from(error: ::iceberg::Error) -> Self {
        Error::Iceberg(error.into())
    }
}
202
#[cfg(feature = "delta")]
impl From<DeltaTableError> for Error {
    /// Box the error so `Error` stays small (see the `error_size_of` test).
    fn from(error: DeltaTableError) -> Self {
        Error::DeltaTable(error.into())
    }
}
209
210impl From<apache_avro::Error> for Error {
211 fn from(value: apache_avro::Error) -> Self {
212 Self::Avro(Box::new(value))
213 }
214}
215
216impl<T> From<PoisonError<T>> for Error {
217 fn from(_value: PoisonError<T>) -> Self {
218 Self::Poison
219 }
220}
221
222impl From<ValidationError<'_>> for Error {
223 fn from(_value: ValidationError<'_>) -> Self {
224 Self::SchemaValidation
225 }
226}
227
/// Crate-wide result type, defaulting the error to [`Error`].
pub type Result<T, E = Error> = result::Result<T, E>;
229
/// Validation of an inflated record batch against a schema.
pub trait Validator {
    /// Returns `Ok(())` when `batch` conforms to the schema.
    fn validate(&self, batch: &Batch) -> Result<()>;
}
234
/// Conversion of a record batch into an Arrow [`RecordBatch`] for a
/// lakehouse destination.
#[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
trait AsArrow {
    async fn as_arrow(
        &self,
        topic: &str,
        partition: i32,
        batch: &Batch,
        lake_type: lake::LakeHouseType,
    ) -> Result<RecordBatch>;
}
246
/// Conversion of a JSON value into a Kafka record builder.
pub trait AsKafkaRecord {
    fn as_kafka_record(&self, value: &Value) -> Result<tansu_sans_io::record::Builder>;
}
251
/// Conversion of a record batch into a JSON value.
pub trait AsJsonValue {
    fn as_json_value(&self, batch: &Batch) -> Result<Value>;
}
256
/// Generation of a Kafka record builder from a schema.
pub trait Generator {
    fn generate(&self) -> Result<tansu_sans_io::record::Builder>;
}
261
/// A topic schema in one of the supported formats; each trait impl below
/// dispatches to the format-specific implementation.
#[derive(Clone, Debug)]
pub enum Schema {
    Avro(Box<avro::Schema>),
    Json(Arc<json::Schema>),
    Proto(Box<proto::Schema>),
}
271
272#[derive(Clone, Debug)]
273struct CachedSchema {
274 loaded_at: SystemTime,
275 schema: Schema,
276}
277
278impl CachedSchema {
279 fn new(schema: Schema) -> Self {
280 Self {
281 schema,
282 loaded_at: SystemTime::now(),
283 }
284 }
285}
286
287impl AsKafkaRecord for Schema {
288 fn as_kafka_record(&self, value: &Value) -> Result<tansu_sans_io::record::Builder> {
289 debug!(?value);
290
291 match self {
292 Self::Avro(schema) => schema.as_kafka_record(value),
293 Self::Json(schema) => schema.as_kafka_record(value),
294 Self::Proto(schema) => schema.as_kafka_record(value),
295 }
296 }
297}
298
299impl Validator for Schema {
300 #[instrument(skip(self, batch), ret)]
301 fn validate(&self, batch: &Batch) -> Result<()> {
302 match self {
303 Self::Avro(schema) => schema.validate(batch),
304 Self::Json(schema) => schema.validate(batch),
305 Self::Proto(schema) => schema.validate(batch),
306 }
307 }
308}
309
#[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
impl AsArrow for Schema {
    /// Delegate Arrow conversion to the concrete schema format.
    #[instrument(skip(self, batch), ret)]
    async fn as_arrow(
        &self,
        topic: &str,
        partition: i32,
        batch: &Batch,
        lake_type: lake::LakeHouseType,
    ) -> Result<RecordBatch> {
        match self {
            Schema::Avro(inner) => inner.as_arrow(topic, partition, batch, lake_type).await,
            Schema::Json(inner) => inner.as_arrow(topic, partition, batch, lake_type).await,
            Schema::Proto(inner) => inner.as_arrow(topic, partition, batch, lake_type).await,
        }
    }
}
327
328impl AsJsonValue for Schema {
329 #[instrument(skip(self, batch), ret)]
330 fn as_json_value(&self, batch: &Batch) -> Result<Value> {
331 debug!(?batch);
332
333 match self {
334 Self::Avro(schema) => schema.as_json_value(batch),
335 Self::Json(schema) => schema.as_json_value(batch),
336 Self::Proto(schema) => schema.as_json_value(batch),
337 }
338 }
339}
340
341impl Generator for Schema {
342 fn generate(&self) -> Result<tansu_sans_io::record::Builder> {
343 match self {
344 Schema::Avro(schema) => schema.generate(),
345 Schema::Json(schema) => schema.generate(),
346 Schema::Proto(schema) => schema.generate(),
347 }
348 }
349}
350
/// Shared per-topic cache of loaded schemas, keyed by topic name.
type SchemaCache = Arc<Mutex<BTreeMap<String, CachedSchema>>>;
352
/// A schema registry backed by an object store, with an optional
/// time-based schema cache.
#[derive(Clone, Debug)]
pub struct Registry {
    object_store: Arc<DynObjectStore>,
    // Loaded schemas, shared across clones of this registry.
    schemas: SchemaCache,
    // `None` disables time-based expiry of cached entries.
    cache_expiry_after: Option<Duration>,
}
360
361impl FromStr for Registry {
362 type Err = Error;
363
364 fn from_str(s: &str) -> result::Result<Self, Self::Err> {
365 Url::parse(s)
366 .map_err(Into::into)
367 .and_then(|location| Builder::try_from(&location))
368 .map(Into::into)
369 }
370}
371
/// Builder for [`Registry`]: holds the storage backend and cache policy,
/// while the shared runtime state is created in `From<Builder> for Registry`.
#[derive(Clone, Debug)]
pub struct Builder {
    object_store: Arc<DynObjectStore>,
    cache_expiry_after: Option<Duration>,
}
378
379impl TryFrom<&Url> for Builder {
380 type Error = Error;
381
382 fn try_from(storage: &Url) -> Result<Self, Self::Error> {
383 debug!(%storage);
384
385 match storage.scheme() {
386 "s3" => {
387 let bucket_name = storage.host_str().unwrap_or("schema");
388
389 AmazonS3Builder::from_env()
390 .with_bucket_name(bucket_name)
391 .build()
392 .map_err(Into::into)
393 .map(Self::new)
394 }
395
396 "file" => {
397 let mut path = env::current_dir().inspect(|current_dir| debug!(?current_dir))?;
398
399 if let Some(domain) = storage.domain() {
400 path.push(domain);
401 }
402
403 if let Some(relative) = storage.path().strip_prefix("/") {
404 path.push(relative);
405 } else {
406 path.push(storage.path());
407 }
408
409 debug!(?path);
410
411 LocalFileSystem::new_with_prefix(path)
412 .map_err(Into::into)
413 .map(Self::new)
414 }
415
416 "memory" => Ok(Self::new(InMemory::new())),
417
418 _unsupported => Err(Error::UnsupportedSchemaRegistryUrl(storage.to_owned())),
419 }
420 }
421}
422
423impl From<Builder> for Registry {
424 fn from(builder: Builder) -> Self {
425 Self {
426 object_store: builder.object_store,
427 schemas: Arc::new(Mutex::new(BTreeMap::new())),
428 cache_expiry_after: builder.cache_expiry_after,
429 }
430 }
431}
432
433impl Builder {
434 pub fn new(object_store: impl ObjectStore) -> Self {
435 Self {
436 object_store: Arc::new(object_store),
437 cache_expiry_after: None,
438 }
439 }
440
441 pub fn with_cache_expiry_after(self, cache_expiry_after: Option<Duration>) -> Self {
442 Self {
443 cache_expiry_after,
444 ..self
445 }
446 }
447
448 pub fn build(self) -> Registry {
449 Registry::from(self)
450 }
451}
452
// Meter scoped to this crate; used by the metric instruments below.
pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
    global::meter_with_scope(
        InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
            .with_version(env!("CARGO_PKG_VERSION"))
            .with_schema_url(SCHEMA_URL)
            .build(),
    )
});
461
// Histogram of per-batch validation latency, labelled by topic.
static VALIDATION_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
    METER
        .u64_histogram("registry_validation_duration")
        .with_unit("ms")
        .with_description("The registry validation request latencies in milliseconds")
        .build()
});
469
// Counter of failed validations, labelled by topic and failure reason.
static VALIDATION_ERROR: LazyLock<Counter<u64>> = LazyLock::new(|| {
    METER
        .u64_counter("registry_validation_error")
        .with_description("The registry validation error count")
        .build()
});
476
477impl Registry {
478 pub fn new(object_store: impl ObjectStore) -> Self {
479 Builder::new(object_store).build()
480 }
481
482 pub fn builder(object_store: impl ObjectStore) -> Builder {
483 Builder::new(object_store)
484 }
485
486 pub fn builder_try_from_url(url: &Url) -> Result<Builder> {
487 Builder::try_from(url)
488 }
489
490 #[instrument(skip(self))]
491 pub async fn schema(&self, topic: &str) -> Result<Option<Schema>> {
492 let proto = Path::from(format!("{topic}.proto"));
493 let json = Path::from(format!("{topic}.json"));
494 let avro = Path::from(format!("{topic}.avsc"));
495
496 if let Some(cached) = self.schemas.lock().map(|guard| guard.get(topic).cloned())? {
497 if self.cache_expiry_after.is_some_and(|cache_expiry_after| {
498 SystemTime::now()
499 .duration_since(cached.loaded_at)
500 .unwrap_or_default()
501 > cache_expiry_after
502 }) {
503 return Ok(Some(cached.schema));
504 } else {
505 debug!(cache_expiry = topic);
506 }
507 }
508
509 if let Ok(get_result) = self
510 .object_store
511 .get(&proto)
512 .await
513 .inspect(|get_result| debug!(?get_result))
514 .inspect_err(|err| debug!(?err))
515 {
516 get_result
517 .bytes()
518 .await
519 .map_err(Into::into)
520 .and_then(proto::Schema::try_from)
521 .map(Box::new)
522 .map(Schema::Proto)
523 .and_then(|schema| {
524 self.schemas
525 .lock()
526 .map_err(Into::into)
527 .map(|mut guard| {
528 guard.insert(topic.to_owned(), CachedSchema::new(schema.clone()))
529 })
530 .and(Ok(Some(schema)))
531 })
532 } else if let Ok(get_result) = self.object_store.get(&json).await {
533 get_result
534 .bytes()
535 .await
536 .map_err(Into::into)
537 .and_then(json::Schema::try_from)
538 .map(Arc::new)
539 .map(Schema::Json)
540 .and_then(|schema| {
541 self.schemas
542 .lock()
543 .map_err(Into::into)
544 .map(|mut guard| {
545 guard.insert(topic.to_owned(), CachedSchema::new(schema.clone()))
546 })
547 .and(Ok(Some(schema)))
548 })
549 } else if let Ok(get_result) = self.object_store.get(&avro).await {
550 get_result
551 .bytes()
552 .await
553 .map_err(Into::into)
554 .and_then(avro::Schema::try_from)
555 .map(Box::new)
556 .map(Schema::Avro)
557 .and_then(|schema| {
558 self.schemas
559 .lock()
560 .map_err(Into::into)
561 .map(|mut guard| {
562 guard.insert(topic.to_owned(), CachedSchema::new(schema.clone()))
563 })
564 .and(Ok(Some(schema)))
565 })
566 } else {
567 Ok(None)
568 }
569 }
570
571 #[instrument(skip(self, batch))]
572 pub async fn validate(&self, topic: &str, batch: &Batch) -> Result<()> {
573 let validation_start = SystemTime::now();
574
575 let Some(schema) = self.schema(topic).await? else {
576 debug!(no_schema_for_topic = %topic);
577 return Ok(());
578 };
579
580 schema
581 .validate(batch)
582 .inspect(|_| {
583 VALIDATION_DURATION.record(
584 validation_start
585 .elapsed()
586 .map_or(0, |duration| duration.as_millis() as u64),
587 &[KeyValue::new("topic", topic.to_owned())],
588 )
589 })
590 .inspect_err(|err| {
591 VALIDATION_ERROR.add(
592 1,
593 &[
594 KeyValue::new("topic", topic.to_owned()),
595 KeyValue::new("reason", err.to_string()),
596 ],
597 )
598 })
599 }
600}
601
#[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
impl AsArrow for Registry {
    /// Look up the topic's schema and convert `batch` to Arrow, recording
    /// the conversion latency on success.
    #[instrument(skip(self, batch), ret)]
    async fn as_arrow(
        &self,
        topic: &str,
        partition: i32,
        batch: &Batch,
        lake_type: lake::LakeHouseType,
    ) -> Result<RecordBatch> {
        let start = SystemTime::now();

        // A topic without a registered schema cannot be converted.
        let schema = match self.schema(topic).await? {
            Some(schema) => schema,
            None => return Err(Error::TopicWithoutSchema(topic.to_owned())),
        };

        let outcome = schema.as_arrow(topic, partition, batch, lake_type).await;

        match &outcome {
            Ok(_) => lake::AS_ARROW_DURATION.record(
                start
                    .elapsed()
                    .map_or(0, |duration| duration.as_millis() as u64),
                &[KeyValue::new("topic", topic.to_owned())],
            ),
            Err(err) => debug!(?err),
        }

        outcome
    }
}
633
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Result;
    use bytes::Bytes;
    use object_store::PutPayload;
    use serde_json::json;
    use std::{fs::File, sync::Arc, thread};
    use tansu_sans_io::record::Record;
    use tracing::subscriber::DefaultGuard;
    use tracing_subscriber::EnvFilter;

    /// Route this test thread's tracing output to a per-thread log file
    /// while the returned guard is alive.
    fn init_tracing() -> Result<DefaultGuard> {
        Ok(tracing::subscriber::set_default(
            tracing_subscriber::fmt()
                .with_level(true)
                .with_line_number(true)
                .with_thread_names(false)
                .with_env_filter(
                    EnvFilter::from_default_env()
                        .add_directive(format!("{}=debug", env!("CARGO_CRATE_NAME")).parse()?),
                )
                .with_writer(
                    thread::current()
                        .name()
                        .ok_or(Error::Message(String::from("unnamed thread")))
                        .and_then(|name| {
                            File::create(format!("../logs/{}/{name}.log", env!("CARGO_PKG_NAME"),))
                                .map_err(Into::into)
                        })
                        .map(Arc::new)?,
                )
                .finish(),
        ))
    }

    // Protobuf schema stored as `def.proto` by `populate`.
    const DEF_PROTO: &[u8] = br#"
      syntax = 'proto3';

      message Key {
        int32 id = 1;
      }

      message Value {
        string name = 1;
        string email = 2;
      }
    "#;

    // Avro schema stored as `pqr.avsc` by `populate`.
    const PQR_AVRO: &[u8] = br#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "long", "default": 42},
                {"name": "b", "type": "string"},
                {"name": "c", "type": "long", "default": 43}
            ]
        }
    "#;

    /// Build an in-memory registry seeded with a JSON, a protobuf and an
    /// Avro schema (`abc.json`, `def.proto`, `pqr.avsc`).
    async fn populate() -> Result<Registry> {
        let _guard = init_tracing()?;

        let object_store = InMemory::new();

        let location = Path::from("abc.json");
        let payload = serde_json::to_vec(&json!({
            "type": "object",
            "properties": {
                "key": {
                    "type": "number",
                    "multipleOf": 10
                }
            }
        }))
        .map(Bytes::from)
        .map(PutPayload::from)?;

        _ = object_store.put(&location, payload).await?;

        let location = Path::from("def.proto");
        let payload = PutPayload::from(Bytes::from_static(DEF_PROTO));
        _ = object_store.put(&location, payload).await?;

        let location = Path::from("pqr.avsc");
        let payload = PutPayload::from(Bytes::from_static(PQR_AVRO));
        _ = object_store.put(&location, payload).await?;

        Ok(Registry::new(object_store))
    }

    #[tokio::test]
    async fn abc_valid() -> Result<()> {
        let _guard = init_tracing()?;

        let registry = populate().await?;

        // 5450 is a multiple of 10, satisfying the JSON schema for "abc".
        let key = Bytes::from_static(b"5450");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        registry.validate("abc", &batch).await?;

        Ok(())
    }

    #[tokio::test]
    async fn abc_invalid() -> Result<()> {
        let _guard = init_tracing()?;
        let registry = populate().await?;

        // 545 is not a multiple of 10, so validation must fail.
        let key = Bytes::from_static(b"545");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        assert!(matches!(
            registry
                .validate("abc", &batch)
                .await
                .inspect_err(|err| debug!(?err)),
            Err(Error::Api(ErrorCode::InvalidRecord))
        ));

        Ok(())
    }

    #[tokio::test]
    async fn pqr_valid() -> Result<()> {
        let _guard = init_tracing()?;
        let registry = populate().await?;

        let key = Bytes::from_static(b"5450");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        registry.validate("pqr", &batch).await?;

        Ok(())
    }

    /// Log the size of `Error` and its constituent payloads: boxing the
    /// large variants keeps `Result<T, Error>` cheap to move.
    #[test]
    fn error_size_of() -> Result<()> {
        let _guard = init_tracing()?;

        debug!(error = size_of::<Error>());
        debug!(anyhow = size_of::<anyhow::Error>());
        #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
        debug!(arrow = size_of::<ArrowError>());
        debug!(avro_to_json = size_of::<apache_avro::types::Value>());
        // The gates below now match the imports at the top of this file:
        // `DataFileBuilderError` and `::iceberg::Error` exist only with the
        // "iceberg" feature, `DeltaTableError` only with "delta", and
        // `DataFusionError`/sqlparser's `ParserError` with either "iceberg"
        // or "delta". The previous any(parquet, ...) gates failed to
        // compile when only the "parquet" feature was enabled.
        #[cfg(feature = "iceberg")]
        debug!(data_file_builder = size_of::<DataFileBuilderError>());
        #[cfg(any(feature = "iceberg", feature = "delta"))]
        debug!(data_fusion = size_of::<Box<DataFusionError>>());
        #[cfg(feature = "delta")]
        debug!(delta_table = size_of::<Box<DeltaTableError>>());
        #[cfg(feature = "iceberg")]
        debug!(iceberg = size_of::<Box<::iceberg::Error>>());
        debug!(sans_io = size_of::<tansu_sans_io::Error>());
        debug!(object_store = size_of::<object_store::Error>());

        #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
        debug!(parquet = size_of::<ParquetError>());

        debug!(parse_filter = size_of::<ParseError>());
        debug!(protobuf_json_mapping = size_of::<protobuf_json_mapping::ParseError>());
        debug!(protobuf_json_mapping_print = size_of::<protobuf_json_mapping::PrintError>());
        debug!(protobuf = size_of::<protobuf::Error>());
        debug!(serde_json = size_of::<serde_json::Error>());
        #[cfg(any(feature = "iceberg", feature = "delta"))]
        debug!(sql_parser = size_of::<datafusion::logical_expr::sqlparser::parser::ParserError>());
        debug!(try_from_int = size_of::<TryFromIntError>());
        debug!(url = size_of::<Url>());
        #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
        debug!(unsupported_schema_runtime_value = size_of::<(DataType, serde_json::Value)>());
        debug!(uuid = size_of::<uuid::Error>());
        Ok(())
    }
}