use super::{
lakehouse_context::LakehouseContext, partition_cache::QueryPartitionProvider,
view_factory::ViewFactory,
};
use crate::{
dfext::expressions::{exp_to_string, exp_to_timestamp},
time::TimeRange,
};
use datafusion::{
arrow::datatypes::{DataType, Field, Schema},
catalog::{TableFunctionImpl, TableProvider},
common::plan_err,
logical_expr::Expr,
};
use micromegas_tracing::prelude::*;
use std::sync::Arc;
/// DataFusion table function `perfetto_trace_chunks`, producing Perfetto trace chunks
/// for one process.
///
/// The struct only stores shared handles. Each SQL invocation clones them into a fresh
/// `PerfettoTraceExecutionPlan`.
#[derive(Debug)]
pub struct PerfettoTraceTableFunction {
    /// Lakehouse context forwarded to every execution plan built by `call`.
    lakehouse: Arc<LakehouseContext>,
    /// View factory forwarded to every execution plan built by `call`.
    view_factory: Arc<ViewFactory>,
    /// Partition provider forwarded to every execution plan built by `call`.
    part_provider: Arc<dyn QueryPartitionProvider>,
}
impl PerfettoTraceTableFunction {
    /// Creates the table function from the shared handles it forwards to each execution plan.
    pub fn new(
        lakehouse: Arc<LakehouseContext>,
        view_factory: Arc<ViewFactory>,
        part_provider: Arc<dyn QueryPartitionProvider>,
    ) -> Self {
        let function = Self {
            lakehouse,
            view_factory,
            part_provider,
        };
        function
    }

    /// Schema of the rows produced by `perfetto_trace_chunks`.
    ///
    /// There are two non-nullable columns:
    /// - `chunk_id` (`Int32`)
    /// - `chunk_data` (`Binary`)
    pub fn output_schema() -> Arc<Schema> {
        let fields = vec![
            Field::new("chunk_id", DataType::Int32, false),
            Field::new("chunk_data", DataType::Binary, false),
        ];
        Arc::new(Schema::new(fields))
    }
}
impl TableFunctionImpl for PerfettoTraceTableFunction {
    /// Plans `perfetto_trace_chunks(process_id, span_types, start_time, end_time)`.
    ///
    /// Arguments, in order:
    /// 1. process ID: a string expression.
    /// 2. span types: one of `'thread'`, `'async'` or `'both'`.
    /// 3. start time: a timestamp expression.
    /// 4. end time: a timestamp expression.
    ///
    /// Any arguments after the fourth are not inspected.
    ///
    /// # Errors
    /// Returns a planning error in these cases:
    /// - an argument is missing;
    /// - an argument cannot be converted to the expected type;
    /// - the span-type string is not one of the three accepted values.
    #[span_fn]
    fn call(&self, exprs: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
        // 1st argument: process identifier.
        let process_id = match exprs.first().map(exp_to_string) {
            Some(Ok(id)) => id,
            given => {
                return plan_err!(
                    "First argument to perfetto_trace_chunks must be a string (the process ID), given {:?}",
                    given
                );
            }
        };

        // 2nd argument: which kinds of spans to include.
        let span_types_str = match exprs.get(1).map(exp_to_string) {
            Some(Ok(text)) => text,
            given => {
                return plan_err!(
                    "Second argument to perfetto_trace_chunks must be a string ('thread', 'async', or 'both'), given {:?}",
                    given
                );
            }
        };
        let span_types = match span_types_str.as_str() {
            "both" => SpanTypes::Both,
            "thread" => SpanTypes::Thread,
            "async" => SpanTypes::Async,
            _ => {
                return plan_err!(
                    "span_types must be 'thread', 'async', or 'both', given: {}",
                    span_types_str
                );
            }
        };

        // 3rd and 4th arguments: bounds of the queried time range.
        let begin = match exprs.get(2).map(exp_to_timestamp) {
            Some(Ok(ts)) => ts,
            given => {
                return plan_err!(
                    "Third argument to perfetto_trace_chunks must be a timestamp (start time), given {:?}",
                    given
                );
            }
        };
        let end = match exprs.get(3).map(exp_to_timestamp) {
            Some(Ok(ts)) => ts,
            given => {
                return plan_err!(
                    "Fourth argument to perfetto_trace_chunks must be a timestamp (end time), given {:?}",
                    given
                );
            }
        };

        let plan = PerfettoTraceExecutionPlan::new(
            Self::output_schema(),
            process_id,
            span_types,
            TimeRange { begin, end },
            Arc::clone(&self.lakehouse),
            Arc::clone(&self.view_factory),
            Arc::clone(&self.part_provider),
        );
        Ok(Arc::new(PerfettoTraceTableProvider::new(Arc::new(plan))))
    }
}
use super::perfetto_trace_execution_plan::{
PerfettoTraceExecutionPlan, PerfettoTraceTableProvider, SpanTypes,
};