use std::fmt::Debug;
use std::sync::Arc;
use std::thread;
use tokio::runtime::Handle;
use hamelin_lib::provider::EnvironmentProvider;
use hamelin_lib::sql::expression::identifier::Identifier;
use hamelin_lib::sql::query::TableReference;
use hamelin_lib::types::struct_type::Struct;
use hamelin_trino::TrinoClient;
/// Adapter that exposes a [`TrinoClient`] through the synchronous
/// `EnvironmentProvider` trait.
///
/// The client's reflection methods return futures; this type drives them to
/// completion on the stored Tokio runtime handle so callers get plain
/// `anyhow::Result` values back.
pub struct TrinoEnvironmentProvider {
/// Shared client whose `reflect_columns` / `reflect_datasets` futures are awaited.
client: Arc<TrinoClient>,
/// Handle of the Tokio runtime used to `block_on` the client's futures.
handle: Handle,
}
impl TrinoEnvironmentProvider {
    /// Builds a provider from a shared Trino client and the runtime handle
    /// on which the client's futures will be executed.
    pub fn new(client: Arc<TrinoClient>, handle: Handle) -> Self {
        Self { client, handle }
    }

    /// Drives `future` to completion and returns its output, blocking the
    /// calling thread until it is done.
    ///
    /// Two paths are taken depending on where the caller is running:
    ///
    /// * No Tokio runtime context on the current thread: the future is run
    ///   directly via `block_on` on the stored handle.
    /// * A Tokio runtime context is present: the future is run via `block_on`
    ///   on a freshly spawned scoped thread, and the current thread waits for
    ///   that thread to finish. Presumably this avoids the panic Tokio raises
    ///   when `block_on` is invoked from inside a runtime context.
    ///
    /// The scoped thread lets `future` borrow from the caller (no `'static`
    /// bound is required), at the cost of requiring `F: Send` and `T: Send`.
    ///
    /// NOTE(review): the calling thread stays blocked while the helper thread
    /// runs. If `handle` refers to a current-thread runtime that is being
    /// driven by the calling thread, IO/timer-dependent futures may never make
    /// progress — confirm that `handle` always belongs to a multi-thread
    /// runtime.
    ///
    /// # Panics
    ///
    /// Panics with "Thread panicked during async operation" if the helper
    /// thread panics while running the future.
    fn run_blocking<F, T>(&self, future: F) -> T
    where
        F: std::future::Future<Output = T> + Send,
        T: Send,
    {
        // Fast path: not inside a runtime context, so blocking right here is fine.
        if Handle::try_current().is_err() {
            return self.handle.block_on(future);
        }

        // Inside a runtime context: hand the future to a dedicated scoped
        // thread and wait for its result.
        let runtime = self.handle.clone();
        thread::scope(|scope| {
            let worker = scope.spawn(move || runtime.block_on(future));
            worker
                .join()
                .expect("Thread panicked during async operation")
        })
    }
}
impl Debug for TrinoEnvironmentProvider {
    /// Renders the provider as a struct named `TrinoEnvironmentProvider`
    /// containing only the `client` field; the runtime `handle` is not
    /// included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("TrinoEnvironmentProvider");
        builder.field("client", &self.client);
        builder.finish()
    }
}
impl EnvironmentProvider for TrinoEnvironmentProvider {
    /// Asks the Trino client for the columns of `table_reference`, blocking
    /// until the request completes.
    ///
    /// # Errors
    ///
    /// Any error from the client is converted into an `anyhow::Error` whose
    /// message is the client error's `Display` output.
    fn reflect_columns(&self, table_reference: TableReference) -> anyhow::Result<Struct> {
        let pending = self.client.reflect_columns(table_reference);
        let outcome = self.run_blocking(pending);
        outcome.map_err(|cause| anyhow::anyhow!("{}", cause))
    }

    /// Asks the Trino client for the available datasets, blocking until the
    /// request completes.
    ///
    /// # Errors
    ///
    /// Any error from the client is converted into an `anyhow::Error` whose
    /// message is the client error's `Display` output.
    fn reflect_datasets(&self) -> anyhow::Result<Vec<Identifier>> {
        let pending = self.client.reflect_datasets();
        let outcome = self.run_blocking(pending);
        outcome.map_err(|cause| anyhow::anyhow!("{}", cause))
    }
}