use super::sqlinfo::{
SQL_INFO_DATE_TIME_FUNCTIONS, SQL_INFO_NUMERIC_FUNCTIONS, SQL_INFO_SQL_KEYWORDS,
SQL_INFO_STRING_FUNCTIONS, SQL_INFO_SYSTEM_FUNCTIONS,
};
use anyhow::Result;
use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::encode::{DictionaryHandling, FlightDataEncoderBuilder};
use arrow_flight::error::FlightError;
use arrow_flight::sql::DoPutPreparedStatementResult;
use arrow_flight::sql::metadata::{SqlInfoData, SqlInfoDataBuilder};
use arrow_flight::sql::server::PeekableFlightDataStream;
use arrow_flight::sql::{
ActionBeginSavepointRequest, ActionBeginSavepointResult, ActionBeginTransactionRequest,
ActionBeginTransactionResult, ActionCancelQueryRequest, ActionCancelQueryResult,
ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
ActionCreatePreparedStatementResult, ActionCreatePreparedSubstraitPlanRequest,
ActionEndSavepointRequest, ActionEndTransactionRequest, Any, CommandGetCatalogs,
CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
CommandGetXdbcTypeInfo, CommandPreparedStatementQuery, CommandPreparedStatementUpdate,
CommandStatementIngest, CommandStatementQuery, CommandStatementSubstraitPlan,
CommandStatementUpdate, ProstMessageExt, SqlInfo, TicketStatementQuery,
server::FlightSqlService,
};
use arrow_flight::{
Action, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse,
Ticket, flight_service_server::FlightService,
};
use core::str;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::ipc::writer::StreamWriter;
use futures::StreamExt;
use futures::{Stream, TryStreamExt};
use micromegas_analytics::lakehouse::lakehouse_context::LakehouseContext;
use micromegas_analytics::lakehouse::partition_cache::QueryPartitionProvider;
use micromegas_analytics::lakehouse::query::make_session_context;
use micromegas_analytics::lakehouse::session_configurator::SessionConfigurator;
use micromegas_analytics::lakehouse::view_factory::ViewFactory;
use micromegas_analytics::replication::bulk_ingest;
use micromegas_analytics::time::TimeRange;
use micromegas_auth::user_attribution::validate_and_resolve_user_attribution_grpc;
use micromegas_tracing::prelude::*;
use once_cell::sync::Lazy;
use prost::Message;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};
use tonic::metadata::MetadataMap;
use tonic::{Request, Response, Status, Streaming};
type FlightDataStream =
Pin<Box<dyn Stream<Item = Result<arrow_flight::FlightData, Status>> + Send>>;
macro_rules! status {
($desc:expr, $err:expr) => {
Status::internal(format!("{}: {} at {}:{}", $desc, $err, file!(), line!()))
};
}
macro_rules! api_entry_not_implemented {
() => {{
let function_name = micromegas_tracing::__function_name!();
error!("not implemented: {function_name}");
Err(Status::unimplemented(format!(
"{}:{} not implemented: {function_name}",
file!(),
line!()
)))
}};
}
struct CompletionTrackedStream<S> {
inner: S,
start_time: i64,
completed: bool,
}
impl<S> CompletionTrackedStream<S> {
fn new(inner: S, start_time: i64) -> Self {
Self {
inner,
start_time,
completed: false,
}
}
}
impl<S> Stream for CompletionTrackedStream<S>
where
S: Stream<Item = Result<arrow_flight::FlightData, Status>> + Unpin + Send,
{
type Item = Result<arrow_flight::FlightData, Status>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Ready(Some(result)) => {
if let Err(ref err) = result {
error!("stream error occurred: {err:?}");
if !self.completed {
let total_duration = now() - self.start_time;
imetric!("query_duration_with_error", "ticks", total_duration as u64);
imetric!("query_failed", "count", 1);
self.completed = true;
}
}
Poll::Ready(Some(result))
}
Poll::Ready(None) => {
if !self.completed {
let total_duration = now() - self.start_time;
imetric!("query_duration_total", "ticks", total_duration as u64);
imetric!("query_completed_successfully", "count", 1);
self.completed = true;
}
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
}
static INSTANCE_SQL_DATA: Lazy<SqlInfoData> = Lazy::new(|| {
let mut builder = SqlInfoDataBuilder::new();
builder.append(SqlInfo::FlightSqlServerName, "Micromegas Flight SQL Server");
builder.append(SqlInfo::FlightSqlServerVersion, "1");
builder.append(SqlInfo::FlightSqlServerArrowVersion, "1.3");
builder.append(SqlInfo::SqlKeywords, SQL_INFO_SQL_KEYWORDS);
builder.append(SqlInfo::SqlNumericFunctions, SQL_INFO_NUMERIC_FUNCTIONS);
builder.append(SqlInfo::SqlStringFunctions, SQL_INFO_STRING_FUNCTIONS);
builder.append(SqlInfo::SqlSystemFunctions, SQL_INFO_SYSTEM_FUNCTIONS);
builder.append(SqlInfo::SqlDatetimeFunctions, SQL_INFO_DATE_TIME_FUNCTIONS);
builder.build().unwrap()
});
#[derive(Clone)]
pub struct FlightSqlServiceImpl {
lakehouse: Arc<LakehouseContext>,
part_provider: Arc<dyn QueryPartitionProvider>,
view_factory: Arc<ViewFactory>,
session_configurator: Arc<dyn SessionConfigurator>,
}
impl FlightSqlServiceImpl {
pub fn new(
lakehouse: Arc<LakehouseContext>,
part_provider: Arc<dyn QueryPartitionProvider>,
view_factory: Arc<ViewFactory>,
session_configurator: Arc<dyn SessionConfigurator>,
) -> Self {
Self {
lakehouse,
part_provider,
view_factory,
session_configurator,
}
}
fn should_preserve_dictionary(metadata: &MetadataMap) -> bool {
metadata
.get("preserve_dictionary")
.and_then(|v| v.to_str().ok())
.map(|s| s.eq_ignore_ascii_case("true"))
.unwrap_or(false)
}
#[span_fn]
async fn execute_query(
&self,
ticket_stmt: TicketStatementQuery,
metadata: &MetadataMap,
) -> Result<Response<FlightDataStream>, Status> {
let begin_request = now();
let sql = std::str::from_utf8(&ticket_stmt.statement_handle)
.map_err(|e| status!("Unable to parse query", e))?;
let mut begin = metadata.get("query_range_begin");
if let Some(s) = &begin
&& s.is_empty()
{
begin = None;
}
let mut end = metadata.get("query_range_end");
if let Some(s) = &end
&& s.is_empty()
{
end = None;
}
let query_range = if begin.is_some() && end.is_some() {
let begin_datetime = chrono::DateTime::parse_from_rfc3339(
begin
.unwrap()
.to_str()
.map_err(|e| status!("Unable to convert query_range_begin to string", e))?,
)
.map_err(|e| status!("Unable to parse query_range_begin as a rfc3339 datetime", e))?;
let end_datetime = chrono::DateTime::parse_from_rfc3339(
end.unwrap()
.to_str()
.map_err(|e| status!("Unable to convert query_range_end to string", e))?,
)
.map_err(|e| status!("Unable to parse query_range_end as a rfc3339 datetime", e))?;
Some(TimeRange::new(begin_datetime.into(), end_datetime.into()))
} else {
None
};
let attr = validate_and_resolve_user_attribution_grpc(metadata).map_err(|e| *e)?;
let client_type = metadata
.get("x-client-type")
.and_then(|v| v.to_str().ok())
.unwrap_or("unknown");
let user_name_display = attr.user_name.as_deref().unwrap_or("");
if let Some(service_account_name) = &attr.service_account {
info!(
"execute_query range={query_range:?} sql={sql:?} limit={:?} user={} email={} name={user_name_display:?} service_account={service_account_name} client={client_type}",
metadata.get("limit"),
attr.user_id,
attr.user_email
);
} else {
info!(
"execute_query range={query_range:?} sql={sql:?} limit={:?} user={} email={} name={user_name_display:?} client={client_type}",
metadata.get("limit"),
attr.user_id,
attr.user_email
);
}
let session_begin = now();
let ctx = make_session_context(
self.lakehouse.clone(),
self.part_provider.clone(),
query_range,
self.view_factory.clone(),
self.session_configurator.clone(),
)
.await
.map_err(|e| status!("error in make_session_context", e))?;
let context_init_duration = now() - session_begin;
let planning_begin = now();
let mut df = ctx
.sql(sql)
.await
.map_err(|e| status!("error building dataframe", e))?;
let planning_duration = now() - planning_begin;
if let Some(limit_str) = metadata.get("limit") {
let limit: usize = usize::from_str(
limit_str
.to_str()
.map_err(|e| status!("error converting limit to str", e))?,
)
.map_err(|e| status!("error parsing limit", e))?;
df = df
.limit(0, Some(limit))
.map_err(|e| status!("error building dataframe with limit", e))?;
}
let execution_begin = now();
let schema = Arc::new(df.schema().as_arrow().clone());
let stream = df
.execute_stream()
.await
.map_err(|e| Status::internal(format!("Error executing plan: {e:?}")))?
.map_err(|e| FlightError::ExternalError(Box::new(e)));
let builder = if Self::should_preserve_dictionary(metadata) {
FlightDataEncoderBuilder::new()
.with_schema(schema.clone())
.with_dictionary_handling(DictionaryHandling::Resend)
} else {
FlightDataEncoderBuilder::new().with_schema(schema.clone())
};
let flight_data_stream = builder.build(stream);
let execution_duration = now() - execution_begin;
let total_setup_duration = now() - begin_request;
imetric!(
"context_init_duration",
"ticks",
context_init_duration as u64
);
imetric!("query_planning_duration", "ticks", planning_duration as u64);
imetric!(
"query_execution_duration",
"ticks",
execution_duration as u64
);
imetric!("query_setup_duration", "ticks", total_setup_duration as u64);
let instrumented_stream = flight_data_stream
.map_err(|e| status!("error building data stream", e))
.map({
let start_time = begin_request;
move |result| {
if result.is_err() {
let total_duration = now() - start_time;
imetric!("query_duration_with_error", "ticks", total_duration as u64);
}
result
}
});
let completion_tracked_stream =
CompletionTrackedStream::new(instrumented_stream.boxed(), begin_request);
Ok(Response::new(
Box::pin(completion_tracked_stream) as FlightDataStream
))
}
}
#[tonic::async_trait]
impl FlightSqlService for FlightSqlServiceImpl {
type FlightService = FlightSqlServiceImpl;
async fn do_handshake(
&self,
_request: Request<Streaming<HandshakeRequest>>,
) -> Result<
Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
Status,
> {
api_entry_not_implemented!()
}
#[span_fn]
async fn do_get_fallback(
&self,
request: Request<Ticket>,
_message: Any,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
let ticket_stmt = TicketStatementQuery::decode(request.get_ref().ticket.clone())
.map_err(|e| status!("Could not read ticket", e))?;
self.execute_query(ticket_stmt, request.metadata()).await
}
#[span_fn]
async fn get_flight_info_statement(
&self,
query: CommandStatementQuery,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
let begin_request = now();
info!("get_flight_info_statement {query:?} ");
let CommandStatementQuery { query, .. } = query;
let schema = Schema::empty();
let ticket = TicketStatementQuery {
statement_handle: query.into(),
};
let mut bytes: Vec<u8> = Vec::new();
if ticket.encode(&mut bytes).is_ok() {
let info = FlightInfo::new()
.try_with_schema(&schema)
.unwrap()
.with_endpoint(FlightEndpoint::new().with_ticket(Ticket::new(bytes)));
let duration = now() - begin_request;
imetric!("request_duration", "ticks", duration as u64);
Ok(Response::new(info))
} else {
error!("Error encoding ticket");
Err(Status::internal("Error encoding ticket"))
}
}
async fn get_flight_info_substrait_plan(
&self,
_query: CommandStatementSubstraitPlan,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
api_entry_not_implemented!()
}
async fn get_flight_info_prepared_statement(
&self,
_cmd: CommandPreparedStatementQuery,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
api_entry_not_implemented!()
}
async fn get_flight_info_catalogs(
&self,
_query: CommandGetCatalogs,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
api_entry_not_implemented!()
}
async fn get_flight_info_schemas(
&self,
_query: CommandGetDbSchemas,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
api_entry_not_implemented!()
}
#[span_fn]
async fn get_flight_info_tables(
&self,
query: CommandGetTables,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
let begin_request = now();
info!("get_flight_info_tables");
let flight_descriptor = request.into_inner();
let ticket = Ticket {
ticket: query.as_any().encode_to_vec().into(),
};
let endpoint = FlightEndpoint::new().with_ticket(ticket);
let flight_info = FlightInfo::new()
.try_with_schema(&query.into_builder().schema())
.map_err(|e| status!("Unable to encode schema", e))?
.with_endpoint(endpoint)
.with_descriptor(flight_descriptor);
let duration = now() - begin_request;
imetric!("request_duration", "ticks", duration as u64);
Ok(tonic::Response::new(flight_info))
}
async fn get_flight_info_table_types(
&self,
_query: CommandGetTableTypes,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
api_entry_not_implemented!()
}
#[span_fn]
async fn get_flight_info_sql_info(
&self,
query: CommandGetSqlInfo,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
let begin_request = now();
info!("get_flight_info_sql_info");
let flight_descriptor = request.into_inner();
let ticket = Ticket::new(query.as_any().encode_to_vec());
let endpoint = FlightEndpoint::new().with_ticket(ticket);
let flight_info = FlightInfo::new()
.try_with_schema(query.into_builder(&INSTANCE_SQL_DATA).schema().as_ref())
.map_err(|e| status!("Unable to encode schema", e))?
.with_endpoint(endpoint)
.with_descriptor(flight_descriptor);
let duration = now() - begin_request;
imetric!("request_duration", "ticks", duration as u64);
Ok(tonic::Response::new(flight_info))
}
async fn get_flight_info_primary_keys(
&self,
_query: CommandGetPrimaryKeys,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
api_entry_not_implemented!()
}
async fn get_flight_info_exported_keys(
&self,
_query: CommandGetExportedKeys,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
api_entry_not_implemented!()
}
async fn get_flight_info_imported_keys(
&self,
_query: CommandGetImportedKeys,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
api_entry_not_implemented!()
}
async fn get_flight_info_cross_reference(
&self,
_query: CommandGetCrossReference,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
api_entry_not_implemented!()
}
async fn get_flight_info_xdbc_type_info(
&self,
_query: CommandGetXdbcTypeInfo,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
api_entry_not_implemented!()
}
#[span_fn]
async fn do_get_statement(
&self,
ticket: TicketStatementQuery,
request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
self.execute_query(ticket, request.metadata()).await
}
async fn do_get_prepared_statement(
&self,
_query: CommandPreparedStatementQuery,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
api_entry_not_implemented!()
}
async fn do_get_catalogs(
&self,
_query: CommandGetCatalogs,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
api_entry_not_implemented!()
}
async fn do_get_schemas(
&self,
_query: CommandGetDbSchemas,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
api_entry_not_implemented!()
}
#[span_fn]
async fn do_get_tables(
&self,
query: CommandGetTables,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
let begin_request = now();
info!("do_get_tables {query:?}");
let mut builder = query.into_builder();
for view in self.view_factory.get_global_views() {
let catalog_name = "";
let schema_name = "";
builder
.append(
catalog_name,
schema_name,
&*view.get_view_set_name(),
"table",
&view.get_file_schema(),
)
.map_err(Status::from)?;
}
let schema = builder.schema();
let batch = builder.build();
let stream = FlightDataEncoderBuilder::new()
.with_schema(schema)
.build(futures::stream::once(async { batch }))
.map_err(Status::from);
let duration = now() - begin_request;
imetric!("request_duration", "ticks", duration as u64);
Ok(Response::new(Box::pin(stream)))
}
async fn do_get_table_types(
&self,
_query: CommandGetTableTypes,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
api_entry_not_implemented!()
}
#[span_fn]
async fn do_get_sql_info(
&self,
query: CommandGetSqlInfo,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
info!("do_get_sql_info");
let builder = query.into_builder(&INSTANCE_SQL_DATA);
let schema = builder.schema();
let batch = builder.build();
let stream = FlightDataEncoderBuilder::new()
.with_schema(schema)
.build(futures::stream::once(async { batch }))
.map_err(Status::from);
Ok(Response::new(Box::pin(stream)))
}
async fn do_get_primary_keys(
&self,
_query: CommandGetPrimaryKeys,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
api_entry_not_implemented!()
}
async fn do_get_exported_keys(
&self,
_query: CommandGetExportedKeys,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
api_entry_not_implemented!()
}
async fn do_get_imported_keys(
&self,
_query: CommandGetImportedKeys,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
api_entry_not_implemented!()
}
async fn do_get_cross_reference(
&self,
_query: CommandGetCrossReference,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
api_entry_not_implemented!()
}
async fn do_get_xdbc_type_info(
&self,
_query: CommandGetXdbcTypeInfo,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
api_entry_not_implemented!()
}
async fn do_put_statement_update(
&self,
_ticket: CommandStatementUpdate,
_request: Request<PeekableFlightDataStream>,
) -> Result<i64, Status> {
api_entry_not_implemented!()
}
#[span_fn]
async fn do_put_statement_ingest(
&self,
command: CommandStatementIngest,
request: Request<PeekableFlightDataStream>,
) -> Result<i64, Status> {
let table_name = command.table;
info!("do_put_statement_ingest table_name={table_name}");
let stream = FlightRecordBatchStream::new_from_flight_data(
request.into_inner().map_err(|e| e.into()),
);
bulk_ingest(self.lakehouse.lake().clone(), &table_name, stream)
.await
.map_err(|e| {
let msg = format!("error ingesting into {table_name}: {e:?}");
error!("{msg}");
status!(msg, e)
})
}
async fn do_put_substrait_plan(
&self,
_ticket: CommandStatementSubstraitPlan,
_request: Request<PeekableFlightDataStream>,
) -> Result<i64, Status> {
api_entry_not_implemented!()
}
async fn do_put_prepared_statement_query(
&self,
_query: CommandPreparedStatementQuery,
_request: Request<PeekableFlightDataStream>,
) -> Result<DoPutPreparedStatementResult, Status> {
api_entry_not_implemented!()
}
async fn do_put_prepared_statement_update(
&self,
_query: CommandPreparedStatementUpdate,
_request: Request<PeekableFlightDataStream>,
) -> Result<i64, Status> {
api_entry_not_implemented!()
}
#[span_fn]
async fn do_action_create_prepared_statement(
&self,
query: ActionCreatePreparedStatementRequest,
_request: Request<Action>,
) -> Result<ActionCreatePreparedStatementResult, Status> {
info!("do_action_create_prepared_statement query={}", &query.query);
let ctx = make_session_context(
self.lakehouse.clone(),
self.part_provider.clone(),
None,
self.view_factory.clone(),
self.session_configurator.clone(),
)
.await
.map_err(|e| status!("error in make_session_context", e))?;
let df = ctx
.sql(&query.query)
.await
.map_err(|e| status!("error building dataframe", e))?;
let schema = df.schema().as_arrow();
let mut schema_buffer = Vec::new();
let mut writer = StreamWriter::try_new(&mut schema_buffer, schema)
.map_err(|e| status!("error writing schema to in-memory buffer", e))?;
writer
.finish()
.map_err(|e| status!("error closing arrow ipc stream writer", e))?;
let result = ActionCreatePreparedStatementResult {
prepared_statement_handle: query.query.into(),
dataset_schema: schema_buffer.into(),
parameter_schema: "".into(),
};
Ok(result)
}
async fn do_action_close_prepared_statement(
&self,
_query: ActionClosePreparedStatementRequest,
_request: Request<Action>,
) -> Result<(), Status> {
info!("do_action_close_prepared_statement");
Ok(())
}
async fn do_action_create_prepared_substrait_plan(
&self,
_query: ActionCreatePreparedSubstraitPlanRequest,
_request: Request<Action>,
) -> Result<ActionCreatePreparedStatementResult, Status> {
api_entry_not_implemented!()
}
async fn do_action_begin_transaction(
&self,
_query: ActionBeginTransactionRequest,
_request: Request<Action>,
) -> Result<ActionBeginTransactionResult, Status> {
api_entry_not_implemented!()
}
async fn do_action_end_transaction(
&self,
_query: ActionEndTransactionRequest,
_request: Request<Action>,
) -> Result<(), Status> {
api_entry_not_implemented!()
}
async fn do_action_begin_savepoint(
&self,
_query: ActionBeginSavepointRequest,
_request: Request<Action>,
) -> Result<ActionBeginSavepointResult, Status> {
api_entry_not_implemented!()
}
async fn do_action_end_savepoint(
&self,
_query: ActionEndSavepointRequest,
_request: Request<Action>,
) -> Result<(), Status> {
api_entry_not_implemented!()
}
async fn do_action_cancel_query(
&self,
_query: ActionCancelQueryRequest,
_request: Request<Action>,
) -> Result<ActionCancelQueryResult, Status> {
api_entry_not_implemented!()
}
async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {
info!("register_sql_info");
}
}