1pub mod iceberg_rest;
7pub use iceberg_rest::{
8 GenericRestCatalog, IcebergCatalogClient, IcebergTableId, LoadedIcebergTable, RestCatalogConfig,
9};
10
11#[cfg(feature = "glue-catalog")]
15pub mod glue_catalog;
16#[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
17pub mod iceberg_catalog_bridge;
18#[cfg(feature = "iceberg-datafusion")]
19pub mod iceberg_table_provider;
20#[cfg(feature = "local-catalog")]
21pub mod local_catalog;
22#[cfg(feature = "postgres-catalog")]
23pub mod postgres_catalog;
24#[cfg(feature = "rest-catalog")]
25pub mod rest_catalog_wrapper;
26#[cfg(feature = "local-catalog")]
27pub mod unified;
28#[cfg(feature = "unity-catalog")]
29pub mod unity_catalog;
30
31#[cfg(feature = "glue-catalog")]
32pub use glue_catalog::GlueCatalog;
33#[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
34pub use iceberg_catalog_bridge::IcebergCatalogBridge;
35#[cfg(feature = "local-catalog")]
36pub use local_catalog::LocalCatalog;
37#[cfg(feature = "postgres-catalog")]
38pub use postgres_catalog::PostgresCatalog;
39#[cfg(feature = "rest-catalog")]
40pub use rest_catalog_wrapper::KrishivRestCatalog;
41#[cfg(feature = "local-catalog")]
42pub use unified::KrishivCatalog;
43#[cfg(feature = "unity-catalog")]
44pub use unity_catalog::UnityCatalog;
45
46use std::collections::BTreeMap;
47use std::fmt;
48
49#[derive(Debug, thiserror::Error)]
55pub enum CatalogError {
56 #[error("table not found: '{name}'")]
58 TableNotFound { name: String },
59 #[error("table already exists: '{name}'")]
61 TableAlreadyExists { name: String },
62 #[error("schema not found: '{name}'")]
64 SchemaNotFound { name: String },
65 #[error("invalid schema: {message}")]
67 InvalidSchema { message: String },
68 #[error("invalid catalog configuration: {message}")]
70 InvalidConfiguration { message: String },
71 #[error("catalog transport error during {operation}: {message}")]
73 Transport { operation: String, message: String },
74 #[error("HTTP error {status}: {message}")]
76 Http { status: u16, message: String },
77 #[error("invalid catalog response during {operation}: {message}")]
79 InvalidResponse { operation: String, message: String },
80 #[error("catalog response during {operation} exceeded {limit_bytes} bytes")]
82 ResponseTooLarge {
83 operation: String,
84 limit_bytes: usize,
85 },
86 #[error("catalog server does not support {operation}")]
88 UnsupportedOperation { operation: String },
89 #[error("I/O error: {0}")]
91 Io(String),
92 #[error("Iceberg error: {0}")]
94 Iceberg(String),
95 #[error("concurrency conflict: {message}")]
97 ConcurrencyConflict { message: String },
98 #[error("namespace not found: '{name}'")]
100 NamespaceNotFound { name: String },
101}
102
103pub type CatalogResult<T> = Result<T, CatalogError>;
105
106pub type LakehouseError = CatalogError;
113
114#[derive(Debug, Clone, PartialEq, Eq)]
120pub enum FieldType {
121 Int8,
122 Int16,
123 Int32,
124 Int64,
125 UInt8,
126 UInt16,
127 UInt32,
128 UInt64,
129 Float32,
130 Float64,
131 Boolean,
132 Utf8,
133 Binary,
134 Timestamp,
135 Date32,
136 List(Box<FieldType>),
138 Struct(Vec<CatalogField>),
140}
141
142impl fmt::Display for FieldType {
143 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144 let s = match self {
145 FieldType::Int8 => "Int8",
146 FieldType::Int16 => "Int16",
147 FieldType::Int32 => "Int32",
148 FieldType::Int64 => "Int64",
149 FieldType::UInt8 => "UInt8",
150 FieldType::UInt16 => "UInt16",
151 FieldType::UInt32 => "UInt32",
152 FieldType::UInt64 => "UInt64",
153 FieldType::Float32 => "Float32",
154 FieldType::Float64 => "Float64",
155 FieldType::Boolean => "Boolean",
156 FieldType::Utf8 => "Utf8",
157 FieldType::Binary => "Binary",
158 FieldType::Timestamp => "Timestamp",
159 FieldType::Date32 => "Date32",
160 FieldType::List(inner) => return write!(f, "List<{inner}>"),
161 FieldType::Struct(fields) => {
162 return write!(f, "Struct({} fields)", fields.len());
163 }
164 };
165 f.write_str(s)
166 }
167}
168
169impl FieldType {
170 pub fn to_arrow(&self) -> arrow::datatypes::DataType {
174 use arrow::datatypes::DataType;
175 use arrow::datatypes::TimeUnit;
176 match self {
177 FieldType::Int8 => DataType::Int8,
178 FieldType::Int16 => DataType::Int16,
179 FieldType::Int32 => DataType::Int32,
180 FieldType::Int64 => DataType::Int64,
181 FieldType::UInt8 => DataType::UInt8,
182 FieldType::UInt16 => DataType::UInt16,
183 FieldType::UInt32 => DataType::UInt32,
184 FieldType::UInt64 => DataType::UInt64,
185 FieldType::Float32 => DataType::Float32,
186 FieldType::Float64 => DataType::Float64,
187 FieldType::Boolean => DataType::Boolean,
188 FieldType::Utf8 => DataType::Utf8,
189 FieldType::Binary => DataType::Binary,
190 FieldType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
191 FieldType::Date32 => DataType::Date32,
192 FieldType::List(item) => DataType::List(std::sync::Arc::new(
193 arrow::datatypes::Field::new("item", item.to_arrow(), true),
194 )),
195 FieldType::Struct(fields) => {
196 let arrow_fields: arrow::datatypes::Fields = fields
197 .iter()
198 .map(|f| {
199 std::sync::Arc::new(arrow::datatypes::Field::new(
200 f.name(),
201 f.field_type().to_arrow(),
202 f.nullable(),
203 ))
204 })
205 .collect();
206 DataType::Struct(arrow_fields)
207 }
208 }
209 }
210}
211
212#[derive(Debug, Clone, PartialEq, Eq)]
218pub struct CatalogField {
219 name: String,
220 field_type: FieldType,
221 nullable: bool,
222}
223
224impl CatalogField {
225 pub fn new(name: impl Into<String>, field_type: FieldType, nullable: bool) -> Self {
227 Self {
228 name: name.into(),
229 field_type,
230 nullable,
231 }
232 }
233
234 pub fn name(&self) -> &str {
236 &self.name
237 }
238
239 pub fn field_type(&self) -> &FieldType {
241 &self.field_type
242 }
243
244 pub fn nullable(&self) -> bool {
246 self.nullable
247 }
248
249 pub fn to_arrow_field(&self) -> arrow::datatypes::Field {
253 arrow::datatypes::Field::new(
254 self.name.as_str(),
255 self.field_type.to_arrow(),
256 self.nullable,
257 )
258 }
259}
260
261#[derive(Debug, Clone, PartialEq, Eq)]
267pub struct TableSchema {
268 fields: Vec<CatalogField>,
269}
270
271impl TableSchema {
272 pub fn new(fields: Vec<CatalogField>) -> Self {
274 Self { fields }
275 }
276
277 pub fn empty() -> Self {
279 Self { fields: Vec::new() }
280 }
281
282 pub fn to_arrow_schema(&self) -> arrow::datatypes::Schema {
286 let arrow_fields: Vec<arrow::datatypes::Field> = self
287 .fields
288 .iter()
289 .map(CatalogField::to_arrow_field)
290 .collect();
291 arrow::datatypes::Schema::new(arrow_fields)
292 }
293
294 pub fn field_count(&self) -> usize {
296 self.fields.len()
297 }
298
299 pub fn get_field(&self, name: &str) -> Option<&CatalogField> {
301 self.fields.iter().find(|f| f.name() == name)
302 }
303}
304
/// Optional summary statistics; every figure is `None` until it is supplied.
///
/// Values are populated through the chainable `with_*` builders, each of
/// which replaces any value set earlier for the same figure.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ColumnStatistics {
    /// Row count, if recorded.
    pub row_count: Option<u64>,
    /// Null count, if recorded.
    pub null_count: Option<u64>,
    /// Minimum value in its string form, if recorded.
    pub min_value: Option<String>,
    /// Maximum value in its string form, if recorded.
    pub max_value: Option<String>,
    /// Number of distinct values, if recorded; drives
    /// [`ColumnStatistics::equality_selectivity`].
    pub distinct_count: Option<u64>,
    /// Collection time in seconds, on the same clock as the `now_secs`
    /// argument of [`ColumnStatistics::is_fresh`].
    pub collected_at_secs: Option<u64>,
}

impl ColumnStatistics {
    /// Statistics with nothing recorded (identical to `Default::default()`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Records the row count.
    #[must_use]
    pub fn with_row_count(self, count: u64) -> Self {
        Self {
            row_count: Some(count),
            ..self
        }
    }

    /// Records the null count.
    #[must_use]
    pub fn with_null_count(self, count: u64) -> Self {
        Self {
            null_count: Some(count),
            ..self
        }
    }

    /// Records the minimum value.
    #[must_use]
    pub fn with_min(self, min: impl Into<String>) -> Self {
        Self {
            min_value: Some(min.into()),
            ..self
        }
    }

    /// Records the maximum value.
    #[must_use]
    pub fn with_max(self, max: impl Into<String>) -> Self {
        Self {
            max_value: Some(max.into()),
            ..self
        }
    }

    /// Records the number of distinct values.
    #[must_use]
    pub fn with_distinct_count(self, ndv: u64) -> Self {
        Self {
            distinct_count: Some(ndv),
            ..self
        }
    }

    /// Records when the statistics were collected, in seconds.
    #[must_use]
    pub fn with_collected_at_secs(self, secs: u64) -> Self {
        Self {
            collected_at_secs: Some(secs),
            ..self
        }
    }

    /// Estimated fraction of rows matched by an equality predicate.
    ///
    /// Returns `None` when no distinct count is recorded, `Some(0.0)` when
    /// the distinct count is zero, and `Some(1 / distinct_count)` otherwise.
    pub fn equality_selectivity(&self) -> Option<f64> {
        self.distinct_count.map(|ndv| match ndv {
            0 => 0.0,
            distinct => 1.0 / distinct as f64,
        })
    }

    /// Reports whether the statistics are at most `max_age_secs` old at
    /// `now_secs`.
    ///
    /// Statistics without a collection time are always reported as fresh.
    /// The age is computed with a saturating subtraction, so a collection
    /// time later than `now_secs` counts as age zero.
    pub fn is_fresh(&self, now_secs: u64, max_age_secs: u64) -> bool {
        let Some(collected_at) = self.collected_at_secs else {
            return true;
        };
        now_secs.saturating_sub(collected_at) <= max_age_secs
    }
}
409
410#[derive(Debug, Clone)]
416pub struct TableMetadata {
417 name: String,
418 schema: TableSchema,
419 stats: Option<ColumnStatistics>,
420}
421
422impl TableMetadata {
423 pub fn new(name: impl Into<String>, schema: TableSchema) -> Self {
425 Self {
426 name: name.into(),
427 schema,
428 stats: None,
429 }
430 }
431
432 #[must_use]
434 pub fn with_stats(mut self, stats: ColumnStatistics) -> Self {
435 self.stats = Some(stats);
436 self
437 }
438
439 pub fn name(&self) -> &str {
441 &self.name
442 }
443
444 pub fn schema(&self) -> &TableSchema {
446 &self.schema
447 }
448
449 pub fn statistics(&self) -> Option<&ColumnStatistics> {
451 self.stats.as_ref()
452 }
453}
454
455pub trait TableProvider {
461 fn name(&self) -> &str;
463
464 fn schema(&self) -> &TableSchema;
466
467 fn statistics(&self) -> Option<&ColumnStatistics>;
469}
470
471pub trait CatalogProvider {
477 fn list_tables(&self) -> Vec<String>;
479
480 fn get_table(&self, name: &str) -> CatalogResult<&dyn TableProvider>;
482
483 fn register_table(&mut self, metadata: TableMetadata) -> CatalogResult<()>;
488}
489
490struct TableMetadataProvider {
496 metadata: TableMetadata,
497}
498
499impl TableProvider for TableMetadataProvider {
500 fn name(&self) -> &str {
501 self.metadata.name()
502 }
503
504 fn schema(&self) -> &TableSchema {
505 self.metadata.schema()
506 }
507
508 fn statistics(&self) -> Option<&ColumnStatistics> {
509 self.metadata.statistics()
510 }
511}
512
513pub struct InMemoryCatalog {
515 tables: BTreeMap<String, TableMetadataProvider>,
516 table_data: BTreeMap<String, std::sync::Arc<Vec<arrow::record_batch::RecordBatch>>>,
518}
519
520impl InMemoryCatalog {
521 pub fn new() -> Self {
523 Self {
524 tables: BTreeMap::new(),
525 table_data: BTreeMap::new(),
526 }
527 }
528
529 pub fn register_table_with_batches(
531 &mut self,
532 metadata: TableMetadata,
533 batches: Vec<arrow::record_batch::RecordBatch>,
534 ) -> CatalogResult<()> {
535 let name = metadata.name().to_owned();
536 self.register_table(metadata)?;
537 if !batches.is_empty() {
538 self.table_data.insert(name, std::sync::Arc::new(batches));
539 }
540 Ok(())
541 }
542
543 pub fn table_batches(
545 &self,
546 name: &str,
547 ) -> Option<std::sync::Arc<Vec<arrow::record_batch::RecordBatch>>> {
548 self.table_data.get(name).cloned()
549 }
550}
551
552impl Default for InMemoryCatalog {
553 fn default() -> Self {
554 Self::new()
555 }
556}
557
558impl CatalogProvider for InMemoryCatalog {
559 fn list_tables(&self) -> Vec<String> {
560 self.tables.keys().cloned().collect()
561 }
562
563 fn get_table(&self, name: &str) -> CatalogResult<&dyn TableProvider> {
564 self.tables
565 .get(name)
566 .map(|p| p as &dyn TableProvider)
567 .ok_or_else(|| CatalogError::TableNotFound {
568 name: name.to_string(),
569 })
570 }
571
572 fn register_table(&mut self, metadata: TableMetadata) -> CatalogResult<()> {
573 let name = metadata.name().to_string();
574 if self.tables.contains_key(&name) {
575 return Err(CatalogError::TableAlreadyExists { name });
576 }
577 self.tables.insert(name, TableMetadataProvider { metadata });
578 Ok(())
579 }
580}
581
582pub trait SchemaRegistry: Send + Sync {
591 fn get_schema(&self, name: &str) -> CatalogResult<TableSchema>;
593 fn register_schema(&mut self, name: impl Into<String>, schema: TableSchema);
595 fn schema_names(&self) -> Vec<String>;
597}
598
599#[derive(Debug, Default)]
601pub struct InMemorySchemaRegistry {
602 schemas: BTreeMap<String, TableSchema>,
603}
604
605impl InMemorySchemaRegistry {
606 pub fn new() -> Self {
607 Self::default()
608 }
609}
610
611impl SchemaRegistry for InMemorySchemaRegistry {
612 fn get_schema(&self, name: &str) -> CatalogResult<TableSchema> {
613 self.schemas
614 .get(name)
615 .cloned()
616 .ok_or_else(|| CatalogError::SchemaNotFound {
617 name: name.to_string(),
618 })
619 }
620
621 fn register_schema(&mut self, name: impl Into<String>, schema: TableSchema) {
622 self.schemas.insert(name.into(), schema);
623 }
624
625 fn schema_names(&self) -> Vec<String> {
626 self.schemas.keys().cloned().collect()
627 }
628}
629
630pub mod datafusion_bridge {
640 use std::any::Any;
641 use std::fmt;
642 use std::sync::{Arc, RwLock};
643
644 use datafusion::catalog::{CatalogProvider, SchemaProvider};
645 use datafusion::datasource::MemTable;
646 use datafusion::error::Result as DfResult;
647
648 pub struct DataFusionCatalogBridge {
656 catalog: Arc<RwLock<super::InMemoryCatalog>>,
657 schema_name: String,
658 schema_cache: std::sync::Arc<dashmap::DashMap<String, Arc<MemTable>>>,
664 }
665
666 impl DataFusionCatalogBridge {
667 pub fn new(catalog: Arc<RwLock<super::InMemoryCatalog>>) -> Self {
671 Self {
672 catalog,
673 schema_name: "public".to_string(),
674 schema_cache: std::sync::Arc::new(dashmap::DashMap::new()),
675 }
676 }
677
678 pub fn invalidate(&self, name: &str) {
689 self.schema_cache.remove(name);
690 }
691 }
692
693 impl fmt::Debug for DataFusionCatalogBridge {
694 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
695 f.debug_struct("DataFusionCatalogBridge")
696 .field("schema_name", &self.schema_name)
697 .finish()
698 }
699 }
700
701 impl CatalogProvider for DataFusionCatalogBridge {
702 fn as_any(&self) -> &dyn Any {
703 self
704 }
705
706 fn schema_names(&self) -> Vec<String> {
707 vec![self.schema_name.clone()]
708 }
709
710 fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
711 if name == self.schema_name {
712 Some(Arc::new(DataFusionSchemaBridge {
713 catalog: self.catalog.clone(),
714 cache: self.schema_cache.clone(),
715 }))
716 } else {
717 None
718 }
719 }
720 }
721
722 struct DataFusionSchemaBridge {
725 catalog: Arc<RwLock<super::InMemoryCatalog>>,
726 cache: std::sync::Arc<dashmap::DashMap<String, Arc<MemTable>>>,
727 }
728
729 impl fmt::Debug for DataFusionSchemaBridge {
730 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
731 f.debug_struct("DataFusionSchemaBridge").finish()
732 }
733 }
734
735 #[async_trait::async_trait]
736 impl SchemaProvider for DataFusionSchemaBridge {
737 fn as_any(&self) -> &dyn Any {
738 self
739 }
740
741 fn table_names(&self) -> Vec<String> {
742 let catalog = self.catalog.read().unwrap_or_else(|p| p.into_inner());
743 use super::CatalogProvider as KrishivCatalogProvider;
744 catalog.list_tables()
745 }
746
747 async fn table(
748 &self,
749 name: &str,
750 ) -> DfResult<Option<Arc<dyn datafusion::datasource::TableProvider>>> {
751 if let Some(cached) = self.cache.get(name) {
755 return Ok(Some(
756 cached.clone() as Arc<dyn datafusion::datasource::TableProvider>
757 ));
758 }
759 let catalog = self.catalog.read().unwrap_or_else(|p| p.into_inner());
760 use super::CatalogProvider as KrishivCatalogProvider;
761 match catalog.get_table(name) {
762 Ok(table_provider) => {
763 let arrow_schema = Arc::new(table_provider.schema().to_arrow_schema());
764 let batches = catalog.table_batches(name);
765 let partitions = batches.map(|b| (*b).clone()).unwrap_or_default();
766 let mem = MemTable::try_new(arrow_schema, vec![partitions])?;
767 let mem_arc = Arc::new(mem);
768 self.cache.insert(name.to_string(), mem_arc.clone());
769 Ok(Some(
770 mem_arc as Arc<dyn datafusion::datasource::TableProvider>,
771 ))
772 }
773 Err(super::CatalogError::TableNotFound { .. }) => Ok(None),
774 Err(error) => Err(datafusion::error::DataFusionError::External(Box::new(
775 error,
776 ))),
777 }
778 }
779
780 fn table_exist(&self, name: &str) -> bool {
781 let catalog = self.catalog.read().unwrap_or_else(|p| p.into_inner());
782 use super::CatalogProvider as KrishivCatalogProvider;
783 catalog.get_table(name).is_ok()
784 }
785 }
786}
787
788#[cfg(test)]
793mod tests {
794 use super::*;
795
796 fn make_schema() -> TableSchema {
797 TableSchema::new(vec![
798 CatalogField::new("id", FieldType::Int64, false),
799 CatalogField::new("name", FieldType::Utf8, true),
800 ])
801 }
802
803 #[test]
804 fn in_memory_catalog_registers_and_retrieves_table() {
805 let mut catalog = InMemoryCatalog::new();
806 let meta = TableMetadata::new("users", make_schema());
807 catalog.register_table(meta).unwrap();
808
809 let table = catalog.get_table("users").unwrap();
810 assert_eq!(table.name(), "users");
811 assert_eq!(table.schema().field_count(), 2);
812 }
813
814 #[test]
815 fn in_memory_catalog_lists_tables() {
816 let mut catalog = InMemoryCatalog::new();
817 catalog
818 .register_table(TableMetadata::new("alpha", make_schema()))
819 .unwrap();
820 catalog
821 .register_table(TableMetadata::new("beta", make_schema()))
822 .unwrap();
823
824 let mut tables = catalog.list_tables();
825 tables.sort();
826 assert_eq!(tables, vec!["alpha", "beta"]);
827 }
828
829 #[test]
830 fn in_memory_catalog_returns_error_for_unknown_table() {
831 let catalog = InMemoryCatalog::new();
832 let err = catalog.get_table("nonexistent").err().unwrap();
833 match err {
834 CatalogError::TableNotFound { name } => {
835 assert_eq!(name, "nonexistent");
836 }
837 other => panic!("unexpected error: {other}"),
838 }
839 }
840
841 #[test]
842 fn table_schema_converts_to_arrow_schema() {
843 let schema = make_schema();
844 let arrow_schema = schema.to_arrow_schema();
845
846 assert_eq!(arrow_schema.fields().len(), 2);
847 let id_field = arrow_schema.field_with_name("id").unwrap();
848 assert_eq!(id_field.data_type(), &arrow::datatypes::DataType::Int64);
849 assert!(!id_field.is_nullable());
850
851 let name_field = arrow_schema.field_with_name("name").unwrap();
852 assert_eq!(name_field.data_type(), &arrow::datatypes::DataType::Utf8);
853 assert!(name_field.is_nullable());
854 }
855
856 #[test]
861 fn schema_registry_registers_and_retrieves() {
862 let mut registry = InMemorySchemaRegistry::new();
863 registry.register_schema("events", make_schema());
864 let schema = registry.get_schema("events").unwrap();
865 assert_eq!(schema.field_count(), 2);
866 }
867
868 #[test]
869 fn schema_registry_returns_error_for_missing() {
870 let registry = InMemorySchemaRegistry::new();
871 let err = registry.get_schema("nonexistent").unwrap_err();
872 match err {
873 CatalogError::SchemaNotFound { name } => {
874 assert_eq!(name, "nonexistent");
875 }
876 other => panic!("unexpected error: {other}"),
877 }
878 }
879
880 #[test]
881 fn schema_registry_lists_names() {
882 let mut registry = InMemorySchemaRegistry::new();
883 registry.register_schema("orders", make_schema());
884 registry.register_schema("users", make_schema());
885 let mut names = registry.schema_names();
886 names.sort();
887 assert_eq!(names, vec!["orders", "users"]);
888 }
889
890 #[test]
895 fn datafusion_bridge_schema_names_returns_public() {
896 use datafusion::catalog::CatalogProvider as DfCatalogProvider;
897
898 let catalog = std::sync::Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
899 let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
900 let names = bridge.schema_names();
901 assert_eq!(names, vec!["public"]);
902 }
903
904 #[test]
905 fn datafusion_bridge_table_exist() {
906 let catalog = std::sync::Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
907 {
908 let mut cat = catalog.write().unwrap();
909 cat.register_table(TableMetadata::new("orders", make_schema()))
910 .unwrap();
911 }
912 let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
913 let schema_provider = {
914 use datafusion::catalog::CatalogProvider as DfCatalogProvider;
915 bridge.schema("public").unwrap()
916 };
917 assert!(schema_provider.table_exist("orders"));
918 assert!(!schema_provider.table_exist("nonexistent"));
919 }
920
921 #[tokio::test]
922 async fn datafusion_bridge_memtable_cache_reuses_arc() {
923 use std::sync::Arc;
924
925 let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
926 {
927 let mut cat = catalog.write().unwrap();
928 cat.register_table(TableMetadata::new("orders", make_schema()))
929 .unwrap();
930 }
931 let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
932 let schema_provider = {
933 use datafusion::catalog::CatalogProvider as DfCatalogProvider;
934 bridge.schema("public").unwrap()
935 };
936 let first = schema_provider.table("orders").await.unwrap().unwrap();
937 let second = schema_provider.table("orders").await.unwrap().unwrap();
938 let cached = Arc::ptr_eq(&first, &second);
940 assert!(cached, "expected cached MemTable Arc, got fresh allocation");
941 }
942
943 #[tokio::test]
944 async fn datafusion_bridge_invalidate_forces_rebuild() {
945 use std::sync::Arc;
946
947 let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
948 {
949 let mut cat = catalog.write().unwrap();
950 cat.register_table(TableMetadata::new("orders", make_schema()))
951 .unwrap();
952 }
953 let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
954 let schema_provider = {
955 use datafusion::catalog::CatalogProvider as DfCatalogProvider;
956 bridge.schema("public").unwrap()
957 };
958 let first = schema_provider.table("orders").await.unwrap().unwrap();
959 bridge.invalidate("orders");
960 let second = schema_provider.table("orders").await.unwrap().unwrap();
961 assert!(!Arc::ptr_eq(&first, &second));
963 let third = schema_provider.table("orders").await.unwrap().unwrap();
965 assert!(Arc::ptr_eq(&second, &third));
966 }
967
968 #[tokio::test]
969 async fn catalog_scan_returns_registered_row_count() {
970 use std::sync::Arc;
971
972 use arrow::array::Int64Array;
973 use arrow::datatypes::{DataType, Field, Schema};
974 use arrow::record_batch::RecordBatch;
975 use datafusion::prelude::SessionContext;
976
977 let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
978 let schema = TableSchema::new(vec![CatalogField::new("id", FieldType::Int64, false)]);
979 let arrow_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
980 let values: Vec<Option<i64>> = (0..10).map(Some).collect();
981 let batch =
982 RecordBatch::try_new(arrow_schema, vec![Arc::new(Int64Array::from(values))]).unwrap();
983 catalog
984 .write()
985 .unwrap()
986 .register_table_with_batches(TableMetadata::new("t", schema), vec![batch])
987 .unwrap();
988
989 let ctx = SessionContext::new();
990 ctx.register_catalog(
991 "krishiv",
992 Arc::new(super::datafusion_bridge::DataFusionCatalogBridge::new(
993 catalog,
994 )),
995 );
996 let df = ctx.sql("SELECT * FROM krishiv.public.t").await.unwrap();
997 let batches = df.collect().await.unwrap();
998 let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
999 assert_eq!(rows, 10);
1000 }
1001
1002 #[test]
1003 fn datafusion_bridge_unknown_schema_returns_none() {
1004 use datafusion::catalog::CatalogProvider as DfCatalogProvider;
1005
1006 let catalog = std::sync::Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
1007 let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
1008 let result = bridge.schema("nonexistent");
1009 assert!(result.is_none());
1010 }
1011
1012 #[test]
1017 fn catalog_error_display_table_not_found() {
1018 let err = CatalogError::TableNotFound {
1019 name: "orders".to_string(),
1020 };
1021 assert_eq!(err.to_string(), "table not found: 'orders'");
1022 }
1023
1024 #[test]
1025 fn catalog_error_display_table_already_exists() {
1026 let err = CatalogError::TableAlreadyExists {
1027 name: "users".to_string(),
1028 };
1029 assert_eq!(err.to_string(), "table already exists: 'users'");
1030 }
1031
1032 #[test]
1033 fn catalog_error_display_schema_not_found() {
1034 let err = CatalogError::SchemaNotFound {
1035 name: "events".to_string(),
1036 };
1037 assert_eq!(err.to_string(), "schema not found: 'events'");
1038 }
1039
1040 #[test]
1041 fn catalog_error_display_invalid_schema() {
1042 let err = CatalogError::InvalidSchema {
1043 message: "missing required field 'id'".to_string(),
1044 };
1045 assert_eq!(
1046 err.to_string(),
1047 "invalid schema: missing required field 'id'"
1048 );
1049 }
1050
1051 #[test]
1052 fn catalog_error_display_http() {
1053 let err = CatalogError::Http {
1054 status: 404,
1055 message: "not found".to_string(),
1056 };
1057 assert_eq!(err.to_string(), "HTTP error 404: not found");
1058 }
1059
1060 #[test]
1061 fn catalog_error_is_std_error() {
1062 let err = CatalogError::TableNotFound {
1063 name: "t".to_string(),
1064 };
1065 let e: &dyn std::error::Error = &err;
1066 assert!(e.source().is_none());
1067 }
1068
1069 #[test]
1074 fn field_type_to_arrow_int8() {
1075 assert_eq!(FieldType::Int8.to_arrow(), arrow::datatypes::DataType::Int8);
1076 }
1077
1078 #[test]
1079 fn field_type_to_arrow_int16() {
1080 assert_eq!(
1081 FieldType::Int16.to_arrow(),
1082 arrow::datatypes::DataType::Int16
1083 );
1084 }
1085
1086 #[test]
1087 fn field_type_to_arrow_int32() {
1088 assert_eq!(
1089 FieldType::Int32.to_arrow(),
1090 arrow::datatypes::DataType::Int32
1091 );
1092 }
1093
1094 #[test]
1095 fn field_type_to_arrow_int64() {
1096 assert_eq!(
1097 FieldType::Int64.to_arrow(),
1098 arrow::datatypes::DataType::Int64
1099 );
1100 }
1101
1102 #[test]
1103 fn field_type_to_arrow_uint8() {
1104 assert_eq!(
1105 FieldType::UInt8.to_arrow(),
1106 arrow::datatypes::DataType::UInt8
1107 );
1108 }
1109
1110 #[test]
1111 fn field_type_to_arrow_uint16() {
1112 assert_eq!(
1113 FieldType::UInt16.to_arrow(),
1114 arrow::datatypes::DataType::UInt16
1115 );
1116 }
1117
1118 #[test]
1119 fn field_type_to_arrow_uint32() {
1120 assert_eq!(
1121 FieldType::UInt32.to_arrow(),
1122 arrow::datatypes::DataType::UInt32
1123 );
1124 }
1125
1126 #[test]
1127 fn field_type_to_arrow_uint64() {
1128 assert_eq!(
1129 FieldType::UInt64.to_arrow(),
1130 arrow::datatypes::DataType::UInt64
1131 );
1132 }
1133
1134 #[test]
1135 fn field_type_to_arrow_float32() {
1136 assert_eq!(
1137 FieldType::Float32.to_arrow(),
1138 arrow::datatypes::DataType::Float32
1139 );
1140 }
1141
1142 #[test]
1143 fn field_type_to_arrow_float64() {
1144 assert_eq!(
1145 FieldType::Float64.to_arrow(),
1146 arrow::datatypes::DataType::Float64
1147 );
1148 }
1149
1150 #[test]
1151 fn field_type_to_arrow_boolean() {
1152 assert_eq!(
1153 FieldType::Boolean.to_arrow(),
1154 arrow::datatypes::DataType::Boolean
1155 );
1156 }
1157
1158 #[test]
1159 fn field_type_to_arrow_utf8() {
1160 assert_eq!(FieldType::Utf8.to_arrow(), arrow::datatypes::DataType::Utf8);
1161 }
1162
1163 #[test]
1164 fn field_type_to_arrow_binary() {
1165 assert_eq!(
1166 FieldType::Binary.to_arrow(),
1167 arrow::datatypes::DataType::Binary
1168 );
1169 }
1170
1171 #[test]
1172 fn field_type_to_arrow_timestamp() {
1173 use arrow::datatypes::{DataType, TimeUnit};
1174 assert_eq!(
1175 FieldType::Timestamp.to_arrow(),
1176 DataType::Timestamp(TimeUnit::Microsecond, None)
1177 );
1178 }
1179
1180 #[test]
1181 fn field_type_to_arrow_date32() {
1182 assert_eq!(
1183 FieldType::Date32.to_arrow(),
1184 arrow::datatypes::DataType::Date32
1185 );
1186 }
1187
1188 #[test]
1189 fn field_type_to_arrow_list() {
1190 let list_type = FieldType::List(Box::new(FieldType::Utf8));
1191 match list_type.to_arrow() {
1192 arrow::datatypes::DataType::List(field) => {
1193 assert_eq!(field.name(), "item");
1194 assert_eq!(field.data_type(), &arrow::datatypes::DataType::Utf8);
1195 assert!(field.is_nullable());
1196 }
1197 other => panic!("expected List, got {other:?}"),
1198 }
1199 }
1200
1201 #[test]
1202 fn field_type_to_arrow_struct() {
1203 let struct_type = FieldType::Struct(vec![
1204 CatalogField::new("x", FieldType::Int32, false),
1205 CatalogField::new("y", FieldType::Utf8, true),
1206 ]);
1207 match struct_type.to_arrow() {
1208 arrow::datatypes::DataType::Struct(fields) => {
1209 assert_eq!(fields.len(), 2);
1210 assert_eq!(fields[0].name(), "x");
1211 assert_eq!(fields[0].data_type(), &arrow::datatypes::DataType::Int32);
1212 assert!(!fields[0].is_nullable());
1213 assert_eq!(fields[1].name(), "y");
1214 assert_eq!(fields[1].data_type(), &arrow::datatypes::DataType::Utf8);
1215 assert!(fields[1].is_nullable());
1216 }
1217 other => panic!("expected Struct, got {other:?}"),
1218 }
1219 }
1220
1221 #[test]
1226 fn field_type_display_simple() {
1227 assert_eq!(FieldType::Int8.to_string(), "Int8");
1228 assert_eq!(FieldType::Int16.to_string(), "Int16");
1229 assert_eq!(FieldType::Int32.to_string(), "Int32");
1230 assert_eq!(FieldType::Int64.to_string(), "Int64");
1231 assert_eq!(FieldType::UInt8.to_string(), "UInt8");
1232 assert_eq!(FieldType::UInt16.to_string(), "UInt16");
1233 assert_eq!(FieldType::UInt32.to_string(), "UInt32");
1234 assert_eq!(FieldType::UInt64.to_string(), "UInt64");
1235 assert_eq!(FieldType::Float32.to_string(), "Float32");
1236 assert_eq!(FieldType::Float64.to_string(), "Float64");
1237 assert_eq!(FieldType::Boolean.to_string(), "Boolean");
1238 assert_eq!(FieldType::Utf8.to_string(), "Utf8");
1239 assert_eq!(FieldType::Binary.to_string(), "Binary");
1240 assert_eq!(FieldType::Timestamp.to_string(), "Timestamp");
1241 assert_eq!(FieldType::Date32.to_string(), "Date32");
1242 }
1243
1244 #[test]
1245 fn field_type_display_list() {
1246 let list = FieldType::List(Box::new(FieldType::Int32));
1247 assert_eq!(list.to_string(), "List<Int32>");
1248 }
1249
1250 #[test]
1251 fn field_type_display_struct() {
1252 let s = FieldType::Struct(vec![
1253 CatalogField::new("a", FieldType::Boolean, true),
1254 CatalogField::new("b", FieldType::Utf8, false),
1255 ]);
1256 assert_eq!(s.to_string(), "Struct(2 fields)");
1257 }
1258
1259 #[test]
1264 fn catalog_field_accessors() {
1265 let f = CatalogField::new("col", FieldType::Float64, true);
1266 assert_eq!(f.name(), "col");
1267 assert_eq!(f.field_type(), &FieldType::Float64);
1268 assert!(f.nullable());
1269 }
1270
1271 #[test]
1272 fn catalog_field_to_arrow_field() {
1273 let f = CatalogField::new("ts", FieldType::Timestamp, false);
1274 let arrow_f = f.to_arrow_field();
1275 assert_eq!(arrow_f.name(), "ts");
1276 use arrow::datatypes::{DataType, TimeUnit};
1277 assert_eq!(
1278 arrow_f.data_type(),
1279 &DataType::Timestamp(TimeUnit::Microsecond, None)
1280 );
1281 assert!(!arrow_f.is_nullable());
1282 }
1283
1284 #[test]
1289 fn column_statistics_new_defaults() {
1290 let stats = ColumnStatistics::new();
1291 assert!(stats.row_count.is_none());
1292 assert!(stats.null_count.is_none());
1293 assert!(stats.min_value.is_none());
1294 assert!(stats.max_value.is_none());
1295 }
1296
1297 #[test]
1298 fn column_statistics_default_trait() {
1299 let stats = ColumnStatistics::default();
1300 assert_eq!(stats, ColumnStatistics::new());
1301 }
1302
1303 #[test]
1304 fn column_statistics_builder_all_fields() {
1305 let stats = ColumnStatistics::new()
1306 .with_row_count(1_000_000)
1307 .with_null_count(42)
1308 .with_min("abc")
1309 .with_max("xyz");
1310
1311 assert_eq!(stats.row_count, Some(1_000_000));
1312 assert_eq!(stats.null_count, Some(42));
1313 assert_eq!(stats.min_value.as_deref(), Some("abc"));
1314 assert_eq!(stats.max_value.as_deref(), Some("xyz"));
1315 }
1316
1317 #[test]
1318 fn column_statistics_builder_partial() {
1319 let stats = ColumnStatistics::new().with_row_count(500);
1320 assert_eq!(stats.row_count, Some(500));
1321 assert!(stats.null_count.is_none());
1322 assert!(stats.min_value.is_none());
1323 assert!(stats.max_value.is_none());
1324 }
1325
1326 #[test]
1327 fn column_statistics_into_string() {
1328 let stats = ColumnStatistics::new()
1329 .with_row_count(100)
1330 .with_null_count(5)
1331 .with_min("1")
1332 .with_max("99");
1333 let dbg = format!("{stats:?}");
1334 assert!(dbg.contains("row_count: Some(100)"));
1335 assert!(dbg.contains("null_count: Some(5)"));
1336 assert!(dbg.contains("min_value: Some(\"1\")"));
1337 assert!(dbg.contains("max_value: Some(\"99\")"));
1338 }
1339
/// Calling the same builder twice keeps the value from the last call.
#[test]
fn column_statistics_builder_overwrites() {
    let initial = ColumnStatistics::new().with_row_count(10);
    let overwritten = initial.with_row_count(20);
    assert_eq!(overwritten.row_count, Some(20));
}
1347
/// Statistics built from the same values are equal; differing values are not.
#[test]
fn column_statistics_eq() {
    let build_reference = || {
        ColumnStatistics::new()
            .with_row_count(100)
            .with_null_count(5)
    };
    let left = build_reference();
    let right = build_reference();
    let different = ColumnStatistics::new().with_row_count(99);

    assert_eq!(left, right);
    assert_ne!(left, different);
}
1360
/// `TableMetadata::new` exposes the name and schema it was given and starts
/// without statistics.
#[test]
fn table_metadata_new_and_accessors() {
    let events = TableMetadata::new("events", make_schema());
    assert_eq!(events.name(), "events");
    assert_eq!(events.schema().field_count(), 2);
    assert!(events.statistics().is_none());
}
1372
/// Statistics attached with `with_stats` are returned by `statistics()`.
#[test]
fn table_metadata_with_stats() {
    let attached = ColumnStatistics::new()
        .with_row_count(5000)
        .with_null_count(100);
    let clicks = TableMetadata::new("clicks", make_schema()).with_stats(attached);

    assert_eq!(clicks.name(), "clicks");
    let reported = clicks.statistics().unwrap();
    assert_eq!(reported.row_count, Some(5000));
    assert_eq!(reported.null_count, Some(100));
}
1384
/// The `Debug` rendering of `TableMetadata` includes the name and the empty
/// statistics slot.
#[test]
fn table_metadata_into_string() {
    let table = TableMetadata::new("test_table", make_schema());
    let rendered = format!("{table:?}");
    for fragment in ["name: \"test_table\"", "stats: None"].iter() {
        assert!(rendered.contains(fragment));
    }
}
1392
/// Registering a schema under a name that is already taken replaces the
/// earlier schema.
#[test]
fn schema_registry_register_replaces_existing() {
    let first = TableSchema::new(vec![CatalogField::new("a", FieldType::Int32, false)]);
    let second = TableSchema::new(vec![CatalogField::new("b", FieldType::Utf8, true)]);

    let mut registry = InMemorySchemaRegistry::new();
    registry.register_schema("my_schema", first);
    registry.register_schema("my_schema", second);

    let current = registry.get_schema("my_schema").unwrap();
    assert_eq!(current.field_count(), 1);

    // Only the field from the second registration may remain.
    let surviving = current.get_field("b").unwrap();
    assert_eq!(surviving.field_type(), &FieldType::Utf8);
    assert!(current.get_field("a").is_none());
}
1414
/// A new schema registry reports no schema names.
#[test]
fn schema_registry_empty_names() {
    let names = InMemorySchemaRegistry::new().schema_names();
    assert!(names.is_empty());
}
1420
/// `TableSchema::empty()` has no fields and every lookup misses.
#[test]
fn table_schema_empty() {
    let no_fields = TableSchema::empty();
    assert_eq!(no_fields.field_count(), 0);
    assert!(no_fields.get_field("anything").is_none());
}
1431
/// Looking up the `name` field of the shared test schema returns a nullable
/// `Utf8` field.
#[test]
fn table_schema_get_field_found() {
    let schema = make_schema();
    let name_field = schema.get_field("name").unwrap();

    assert_eq!(name_field.name(), "name");
    assert_eq!(name_field.field_type(), &FieldType::Utf8);
    assert!(name_field.nullable());
}
1440
/// Looking up a field name that the schema does not contain yields `None`.
#[test]
fn table_schema_get_field_not_found() {
    let lookup_missed = make_schema().get_field("missing").is_none();
    assert!(lookup_missed);
}
1446
/// Registering the same table name twice fails with `TableAlreadyExists`
/// carrying the offending name.
#[test]
fn in_memory_catalog_duplicate_register_errors() {
    let mut catalog = InMemoryCatalog::new();

    let first_attempt = catalog.register_table(TableMetadata::new("t", make_schema()));
    first_attempt.unwrap();

    let second_attempt = catalog.register_table(TableMetadata::new("t", make_schema()));
    match second_attempt.unwrap_err() {
        CatalogError::TableAlreadyExists { name } => assert_eq!(name, "t"),
        other => panic!("expected TableAlreadyExists, got {other}"),
    }
}
1465
/// `CatalogResult` can hold a success value.
#[test]
fn catalog_result_ok() {
    let outcome: CatalogResult<i32> = Ok(42);
    assert!(matches!(outcome, Ok(42)));
}
1475
/// `CatalogResult` can hold a `CatalogError`.
#[test]
fn catalog_result_err() {
    let missing = CatalogError::TableNotFound {
        name: "x".to_string(),
    };
    let outcome: CatalogResult<()> = Err(missing);
    assert!(outcome.is_err());
}
1483
/// A new in-memory catalog lists no tables.
#[test]
fn empty_catalog_list_tables_returns_empty() {
    let tables = InMemoryCatalog::new().list_tables();
    assert!(tables.is_empty());
}
1493
/// Looking up any table in an empty catalog fails with `TableNotFound`.
#[test]
fn empty_catalog_get_table_returns_not_found() {
    let catalog = InMemoryCatalog::new();
    let lookup = catalog.get_table("anything");
    assert!(matches!(lookup, Err(CatalogError::TableNotFound { .. })));
}
1500
/// Fetching a schema from an empty registry returns an error.
#[test]
fn empty_schema_registry_get_returns_not_found() {
    let registry = InMemorySchemaRegistry::new();
    let lookup = registry.get_schema("x");
    assert!(lookup.is_err());
}
1506
/// An empty registry has no schema names (the same check as
/// `schema_registry_empty_names`).
#[test]
fn empty_schema_schema_names_empty() {
    let empty_registry = InMemorySchemaRegistry::new();
    let names = empty_registry.schema_names();
    assert!(names.is_empty());
}
1512
/// Table names containing dashes, dots and underscores round-trip through the
/// catalog unchanged.
#[test]
fn table_name_with_special_characters() {
    let mut catalog = InMemoryCatalog::new();
    catalog
        .register_table(TableMetadata::new(
            "table-with-dashes.dots_and_underscores",
            make_schema(),
        ))
        .unwrap();

    let found = catalog
        .get_table("table-with-dashes.dots_and_underscores")
        .unwrap();
    assert_eq!(found.name(), "table-with-dashes.dots_and_underscores");
}
1527
/// Non-ASCII table names round-trip through the catalog unchanged.
#[test]
fn table_name_with_unicode() {
    let mut catalog = InMemoryCatalog::new();
    catalog
        .register_table(TableMetadata::new("用户_table", make_schema()))
        .unwrap();

    let found = catalog.get_table("用户_table").unwrap();
    assert_eq!(found.name(), "用户_table");
}
1536
/// Table names containing spaces round-trip through the catalog unchanged.
#[test]
fn table_name_with_spaces() {
    let mut catalog = InMemoryCatalog::new();
    catalog
        .register_table(TableMetadata::new("my table name", make_schema()))
        .unwrap();

    let found = catalog.get_table("my table name").unwrap();
    assert_eq!(found.name(), "my table name");
}
1545
/// Schema names containing dashes can be registered and fetched.
#[test]
fn schema_name_with_special_characters() {
    let single_column = TableSchema::new(vec![CatalogField::new("col", FieldType::Int32, true)]);

    let mut registry = InMemorySchemaRegistry::new();
    registry.register_schema("schema-with-dashes", single_column);

    let fetched = registry.get_schema("schema-with-dashes").unwrap();
    assert_eq!(fetched.field_count(), 1);
}
1554
/// A field name with punctuation is preserved both on the catalog field and
/// on its Arrow conversion.
#[test]
fn field_name_with_special_characters() {
    let field = CatalogField::new("field-with-dots_and@spaces", FieldType::Utf8, false);
    assert_eq!(field.name(), "field-with-dots_and@spaces");

    let converted = field.to_arrow_field();
    assert_eq!(converted.name(), "field-with-dots_and@spaces");
}
1562
/// Two tables with different names can both be registered.
///
/// Despite the `_errors` suffix in the test name, no error is expected here:
/// both registrations must succeed and both tables must be listed.
#[test]
fn catalog_duplicate_different_table_errors() {
    let mut catalog = InMemoryCatalog::new();
    for table_name in ["t1", "t2"].iter().copied() {
        catalog
            .register_table(TableMetadata::new(table_name, make_schema()))
            .unwrap();
    }
    assert_eq!(catalog.list_tables().len(), 2);
}
1578
/// Overwriting a schema keeps a single registry entry that holds the newest
/// schema.
#[test]
fn schema_registry_overwrite_preserves_single_entry() {
    let mut registry = InMemorySchemaRegistry::new();
    registry.register_schema("s", TableSchema::empty());
    registry.register_schema("s", make_schema());

    let entry_count = registry.schema_names().len();
    assert_eq!(entry_count, 1);

    let latest = registry.get_schema("s").unwrap();
    assert_eq!(latest.field_count(), 2);
}
1587
/// Converting an empty table schema produces an Arrow schema without fields.
#[test]
fn empty_schema_to_arrow() {
    let converted = TableSchema::empty().to_arrow_schema();
    assert_eq!(converted.fields().len(), 0);
}
1598
/// A one-field schema converts to an Arrow schema with the matching type and
/// nullability.
#[test]
fn single_field_schema_to_arrow() {
    use arrow::datatypes::DataType;

    let only_field = CatalogField::new("only", FieldType::Float32, true);
    let converted = TableSchema::new(vec![only_field]).to_arrow_schema();
    assert_eq!(converted.fields().len(), 1);

    let arrow_field = converted.field_with_name("only").unwrap();
    assert_eq!(arrow_field.data_type(), &DataType::Float32);
    assert!(arrow_field.is_nullable());
}
1608
/// `List<List<Int32>>` converts to two nested Arrow `List` types wrapping
/// `Int32`.
#[test]
fn field_type_list_of_list() {
    use arrow::datatypes::DataType;

    let nested = FieldType::List(Box::new(FieldType::List(Box::new(FieldType::Int32))));

    let outer_item = match nested.to_arrow() {
        DataType::List(item) => item,
        other => panic!("expected outer List, got {other:?}"),
    };
    let inner_item = match outer_item.data_type() {
        DataType::List(item) => item,
        other => panic!("expected nested List, got {other:?}"),
    };
    assert_eq!(inner_item.data_type(), &DataType::Int32);
}
1627
/// A struct containing another struct converts to nested Arrow `Struct`
/// types with the field order preserved.
#[test]
fn field_type_struct_nested_in_struct() {
    use arrow::datatypes::DataType;

    let inner = FieldType::Struct(vec![CatalogField::new("a", FieldType::Boolean, true)]);
    let outer = FieldType::Struct(vec![
        CatalogField::new("nested", inner, false),
        CatalogField::new("simple", FieldType::Utf8, true),
    ]);

    let outer_fields = match outer.to_arrow() {
        DataType::Struct(fields) => fields,
        other => panic!("expected Struct, got {other:?}"),
    };
    assert_eq!(outer_fields.len(), 2);

    match outer_fields[0].data_type() {
        DataType::Struct(inner_fields) => {
            assert_eq!(inner_fields.len(), 1);
            assert_eq!(inner_fields[0].name(), "a");
        }
        other => panic!("expected inner Struct, got {other:?}"),
    }
    assert_eq!(outer_fields[1].data_type(), &DataType::Utf8);
}
1651
/// A list of two-field structs converts to an Arrow `List` whose item is a
/// two-field `Struct`.
#[test]
fn field_type_list_of_struct() {
    use arrow::datatypes::DataType;

    let point = FieldType::Struct(vec![
        CatalogField::new("x", FieldType::Int64, false),
        CatalogField::new("y", FieldType::Utf8, true),
    ]);
    let points = FieldType::List(Box::new(point));

    let item = match points.to_arrow() {
        DataType::List(item_field) => item_field,
        other => panic!("expected List, got {other:?}"),
    };
    match item.data_type() {
        DataType::Struct(fields) => assert_eq!(fields.len(), 2),
        other => panic!("expected inner Struct, got {other:?}"),
    }
}
1668
/// A struct without fields converts to an Arrow `Struct` without fields.
#[test]
fn field_type_empty_struct() {
    use arrow::datatypes::DataType;

    let converted = FieldType::Struct(vec![]).to_arrow();
    let fields = match converted {
        DataType::Struct(fields) => fields,
        other => panic!("expected Struct, got {other:?}"),
    };
    assert_eq!(fields.len(), 0);
}
1679
/// A list of binary values converts to an Arrow `List` of `Binary`.
#[test]
fn field_type_list_of_binary() {
    use arrow::datatypes::DataType;

    let blobs = FieldType::List(Box::new(FieldType::Binary));
    let item = match blobs.to_arrow() {
        DataType::List(field) => field,
        other => panic!("expected List, got {other:?}"),
    };
    assert_eq!(item.data_type(), &DataType::Binary);
}
1690
/// A cloned `CatalogField` equals its source.
#[test]
fn catalog_field_clone_eq() {
    let original = CatalogField::new("col", FieldType::Int32, true);
    let duplicate = original.clone();
    assert_eq!(original, duplicate);
}
1701
/// Fields that differ only in name are not equal.
#[test]
fn catalog_field_ne_name() {
    let named_a = CatalogField::new("a", FieldType::Int32, true);
    let named_b = CatalogField::new("b", FieldType::Int32, true);
    assert_ne!(named_a, named_b);
}
1708
/// Fields that differ only in type are not equal.
#[test]
fn catalog_field_ne_type() {
    let as_int = CatalogField::new("a", FieldType::Int32, true);
    let as_text = CatalogField::new("a", FieldType::Utf8, true);
    assert_ne!(as_int, as_text);
}
1715
/// Fields that differ only in nullability are not equal.
#[test]
fn catalog_field_ne_nullable() {
    let nullable = CatalogField::new("a", FieldType::Int32, true);
    let required = CatalogField::new("a", FieldType::Int32, false);
    assert_ne!(nullable, required);
}
1722
/// A cloned `TableSchema` equals its source.
#[test]
fn table_schema_clone_eq() {
    let original = make_schema();
    let duplicate = original.clone();
    assert_eq!(original, duplicate);
}
1733
/// Schemas whose single field differs by name are not equal.
#[test]
fn table_schema_ne_different_fields() {
    let schema_with =
        |field_name: &str| TableSchema::new(vec![CatalogField::new(field_name, FieldType::Int32, false)]);
    assert_ne!(schema_with("a"), schema_with("b"));
}
1740
/// Cloning `TableMetadata` carries over the name and the attached statistics.
#[test]
fn table_metadata_clone() {
    let row_stats = ColumnStatistics::new().with_row_count(100);
    let original = TableMetadata::new("t", make_schema()).with_stats(row_stats);

    let duplicate = original.clone();
    assert_eq!(duplicate.name(), "t");
    assert_eq!(duplicate.statistics().unwrap().row_count, Some(100));
}
1753
/// Batches passed to `register_table_with_batches` can be read back through
/// `table_batches`.
#[test]
fn register_table_with_batches_stores_data() {
    use std::sync::Arc;

    use arrow::array::Int64Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    let id_schema = TableSchema::new(vec![CatalogField::new("id", FieldType::Int64, false)]);
    let arrow_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let three_rows = RecordBatch::try_new(
        arrow_schema,
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )
    .unwrap();

    let mut catalog = InMemoryCatalog::new();
    catalog
        .register_table_with_batches(TableMetadata::new("data", id_schema), vec![three_rows])
        .unwrap();

    let stored = catalog.table_batches("data");
    assert!(stored.is_some());
    let stored = stored.unwrap();
    assert_eq!(stored.len(), 1);
    assert_eq!(stored[0].num_rows(), 3);
}
1779
/// Registering a table with an empty batch list leaves `table_batches`
/// returning `None` for it.
#[test]
fn register_table_with_empty_batches_no_data() {
    let id_schema = TableSchema::new(vec![CatalogField::new("id", FieldType::Int32, false)]);
    let metadata = TableMetadata::new("empty", id_schema);

    let mut catalog = InMemoryCatalog::new();
    catalog
        .register_table_with_batches(metadata, vec![])
        .unwrap();

    assert!(catalog.table_batches("empty").is_none());
}
1789
/// `table_batches` returns `None` for a table that was never registered.
#[test]
fn table_batches_nonexistent_table() {
    let empty_catalog = InMemoryCatalog::new();
    let batches = empty_catalog.table_batches("nope");
    assert!(batches.is_none());
}
1795
/// A default-constructed in-memory catalog lists no tables.
#[test]
fn in_memory_catalog_default() {
    let tables = InMemoryCatalog::default().list_tables();
    assert!(tables.is_empty());
}
1805
/// A default-constructed schema registry reports no schema names.
#[test]
fn in_memory_schema_registry_default() {
    let names = InMemorySchemaRegistry::default().schema_names();
    assert!(names.is_empty());
}
1815
1816 #[tokio::test]
1821 async fn datafusion_bridge_empty_table_query() {
1822 use datafusion::prelude::SessionContext;
1823
1824 let catalog = std::sync::Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
1825 {
1826 let mut cat = catalog.write().unwrap();
1827 let schema = TableSchema::new(vec![CatalogField::new("id", FieldType::Int64, false)]);
1828 cat.register_table(TableMetadata::new("empty_table", schema))
1829 .unwrap();
1830 }
1831 let ctx = SessionContext::new();
1832 ctx.register_catalog(
1833 "krishiv",
1834 std::sync::Arc::new(super::datafusion_bridge::DataFusionCatalogBridge::new(
1835 catalog,
1836 )),
1837 );
1838 let df = ctx
1839 .sql("SELECT * FROM krishiv.public.empty_table")
1840 .await
1841 .unwrap();
1842 let batches = df.collect().await.unwrap();
1843 let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1844 assert_eq!(rows, 0);
1845 }
1846
1847 #[tokio::test]
1848 async fn datafusion_bridge_sql_filter() {
1849 use std::sync::Arc;
1850
1851 use arrow::array::Int64Array;
1852 use arrow::datatypes::{DataType, Field, Schema};
1853 use arrow::record_batch::RecordBatch;
1854 use datafusion::prelude::SessionContext;
1855
1856 let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
1857 let schema = TableSchema::new(vec![
1858 CatalogField::new("id", FieldType::Int64, false),
1859 CatalogField::new("val", FieldType::Int64, false),
1860 ]);
1861 let arrow_schema = Arc::new(Schema::new(vec![
1862 Field::new("id", DataType::Int64, false),
1863 Field::new("val", DataType::Int64, false),
1864 ]));
1865 let batch = RecordBatch::try_new(
1866 arrow_schema,
1867 vec![
1868 Arc::new(Int64Array::from(vec![1, 2, 3])),
1869 Arc::new(Int64Array::from(vec![10, 20, 30])),
1870 ],
1871 )
1872 .unwrap();
1873 catalog
1874 .write()
1875 .unwrap()
1876 .register_table_with_batches(TableMetadata::new("nums", schema), vec![batch])
1877 .unwrap();
1878
1879 let ctx = SessionContext::new();
1880 ctx.register_catalog(
1881 "krishiv",
1882 Arc::new(super::datafusion_bridge::DataFusionCatalogBridge::new(
1883 catalog,
1884 )),
1885 );
1886 let df = ctx
1887 .sql("SELECT id FROM krishiv.public.nums WHERE val > 15")
1888 .await
1889 .unwrap();
1890 let batches = df.collect().await.unwrap();
1891 let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1892 assert_eq!(rows, 2);
1893 }
1894
1895 #[tokio::test]
1896 async fn datafusion_bridge_sql_count_aggregate() {
1897 use std::sync::Arc;
1898
1899 use arrow::datatypes::{DataType, Field, Schema};
1900 use arrow::record_batch::RecordBatch;
1901 use datafusion::prelude::SessionContext;
1902
1903 let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
1904 let schema = TableSchema::new(vec![CatalogField::new("x", FieldType::Int32, false)]);
1905 let arrow_schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
1906 let batch = RecordBatch::try_new(
1907 arrow_schema,
1908 vec![Arc::new(arrow::array::Int32Array::from(vec![
1909 1, 2, 3, 4, 5,
1910 ]))],
1911 )
1912 .unwrap();
1913 catalog
1914 .write()
1915 .unwrap()
1916 .register_table_with_batches(TableMetadata::new("agg", schema), vec![batch])
1917 .unwrap();
1918
1919 let ctx = SessionContext::new();
1920 ctx.register_catalog(
1921 "krishiv",
1922 Arc::new(super::datafusion_bridge::DataFusionCatalogBridge::new(
1923 catalog,
1924 )),
1925 );
1926 let df = ctx
1927 .sql("SELECT COUNT(*) AS cnt FROM krishiv.public.agg")
1928 .await
1929 .unwrap();
1930 let batches = df.collect().await.unwrap();
1931 assert_eq!(batches.len(), 1);
1932 assert_eq!(batches[0].num_rows(), 1);
1933 }
1934
1935 #[tokio::test]
1940 async fn datafusion_bridge_multiple_tables() {
1941 use std::sync::Arc;
1942
1943 use arrow::array::Int64Array;
1944 use arrow::datatypes::{DataType, Field, Schema};
1945 use arrow::record_batch::RecordBatch;
1946 use datafusion::prelude::SessionContext;
1947
1948 let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
1949 let schema = TableSchema::new(vec![CatalogField::new("id", FieldType::Int64, false)]);
1950 let arrow_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
1951 let batch =
1952 RecordBatch::try_new(arrow_schema, vec![Arc::new(Int64Array::from(vec![1]))]).unwrap();
1953 {
1954 let mut cat = catalog.write().unwrap();
1955 cat.register_table_with_batches(
1956 TableMetadata::new("t1", schema.clone()),
1957 vec![batch.clone()],
1958 )
1959 .unwrap();
1960 cat.register_table_with_batches(TableMetadata::new("t2", schema), vec![batch])
1961 .unwrap();
1962 }
1963
1964 let ctx = SessionContext::new();
1965 ctx.register_catalog(
1966 "krishiv",
1967 Arc::new(super::datafusion_bridge::DataFusionCatalogBridge::new(
1968 catalog,
1969 )),
1970 );
1971 let df1 = ctx.sql("SELECT * FROM krishiv.public.t1").await.unwrap();
1972 let batches1 = df1.collect().await.unwrap();
1973 assert_eq!(batches1.len(), 1);
1974 assert_eq!(batches1[0].num_rows(), 1);
1975
1976 let df2 = ctx.sql("SELECT * FROM krishiv.public.t2").await.unwrap();
1977 let batches2 = df2.collect().await.unwrap();
1978 assert_eq!(batches2.len(), 1);
1979 assert_eq!(batches2[0].num_rows(), 1);
1980 }
1981
/// The bridge resolves the `public` schema and returns `None` for a schema
/// named `custom`.
#[test]
fn datafusion_bridge_custom_schema_name_returns_none() {
    use std::sync::{Arc, RwLock};

    use datafusion::catalog::CatalogProvider as DfCatalogProvider;

    let shared_catalog = Arc::new(RwLock::new(InMemoryCatalog::new()));
    let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(shared_catalog);

    let custom = bridge.schema("custom");
    assert!(custom.is_none());
    let public = bridge.schema("public");
    assert!(public.is_some());
}
1995
/// `as_any` on the bridge can be downcast back to `DataFusionCatalogBridge`.
#[test]
fn datafusion_bridge_as_any() {
    use std::sync::{Arc, RwLock};

    use datafusion::catalog::CatalogProvider as DfCatalogProvider;

    let shared_catalog = Arc::new(RwLock::new(InMemoryCatalog::new()));
    let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(shared_catalog);

    let recovered = bridge
        .as_any()
        .downcast_ref::<super::datafusion_bridge::DataFusionCatalogBridge>();
    assert!(recovered.is_some());
}
2013
/// The bridge advertises exactly one schema, named `public`.
#[test]
fn datafusion_bridge_only_public_schema() {
    use std::sync::{Arc, RwLock};

    use datafusion::catalog::CatalogProvider as DfCatalogProvider;

    let shared_catalog = Arc::new(RwLock::new(InMemoryCatalog::new()));
    let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(shared_catalog);

    let advertised = bridge.schema_names();
    assert_eq!(advertised.len(), 1);
    assert_eq!(advertised[0], "public");
}
2028
/// An empty struct type displays as `Struct(0 fields)`.
#[test]
fn field_type_display_empty_struct() {
    let rendered = FieldType::Struct(vec![]).to_string();
    assert_eq!(rendered, "Struct(0 fields)");
}
2038
/// Nested list types display with nested angle brackets.
#[test]
fn field_type_display_nested_list() {
    let nested = FieldType::List(Box::new(FieldType::List(Box::new(FieldType::Int32))));
    assert_eq!(nested.to_string(), "List<List<Int32>>");
}
2045
/// Two untouched `ColumnStatistics` values are equal.
#[test]
fn column_statistics_eq_all_none() {
    let left = ColumnStatistics::new();
    let right = ColumnStatistics::new();
    assert_eq!(left, right);
}
2056
/// Statistics with different minimum values are not equal.
#[test]
fn column_statistics_ne_different_min() {
    let low_min = ColumnStatistics::new().with_min("aaa");
    let high_min = ColumnStatistics::new().with_min("zzz");
    assert_ne!(low_min, high_min);
}
2063
/// `get_field` finds the last field of a three-field schema.
#[test]
fn table_schema_get_field_last() {
    let columns = vec![
        CatalogField::new("a", FieldType::Int32, false),
        CatalogField::new("b", FieldType::Utf8, true),
        CatalogField::new("c", FieldType::Float64, false),
    ];
    let schema = TableSchema::new(columns);

    let last = schema.get_field("c").unwrap();
    assert_eq!(last.field_type(), &FieldType::Float64);
}
2078
/// `get_field` finds the middle field of a three-field schema and reports
/// its type and nullability.
#[test]
fn table_schema_get_field_middle() {
    let columns = vec![
        CatalogField::new("a", FieldType::Int32, false),
        CatalogField::new("b", FieldType::Utf8, true),
        CatalogField::new("c", FieldType::Float64, false),
    ];
    let schema = TableSchema::new(columns);

    let middle = schema.get_field("b").unwrap();
    assert_eq!(middle.field_type(), &FieldType::Utf8);
    assert!(middle.nullable());
}
2090
/// Every listed `CatalogError` variant is usable as `&dyn std::error::Error`,
/// can be rendered with `to_string`, and reports no underlying source.
#[test]
fn all_catalog_errors_are_std_error() {
    let variants = [
        CatalogError::TableNotFound { name: "a".into() },
        CatalogError::TableAlreadyExists { name: "b".into() },
        CatalogError::SchemaNotFound { name: "c".into() },
        CatalogError::InvalidSchema {
            message: "d".into(),
        },
        CatalogError::InvalidConfiguration {
            message: "bad URL".into(),
        },
        CatalogError::Transport {
            operation: "load table".into(),
            message: "timed out".into(),
        },
        CatalogError::Http {
            status: 500,
            message: "e".into(),
        },
        CatalogError::InvalidResponse {
            operation: "list tables".into(),
            message: "missing identifiers".into(),
        },
        CatalogError::ResponseTooLarge {
            operation: "load table".into(),
            limit_bytes: 1024,
        },
        CatalogError::UnsupportedOperation {
            operation: "committing a table".into(),
        },
    ];

    for variant in variants.iter() {
        let as_error: &dyn std::error::Error = variant;
        // Rendering must not panic; the text itself is checked elsewhere.
        let _rendered = as_error.to_string();
        assert!(as_error.source().is_none());
    }
}
2133
/// The catalog holds one hundred tables; names inside the registered range
/// resolve and the first name past it does not.
#[test]
fn in_memory_catalog_many_tables() {
    let mut catalog = InMemoryCatalog::new();
    (0..100).for_each(|index| {
        let table_name = format!("table_{:03}", index);
        catalog
            .register_table(TableMetadata::new(table_name, make_schema()))
            .unwrap();
    });

    assert_eq!(catalog.list_tables().len(), 100);
    for registered in ["table_000", "table_099"].iter().copied() {
        assert!(catalog.get_table(registered).is_ok());
    }
    assert!(catalog.get_table("table_100").is_err());
}
2151
2152 #[tokio::test]
2157 async fn datafusion_bridge_table_unknown() {
2158 use datafusion::catalog::CatalogProvider as DfCatalogProvider;
2159 use std::sync::Arc;
2160
2161 let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
2162 let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
2163 let schema_provider = bridge.schema("public").unwrap();
2164 let result = schema_provider.table("nonexistent").await.unwrap();
2165 assert!(result.is_none());
2166 }
2167
2168 #[tokio::test]
2173 async fn datafusion_bridge_table_known() {
2174 use datafusion::catalog::CatalogProvider as DfCatalogProvider;
2175 use std::sync::Arc;
2176
2177 let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
2178 {
2179 let mut cat = catalog.write().unwrap();
2180 cat.register_table(TableMetadata::new("mytable", make_schema()))
2181 .unwrap();
2182 }
2183 let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
2184 let schema_provider = bridge.schema("public").unwrap();
2185 let result = schema_provider.table("mytable").await.unwrap();
2186 assert!(result.is_some());
2187 }
2188
2189 #[tokio::test]
2194 async fn datafusion_bridge_table_names() {
2195 use datafusion::catalog::CatalogProvider as DfCatalogProvider;
2196 use std::sync::Arc;
2197
2198 let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
2199 {
2200 let mut cat = catalog.write().unwrap();
2201 cat.register_table(TableMetadata::new("alpha", make_schema()))
2202 .unwrap();
2203 cat.register_table(TableMetadata::new("beta", make_schema()))
2204 .unwrap();
2205 }
2206 let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
2207 let schema_provider = bridge.schema("public").unwrap();
2208 let mut names = schema_provider.table_names();
2209 names.sort();
2210 assert_eq!(names, vec!["alpha", "beta"]);
2211 }
2212
2213 #[tokio::test]
2218 async fn datafusion_bridge_empty_table_names() {
2219 use datafusion::catalog::CatalogProvider as DfCatalogProvider;
2220 use std::sync::Arc;
2221
2222 let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
2223 let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
2224 let schema_provider = bridge.schema("public").unwrap();
2225 let names = schema_provider.table_names();
2226 assert!(names.is_empty());
2227 }
2228
/// `table_exist` is true for each registered table and false for an
/// unregistered name.
#[test]
fn datafusion_bridge_table_exist_multiple() {
    use std::sync::{Arc, RwLock};

    use datafusion::catalog::CatalogProvider as DfCatalogProvider;

    let shared_catalog = Arc::new(RwLock::new(InMemoryCatalog::new()));
    {
        // Scope the write guard so it is released before the bridge reads the catalog.
        let mut guard = shared_catalog.write().unwrap();
        for table_name in ["a", "b"].iter().copied() {
            guard
                .register_table(TableMetadata::new(table_name, make_schema()))
                .unwrap();
        }
    }

    let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(shared_catalog);
    let public = bridge.schema("public").unwrap();
    assert!(public.table_exist("a"));
    assert!(public.table_exist("b"));
    assert!(!public.table_exist("c"));
}
2251
/// The `Debug` rendering of the bridge contains its type name.
#[test]
fn datafusion_bridge_debug() {
    use std::sync::{Arc, RwLock};

    let shared_catalog = Arc::new(RwLock::new(InMemoryCatalog::new()));
    let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(shared_catalog);

    let rendered = format!("{bridge:?}");
    assert!(rendered.contains("DataFusionCatalogBridge"));
}
2263
/// A second `register_table_with_batches` call for the same name fails with
/// `TableAlreadyExists`, even when the second call supplies no batches.
#[test]
fn register_table_with_batches_duplicate_errors() {
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    let id_schema = TableSchema::new(vec![CatalogField::new("id", FieldType::Int32, false)]);
    let arrow_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let one_row =
        RecordBatch::try_new(arrow_schema, vec![Arc::new(Int32Array::from(vec![1]))]).unwrap();

    let mut catalog = InMemoryCatalog::new();
    catalog
        .register_table_with_batches(TableMetadata::new("t", id_schema.clone()), vec![one_row])
        .unwrap();

    let rejection = catalog
        .register_table_with_batches(TableMetadata::new("t", id_schema), vec![])
        .unwrap_err();
    assert!(matches!(rejection, CatalogError::TableAlreadyExists { .. }));
}
2288
2289 #[tokio::test]
2294 async fn datafusion_bridge_sql_multiple_columns() {
2295 use std::sync::Arc;
2296
2297 use arrow::array::{Int32Array, StringArray};
2298 use arrow::datatypes::{DataType, Field, Schema};
2299 use arrow::record_batch::RecordBatch;
2300 use datafusion::prelude::SessionContext;
2301
2302 let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
2303 let schema = TableSchema::new(vec![
2304 CatalogField::new("id", FieldType::Int32, false),
2305 CatalogField::new("name", FieldType::Utf8, true),
2306 ]);
2307 let arrow_schema = Arc::new(Schema::new(vec![
2308 Field::new("id", DataType::Int32, false),
2309 Field::new("name", DataType::Utf8, true),
2310 ]));
2311 let batch = RecordBatch::try_new(
2312 arrow_schema,
2313 vec![
2314 Arc::new(Int32Array::from(vec![1, 2, 3])),
2315 Arc::new(StringArray::from(vec!["a", "b", "c"])),
2316 ],
2317 )
2318 .unwrap();
2319 catalog
2320 .write()
2321 .unwrap()
2322 .register_table_with_batches(TableMetadata::new("mixed", schema), vec![batch])
2323 .unwrap();
2324
2325 let ctx = SessionContext::new();
2326 ctx.register_catalog(
2327 "krishiv",
2328 Arc::new(super::datafusion_bridge::DataFusionCatalogBridge::new(
2329 catalog,
2330 )),
2331 );
2332 let df = ctx
2333 .sql("SELECT name FROM krishiv.public.mixed WHERE id > 1")
2334 .await
2335 .unwrap();
2336 let batches = df.collect().await.unwrap();
2337 let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
2338 assert_eq!(rows, 2);
2339 }
2340
/// Each listed `CatalogError` variant renders the exact `Display` message
/// expected for it.
#[test]
fn catalog_error_display_all_variants() {
    // Compare one error's rendered text against the expected message.
    fn assert_display(err: CatalogError, expected: &str) {
        assert_eq!(err.to_string(), expected);
    }

    assert_display(
        CatalogError::TableNotFound {
            name: String::from("x"),
        },
        "table not found: 'x'",
    );
    assert_display(
        CatalogError::TableAlreadyExists {
            name: String::from("y"),
        },
        "table already exists: 'y'",
    );
    assert_display(
        CatalogError::SchemaNotFound {
            name: String::from("z"),
        },
        "schema not found: 'z'",
    );
    assert_display(
        CatalogError::InvalidSchema {
            message: String::from("bad"),
        },
        "invalid schema: bad",
    );
    assert_display(
        CatalogError::Http {
            status: 403,
            message: String::from("forbidden"),
        },
        "HTTP error 403: forbidden",
    );
    assert_display(
        CatalogError::InvalidConfiguration {
            message: String::from("bad URL"),
        },
        "invalid catalog configuration: bad URL",
    );
    assert_display(
        CatalogError::Transport {
            operation: String::from("list tables"),
            message: String::from("timed out"),
        },
        "catalog transport error during list tables: timed out",
    );
    assert_display(
        CatalogError::InvalidResponse {
            operation: String::from("load table"),
            message: String::from("missing metadata"),
        },
        "invalid catalog response during load table: missing metadata",
    );
    assert_display(
        CatalogError::ResponseTooLarge {
            operation: String::from("load table"),
            limit_bytes: 4096,
        },
        "catalog response during load table exceeded 4096 bytes",
    );
    assert_display(
        CatalogError::UnsupportedOperation {
            operation: String::from("committing a table"),
        },
        "catalog server does not support committing a table",
    );
}
2417}