use anyhow::Result;
use micromegas_analytics::lakehouse::lakehouse_context::LakehouseContext;
use micromegas_analytics::lakehouse::partition_cache::LivePartitionProvider;
use micromegas_analytics::lakehouse::session_configurator::SessionConfigurator;
use micromegas_analytics::lakehouse::static_tables_configurator::StaticTablesConfigurator;
use micromegas_analytics::lakehouse::view_factory::{ViewFactory, default_view_factory};
use micromegas_auth::tower::AuthService;
use micromegas_auth::types::AuthProvider;
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use micromegas_tracing::prelude::*;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use arrow_flight::flight_service_server::FlightServiceServer;
use datafusion::execution::runtime_env::RuntimeEnv;
use tonic::transport::Server;
use tower::ServiceBuilder;
use tower::layer::layer_fn;
use super::connect_info_layer::ConnectedIncoming;
use super::flight_sql_service_impl::FlightSqlServiceImpl;
use super::grpc_health_service::GrpcHealthService;
use super::log_uri_service::LogUriService;
type ViewFactoryFn = Box<
dyn FnOnce(
Arc<RuntimeEnv>,
Arc<DataLakeConnection>,
) -> Pin<Box<dyn Future<Output = Result<ViewFactory>> + Send>>
+ Send,
>;
pub struct FlightSqlServer;
impl FlightSqlServer {
pub fn builder() -> FlightSqlServerBuilder {
FlightSqlServerBuilder::default()
}
}
pub struct FlightSqlServerBuilder {
view_factory_fn: Option<ViewFactoryFn>,
session_configurator: Option<Arc<dyn SessionConfigurator>>,
auth_provider: Option<Arc<dyn AuthProvider>>,
use_default_auth: bool,
max_decoding_message_size: usize,
listen_addr: SocketAddr,
}
impl Default for FlightSqlServerBuilder {
fn default() -> Self {
Self {
view_factory_fn: None,
session_configurator: None,
auth_provider: None,
use_default_auth: false,
max_decoding_message_size: 100 * 1024 * 1024,
listen_addr: "0.0.0.0:50051"
.parse()
.expect("valid default listen address"),
}
}
}
impl FlightSqlServerBuilder {
pub fn with_view_factory_fn<F, Fut>(mut self, f: F) -> Self
where
F: FnOnce(Arc<RuntimeEnv>, Arc<DataLakeConnection>) -> Fut + Send + 'static,
Fut: Future<Output = Result<ViewFactory>> + Send + 'static,
{
self.view_factory_fn = Some(Box::new(move |runtime, lake| Box::pin(f(runtime, lake))));
self
}
pub fn with_session_configurator(mut self, cfg: Arc<dyn SessionConfigurator>) -> Self {
self.session_configurator = Some(cfg);
self
}
pub fn with_auth_provider(mut self, provider: Arc<dyn AuthProvider>) -> Self {
self.auth_provider = Some(provider);
self.use_default_auth = false;
self
}
pub fn with_default_auth(mut self) -> Self {
self.use_default_auth = true;
self.auth_provider = None;
self
}
pub fn with_max_decoding_message_size(mut self, bytes: usize) -> Self {
self.max_decoding_message_size = bytes;
self
}
pub fn with_listen_addr(mut self, addr: SocketAddr) -> Self {
self.listen_addr = addr;
self
}
pub async fn build_and_serve(self) -> Result<()> {
let lakehouse = LakehouseContext::from_env().await?;
let data_lake = lakehouse.lake().clone();
info!(
"created lakehouse context with metadata cache: {:?}",
lakehouse.metadata_cache()
);
let view_factory = if let Some(factory_fn) = self.view_factory_fn {
Arc::new(factory_fn(lakehouse.runtime().clone(), data_lake).await?)
} else {
Arc::new(default_view_factory(lakehouse.runtime().clone(), data_lake).await?)
};
let partition_provider =
Arc::new(LivePartitionProvider::new(lakehouse.lake().db_pool.clone()));
let session_configurator: Arc<dyn SessionConfigurator> =
if let Some(cfg) = self.session_configurator {
cfg
} else {
StaticTablesConfigurator::from_env(
"MICROMEGAS_STATIC_TABLES_URL",
lakehouse.runtime().clone(),
)
.await?
};
let svc = FlightServiceServer::new(FlightSqlServiceImpl::new(
lakehouse,
partition_provider,
view_factory,
session_configurator,
))
.max_decoding_message_size(self.max_decoding_message_size);
let auth_provider: Option<Arc<dyn AuthProvider>> = if let Some(provider) =
self.auth_provider
{
Some(provider)
} else if self.use_default_auth {
match micromegas_auth::default_provider::provider().await? {
Some(provider) => Some(provider),
None => {
anyhow::bail!(
"Authentication required but no auth providers configured. Set MICROMEGAS_API_KEYS or MICROMEGAS_OIDC_CONFIG"
);
}
}
} else {
info!("Authentication disabled");
None
};
let layer = ServiceBuilder::new()
.layer(layer_fn(GrpcHealthService::new))
.layer(layer_fn(|service| LogUriService { service }))
.layer(layer_fn(move |inner| AuthService {
inner,
auth_provider: auth_provider.clone(),
}))
.into_inner();
info!("Listening on {:?}", self.listen_addr);
let listener = std::net::TcpListener::bind(self.listen_addr)?;
let incoming = ConnectedIncoming::from_std_listener(listener)?;
Server::builder()
.layer(layer)
.add_service(svc)
.serve_with_incoming(incoming)
.await?;
info!("bye");
Ok(())
}
}