//! hamelin_executor 0.9.4
//!
//! Common package for executing Hamelin across different backends.
//!
//! Configuration types for Hamelin backends.
//!
//! These types describe catalogs, executors, and dataset sources. They are
//! backend-agnostic — each executor crate resolves the dataset configs it
//! supports and returns a clean error for the rest.

use std::collections::HashMap;
use std::path::PathBuf;

use anyhow::Context;
use hamelin_lib::antlr::parse_type;
use hamelin_lib::catalog::{Column, HamelinType};
use hamelin_lib::types::Type;
use serde::{Deserialize, Serialize};

/// Wraps a [`CatalogConfig`] with an optional `default` flag.
///
/// When `default` is `true`, 2-part identifiers (`schema.table`) resolve against
/// this catalog. The TOML key is the catalog's real name (e.g., `lakekeeper`).
///
/// Uses `#[serde(flatten)]` so that `type`, `uri`, `warehouse`, and `default`
/// all appear as siblings at the same TOML level.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct CatalogEntry {
    /// Marks this catalog as the resolution target for 2-part identifiers.
    /// Defaults to `false` when omitted from the TOML.
    #[serde(default)]
    pub default: bool,
    /// The catalog's connection configuration, flattened so its fields sit
    /// at the same TOML level as `default`.
    #[serde(flatten)]
    pub config: CatalogConfig,
}

// --- Catalog types ---

/// Connection configuration for a catalog, discriminated by the TOML `type`
/// field (internally tagged, `snake_case` variant names).
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CatalogConfig {
    /// An Iceberg REST catalog (`type = "iceberg_rest"`).
    IcebergRest {
        /// Base URI of the REST catalog endpoint.
        uri: String,
        /// Optional warehouse identifier for the catalog.
        warehouse: Option<String>,
        /// Extra key/value properties forwarded to the catalog client.
        /// Omitted from serialized output when empty.
        #[serde(default, skip_serializing_if = "HashMap::is_empty")]
        extra_properties: HashMap<String, String>,
    },
    /// A reflection-backed catalog (`type = "reflection"`); carries no
    /// connection settings here — semantics live in the executor crates.
    Reflection,
}

// --- Dataset types ---

/// File format for file-based datasets.
/// File format for file-based datasets.
///
/// Deserialized from lowercase TOML values: `jsonl`, `csv`, `parquet`, `lines`.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum FileFormat {
    /// Newline-delimited JSON.
    Jsonl,
    /// Comma-separated values.
    // NOTE(review): `CSV` breaks the UpperCamelCase style of the sibling
    // variants; renaming to `Csv` would keep the serde form ("csv") but
    // break callers, so it is left as-is.
    CSV,
    /// Apache Parquet.
    Parquet,
    /// Plain text, one record per line.
    Lines,
}

/// Compression codec for file-based datasets.
/// Compression codec for file-based datasets.
///
/// Deserialized from lowercase TOML values: `gzip`, `bzip2`, `xz`, `zstd`.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Compression {
    Gzip,
    Bzip2,
    Xz,
    Zstd,
}

/// Static AWS-style credentials (access key id + secret key).
// NOTE(review): the `Debug` and `Serialize` derives will expose `secret_key`
// in logs and serialized output — confirm this is acceptable or redact.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct StaticAwsCredentials {
    /// Access key id.
    pub access_id: String,
    /// Secret access key.
    pub secret_key: String,
}

/// A dataset backed by one or more files.
///
/// Only `paths` is required; every other field defaults to `None` when
/// omitted from the TOML. The `region`/`role_arn`/`endpoint_url`/
/// `force_path_style` group presumably configures S3-style object-store
/// access — confirm against the executors that consume this config.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct FilesConfig {
    /// File paths or URIs to read.
    pub paths: Vec<String>,
    /// File format; presumably inferred from `file_extension`/paths when
    /// absent — TODO confirm in the executor crates.
    #[serde(default)]
    #[builder(into)]
    pub format: Option<FileFormat>,
    /// Compression codec applied to the files, if any.
    #[serde(default)]
    #[builder(into)]
    pub compression: Option<Compression>,
    /// Explicit file extension override.
    #[serde(default)]
    #[builder(into)]
    pub file_extension: Option<String>,
    /// Explicit column schema; when absent the executor presumably infers
    /// one — TODO confirm.
    #[serde(default)]
    #[builder(into)]
    pub columns: Option<Vec<ColumnConfig>>,
    /// Whether the first row is a header (delimited formats).
    #[serde(default)]
    #[builder(into)]
    pub header: Option<bool>,
    /// Field delimiter (delimited formats).
    #[serde(default)]
    #[builder(into)]
    pub delimiter: Option<String>,
    /// Object-store region.
    #[serde(default)]
    #[builder(into)]
    pub region: Option<String>,
    /// IAM role to assume for access.
    #[serde(default)]
    #[builder(into)]
    pub role_arn: Option<String>,
    /// Static credentials as an alternative to role assumption.
    #[serde(default)]
    pub static_credentials: Option<StaticAwsCredentials>,
    /// Custom object-store endpoint (e.g. for S3-compatible stores).
    #[serde(default)]
    #[builder(into)]
    pub endpoint_url: Option<String>,
    /// Force path-style addressing instead of virtual-hosted style.
    #[serde(default)]
    pub force_path_style: Option<bool>,
    /// Allow plain-HTTP endpoints.
    #[serde(default)]
    pub allow_http: Option<bool>,
}

/// An in-memory dataset defined only by its column schema.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct MemConfig {
    /// Column names and types for the in-memory table.
    pub columns: Vec<ColumnConfig>,
}

/// A standalone Iceberg table addressed directly by its metadata location,
/// without going through a catalog.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct IcebergConfig {
    /// Location of the table's Iceberg metadata file.
    pub metadata_location: String,
    /// Optional object-store region for reading the table files.
    #[serde(default)]
    pub region: Option<String>,
}

/// A dataset defined as a query (a view over other datasets).
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct ViewConfig {
    /// The query text defining the view.
    pub query: String,
}

/// A dataset source, discriminated by the TOML `type` field
/// (`files`, `mem`, `iceberg`, or `view`).
///
/// `derive_more::From` lets each variant's config convert into a
/// `DatasetConfig` via `.into()`.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, derive_more::From)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum DatasetConfig {
    Files(FilesConfig),
    Mem(MemConfig),
    Iceberg(IcebergConfig),
    View(ViewConfig),
}

/// A named column with its Hamelin type.
///
/// The type is (de)serialized as a Hamelin type-expression string (e.g.
/// `"string"`, `"{name: string, age: int}"`) rather than a structured value.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct ColumnConfig {
    /// Column name.
    pub name: String,
    /// Column type, round-tripped through the Hamelin type-expression syntax.
    #[serde(
        deserialize_with = "deserialize_hamelin_type",
        serialize_with = "serialize_hamelin_type"
    )]
    pub typ: HamelinType,
}

/// Deserialize a `HamelinType` from a Hamelin type expression string
/// (e.g. `"string"`, `"int"`, `"{name: string, age: int}"`).
fn deserialize_hamelin_type<'de, D>(deserializer: D) -> Result<HamelinType, D::Error>
where
    D: serde::Deserializer<'de>,
{
    let s = String::deserialize(deserializer)?;
    let ctx = parse_type(s.clone()).map_err(|e| serde::de::Error::custom(e.contextualize(&s)))?;
    let typ =
        Type::from_parse_tree(&ctx).map_err(|e| serde::de::Error::custom(e.contextualize(&s)))?;
    Ok(typ.into())
}

/// Serialize a `HamelinType` as a Hamelin type expression string,
/// ensuring round-trip compatibility with `deserialize_hamelin_type`.
fn serialize_hamelin_type<S>(typ: &HamelinType, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    let t: Type = typ.clone().try_into().map_err(serde::ser::Error::custom)?;
    serializer.serialize_str(&t.to_string())
}

impl From<&Column> for ColumnConfig {
    fn from(col: &Column) -> Self {
        ColumnConfig {
            name: col.name.as_str().to_string(),
            typ: col.typ.clone(),
        }
    }
}

// --- Dataset resolution ---

/// What a table reference resolved to.
#[derive(Debug, Clone)]
pub enum ResolvedDataset {
    /// A locally-configured dataset (local override in the datasets map).
    Local {
        /// Display name: 2-part for the default catalog, 3-part otherwise.
        name: String,
        /// Catalog segment of the reference.
        catalog: String,
        /// Schema segment of the reference.
        schema: String,
        /// Table segment of the reference.
        table: String,
        /// The locally-configured dataset source.
        config: DatasetConfig,
    },
    /// A table in a configured catalog — executor handles based on catalog type.
    Catalog {
        /// Name of the matched catalog.
        catalog_name: String,
        /// The matched catalog's connection configuration.
        catalog_config: CatalogConfig,
        /// Namespace (schema) within the catalog.
        namespace: String,
        /// Table name within the namespace.
        table: String,
    },
}

/// Holds configured datasets and catalogs, and resolves table references from queries.
///
/// Resolution logic:
/// - **2-segment ref** `schema.table` → resolved against the catalog marked `default = true`
/// - **3-segment ref** `catalog.schema.table`
///
/// Local dataset overrides always win. If there's no local override, fall back
/// to the catalog's configured resolution mechanism.
#[derive(Debug, Clone)]
pub struct DatasetStore {
    /// 3-level: catalog → schema → table → config
    datasets: HashMap<String, HashMap<String, HashMap<String, DatasetConfig>>>,
    /// Configured catalogs by name, consulted when no local override matches.
    catalogs: HashMap<String, CatalogConfig>,
    /// The catalog name with `default = true`, used for 2-part identifier resolution.
    default_catalog: Option<String>,
}

impl DatasetStore {
    /// Build a store from raw dataset configs and named catalog entries.
    ///
    /// The first entry with `default = true` (in `HashMap` iteration order)
    /// becomes the default catalog.
    // NOTE(review): if several entries set `default = true`, which one wins
    // is nondeterministic — consider rejecting that configuration upstream.
    pub fn new(
        datasets: HashMap<String, HashMap<String, HashMap<String, DatasetConfig>>>,
        catalog_entries: HashMap<String, CatalogEntry>,
    ) -> Self {
        let default_catalog = catalog_entries
            .iter()
            .find(|(_, entry)| entry.default)
            .map(|(name, _)| name.clone());

        // Drop the `default` flags; only the configs are needed from here on.
        let catalogs = catalog_entries
            .into_iter()
            .map(|(name, entry)| (name, entry.config))
            .collect();

        Self {
            datasets,
            catalogs,
            default_catalog,
        }
    }

    /// The name of the catalog marked as default, if any.
    pub fn default_catalog(&self) -> Option<&str> {
        self.default_catalog.as_deref()
    }

    /// Resolve a list of table segments (e.g. `["catalog", "schema", "table"]`).
    ///
    /// # Errors
    /// - [`DatasetStoreError::NoDefaultCatalog`] for a 2-segment reference
    ///   when no catalog is marked default.
    /// - [`DatasetStoreError::UnresolvableReference`] when the segment count
    ///   is not 2 or 3.
    /// - [`DatasetStoreError::UnknownDataset`] when nothing matches.
    pub fn resolve(&self, segments: &[&str]) -> Result<ResolvedDataset, DatasetStoreError> {
        match segments {
            [schema, table] => {
                let catalog = self.default_catalog.as_deref().ok_or_else(|| {
                    DatasetStoreError::NoDefaultCatalog(format!("{}.{}", schema, table))
                })?;
                self.resolve_three(catalog, schema, table)
            }
            [catalog, schema, table] => self.resolve_three(catalog, schema, table),
            _ => Err(DatasetStoreError::UnresolvableReference(segments.join("."))),
        }
    }

    /// Whether `catalog` is the default catalog (used to decide 2-part vs 3-part display).
    fn is_default_catalog(&self, catalog: &str) -> bool {
        self.default_catalog.as_deref() == Some(catalog)
    }

    /// Human-facing name for a table: 2-part when `catalog` is the default,
    /// 3-part otherwise. Shared by resolution and flattening so the display
    /// convention stays in one place.
    fn display_name(&self, catalog: &str, schema: &str, table: &str) -> String {
        if self.is_default_catalog(catalog) {
            format!("{}.{}", schema, table)
        } else {
            format!("{}.{}.{}", catalog, schema, table)
        }
    }

    fn resolve_three(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
    ) -> Result<ResolvedDataset, DatasetStoreError> {
        // 1. Local dataset overrides always win.
        let local = self
            .datasets
            .get(catalog)
            .and_then(|schemas| schemas.get(schema))
            .and_then(|tables| tables.get(table));
        if let Some(config) = local {
            return Ok(ResolvedDataset::Local {
                name: self.display_name(catalog, schema, table),
                catalog: catalog.to_string(),
                schema: schema.to_string(),
                table: table.to_string(),
                config: config.clone(),
            });
        }

        // 2. Fall back to a configured catalog of the same name.
        if let Some(catalog_config) = self.catalogs.get(catalog) {
            return Ok(ResolvedDataset::Catalog {
                catalog_name: catalog.to_string(),
                catalog_config: catalog_config.clone(),
                namespace: schema.to_string(),
                table: table.to_string(),
            });
        }

        // 3. Nothing matched.
        Err(DatasetStoreError::UnknownDataset(self.display_name(
            catalog, schema, table,
        )))
    }

    /// Flatten datasets into `display name → config` for executors that
    /// pre-register everything. Default-catalog tables get 2-part names,
    /// all others 3-part.
    pub fn flat_datasets(&self) -> HashMap<String, DatasetConfig> {
        let mut result = HashMap::new();
        for (catalog, schemas) in &self.datasets {
            for (schema, tables) in schemas {
                for (table, config) in tables {
                    result.insert(self.display_name(catalog, schema, table), config.clone());
                }
            }
        }
        result
    }

    /// The config for the default catalog, if one is marked `default = true`.
    ///
    /// Returns `None` when no default is set, or when the default name has
    /// no matching catalog config.
    pub fn default_catalog_config(&self) -> Option<(&str, &CatalogConfig)> {
        let name = self.default_catalog.as_deref()?;
        let config = self.catalogs.get(name)?;
        Some((name, config))
    }
}

/// Errors produced while resolving table references against a [`DatasetStore`].
#[derive(Debug, thiserror::Error)]
pub enum DatasetStoreError {
    /// The reference had a segment count other than 2 or 3.
    #[error("Cannot resolve table reference '{0}': not a 2 or 3-segment identifier")]
    UnresolvableReference(String),
    /// No local dataset override and no configured catalog matched.
    #[error("Unknown dataset '{0}': no local config or catalog matches")]
    UnknownDataset(String),
    /// A 2-part reference was used but no catalog is marked `default = true`.
    #[error(
        "No default catalog configured; use a 3-part identifier (catalog.schema.table) for '{0}'"
    )]
    NoDefaultCatalog(String),
}

// --- Top-level configuration ---

/// Top-level Hamelin configuration loaded from `~/.hamelin.toml` and environment
/// variables. Contains catalog discovery sources and dataset definitions.
///
/// This struct is intentionally backend-agnostic — backend-specific configuration
/// (e.g. Trino connection settings) lives in each executor crate. Unknown TOML
/// keys are silently ignored, so backend-specific sections can coexist in the
/// same config file.
// `#[serde(default)]` on the struct makes every field optional in the TOML,
// falling back to the `Default` impl below.
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
#[serde(default)]
pub struct HamelinConfig {
    /// Catalog discovery sources, keyed by catalog name.
    pub catalogs: HashMap<String, CatalogEntry>,
    /// Datasets: `datasets.{catalog}.{schema}.{table}` → DatasetConfig.
    pub datasets: HashMap<String, HashMap<String, HashMap<String, DatasetConfig>>>,
    /// Path to the reflected catalog file for the default catalog.
    /// Used by `--use-catalog-file` / `-e` for offline type-checking.
    pub catalog_filename: String,
}

impl Default for HamelinConfig {
    fn default() -> Self {
        Self {
            catalogs: HashMap::new(),
            datasets: HashMap::new(),
            catalog_filename: "~/.hamelin.env".to_string(),
        }
    }
}

impl HamelinConfig {
    /// Expand a leading `~` in [`Self::catalog_filename`] into the user's
    /// home directory, returning the resulting path.
    ///
    /// # Errors
    /// Returns an error when tilde expansion fails (e.g. the home directory
    /// cannot be determined).
    pub fn catalog_filename_path(&self) -> anyhow::Result<PathBuf> {
        // The previous context message said "environment filename", which no
        // longer matches the field this expands.
        expanduser::expanduser(&self.catalog_filename)
            .context("Failed to expand catalog filename")
    }
}