1use std::{
21 collections::BTreeMap,
22 env::{self},
23 io,
24 num::TryFromIntError,
25 result,
26 string::FromUtf8Error,
27 sync::{Arc, LazyLock, Mutex, PoisonError},
28 time::SystemTime,
29};
30
31use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
32use bytes::Bytes;
33use datafusion::error::DataFusionError;
34use deltalake::DeltaTableError;
35use governor::InsufficientCapacity;
36use iceberg::spec::DataFileBuilderError;
37use jsonschema::ValidationError;
38use object_store::{
39 DynObjectStore, ObjectStore, aws::AmazonS3Builder, local::LocalFileSystem, memory::InMemory,
40 path::Path,
41};
42use opentelemetry::{
43 InstrumentationScope, KeyValue, global,
44 metrics::{Counter, Histogram, Meter},
45};
46use opentelemetry_semantic_conventions::SCHEMA_URL;
47use parquet::errors::ParquetError;
48use rhai::EvalAltResult;
49use serde_json::Value;
50use tansu_sans_io::{ErrorCode, record::inflated::Batch};
51use tracing::{debug, error};
52use tracing_subscriber::filter::ParseError;
53use url::Url;
54
55use crate::lake::LakeHouseType;
56
57pub mod avro;
58pub mod json;
59pub mod lake;
60pub mod proto;
61pub(crate) mod sql;
62
// Field name used for list elements when constructing Arrow list data types.
pub(crate) const ARROW_LIST_FIELD_NAME: &str = "element";
64
/// Errors raised anywhere in this crate.
///
/// Most variants wrap an error from an underlying crate via `#[from]`, so
/// `?` converts automatically. `Display` deliberately reuses the `Debug`
/// representation (`#[error("{:?}", self)]`) for every variant except
/// `JsonToAvroFieldNotFound`, which carries a custom message.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("{:?}", self)]
    Anyhow(#[from] anyhow::Error),

    /// A Kafka protocol error code.
    #[error("{:?}", self)]
    Api(ErrorCode),

    #[error("{:?}", self)]
    Arrow(#[from] ArrowError),

    /// Boxed via the manual `From<apache_avro::Error>` impl below.
    #[error("{:?}", self)]
    Avro(Box<apache_avro::Error>),

    /// An Avro value that could not be represented as JSON.
    #[error("{:?}", self)]
    AvroToJson(apache_avro::types::Value),

    /// A downcast failed for the named field.
    #[error("{:?}", self)]
    BadDowncast { field: String },

    #[error("{:?}", self)]
    EvalAlt(#[from] Box<EvalAltResult>),

    #[error("{:?}", self)]
    BuilderExhausted,

    #[error("{:?}", self)]
    ChronoParse(#[from] chrono::ParseError),

    #[error("{:?}", self)]
    DataFileBuilder(#[from] DataFileBuilderError),

    #[error("{:?}", self)]
    DataFusion(#[from] DataFusionError),

    #[error("{:?}", self)]
    DeltaTable(#[from] DeltaTableError),

    #[error("{:?}", self)]
    Downcast,

    #[error("{:?}", self)]
    FromUtf8(#[from] FromUtf8Error),

    #[error("{:?}", self)]
    Iceberg(#[from] ::iceberg::Error),

    /// An Avro value that was not valid in its context.
    #[error("{:?}", self)]
    InvalidValue(apache_avro::types::Value),

    #[error("{:?}", self)]
    InsufficientCapacity(#[from] InsufficientCapacity),

    #[error("{:?}", self)]
    Io(#[from] io::Error),

    /// A JSON value that could not be converted to the given Avro schema.
    /// Both payloads are boxed to keep the variant small.
    #[error("{:?}", self)]
    JsonToAvro(Box<apache_avro::Schema>, Box<Value>),

    /// A field required by the Avro schema was absent from the JSON value.
    #[error("field: {field}, not found in: {value} with schema: {schema}")]
    JsonToAvroFieldNotFound {
        schema: Box<apache_avro::Schema>,
        value: Box<Value>,
        field: String,
    },

    #[error("{:?}", self)]
    KafkaSansIo(#[from] tansu_sans_io::Error),

    /// A free-form error message.
    #[error("{:?}", self)]
    Message(String),

    /// No common Arrow data type could be found among the candidates.
    #[error("{:?}", self)]
    NoCommonType(Vec<DataType>),

    #[error("{:?}", self)]
    ObjectStore(#[from] object_store::Error),

    #[error("{:?}", self)]
    Parquet(#[from] ParquetError),

    #[error("{:?}", self)]
    ParseFilter(#[from] ParseError),

    #[error("{:?}", self)]
    ParseUrl(#[from] url::ParseError),

    /// A mutex was poisoned; the `PoisonError` payload is discarded (see the
    /// `From` impl below).
    #[error("{:?}", self)]
    Poison,

    #[error("{:?}", self)]
    ProtobufJsonMapping(#[from] protobuf_json_mapping::ParseError),

    #[error("{:?}", self)]
    ProtobufJsonMappingPrint(#[from] protobuf_json_mapping::PrintError),

    #[error("{:?}", self)]
    Protobuf(#[from] protobuf::Error),

    /// The given protobuf payload was missing its file descriptor.
    #[error("{:?}", self)]
    ProtobufFileDescriptorMissing(Bytes),

    /// JSON schema validation failed; detail is discarded (see the
    /// `From<ValidationError>` impl below).
    #[error("{:?}", self)]
    SchemaValidation,

    #[error("{:?}", self)]
    SerdeJson(#[from] serde_json::Error),

    #[error("{:?}", self)]
    SqlParser(#[from] datafusion::logical_expr::sqlparser::parser::ParserError),

    #[error("{:?}", self)]
    TryFromInt(#[from] TryFromIntError),

    /// The URL is not a supported Iceberg catalog.
    #[error("{:?}", self)]
    UnsupportedIcebergCatalogUrl(Url),

    /// The URL is not a supported lake house.
    #[error("{:?}", self)]
    UnsupportedLakeHouseUrl(Url),

    /// The URL is not a supported schema registry backend
    /// (supported schemes: `s3`, `file`, `memory`).
    #[error("{:?}", self)]
    UnsupportedSchemaRegistryUrl(Url),

    /// A runtime JSON value did not match the expected Arrow data type.
    #[error("{:?}", self)]
    UnsupportedSchemaRuntimeValue(DataType, Value),

    #[error("{:?}", self)]
    Uuid(#[from] uuid::Error),
}
195
196impl From<apache_avro::Error> for Error {
197 fn from(value: apache_avro::Error) -> Self {
198 Self::Avro(Box::new(value))
199 }
200}
201
202impl<T> From<PoisonError<T>> for Error {
203 fn from(_value: PoisonError<T>) -> Self {
204 Self::Poison
205 }
206}
207
208impl From<ValidationError<'_>> for Error {
209 fn from(_value: ValidationError<'_>) -> Self {
210 Self::SchemaValidation
211 }
212}
213
/// Crate-wide result type, defaulting the error to [`Error`].
pub type Result<T, E = Error> = result::Result<T, E>;
215
/// Validation of an inflated record [`Batch`] against a schema.
pub trait Validator {
    /// Returns `Ok(())` when `batch` conforms to the schema.
    fn validate(&self, batch: &Batch) -> Result<()>;
}
220
/// Conversion of a record [`Batch`] into an Apache Arrow [`RecordBatch`].
pub trait AsArrow {
    /// Convert `batch` from `partition` into an Arrow record batch suitable
    /// for the given lake house type.
    fn as_arrow(
        &self,
        partition: i32,
        batch: &Batch,
        lake_type: LakeHouseType,
    ) -> Result<RecordBatch>;
}
230
/// Conversion of a JSON value into a Kafka record builder.
pub trait AsKafkaRecord {
    /// Build a Kafka record from `value`.
    fn as_kafka_record(&self, value: &Value) -> Result<tansu_sans_io::record::Builder>;
}
235
/// Conversion of a record [`Batch`] into a JSON value.
pub trait AsJsonValue {
    /// Represent `batch` as JSON.
    fn as_json_value(&self, batch: &Batch) -> Result<Value>;
}
240
/// Generation of a Kafka record builder from a schema.
pub trait Generator {
    /// Produce a record builder derived from the schema.
    fn generate(&self) -> Result<tansu_sans_io::record::Builder>;
}
245
/// A topic schema in one of the supported formats.
///
/// Avro and protobuf payloads are boxed, the JSON schema is shared via
/// `Arc`, keeping the enum itself small and `Clone` cheap.
#[derive(Clone, Debug)]
pub enum Schema {
    Avro(Box<avro::Schema>),
    Json(Arc<json::Schema>),
    Proto(Box<proto::Schema>),
}
255
256impl AsKafkaRecord for Schema {
257 fn as_kafka_record(&self, value: &Value) -> Result<tansu_sans_io::record::Builder> {
258 debug!(?value);
259
260 match self {
261 Self::Avro(schema) => schema.as_kafka_record(value),
262 Self::Json(schema) => schema.as_kafka_record(value),
263 Self::Proto(schema) => schema.as_kafka_record(value),
264 }
265 }
266}
267
268impl Validator for Schema {
269 fn validate(&self, batch: &Batch) -> Result<()> {
270 debug!(?batch);
271
272 match self {
273 Self::Avro(schema) => schema.validate(batch),
274 Self::Json(schema) => schema.validate(batch),
275 Self::Proto(schema) => schema.validate(batch),
276 }
277 }
278}
279
280impl AsArrow for Schema {
281 fn as_arrow(
282 &self,
283 partition: i32,
284 batch: &Batch,
285 lake_type: LakeHouseType,
286 ) -> Result<RecordBatch> {
287 debug!(?batch);
288
289 match self {
290 Self::Avro(schema) => schema.as_arrow(partition, batch, lake_type),
291 Self::Json(schema) => schema.as_arrow(partition, batch, lake_type),
292 Self::Proto(schema) => schema.as_arrow(partition, batch, lake_type),
293 }
294 }
295}
296
297impl AsJsonValue for Schema {
298 fn as_json_value(&self, batch: &Batch) -> Result<Value> {
299 debug!(?batch);
300
301 match self {
302 Self::Avro(schema) => schema.as_json_value(batch),
303 Self::Json(schema) => schema.as_json_value(batch),
304 Self::Proto(schema) => schema.as_json_value(batch),
305 }
306 }
307}
308
309impl Generator for Schema {
310 fn generate(&self) -> Result<tansu_sans_io::record::Builder> {
311 match self {
312 Schema::Avro(schema) => schema.generate(),
313 Schema::Json(schema) => schema.generate(),
314 Schema::Proto(schema) => schema.generate(),
315 }
316 }
317}
318
/// Schema registry: resolves topic schemas from an object store, caches
/// them in memory, and records validation / conversion metrics.
#[derive(Clone, Debug)]
pub struct Registry {
    // Backing store holding `<topic>.proto`, `<topic>.json` or
    // `<topic>.avsc` schema artifacts.
    object_store: Arc<DynObjectStore>,
    // Cache of schemas already fetched, keyed by topic name.
    schemas: Arc<Mutex<BTreeMap<String, Schema>>>,
    // Validation request latencies in milliseconds.
    validation_duration: Histogram<u64>,
    // Count of validation failures.
    validation_error: Counter<u64>,
    // Batch-to-Arrow conversion latencies in milliseconds.
    as_arrow_duration: Histogram<u64>,
}
328
// Crate-wide OpenTelemetry meter, scoped to this crate's name and version
// with the semantic-conventions schema URL; initialised lazily on first use.
pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
    global::meter_with_scope(
        InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
            .with_version(env!("CARGO_PKG_VERSION"))
            .with_schema_url(SCHEMA_URL)
            .build(),
    )
});
337
338impl Registry {
339 pub fn new(storage: impl ObjectStore) -> Self {
340 Self {
341 object_store: Arc::new(storage),
342 schemas: Arc::new(Mutex::new(BTreeMap::new())),
343 validation_duration: METER
344 .u64_histogram("registry_validation_duration")
345 .with_unit("ms")
346 .with_description("The registry validation request latencies in milliseconds")
347 .build(),
348 validation_error: METER
349 .u64_counter("registry_validation_error")
350 .with_description("The registry validation error count")
351 .build(),
352 as_arrow_duration: METER
353 .u64_histogram("registry_as_arrow_duration")
354 .with_unit("ms")
355 .with_description("The registry as Apache Arrow latencies in milliseconds")
356 .build(),
357 }
358 }
359
360 pub fn as_arrow(
361 &self,
362 topic: &str,
363 partition: i32,
364 batch: &Batch,
365 lake_type: LakeHouseType,
366 ) -> Result<Option<RecordBatch>> {
367 debug!(topic, partition, ?batch);
368
369 let start = SystemTime::now();
370
371 self.schemas
372 .lock()
373 .map_err(Into::into)
374 .and_then(|guard| {
375 guard
376 .get(topic)
377 .map(|schema| schema.as_arrow(partition, batch, lake_type))
378 .transpose()
379 })
380 .inspect(|record_batch| {
381 debug!(
382 rows = record_batch
383 .as_ref()
384 .map(|record_batch| record_batch.num_rows())
385 );
386 self.as_arrow_duration.record(
387 start
388 .elapsed()
389 .map_or(0, |duration| duration.as_millis() as u64),
390 &[KeyValue::new("topic", topic.to_owned())],
391 )
392 })
393 .inspect_err(|err| debug!(?err))
394 }
395
396 pub async fn schema(&self, topic: &str) -> Result<Option<Schema>> {
397 debug!(?topic);
398
399 let proto = Path::from(format!("{topic}.proto"));
400 let json = Path::from(format!("{topic}.json"));
401 let avro = Path::from(format!("{topic}.avsc"));
402
403 if let Some(schema) = self.schemas.lock().map(|guard| guard.get(topic).cloned())? {
404 Ok(Some(schema))
405 } else if let Ok(get_result) = self
406 .object_store
407 .get(&proto)
408 .await
409 .inspect(|get_result| debug!(?get_result))
410 .inspect_err(|err| debug!(?err))
411 {
412 get_result
413 .bytes()
414 .await
415 .map_err(Into::into)
416 .and_then(proto::Schema::try_from)
417 .map(Box::new)
418 .map(Schema::Proto)
419 .and_then(|schema| {
420 self.schemas
421 .lock()
422 .map_err(Into::into)
423 .map(|mut guard| guard.insert(topic.to_owned(), schema.clone()))
424 .and(Ok(Some(schema)))
425 })
426 } else if let Ok(get_result) = self.object_store.get(&json).await {
427 get_result
428 .bytes()
429 .await
430 .map_err(Into::into)
431 .and_then(json::Schema::try_from)
432 .map(Arc::new)
433 .map(Schema::Json)
434 .and_then(|schema| {
435 self.schemas
436 .lock()
437 .map_err(Into::into)
438 .map(|mut guard| guard.insert(topic.to_owned(), schema.clone()))
439 .and(Ok(Some(schema)))
440 })
441 } else if let Ok(get_result) = self.object_store.get(&avro).await {
442 get_result
443 .bytes()
444 .await
445 .map_err(Into::into)
446 .and_then(avro::Schema::try_from)
447 .map(Box::new)
448 .map(Schema::Avro)
449 .and_then(|schema| {
450 self.schemas
451 .lock()
452 .map_err(Into::into)
453 .map(|mut guard| guard.insert(topic.to_owned(), schema.clone()))
454 .and(Ok(Some(schema)))
455 })
456 } else {
457 Ok(None)
458 }
459 }
460
461 pub async fn validate(&self, topic: &str, batch: &Batch) -> Result<()> {
462 debug!(%topic, ?batch);
463
464 let validation_start = SystemTime::now();
465
466 let Some(schema) = self.schema(topic).await? else {
467 debug!(no_schema_for_topic = %topic);
468 return Ok(());
469 };
470
471 schema
472 .validate(batch)
473 .inspect(|_| {
474 self.validation_duration.record(
475 validation_start
476 .elapsed()
477 .map_or(0, |duration| duration.as_millis() as u64),
478 &[KeyValue::new("topic", topic.to_owned())],
479 )
480 })
481 .inspect_err(|err| {
482 self.validation_error.add(
483 1,
484 &[
485 KeyValue::new("topic", topic.to_owned()),
486 KeyValue::new("reason", err.to_string()),
487 ],
488 )
489 })
490 }
491}
492
493impl TryFrom<Url> for Registry {
494 type Error = Error;
495
496 fn try_from(storage: Url) -> Result<Self, Self::Error> {
497 Self::try_from(&storage)
498 }
499}
500
501impl TryFrom<&Url> for Registry {
502 type Error = Error;
503
504 fn try_from(storage: &Url) -> Result<Self, Self::Error> {
505 debug!(%storage);
506
507 match storage.scheme() {
508 "s3" => {
509 let bucket_name = storage.host_str().unwrap_or("schema");
510
511 AmazonS3Builder::from_env()
512 .with_bucket_name(bucket_name)
513 .build()
514 .map_err(Into::into)
515 .map(Registry::new)
516 }
517
518 "file" => {
519 let mut path = env::current_dir().inspect(|current_dir| debug!(?current_dir))?;
520
521 if let Some(domain) = storage.domain() {
522 path.push(domain);
523 }
524
525 if let Some(relative) = storage.path().strip_prefix("/") {
526 path.push(relative);
527 } else {
528 path.push(storage.path());
529 }
530
531 debug!(?path);
532
533 LocalFileSystem::new_with_prefix(path)
534 .map_err(Into::into)
535 .map(Registry::new)
536 }
537
538 "memory" => Ok(Registry::new(InMemory::new())),
539
540 _unsupported => Err(Error::UnsupportedSchemaRegistryUrl(storage.to_owned())),
541 }
542 }
543}
544
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Result;
    use bytes::Bytes;
    use object_store::PutPayload;
    use serde_json::json;
    use std::{fs::File, sync::Arc, thread};
    use tansu_sans_io::record::Record;
    use tracing::{error, subscriber::DefaultGuard};
    use tracing_subscriber::EnvFilter;

    /// Install a scoped tracing subscriber that writes this crate's debug
    /// output to `../logs/<crate>/<thread-name>.log`.
    ///
    /// Fails with [`Error::Message`] for unnamed threads; the test harness
    /// names each test's thread after the test.
    fn init_tracing() -> Result<DefaultGuard> {
        Ok(tracing::subscriber::set_default(
            tracing_subscriber::fmt()
                .with_level(true)
                .with_line_number(true)
                .with_thread_names(false)
                .with_env_filter(
                    EnvFilter::from_default_env()
                        .add_directive(format!("{}=debug", env!("CARGO_CRATE_NAME")).parse()?),
                )
                .with_writer(
                    thread::current()
                        .name()
                        .ok_or(Error::Message(String::from("unnamed thread")))
                        .and_then(|name| {
                            File::create(format!("../logs/{}/{name}.log", env!("CARGO_PKG_NAME"),))
                                .map_err(Into::into)
                        })
                        .map(Arc::new)?,
                )
                .finish(),
        ))
    }

    // Protobuf schema fixture, stored as `def.proto` by `populate`.
    const DEF_PROTO: &[u8] = br#"
      syntax = 'proto3';

      message Key {
        int32 id = 1;
      }

      message Value {
        string name = 1;
        string email = 2;
      }
      "#;

    // Avro record schema fixture, stored as `pqr.avsc` by `populate`.
    const PQR_AVRO: &[u8] = br#"
      {
        "type": "record",
        "name": "test",
        "fields": [
          {"name": "a", "type": "long", "default": 42},
          {"name": "b", "type": "string"},
          {"name": "c", "type": "long", "default": 43}
        ]
      }
      "#;

    /// Build an in-memory registry populated with a JSON schema (`abc`),
    /// a protobuf schema (`def`) and an Avro schema (`pqr`).
    async fn populate() -> Result<Registry> {
        let _guard = init_tracing()?;

        let object_store = InMemory::new();

        let location = Path::from("abc.json");
        // JSON schema requiring `key` to be a multiple of ten.
        let payload = serde_json::to_vec(&json!({
            "type": "object",
            "properties": {
                "key": {
                    "type": "number",
                    "multipleOf": 10
                }
            }
        }))
        .map(Bytes::from)
        .map(PutPayload::from)?;

        _ = object_store.put(&location, payload).await?;

        let location = Path::from("def.proto");
        let payload = PutPayload::from(Bytes::from_static(DEF_PROTO));
        _ = object_store.put(&location, payload).await?;

        let location = Path::from("pqr.avsc");
        let payload = PutPayload::from(Bytes::from_static(PQR_AVRO));
        _ = object_store.put(&location, payload).await?;

        Ok(Registry::new(object_store))
    }

    /// A key that is a multiple of ten passes the `abc` JSON schema.
    #[tokio::test]
    async fn abc_valid() -> Result<()> {
        let _guard = init_tracing()?;

        let registry = populate().await?;

        let key = Bytes::from_static(b"5450");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        registry.validate("abc", &batch).await?;

        Ok(())
    }

    /// A key that is not a multiple of ten fails `abc` validation with an
    /// invalid record error code.
    #[tokio::test]
    async fn abc_invalid() -> Result<()> {
        let _guard = init_tracing()?;
        let registry = populate().await?;

        let key = Bytes::from_static(b"545");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        assert!(matches!(
            registry
                .validate("abc", &batch)
                .await
                .inspect_err(|err| error!(?err)),
            Err(Error::Api(ErrorCode::InvalidRecord))
        ));

        Ok(())
    }

    /// A numeric key validates against the `pqr` Avro schema.
    #[tokio::test]
    async fn pqr_valid() -> Result<()> {
        let _guard = init_tracing()?;
        let registry = populate().await?;

        let key = Bytes::from_static(b"5450");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        registry.validate("pqr", &batch).await?;

        Ok(())
    }
}