// hamelin_executor 0.9.4
//
// Common package for executing Hamelin across different backends.
use std::{error::Error, fmt::Debug};

use async_trait::async_trait;
use hamelin_lib::catalog::{Catalog, Column};
use hamelin_lib::err::ContextualTranslationErrors;
use hamelin_lib::tree::ast::identifier::Identifier;

use crate::results::ResultSet;

pub use hamelin_lib::TimeRange;

/// A compiled query ready for display or execution.
///
/// Returned by [`Executor::translate`] and [`Executor::ir`]; pairs a textual
/// rendering of the compiled query with the schema it produces.
#[derive(Debug)]
pub struct CompiledQuery {
    /// Human-readable representation (SQL for Trino, LogicalPlan display for DataFusion).
    ///
    /// When produced by [`Executor::ir`], this holds the display form of the
    /// lowered IR instead.
    pub display: String,
    /// Output columns with types.
    pub columns: Vec<Column>,
}

/// Backend-agnostic interface for compiling, inspecting, and running Hamelin.
///
/// The method documentation refers to Trino and DataFusion as the backends
/// that implement this trait. Every fallible method reports failure as an
/// [`ExecutorError`].
///
/// Several methods accept a `time_range` argument. `execute_query` documents
/// it as an optional Hamelin range expression (e.g., "[-1h..]").
/// NOTE(review): presumably the other methods expect the same format — confirm
/// against the implementations.
#[async_trait]
pub trait Executor: Debug + Send + Sync {
    /// Compile a Hamelin query string without executing it.
    ///
    /// Returns a [`CompiledQuery`] holding a human-readable representation
    /// and the output schema.
    async fn translate(
        &self,
        hamelin: &str,
        time_range: Option<&str>,
    ) -> Result<CompiledQuery, ExecutorError>;

    /// Lower a Hamelin query to IR and return its display representation.
    ///
    /// Each executor applies its own normalization options (e.g. DataFusion
    /// enables `with_lower_filter()` and `with_lower_transform()` while Trino
    /// does not), so the IR output reflects what the executor will actually process.
    ///
    /// The result is a [`CompiledQuery`], the same type returned by
    /// `translate`.
    async fn ir(
        &self,
        hamelin: &str,
        time_range: Option<&str>,
    ) -> Result<CompiledQuery, ExecutorError>;

    /// Produce an execution plan for a Hamelin query without executing it.
    ///
    /// For DataFusion, this returns the physical plan. For Trino, this
    /// returns the output of `EXPLAIN (TYPE DISTRIBUTED) <sql>`.
    ///
    /// The plan is returned as plain text.
    async fn explain(
        &self,
        hamelin: &str,
        time_range: Option<&str>,
    ) -> Result<String, ExecutorError>;

    /// Execute a Hamelin query string and return the resulting rows as a
    /// [`ResultSet`].
    ///
    /// `time_range` is an optional Hamelin range expression (e.g., "[-1h..]").
    async fn execute_query(
        &self,
        hamelin: &str,
        time_range: Option<&str>,
    ) -> Result<ResultSet, ExecutorError>;

    /// Execute a Hamelin DML string and return the number of rows affected.
    async fn execute_dml(
        &self,
        hamelin: &str,
        time_range: Option<&str>,
    ) -> Result<usize, ExecutorError>;

    /// Create a dataset in the underlying database.
    ///
    /// `dataset` names the dataset to create and `columns` gives its columns.
    async fn create_dataset(
        &self,
        dataset: Identifier,
        columns: Vec<Column>,
    ) -> Result<(), ExecutorError>;

    /// Resolve one or more datasets' columns.
    ///
    /// Returns columns in the same order as the input slice — callers can
    /// `zip` the result with their identifiers. Each inner `Vec` holds the
    /// columns of the dataset at the same position in `datasets`.
    async fn resolve_datasets(
        &self,
        datasets: &[Identifier],
    ) -> Result<Vec<Vec<Column>>, ExecutorError>;

    /// Reflect the schema of the default catalog.
    ///
    /// This is a proactive reflection operation that queries the underlying
    /// backend for its current schema. For Trino, this queries
    /// `information_schema.columns` scoped to the default catalog set on the
    /// connection. For DataFusion, this resolves all locally-configured datasets
    /// and enumerates the SessionContext's registered tables.
    ///
    /// The reflected schema is returned as a [`Catalog`].
    async fn reflect_catalog(&self) -> Result<Catalog, ExecutorError>;
}

/// Failure categories reported by [`Executor`] methods.
///
/// Every variant except `CompileError` carries an opaque boxed error whose
/// `Display` output is appended to the variant's message prefix.
#[derive(Debug, thiserror::Error)]
pub enum ExecutorError {
    /// The backend failed while handling the request — the analogue of a
    /// generic database "500".
    #[error("Server error: {0}")]
    ServerError(Box<dyn Error + Send + Sync>),

    /// The submitted query itself was rejected as invalid.
    #[error("Query error: {0}")]
    QueryError(Box<dyn Error + Send + Sync>),

    /// The requested backend could not be reached.
    #[error("Connection error: {0}")]
    ConnectionError(Box<dyn Error + Send + Sync>),

    /// The supplied configuration could not be understood well enough to
    /// build anything from it.
    #[error("Configuration error: {0}")]
    ConfigurationError(Box<dyn Error + Send + Sync>),

    /// The backend returned rows that did not match what was expected.
    #[error("Unexpected result set: {0}")]
    UnexpectedResultSet(Box<dyn Error + Send + Sync>),

    /// Hamelin compilation failed; carries the translation errors.
    #[error("Compile error: {0}")]
    CompileError(ContextualTranslationErrors),
}

impl From<ContextualTranslationErrors> for ExecutorError {
    fn from(value: ContextualTranslationErrors) -> Self {
        ExecutorError::CompileError(value)
    }
}