1use std::collections::HashMap;
8use std::path::PathBuf;
9
10use anyhow::Context;
11use hamelin_lib::antlr::parse_type;
12use hamelin_lib::catalog::{Column, HamelinType};
13use hamelin_lib::types::Type;
14use serde::{Deserialize, Serialize};
15
/// A named catalog definition as it appears in configuration.
///
/// `config` is `#[serde(flatten)]`ed, so this entry's own fields
/// (`default`) and the catalog config's fields (including its `type` tag)
/// share a single serialized object.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct CatalogEntry {
    /// Marks this catalog as the default; absent means `false`.
    /// NOTE(review): nothing prevents several entries from setting this —
    /// `DatasetStore::new` then picks an arbitrary one. Consider validating.
    #[serde(default)]
    pub default: bool,
    /// Catalog connection settings, flattened into this entry.
    #[serde(flatten)]
    pub config: CatalogConfig,
}
30
/// Connection settings for a catalog, discriminated by a `type` field
/// (serialized as `"iceberg_rest"` or `"reflection"` after snake_case
/// renaming).
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CatalogConfig {
    /// An Iceberg REST catalog endpoint.
    IcebergRest {
        /// Base URI of the REST catalog service.
        uri: String,
        /// Optional warehouse identifier/location for the catalog.
        warehouse: Option<String>,
        /// Extra implementation-specific properties; omitted from the
        /// serialized output when empty.
        #[serde(default, skip_serializing_if = "HashMap::is_empty")]
        extra_properties: HashMap<String, String>,
    },
    /// A catalog variant with no connection settings of its own.
    /// NOTE(review): the semantics of "reflection" are not visible in this
    /// file — presumably handled by the resolver elsewhere; confirm.
    Reflection,
}
44
/// Supported input file formats; serialized in lowercase
/// (`"jsonl"`, `"csv"`, `"parquet"`, `"lines"`).
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum FileFormat {
    /// Newline-delimited JSON records.
    Jsonl,
    // NOTE(review): `CSV` breaks the UpperCamelCase convention of the other
    // variants (`Csv`). Renaming would not change the serialized form
    // (rename_all = "lowercase") but would break Rust callers, so left as-is.
    CSV,
    Parquet,
    /// One opaque text line per record.
    Lines,
}
56
/// Supported compression codecs for file datasets; serialized in lowercase
/// (`"gzip"`, `"bzip2"`, `"xz"`, `"zstd"`).
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Compression {
    Gzip,
    Bzip2,
    Xz,
    Zstd,
}
66
/// Static AWS credentials supplied directly in configuration.
///
/// NOTE(review): `Debug` and `Serialize` are derived, so `secret_key` is
/// printed verbatim in debug output and serialized config — consider a
/// redacting `Debug` impl if these values ever reach logs.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct StaticAwsCredentials {
    /// AWS access key ID.
    pub access_id: String,
    /// AWS secret access key.
    pub secret_key: String,
}
72
/// Configuration for a dataset backed by files (local or object store).
///
/// All fields except `paths` are optional and default to `None` when absent
/// from the serialized form.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct FilesConfig {
    /// File or directory paths/URIs to read.
    pub paths: Vec<String>,
    /// File format; `None` presumably means infer from the path — TODO confirm.
    #[serde(default)]
    #[builder(into)]
    pub format: Option<FileFormat>,
    /// Compression codec; `None` presumably means infer/uncompressed — TODO confirm.
    #[serde(default)]
    #[builder(into)]
    pub compression: Option<Compression>,
    /// Explicit file extension to match when listing paths.
    #[serde(default)]
    #[builder(into)]
    pub file_extension: Option<String>,
    /// Explicit schema; `None` presumably means infer from the data — TODO confirm.
    #[serde(default)]
    #[builder(into)]
    pub columns: Option<Vec<ColumnConfig>>,
    /// Whether the first row is a header (relevant to delimited formats).
    #[serde(default)]
    #[builder(into)]
    pub header: Option<bool>,
    /// Field delimiter (relevant to delimited formats such as CSV).
    #[serde(default)]
    #[builder(into)]
    pub delimiter: Option<String>,
    /// AWS region for object-store access.
    #[serde(default)]
    #[builder(into)]
    pub region: Option<String>,
    /// IAM role to assume for object-store access.
    #[serde(default)]
    #[builder(into)]
    pub role_arn: Option<String>,
    /// Inline static credentials (alternative to role/env-based auth).
    #[serde(default)]
    pub static_credentials: Option<StaticAwsCredentials>,
    /// Custom S3-compatible endpoint (e.g. MinIO) — NOTE(review): inferred
    /// from the field name; confirm against the reader implementation.
    #[serde(default)]
    #[builder(into)]
    pub endpoint_url: Option<String>,
    /// Force path-style S3 addressing instead of virtual-hosted style.
    #[serde(default)]
    pub force_path_style: Option<bool>,
    /// Allow plain-HTTP endpoints (useful for local test stores).
    #[serde(default)]
    pub allow_http: Option<bool>,
}
110
/// Configuration for an in-memory dataset: only a schema, no source data
/// location is specified here.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct MemConfig {
    /// Column definitions for the in-memory table.
    pub columns: Vec<ColumnConfig>,
}
115
/// Configuration for a dataset backed by a standalone Iceberg table
/// (addressed by metadata file rather than via a catalog).
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct IcebergConfig {
    /// Location of the Iceberg table metadata file.
    pub metadata_location: String,
    /// AWS region for reading the table's storage, if needed.
    #[serde(default)]
    pub region: Option<String>,
}
122
/// Configuration for a dataset defined as a query over other datasets.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct ViewConfig {
    /// The view's defining query text.
    pub query: String,
}
127
/// Per-dataset source configuration, discriminated by a `type` field
/// (serialized as `"files"`, `"mem"`, `"iceberg"`, or `"view"`).
///
/// `derive_more::From` provides `From<FilesConfig>` etc. for each variant.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, derive_more::From)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum DatasetConfig {
    Files(FilesConfig),
    Mem(MemConfig),
    Iceberg(IcebergConfig),
    View(ViewConfig),
}
136
/// A column definition: a name plus a Hamelin type written in its textual
/// form (parsed and printed by the custom serde functions below).
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, bon::Builder)]
pub struct ColumnConfig {
    /// Column name.
    pub name: String,
    /// Column type, (de)serialized as a type-expression string.
    #[serde(
        deserialize_with = "deserialize_hamelin_type",
        serialize_with = "serialize_hamelin_type"
    )]
    pub typ: HamelinType,
}
146
147fn deserialize_hamelin_type<'de, D>(deserializer: D) -> Result<HamelinType, D::Error>
150where
151 D: serde::Deserializer<'de>,
152{
153 let s = String::deserialize(deserializer)?;
154 let ctx = parse_type(s.clone()).map_err(|e| serde::de::Error::custom(e.contextualize(&s)))?;
155 let typ =
156 Type::from_parse_tree(&ctx).map_err(|e| serde::de::Error::custom(e.contextualize(&s)))?;
157 Ok(typ.into())
158}
159
160fn serialize_hamelin_type<S>(typ: &HamelinType, serializer: S) -> Result<S::Ok, S::Error>
163where
164 S: serde::Serializer,
165{
166 let t: Type = typ.clone().try_into().map_err(serde::ser::Error::custom)?;
167 serializer.serialize_str(&t.to_string())
168}
169
170impl From<&Column> for ColumnConfig {
171 fn from(col: &Column) -> Self {
172 ColumnConfig {
173 name: col.name.as_str().to_string(),
174 typ: col.typ.clone(),
175 }
176 }
177}
178
/// The result of resolving a table reference against a `DatasetStore`.
#[derive(Debug, Clone)]
pub enum ResolvedDataset {
    /// A dataset defined locally in configuration.
    Local {
        /// Display name: `schema.table` when `catalog` is the default
        /// catalog, `catalog.schema.table` otherwise.
        name: String,
        catalog: String,
        schema: String,
        table: String,
        /// The dataset's source configuration.
        config: DatasetConfig,
    },
    /// A table to be looked up in an external catalog; existence is not
    /// checked at resolution time.
    Catalog {
        catalog_name: String,
        catalog_config: CatalogConfig,
        /// The schema segment, interpreted as the catalog namespace.
        namespace: String,
        table: String,
    },
}
200
/// Lookup store combining locally-configured datasets with named catalogs.
#[derive(Debug, Clone)]
pub struct DatasetStore {
    /// Local dataset configs nested as catalog -> schema -> table -> config.
    datasets: HashMap<String, HashMap<String, HashMap<String, DatasetConfig>>>,
    /// Catalog configs by catalog name.
    catalogs: HashMap<String, CatalogConfig>,
    /// Name of the catalog entry marked `default: true`, if any.
    default_catalog: Option<String>,
}
217
218impl DatasetStore {
219 pub fn new(
220 datasets: HashMap<String, HashMap<String, HashMap<String, DatasetConfig>>>,
221 catalog_entries: HashMap<String, CatalogEntry>,
222 ) -> Self {
223 let default_catalog = catalog_entries
224 .iter()
225 .find(|(_, entry)| entry.default)
226 .map(|(name, _)| name.clone());
227
228 let catalogs = catalog_entries
229 .into_iter()
230 .map(|(name, entry)| (name, entry.config))
231 .collect();
232
233 Self {
234 datasets,
235 catalogs,
236 default_catalog,
237 }
238 }
239
240 pub fn default_catalog(&self) -> Option<&str> {
242 self.default_catalog.as_deref()
243 }
244
245 pub fn resolve(&self, segments: &[&str]) -> Result<ResolvedDataset, DatasetStoreError> {
247 match segments {
248 [schema, table] => {
249 let catalog = self.default_catalog.as_deref().ok_or_else(|| {
250 DatasetStoreError::NoDefaultCatalog(format!("{}.{}", schema, table))
251 })?;
252 self.resolve_three(catalog, schema, table)
253 }
254 [catalog, schema, table] => self.resolve_three(catalog, schema, table),
255 _ => Err(DatasetStoreError::UnresolvableReference(
256 segments
257 .iter()
258 .map(|s| s.to_string())
259 .collect::<Vec<_>>()
260 .join("."),
261 )),
262 }
263 }
264
265 fn is_default_catalog(&self, catalog: &str) -> bool {
267 self.default_catalog
268 .as_deref()
269 .map_or(false, |dc| dc == catalog)
270 }
271
272 fn resolve_three(
273 &self,
274 catalog: &str,
275 schema: &str,
276 table: &str,
277 ) -> Result<ResolvedDataset, DatasetStoreError> {
278 if let Some(schemas) = self.datasets.get(catalog) {
280 if let Some(tables) = schemas.get(schema) {
281 if let Some(config) = tables.get(table) {
282 let name = if self.is_default_catalog(catalog) {
283 format!("{}.{}", schema, table)
284 } else {
285 format!("{}.{}.{}", catalog, schema, table)
286 };
287 return Ok(ResolvedDataset::Local {
288 name,
289 catalog: catalog.to_string(),
290 schema: schema.to_string(),
291 table: table.to_string(),
292 config: config.clone(),
293 });
294 }
295 }
296 }
297
298 if let Some(catalog_config) = self.catalogs.get(catalog) {
300 return Ok(ResolvedDataset::Catalog {
301 catalog_name: catalog.to_string(),
302 catalog_config: catalog_config.clone(),
303 namespace: schema.to_string(),
304 table: table.to_string(),
305 });
306 }
307
308 let ref_name = if self.is_default_catalog(catalog) {
310 format!("{}.{}", schema, table)
311 } else {
312 format!("{}.{}.{}", catalog, schema, table)
313 };
314 Err(DatasetStoreError::UnknownDataset(ref_name))
315 }
316
317 pub fn flat_datasets(&self) -> HashMap<String, DatasetConfig> {
319 let mut result = HashMap::new();
320 for (catalog, schemas) in &self.datasets {
321 for (schema, tables) in schemas {
322 for (table, config) in tables {
323 let key = if self.is_default_catalog(catalog) {
324 format!("{}.{}", schema, table)
325 } else {
326 format!("{}.{}.{}", catalog, schema, table)
327 };
328 result.insert(key, config.clone());
329 }
330 }
331 }
332 result
333 }
334
335 pub fn default_catalog_config(&self) -> Option<(&str, &CatalogConfig)> {
337 let name = self.default_catalog.as_deref()?;
338 let config = self.catalogs.get(name)?;
339 Some((name, config))
340 }
341}
342
/// Errors produced while resolving table references in `DatasetStore`.
#[derive(Debug, thiserror::Error)]
pub enum DatasetStoreError {
    /// The reference did not split into 2 or 3 dot-separated segments.
    #[error("Cannot resolve table reference '{0}': not a 2 or 3-segment identifier")]
    UnresolvableReference(String),
    /// Neither a local dataset config nor a configured catalog matched.
    #[error("Unknown dataset '{0}': no local config or catalog matches")]
    UnknownDataset(String),
    /// A 2-segment reference was used but no catalog is marked default.
    #[error(
        "No default catalog configured; use a 3-part identifier (catalog.schema.table) for '{0}'"
    )]
    NoDefaultCatalog(String),
}
354
/// Top-level application configuration.
///
/// `#[serde(default)]` on the struct makes every missing field fall back
/// to the value from the manual `Default` impl below.
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
#[serde(default)]
pub struct HamelinConfig {
    /// Named catalog entries.
    pub catalogs: HashMap<String, CatalogEntry>,
    /// Dataset configs nested as catalog -> schema -> table -> config.
    pub datasets: HashMap<String, HashMap<String, HashMap<String, DatasetConfig>>>,
    /// Path (possibly `~`-prefixed) to the catalog environment file.
    pub catalog_filename: String,
}
375
376impl Default for HamelinConfig {
377 fn default() -> Self {
378 Self {
379 catalogs: HashMap::new(),
380 datasets: HashMap::new(),
381 catalog_filename: "~/.hamelin.env".to_string(),
382 }
383 }
384}
385
impl HamelinConfig {
    /// Expands `catalog_filename` (e.g. a leading `~`) into a concrete path.
    ///
    /// # Errors
    /// Returns an error when tilde expansion fails (e.g. the home directory
    /// cannot be determined).
    pub fn catalog_filename_path(&self) -> anyhow::Result<PathBuf> {
        expanduser::expanduser(&self.catalog_filename)
            .context("Failed to expand environment filename")
    }
}
391}