// hamelin_wasm 0.7.4
//
// Hamelin implementation compiled to WASM
// Documentation
use std::collections::HashSet;
use std::sync::{Arc, RwLock};

use hamelin_lib::catalog::{Catalog, CatalogProvider, DataSetBuilder};
use hamelin_lib::err::ContextualTranslationErrors;
use hamelin_lib::provider::EnvironmentProvider;
use hamelin_lib::tree::ast::identifier::Identifier;
use hamelin_lib::types::struct_type::Struct;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tsify::Tsify;
use wasm_bindgen::prelude::*;

use crate::{compile_query_with_provider, CompileHamelinQueryResult};

/// Overlays "failed datasets exist with empty schema" on top of the real provider.
/// Allows UNION compilation to proceed even when dependencies failed, without
/// polluting the final catalog with those failures.
struct UnionOverlayProvider {
    // The wrapped provider; lookups for names that are not marked as failed are
    // delegated to it.
    base: Arc<CatalogProvider>,
    // Names of datasets recorded as failed. Kept behind a lock so entries can be
    // added through a shared reference.
    failed: RwLock<HashSet<Identifier>>,
}

impl UnionOverlayProvider {
    fn new(base: Arc<CatalogProvider>) -> Self {
        Self {
            base,
            failed: RwLock::new(HashSet::new()),
        }
    }

    fn add_failed_dataset(&self, name: Identifier) {
        // Use unwrap_or_else to recover from a poisoned lock (extremely rare in WASM)
        self.failed
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .insert(name);
    }
}

impl std::fmt::Debug for UnionOverlayProvider {
    /// Formats the overlay as `UnionOverlayProvider { failed_count: N }`.
    ///
    /// Only the number of failed datasets is printed; the base provider and
    /// the individual names are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Recover from a poisoned lock (as `add_failed_dataset` does) so the
        // real count is reported instead of a misleading 0. The guard is a
        // temporary and is released at the end of this statement.
        let failed_count = self
            .failed
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .len();
        f.debug_struct("UnionOverlayProvider")
            .field("failed_count", &failed_count)
            .finish()
    }
}

impl EnvironmentProvider for UnionOverlayProvider {
    /// Returns the column schema for `name`.
    ///
    /// A dataset recorded as failed is reported as existing with an empty
    /// schema (`Struct::default()`); every other name is delegated to the
    /// base provider, whose result (including any error) is returned as-is.
    fn reflect_columns(&self, name: &Identifier) -> anyhow::Result<Struct> {
        // Recover from a poisoned lock the same way `add_failed_dataset` does,
        // so the set stays readable for as long as it stays writable.
        // The read guard is a temporary dropped at the end of this statement,
        // so the lock is not held while calling into the base provider.
        let is_failed = self
            .failed
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .contains(name);

        if is_failed {
            // Treat failed datasets as existing with an empty schema.
            Ok(Struct::default())
        } else {
            self.base.reflect_columns(name)
        }
    }

    /// Returns the base provider's datasets followed by any failed datasets
    /// the base provider does not already list.
    ///
    /// An error from the base provider is returned unchanged.
    fn reflect_datasets(&self) -> anyhow::Result<Vec<Identifier>> {
        let mut datasets = self.base.reflect_datasets()?;

        // Same poison recovery as in `add_failed_dataset`.
        let failed = self
            .failed
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        // `failed` is a set, so each name is visited once; the `contains`
        // check only guards against names the base provider already reported.
        for name in failed.iter() {
            if !datasets.contains(name) {
                datasets.push(name.clone());
            }
        }
        Ok(datasets)
    }
}

/// One dataset definition handed to `build_catalog`.
///
/// Field names are serialized in camelCase (e.g. `is_union` becomes `isUnion`).
#[derive(Serialize, Deserialize, Tsify)]
#[serde(rename_all = "camelCase")]
#[tsify(into_wasm_abi, from_wasm_abi)]
pub struct CatalogResource {
    /// Name under which the dataset is registered once its query compiles.
    pub name: Identifier,
    /// Query text that is compiled to obtain the dataset's columns.
    pub query: String,
    /// When `true`, the query is compiled against the overlay provider, which
    /// reports previously failed datasets as existing with an empty schema.
    pub is_union: bool,
}

/// A failure reported by `build_catalog`.
///
/// Serialized with a `kind` tag holding the camelCase variant name.
#[derive(Debug, Error, Serialize, Tsify)]
#[tsify(into_wasm_abi)]
#[serde(tag = "kind", rename_all = "camelCase")]
pub enum BuildCatalogError {
    /// The starting catalog could not be converted into a `CatalogProvider`.
    /// `message` is the string form of the conversion error.
    #[error("failed to initialize catalog: {message}")]
    CatalogInit { message: String },

    /// Compiling the query of the dataset called `name` failed; `errors`
    /// carries the errors returned by the compiler.
    #[error("query compilation failed")]
    Compilation {
        name: String,
        errors: ContextualTranslationErrors,
    },

    /// The query of the dataset called `name` compiled, but building the
    /// dataset from the resulting columns failed. `message` is the string
    /// form of that error.
    #[error("failed to parse dataset: {message}")]
    DatasetParse { name: String, message: String },
}

/// Result of `build_catalog`: the catalog that was built plus every error
/// encountered along the way.
#[derive(Serialize, Tsify)]
#[tsify(into_wasm_abi, hashmap_as_object)]
pub struct BuildCatalogOutput {
    /// The catalog read back from the provider after all resources were
    /// processed, or `Catalog::default()` when initialization failed.
    pub catalog: Catalog,
    /// Errors collected while building, in the order they occurred. Empty
    /// when every resource was registered successfully.
    pub errors: Vec<BuildCatalogError>,
}

#[wasm_bindgen]
/// Builds a `Catalog` by compiling each dataset query and registering its output schema.
///
/// Important: `resources` (datasets) must be topologically sorted before being passed in:
/// if dataset B references dataset A, then A must appear before B in `resources`.
pub fn build_catalog(
    starting_catalog: Catalog,
    resources: Vec<CatalogResource>,
) -> BuildCatalogOutput {
    // Initialize compiler and catalog provider from starting catalog
    let catalog_provider = match CatalogProvider::try_from(starting_catalog) {
        Ok(provider) => provider,
        Err(e) => {
            return BuildCatalogOutput {
                catalog: Catalog::default(),
                errors: vec![BuildCatalogError::CatalogInit {
                    message: e.to_string(),
                }],
            };
        }
    };

    let provider_arc: Arc<CatalogProvider> = Arc::new(catalog_provider);
    let overlay_provider: Arc<UnionOverlayProvider> =
        Arc::new(UnionOverlayProvider::new(provider_arc.clone()));

    let mut errors = Vec::new();

    for resource in resources {
        let CatalogResource {
            name: resource_name,
            query,
            is_union,
        } = resource;

        // UNIONs use an overlay provider so failed dependencies don't block compilation.
        match compile_query_with_provider(
            query,
            if is_union {
                overlay_provider.clone()
            } else {
                provider_arc.clone()
            },
        ) {
            CompileHamelinQueryResult::Ok(result) => {
                let mut builder = DataSetBuilder::new(resource_name.clone());
                builder.columns.extend(result.columns);

                match builder.parse() {
                    Ok((name_identifier, column_map)) => {
                        provider_arc.set(name_identifier, column_map);
                    }
                    Err(e) => {
                        errors.push(BuildCatalogError::DatasetParse {
                            name: resource_name.to_string(),
                            message: e.to_string(),
                        });
                        overlay_provider.add_failed_dataset(resource_name);
                    }
                }
            }
            CompileHamelinQueryResult::Err(e) => {
                errors.push(BuildCatalogError::Compilation {
                    name: resource_name.to_string(),
                    errors: e,
                });
                overlay_provider.add_failed_dataset(resource_name);
            }
        }
    }

    // Get the final catalog from the provider
    let final_catalog = provider_arc.get_catalog();

    BuildCatalogOutput {
        catalog: final_catalog,
        errors,
    }
}