// hamelin_executor/config.rs
//! Configuration types for Hamelin backends.
//!
//! These types describe catalogs, executors, and dataset sources. They are
//! backend-agnostic — each executor crate resolves the dataset configs it
//! supports and returns a clean error for the rest.
7use std::collections::HashMap;
8use std::path::PathBuf;
9
10use anyhow::Context;
11use hamelin_lib::antlr::parse_type;
12use hamelin_lib::catalog::{Column, HamelinType};
13use hamelin_lib::types::Type;
14use serde::{Deserialize, Serialize};
15
/// Wraps a [`CatalogConfig`] with an optional `default` flag.
///
/// When `default` is `true`, 2-part identifiers (`schema.table`) resolve against
/// this catalog. The TOML key is the catalog's real name (e.g., `lakekeeper`).
///
/// Uses `#[serde(flatten)]` so that `type`, `uri`, `warehouse`, and `default`
/// all appear as siblings at the same TOML level.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct CatalogEntry {
    /// Marks this catalog as the default for 2-part identifier resolution.
    /// `false` when the key is absent from the TOML.
    #[serde(default)]
    pub default: bool,
    /// The catalog's connection configuration, flattened to the same TOML level.
    #[serde(flatten)]
    pub config: CatalogConfig,
}
30
// --- Catalog types ---

/// Connection configuration for a catalog, tagged by `type` in TOML
/// (e.g. `type = "iceberg_rest"`).
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CatalogConfig {
    /// An Iceberg REST catalog endpoint.
    IcebergRest {
        /// Base URI of the REST catalog service.
        uri: String,
        /// Optional warehouse identifier passed to the catalog.
        warehouse: Option<String>,
        /// Extra catalog properties; omitted from serialized output when empty.
        #[serde(default, skip_serializing_if = "HashMap::is_empty")]
        extra_properties: HashMap<String, String>,
    },
    /// A catalog backed by a reflected catalog file — presumably the
    /// `catalog_filename` offline mechanism; confirm against executor usage.
    Reflection,
}
44
// --- Dataset types ---

/// File format for file-based datasets.
///
/// Serialized in lowercase: `jsonl`, `csv`, `parquet`, `lines`.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum FileFormat {
    /// Newline-delimited JSON.
    Jsonl,
    /// Comma-separated values.
    CSV,
    /// Apache Parquet.
    Parquet,
    /// Plain text, one record per line.
    Lines,
}
56
/// Compression codec for file-based datasets.
///
/// Serialized in lowercase: `gzip`, `bzip2`, `xz`, `zstd`.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Compression {
    Gzip,
    Bzip2,
    Xz,
    Zstd,
}
66
/// Static AWS credentials for object-store access.
///
/// NOTE(review): these live in plain text in the config file — prefer
/// `role_arn` or ambient credentials where possible.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct StaticAwsCredentials {
    /// AWS access key ID.
    pub access_id: String,
    /// AWS secret access key.
    pub secret_key: String,
}
72
/// A file-based dataset: one or more paths plus format, schema, and
/// object-store access options. All fields except `paths` are optional.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct FilesConfig {
    /// File or object-store paths to read.
    pub paths: Vec<String>,
    /// Optional file format override.
    #[serde(default)]
    #[builder(into)]
    pub format: Option<FileFormat>,
    /// Optional compression codec override.
    #[serde(default)]
    #[builder(into)]
    pub compression: Option<Compression>,
    /// Optional file extension filter/override.
    #[serde(default)]
    #[builder(into)]
    pub file_extension: Option<String>,
    /// Optional explicit column schema; when absent, presumably inferred by
    /// the executor — confirm against the consuming backend.
    #[serde(default)]
    #[builder(into)]
    pub columns: Option<Vec<ColumnConfig>>,
    /// Whether files carry a header row (CSV-style formats).
    #[serde(default)]
    #[builder(into)]
    pub header: Option<bool>,
    /// Field delimiter (CSV-style formats).
    #[serde(default)]
    #[builder(into)]
    pub delimiter: Option<String>,
    /// AWS region for object-store paths.
    #[serde(default)]
    #[builder(into)]
    pub region: Option<String>,
    /// IAM role to assume when accessing the object store.
    #[serde(default)]
    #[builder(into)]
    pub role_arn: Option<String>,
    /// Static credentials, used instead of ambient/role credentials when set.
    #[serde(default)]
    pub static_credentials: Option<StaticAwsCredentials>,
    /// Custom S3-compatible endpoint URL (e.g. MinIO).
    #[serde(default)]
    #[builder(into)]
    pub endpoint_url: Option<String>,
    /// Force path-style S3 addressing (commonly needed for S3-compatible stores).
    #[serde(default)]
    pub force_path_style: Option<bool>,
    /// Allow plain-HTTP endpoints (local/dev object stores).
    #[serde(default)]
    pub allow_http: Option<bool>,
}
110
/// An in-memory dataset defined purely by its column schema.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct MemConfig {
    /// Column definitions for the in-memory table.
    pub columns: Vec<ColumnConfig>,
}
115
/// A standalone Iceberg table referenced directly by its metadata location
/// (no catalog involved).
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct IcebergConfig {
    /// Location of the table's Iceberg metadata file.
    pub metadata_location: String,
    /// Optional AWS region for the underlying object store.
    #[serde(default)]
    pub region: Option<String>,
}
122
/// A dataset defined as a query over other datasets.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct ViewConfig {
    /// The query text that defines the view.
    pub query: String,
}
127
/// A dataset source, tagged by `type` in TOML (`files`, `mem`, `iceberg`,
/// `view`).
///
/// `derive_more::From` provides `From<FilesConfig>`, `From<MemConfig>`, etc.
/// for each variant.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, derive_more::From)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum DatasetConfig {
    Files(FilesConfig),
    Mem(MemConfig),
    Iceberg(IcebergConfig),
    View(ViewConfig),
}
136
/// A named column with its Hamelin type.
///
/// The type is (de)serialized as a Hamelin type expression string via the
/// custom `deserialize_hamelin_type` / `serialize_hamelin_type` functions.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct ColumnConfig {
    /// Column name.
    pub name: String,
    /// Column type, written as a type expression (e.g. `"string"`).
    #[serde(
        deserialize_with = "deserialize_hamelin_type",
        serialize_with = "serialize_hamelin_type"
    )]
    pub typ: HamelinType,
}
146
147/// Deserialize a `HamelinType` from a Hamelin type expression string
148/// (e.g. `"string"`, `"int"`, `"{name: string, age: int}"`).
149fn deserialize_hamelin_type<'de, D>(deserializer: D) -> Result<HamelinType, D::Error>
150where
151    D: serde::Deserializer<'de>,
152{
153    let s = String::deserialize(deserializer)?;
154    let ctx = parse_type(s.clone()).map_err(|e| serde::de::Error::custom(e.contextualize(&s)))?;
155    let typ =
156        Type::from_parse_tree(&ctx).map_err(|e| serde::de::Error::custom(e.contextualize(&s)))?;
157    Ok(typ.into())
158}
159
160/// Serialize a `HamelinType` as a Hamelin type expression string,
161/// ensuring round-trip compatibility with `deserialize_hamelin_type`.
162fn serialize_hamelin_type<S>(typ: &HamelinType, serializer: S) -> Result<S::Ok, S::Error>
163where
164    S: serde::Serializer,
165{
166    let t: Type = typ.clone().try_into().map_err(serde::ser::Error::custom)?;
167    serializer.serialize_str(&t.to_string())
168}
169
170impl From<&Column> for ColumnConfig {
171    fn from(col: &Column) -> Self {
172        ColumnConfig {
173            name: col.name.as_str().to_string(),
174            typ: col.typ.clone(),
175        }
176    }
177}
178
// --- Dataset resolution ---

/// What a table reference resolved to.
#[derive(Debug, Clone)]
pub enum ResolvedDataset {
    /// A locally-configured dataset (local override in the datasets map).
    Local {
        /// Display name: `schema.table` for the default catalog,
        /// `catalog.schema.table` otherwise.
        name: String,
        /// Catalog segment of the reference.
        catalog: String,
        /// Schema segment of the reference.
        schema: String,
        /// Table segment of the reference.
        table: String,
        /// The matched dataset configuration.
        config: DatasetConfig,
    },
    /// A table in a configured catalog — executor handles based on catalog type.
    Catalog {
        /// Name of the matched catalog.
        catalog_name: String,
        /// The catalog's connection configuration.
        catalog_config: CatalogConfig,
        /// Namespace (schema) within the catalog.
        namespace: String,
        /// Table name within the namespace.
        table: String,
    },
}
200
/// Holds configured datasets and catalogs, and resolves table references from queries.
///
/// Resolution logic:
/// - **2-segment ref** `schema.table` → resolved against the catalog marked `default = true`
/// - **3-segment ref** `catalog.schema.table`
///
/// Local dataset overrides always win. If there's no local override, fall back
/// to the catalog's configured resolution mechanism.
#[derive(Debug, Clone)]
pub struct DatasetStore {
    /// 3-level: catalog → schema → table → config
    datasets: HashMap<String, HashMap<String, HashMap<String, DatasetConfig>>>,
    /// Configured catalogs by name (the `default` flag is stripped at construction).
    catalogs: HashMap<String, CatalogConfig>,
    /// The catalog name with `default = true`, used for 2-part identifier resolution.
    default_catalog: Option<String>,
}
217
218impl DatasetStore {
219    pub fn new(
220        datasets: HashMap<String, HashMap<String, HashMap<String, DatasetConfig>>>,
221        catalog_entries: HashMap<String, CatalogEntry>,
222    ) -> Self {
223        let default_catalog = catalog_entries
224            .iter()
225            .find(|(_, entry)| entry.default)
226            .map(|(name, _)| name.clone());
227
228        let catalogs = catalog_entries
229            .into_iter()
230            .map(|(name, entry)| (name, entry.config))
231            .collect();
232
233        Self {
234            datasets,
235            catalogs,
236            default_catalog,
237        }
238    }
239
240    /// The name of the catalog marked as default, if any.
241    pub fn default_catalog(&self) -> Option<&str> {
242        self.default_catalog.as_deref()
243    }
244
245    /// Resolve a list of table segments (e.g. `["catalog", "schema", "table"]`).
246    pub fn resolve(&self, segments: &[&str]) -> Result<ResolvedDataset, DatasetStoreError> {
247        match segments {
248            [schema, table] => {
249                let catalog = self.default_catalog.as_deref().ok_or_else(|| {
250                    DatasetStoreError::NoDefaultCatalog(format!("{}.{}", schema, table))
251                })?;
252                self.resolve_three(catalog, schema, table)
253            }
254            [catalog, schema, table] => self.resolve_three(catalog, schema, table),
255            _ => Err(DatasetStoreError::UnresolvableReference(
256                segments
257                    .iter()
258                    .map(|s| s.to_string())
259                    .collect::<Vec<_>>()
260                    .join("."),
261            )),
262        }
263    }
264
265    /// Whether `catalog` is the default catalog (used to decide 2-part vs 3-part display).
266    fn is_default_catalog(&self, catalog: &str) -> bool {
267        self.default_catalog
268            .as_deref()
269            .map_or(false, |dc| dc == catalog)
270    }
271
272    fn resolve_three(
273        &self,
274        catalog: &str,
275        schema: &str,
276        table: &str,
277    ) -> Result<ResolvedDataset, DatasetStoreError> {
278        // 1. Check local dataset override
279        if let Some(schemas) = self.datasets.get(catalog) {
280            if let Some(tables) = schemas.get(schema) {
281                if let Some(config) = tables.get(table) {
282                    let name = if self.is_default_catalog(catalog) {
283                        format!("{}.{}", schema, table)
284                    } else {
285                        format!("{}.{}.{}", catalog, schema, table)
286                    };
287                    return Ok(ResolvedDataset::Local {
288                        name,
289                        catalog: catalog.to_string(),
290                        schema: schema.to_string(),
291                        table: table.to_string(),
292                        config: config.clone(),
293                    });
294                }
295            }
296        }
297
298        // 2. Check configured catalogs
299        if let Some(catalog_config) = self.catalogs.get(catalog) {
300            return Ok(ResolvedDataset::Catalog {
301                catalog_name: catalog.to_string(),
302                catalog_config: catalog_config.clone(),
303                namespace: schema.to_string(),
304                table: table.to_string(),
305            });
306        }
307
308        // 3. Nothing matched
309        let ref_name = if self.is_default_catalog(catalog) {
310            format!("{}.{}", schema, table)
311        } else {
312            format!("{}.{}.{}", catalog, schema, table)
313        };
314        Err(DatasetStoreError::UnknownDataset(ref_name))
315    }
316
317    /// Flatten datasets for use by executors that pre-register everything.
318    pub fn flat_datasets(&self) -> HashMap<String, DatasetConfig> {
319        let mut result = HashMap::new();
320        for (catalog, schemas) in &self.datasets {
321            for (schema, tables) in schemas {
322                for (table, config) in tables {
323                    let key = if self.is_default_catalog(catalog) {
324                        format!("{}.{}", schema, table)
325                    } else {
326                        format!("{}.{}.{}", catalog, schema, table)
327                    };
328                    result.insert(key, config.clone());
329                }
330            }
331        }
332        result
333    }
334
335    /// The config for the default catalog, if one is marked `default = true`.
336    pub fn default_catalog_config(&self) -> Option<(&str, &CatalogConfig)> {
337        let name = self.default_catalog.as_deref()?;
338        let config = self.catalogs.get(name)?;
339        Some((name, config))
340    }
341}
342
/// Errors produced by [`DatasetStore::resolve`].
#[derive(Debug, thiserror::Error)]
pub enum DatasetStoreError {
    /// The reference had a segment count other than 2 or 3.
    #[error("Cannot resolve table reference '{0}': not a 2 or 3-segment identifier")]
    UnresolvableReference(String),
    /// No local override and no configured catalog matched the reference.
    #[error("Unknown dataset '{0}': no local config or catalog matches")]
    UnknownDataset(String),
    /// A 2-part reference was used but no catalog is marked `default = true`.
    #[error(
        "No default catalog configured; use a 3-part identifier (catalog.schema.table) for '{0}'"
    )]
    NoDefaultCatalog(String),
}
354
// --- Top-level configuration ---

/// Top-level Hamelin configuration loaded from `~/.hamelin.toml` and environment
/// variables. Contains catalog discovery sources and dataset definitions.
///
/// This struct is intentionally backend-agnostic — backend-specific configuration
/// (e.g. Trino connection settings) lives in each executor crate. Unknown TOML
/// keys are silently ignored, so backend-specific sections can coexist in the
/// same config file.
///
/// `#[serde(default)]` makes every field optional; missing fields take the
/// values from the `Default` impl below.
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
#[serde(default)]
pub struct HamelinConfig {
    /// Catalog discovery sources, keyed by catalog name.
    pub catalogs: HashMap<String, CatalogEntry>,
    /// Datasets: `datasets.{catalog}.{schema}.{table}` → DatasetConfig.
    pub datasets: HashMap<String, HashMap<String, HashMap<String, DatasetConfig>>>,
    /// Path to the reflected catalog file for the default catalog.
    /// Used by `--use-catalog-file` / `-e` for offline type-checking.
    pub catalog_filename: String,
}
375
376impl Default for HamelinConfig {
377    fn default() -> Self {
378        Self {
379            catalogs: HashMap::new(),
380            datasets: HashMap::new(),
381            catalog_filename: "~/.hamelin.env".to_string(),
382        }
383    }
384}
385
impl HamelinConfig {
    /// Expand `catalog_filename` (e.g. a leading `~`) into a [`PathBuf`].
    ///
    /// # Errors
    /// Fails when `expanduser` cannot expand the configured path (e.g. no
    /// resolvable home directory).
    pub fn catalog_filename_path(&self) -> anyhow::Result<PathBuf> {
        expanduser::expanduser(&self.catalog_filename)
            .context("Failed to expand environment filename")
    }
}
391}