use std::fmt::Debug;
use std::sync::Arc;
use async_trait::async_trait;
use parking_lot::RwLock;
use tokio::runtime::Handle;
use hamelin_executor::executor::{Executor, ExecutorError};
use hamelin_executor::results::ResultSet;
use hamelin_lib::catalog::{Catalog, Column};
use hamelin_lib::provider::EnvironmentProvider;
use hamelin_lib::sql::expression::identifier::Identifier;
use hamelin_lib::sql::query::TableReference;
use hamelin_trino::TrinoClient;
use crate::trino_env::TrinoEnvironmentProvider;
use crate::Compiler;
/// Connection settings consumed by [`LegacyTrinoExecutor::from_conf`] and
/// [`LegacyTrinoExecutor::with_provider`].
///
/// Both constructors copy these four fields one-for-one into a
/// `hamelin_trino::conf::Db` before creating the Trino client.
///
/// The derived `Default` produces empty `host`/`user` strings, no catalog and
/// port `0`.
#[derive(Debug, Clone, Default)]
pub struct LegacyTrinoConfig {
/// Passed through as the `host` of the Trino connection config.
pub host: String,
/// Optional catalog, passed through unchanged (`None` stays `None`).
pub catalog: Option<String>,
/// Passed through as the `port` of the Trino connection config.
pub port: u16,
/// Passed through as the `user` of the Trino connection config.
pub user: String,
}
/// Executor that compiles Hamelin source with a [`Compiler`] and runs the
/// resulting SQL through a shared [`TrinoClient`].
pub struct LegacyTrinoExecutor {
/// Trino client; reference-counted so it can also be handed to the
/// environment provider created in `from_conf`.
client: Arc<TrinoClient>,
/// Compiler behind a lock: `compile_query` / `compile_dml` take the write
/// lock because they set or clear the compiler's time range through `&self`.
compiler: RwLock<Compiler>,
}
impl LegacyTrinoExecutor {
pub fn from_conf(config: LegacyTrinoConfig, tokio: Handle) -> Result<Self, ExecutorError> {
let trino_conf = hamelin_trino::conf::Db {
host: config.host.clone(),
port: config.port,
user: config.user.clone(),
catalog: config.catalog.clone(),
};
let client = Arc::new(TrinoClient::from_conf(trino_conf)?);
let env_provider = Arc::new(TrinoEnvironmentProvider::new(client.clone(), tokio));
let mut compiler = Compiler::new();
compiler.set_environment_provider(env_provider);
Ok(Self {
client,
compiler: RwLock::new(compiler),
})
}
pub fn with_provider(
config: LegacyTrinoConfig,
provider: Arc<dyn EnvironmentProvider>,
) -> Result<Self, ExecutorError> {
let trino_conf = hamelin_trino::conf::Db {
host: config.host.clone(),
port: config.port,
user: config.user.clone(),
catalog: config.catalog.clone(),
};
let client = Arc::new(TrinoClient::from_conf(trino_conf)?);
let mut compiler = Compiler::new();
compiler.set_environment_provider(provider);
Ok(Self {
client,
compiler: RwLock::new(compiler),
})
}
fn compile_query(
&self,
hamelin: &str,
time_range: Option<&str>,
) -> Result<crate::QueryTranslation, ExecutorError> {
let mut compiler = self.compiler.write();
match time_range {
Some(tr) => compiler
.set_time_range_expression(tr.to_string())
.map_err(ExecutorError::CompileError)?,
None => compiler.clear_time_range(),
}
compiler
.compile_query(hamelin.to_string())
.map_err(ExecutorError::CompileError)
}
fn compile_dml(
&self,
hamelin: &str,
time_range: Option<&str>,
) -> Result<crate::DMLTranslation, ExecutorError> {
let mut compiler = self.compiler.write();
match time_range {
Some(tr) => compiler
.set_time_range_expression(tr.to_string())
.map_err(ExecutorError::CompileError)?,
None => compiler.clear_time_range(),
}
compiler
.compile_dml(hamelin.to_string())
.map_err(ExecutorError::CompileError)
}
pub fn client(&self) -> &Arc<TrinoClient> {
&self.client
}
pub fn compiler(&self) -> &RwLock<Compiler> {
&self.compiler
}
}
impl Debug for LegacyTrinoExecutor {
    /// Writes only the type name; neither the client nor the compiler is
    /// included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("LegacyTrinoExecutor")
    }
}
/// [`Executor`] implementation that compiles Hamelin with the executor's
/// compiler and delegates every remote operation to its Trino client.
#[async_trait]
impl Executor for LegacyTrinoExecutor {
    /// Compiles `hamelin` as a query (under `time_range`, if any) and runs the
    /// generated SQL, passing the compiled column list along to the client.
    async fn execute_query(
        &self,
        hamelin: &str,
        time_range: Option<&str>,
    ) -> Result<ResultSet, ExecutorError> {
        let compiled = self.compile_query(hamelin, time_range)?;
        let sql = &compiled.translation.sql;
        let columns = compiled.translation.columns;
        self.client.execute_query_sql(sql, columns).await
    }

    /// Compiles `hamelin` as a DML statement (under `time_range`, if any) and
    /// runs the generated SQL, returning the client's `usize` result.
    async fn execute_dml(
        &self,
        hamelin: &str,
        time_range: Option<&str>,
    ) -> Result<usize, ExecutorError> {
        let compiled = self.compile_dml(hamelin, time_range)?;
        let sql = &compiled.translation.sql;
        self.client.execute_dml_sql(sql).await
    }

    /// Forwards dataset creation to the Trino client unchanged.
    async fn create_dataset(
        &self,
        dataset: Identifier,
        columns: Vec<Column>,
    ) -> Result<(), ExecutorError> {
        self.client.create_dataset(dataset, columns).await
    }

    /// Reflects the columns of `dataset` (referenced without an alias) and
    /// converts each reflected field into a catalog [`Column`].
    async fn get_dataset(&self, dataset: Identifier) -> Result<Vec<Column>, ExecutorError> {
        let reflected = self
            .client
            .reflect_columns(TableReference {
                name: dataset,
                alias: None,
            })
            .await?;
        Ok(reflected
            .fields
            .iter()
            .map(|(field_name, field_type)| Column {
                name: field_name.to_hamelin(),
                typ: field_type.clone().into(),
            })
            .collect())
    }

    /// Forwards catalog reflection to the Trino client unchanged.
    async fn get_catalog(&self) -> Result<Catalog, ExecutorError> {
        self.client.reflect_catalog().await
    }
}