use std::collections::HashMap;
use std::path::PathBuf;
use anyhow::Context;
use hamelin_lib::antlr::parse_type;
use hamelin_lib::catalog::{Column, HamelinType};
use hamelin_lib::types::Type;
use serde::{Deserialize, Serialize};
/// A single named catalog as it appears in configuration.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct CatalogEntry {
    /// When `true`, this catalog is the one used to resolve 2-part
    /// (`schema.table`) references. Defaults to `false` when absent.
    #[serde(default)]
    pub default: bool,
    /// Backend-specific settings; flattened so its fields (including the
    /// `type` tag) sit at the same level as `default` in serialized form.
    #[serde(flatten)]
    pub config: CatalogConfig,
}
/// Backend configuration for a catalog.
///
/// Internally tagged: the variant is selected by a `type` field whose value is
/// the snake_case variant name (`"iceberg_rest"` or `"reflection"`).
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CatalogConfig {
    /// An Iceberg REST catalog endpoint.
    IcebergRest {
        /// Base URI of the REST catalog service.
        uri: String,
        /// Optional warehouse location/identifier passed to the catalog.
        warehouse: Option<String>,
        /// Additional catalog properties; omitted from serialized output when empty.
        #[serde(default, skip_serializing_if = "HashMap::is_empty")]
        extra_properties: HashMap<String, String>,
    },
    /// NOTE(review): semantics not visible from this file — presumably a
    /// catalog reflected from some ambient source; confirm with consuming code.
    Reflection,
}
/// Supported on-disk file formats for file-backed datasets.
///
/// Serialized as lowercase strings: `"jsonl"`, `"csv"`, `"parquet"`, `"lines"`.
/// NOTE(review): `CSV` breaks the UpperCamelCase convention of its siblings,
/// but renaming it would break external callers; the serialized name is
/// unaffected either way because of `rename_all = "lowercase"`.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum FileFormat {
    Jsonl,
    CSV,
    Parquet,
    Lines,
}
/// Supported compression codecs for file-backed datasets.
///
/// Serialized as lowercase strings: `"gzip"`, `"bzip2"`, `"xz"`, `"zstd"`.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Compression {
    Gzip,
    Bzip2,
    Xz,
    Zstd,
}
/// Static AWS access credentials.
///
/// NOTE(review): `secret_key` is serialized in the clear by the `Serialize`
/// derive and printed by `Debug` — confirm these never reach logs or
/// user-visible output.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct StaticAwsCredentials {
    /// AWS access key ID.
    pub access_id: String,
    /// AWS secret access key.
    pub secret_key: String,
}
/// Configuration for a dataset backed by one or more files (local or object
/// storage). All fields except `paths` are optional and default to `None`.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct FilesConfig {
    /// Paths or URLs of the files making up the dataset.
    pub paths: Vec<String>,
    /// File format; presumably inferred from the path when absent — TODO confirm.
    #[serde(default)]
    #[builder(into)]
    pub format: Option<FileFormat>,
    /// Compression codec; presumably inferred when absent — TODO confirm.
    #[serde(default)]
    #[builder(into)]
    pub compression: Option<Compression>,
    /// Explicit file extension override.
    #[serde(default)]
    #[builder(into)]
    pub file_extension: Option<String>,
    /// Explicit column schema; when absent the schema is presumably inferred.
    #[serde(default)]
    #[builder(into)]
    pub columns: Option<Vec<ColumnConfig>>,
    /// Whether the first row is a header (CSV-style formats).
    #[serde(default)]
    #[builder(into)]
    pub header: Option<bool>,
    /// Field delimiter (CSV-style formats).
    #[serde(default)]
    #[builder(into)]
    pub delimiter: Option<String>,
    /// AWS region for object-store access.
    #[serde(default)]
    #[builder(into)]
    pub region: Option<String>,
    /// IAM role to assume for object-store access.
    #[serde(default)]
    #[builder(into)]
    pub role_arn: Option<String>,
    /// Static credentials, as an alternative to role assumption.
    #[serde(default)]
    pub static_credentials: Option<StaticAwsCredentials>,
    /// Custom S3-compatible endpoint (e.g. MinIO).
    #[serde(default)]
    #[builder(into)]
    pub endpoint_url: Option<String>,
    /// Force path-style S3 addressing instead of virtual-hosted style.
    #[serde(default)]
    pub force_path_style: Option<bool>,
    /// Allow plain-HTTP endpoints (useful for local test object stores).
    #[serde(default)]
    pub allow_http: Option<bool>,
}
/// Configuration for an in-memory dataset: only a column schema, no storage.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct MemConfig {
    /// Column schema of the in-memory table.
    pub columns: Vec<ColumnConfig>,
}
/// Configuration for a dataset backed by a standalone Iceberg table
/// (addressed directly by metadata file, not through a catalog).
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct IcebergConfig {
    /// Location of the Iceberg table metadata file.
    pub metadata_location: String,
    /// AWS region for reading the table's storage, if needed.
    #[serde(default)]
    pub region: Option<String>,
}
/// Configuration for a dataset defined as a query over other datasets.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct ViewConfig {
    /// The view's defining query text.
    pub query: String,
}
/// A locally-configured dataset, one variant per backing store.
///
/// Internally tagged: the variant is selected by a `type` field whose value is
/// the snake_case variant name (`"files"`, `"mem"`, `"iceberg"`, `"view"`).
/// `derive_more::From` provides `From<FilesConfig>` etc. for ergonomic construction.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, derive_more::From)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum DatasetConfig {
    Files(FilesConfig),
    Mem(MemConfig),
    Iceberg(IcebergConfig),
    View(ViewConfig),
}
/// A single column in a dataset schema.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct ColumnConfig {
    /// Column name.
    pub name: String,
    /// Column type, (de)serialized as the textual type syntax understood by
    /// `parse_type` rather than as a structured value.
    #[serde(
        deserialize_with = "deserialize_hamelin_type",
        serialize_with = "serialize_hamelin_type"
    )]
    pub typ: HamelinType,
}
fn deserialize_hamelin_type<'de, D>(deserializer: D) -> Result<HamelinType, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
let ctx = parse_type(s.clone()).map_err(|e| serde::de::Error::custom(e.contextualize(&s)))?;
let typ =
Type::from_parse_tree(&ctx).map_err(|e| serde::de::Error::custom(e.contextualize(&s)))?;
Ok(typ.into())
}
fn serialize_hamelin_type<S>(typ: &HamelinType, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let t: Type = typ.clone().try_into().map_err(serde::ser::Error::custom)?;
serializer.serialize_str(&t.to_string())
}
impl From<&Column> for ColumnConfig {
fn from(col: &Column) -> Self {
ColumnConfig {
name: col.name.as_str().to_string(),
typ: col.typ.clone(),
}
}
}
/// The result of resolving a table reference against a [`DatasetStore`].
#[derive(Debug, Clone)]
pub enum ResolvedDataset {
    /// The reference matched a locally-configured dataset.
    Local {
        /// Display name; the catalog prefix is omitted for the default catalog.
        name: String,
        catalog: String,
        schema: String,
        table: String,
        /// The local dataset's configuration.
        config: DatasetConfig,
    },
    /// The reference fell through to a configured external catalog, which is
    /// expected to resolve `namespace.table` itself.
    Catalog {
        catalog_name: String,
        catalog_config: CatalogConfig,
        namespace: String,
        table: String,
    },
}
/// Resolves table references against locally-configured datasets and
/// external catalogs.
#[derive(Debug, Clone)]
pub struct DatasetStore {
    /// Local datasets, nested as catalog -> schema -> table -> config.
    datasets: HashMap<String, HashMap<String, HashMap<String, DatasetConfig>>>,
    /// External catalog configurations keyed by catalog name.
    catalogs: HashMap<String, CatalogConfig>,
    /// Name of the catalog marked `default`, if any.
    default_catalog: Option<String>,
}
impl DatasetStore {
    /// Builds a store from nested local dataset configs
    /// (catalog -> schema -> table -> config) and named catalog entries.
    ///
    /// The first entry marked `default: true` becomes the default catalog.
    pub fn new(
        datasets: HashMap<String, HashMap<String, HashMap<String, DatasetConfig>>>,
        catalog_entries: HashMap<String, CatalogEntry>,
    ) -> Self {
        // NOTE(review): HashMap iteration order is unspecified, so if more than
        // one entry is marked `default`, which one wins is arbitrary — confirm
        // whether multiple defaults should be rejected upstream.
        let default_catalog = catalog_entries
            .iter()
            .find(|(_, entry)| entry.default)
            .map(|(name, _)| name.clone());
        let catalogs = catalog_entries
            .into_iter()
            .map(|(name, entry)| (name, entry.config))
            .collect();
        Self {
            datasets,
            catalogs,
            default_catalog,
        }
    }

    /// Name of the default catalog, if one was marked `default: true`.
    pub fn default_catalog(&self) -> Option<&str> {
        self.default_catalog.as_deref()
    }

    /// Resolves a table reference of 2 (`schema.table`) or 3
    /// (`catalog.schema.table`) segments.
    ///
    /// 2-segment references are resolved against the default catalog.
    ///
    /// # Errors
    /// - [`DatasetStoreError::NoDefaultCatalog`] for a 2-segment reference
    ///   when no default catalog is configured.
    /// - [`DatasetStoreError::UnresolvableReference`] for any other segment count.
    /// - [`DatasetStoreError::UnknownDataset`] when nothing matches.
    pub fn resolve(&self, segments: &[&str]) -> Result<ResolvedDataset, DatasetStoreError> {
        match segments {
            [schema, table] => {
                let catalog = self.default_catalog.as_deref().ok_or_else(|| {
                    DatasetStoreError::NoDefaultCatalog(format!("{}.{}", schema, table))
                })?;
                self.resolve_three(catalog, schema, table)
            }
            [catalog, schema, table] => self.resolve_three(catalog, schema, table),
            // `&[&str]` joins directly; no need to collect owned Strings first.
            _ => Err(DatasetStoreError::UnresolvableReference(segments.join("."))),
        }
    }

    /// True when `catalog` is the configured default catalog.
    fn is_default_catalog(&self, catalog: &str) -> bool {
        self.default_catalog.as_deref() == Some(catalog)
    }

    /// User-facing dataset name: the catalog prefix is elided for the
    /// default catalog. Shared by `resolve_three` and `flat_datasets` so the
    /// naming convention cannot drift between them.
    fn display_name(&self, catalog: &str, schema: &str, table: &str) -> String {
        if self.is_default_catalog(catalog) {
            format!("{}.{}", schema, table)
        } else {
            format!("{}.{}.{}", catalog, schema, table)
        }
    }

    /// Resolves a fully-qualified `catalog.schema.table` reference.
    ///
    /// Local datasets take precedence over external catalogs with the same name.
    fn resolve_three(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
    ) -> Result<ResolvedDataset, DatasetStoreError> {
        let local = self
            .datasets
            .get(catalog)
            .and_then(|schemas| schemas.get(schema))
            .and_then(|tables| tables.get(table));
        if let Some(config) = local {
            return Ok(ResolvedDataset::Local {
                name: self.display_name(catalog, schema, table),
                catalog: catalog.to_string(),
                schema: schema.to_string(),
                table: table.to_string(),
                config: config.clone(),
            });
        }
        if let Some(catalog_config) = self.catalogs.get(catalog) {
            return Ok(ResolvedDataset::Catalog {
                catalog_name: catalog.to_string(),
                catalog_config: catalog_config.clone(),
                namespace: schema.to_string(),
                table: table.to_string(),
            });
        }
        Err(DatasetStoreError::UnknownDataset(
            self.display_name(catalog, schema, table),
        ))
    }

    /// Flattens all local datasets into a map keyed by display name
    /// (see [`Self::display_name`] for the naming convention).
    pub fn flat_datasets(&self) -> HashMap<String, DatasetConfig> {
        let mut result = HashMap::new();
        for (catalog, schemas) in &self.datasets {
            for (schema, tables) in schemas {
                for (table, config) in tables {
                    result.insert(self.display_name(catalog, schema, table), config.clone());
                }
            }
        }
        result
    }

    /// The default catalog's name and configuration, when both a default is
    /// set and a matching catalog config exists.
    pub fn default_catalog_config(&self) -> Option<(&str, &CatalogConfig)> {
        let name = self.default_catalog.as_deref()?;
        let config = self.catalogs.get(name)?;
        Some((name, config))
    }
}
/// Errors produced while resolving table references in a [`DatasetStore`].
#[derive(Debug, thiserror::Error)]
pub enum DatasetStoreError {
    /// The reference did not have 2 or 3 dot-separated segments.
    #[error("Cannot resolve table reference '{0}': not a 2 or 3-segment identifier")]
    UnresolvableReference(String),
    /// No local dataset or external catalog matched the reference.
    #[error("Unknown dataset '{0}': no local config or catalog matches")]
    UnknownDataset(String),
    /// A 2-part reference was used but no catalog is marked as default.
    #[error(
        "No default catalog configured; use a 3-part identifier (catalog.schema.table) for '{0}'"
    )]
    NoDefaultCatalog(String),
}
/// Top-level application configuration.
///
/// The container-level `#[serde(default)]` means every missing field falls
/// back to its value in [`HamelinConfig::default`].
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
#[serde(default)]
pub struct HamelinConfig {
    /// Named catalog entries.
    pub catalogs: HashMap<String, CatalogEntry>,
    /// Local datasets, nested as catalog -> schema -> table -> config.
    pub datasets: HashMap<String, HashMap<String, HashMap<String, DatasetConfig>>>,
    /// Path (may start with `~`) to the environment/catalog file.
    pub catalog_filename: String,
}
impl Default for HamelinConfig {
fn default() -> Self {
Self {
catalogs: HashMap::new(),
datasets: HashMap::new(),
catalog_filename: "~/.hamelin.env".to_string(),
}
}
}
impl HamelinConfig {
    /// Expands a leading `~` in `catalog_filename` and returns the result
    /// as a [`PathBuf`].
    ///
    /// # Errors
    /// Fails when the path cannot be expanded (e.g. the home directory
    /// cannot be determined).
    pub fn catalog_filename_path(&self) -> anyhow::Result<PathBuf> {
        let expanded = expanduser::expanduser(&self.catalog_filename);
        expanded.context("Failed to expand environment filename")
    }
}