1use std::{
21 collections::BTreeMap,
22 env::{self},
23 io,
24 num::TryFromIntError,
25 result,
26 string::FromUtf8Error,
27 sync::{Arc, LazyLock, Mutex, PoisonError},
28 time::{Duration, SystemTime},
29};
30
31use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
32use bytes::Bytes;
33use datafusion::error::DataFusionError;
34use deltalake::DeltaTableError;
35use governor::InsufficientCapacity;
36use iceberg::spec::DataFileBuilderError;
37use jsonschema::ValidationError;
38use object_store::{
39 DynObjectStore, ObjectStore, aws::AmazonS3Builder, local::LocalFileSystem, memory::InMemory,
40 path::Path,
41};
42use opentelemetry::{
43 InstrumentationScope, KeyValue, global,
44 metrics::{Counter, Histogram, Meter},
45};
46use opentelemetry_semantic_conventions::SCHEMA_URL;
47use parquet::errors::ParquetError;
48use rhai::EvalAltResult;
49use serde_json::Value;
50use tansu_sans_io::{ErrorCode, record::inflated::Batch};
51use tracing::{debug, error};
52use tracing_subscriber::filter::ParseError;
53use url::Url;
54
55use crate::lake::LakeHouseType;
56
57pub mod avro;
58pub mod json;
59pub mod lake;
60pub mod proto;
61pub(crate) mod sql;
62
/// Field name used for list elements when mapping schemas onto Arrow list types.
pub(crate) const ARROW_LIST_FIELD_NAME: &str = "element";
64
/// Crate-wide error type aggregating failures from schema handling,
/// object storage, lake house writers and supporting libraries.
///
/// Most variants wrap an underlying library error via `#[from]`; the
/// `Display` implementation reuses the `Debug` representation except
/// where a dedicated message is provided.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("{:?}", self)]
    Anyhow(#[from] anyhow::Error),

    /// A Kafka protocol error code surfaced as an error.
    #[error("{:?}", self)]
    Api(ErrorCode),

    #[error("{:?}", self)]
    Arrow(#[from] ArrowError),

    // Boxed because `apache_avro::Error` is large; keeping the payload on
    // the heap keeps `Error` (and every `Result` carrying it) small.
    #[error("{:?}", self)]
    Avro(Box<apache_avro::Error>),

    /// An Avro value that could not be converted to JSON.
    #[error("{:?}", self)]
    AvroToJson(apache_avro::types::Value),

    /// An Arrow array downcast failed for the named field.
    #[error("{:?}", self)]
    BadDowncast { field: String },

    #[error("{:?}", self)]
    EvalAlt(#[from] Box<EvalAltResult>),

    #[error("{:?}", self)]
    BuilderExhausted,

    #[error("{:?}", self)]
    ChronoParse(#[from] chrono::ParseError),

    #[error("{:?}", self)]
    DataFileBuilder(#[from] DataFileBuilderError),

    #[error("{:?}", self)]
    DataFusion(#[from] DataFusionError),

    #[error("{:?}", self)]
    DeltaTable(#[from] DeltaTableError),

    #[error("{:?}", self)]
    Downcast,

    #[error("{:?}", self)]
    FromUtf8(#[from] FromUtf8Error),

    #[error("{:?}", self)]
    Iceberg(#[from] ::iceberg::Error),

    /// An Avro value that does not match its schema.
    #[error("{:?}", self)]
    InvalidValue(apache_avro::types::Value),

    #[error("{:?}", self)]
    InsufficientCapacity(#[from] InsufficientCapacity),

    #[error("{:?}", self)]
    Io(#[from] io::Error),

    /// A JSON value that could not be converted to the given Avro schema.
    #[error("{:?}", self)]
    JsonToAvro(Box<apache_avro::Schema>, Box<Value>),

    #[error("field: {field}, not found in: {value} with schema: {schema}")]
    JsonToAvroFieldNotFound {
        schema: Box<apache_avro::Schema>,
        value: Box<Value>,
        field: String,
    },

    #[error("{:?}", self)]
    KafkaSansIo(#[from] tansu_sans_io::Error),

    /// Free-form error message.
    #[error("{:?}", self)]
    Message(String),

    /// No common Arrow data type could be derived from the listed candidates.
    #[error("{:?}", self)]
    NoCommonType(Vec<DataType>),

    #[error("{:?}", self)]
    ObjectStore(#[from] object_store::Error),

    #[error("{:?}", self)]
    Parquet(#[from] ParquetError),

    #[error("{:?}", self)]
    ParseFilter(#[from] ParseError),

    #[error("{:?}", self)]
    ParseUrl(#[from] url::ParseError),

    /// A mutex guard was poisoned; the payload is intentionally dropped
    /// (see the `From<PoisonError<T>>` impl).
    #[error("{:?}", self)]
    Poison,

    #[error("{:?}", self)]
    ProtobufJsonMapping(#[from] protobuf_json_mapping::ParseError),

    #[error("{:?}", self)]
    ProtobufJsonMappingPrint(#[from] protobuf_json_mapping::PrintError),

    #[error("{:?}", self)]
    Protobuf(#[from] protobuf::Error),

    /// The protobuf file descriptor for the given payload is missing.
    #[error("{:?}", self)]
    ProtobufFileDescriptorMissing(Bytes),

    /// JSON schema validation failed (details are dropped; see the
    /// `From<ValidationError>` impl).
    #[error("{:?}", self)]
    SchemaValidation,

    #[error("{:?}", self)]
    SerdeJson(#[from] serde_json::Error),

    #[error("{:?}", self)]
    SqlParser(#[from] datafusion::logical_expr::sqlparser::parser::ParserError),

    #[error("{:?}", self)]
    TryFromInt(#[from] TryFromIntError),

    #[error("{:?}", self)]
    UnsupportedIcebergCatalogUrl(Url),

    #[error("{:?}", self)]
    UnsupportedLakeHouseUrl(Url),

    #[error("{:?}", self)]
    UnsupportedSchemaRegistryUrl(Url),

    /// A runtime JSON value that cannot be represented as the Arrow type.
    #[error("{:?}", self)]
    UnsupportedSchemaRuntimeValue(DataType, Value),

    #[error("{:?}", self)]
    Uuid(#[from] uuid::Error),
}
195
196impl From<apache_avro::Error> for Error {
197 fn from(value: apache_avro::Error) -> Self {
198 Self::Avro(Box::new(value))
199 }
200}
201
202impl<T> From<PoisonError<T>> for Error {
203 fn from(_value: PoisonError<T>) -> Self {
204 Self::Poison
205 }
206}
207
208impl From<ValidationError<'_>> for Error {
209 fn from(_value: ValidationError<'_>) -> Self {
210 Self::SchemaValidation
211 }
212}
213
/// Crate-wide result type, defaulting the error to [`Error`].
pub type Result<T, E = Error> = result::Result<T, E>;
215
/// Validates an inflated record [`Batch`] against a schema.
pub trait Validator {
    /// Returns `Ok(())` when every record in `batch` conforms to the schema.
    fn validate(&self, batch: &Batch) -> Result<()>;
}
220
/// Converts an inflated record [`Batch`] into an Apache Arrow
/// [`RecordBatch`].
pub trait AsArrow {
    /// Builds a [`RecordBatch`] for `batch` from `partition`, shaped for
    /// the given `lake_type`.
    fn as_arrow(
        &self,
        partition: i32,
        batch: &Batch,
        lake_type: LakeHouseType,
    ) -> Result<RecordBatch>;
}
230
/// Builds a Kafka record from a JSON [`Value`].
pub trait AsKafkaRecord {
    /// Converts `value` into a Kafka record builder.
    fn as_kafka_record(&self, value: &Value) -> Result<tansu_sans_io::record::Builder>;
}
235
/// Converts an inflated record [`Batch`] into a JSON [`Value`].
pub trait AsJsonValue {
    /// Returns the JSON representation of `batch`.
    fn as_json_value(&self, batch: &Batch) -> Result<Value>;
}
240
/// Generates a Kafka record builder (e.g. sample data for a schema).
pub trait Generator {
    /// Produces a generated record builder.
    fn generate(&self) -> Result<tansu_sans_io::record::Builder>;
}
245
/// A topic schema in one of the supported formats.
#[derive(Clone, Debug)]
pub enum Schema {
    /// Apache Avro schema, boxed to keep the enum small.
    Avro(Box<avro::Schema>),
    /// JSON schema, shared via `Arc`.
    Json(Arc<json::Schema>),
    /// Protocol buffer schema, boxed to keep the enum small.
    Proto(Box<proto::Schema>),
}
255
/// A schema together with the time it was loaded, used for cache expiry.
#[derive(Clone, Debug)]
struct CachedSchema {
    // When the schema was fetched from the object store.
    loaded_at: SystemTime,
    schema: Schema,
}
261
262impl CachedSchema {
263 fn new(schema: Schema) -> Self {
264 Self {
265 schema,
266 loaded_at: SystemTime::now(),
267 }
268 }
269}
270
271impl AsKafkaRecord for Schema {
272 fn as_kafka_record(&self, value: &Value) -> Result<tansu_sans_io::record::Builder> {
273 debug!(?value);
274
275 match self {
276 Self::Avro(schema) => schema.as_kafka_record(value),
277 Self::Json(schema) => schema.as_kafka_record(value),
278 Self::Proto(schema) => schema.as_kafka_record(value),
279 }
280 }
281}
282
283impl Validator for Schema {
284 fn validate(&self, batch: &Batch) -> Result<()> {
285 debug!(?batch);
286
287 match self {
288 Self::Avro(schema) => schema.validate(batch),
289 Self::Json(schema) => schema.validate(batch),
290 Self::Proto(schema) => schema.validate(batch),
291 }
292 }
293}
294
295impl AsArrow for Schema {
296 fn as_arrow(
297 &self,
298 partition: i32,
299 batch: &Batch,
300 lake_type: LakeHouseType,
301 ) -> Result<RecordBatch> {
302 debug!(?batch);
303
304 match self {
305 Self::Avro(schema) => schema.as_arrow(partition, batch, lake_type),
306 Self::Json(schema) => schema.as_arrow(partition, batch, lake_type),
307 Self::Proto(schema) => schema.as_arrow(partition, batch, lake_type),
308 }
309 }
310}
311
312impl AsJsonValue for Schema {
313 fn as_json_value(&self, batch: &Batch) -> Result<Value> {
314 debug!(?batch);
315
316 match self {
317 Self::Avro(schema) => schema.as_json_value(batch),
318 Self::Json(schema) => schema.as_json_value(batch),
319 Self::Proto(schema) => schema.as_json_value(batch),
320 }
321 }
322}
323
324impl Generator for Schema {
325 fn generate(&self) -> Result<tansu_sans_io::record::Builder> {
326 match self {
327 Schema::Avro(schema) => schema.generate(),
328 Schema::Json(schema) => schema.generate(),
329 Schema::Proto(schema) => schema.generate(),
330 }
331 }
332}
333
/// Shared, mutex-guarded map from topic name to its cached schema.
type SchemaCache = Arc<Mutex<BTreeMap<String, CachedSchema>>>;
335
/// Schema registry backed by an object store, with an in-memory,
/// optionally expiring schema cache keyed by topic.
#[derive(Clone, Debug)]
pub struct Registry {
    object_store: Arc<DynObjectStore>,
    // topic name -> cached schema
    schemas: SchemaCache,
    // `None` means cached schemas are kept indefinitely.
    cache_expiry_after: Option<Duration>,
}
343
/// Builder for a [`Registry`]: an object store plus an optional cache
/// expiry duration.
#[derive(Clone, Debug)]
pub struct Builder {
    object_store: Arc<DynObjectStore>,
    // `None` means cached schemas are kept indefinitely.
    cache_expiry_after: Option<Duration>,
}
350
351impl TryFrom<&Url> for Builder {
352 type Error = Error;
353
354 fn try_from(storage: &Url) -> Result<Self, Self::Error> {
355 debug!(%storage);
356
357 match storage.scheme() {
358 "s3" => {
359 let bucket_name = storage.host_str().unwrap_or("schema");
360
361 AmazonS3Builder::from_env()
362 .with_bucket_name(bucket_name)
363 .build()
364 .map_err(Into::into)
365 .map(Self::new)
366 }
367
368 "file" => {
369 let mut path = env::current_dir().inspect(|current_dir| debug!(?current_dir))?;
370
371 if let Some(domain) = storage.domain() {
372 path.push(domain);
373 }
374
375 if let Some(relative) = storage.path().strip_prefix("/") {
376 path.push(relative);
377 } else {
378 path.push(storage.path());
379 }
380
381 debug!(?path);
382
383 LocalFileSystem::new_with_prefix(path)
384 .map_err(Into::into)
385 .map(Self::new)
386 }
387
388 "memory" => Ok(Self::new(InMemory::new())),
389
390 _unsupported => Err(Error::UnsupportedSchemaRegistryUrl(storage.to_owned())),
391 }
392 }
393}
394
395impl From<Builder> for Registry {
396 fn from(builder: Builder) -> Self {
397 Self {
398 object_store: builder.object_store,
399 schemas: Arc::new(Mutex::new(BTreeMap::new())),
400 cache_expiry_after: builder.cache_expiry_after,
401 }
402 }
403}
404
405impl Builder {
406 pub fn new(object_store: impl ObjectStore) -> Self {
407 Self {
408 object_store: Arc::new(object_store),
409 cache_expiry_after: None,
410 }
411 }
412
413 pub fn with_cache_expiry_after(self, cache_expiry_after: Option<Duration>) -> Self {
414 Self {
415 cache_expiry_after,
416 ..self
417 }
418 }
419
420 pub fn build(self) -> Registry {
421 Registry::from(self)
422 }
423}
424
/// OpenTelemetry meter scoped to this crate's name and version, tagged
/// with the semantic-conventions schema URL.
pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
    global::meter_with_scope(
        InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
            .with_version(env!("CARGO_PKG_VERSION"))
            .with_schema_url(SCHEMA_URL)
            .build(),
    )
});
433
/// Histogram of schema validation latency per topic, in milliseconds.
static VALIDATION_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
    METER
        .u64_histogram("registry_validation_duration")
        .with_unit("ms")
        .with_description("The registry validation request latencies in milliseconds")
        .build()
});

/// Counter of schema validation failures, labelled by topic and reason.
static VALIDATION_ERROR: LazyLock<Counter<u64>> = LazyLock::new(|| {
    METER
        .u64_counter("registry_validation_error")
        .with_description("The registry validation error count")
        .build()
});

/// Histogram of batch-to-Arrow conversion latency per topic, in milliseconds.
static AS_ARROW_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
    METER
        .u64_histogram("registry_as_arrow_duration")
        .with_unit("ms")
        .with_description("The registry as Apache Arrow latencies in milliseconds")
        .build()
});
456
457impl Registry {
458 pub fn new(object_store: impl ObjectStore) -> Self {
459 Builder::new(object_store).build()
460 }
461
462 pub fn builder(object_store: impl ObjectStore) -> Builder {
463 Builder::new(object_store)
464 }
465
466 pub fn builder_try_from_url(url: &Url) -> Result<Builder> {
467 Builder::try_from(url)
468 }
469
470 pub fn as_arrow(
471 &self,
472 topic: &str,
473 partition: i32,
474 batch: &Batch,
475 lake_type: LakeHouseType,
476 ) -> Result<Option<RecordBatch>> {
477 debug!(topic, partition, ?batch);
478
479 let start = SystemTime::now();
480
481 self.schemas
482 .lock()
483 .map_err(Into::into)
484 .and_then(|guard| {
485 guard
486 .get(topic)
487 .map(|cached| cached.schema.as_arrow(partition, batch, lake_type))
488 .transpose()
489 })
490 .inspect(|record_batch| {
491 debug!(
492 rows = record_batch
493 .as_ref()
494 .map(|record_batch| record_batch.num_rows())
495 );
496 AS_ARROW_DURATION.record(
497 start
498 .elapsed()
499 .map_or(0, |duration| duration.as_millis() as u64),
500 &[KeyValue::new("topic", topic.to_owned())],
501 )
502 })
503 .inspect_err(|err| debug!(?err))
504 }
505
506 pub async fn schema(&self, topic: &str) -> Result<Option<Schema>> {
507 debug!(?topic);
508
509 let proto = Path::from(format!("{topic}.proto"));
510 let json = Path::from(format!("{topic}.json"));
511 let avro = Path::from(format!("{topic}.avsc"));
512
513 if let Some(cached) = self.schemas.lock().map(|guard| guard.get(topic).cloned())? {
514 if self.cache_expiry_after.is_some_and(|cache_expiry_after| {
515 SystemTime::now()
516 .duration_since(cached.loaded_at)
517 .unwrap_or_default()
518 > cache_expiry_after
519 }) {
520 return Ok(Some(cached.schema));
521 } else {
522 debug!(cache_expiry = topic);
523 }
524 }
525
526 if let Ok(get_result) = self
527 .object_store
528 .get(&proto)
529 .await
530 .inspect(|get_result| debug!(?get_result))
531 .inspect_err(|err| debug!(?err))
532 {
533 get_result
534 .bytes()
535 .await
536 .map_err(Into::into)
537 .and_then(proto::Schema::try_from)
538 .map(Box::new)
539 .map(Schema::Proto)
540 .and_then(|schema| {
541 self.schemas
542 .lock()
543 .map_err(Into::into)
544 .map(|mut guard| {
545 guard.insert(topic.to_owned(), CachedSchema::new(schema.clone()))
546 })
547 .and(Ok(Some(schema)))
548 })
549 } else if let Ok(get_result) = self.object_store.get(&json).await {
550 get_result
551 .bytes()
552 .await
553 .map_err(Into::into)
554 .and_then(json::Schema::try_from)
555 .map(Arc::new)
556 .map(Schema::Json)
557 .and_then(|schema| {
558 self.schemas
559 .lock()
560 .map_err(Into::into)
561 .map(|mut guard| {
562 guard.insert(topic.to_owned(), CachedSchema::new(schema.clone()))
563 })
564 .and(Ok(Some(schema)))
565 })
566 } else if let Ok(get_result) = self.object_store.get(&avro).await {
567 get_result
568 .bytes()
569 .await
570 .map_err(Into::into)
571 .and_then(avro::Schema::try_from)
572 .map(Box::new)
573 .map(Schema::Avro)
574 .and_then(|schema| {
575 self.schemas
576 .lock()
577 .map_err(Into::into)
578 .map(|mut guard| {
579 guard.insert(topic.to_owned(), CachedSchema::new(schema.clone()))
580 })
581 .and(Ok(Some(schema)))
582 })
583 } else {
584 Ok(None)
585 }
586 }
587
588 pub async fn validate(&self, topic: &str, batch: &Batch) -> Result<()> {
589 debug!(%topic, ?batch);
590
591 let validation_start = SystemTime::now();
592
593 let Some(schema) = self.schema(topic).await? else {
594 debug!(no_schema_for_topic = %topic);
595 return Ok(());
596 };
597
598 schema
599 .validate(batch)
600 .inspect(|_| {
601 VALIDATION_DURATION.record(
602 validation_start
603 .elapsed()
604 .map_or(0, |duration| duration.as_millis() as u64),
605 &[KeyValue::new("topic", topic.to_owned())],
606 )
607 })
608 .inspect_err(|err| {
609 VALIDATION_ERROR.add(
610 1,
611 &[
612 KeyValue::new("topic", topic.to_owned()),
613 KeyValue::new("reason", err.to_string()),
614 ],
615 )
616 })
617 }
618}
619
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Result;
    use bytes::Bytes;
    use object_store::PutPayload;
    use serde_json::json;
    use std::{fs::File, sync::Arc, thread};
    use tansu_sans_io::record::Record;
    use tracing::{error, subscriber::DefaultGuard};
    use tracing_subscriber::EnvFilter;

    // Routes debug-level tracing for this crate to a per-test log file
    // named after the current thread (tests run in named threads).
    fn init_tracing() -> Result<DefaultGuard> {
        Ok(tracing::subscriber::set_default(
            tracing_subscriber::fmt()
                .with_level(true)
                .with_line_number(true)
                .with_thread_names(false)
                .with_env_filter(
                    EnvFilter::from_default_env()
                        .add_directive(format!("{}=debug", env!("CARGO_CRATE_NAME")).parse()?),
                )
                .with_writer(
                    // An unnamed thread has no log file name to derive, so
                    // tracing setup fails fast with a descriptive message.
                    thread::current()
                        .name()
                        .ok_or(Error::Message(String::from("unnamed thread")))
                        .and_then(|name| {
                            File::create(format!("../logs/{}/{name}.log", env!("CARGO_PKG_NAME"),))
                                .map_err(Into::into)
                        })
                        .map(Arc::new)?,
                )
                .finish(),
        ))
    }

    // Protobuf schema with one message each for the record key and value.
    const DEF_PROTO: &[u8] = br#"
      syntax = 'proto3';

      message Key {
        int32 id = 1;
      }

      message Value {
        string name = 1;
        string email = 2;
      }
    "#;

    // Avro record schema with defaulted "a"/"c" fields and a required "b".
    const PQR_AVRO: &[u8] = br#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "long", "default": 42},
                {"name": "b", "type": "string"},
                {"name": "c", "type": "long", "default": 43}
            ]
        }
    "#;

    // Builds an in-memory registry seeded with a JSON schema ("abc", keys
    // must be multiples of 10), a protobuf schema ("def") and an Avro
    // schema ("pqr").
    async fn populate() -> Result<Registry> {
        let _guard = init_tracing()?;

        let object_store = InMemory::new();

        let location = Path::from("abc.json");
        let payload = serde_json::to_vec(&json!({
            "type": "object",
            "properties": {
                "key": {
                    "type": "number",
                    "multipleOf": 10
                }
            }
        }))
        .map(Bytes::from)
        .map(PutPayload::from)?;

        _ = object_store.put(&location, payload).await?;

        let location = Path::from("def.proto");
        let payload = PutPayload::from(Bytes::from_static(DEF_PROTO));
        _ = object_store.put(&location, payload).await?;

        let location = Path::from("pqr.avsc");
        let payload = PutPayload::from(Bytes::from_static(PQR_AVRO));
        _ = object_store.put(&location, payload).await?;

        Ok(Registry::new(object_store))
    }

    // A key that is a multiple of 10 satisfies the "abc" JSON schema.
    #[tokio::test]
    async fn abc_valid() -> Result<()> {
        let _guard = init_tracing()?;

        let registry = populate().await?;

        let key = Bytes::from_static(b"5450");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        registry.validate("abc", &batch).await?;

        Ok(())
    }

    // A key that is not a multiple of 10 is rejected with InvalidRecord.
    #[tokio::test]
    async fn abc_invalid() -> Result<()> {
        let _guard = init_tracing()?;
        let registry = populate().await?;

        let key = Bytes::from_static(b"545");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        assert!(matches!(
            registry
                .validate("abc", &batch)
                .await
                .inspect_err(|err| error!(?err)),
            Err(Error::Api(ErrorCode::InvalidRecord))
        ));

        Ok(())
    }

    // A numeric key validates against the "pqr" Avro schema.
    #[tokio::test]
    async fn pqr_valid() -> Result<()> {
        let _guard = init_tracing()?;
        let registry = populate().await?;

        let key = Bytes::from_static(b"5450");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        registry.validate("pqr", &batch).await?;

        Ok(())
    }
}