Skip to main content

tansu_schema/
lib.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Schema
16//!
17//! Schema includes the following:
18//! - Validation of Kafka messages with an AVRO, JSON or Protobuf schema
19
20use std::{
21    collections::BTreeMap,
22    env::{self},
23    fmt::{self, Display, Formatter},
24    io,
25    num::TryFromIntError,
26    result,
27    str::FromStr,
28    string::FromUtf8Error,
29    sync::{Arc, LazyLock, Mutex, PoisonError},
30    time::{Duration, SystemTime},
31};
32
33#[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
34use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
35
36use bytes::Bytes;
37
38#[cfg(any(feature = "iceberg", feature = "delta"))]
39use datafusion::error::DataFusionError;
40
41#[cfg(feature = "delta")]
42use deltalake::DeltaTableError;
43
44use governor::InsufficientCapacity;
45
46#[cfg(feature = "iceberg")]
47use iceberg::spec::DataFileBuilderError;
48
49use jsonschema::ValidationError;
50use object_store::{
51    DynObjectStore, ObjectStore, ObjectStoreExt, aws::AmazonS3Builder, local::LocalFileSystem,
52    memory::InMemory, path::Path,
53};
54use opentelemetry::{
55    InstrumentationScope, KeyValue, global,
56    metrics::{Counter, Histogram, Meter},
57};
58use opentelemetry_semantic_conventions::SCHEMA_URL;
59
60#[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
61use parquet::errors::ParquetError;
62
63use rhai::EvalAltResult;
64use serde_json::Value;
65use tansu_sans_io::{ErrorCode, record::inflated::Batch};
66use tracing::{debug, instrument};
67use tracing_subscriber::filter::ParseError;
68use url::Url;
69
/// AVRO schema support.
pub mod avro;
/// JSON schema support.
pub mod json;
/// Lake-house integration (Arrow conversion metrics and sinks).
pub mod lake;
/// Protobuf schema support.
pub mod proto;

#[cfg(feature = "delta")]
pub(crate) mod sql;
77
/// Field name used for list element types when building Arrow schemas.
pub(crate) const ARROW_LIST_FIELD_NAME: &str = "element";
79
/// Error
///
/// Crate-wide error type aggregating failures from schema parsing,
/// validation, object storage and the optional lake-house backends.
/// `Display` is implemented manually (below) via the `Debug`
/// representation, so variants carry no `#[error]` messages.
/// Several foreign errors are boxed to keep `size_of::<Error>()`
/// small (see the `error_size_of` test).
#[derive(thiserror::Error, Debug)]
pub enum Error {
    Anyhow(#[from] anyhow::Error),

    /// A Kafka protocol error code, e.g. `InvalidRecord` on validation failure.
    Api(ErrorCode),

    #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
    Arrow(#[from] ArrowError),

    // Boxed: apache_avro::Error is large.
    Avro(Box<apache_avro::Error>),

    /// An AVRO value that could not be converted to JSON.
    AvroToJson(apache_avro::types::Value),

    BadDowncast {
        field: String,
    },

    EvalAlt(#[from] Box<EvalAltResult>),

    BuilderExhausted,

    ChronoParse(#[from] chrono::ParseError),

    #[cfg(feature = "iceberg")]
    DataFileBuilder(#[from] DataFileBuilderError),

    // Boxed: DataFusionError is large; see the manual From impl below.
    #[cfg(any(feature = "iceberg", feature = "delta"))]
    DataFusion(Box<DataFusionError>),

    #[cfg(feature = "delta")]
    DeltaTable(Box<DeltaTableError>),

    Downcast,

    FromUtf8(#[from] FromUtf8Error),

    #[cfg(feature = "iceberg")]
    Iceberg(Box<::iceberg::Error>),

    InvalidValue(apache_avro::types::Value),

    InsufficientCapacity(#[from] InsufficientCapacity),

    Io(#[from] io::Error),

    /// A JSON value that could not be converted with the given AVRO schema.
    JsonToAvro(Box<apache_avro::Schema>, Box<Value>),

    JsonToAvroFieldNotFound {
        schema: Box<apache_avro::Schema>,
        value: Box<Value>,
        field: String,
    },

    KafkaSansIo(#[from] tansu_sans_io::Error),

    /// Free-form error message.
    Message(String),

    #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
    NoCommonType(Vec<DataType>),

    ObjectStore(#[from] object_store::Error),

    #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
    Parquet(#[from] ParquetError),

    ParseFilter(#[from] ParseError),

    ParseUrl(#[from] url::ParseError),

    /// A mutex was poisoned; the guard payload is discarded (see `From<PoisonError>`).
    Poison,

    ProtobufJsonMapping(#[from] protobuf_json_mapping::ParseError),

    ProtobufJsonMappingPrint(#[from] protobuf_json_mapping::PrintError),

    Protobuf(#[from] protobuf::Error),

    ProtobufFileDescriptorMissing(Bytes),

    /// JSON schema validation failed; detail is discarded (see `From<ValidationError>`).
    SchemaValidation,

    SerdeJson(#[from] serde_json::Error),

    #[cfg(any(feature = "iceberg", feature = "delta"))]
    SqlParser(#[from] datafusion::logical_expr::sqlparser::parser::ParserError),

    /// Arrow conversion was requested for a topic with no registered schema.
    TopicWithoutSchema(String),

    TryFromInt(#[from] TryFromIntError),

    UnsupportedIcebergCatalogUrl(Url),

    UnsupportedLakeHouseUrl(Url),

    UnsupportedSchemaRegistryUrl(Url),

    #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
    UnsupportedSchemaRuntimeValue(DataType, Value),

    Uuid(#[from] uuid::Error),
}
182
183impl Display for Error {
184    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
185        write!(f, "{self:?}")
186    }
187}
188
#[cfg(any(feature = "iceberg", feature = "delta"))]
impl From<DataFusionError> for Error {
    // Box the DataFusion error so the `Error` enum stays small.
    fn from(error: DataFusionError) -> Self {
        let boxed = Box::new(error);
        Self::DataFusion(boxed)
    }
}
195
#[cfg(feature = "iceberg")]
impl From<::iceberg::Error> for Error {
    // Box the Iceberg error so the `Error` enum stays small.
    fn from(error: ::iceberg::Error) -> Self {
        let boxed = Box::new(error);
        Self::Iceberg(boxed)
    }
}
202
#[cfg(feature = "delta")]
impl From<DeltaTableError> for Error {
    // Box the Delta table error so the `Error` enum stays small.
    fn from(error: DeltaTableError) -> Self {
        let boxed = Box::new(error);
        Self::DeltaTable(boxed)
    }
}
209
210impl From<apache_avro::Error> for Error {
211    fn from(value: apache_avro::Error) -> Self {
212        Self::Avro(Box::new(value))
213    }
214}
215
216impl<T> From<PoisonError<T>> for Error {
217    fn from(_value: PoisonError<T>) -> Self {
218        Self::Poison
219    }
220}
221
222impl From<ValidationError<'_>> for Error {
223    fn from(_value: ValidationError<'_>) -> Self {
224        Self::SchemaValidation
225    }
226}
227
/// Crate-wide result type with the error defaulting to [`Error`].
pub type Result<T, E = Error> = result::Result<T, E>;
229
/// Validate a Batch with a Schema
pub trait Validator {
    /// Check `batch` against the schema, returning `Err` when validation fails.
    fn validate(&self, batch: &Batch) -> Result<()>;
}
234
/// Represent a Batch in the Arrow columnar data format
#[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
trait AsArrow {
    /// Convert `batch` from `topic`/`partition` into an Arrow
    /// [`RecordBatch`] for the given lake-house type.
    async fn as_arrow(
        &self,
        topic: &str,
        partition: i32,
        batch: &Batch,
        lake_type: lake::LakeHouseType,
    ) -> Result<RecordBatch>;
}
246
/// Convert a JSON message into a Kafka record
pub trait AsKafkaRecord {
    /// Build a Kafka record from the JSON `value`.
    fn as_kafka_record(&self, value: &Value) -> Result<tansu_sans_io::record::Builder>;
}
251
/// Convert a Batch into a JSON value
pub trait AsJsonValue {
    /// Render `batch` as a JSON [`Value`].
    fn as_json_value(&self, batch: &Batch) -> Result<Value>;
}
256
/// Generate a record
pub trait Generator {
    /// Produce a Kafka record builder from the implementor's schema.
    fn generate(&self) -> Result<tansu_sans_io::record::Builder>;
}
261
// Schema
//
// This is wrapper enumeration of the supported schema types (AVRO, JSON
// and Protobuf). AVRO and Protobuf schemas are boxed; JSON schemas are
// shared via `Arc`.
#[derive(Clone, Debug)]
pub enum Schema {
    Avro(Box<avro::Schema>),
    Json(Arc<json::Schema>),
    Proto(Box<proto::Schema>),
}
271
// A schema together with the time it was loaded, used by the registry's
// cache-expiry logic.
#[derive(Clone, Debug)]
struct CachedSchema {
    // When the schema was read from the object store.
    loaded_at: SystemTime,
    schema: Schema,
}
277
278impl CachedSchema {
279    fn new(schema: Schema) -> Self {
280        Self {
281            schema,
282            loaded_at: SystemTime::now(),
283        }
284    }
285}
286
287impl AsKafkaRecord for Schema {
288    fn as_kafka_record(&self, value: &Value) -> Result<tansu_sans_io::record::Builder> {
289        debug!(?value);
290
291        match self {
292            Self::Avro(schema) => schema.as_kafka_record(value),
293            Self::Json(schema) => schema.as_kafka_record(value),
294            Self::Proto(schema) => schema.as_kafka_record(value),
295        }
296    }
297}
298
299impl Validator for Schema {
300    #[instrument(skip(self, batch), ret)]
301    fn validate(&self, batch: &Batch) -> Result<()> {
302        match self {
303            Self::Avro(schema) => schema.validate(batch),
304            Self::Json(schema) => schema.validate(batch),
305            Self::Proto(schema) => schema.validate(batch),
306        }
307    }
308}
309
#[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
impl AsArrow for Schema {
    /// Delegate Arrow conversion to the concrete schema type.
    #[instrument(skip(self, batch), ret)]
    async fn as_arrow(
        &self,
        topic: &str,
        partition: i32,
        batch: &Batch,
        lake_type: lake::LakeHouseType,
    ) -> Result<RecordBatch> {
        match self {
            Self::Avro(schema) => schema.as_arrow(topic, partition, batch, lake_type).await,
            Self::Json(schema) => schema.as_arrow(topic, partition, batch, lake_type).await,
            Self::Proto(schema) => schema.as_arrow(topic, partition, batch, lake_type).await,
        }
    }
}
327
328impl AsJsonValue for Schema {
329    #[instrument(skip(self, batch), ret)]
330    fn as_json_value(&self, batch: &Batch) -> Result<Value> {
331        debug!(?batch);
332
333        match self {
334            Self::Avro(schema) => schema.as_json_value(batch),
335            Self::Json(schema) => schema.as_json_value(batch),
336            Self::Proto(schema) => schema.as_json_value(batch),
337        }
338    }
339}
340
341impl Generator for Schema {
342    fn generate(&self) -> Result<tansu_sans_io::record::Builder> {
343        match self {
344            Schema::Avro(schema) => schema.generate(),
345            Schema::Json(schema) => schema.generate(),
346            Schema::Proto(schema) => schema.generate(),
347        }
348    }
349}
350
// Shared, mutex-guarded map from topic name to its cached schema.
type SchemaCache = Arc<Mutex<BTreeMap<String, CachedSchema>>>;
352
// Schema Registry
//
// Loads per-topic schemas from an object store, caching them in memory,
// and validates Kafka batches against them.
#[derive(Clone, Debug)]
pub struct Registry {
    object_store: Arc<DynObjectStore>,
    schemas: SchemaCache,
    // How long a cached schema remains usable; `None` means no expiry.
    cache_expiry_after: Option<Duration>,
}
360
361impl FromStr for Registry {
362    type Err = Error;
363
364    fn from_str(s: &str) -> result::Result<Self, Self::Err> {
365        Url::parse(s)
366            .map_err(Into::into)
367            .and_then(|location| Builder::try_from(&location))
368            .map(Into::into)
369    }
370}
371
// Schema Registry builder
#[derive(Clone, Debug)]
pub struct Builder {
    object_store: Arc<DynObjectStore>,
    // Optional time-to-live for cached schemas; `None` means no expiry.
    cache_expiry_after: Option<Duration>,
}
378
impl TryFrom<&Url> for Builder {
    type Error = Error;

    /// Map a storage URL onto an object-store backed builder.
    ///
    /// Supported schemes:
    /// - `s3://bucket` — Amazon S3, credentials from the environment;
    ///   the bucket name defaults to "schema" when the URL has no host.
    /// - `file://[host]/path` — local filesystem, resolved relative to
    ///   the current working directory.
    /// - `memory://` — in-memory store.
    ///
    /// # Errors
    /// Any other scheme yields [`Error::UnsupportedSchemaRegistryUrl`].
    fn try_from(storage: &Url) -> Result<Self, Self::Error> {
        debug!(%storage);

        match storage.scheme() {
            "s3" => {
                let bucket_name = storage.host_str().unwrap_or("schema");

                AmazonS3Builder::from_env()
                    .with_bucket_name(bucket_name)
                    .build()
                    .map_err(Into::into)
                    .map(Self::new)
            }

            "file" => {
                // Resolve the URL's path relative to the current working
                // directory; a URL host (domain) becomes the first component.
                let mut path = env::current_dir().inspect(|current_dir| debug!(?current_dir))?;

                if let Some(domain) = storage.domain() {
                    path.push(domain);
                }

                // Strip a leading "/" so the push stays relative to `path`
                // rather than replacing it.
                if let Some(relative) = storage.path().strip_prefix("/") {
                    path.push(relative);
                } else {
                    path.push(storage.path());
                }

                debug!(?path);

                LocalFileSystem::new_with_prefix(path)
                    .map_err(Into::into)
                    .map(Self::new)
            }

            "memory" => Ok(Self::new(InMemory::new())),

            _unsupported => Err(Error::UnsupportedSchemaRegistryUrl(storage.to_owned())),
        }
    }
}
422
423impl From<Builder> for Registry {
424    fn from(builder: Builder) -> Self {
425        Self {
426            object_store: builder.object_store,
427            schemas: Arc::new(Mutex::new(BTreeMap::new())),
428            cache_expiry_after: builder.cache_expiry_after,
429        }
430    }
431}
432
433impl Builder {
434    pub fn new(object_store: impl ObjectStore) -> Self {
435        Self {
436            object_store: Arc::new(object_store),
437            cache_expiry_after: None,
438        }
439    }
440
441    pub fn with_cache_expiry_after(self, cache_expiry_after: Option<Duration>) -> Self {
442        Self {
443            cache_expiry_after,
444            ..self
445        }
446    }
447
448    pub fn build(self) -> Registry {
449        Registry::from(self)
450    }
451}
452
// OpenTelemetry meter scoped to this crate (name/version from Cargo),
// used by all metric instruments below.
pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
    global::meter_with_scope(
        InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
            .with_version(env!("CARGO_PKG_VERSION"))
            .with_schema_url(SCHEMA_URL)
            .build(),
    )
});
461
// Histogram of `Registry::validate` latencies in milliseconds, recorded
// per topic on successful validation.
static VALIDATION_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
    METER
        .u64_histogram("registry_validation_duration")
        .with_unit("ms")
        .with_description("The registry validation request latencies in milliseconds")
        .build()
});
469
// Counter of validation failures, labelled by topic and error reason.
static VALIDATION_ERROR: LazyLock<Counter<u64>> = LazyLock::new(|| {
    METER
        .u64_counter("registry_validation_error")
        .with_description("The registry validation error count")
        .build()
});
476
477impl Registry {
478    pub fn new(object_store: impl ObjectStore) -> Self {
479        Builder::new(object_store).build()
480    }
481
482    pub fn builder(object_store: impl ObjectStore) -> Builder {
483        Builder::new(object_store)
484    }
485
486    pub fn builder_try_from_url(url: &Url) -> Result<Builder> {
487        Builder::try_from(url)
488    }
489
490    #[instrument(skip(self))]
491    pub async fn schema(&self, topic: &str) -> Result<Option<Schema>> {
492        let proto = Path::from(format!("{topic}.proto"));
493        let json = Path::from(format!("{topic}.json"));
494        let avro = Path::from(format!("{topic}.avsc"));
495
496        if let Some(cached) = self.schemas.lock().map(|guard| guard.get(topic).cloned())? {
497            if self.cache_expiry_after.is_some_and(|cache_expiry_after| {
498                SystemTime::now()
499                    .duration_since(cached.loaded_at)
500                    .unwrap_or_default()
501                    > cache_expiry_after
502            }) {
503                return Ok(Some(cached.schema));
504            } else {
505                debug!(cache_expiry = topic);
506            }
507        }
508
509        if let Ok(get_result) = self
510            .object_store
511            .get(&proto)
512            .await
513            .inspect(|get_result| debug!(?get_result))
514            .inspect_err(|err| debug!(?err))
515        {
516            get_result
517                .bytes()
518                .await
519                .map_err(Into::into)
520                .and_then(proto::Schema::try_from)
521                .map(Box::new)
522                .map(Schema::Proto)
523                .and_then(|schema| {
524                    self.schemas
525                        .lock()
526                        .map_err(Into::into)
527                        .map(|mut guard| {
528                            guard.insert(topic.to_owned(), CachedSchema::new(schema.clone()))
529                        })
530                        .and(Ok(Some(schema)))
531                })
532        } else if let Ok(get_result) = self.object_store.get(&json).await {
533            get_result
534                .bytes()
535                .await
536                .map_err(Into::into)
537                .and_then(json::Schema::try_from)
538                .map(Arc::new)
539                .map(Schema::Json)
540                .and_then(|schema| {
541                    self.schemas
542                        .lock()
543                        .map_err(Into::into)
544                        .map(|mut guard| {
545                            guard.insert(topic.to_owned(), CachedSchema::new(schema.clone()))
546                        })
547                        .and(Ok(Some(schema)))
548                })
549        } else if let Ok(get_result) = self.object_store.get(&avro).await {
550            get_result
551                .bytes()
552                .await
553                .map_err(Into::into)
554                .and_then(avro::Schema::try_from)
555                .map(Box::new)
556                .map(Schema::Avro)
557                .and_then(|schema| {
558                    self.schemas
559                        .lock()
560                        .map_err(Into::into)
561                        .map(|mut guard| {
562                            guard.insert(topic.to_owned(), CachedSchema::new(schema.clone()))
563                        })
564                        .and(Ok(Some(schema)))
565                })
566        } else {
567            Ok(None)
568        }
569    }
570
571    #[instrument(skip(self, batch))]
572    pub async fn validate(&self, topic: &str, batch: &Batch) -> Result<()> {
573        let validation_start = SystemTime::now();
574
575        let Some(schema) = self.schema(topic).await? else {
576            debug!(no_schema_for_topic = %topic);
577            return Ok(());
578        };
579
580        schema
581            .validate(batch)
582            .inspect(|_| {
583                VALIDATION_DURATION.record(
584                    validation_start
585                        .elapsed()
586                        .map_or(0, |duration| duration.as_millis() as u64),
587                    &[KeyValue::new("topic", topic.to_owned())],
588                )
589            })
590            .inspect_err(|err| {
591                VALIDATION_ERROR.add(
592                    1,
593                    &[
594                        KeyValue::new("topic", topic.to_owned()),
595                        KeyValue::new("reason", err.to_string()),
596                    ],
597                )
598            })
599    }
600}
601
#[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
impl AsArrow for Registry {
    /// Convert `batch` into an Arrow [`RecordBatch`] using the schema
    /// registered for `topic`, recording the conversion latency.
    ///
    /// # Errors
    /// Returns [`Error::TopicWithoutSchema`] when `topic` has no
    /// registered schema.
    #[instrument(skip(self, batch), ret)]
    async fn as_arrow(
        &self,
        topic: &str,
        partition: i32,
        batch: &Batch,
        lake_type: lake::LakeHouseType,
    ) -> Result<RecordBatch> {
        let start = SystemTime::now();

        let schema = self
            .schema(topic)
            .await
            .and_then(|schema| schema.ok_or(Error::TopicWithoutSchema(topic.to_owned())))?;

        schema
            .as_arrow(topic, partition, batch, lake_type)
            .await
            .inspect(|_| {
                lake::AS_ARROW_DURATION.record(
                    start
                        .elapsed()
                        .map_or(0, |duration| duration.as_millis() as u64),
                    &[KeyValue::new("topic", topic.to_owned())],
                )
            })
            .inspect_err(|err| debug!(?err))
    }
}
633
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Result;
    use bytes::Bytes;
    use object_store::PutPayload;
    use serde_json::json;
    use std::{fs::File, sync::Arc, thread};
    use tansu_sans_io::record::Record;
    use tracing::subscriber::DefaultGuard;
    use tracing_subscriber::EnvFilter;

    // Route this test thread's tracing output to a per-thread log file;
    // the returned guard keeps the subscriber installed for the test.
    fn init_tracing() -> Result<DefaultGuard> {
        Ok(tracing::subscriber::set_default(
            tracing_subscriber::fmt()
                .with_level(true)
                .with_line_number(true)
                .with_thread_names(false)
                .with_env_filter(
                    EnvFilter::from_default_env()
                        .add_directive(format!("{}=debug", env!("CARGO_CRATE_NAME")).parse()?),
                )
                .with_writer(
                    // The log file is named after the test thread; an
                    // unnamed thread is an error.
                    thread::current()
                        .name()
                        .ok_or(Error::Message(String::from("unnamed thread")))
                        .and_then(|name| {
                            File::create(format!("../logs/{}/{name}.log", env!("CARGO_PKG_NAME"),))
                                .map_err(Into::into)
                        })
                        .map(Arc::new)?,
                )
                .finish(),
        ))
    }

    // Protobuf schema stored for topic "def": int32 key; name/email value.
    const DEF_PROTO: &[u8] = br#"
      syntax = 'proto3';

      message Key {
        int32 id = 1;
      }

      message Value {
        string name = 1;
        string email = 2;
      }
    "#;

    // AVRO record schema stored for topic "pqr".
    const PQR_AVRO: &[u8] = br#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "long", "default": 42},
                {"name": "b", "type": "string"},
                {"name": "c", "type": "long", "default": 43}
            ]
        }
    "#;

    // Build an in-memory registry seeded with one JSON ("abc", key must be
    // a multiple of 10), one Protobuf ("def") and one AVRO ("pqr") schema.
    async fn populate() -> Result<Registry> {
        let _guard = init_tracing()?;

        let object_store = InMemory::new();

        let location = Path::from("abc.json");
        let payload = serde_json::to_vec(&json!({
            "type": "object",
            "properties": {
                "key": {
                    "type": "number",
                    "multipleOf": 10
                }
            }
        }))
        .map(Bytes::from)
        .map(PutPayload::from)?;

        _ = object_store.put(&location, payload).await?;

        let location = Path::from("def.proto");
        let payload = PutPayload::from(Bytes::from_static(DEF_PROTO));
        _ = object_store.put(&location, payload).await?;

        let location = Path::from("pqr.avsc");
        let payload = PutPayload::from(Bytes::from_static(PQR_AVRO));
        _ = object_store.put(&location, payload).await?;

        Ok(Registry::new(object_store))
    }

    #[tokio::test]
    async fn abc_valid() -> Result<()> {
        let _guard = init_tracing()?;

        let registry = populate().await?;

        // 5450 is a multiple of 10, so it satisfies the "abc" JSON schema.
        let key = Bytes::from_static(b"5450");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        registry.validate("abc", &batch).await?;

        Ok(())
    }

    #[tokio::test]
    async fn abc_invalid() -> Result<()> {
        let _guard = init_tracing()?;
        let registry = populate().await?;

        // 545 is not a multiple of 10, so validation must fail with
        // InvalidRecord.
        let key = Bytes::from_static(b"545");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        assert!(matches!(
            registry
                .validate("abc", &batch)
                .await
                .inspect_err(|err| debug!(?err)),
            Err(Error::Api(ErrorCode::InvalidRecord))
        ));

        Ok(())
    }

    #[tokio::test]
    async fn pqr_valid() -> Result<()> {
        let _guard = init_tracing()?;
        let registry = populate().await?;

        let key = Bytes::from_static(b"5450");

        let batch = Batch::builder()
            .record(Record::builder().key(key.clone().into()))
            .build()?;

        registry.validate("pqr", &batch).await?;

        Ok(())
    }

    // Log the size of `Error` and its largest constituents, documenting
    // why several variants are boxed.
    #[test]
    fn error_size_of() -> Result<()> {
        let _guard = init_tracing()?;

        debug!(error = size_of::<Error>());
        debug!(anyhow = size_of::<anyhow::Error>());
        #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
        debug!(arrow = size_of::<ArrowError>());
        debug!(avro_to_json = size_of::<apache_avro::types::Value>());
        #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
        debug!(data_file_builder = size_of::<DataFileBuilderError>());
        #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
        debug!(data_fusion = size_of::<Box<DataFusionError>>());
        #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
        debug!(delta_table = size_of::<Box<DeltaTableError>>());
        #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
        debug!(iceberg = size_of::<Box<::iceberg::Error>>());
        debug!(sans_io = size_of::<tansu_sans_io::Error>());
        debug!(object_store = size_of::<object_store::Error>());

        #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
        debug!(parquet = size_of::<ParquetError>());

        debug!(parse_filter = size_of::<ParseError>());
        debug!(protobuf_json_mapping = size_of::<protobuf_json_mapping::ParseError>());
        debug!(protobuf_json_mapping_print = size_of::<protobuf_json_mapping::PrintError>());
        debug!(protobuf = size_of::<protobuf::Error>());
        debug!(serde_json = size_of::<serde_json::Error>());
        #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
        debug!(sql_parser = size_of::<datafusion::logical_expr::sqlparser::parser::ParserError>());
        debug!(try_from_int = size_of::<TryFromIntError>());
        debug!(url = size_of::<Url>());
        #[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
        debug!(unsupported_schema_runtime_value = size_of::<(DataType, serde_json::Value)>());
        debug!(uuid = size_of::<uuid::Error>());
        Ok(())
    }
}