Skip to main content

krishiv_sql/catalog/
mod.rs

1//! Catalog abstractions for Krishiv.
2//!
3//! This crate defines `TableProvider`, `CatalogProvider`, schema types, and
4//! column statistics. An in-memory reference implementation is included.
5
6pub mod iceberg_rest;
7pub use iceberg_rest::{
8    GenericRestCatalog, IcebergCatalogClient, IcebergTableId, LoadedIcebergTable, RestCatalogConfig,
9};
10
11// ── Unified Iceberg catalog backends (Phase J1-J4) ──────────────────────────
12// Each module is gated behind its own feature so a build that does not need a
13// given backend never pulls its dependency tree (sqlx, iceberg-catalog-rest…).
14#[cfg(feature = "glue-catalog")]
15pub mod glue_catalog;
16#[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
17pub mod iceberg_catalog_bridge;
18#[cfg(feature = "iceberg-datafusion")]
19pub mod iceberg_table_provider;
20#[cfg(feature = "local-catalog")]
21pub mod local_catalog;
22#[cfg(feature = "postgres-catalog")]
23pub mod postgres_catalog;
24#[cfg(feature = "rest-catalog")]
25pub mod rest_catalog_wrapper;
26#[cfg(feature = "local-catalog")]
27pub mod unified;
28#[cfg(feature = "unity-catalog")]
29pub mod unity_catalog;
30
31#[cfg(feature = "glue-catalog")]
32pub use glue_catalog::GlueCatalog;
33#[cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
34pub use iceberg_catalog_bridge::IcebergCatalogBridge;
35#[cfg(feature = "local-catalog")]
36pub use local_catalog::LocalCatalog;
37#[cfg(feature = "postgres-catalog")]
38pub use postgres_catalog::PostgresCatalog;
39#[cfg(feature = "rest-catalog")]
40pub use rest_catalog_wrapper::KrishivRestCatalog;
41#[cfg(feature = "local-catalog")]
42pub use unified::KrishivCatalog;
43#[cfg(feature = "unity-catalog")]
44pub use unity_catalog::UnityCatalog;
45
46use std::collections::BTreeMap;
47use std::fmt;
48
49// ---------------------------------------------------------------------------
50// Error and Result
51// ---------------------------------------------------------------------------
52
53/// Errors produced by catalog operations.
54#[derive(Debug, thiserror::Error)]
55pub enum CatalogError {
56    /// A requested table was not found in the catalog.
57    #[error("table not found: '{name}'")]
58    TableNotFound { name: String },
59    /// Table already exists and `if_not_exists` was false.
60    #[error("table already exists: '{name}'")]
61    TableAlreadyExists { name: String },
62    /// A requested schema was not found.
63    #[error("schema not found: '{name}'")]
64    SchemaNotFound { name: String },
65    /// The provided schema is structurally invalid.
66    #[error("invalid schema: {message}")]
67    InvalidSchema { message: String },
68    /// Remote catalog configuration is malformed or unsafe.
69    #[error("invalid catalog configuration: {message}")]
70    InvalidConfiguration { message: String },
71    /// A remote catalog request could not be completed.
72    #[error("catalog transport error during {operation}: {message}")]
73    Transport { operation: String, message: String },
74    /// An HTTP request to a remote catalog service failed.
75    #[error("HTTP error {status}: {message}")]
76    Http { status: u16, message: String },
77    /// A successful remote response did not satisfy the catalog contract.
78    #[error("invalid catalog response during {operation}: {message}")]
79    InvalidResponse { operation: String, message: String },
80    /// A remote response exceeded the configured memory ceiling.
81    #[error("catalog response during {operation} exceeded {limit_bytes} bytes")]
82    ResponseTooLarge {
83        operation: String,
84        limit_bytes: usize,
85    },
86    /// The server explicitly does not advertise a required endpoint.
87    #[error("catalog server does not support {operation}")]
88    UnsupportedOperation { operation: String },
89    /// A filesystem / object-store I/O error.
90    #[error("I/O error: {0}")]
91    Io(String),
92    /// An error surfaced by the underlying Iceberg library.
93    #[error("Iceberg error: {0}")]
94    Iceberg(String),
95    /// An optimistic-concurrency commit lost a race with another writer.
96    #[error("concurrency conflict: {message}")]
97    ConcurrencyConflict { message: String },
98    /// A requested namespace was not found.
99    #[error("namespace not found: '{name}'")]
100    NamespaceNotFound { name: String },
101}
102
103/// Convenience result alias for catalog operations.
104pub type CatalogResult<T> = Result<T, CatalogError>;
105
106/// Alias used by the unified-catalog backends (J1-J4).
107///
108/// The file-system, Postgres, and REST catalog implementations all report
109/// failures through [`CatalogError`]; this alias gives those modules the
110/// `LakehouseError` name used throughout the lakehouse subsystem without
111/// introducing a second error type at the crate boundary.
112pub type LakehouseError = CatalogError;
113
114// ---------------------------------------------------------------------------
115// FieldType
116// ---------------------------------------------------------------------------
117
118/// Logical field types supported by the Krishiv catalog.
119#[derive(Debug, Clone, PartialEq, Eq)]
120pub enum FieldType {
121    Int8,
122    Int16,
123    Int32,
124    Int64,
125    UInt8,
126    UInt16,
127    UInt32,
128    UInt64,
129    Float32,
130    Float64,
131    Boolean,
132    Utf8,
133    Binary,
134    Timestamp,
135    Date32,
136    /// List of `item_type` elements.
137    List(Box<FieldType>),
138    /// Struct with named fields.
139    Struct(Vec<CatalogField>),
140}
141
142impl fmt::Display for FieldType {
143    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144        let s = match self {
145            FieldType::Int8 => "Int8",
146            FieldType::Int16 => "Int16",
147            FieldType::Int32 => "Int32",
148            FieldType::Int64 => "Int64",
149            FieldType::UInt8 => "UInt8",
150            FieldType::UInt16 => "UInt16",
151            FieldType::UInt32 => "UInt32",
152            FieldType::UInt64 => "UInt64",
153            FieldType::Float32 => "Float32",
154            FieldType::Float64 => "Float64",
155            FieldType::Boolean => "Boolean",
156            FieldType::Utf8 => "Utf8",
157            FieldType::Binary => "Binary",
158            FieldType::Timestamp => "Timestamp",
159            FieldType::Date32 => "Date32",
160            FieldType::List(inner) => return write!(f, "List<{inner}>"),
161            FieldType::Struct(fields) => {
162                return write!(f, "Struct({} fields)", fields.len());
163            }
164        };
165        f.write_str(s)
166    }
167}
168
169impl FieldType {
170    /// Convert this field type to the equivalent Arrow [`DataType`].
171    ///
172    /// [`DataType`]: arrow::datatypes::DataType
173    pub fn to_arrow(&self) -> arrow::datatypes::DataType {
174        use arrow::datatypes::DataType;
175        use arrow::datatypes::TimeUnit;
176        match self {
177            FieldType::Int8 => DataType::Int8,
178            FieldType::Int16 => DataType::Int16,
179            FieldType::Int32 => DataType::Int32,
180            FieldType::Int64 => DataType::Int64,
181            FieldType::UInt8 => DataType::UInt8,
182            FieldType::UInt16 => DataType::UInt16,
183            FieldType::UInt32 => DataType::UInt32,
184            FieldType::UInt64 => DataType::UInt64,
185            FieldType::Float32 => DataType::Float32,
186            FieldType::Float64 => DataType::Float64,
187            FieldType::Boolean => DataType::Boolean,
188            FieldType::Utf8 => DataType::Utf8,
189            FieldType::Binary => DataType::Binary,
190            FieldType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
191            FieldType::Date32 => DataType::Date32,
192            FieldType::List(item) => DataType::List(std::sync::Arc::new(
193                arrow::datatypes::Field::new("item", item.to_arrow(), true),
194            )),
195            FieldType::Struct(fields) => {
196                let arrow_fields: arrow::datatypes::Fields = fields
197                    .iter()
198                    .map(|f| {
199                        std::sync::Arc::new(arrow::datatypes::Field::new(
200                            f.name(),
201                            f.field_type().to_arrow(),
202                            f.nullable(),
203                        ))
204                    })
205                    .collect();
206                DataType::Struct(arrow_fields)
207            }
208        }
209    }
210}
211
212// ---------------------------------------------------------------------------
213// CatalogField
214// ---------------------------------------------------------------------------
215
216/// A single field in a catalog table schema.
217#[derive(Debug, Clone, PartialEq, Eq)]
218pub struct CatalogField {
219    name: String,
220    field_type: FieldType,
221    nullable: bool,
222}
223
224impl CatalogField {
225    /// Create a new catalog field.
226    pub fn new(name: impl Into<String>, field_type: FieldType, nullable: bool) -> Self {
227        Self {
228            name: name.into(),
229            field_type,
230            nullable,
231        }
232    }
233
234    /// The field name.
235    pub fn name(&self) -> &str {
236        &self.name
237    }
238
239    /// The logical field type.
240    pub fn field_type(&self) -> &FieldType {
241        &self.field_type
242    }
243
244    /// Whether the field is nullable.
245    pub fn nullable(&self) -> bool {
246        self.nullable
247    }
248
249    /// Convert this field to an Arrow [`Field`].
250    ///
251    /// [`Field`]: arrow::datatypes::Field
252    pub fn to_arrow_field(&self) -> arrow::datatypes::Field {
253        arrow::datatypes::Field::new(
254            self.name.as_str(),
255            self.field_type.to_arrow(),
256            self.nullable,
257        )
258    }
259}
260
261// ---------------------------------------------------------------------------
262// TableSchema
263// ---------------------------------------------------------------------------
264
265/// The schema of a catalog table: an ordered list of fields.
266#[derive(Debug, Clone, PartialEq, Eq)]
267pub struct TableSchema {
268    fields: Vec<CatalogField>,
269}
270
271impl TableSchema {
272    /// Create a new schema from a list of fields.
273    pub fn new(fields: Vec<CatalogField>) -> Self {
274        Self { fields }
275    }
276
277    /// Create an empty schema with no fields.
278    pub fn empty() -> Self {
279        Self { fields: Vec::new() }
280    }
281
282    /// Convert to an Arrow [`Schema`].
283    ///
284    /// [`Schema`]: arrow::datatypes::Schema
285    pub fn to_arrow_schema(&self) -> arrow::datatypes::Schema {
286        let arrow_fields: Vec<arrow::datatypes::Field> = self
287            .fields
288            .iter()
289            .map(CatalogField::to_arrow_field)
290            .collect();
291        arrow::datatypes::Schema::new(arrow_fields)
292    }
293
294    /// Return the number of fields in this schema.
295    pub fn field_count(&self) -> usize {
296        self.fields.len()
297    }
298
299    /// Look up a field by name.
300    pub fn get_field(&self, name: &str) -> Option<&CatalogField> {
301        self.fields.iter().find(|f| f.name() == name)
302    }
303}
304
305// ---------------------------------------------------------------------------
306// ColumnStatistics
307// ---------------------------------------------------------------------------
308
/// Optional statistics describing a table column (or the table as a whole).
///
/// Every statistic is independently optional; `None` means "unknown".
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ColumnStatistics {
    /// Total row count of the table, when known.
    pub row_count: Option<u64>,
    /// How many values in the column are null, when known.
    pub null_count: Option<u64>,
    /// Minimum value rendered as a string, when known.
    pub min_value: Option<String>,
    /// Maximum value rendered as a string, when known.
    pub max_value: Option<String>,
    /// Number of distinct values (NDV) in the column, when known.
    ///
    /// Feeds join-order cost estimation and broadcast sizing: NDV ≈ 1
    /// indicates a hot join key, whereas NDV ≈ row count indicates a
    /// near-unique key (high build-side fan-out).
    pub distinct_count: Option<u64>,
    /// Wall-clock Unix timestamp, in seconds, at which `ANALYZE TABLE`
    /// gathered these stats. The CBO declines to use stats older than
    /// `stats_max_age_secs` so it does not optimise against stale
    /// cardinality information.
    pub collected_at_secs: Option<u64>,
}

impl ColumnStatistics {
    /// Start from statistics in which every value is unknown (`None`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Record the table's row count.
    #[must_use]
    pub fn with_row_count(self, count: u64) -> Self {
        Self {
            row_count: Some(count),
            ..self
        }
    }

    /// Record the column's null count.
    #[must_use]
    pub fn with_null_count(self, count: u64) -> Self {
        Self {
            null_count: Some(count),
            ..self
        }
    }

    /// Record the column's minimum value (as its string rendering).
    #[must_use]
    pub fn with_min(self, min: impl Into<String>) -> Self {
        Self {
            min_value: Some(min.into()),
            ..self
        }
    }

    /// Record the column's maximum value (as its string rendering).
    #[must_use]
    pub fn with_max(self, max: impl Into<String>) -> Self {
        Self {
            max_value: Some(max.into()),
            ..self
        }
    }

    /// Record the column's number of distinct values (NDV).
    #[must_use]
    pub fn with_distinct_count(self, ndv: u64) -> Self {
        Self {
            distinct_count: Some(ndv),
            ..self
        }
    }

    /// Record when (wall-clock seconds) the stats were gathered. Set by
    /// [`analyze_table`][crate::analyze::analyze_table] so that the CBO can
    /// tell fresh stats from stale ones.
    #[must_use]
    pub fn with_collected_at_secs(self, secs: u64) -> Self {
        Self {
            collected_at_secs: Some(secs),
            ..self
        }
    }

    /// Estimated selectivity of a single-point equality filter on the column.
    ///
    /// Returns `None` when the distinct count is unknown, `Some(0.0)` when it
    /// is zero (an all-null or empty column), and `Some(1.0 / distinct_count)`
    /// otherwise.
    pub fn equality_selectivity(&self) -> Option<f64> {
        self.distinct_count.map(|ndv| match ndv {
            0 => 0.0,
            distinct => 1.0 / distinct as f64,
        })
    }

    /// Report whether the stats are still usable at `now_secs` given a
    /// maximum tolerated age of `max_age_secs`.
    ///
    /// Stats lacking a collection timestamp always count as fresh, which keeps
    /// manually-curated stats working. Otherwise the age is
    /// `now_secs - collected_at_secs`, clamped at zero, and the stats are
    /// fresh while that age does not exceed `max_age_secs`.
    pub fn is_fresh(&self, now_secs: u64, max_age_secs: u64) -> bool {
        self.collected_at_secs
            .map_or(true, |stamp| now_secs.saturating_sub(stamp) <= max_age_secs)
    }
}
409
410// ---------------------------------------------------------------------------
411// TableMetadata
412// ---------------------------------------------------------------------------
413
414/// Full metadata for a table: name, schema, and optional statistics.
415#[derive(Debug, Clone)]
416pub struct TableMetadata {
417    name: String,
418    schema: TableSchema,
419    stats: Option<ColumnStatistics>,
420}
421
422impl TableMetadata {
423    /// Create new table metadata with no statistics.
424    pub fn new(name: impl Into<String>, schema: TableSchema) -> Self {
425        Self {
426            name: name.into(),
427            schema,
428            stats: None,
429        }
430    }
431
432    /// Attach column statistics and return the updated metadata.
433    #[must_use]
434    pub fn with_stats(mut self, stats: ColumnStatistics) -> Self {
435        self.stats = Some(stats);
436        self
437    }
438
439    /// The table name.
440    pub fn name(&self) -> &str {
441        &self.name
442    }
443
444    /// The table schema.
445    pub fn schema(&self) -> &TableSchema {
446        &self.schema
447    }
448
449    /// Optional column statistics.
450    pub fn statistics(&self) -> Option<&ColumnStatistics> {
451        self.stats.as_ref()
452    }
453}
454
455// ---------------------------------------------------------------------------
456// TableProvider trait
457// ---------------------------------------------------------------------------
458
459/// A resolved reference to a single table's metadata.
460pub trait TableProvider {
461    /// The table name.
462    fn name(&self) -> &str;
463
464    /// The table schema.
465    fn schema(&self) -> &TableSchema;
466
467    /// Optional column statistics.
468    fn statistics(&self) -> Option<&ColumnStatistics>;
469}
470
471// ---------------------------------------------------------------------------
472// CatalogProvider trait
473// ---------------------------------------------------------------------------
474
475/// A registry of tables that can be listed, looked up, and registered.
476pub trait CatalogProvider {
477    /// Return the names of all tables in the catalog.
478    fn list_tables(&self) -> Vec<String>;
479
480    /// Look up a table by name.
481    fn get_table(&self, name: &str) -> CatalogResult<&dyn TableProvider>;
482
483    /// Register a table in the catalog.
484    ///
485    /// Returns an error if the schema is structurally invalid or if
486    /// implementation-specific constraints are violated.
487    fn register_table(&mut self, metadata: TableMetadata) -> CatalogResult<()>;
488}
489
490// ---------------------------------------------------------------------------
491// InMemoryCatalog
492// ---------------------------------------------------------------------------
493
494/// A `TableProvider` wrapper over `TableMetadata`.
495struct TableMetadataProvider {
496    metadata: TableMetadata,
497}
498
499impl TableProvider for TableMetadataProvider {
500    fn name(&self) -> &str {
501        self.metadata.name()
502    }
503
504    fn schema(&self) -> &TableSchema {
505        self.metadata.schema()
506    }
507
508    fn statistics(&self) -> Option<&ColumnStatistics> {
509        self.metadata.statistics()
510    }
511}
512
513/// An in-memory catalog backed by a sorted map.
514pub struct InMemoryCatalog {
515    tables: BTreeMap<String, TableMetadataProvider>,
516    /// Optional in-memory row data keyed by table name.
517    table_data: BTreeMap<String, std::sync::Arc<Vec<arrow::record_batch::RecordBatch>>>,
518}
519
520impl InMemoryCatalog {
521    /// Create a new, empty in-memory catalog.
522    pub fn new() -> Self {
523        Self {
524            tables: BTreeMap::new(),
525            table_data: BTreeMap::new(),
526        }
527    }
528
529    /// Register a table and attach in-memory Arrow batches for SQL scans (P0-9).
530    pub fn register_table_with_batches(
531        &mut self,
532        metadata: TableMetadata,
533        batches: Vec<arrow::record_batch::RecordBatch>,
534    ) -> CatalogResult<()> {
535        let name = metadata.name().to_owned();
536        self.register_table(metadata)?;
537        if !batches.is_empty() {
538            self.table_data.insert(name, std::sync::Arc::new(batches));
539        }
540        Ok(())
541    }
542
543    /// Return stored batches for a registered table, if any.
544    pub fn table_batches(
545        &self,
546        name: &str,
547    ) -> Option<std::sync::Arc<Vec<arrow::record_batch::RecordBatch>>> {
548        self.table_data.get(name).cloned()
549    }
550}
551
552impl Default for InMemoryCatalog {
553    fn default() -> Self {
554        Self::new()
555    }
556}
557
558impl CatalogProvider for InMemoryCatalog {
559    fn list_tables(&self) -> Vec<String> {
560        self.tables.keys().cloned().collect()
561    }
562
563    fn get_table(&self, name: &str) -> CatalogResult<&dyn TableProvider> {
564        self.tables
565            .get(name)
566            .map(|p| p as &dyn TableProvider)
567            .ok_or_else(|| CatalogError::TableNotFound {
568                name: name.to_string(),
569            })
570    }
571
572    fn register_table(&mut self, metadata: TableMetadata) -> CatalogResult<()> {
573        let name = metadata.name().to_string();
574        if self.tables.contains_key(&name) {
575            return Err(CatalogError::TableAlreadyExists { name });
576        }
577        self.tables.insert(name, TableMetadataProvider { metadata });
578        Ok(())
579    }
580}
581
582// ---------------------------------------------------------------------------
583// SchemaRegistry
584// ---------------------------------------------------------------------------
585
586/// A registry that maps logical schema names to [`TableSchema`] definitions.
587///
588/// Used by connectors (e.g., Kafka Avro/Protobuf topics) to resolve the
589/// Arrow schema for a data stream at runtime without hard-coding field lists.
590pub trait SchemaRegistry: Send + Sync {
591    /// Look up a schema by name.
592    fn get_schema(&self, name: &str) -> CatalogResult<TableSchema>;
593    /// Register a schema under a name, replacing any existing entry.
594    fn register_schema(&mut self, name: impl Into<String>, schema: TableSchema);
595    /// Return all registered schema names.
596    fn schema_names(&self) -> Vec<String>;
597}
598
599/// An in-memory [`SchemaRegistry`] backed by a sorted map.
600#[derive(Debug, Default)]
601pub struct InMemorySchemaRegistry {
602    schemas: BTreeMap<String, TableSchema>,
603}
604
605impl InMemorySchemaRegistry {
606    pub fn new() -> Self {
607        Self::default()
608    }
609}
610
611impl SchemaRegistry for InMemorySchemaRegistry {
612    fn get_schema(&self, name: &str) -> CatalogResult<TableSchema> {
613        self.schemas
614            .get(name)
615            .cloned()
616            .ok_or_else(|| CatalogError::SchemaNotFound {
617                name: name.to_string(),
618            })
619    }
620
621    fn register_schema(&mut self, name: impl Into<String>, schema: TableSchema) {
622        self.schemas.insert(name.into(), schema);
623    }
624
625    fn schema_names(&self) -> Vec<String> {
626        self.schemas.keys().cloned().collect()
627    }
628}
629
630// ---------------------------------------------------------------------------
631// DataFusion catalog bridge
632// ---------------------------------------------------------------------------
633
634/// DataFusion integration: wraps [`InMemoryCatalog`] as DataFusion catalog
635/// and schema providers so that Krishiv catalog tables can be used directly
636/// inside a DataFusion [`SessionContext`].
637///
638/// [`SessionContext`]: datafusion::prelude::SessionContext
639pub mod datafusion_bridge {
640    use std::any::Any;
641    use std::fmt;
642    use std::sync::{Arc, RwLock};
643
644    use datafusion::catalog::{CatalogProvider, SchemaProvider};
645    use datafusion::datasource::MemTable;
646    use datafusion::error::Result as DfResult;
647
648    /// Bridges a Krishiv [`InMemoryCatalog`] into a DataFusion
649    /// [`CatalogProvider`].
650    ///
651    /// The bridge exposes a single schema named `"public"` that mirrors
652    /// the tables registered in the underlying [`InMemoryCatalog`].
653    ///
654    /// [`InMemoryCatalog`]: super::InMemoryCatalog
655    pub struct DataFusionCatalogBridge {
656        catalog: Arc<RwLock<super::InMemoryCatalog>>,
657        schema_name: String,
658        /// MemTable cache shared across the inner `DataFusionSchemaBridge`
659        /// instances DataFusion requests. Avoids re-cloning the entire
660        /// `Vec<RecordBatch>` payload (which can be hundreds of MB) on
661        /// every DataFusion `table()` call. Keyed by table name; cleared
662        /// on `invalidate(name)`.
663        schema_cache: std::sync::Arc<dashmap::DashMap<String, Arc<MemTable>>>,
664    }
665
666    impl DataFusionCatalogBridge {
667        /// Create a bridge from an [`InMemoryCatalog`] shared reference.
668        ///
669        /// [`InMemoryCatalog`]: super::InMemoryCatalog
670        pub fn new(catalog: Arc<RwLock<super::InMemoryCatalog>>) -> Self {
671            Self {
672                catalog,
673                schema_name: "public".to_string(),
674                schema_cache: std::sync::Arc::new(dashmap::DashMap::new()),
675            }
676        }
677
678        /// Invalidate the MemTable cache for `name`, forcing the next
679        /// `table()` call to rebuild the cached `MemTable` from the
680        /// catalog's batch store. Call this after `register_table_with_batches`
681        /// mutates the underlying catalog so the bridge does not serve
682        /// a stale `MemTable` referencing the previous batch payload.
683        ///
684        /// The `DashMap` cache is per-bridge and survives across DataFusion
685        /// `table()` calls; without this invalidation hook a second
686        /// `register_table_with_batches` for the same name would not be
687        /// visible to the DataFusion query plan.
688        pub fn invalidate(&self, name: &str) {
689            self.schema_cache.remove(name);
690        }
691    }
692
693    impl fmt::Debug for DataFusionCatalogBridge {
694        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
695            f.debug_struct("DataFusionCatalogBridge")
696                .field("schema_name", &self.schema_name)
697                .finish()
698        }
699    }
700
701    impl CatalogProvider for DataFusionCatalogBridge {
702        fn as_any(&self) -> &dyn Any {
703            self
704        }
705
706        fn schema_names(&self) -> Vec<String> {
707            vec![self.schema_name.clone()]
708        }
709
710        fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
711            if name == self.schema_name {
712                Some(Arc::new(DataFusionSchemaBridge {
713                    catalog: self.catalog.clone(),
714                    cache: self.schema_cache.clone(),
715                }))
716            } else {
717                None
718            }
719        }
720    }
721
722    // -----------------------------------------------------------------------
723
724    struct DataFusionSchemaBridge {
725        catalog: Arc<RwLock<super::InMemoryCatalog>>,
726        cache: std::sync::Arc<dashmap::DashMap<String, Arc<MemTable>>>,
727    }
728
729    impl fmt::Debug for DataFusionSchemaBridge {
730        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
731            f.debug_struct("DataFusionSchemaBridge").finish()
732        }
733    }
734
735    #[async_trait::async_trait]
736    impl SchemaProvider for DataFusionSchemaBridge {
737        fn as_any(&self) -> &dyn Any {
738            self
739        }
740
741        fn table_names(&self) -> Vec<String> {
742            let catalog = self.catalog.read().unwrap_or_else(|p| p.into_inner());
743            use super::CatalogProvider as KrishivCatalogProvider;
744            catalog.list_tables()
745        }
746
747        async fn table(
748            &self,
749            name: &str,
750        ) -> DfResult<Option<Arc<dyn datafusion::datasource::TableProvider>>> {
751            // MemTable cache: if we built one for this table already, return
752            // the cached Arc. The cache is invalidated explicitly via
753            // `invalidate(name)` when the underlying catalog is mutated.
754            if let Some(cached) = self.cache.get(name) {
755                return Ok(Some(
756                    cached.clone() as Arc<dyn datafusion::datasource::TableProvider>
757                ));
758            }
759            let catalog = self.catalog.read().unwrap_or_else(|p| p.into_inner());
760            use super::CatalogProvider as KrishivCatalogProvider;
761            match catalog.get_table(name) {
762                Ok(table_provider) => {
763                    let arrow_schema = Arc::new(table_provider.schema().to_arrow_schema());
764                    let batches = catalog.table_batches(name);
765                    let partitions = batches.map(|b| (*b).clone()).unwrap_or_default();
766                    let mem = MemTable::try_new(arrow_schema, vec![partitions])?;
767                    let mem_arc = Arc::new(mem);
768                    self.cache.insert(name.to_string(), mem_arc.clone());
769                    Ok(Some(
770                        mem_arc as Arc<dyn datafusion::datasource::TableProvider>,
771                    ))
772                }
773                Err(super::CatalogError::TableNotFound { .. }) => Ok(None),
774                Err(error) => Err(datafusion::error::DataFusionError::External(Box::new(
775                    error,
776                ))),
777            }
778        }
779
780        fn table_exist(&self, name: &str) -> bool {
781            let catalog = self.catalog.read().unwrap_or_else(|p| p.into_inner());
782            use super::CatalogProvider as KrishivCatalogProvider;
783            catalog.get_table(name).is_ok()
784        }
785    }
786}
787
788// ---------------------------------------------------------------------------
789// Unit tests
790// ---------------------------------------------------------------------------
791
792#[cfg(test)]
793mod tests {
794    use super::*;
795
796    fn make_schema() -> TableSchema {
797        TableSchema::new(vec![
798            CatalogField::new("id", FieldType::Int64, false),
799            CatalogField::new("name", FieldType::Utf8, true),
800        ])
801    }
802
803    #[test]
804    fn in_memory_catalog_registers_and_retrieves_table() {
805        let mut catalog = InMemoryCatalog::new();
806        let meta = TableMetadata::new("users", make_schema());
807        catalog.register_table(meta).unwrap();
808
809        let table = catalog.get_table("users").unwrap();
810        assert_eq!(table.name(), "users");
811        assert_eq!(table.schema().field_count(), 2);
812    }
813
814    #[test]
815    fn in_memory_catalog_lists_tables() {
816        let mut catalog = InMemoryCatalog::new();
817        catalog
818            .register_table(TableMetadata::new("alpha", make_schema()))
819            .unwrap();
820        catalog
821            .register_table(TableMetadata::new("beta", make_schema()))
822            .unwrap();
823
824        let mut tables = catalog.list_tables();
825        tables.sort();
826        assert_eq!(tables, vec!["alpha", "beta"]);
827    }
828
829    #[test]
830    fn in_memory_catalog_returns_error_for_unknown_table() {
831        let catalog = InMemoryCatalog::new();
832        let err = catalog.get_table("nonexistent").err().unwrap();
833        match err {
834            CatalogError::TableNotFound { name } => {
835                assert_eq!(name, "nonexistent");
836            }
837            other => panic!("unexpected error: {other}"),
838        }
839    }
840
841    #[test]
842    fn table_schema_converts_to_arrow_schema() {
843        let schema = make_schema();
844        let arrow_schema = schema.to_arrow_schema();
845
846        assert_eq!(arrow_schema.fields().len(), 2);
847        let id_field = arrow_schema.field_with_name("id").unwrap();
848        assert_eq!(id_field.data_type(), &arrow::datatypes::DataType::Int64);
849        assert!(!id_field.is_nullable());
850
851        let name_field = arrow_schema.field_with_name("name").unwrap();
852        assert_eq!(name_field.data_type(), &arrow::datatypes::DataType::Utf8);
853        assert!(name_field.is_nullable());
854    }
855
856    // -----------------------------------------------------------------------
857    // SchemaRegistry tests
858    // -----------------------------------------------------------------------
859
860    #[test]
861    fn schema_registry_registers_and_retrieves() {
862        let mut registry = InMemorySchemaRegistry::new();
863        registry.register_schema("events", make_schema());
864        let schema = registry.get_schema("events").unwrap();
865        assert_eq!(schema.field_count(), 2);
866    }
867
868    #[test]
869    fn schema_registry_returns_error_for_missing() {
870        let registry = InMemorySchemaRegistry::new();
871        let err = registry.get_schema("nonexistent").unwrap_err();
872        match err {
873            CatalogError::SchemaNotFound { name } => {
874                assert_eq!(name, "nonexistent");
875            }
876            other => panic!("unexpected error: {other}"),
877        }
878    }
879
880    #[test]
881    fn schema_registry_lists_names() {
882        let mut registry = InMemorySchemaRegistry::new();
883        registry.register_schema("orders", make_schema());
884        registry.register_schema("users", make_schema());
885        let mut names = registry.schema_names();
886        names.sort();
887        assert_eq!(names, vec!["orders", "users"]);
888    }
889
890    // -----------------------------------------------------------------------
891    // DataFusion bridge tests
892    // -----------------------------------------------------------------------
893
894    #[test]
895    fn datafusion_bridge_schema_names_returns_public() {
896        use datafusion::catalog::CatalogProvider as DfCatalogProvider;
897
898        let catalog = std::sync::Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
899        let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
900        let names = bridge.schema_names();
901        assert_eq!(names, vec!["public"]);
902    }
903
904    #[test]
905    fn datafusion_bridge_table_exist() {
906        let catalog = std::sync::Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
907        {
908            let mut cat = catalog.write().unwrap();
909            cat.register_table(TableMetadata::new("orders", make_schema()))
910                .unwrap();
911        }
912        let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
913        let schema_provider = {
914            use datafusion::catalog::CatalogProvider as DfCatalogProvider;
915            bridge.schema("public").unwrap()
916        };
917        assert!(schema_provider.table_exist("orders"));
918        assert!(!schema_provider.table_exist("nonexistent"));
919    }
920
921    #[tokio::test]
922    async fn datafusion_bridge_memtable_cache_reuses_arc() {
923        use std::sync::Arc;
924
925        let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
926        {
927            let mut cat = catalog.write().unwrap();
928            cat.register_table(TableMetadata::new("orders", make_schema()))
929                .unwrap();
930        }
931        let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
932        let schema_provider = {
933            use datafusion::catalog::CatalogProvider as DfCatalogProvider;
934            bridge.schema("public").unwrap()
935        };
936        let first = schema_provider.table("orders").await.unwrap().unwrap();
937        let second = schema_provider.table("orders").await.unwrap().unwrap();
938        // Cached: identical Arc pointer, no re-clone of batch payload.
939        let cached = Arc::ptr_eq(&first, &second);
940        assert!(cached, "expected cached MemTable Arc, got fresh allocation");
941    }
942
943    #[tokio::test]
944    async fn datafusion_bridge_invalidate_forces_rebuild() {
945        use std::sync::Arc;
946
947        let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
948        {
949            let mut cat = catalog.write().unwrap();
950            cat.register_table(TableMetadata::new("orders", make_schema()))
951                .unwrap();
952        }
953        let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
954        let schema_provider = {
955            use datafusion::catalog::CatalogProvider as DfCatalogProvider;
956            bridge.schema("public").unwrap()
957        };
958        let first = schema_provider.table("orders").await.unwrap().unwrap();
959        bridge.invalidate("orders");
960        let second = schema_provider.table("orders").await.unwrap().unwrap();
961        // After invalidation, a new MemTable Arc is constructed.
962        assert!(!Arc::ptr_eq(&first, &second));
963        // A second call without further invalidation must hit the cache again.
964        let third = schema_provider.table("orders").await.unwrap().unwrap();
965        assert!(Arc::ptr_eq(&second, &third));
966    }
967
968    #[tokio::test]
969    async fn catalog_scan_returns_registered_row_count() {
970        use std::sync::Arc;
971
972        use arrow::array::Int64Array;
973        use arrow::datatypes::{DataType, Field, Schema};
974        use arrow::record_batch::RecordBatch;
975        use datafusion::prelude::SessionContext;
976
977        let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
978        let schema = TableSchema::new(vec![CatalogField::new("id", FieldType::Int64, false)]);
979        let arrow_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
980        let values: Vec<Option<i64>> = (0..10).map(Some).collect();
981        let batch =
982            RecordBatch::try_new(arrow_schema, vec![Arc::new(Int64Array::from(values))]).unwrap();
983        catalog
984            .write()
985            .unwrap()
986            .register_table_with_batches(TableMetadata::new("t", schema), vec![batch])
987            .unwrap();
988
989        let ctx = SessionContext::new();
990        ctx.register_catalog(
991            "krishiv",
992            Arc::new(super::datafusion_bridge::DataFusionCatalogBridge::new(
993                catalog,
994            )),
995        );
996        let df = ctx.sql("SELECT * FROM krishiv.public.t").await.unwrap();
997        let batches = df.collect().await.unwrap();
998        let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
999        assert_eq!(rows, 10);
1000    }
1001
1002    #[test]
1003    fn datafusion_bridge_unknown_schema_returns_none() {
1004        use datafusion::catalog::CatalogProvider as DfCatalogProvider;
1005
1006        let catalog = std::sync::Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
1007        let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
1008        let result = bridge.schema("nonexistent");
1009        assert!(result.is_none());
1010    }
1011
1012    // -----------------------------------------------------------------------
1013    // CatalogError Display tests
1014    // -----------------------------------------------------------------------
1015
1016    #[test]
1017    fn catalog_error_display_table_not_found() {
1018        let err = CatalogError::TableNotFound {
1019            name: "orders".to_string(),
1020        };
1021        assert_eq!(err.to_string(), "table not found: 'orders'");
1022    }
1023
1024    #[test]
1025    fn catalog_error_display_table_already_exists() {
1026        let err = CatalogError::TableAlreadyExists {
1027            name: "users".to_string(),
1028        };
1029        assert_eq!(err.to_string(), "table already exists: 'users'");
1030    }
1031
1032    #[test]
1033    fn catalog_error_display_schema_not_found() {
1034        let err = CatalogError::SchemaNotFound {
1035            name: "events".to_string(),
1036        };
1037        assert_eq!(err.to_string(), "schema not found: 'events'");
1038    }
1039
1040    #[test]
1041    fn catalog_error_display_invalid_schema() {
1042        let err = CatalogError::InvalidSchema {
1043            message: "missing required field 'id'".to_string(),
1044        };
1045        assert_eq!(
1046            err.to_string(),
1047            "invalid schema: missing required field 'id'"
1048        );
1049    }
1050
1051    #[test]
1052    fn catalog_error_display_http() {
1053        let err = CatalogError::Http {
1054            status: 404,
1055            message: "not found".to_string(),
1056        };
1057        assert_eq!(err.to_string(), "HTTP error 404: not found");
1058    }
1059
1060    #[test]
1061    fn catalog_error_is_std_error() {
1062        let err = CatalogError::TableNotFound {
1063            name: "t".to_string(),
1064        };
1065        let e: &dyn std::error::Error = &err;
1066        assert!(e.source().is_none());
1067    }
1068
1069    // -----------------------------------------------------------------------
1070    // FieldType to_arrow tests
1071    // -----------------------------------------------------------------------
1072
1073    #[test]
1074    fn field_type_to_arrow_int8() {
1075        assert_eq!(FieldType::Int8.to_arrow(), arrow::datatypes::DataType::Int8);
1076    }
1077
1078    #[test]
1079    fn field_type_to_arrow_int16() {
1080        assert_eq!(
1081            FieldType::Int16.to_arrow(),
1082            arrow::datatypes::DataType::Int16
1083        );
1084    }
1085
1086    #[test]
1087    fn field_type_to_arrow_int32() {
1088        assert_eq!(
1089            FieldType::Int32.to_arrow(),
1090            arrow::datatypes::DataType::Int32
1091        );
1092    }
1093
1094    #[test]
1095    fn field_type_to_arrow_int64() {
1096        assert_eq!(
1097            FieldType::Int64.to_arrow(),
1098            arrow::datatypes::DataType::Int64
1099        );
1100    }
1101
1102    #[test]
1103    fn field_type_to_arrow_uint8() {
1104        assert_eq!(
1105            FieldType::UInt8.to_arrow(),
1106            arrow::datatypes::DataType::UInt8
1107        );
1108    }
1109
1110    #[test]
1111    fn field_type_to_arrow_uint16() {
1112        assert_eq!(
1113            FieldType::UInt16.to_arrow(),
1114            arrow::datatypes::DataType::UInt16
1115        );
1116    }
1117
1118    #[test]
1119    fn field_type_to_arrow_uint32() {
1120        assert_eq!(
1121            FieldType::UInt32.to_arrow(),
1122            arrow::datatypes::DataType::UInt32
1123        );
1124    }
1125
1126    #[test]
1127    fn field_type_to_arrow_uint64() {
1128        assert_eq!(
1129            FieldType::UInt64.to_arrow(),
1130            arrow::datatypes::DataType::UInt64
1131        );
1132    }
1133
1134    #[test]
1135    fn field_type_to_arrow_float32() {
1136        assert_eq!(
1137            FieldType::Float32.to_arrow(),
1138            arrow::datatypes::DataType::Float32
1139        );
1140    }
1141
1142    #[test]
1143    fn field_type_to_arrow_float64() {
1144        assert_eq!(
1145            FieldType::Float64.to_arrow(),
1146            arrow::datatypes::DataType::Float64
1147        );
1148    }
1149
1150    #[test]
1151    fn field_type_to_arrow_boolean() {
1152        assert_eq!(
1153            FieldType::Boolean.to_arrow(),
1154            arrow::datatypes::DataType::Boolean
1155        );
1156    }
1157
1158    #[test]
1159    fn field_type_to_arrow_utf8() {
1160        assert_eq!(FieldType::Utf8.to_arrow(), arrow::datatypes::DataType::Utf8);
1161    }
1162
1163    #[test]
1164    fn field_type_to_arrow_binary() {
1165        assert_eq!(
1166            FieldType::Binary.to_arrow(),
1167            arrow::datatypes::DataType::Binary
1168        );
1169    }
1170
1171    #[test]
1172    fn field_type_to_arrow_timestamp() {
1173        use arrow::datatypes::{DataType, TimeUnit};
1174        assert_eq!(
1175            FieldType::Timestamp.to_arrow(),
1176            DataType::Timestamp(TimeUnit::Microsecond, None)
1177        );
1178    }
1179
1180    #[test]
1181    fn field_type_to_arrow_date32() {
1182        assert_eq!(
1183            FieldType::Date32.to_arrow(),
1184            arrow::datatypes::DataType::Date32
1185        );
1186    }
1187
1188    #[test]
1189    fn field_type_to_arrow_list() {
1190        let list_type = FieldType::List(Box::new(FieldType::Utf8));
1191        match list_type.to_arrow() {
1192            arrow::datatypes::DataType::List(field) => {
1193                assert_eq!(field.name(), "item");
1194                assert_eq!(field.data_type(), &arrow::datatypes::DataType::Utf8);
1195                assert!(field.is_nullable());
1196            }
1197            other => panic!("expected List, got {other:?}"),
1198        }
1199    }
1200
1201    #[test]
1202    fn field_type_to_arrow_struct() {
1203        let struct_type = FieldType::Struct(vec![
1204            CatalogField::new("x", FieldType::Int32, false),
1205            CatalogField::new("y", FieldType::Utf8, true),
1206        ]);
1207        match struct_type.to_arrow() {
1208            arrow::datatypes::DataType::Struct(fields) => {
1209                assert_eq!(fields.len(), 2);
1210                assert_eq!(fields[0].name(), "x");
1211                assert_eq!(fields[0].data_type(), &arrow::datatypes::DataType::Int32);
1212                assert!(!fields[0].is_nullable());
1213                assert_eq!(fields[1].name(), "y");
1214                assert_eq!(fields[1].data_type(), &arrow::datatypes::DataType::Utf8);
1215                assert!(fields[1].is_nullable());
1216            }
1217            other => panic!("expected Struct, got {other:?}"),
1218        }
1219    }
1220
1221    // -----------------------------------------------------------------------
1222    // FieldType Display tests
1223    // -----------------------------------------------------------------------
1224
1225    #[test]
1226    fn field_type_display_simple() {
1227        assert_eq!(FieldType::Int8.to_string(), "Int8");
1228        assert_eq!(FieldType::Int16.to_string(), "Int16");
1229        assert_eq!(FieldType::Int32.to_string(), "Int32");
1230        assert_eq!(FieldType::Int64.to_string(), "Int64");
1231        assert_eq!(FieldType::UInt8.to_string(), "UInt8");
1232        assert_eq!(FieldType::UInt16.to_string(), "UInt16");
1233        assert_eq!(FieldType::UInt32.to_string(), "UInt32");
1234        assert_eq!(FieldType::UInt64.to_string(), "UInt64");
1235        assert_eq!(FieldType::Float32.to_string(), "Float32");
1236        assert_eq!(FieldType::Float64.to_string(), "Float64");
1237        assert_eq!(FieldType::Boolean.to_string(), "Boolean");
1238        assert_eq!(FieldType::Utf8.to_string(), "Utf8");
1239        assert_eq!(FieldType::Binary.to_string(), "Binary");
1240        assert_eq!(FieldType::Timestamp.to_string(), "Timestamp");
1241        assert_eq!(FieldType::Date32.to_string(), "Date32");
1242    }
1243
1244    #[test]
1245    fn field_type_display_list() {
1246        let list = FieldType::List(Box::new(FieldType::Int32));
1247        assert_eq!(list.to_string(), "List<Int32>");
1248    }
1249
1250    #[test]
1251    fn field_type_display_struct() {
1252        let s = FieldType::Struct(vec![
1253            CatalogField::new("a", FieldType::Boolean, true),
1254            CatalogField::new("b", FieldType::Utf8, false),
1255        ]);
1256        assert_eq!(s.to_string(), "Struct(2 fields)");
1257    }
1258
1259    // -----------------------------------------------------------------------
1260    // CatalogField tests
1261    // -----------------------------------------------------------------------
1262
1263    #[test]
1264    fn catalog_field_accessors() {
1265        let f = CatalogField::new("col", FieldType::Float64, true);
1266        assert_eq!(f.name(), "col");
1267        assert_eq!(f.field_type(), &FieldType::Float64);
1268        assert!(f.nullable());
1269    }
1270
1271    #[test]
1272    fn catalog_field_to_arrow_field() {
1273        let f = CatalogField::new("ts", FieldType::Timestamp, false);
1274        let arrow_f = f.to_arrow_field();
1275        assert_eq!(arrow_f.name(), "ts");
1276        use arrow::datatypes::{DataType, TimeUnit};
1277        assert_eq!(
1278            arrow_f.data_type(),
1279            &DataType::Timestamp(TimeUnit::Microsecond, None)
1280        );
1281        assert!(!arrow_f.is_nullable());
1282    }
1283
1284    // -----------------------------------------------------------------------
1285    // ColumnStatistics tests
1286    // -----------------------------------------------------------------------
1287
1288    #[test]
1289    fn column_statistics_new_defaults() {
1290        let stats = ColumnStatistics::new();
1291        assert!(stats.row_count.is_none());
1292        assert!(stats.null_count.is_none());
1293        assert!(stats.min_value.is_none());
1294        assert!(stats.max_value.is_none());
1295    }
1296
1297    #[test]
1298    fn column_statistics_default_trait() {
1299        let stats = ColumnStatistics::default();
1300        assert_eq!(stats, ColumnStatistics::new());
1301    }
1302
1303    #[test]
1304    fn column_statistics_builder_all_fields() {
1305        let stats = ColumnStatistics::new()
1306            .with_row_count(1_000_000)
1307            .with_null_count(42)
1308            .with_min("abc")
1309            .with_max("xyz");
1310
1311        assert_eq!(stats.row_count, Some(1_000_000));
1312        assert_eq!(stats.null_count, Some(42));
1313        assert_eq!(stats.min_value.as_deref(), Some("abc"));
1314        assert_eq!(stats.max_value.as_deref(), Some("xyz"));
1315    }
1316
1317    #[test]
1318    fn column_statistics_builder_partial() {
1319        let stats = ColumnStatistics::new().with_row_count(500);
1320        assert_eq!(stats.row_count, Some(500));
1321        assert!(stats.null_count.is_none());
1322        assert!(stats.min_value.is_none());
1323        assert!(stats.max_value.is_none());
1324    }
1325
1326    #[test]
1327    fn column_statistics_into_string() {
1328        let stats = ColumnStatistics::new()
1329            .with_row_count(100)
1330            .with_null_count(5)
1331            .with_min("1")
1332            .with_max("99");
1333        let dbg = format!("{stats:?}");
1334        assert!(dbg.contains("row_count: Some(100)"));
1335        assert!(dbg.contains("null_count: Some(5)"));
1336        assert!(dbg.contains("min_value: Some(\"1\")"));
1337        assert!(dbg.contains("max_value: Some(\"99\")"));
1338    }
1339
1340    #[test]
1341    fn column_statistics_builder_overwrites() {
1342        let stats = ColumnStatistics::new()
1343            .with_row_count(10)
1344            .with_row_count(20);
1345        assert_eq!(stats.row_count, Some(20));
1346    }
1347
1348    #[test]
1349    fn column_statistics_eq() {
1350        let a = ColumnStatistics::new()
1351            .with_row_count(100)
1352            .with_null_count(5);
1353        let b = ColumnStatistics::new()
1354            .with_row_count(100)
1355            .with_null_count(5);
1356        let c = ColumnStatistics::new().with_row_count(99);
1357        assert_eq!(a, b);
1358        assert_ne!(a, c);
1359    }
1360
1361    // -----------------------------------------------------------------------
1362    // TableMetadata tests
1363    // -----------------------------------------------------------------------
1364
1365    #[test]
1366    fn table_metadata_new_and_accessors() {
1367        let meta = TableMetadata::new("events", make_schema());
1368        assert_eq!(meta.name(), "events");
1369        assert_eq!(meta.schema().field_count(), 2);
1370        assert!(meta.statistics().is_none());
1371    }
1372
1373    #[test]
1374    fn table_metadata_with_stats() {
1375        let stats = ColumnStatistics::new()
1376            .with_row_count(5000)
1377            .with_null_count(100);
1378        let meta = TableMetadata::new("clicks", make_schema()).with_stats(stats);
1379        assert_eq!(meta.name(), "clicks");
1380        let s = meta.statistics().unwrap();
1381        assert_eq!(s.row_count, Some(5000));
1382        assert_eq!(s.null_count, Some(100));
1383    }
1384
1385    #[test]
1386    fn table_metadata_into_string() {
1387        let meta = TableMetadata::new("test_table", make_schema());
1388        let dbg = format!("{meta:?}");
1389        assert!(dbg.contains("name: \"test_table\""));
1390        assert!(dbg.contains("stats: None"));
1391    }
1392
1393    // -----------------------------------------------------------------------
1394    // InMemorySchemaRegistry replace behavior
1395    // -----------------------------------------------------------------------
1396
1397    #[test]
1398    fn schema_registry_register_replaces_existing() {
1399        let mut registry = InMemorySchemaRegistry::new();
1400        let schema_a = TableSchema::new(vec![CatalogField::new("a", FieldType::Int32, false)]);
1401        let schema_b = TableSchema::new(vec![CatalogField::new("b", FieldType::Utf8, true)]);
1402
1403        registry.register_schema("my_schema", schema_a);
1404        registry.register_schema("my_schema", schema_b);
1405
1406        let retrieved = registry.get_schema("my_schema").unwrap();
1407        assert_eq!(retrieved.field_count(), 1);
1408        assert_eq!(
1409            retrieved.get_field("b").unwrap().field_type(),
1410            &FieldType::Utf8
1411        );
1412        assert!(retrieved.get_field("a").is_none());
1413    }
1414
1415    #[test]
1416    fn schema_registry_empty_names() {
1417        let registry = InMemorySchemaRegistry::new();
1418        assert!(registry.schema_names().is_empty());
1419    }
1420
1421    // -----------------------------------------------------------------------
1422    // TableSchema accessors
1423    // -----------------------------------------------------------------------
1424
1425    #[test]
1426    fn table_schema_empty() {
1427        let schema = TableSchema::empty();
1428        assert_eq!(schema.field_count(), 0);
1429        assert!(schema.get_field("anything").is_none());
1430    }
1431
1432    #[test]
1433    fn table_schema_get_field_found() {
1434        let schema = make_schema();
1435        let field = schema.get_field("name").unwrap();
1436        assert_eq!(field.name(), "name");
1437        assert_eq!(field.field_type(), &FieldType::Utf8);
1438        assert!(field.nullable());
1439    }
1440
1441    #[test]
1442    fn table_schema_get_field_not_found() {
1443        let schema = make_schema();
1444        assert!(schema.get_field("missing").is_none());
1445    }
1446
1447    // -----------------------------------------------------------------------
1448    // InMemoryCatalog duplicate registration
1449    // -----------------------------------------------------------------------
1450
1451    #[test]
1452    fn in_memory_catalog_duplicate_register_errors() {
1453        let mut catalog = InMemoryCatalog::new();
1454        catalog
1455            .register_table(TableMetadata::new("t", make_schema()))
1456            .unwrap();
1457        let err = catalog
1458            .register_table(TableMetadata::new("t", make_schema()))
1459            .unwrap_err();
1460        match err {
1461            CatalogError::TableAlreadyExists { name } => assert_eq!(name, "t"),
1462            other => panic!("expected TableAlreadyExists, got {other}"),
1463        }
1464    }
1465
1466    // -----------------------------------------------------------------------
1467    // CatalogResult type alias
1468    // -----------------------------------------------------------------------
1469
1470    #[test]
1471    fn catalog_result_ok() {
1472        let r: CatalogResult<i32> = Ok(42);
1473        assert_eq!(r.unwrap(), 42);
1474    }
1475
1476    #[test]
1477    fn catalog_result_err() {
1478        let r: CatalogResult<()> = Err(CatalogError::TableNotFound {
1479            name: "x".to_string(),
1480        });
1481        assert!(r.is_err());
1482    }
1483
1484    // -----------------------------------------------------------------------
1485    // Edge cases: empty catalog
1486    // -----------------------------------------------------------------------
1487
1488    #[test]
1489    fn empty_catalog_list_tables_returns_empty() {
1490        let catalog = InMemoryCatalog::new();
1491        assert!(catalog.list_tables().is_empty());
1492    }
1493
1494    #[test]
1495    fn empty_catalog_get_table_returns_not_found() {
1496        let catalog = InMemoryCatalog::new();
1497        let err = catalog.get_table("anything").err().unwrap();
1498        assert!(matches!(err, CatalogError::TableNotFound { .. }));
1499    }
1500
1501    #[test]
1502    fn empty_schema_registry_get_returns_not_found() {
1503        let registry = InMemorySchemaRegistry::new();
1504        assert!(registry.get_schema("x").is_err());
1505    }
1506
1507    #[test]
1508    fn empty_schema_schema_names_empty() {
1509        let registry = InMemorySchemaRegistry::new();
1510        assert!(registry.schema_names().is_empty());
1511    }
1512
1513    // -----------------------------------------------------------------------
1514    // Edge cases: special characters in names
1515    // -----------------------------------------------------------------------
1516
1517    #[test]
1518    fn table_name_with_special_characters() {
1519        let mut catalog = InMemoryCatalog::new();
1520        let meta = TableMetadata::new("table-with-dashes.dots_and_underscores", make_schema());
1521        catalog.register_table(meta).unwrap();
1522        let table = catalog
1523            .get_table("table-with-dashes.dots_and_underscores")
1524            .unwrap();
1525        assert_eq!(table.name(), "table-with-dashes.dots_and_underscores");
1526    }
1527
1528    #[test]
1529    fn table_name_with_unicode() {
1530        let mut catalog = InMemoryCatalog::new();
1531        let meta = TableMetadata::new("用户_table", make_schema());
1532        catalog.register_table(meta).unwrap();
1533        let table = catalog.get_table("用户_table").unwrap();
1534        assert_eq!(table.name(), "用户_table");
1535    }
1536
1537    #[test]
1538    fn table_name_with_spaces() {
1539        let mut catalog = InMemoryCatalog::new();
1540        let meta = TableMetadata::new("my table name", make_schema());
1541        catalog.register_table(meta).unwrap();
1542        let table = catalog.get_table("my table name").unwrap();
1543        assert_eq!(table.name(), "my table name");
1544    }
1545
1546    #[test]
1547    fn schema_name_with_special_characters() {
1548        let mut registry = InMemorySchemaRegistry::new();
1549        let schema = TableSchema::new(vec![CatalogField::new("col", FieldType::Int32, true)]);
1550        registry.register_schema("schema-with-dashes", schema);
1551        let retrieved = registry.get_schema("schema-with-dashes").unwrap();
1552        assert_eq!(retrieved.field_count(), 1);
1553    }
1554
1555    #[test]
1556    fn field_name_with_special_characters() {
1557        let f = CatalogField::new("field-with-dots_and@spaces", FieldType::Utf8, false);
1558        assert_eq!(f.name(), "field-with-dots_and@spaces");
1559        let arrow_f = f.to_arrow_field();
1560        assert_eq!(arrow_f.name(), "field-with-dots_and@spaces");
1561    }
1562
1563    // -----------------------------------------------------------------------
1564    // Edge cases: duplicate registration and overwrite
1565    // -----------------------------------------------------------------------
1566
1567    #[test]
1568    fn catalog_duplicate_different_table_errors() {
1569        let mut catalog = InMemoryCatalog::new();
1570        catalog
1571            .register_table(TableMetadata::new("t1", make_schema()))
1572            .unwrap();
1573        catalog
1574            .register_table(TableMetadata::new("t2", make_schema()))
1575            .unwrap();
1576        assert_eq!(catalog.list_tables().len(), 2);
1577    }
1578
1579    #[test]
1580    fn schema_registry_overwrite_preserves_single_entry() {
1581        let mut registry = InMemorySchemaRegistry::new();
1582        registry.register_schema("s", TableSchema::empty());
1583        registry.register_schema("s", make_schema());
1584        assert_eq!(registry.schema_names().len(), 1);
1585        assert_eq!(registry.get_schema("s").unwrap().field_count(), 2);
1586    }
1587
1588    // -----------------------------------------------------------------------
1589    // TableSchema: empty schema arrow conversion
1590    // -----------------------------------------------------------------------
1591
1592    #[test]
1593    fn empty_schema_to_arrow() {
1594        let schema = TableSchema::empty();
1595        let arrow_schema = schema.to_arrow_schema();
1596        assert_eq!(arrow_schema.fields().len(), 0);
1597    }
1598
1599    #[test]
1600    fn single_field_schema_to_arrow() {
1601        let schema = TableSchema::new(vec![CatalogField::new("only", FieldType::Float32, true)]);
1602        let arrow_schema = schema.to_arrow_schema();
1603        assert_eq!(arrow_schema.fields().len(), 1);
1604        let f = arrow_schema.field_with_name("only").unwrap();
1605        assert_eq!(f.data_type(), &arrow::datatypes::DataType::Float32);
1606        assert!(f.is_nullable());
1607    }
1608
1609    // -----------------------------------------------------------------------
1610    // FieldType: nested types to_arrow
1611    // -----------------------------------------------------------------------
1612
1613    #[test]
1614    fn field_type_list_of_list() {
1615        let inner = FieldType::List(Box::new(FieldType::Int32));
1616        let outer = FieldType::List(Box::new(inner));
1617        match outer.to_arrow() {
1618            arrow::datatypes::DataType::List(field) => match field.data_type() {
1619                arrow::datatypes::DataType::List(inner_field) => {
1620                    assert_eq!(inner_field.data_type(), &arrow::datatypes::DataType::Int32);
1621                }
1622                other => panic!("expected nested List, got {other:?}"),
1623            },
1624            other => panic!("expected outer List, got {other:?}"),
1625        }
1626    }
1627
1628    #[test]
1629    fn field_type_struct_nested_in_struct() {
1630        let inner_struct =
1631            FieldType::Struct(vec![CatalogField::new("a", FieldType::Boolean, true)]);
1632        let outer_struct = FieldType::Struct(vec![
1633            CatalogField::new("nested", inner_struct, false),
1634            CatalogField::new("simple", FieldType::Utf8, true),
1635        ]);
1636        match outer_struct.to_arrow() {
1637            arrow::datatypes::DataType::Struct(fields) => {
1638                assert_eq!(fields.len(), 2);
1639                match fields[0].data_type() {
1640                    arrow::datatypes::DataType::Struct(inner_fields) => {
1641                        assert_eq!(inner_fields.len(), 1);
1642                        assert_eq!(inner_fields[0].name(), "a");
1643                    }
1644                    other => panic!("expected inner Struct, got {other:?}"),
1645                }
1646                assert_eq!(fields[1].data_type(), &arrow::datatypes::DataType::Utf8);
1647            }
1648            other => panic!("expected Struct, got {other:?}"),
1649        }
1650    }
1651
1652    #[test]
1653    fn field_type_list_of_struct() {
1654        let list_type = FieldType::List(Box::new(FieldType::Struct(vec![
1655            CatalogField::new("x", FieldType::Int64, false),
1656            CatalogField::new("y", FieldType::Utf8, true),
1657        ])));
1658        match list_type.to_arrow() {
1659            arrow::datatypes::DataType::List(item_field) => match item_field.data_type() {
1660                arrow::datatypes::DataType::Struct(fields) => {
1661                    assert_eq!(fields.len(), 2);
1662                }
1663                other => panic!("expected inner Struct, got {other:?}"),
1664            },
1665            other => panic!("expected List, got {other:?}"),
1666        }
1667    }
1668
/// A struct with no fields converts to an Arrow `Struct` with no children.
#[test]
fn field_type_empty_struct() {
    use arrow::datatypes::DataType;

    let converted = FieldType::Struct(Vec::new()).to_arrow();
    let DataType::Struct(children) = &converted else {
        panic!("expected Struct, got {converted:?}");
    };
    assert_eq!(children.len(), 0);
}
1679
/// A list of `Binary` converts to an Arrow `List` whose item type is `Binary`.
#[test]
fn field_type_list_of_binary() {
    use arrow::datatypes::DataType;

    let converted = FieldType::List(Box::new(FieldType::Binary)).to_arrow();
    let DataType::List(item_field) = &converted else {
        panic!("expected List, got {converted:?}");
    };
    assert_eq!(item_field.data_type(), &DataType::Binary);
}
1690
1691    // -----------------------------------------------------------------------
1692    // CatalogField: Clone and PartialEq
1693    // -----------------------------------------------------------------------
1694
/// A cloned `CatalogField` compares equal to the field it was cloned from.
#[test]
fn catalog_field_clone_eq() {
    let original = CatalogField::new("col", FieldType::Int32, true);
    let duplicate = original.clone();
    assert_eq!(original, duplicate);
}
1701
/// Two fields that differ only in name are not equal.
#[test]
fn catalog_field_ne_name() {
    let named_a = CatalogField::new("a", FieldType::Int32, true);
    let named_b = CatalogField::new("b", FieldType::Int32, true);
    assert_ne!(named_a, named_b);
}
1708
/// Two fields that differ only in field type are not equal.
#[test]
fn catalog_field_ne_type() {
    let as_int = CatalogField::new("a", FieldType::Int32, true);
    let as_text = CatalogField::new("a", FieldType::Utf8, true);
    assert_ne!(as_int, as_text);
}
1715
/// Two fields that differ only in nullability are not equal.
#[test]
fn catalog_field_ne_nullable() {
    let nullable = CatalogField::new("a", FieldType::Int32, true);
    let required = CatalogField::new("a", FieldType::Int32, false);
    assert_ne!(nullable, required);
}
1722
1723    // -----------------------------------------------------------------------
1724    // TableSchema: Clone and PartialEq
1725    // -----------------------------------------------------------------------
1726
/// A cloned `TableSchema` compares equal to its source.
#[test]
fn table_schema_clone_eq() {
    let original = make_schema();
    let duplicate = original.clone();
    assert_eq!(original, duplicate);
}
1733
/// Schemas whose single field differs in name are not equal.
#[test]
fn table_schema_ne_different_fields() {
    let with_a = TableSchema::new(vec![CatalogField::new("a", FieldType::Int32, false)]);
    let with_b = TableSchema::new(vec![CatalogField::new("b", FieldType::Int32, false)]);
    assert_ne!(with_a, with_b);
}
1740
1741    // -----------------------------------------------------------------------
1742    // TableMetadata: Clone
1743    // -----------------------------------------------------------------------
1744
/// Cloning `TableMetadata` carries over both the table name and the attached
/// statistics (here: a row count of 100).
#[test]
fn table_metadata_clone() {
    let stats = ColumnStatistics::new().with_row_count(100);
    let original = TableMetadata::new("t", make_schema()).with_stats(stats);

    let copy = original.clone();

    assert_eq!(copy.name(), "t");
    let copied_stats = copy.statistics().unwrap();
    assert_eq!(copied_stats.row_count, Some(100));
}
1753
1754    // -----------------------------------------------------------------------
1755    // InMemoryCatalog: register_table_with_batches
1756    // -----------------------------------------------------------------------
1757
/// Registering a table together with one three-row batch makes that batch
/// retrievable through `table_batches`.
#[test]
fn register_table_with_batches_stores_data() {
    use std::sync::Arc;

    use arrow::array::Int64Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    let table_schema = TableSchema::new(vec![CatalogField::new("id", FieldType::Int64, false)]);
    let arrow_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let ids = Int64Array::from(vec![1, 2, 3]);
    let batch = RecordBatch::try_new(arrow_schema, vec![Arc::new(ids)]).unwrap();

    let mut catalog = InMemoryCatalog::new();
    catalog
        .register_table_with_batches(TableMetadata::new("data", table_schema), vec![batch])
        .unwrap();

    let stored = catalog.table_batches("data");
    assert!(stored.is_some());
    let stored = stored.unwrap();
    assert_eq!(stored.len(), 1);
    assert_eq!(stored[0].num_rows(), 3);
}
1779
/// Registering a table with an empty batch list succeeds, and
/// `table_batches` then reports `None` for that table.
#[test]
fn register_table_with_empty_batches_no_data() {
    let columns = vec![CatalogField::new("id", FieldType::Int32, false)];
    let metadata = TableMetadata::new("empty", TableSchema::new(columns));

    let mut catalog = InMemoryCatalog::new();
    catalog
        .register_table_with_batches(metadata, Vec::new())
        .unwrap();

    assert!(catalog.table_batches("empty").is_none());
}
1789
/// Asking a fresh catalog for the batches of an unregistered table yields `None`.
#[test]
fn table_batches_nonexistent_table() {
    let empty_catalog = InMemoryCatalog::new();
    let lookup = empty_catalog.table_batches("nope");
    assert!(lookup.is_none());
}
1795
1796    // -----------------------------------------------------------------------
1797    // InMemoryCatalog: Default
1798    // -----------------------------------------------------------------------
1799
/// A default-constructed `InMemoryCatalog` lists no tables.
#[test]
fn in_memory_catalog_default() {
    let fresh = InMemoryCatalog::default();
    let tables = fresh.list_tables();
    assert!(tables.is_empty());
}
1805
1806    // -----------------------------------------------------------------------
1807    // InMemorySchemaRegistry: Default
1808    // -----------------------------------------------------------------------
1809
/// A default-constructed `InMemorySchemaRegistry` reports no schema names.
#[test]
fn in_memory_schema_registry_default() {
    let fresh = InMemorySchemaRegistry::default();
    let names = fresh.schema_names();
    assert!(names.is_empty());
}
1815
1816    // -----------------------------------------------------------------------
1817    // DataFusion bridge: SQL with empty table
1818    // -----------------------------------------------------------------------
1819
1820    #[tokio::test]
1821    async fn datafusion_bridge_empty_table_query() {
1822        use datafusion::prelude::SessionContext;
1823
1824        let catalog = std::sync::Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
1825        {
1826            let mut cat = catalog.write().unwrap();
1827            let schema = TableSchema::new(vec![CatalogField::new("id", FieldType::Int64, false)]);
1828            cat.register_table(TableMetadata::new("empty_table", schema))
1829                .unwrap();
1830        }
1831        let ctx = SessionContext::new();
1832        ctx.register_catalog(
1833            "krishiv",
1834            std::sync::Arc::new(super::datafusion_bridge::DataFusionCatalogBridge::new(
1835                catalog,
1836            )),
1837        );
1838        let df = ctx
1839            .sql("SELECT * FROM krishiv.public.empty_table")
1840            .await
1841            .unwrap();
1842        let batches = df.collect().await.unwrap();
1843        let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1844        assert_eq!(rows, 0);
1845    }
1846
1847    #[tokio::test]
1848    async fn datafusion_bridge_sql_filter() {
1849        use std::sync::Arc;
1850
1851        use arrow::array::Int64Array;
1852        use arrow::datatypes::{DataType, Field, Schema};
1853        use arrow::record_batch::RecordBatch;
1854        use datafusion::prelude::SessionContext;
1855
1856        let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
1857        let schema = TableSchema::new(vec![
1858            CatalogField::new("id", FieldType::Int64, false),
1859            CatalogField::new("val", FieldType::Int64, false),
1860        ]);
1861        let arrow_schema = Arc::new(Schema::new(vec![
1862            Field::new("id", DataType::Int64, false),
1863            Field::new("val", DataType::Int64, false),
1864        ]));
1865        let batch = RecordBatch::try_new(
1866            arrow_schema,
1867            vec![
1868                Arc::new(Int64Array::from(vec![1, 2, 3])),
1869                Arc::new(Int64Array::from(vec![10, 20, 30])),
1870            ],
1871        )
1872        .unwrap();
1873        catalog
1874            .write()
1875            .unwrap()
1876            .register_table_with_batches(TableMetadata::new("nums", schema), vec![batch])
1877            .unwrap();
1878
1879        let ctx = SessionContext::new();
1880        ctx.register_catalog(
1881            "krishiv",
1882            Arc::new(super::datafusion_bridge::DataFusionCatalogBridge::new(
1883                catalog,
1884            )),
1885        );
1886        let df = ctx
1887            .sql("SELECT id FROM krishiv.public.nums WHERE val > 15")
1888            .await
1889            .unwrap();
1890        let batches = df.collect().await.unwrap();
1891        let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1892        assert_eq!(rows, 2);
1893    }
1894
1895    #[tokio::test]
1896    async fn datafusion_bridge_sql_count_aggregate() {
1897        use std::sync::Arc;
1898
1899        use arrow::datatypes::{DataType, Field, Schema};
1900        use arrow::record_batch::RecordBatch;
1901        use datafusion::prelude::SessionContext;
1902
1903        let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
1904        let schema = TableSchema::new(vec![CatalogField::new("x", FieldType::Int32, false)]);
1905        let arrow_schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
1906        let batch = RecordBatch::try_new(
1907            arrow_schema,
1908            vec![Arc::new(arrow::array::Int32Array::from(vec![
1909                1, 2, 3, 4, 5,
1910            ]))],
1911        )
1912        .unwrap();
1913        catalog
1914            .write()
1915            .unwrap()
1916            .register_table_with_batches(TableMetadata::new("agg", schema), vec![batch])
1917            .unwrap();
1918
1919        let ctx = SessionContext::new();
1920        ctx.register_catalog(
1921            "krishiv",
1922            Arc::new(super::datafusion_bridge::DataFusionCatalogBridge::new(
1923                catalog,
1924            )),
1925        );
1926        let df = ctx
1927            .sql("SELECT COUNT(*) AS cnt FROM krishiv.public.agg")
1928            .await
1929            .unwrap();
1930        let batches = df.collect().await.unwrap();
1931        assert_eq!(batches.len(), 1);
1932        assert_eq!(batches[0].num_rows(), 1);
1933    }
1934
1935    // -----------------------------------------------------------------------
1936    // DataFusion bridge: multiple tables
1937    // -----------------------------------------------------------------------
1938
1939    #[tokio::test]
1940    async fn datafusion_bridge_multiple_tables() {
1941        use std::sync::Arc;
1942
1943        use arrow::array::Int64Array;
1944        use arrow::datatypes::{DataType, Field, Schema};
1945        use arrow::record_batch::RecordBatch;
1946        use datafusion::prelude::SessionContext;
1947
1948        let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
1949        let schema = TableSchema::new(vec![CatalogField::new("id", FieldType::Int64, false)]);
1950        let arrow_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
1951        let batch =
1952            RecordBatch::try_new(arrow_schema, vec![Arc::new(Int64Array::from(vec![1]))]).unwrap();
1953        {
1954            let mut cat = catalog.write().unwrap();
1955            cat.register_table_with_batches(
1956                TableMetadata::new("t1", schema.clone()),
1957                vec![batch.clone()],
1958            )
1959            .unwrap();
1960            cat.register_table_with_batches(TableMetadata::new("t2", schema), vec![batch])
1961                .unwrap();
1962        }
1963
1964        let ctx = SessionContext::new();
1965        ctx.register_catalog(
1966            "krishiv",
1967            Arc::new(super::datafusion_bridge::DataFusionCatalogBridge::new(
1968                catalog,
1969            )),
1970        );
1971        let df1 = ctx.sql("SELECT * FROM krishiv.public.t1").await.unwrap();
1972        let batches1 = df1.collect().await.unwrap();
1973        assert_eq!(batches1.len(), 1);
1974        assert_eq!(batches1[0].num_rows(), 1);
1975
1976        let df2 = ctx.sql("SELECT * FROM krishiv.public.t2").await.unwrap();
1977        let batches2 = df2.collect().await.unwrap();
1978        assert_eq!(batches2.len(), 1);
1979        assert_eq!(batches2[0].num_rows(), 1);
1980    }
1981
1982    // -----------------------------------------------------------------------
1983    // DataFusion bridge: schema() returns None for non-public
1984    // -----------------------------------------------------------------------
1985
/// The bridge resolves the "public" schema and reports `None` for "custom".
#[test]
fn datafusion_bridge_custom_schema_name_returns_none() {
    use std::sync::{Arc, RwLock};

    use datafusion::catalog::CatalogProvider as _;

    use super::datafusion_bridge::DataFusionCatalogBridge;

    let shared = Arc::new(RwLock::new(InMemoryCatalog::new()));
    let bridge = DataFusionCatalogBridge::new(shared);

    let custom = bridge.schema("custom");
    assert!(custom.is_none());
    let public = bridge.schema("public");
    assert!(public.is_some());
}
1995
1996    // -----------------------------------------------------------------------
1997    // DataFusion bridge: as_any returns self
1998    // -----------------------------------------------------------------------
1999
/// `as_any` on the bridge exposes a value whose concrete type is
/// `DataFusionCatalogBridge`.
#[test]
fn datafusion_bridge_as_any() {
    use std::sync::{Arc, RwLock};

    use datafusion::catalog::CatalogProvider as _;

    use super::datafusion_bridge::DataFusionCatalogBridge;

    let shared = Arc::new(RwLock::new(InMemoryCatalog::new()));
    let bridge = DataFusionCatalogBridge::new(shared);

    let erased = bridge.as_any();
    assert!(erased.is::<DataFusionCatalogBridge>());
}
2013
2014    // -----------------------------------------------------------------------
2015    // DataFusion bridge: schema_names returns single "public"
2016    // -----------------------------------------------------------------------
2017
/// The bridge advertises exactly one schema name, and it is "public".
#[test]
fn datafusion_bridge_only_public_schema() {
    use std::sync::{Arc, RwLock};

    use datafusion::catalog::CatalogProvider as _;

    use super::datafusion_bridge::DataFusionCatalogBridge;

    let shared = Arc::new(RwLock::new(InMemoryCatalog::new()));
    let schema_names = DataFusionCatalogBridge::new(shared).schema_names();

    assert_eq!(schema_names.len(), 1);
    assert_eq!(schema_names.first().map(String::as_str), Some("public"));
}
2028
2029    // -----------------------------------------------------------------------
2030    // FieldType Display: empty struct and nested list
2031    // -----------------------------------------------------------------------
2032
/// An empty struct renders through `Display` as "Struct(0 fields)".
#[test]
fn field_type_display_empty_struct() {
    let rendered = FieldType::Struct(Vec::new()).to_string();
    assert_eq!(rendered, "Struct(0 fields)");
}
2038
/// A list of lists of `Int32` renders through `Display` as "List<List<Int32>>".
#[test]
fn field_type_display_nested_list() {
    let nested = FieldType::List(Box::new(FieldType::List(Box::new(FieldType::Int32))));
    let rendered = nested.to_string();
    assert_eq!(rendered, "List<List<Int32>>");
}
2045
2046    // -----------------------------------------------------------------------
2047    // ColumnStatistics: eq with None fields
2048    // -----------------------------------------------------------------------
2049
/// Two freshly constructed `ColumnStatistics` values compare equal.
#[test]
fn column_statistics_eq_all_none() {
    let left = ColumnStatistics::new();
    let right = ColumnStatistics::new();
    assert_eq!(left, right);
}
2056
/// Statistics built with different `with_min` values are not equal.
#[test]
fn column_statistics_ne_different_min() {
    let low = ColumnStatistics::new().with_min("aaa");
    let high = ColumnStatistics::new().with_min("zzz");
    assert_ne!(low, high);
}
2063
2064    // -----------------------------------------------------------------------
2065    // TableSchema: get_field with multiple fields
2066    // -----------------------------------------------------------------------
2067
/// `get_field` finds the last of three fields and reports its type.
#[test]
fn table_schema_get_field_last() {
    let columns = vec![
        CatalogField::new("a", FieldType::Int32, false),
        CatalogField::new("b", FieldType::Utf8, true),
        CatalogField::new("c", FieldType::Float64, false),
    ];
    let schema = TableSchema::new(columns);

    let last = schema.get_field("c").unwrap();
    assert_eq!(last.field_type(), &FieldType::Float64);
}
2078
/// `get_field` finds the middle of three fields and reports both its type
/// and its nullability.
#[test]
fn table_schema_get_field_middle() {
    let columns = vec![
        CatalogField::new("a", FieldType::Int32, false),
        CatalogField::new("b", FieldType::Utf8, true),
        CatalogField::new("c", FieldType::Float64, false),
    ];
    let schema = TableSchema::new(columns);

    let middle = schema.get_field("b").unwrap();
    assert_eq!(middle.field_type(), &FieldType::Utf8);
    assert!(middle.nullable());
}
2090
2091    // -----------------------------------------------------------------------
2092    // CatalogError: all variants as std::error::Error
2093    // -----------------------------------------------------------------------
2094
/// Every `CatalogError` variant listed here can be viewed as a
/// `dyn std::error::Error`, can be rendered to a string, and reports no
/// underlying source error.
#[test]
fn all_catalog_errors_are_std_error() {
    let samples = [
        CatalogError::TableNotFound {
            name: "a".to_owned(),
        },
        CatalogError::TableAlreadyExists {
            name: "b".to_owned(),
        },
        CatalogError::SchemaNotFound {
            name: "c".to_owned(),
        },
        CatalogError::InvalidSchema {
            message: "d".to_owned(),
        },
        CatalogError::InvalidConfiguration {
            message: "bad URL".to_owned(),
        },
        CatalogError::Transport {
            operation: "load table".to_owned(),
            message: "timed out".to_owned(),
        },
        CatalogError::Http {
            status: 500,
            message: "e".to_owned(),
        },
        CatalogError::InvalidResponse {
            operation: "list tables".to_owned(),
            message: "missing identifiers".to_owned(),
        },
        CatalogError::ResponseTooLarge {
            operation: "load table".to_owned(),
            limit_bytes: 1024,
        },
        CatalogError::UnsupportedOperation {
            operation: "committing a table".to_owned(),
        },
    ];

    for sample in &samples {
        let as_std: &dyn std::error::Error = sample;
        // Rendering must not panic; the text itself is checked elsewhere.
        let _ = as_std.to_string();
        assert!(as_std.source().is_none());
    }
}
2133
2134    // -----------------------------------------------------------------------
2135    // InMemoryCatalog: register many tables
2136    // -----------------------------------------------------------------------
2137
/// Registering 100 tables named `table_000` through `table_099` leaves all
/// of them listed and retrievable, while `table_100` is still unknown.
#[test]
fn in_memory_catalog_many_tables() {
    let mut catalog = InMemoryCatalog::new();
    for index in 0..100 {
        let table_name = format!("table_{index:03}");
        let metadata = TableMetadata::new(table_name, make_schema());
        catalog.register_table(metadata).unwrap();
    }

    assert_eq!(catalog.list_tables().len(), 100);
    for present in ["table_000", "table_099"] {
        assert!(catalog.get_table(present).is_ok());
    }
    assert!(catalog.get_table("table_100").is_err());
}
2151
2152    // -----------------------------------------------------------------------
2153    // DataFusion bridge: table() returns None for unknown
2154    // -----------------------------------------------------------------------
2155
2156    #[tokio::test]
2157    async fn datafusion_bridge_table_unknown() {
2158        use datafusion::catalog::CatalogProvider as DfCatalogProvider;
2159        use std::sync::Arc;
2160
2161        let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
2162        let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
2163        let schema_provider = bridge.schema("public").unwrap();
2164        let result = schema_provider.table("nonexistent").await.unwrap();
2165        assert!(result.is_none());
2166    }
2167
2168    // -----------------------------------------------------------------------
2169    // DataFusion bridge: table() returns Some for known table
2170    // -----------------------------------------------------------------------
2171
2172    #[tokio::test]
2173    async fn datafusion_bridge_table_known() {
2174        use datafusion::catalog::CatalogProvider as DfCatalogProvider;
2175        use std::sync::Arc;
2176
2177        let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
2178        {
2179            let mut cat = catalog.write().unwrap();
2180            cat.register_table(TableMetadata::new("mytable", make_schema()))
2181                .unwrap();
2182        }
2183        let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
2184        let schema_provider = bridge.schema("public").unwrap();
2185        let result = schema_provider.table("mytable").await.unwrap();
2186        assert!(result.is_some());
2187    }
2188
2189    // -----------------------------------------------------------------------
2190    // DataFusion bridge: table_names lists registered tables
2191    // -----------------------------------------------------------------------
2192
2193    #[tokio::test]
2194    async fn datafusion_bridge_table_names() {
2195        use datafusion::catalog::CatalogProvider as DfCatalogProvider;
2196        use std::sync::Arc;
2197
2198        let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
2199        {
2200            let mut cat = catalog.write().unwrap();
2201            cat.register_table(TableMetadata::new("alpha", make_schema()))
2202                .unwrap();
2203            cat.register_table(TableMetadata::new("beta", make_schema()))
2204                .unwrap();
2205        }
2206        let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
2207        let schema_provider = bridge.schema("public").unwrap();
2208        let mut names = schema_provider.table_names();
2209        names.sort();
2210        assert_eq!(names, vec!["alpha", "beta"]);
2211    }
2212
2213    // -----------------------------------------------------------------------
2214    // DataFusion bridge: empty catalog table_names
2215    // -----------------------------------------------------------------------
2216
2217    #[tokio::test]
2218    async fn datafusion_bridge_empty_table_names() {
2219        use datafusion::catalog::CatalogProvider as DfCatalogProvider;
2220        use std::sync::Arc;
2221
2222        let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
2223        let bridge = super::datafusion_bridge::DataFusionCatalogBridge::new(catalog);
2224        let schema_provider = bridge.schema("public").unwrap();
2225        let names = schema_provider.table_names();
2226        assert!(names.is_empty());
2227    }
2228
2229    // -----------------------------------------------------------------------
2230    // DataFusion bridge: table_exist with multiple tables
2231    // -----------------------------------------------------------------------
2232
/// `table_exist` on the bridge's "public" schema is true for each registered
/// table ("a", "b") and false for an unregistered one ("c").
#[test]
fn datafusion_bridge_table_exist_multiple() {
    use std::sync::{Arc, RwLock};

    use datafusion::catalog::CatalogProvider as _;

    use super::datafusion_bridge::DataFusionCatalogBridge;

    let shared = Arc::new(RwLock::new(InMemoryCatalog::new()));
    {
        let mut guard = shared.write().unwrap();
        for table in ["a", "b"] {
            guard
                .register_table(TableMetadata::new(table, make_schema()))
                .unwrap();
        }
    }

    let public = DataFusionCatalogBridge::new(shared)
        .schema("public")
        .unwrap();

    for registered in ["a", "b"] {
        assert!(public.table_exist(registered));
    }
    assert!(!public.table_exist("c"));
}
2251
2252    // -----------------------------------------------------------------------
2253    // DataFusion bridge: debug format
2254    // -----------------------------------------------------------------------
2255
/// The bridge's `Debug` output mentions its own type name.
#[test]
fn datafusion_bridge_debug() {
    use std::sync::{Arc, RwLock};

    use super::datafusion_bridge::DataFusionCatalogBridge;

    let shared = Arc::new(RwLock::new(InMemoryCatalog::new()));
    let rendered = format!("{:?}", DataFusionCatalogBridge::new(shared));
    assert!(rendered.contains("DataFusionCatalogBridge"));
}
2263
2264    // -----------------------------------------------------------------------
2265    // InMemoryCatalog: register_table_with_batches error on duplicate
2266    // -----------------------------------------------------------------------
2267
/// Registering a second table under an already-used name via
/// `register_table_with_batches` fails with `TableAlreadyExists`.
#[test]
fn register_table_with_batches_duplicate_errors() {
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    let table_schema = TableSchema::new(vec![CatalogField::new("id", FieldType::Int32, false)]);
    let single_row = RecordBatch::try_new(
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
        vec![Arc::new(Int32Array::from(vec![1]))],
    )
    .unwrap();

    let mut catalog = InMemoryCatalog::new();
    let first = TableMetadata::new("t", table_schema.clone());
    catalog
        .register_table_with_batches(first, vec![single_row])
        .unwrap();

    // Same name again, this time without data: must be rejected.
    let second = TableMetadata::new("t", table_schema);
    let failure = catalog
        .register_table_with_batches(second, Vec::new())
        .unwrap_err();
    assert!(matches!(failure, CatalogError::TableAlreadyExists { .. }));
}
2288
2289    // -----------------------------------------------------------------------
2290    // DataFusion bridge: SQL with multiple columns and types
2291    // -----------------------------------------------------------------------
2292
2293    #[tokio::test]
2294    async fn datafusion_bridge_sql_multiple_columns() {
2295        use std::sync::Arc;
2296
2297        use arrow::array::{Int32Array, StringArray};
2298        use arrow::datatypes::{DataType, Field, Schema};
2299        use arrow::record_batch::RecordBatch;
2300        use datafusion::prelude::SessionContext;
2301
2302        let catalog = Arc::new(std::sync::RwLock::new(InMemoryCatalog::new()));
2303        let schema = TableSchema::new(vec![
2304            CatalogField::new("id", FieldType::Int32, false),
2305            CatalogField::new("name", FieldType::Utf8, true),
2306        ]);
2307        let arrow_schema = Arc::new(Schema::new(vec![
2308            Field::new("id", DataType::Int32, false),
2309            Field::new("name", DataType::Utf8, true),
2310        ]));
2311        let batch = RecordBatch::try_new(
2312            arrow_schema,
2313            vec![
2314                Arc::new(Int32Array::from(vec![1, 2, 3])),
2315                Arc::new(StringArray::from(vec!["a", "b", "c"])),
2316            ],
2317        )
2318        .unwrap();
2319        catalog
2320            .write()
2321            .unwrap()
2322            .register_table_with_batches(TableMetadata::new("mixed", schema), vec![batch])
2323            .unwrap();
2324
2325        let ctx = SessionContext::new();
2326        ctx.register_catalog(
2327            "krishiv",
2328            Arc::new(super::datafusion_bridge::DataFusionCatalogBridge::new(
2329                catalog,
2330            )),
2331        );
2332        let df = ctx
2333            .sql("SELECT name FROM krishiv.public.mixed WHERE id > 1")
2334            .await
2335            .unwrap();
2336        let batches = df.collect().await.unwrap();
2337        let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
2338        assert_eq!(rows, 2);
2339    }
2340
2341    // -----------------------------------------------------------------------
2342    // CatalogError: Display for all variants via format
2343    // -----------------------------------------------------------------------
2344
/// Pins the exact `Display` text of each `CatalogError` variant listed below.
#[test]
fn catalog_error_display_all_variants() {
    // Compare one error's rendered text against the expected message.
    let check = |error: CatalogError, expected: &str| {
        assert_eq!(error.to_string(), expected);
    };

    check(
        CatalogError::TableNotFound {
            name: "x".to_owned(),
        },
        "table not found: 'x'",
    );
    check(
        CatalogError::TableAlreadyExists {
            name: "y".to_owned(),
        },
        "table already exists: 'y'",
    );
    check(
        CatalogError::SchemaNotFound {
            name: "z".to_owned(),
        },
        "schema not found: 'z'",
    );
    check(
        CatalogError::InvalidSchema {
            message: "bad".to_owned(),
        },
        "invalid schema: bad",
    );
    check(
        CatalogError::Http {
            status: 403,
            message: "forbidden".to_owned(),
        },
        "HTTP error 403: forbidden",
    );
    check(
        CatalogError::InvalidConfiguration {
            message: "bad URL".to_owned(),
        },
        "invalid catalog configuration: bad URL",
    );
    check(
        CatalogError::Transport {
            operation: "list tables".to_owned(),
            message: "timed out".to_owned(),
        },
        "catalog transport error during list tables: timed out",
    );
    check(
        CatalogError::InvalidResponse {
            operation: "load table".to_owned(),
            message: "missing metadata".to_owned(),
        },
        "invalid catalog response during load table: missing metadata",
    );
    check(
        CatalogError::ResponseTooLarge {
            operation: "load table".to_owned(),
            limit_bytes: 4096,
        },
        "catalog response during load table exceeded 4096 bytes",
    );
    check(
        CatalogError::UnsupportedOperation {
            operation: "committing a table".to_owned(),
        },
        "catalog server does not support committing a table",
    );
}
2417}