use std::collections::HashSet;
use std::sync::{Arc, RwLock};
use hamelin::lib::catalog::{Catalog, CatalogProvider, DataSetBuilder};
use hamelin::lib::err::ContextualTranslationErrors;
use hamelin::lib::provider::EnvironmentProvider;
use hamelin::lib::sql::expression::identifier::Identifier;
use hamelin::lib::sql::query::TableReference;
use hamelin::lib::types::struct_type::Struct;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tsify_next::Tsify;
use wasm_bindgen::prelude::*;
struct UnionOverlayProvider {
base: Arc<CatalogProvider>,
failed: RwLock<HashSet<Identifier>>,
}
impl UnionOverlayProvider {
fn new(base: Arc<CatalogProvider>) -> Self {
Self {
base,
failed: RwLock::new(HashSet::new()),
}
}
fn add_failed_dataset(&self, name: Identifier) {
self.failed
.write()
.unwrap_or_else(|e| e.into_inner())
.insert(name);
}
}
impl std::fmt::Debug for UnionOverlayProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let failed_count = self.failed.read().map(|f| f.len()).unwrap_or(0);
f.debug_struct("UnionOverlayProvider")
.field("failed_count", &failed_count)
.finish()
}
}
impl EnvironmentProvider for UnionOverlayProvider {
fn reflect_columns(&self, table_reference: TableReference) -> anyhow::Result<Struct> {
let failed = self
.failed
.read()
.map_err(|e| anyhow::anyhow!("failed to acquire read lock: {}", e))?;
if failed.contains(&table_reference.name) {
Ok(Struct::default())
} else {
self.base.reflect_columns(table_reference)
}
}
fn reflect_datasets(&self) -> anyhow::Result<Vec<Identifier>> {
let mut datasets = self.base.reflect_datasets()?;
let failed = self
.failed
.read()
.map_err(|e| anyhow::anyhow!("failed to acquire read lock: {}", e))?;
for f in failed.iter() {
if !datasets.contains(f) {
datasets.push(f.clone());
}
}
Ok(datasets)
}
}
#[derive(Serialize, Deserialize, Tsify)]
#[serde(rename_all = "camelCase")]
#[tsify(into_wasm_abi, from_wasm_abi)]
pub struct CatalogResource {
pub name: String,
pub query: String,
pub is_union: bool,
}
#[derive(Debug, Error, Serialize, Tsify)]
#[tsify(into_wasm_abi)]
#[serde(tag = "kind", rename_all = "camelCase")]
pub enum BuildCatalogError {
#[error("failed to initialize catalog: {message}")]
CatalogInit { message: String },
#[error("query compilation failed")]
Compilation {
name: String,
errors: ContextualTranslationErrors,
},
#[error("failed to parse dataset: {message}")]
DatasetParse { name: String, message: String },
}
#[derive(Serialize, Tsify)]
#[tsify(into_wasm_abi, hashmap_as_object)]
pub struct BuildCatalogOutput {
pub catalog: Catalog,
pub errors: Vec<BuildCatalogError>,
}
#[wasm_bindgen]
pub fn build_catalog(
starting_catalog: Catalog,
resources: Vec<CatalogResource>,
) -> BuildCatalogOutput {
let catalog_provider = match CatalogProvider::try_from(starting_catalog) {
Ok(provider) => provider,
Err(e) => {
return BuildCatalogOutput {
catalog: Catalog::default(),
errors: vec![BuildCatalogError::CatalogInit {
message: e.to_string(),
}],
};
}
};
let provider_arc: Arc<CatalogProvider> = Arc::new(catalog_provider);
let overlay_provider = Arc::new(UnionOverlayProvider::new(provider_arc.clone()));
let mut base_compiler = hamelin::Compiler::new();
base_compiler.set_environment_provider(provider_arc.clone());
let overlay_compiler = hamelin::Compiler {
expression_environment: base_compiler.expression_environment.clone(),
query_environment_provider: overlay_provider.clone(),
time_range_filter: None,
registry: base_compiler.registry.clone(),
translation_registry: base_compiler.translation_registry.clone(),
};
let mut errors = Vec::new();
for resource in resources {
let CatalogResource {
name: resource_name,
query,
is_union,
} = resource;
let compiler = if is_union {
&overlay_compiler
} else {
&base_compiler
};
match compiler.compile_query(query) {
Ok(query_translation) => {
let mut builder = DataSetBuilder::new(&resource_name);
builder
.columns
.extend(query_translation.translation.columns);
match builder.parse() {
Ok((name_identifier, column_map)) => {
provider_arc.set(name_identifier, column_map);
}
Err(e) => {
errors.push(BuildCatalogError::DatasetParse {
name: resource_name.clone(),
message: e.to_string(),
});
if let Ok(name_identifier) = resource_name.parse() {
overlay_provider.add_failed_dataset(name_identifier);
}
}
}
}
Err(e) => {
errors.push(BuildCatalogError::Compilation {
name: resource_name.clone(),
errors: e,
});
if let Ok(name_identifier) = resource_name.parse() {
overlay_provider.add_failed_dataset(name_identifier);
}
}
}
}
let final_catalog = provider_arc.get_catalog();
BuildCatalogOutput {
catalog: final_catalog,
errors,
}
}