hamelin_legacy 0.4.0

Legacy AST translation code for Hamelin (to be deprecated)
Documentation
use std::fmt::Debug;
use std::sync::Arc;

use async_trait::async_trait;
use parking_lot::RwLock;
use tokio::runtime::Handle;

use hamelin_executor::executor::{Executor, ExecutorError};
use hamelin_executor::results::ResultSet;
use hamelin_lib::catalog::{Catalog, Column};
use hamelin_lib::provider::EnvironmentProvider;
use hamelin_lib::sql::expression::identifier::Identifier;
use hamelin_lib::sql::query::TableReference;
use hamelin_trino::TrinoClient;

use crate::trino_env::TrinoEnvironmentProvider;
use crate::Compiler;

/// Configuration for the LegacyTrinoExecutor.
///
/// Each field is forwarded to the same-named field of `hamelin_trino::conf::Db`
/// when the executor builds its `TrinoClient`.
///
/// NOTE(review): the derived `Default` produces an empty `host`/`user`, port `0`
/// and no catalog; presumably callers are expected to overwrite these — confirm.
#[derive(Debug, Clone, Default)]
pub struct LegacyTrinoConfig {
    /// Value passed as the `host` of the Trino connection settings.
    pub host: String,
    /// Optional value passed as the `catalog` of the Trino connection settings.
    pub catalog: Option<String>,
    /// Value passed as the `port` of the Trino connection settings.
    pub port: u16,
    /// Value passed as the `user` of the Trino connection settings.
    pub user: String,
}

/// A Trino executor that uses the legacy Hamelin compiler.
/// This executor compiles Hamelin source strings and executes them against Trino.
///
/// This is a "one-stop-shop" executor - users create it and get reflection, compilation,
/// and execution all wired together automatically. Supports two modes:
/// 1. **Live reflection** (default) - uses `TrinoEnvironmentProvider` to reflect from Trino
/// 2. **Custom provider** - uses any `EnvironmentProvider` (e.g., `CatalogProvider` for `--use-catalog-file`)
pub struct LegacyTrinoExecutor {
    /// Trino client used to run the compiled SQL and to reflect datasets and the
    /// catalog. Held in an `Arc` because the live-reflection constructor also hands
    /// a clone of it to the `TrinoEnvironmentProvider`.
    client: Arc<TrinoClient>,
    /// Legacy compiler. It sits behind a lock because every compilation first sets
    /// or clears the compiler's time range, which mutates the compiler.
    compiler: RwLock<Compiler>,
}

impl LegacyTrinoExecutor {
    /// Create executor with live Trino reflection.
    pub fn from_conf(config: LegacyTrinoConfig, tokio: Handle) -> Result<Self, ExecutorError> {
        let trino_conf = hamelin_trino::conf::Db {
            host: config.host.clone(),
            port: config.port,
            user: config.user.clone(),
            catalog: config.catalog.clone(),
        };
        let client = Arc::new(TrinoClient::from_conf(trino_conf)?);

        // Create TrinoEnvironmentProvider wrapping client + tokio handle
        let env_provider = Arc::new(TrinoEnvironmentProvider::new(client.clone(), tokio));

        let mut compiler = Compiler::new();
        compiler.set_environment_provider(env_provider);

        Ok(Self {
            client,
            compiler: RwLock::new(compiler),
        })
    }

    /// Create executor with a custom environment provider (e.g., CatalogProvider from file).
    pub fn with_provider(
        config: LegacyTrinoConfig,
        provider: Arc<dyn EnvironmentProvider>,
    ) -> Result<Self, ExecutorError> {
        let trino_conf = hamelin_trino::conf::Db {
            host: config.host.clone(),
            port: config.port,
            user: config.user.clone(),
            catalog: config.catalog.clone(),
        };
        let client = Arc::new(TrinoClient::from_conf(trino_conf)?);

        let mut compiler = Compiler::new();
        compiler.set_environment_provider(provider);

        Ok(Self {
            client,
            compiler: RwLock::new(compiler),
        })
    }

    fn compile_query(
        &self,
        hamelin: &str,
        time_range: Option<&str>,
    ) -> Result<crate::QueryTranslation, ExecutorError> {
        let mut compiler = self.compiler.write();
        match time_range {
            Some(tr) => compiler
                .set_time_range_expression(tr.to_string())
                .map_err(ExecutorError::CompileError)?,
            None => compiler.clear_time_range(),
        }
        compiler
            .compile_query(hamelin.to_string())
            .map_err(ExecutorError::CompileError)
    }

    fn compile_dml(
        &self,
        hamelin: &str,
        time_range: Option<&str>,
    ) -> Result<crate::DMLTranslation, ExecutorError> {
        let mut compiler = self.compiler.write();
        match time_range {
            Some(tr) => compiler
                .set_time_range_expression(tr.to_string())
                .map_err(ExecutorError::CompileError)?,
            None => compiler.clear_time_range(),
        }
        compiler
            .compile_dml(hamelin.to_string())
            .map_err(ExecutorError::CompileError)
    }

    /// Get access to the underlying `TrinoClient`.
    pub fn client(&self) -> &Arc<TrinoClient> {
        &self.client
    }

    /// Get access to the compiler (for advanced use cases like translate-only).
    pub fn compiler(&self) -> &RwLock<Compiler> {
        &self.compiler
    }
}

impl Debug for LegacyTrinoExecutor {
    /// Writes only the type name; neither the client nor the compiler is shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("LegacyTrinoExecutor")
    }
}

#[async_trait]
impl Executor for LegacyTrinoExecutor {
    /// Compile `hamelin` as a query (with the optional time range) and run the
    /// resulting SQL through the Trino client, passing along the compiled columns.
    async fn execute_query(
        &self,
        hamelin: &str,
        time_range: Option<&str>,
    ) -> Result<ResultSet, ExecutorError> {
        // Compilation is synchronous and finishes before the first await.
        let compiled = self.compile_query(hamelin, time_range)?;
        let output_columns = compiled.translation.columns;
        self.client
            .execute_query_sql(&compiled.translation.sql, output_columns)
            .await
    }

    /// Compile `hamelin` as a DML statement (with the optional time range) and
    /// run the resulting SQL through the Trino client.
    async fn execute_dml(
        &self,
        hamelin: &str,
        time_range: Option<&str>,
    ) -> Result<usize, ExecutorError> {
        let compiled = self.compile_dml(hamelin, time_range)?;
        let sql = &compiled.translation.sql;
        self.client.execute_dml_sql(sql).await
    }

    /// Delegate dataset creation to the Trino client.
    async fn create_dataset(
        &self,
        dataset: Identifier,
        columns: Vec<Column>,
    ) -> Result<(), ExecutorError> {
        let client = &self.client;
        client.create_dataset(dataset, columns).await
    }

    /// Reflect the columns of `dataset` from Trino and return them as catalog columns.
    async fn get_dataset(&self, dataset: Identifier) -> Result<Vec<Column>, ExecutorError> {
        let reflected = self
            .client
            .reflect_columns(TableReference {
                name: dataset,
                alias: None,
            })
            .await?;

        // Turn each reflected (name, type) field into a catalog `Column`.
        let mut columns = Vec::new();
        for (field_name, field_type) in reflected.fields.iter() {
            columns.push(Column {
                name: field_name.to_hamelin(),
                typ: field_type.clone().into(),
            });
        }

        Ok(columns)
    }

    /// Delegate catalog reflection to the Trino client.
    async fn get_catalog(&self) -> Result<Catalog, ExecutorError> {
        let client = &self.client;
        client.reflect_catalog().await
    }
}