use std::sync::Arc;
use arrow::array::{Array as _, BooleanArray};
use arrow::datatypes::Field;
use datafusion::prelude::SessionContext;
use datafusion::sql::TableReference;
use egui::containers::menu::MenuConfig;
use egui::{Frame, Id, Margin, OpenUrl, Panel, RichText, Ui};
use egui_table::{CellInfo, HeaderCellInfo};
use itertools::Itertools as _;
use re_arrow_util::ArrowArrayDowncastRef as _;
use re_format::{format_plural_s, format_uint};
use re_log::error;
use re_log_types::{EntryId, Timestamp};
use re_sorbet::{ColumnDescriptorRef, SorbetSchema};
use re_ui::egui_ext::response_ext::ResponseExt as _;
use re_ui::menu::menu_style;
use re_ui::{UiExt as _, icons};
use re_viewer_context::{
AsyncRuntimeHandle, StoreViewContext, SystemCommand, SystemCommandSender as _,
};
use crate::StreamingCacheTableProvider;
use crate::datafusion_adapter::{DataFusionAdapter, DataFusionQueryResult};
use crate::display_record_batch::DisplayColumn;
use crate::filters::{ColumnFilter, FilterState};
use crate::grid_view::FlagChangeEvent;
use crate::header_tooltip::column_header_tooltip_ui;
use crate::re_table::ReTable;
use crate::re_table_utils::{ColumnConfig, TableConfig};
use crate::table_blueprint::{
ColumnBlueprint, EntryLinksSpec, SegmentLinksSpec, SortBy, SortDirection, TableBlueprint,
};
use crate::table_selection::TableSelectionState;
use crate::{DisplayRecordBatch, default_display_name_for_column};
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub(crate) enum TableViewMode {
#[default]
Table,
Grid,
}
struct TableUiOutput {
blueprint: TableBlueprint,
flag_changes: Vec<FlagChangeEvent>,
}
pub struct Column<'a> {
pub id: egui::Id,
pub desc: ColumnDescriptorRef<'a>,
pub blueprint: ColumnBlueprint,
}
impl Column<'_> {
pub fn display_name(&self) -> String {
self.blueprint
.display_name
.clone()
.unwrap_or_else(|| default_display_name_for_column(&self.desc))
}
}
pub struct Columns<'a> {
pub columns: Vec<Column<'a>>,
}
impl<'a> Columns<'a> {
fn from(sorbet_schema: &'a SorbetSchema, column_blueprint_fn: &ColumnBlueprintFn<'_>) -> Self {
let columns = sorbet_schema
.columns
.iter()
.map(|desc| {
let id = egui::Id::new(desc);
let desc = desc.into();
let blueprint = column_blueprint_fn(&desc);
Column {
id,
desc,
blueprint,
}
})
.collect();
Self { columns }
}
}
impl Columns<'_> {
pub fn iter(&self) -> impl Iterator<Item = &Column<'_>> + use<'_> {
self.columns.iter()
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TableStatus {
InitialLoading,
Loaded,
Updating,
Error(String),
}
type ColumnBlueprintFn<'a> = Box<dyn Fn(&ColumnDescriptorRef<'_>) -> ColumnBlueprint + 'a>;
pub struct DataFusionTableWidget<'a> {
session_ctx: Arc<SessionContext>,
table_ref: TableReference,
title: Option<String>,
url: Option<String>,
column_blueprint_fn: ColumnBlueprintFn<'a>,
initial_blueprint: TableBlueprint,
remote_table: Option<re_uri::EntryUri>,
}
impl<'a> DataFusionTableWidget<'a> {
pub fn refresh(
runtime: &AsyncRuntimeHandle,
egui_ctx: egui::Context,
session_ctx: Arc<SessionContext>,
table_ref: impl Into<TableReference>,
) {
let table_ref = table_ref.into();
runtime.spawn_future(async move {
Self::invalidate_streaming_cache(&session_ctx, &table_ref).await;
let id = id_from_session_context_and_table(&session_ctx, &table_ref);
DataFusionAdapter::clear_state(&egui_ctx, id);
});
}
async fn invalidate_streaming_cache(session_ctx: &SessionContext, table_ref: &TableReference) {
if let Ok(provider) = session_ctx.table_provider(table_ref.clone()).await
&& let Some(cache_provider) = provider
.as_any()
.downcast_ref::<StreamingCacheTableProvider>()
{
cache_provider.refresh();
}
}
pub fn new(session_ctx: Arc<SessionContext>, table_ref: impl Into<TableReference>) -> Self {
Self {
session_ctx,
table_ref: table_ref.into(),
title: None,
url: None,
column_blueprint_fn: Box::new(|_| ColumnBlueprint::default()),
initial_blueprint: Default::default(),
remote_table: None,
}
}
pub fn title(mut self, title: impl Into<String>) -> Self {
self.title = Some(title.into());
self
}
pub fn url(mut self, url: impl Into<String>) -> Self {
self.url = Some(url.into());
self
}
pub fn remote_table(mut self, entry_uri: re_uri::EntryUri) -> Self {
self.remote_table = Some(entry_uri);
self
}
pub fn column_blueprint(
mut self,
column_blueprint_fn: impl Fn(&ColumnDescriptorRef<'_>) -> ColumnBlueprint + 'a,
) -> Self {
self.column_blueprint_fn = Box::new(column_blueprint_fn);
self
}
pub fn initial_blueprint(mut self, initial_blueprint: TableBlueprint) -> Self {
self.initial_blueprint = initial_blueprint;
self
}
pub fn generate_segment_links(
mut self,
column_name: impl Into<String>,
segment_id_column_name: impl Into<String>,
origin: re_uri::Origin,
dataset_id: EntryId,
) -> Self {
self.initial_blueprint.segment_links = Some(SegmentLinksSpec {
column_name: column_name.into(),
segment_id_column_name: segment_id_column_name.into(),
origin,
dataset_id,
});
self
}
pub fn generate_entry_links(
mut self,
column_name: impl Into<String>,
entry_id_column_name: impl Into<String>,
origin: re_uri::Origin,
) -> Self {
self.initial_blueprint.entry_links = Some(EntryLinksSpec {
column_name: column_name.into(),
entry_id_column_name: entry_id_column_name.into(),
origin,
});
self
}
pub fn prefilter(mut self, expression: datafusion::prelude::Expr) -> Self {
self.initial_blueprint.prefilter = Some(expression);
self
}
fn is_flagging_available(
blueprint: &TableBlueprint,
columns: &Columns<'_>,
remote_table: Option<&re_uri::EntryUri>,
connection_registry: &re_redap_client::ConnectionRegistryHandle,
) -> bool {
let Some(flag_col_name) = &blueprint.flag_column else {
return false;
};
let flag_col_is_bool = columns.iter().any(|col| {
col.display_name() == *flag_col_name
&& matches!(&col.desc, re_sorbet::ColumnDescriptorRef::Component(c)
if c.store_datatype == arrow::datatypes::DataType::Boolean)
});
if !flag_col_is_bool {
re_log::warn_once!(
"Flag column {flag_col_name:?} is not a boolean column or does not exist in the table"
);
return false;
}
if let Some(remote) = remote_table {
let has_write = connection_registry
.credentials(&remote.origin)
.and_then(|creds| creds.has_write_permission())
.unwrap_or(true);
if !has_write {
return false;
}
}
true
}
pub fn show(
self,
viewer_ctx: &StoreViewContext<'_>,
runtime: &AsyncRuntimeHandle,
ui: &mut egui::Ui,
) -> TableStatus {
match self.session_ctx.table_exist(self.table_ref.clone()) {
Ok(true) => {}
Ok(false) => {
ui.loading_screen(
"Loading table:",
self.url.as_deref().or(self.title.as_deref()).unwrap_or(""),
);
return TableStatus::InitialLoading;
}
Err(err) => {
ui.loading_screen(
"Error while loading table:",
RichText::from(err.to_string()).color(ui.style().visuals.error_fg_color),
);
return TableStatus::Error(err.to_string());
}
}
let session_id = id_from_session_context_and_table(&self.session_ctx, &self.table_ref);
let mut table_state = DataFusionAdapter::get(
runtime,
ui,
&self.session_ctx,
self.table_ref.clone(),
session_id,
self.initial_blueprint.clone(),
);
let requested_query_result = table_state.results.as_ref();
let is_table_update_in_progress;
let query_result = match (requested_query_result, &table_state.last_query_results) {
(Some(Ok(query_result)), _) => {
is_table_update_in_progress = !query_result.finished;
query_result
}
(Some(Err(err)), _) => {
let error = format!("Could not load table: {err}");
ui.horizontal(|ui| {
ui.error_label(&error);
if ui
.small_icon_button(&re_ui::icons::RESET, "Refresh")
.clicked()
{
Self::refresh(
runtime,
ui.ctx().clone(),
Arc::clone(&self.session_ctx),
self.table_ref.clone(),
);
}
});
return TableStatus::Error(error);
}
(None, Some(Ok(last_query_result))) => {
is_table_update_in_progress = true;
last_query_result
}
(None, None | Some(Err(_))) => {
ui.loading_screen(
"Loading table:",
self.url.as_deref().or(self.title.as_deref()).unwrap_or(""),
);
return TableStatus::InitialLoading;
}
};
let output = self.table_ui(
viewer_ctx,
runtime,
ui,
table_state.blueprint(),
session_id,
table_state.queried_at,
is_table_update_in_progress,
query_result,
);
if !output.flag_changes.is_empty()
&& let Some(flag_col) = &output.blueprint.flag_column
{
table_state.apply_flag_changes(ui, flag_col, &output.flag_changes);
if let Some(remote) = &self.remote_table
&& let Some(Ok(results)) = &table_state.results
{
upsert_flag_changes(
viewer_ctx,
runtime,
remote.clone(),
results,
flag_col,
&output.flag_changes,
);
}
let session_ctx = Arc::clone(&self.session_ctx);
let table_ref = self.table_ref.clone();
runtime.spawn_future(async move {
Self::invalidate_streaming_cache(&session_ctx, &table_ref).await;
});
}
if table_state.blueprint() != &output.blueprint {
table_state.update_query(runtime, ui, output.blueprint);
}
if is_table_update_in_progress {
TableStatus::Updating
} else {
TableStatus::Loaded
}
}
#[expect(clippy::too_many_arguments)]
fn table_ui(
&self,
ctx: &StoreViewContext<'_>,
runtime: &AsyncRuntimeHandle,
ui: &mut egui::Ui,
table_blueprint: &TableBlueprint,
session_id: egui::Id,
queried_at: Timestamp,
should_show_loading_indicator: bool,
query_result: &DataFusionQueryResult,
) -> TableUiOutput {
let static_id = Id::new(&self.table_ref);
let mut new_blueprint = table_blueprint.clone();
let mut filter_state =
FilterState::load_or_init_from_blueprint(ui.ctx(), session_id, table_blueprint);
let num_rows = query_result
.sorbet_batches
.iter()
.map(|record_batch| record_batch.num_rows() as u64)
.sum();
let columns = Columns::from(&query_result.sorbet_schema, &self.column_blueprint_fn);
let display_record_batches = query_result
.sorbet_batches
.iter()
.map(|record_batch| {
DisplayRecordBatch::try_new(itertools::izip!(
query_result.sorbet_schema.columns.iter().map(|x| x.into()),
columns.iter().map(|column| &column.blueprint),
record_batch.columns().iter().map(Arc::clone)
))
})
.collect::<Result<Vec<_>, _>>();
let display_record_batches = match display_record_batches {
Ok(display_record_batches) => display_record_batches,
Err(err) => {
ui.error_label(err.to_string());
return TableUiOutput {
blueprint: new_blueprint,
flag_changes: Vec::new(),
};
}
};
let mut table_config = TableConfig::get_with_columns(
ui.ctx(),
static_id,
columns.iter().map(|column| {
ColumnConfig::new_with_visible(
column.id,
column.display_name(),
column.blueprint.default_visibility,
)
.with_sort_key(column.blueprint.sort_key)
}),
);
let enable_grid_view = ctx.app_options().experimental.table_grid_view;
let view_mode_id = session_id.with("view_mode");
let mut view_mode = ui
.ctx()
.data(|d| d.get_temp::<TableViewMode>(view_mode_id))
.unwrap_or_default();
if let Some(title) = &self.title {
title_ui(
ui,
ctx,
Some(&mut table_config),
title,
self.url.as_deref(),
should_show_loading_indicator,
if enable_grid_view {
Some(&mut view_mode)
} else {
None
},
);
}
filter_state.filter_bar_ui(ui, ctx.app_options().timestamp_format, &mut new_blueprint);
let table_style = re_ui::TableStyle::Spacious;
let mut row_height = ctx.tokens().table_row_height(table_style);
let first_column = table_config
.visible_column_indexes()
.next()
.and_then(|index| display_record_batches.first()?.columns().get(index));
if let Some(DisplayColumn::Component(component)) = first_column
&& component.is_image()
{
row_height *= 3.0;
}
let migrated_fields = query_result
.sorbet_schema
.columns
.arrow_fields(re_sorbet::BatchType::Dataframe);
populate_blueprint_from_field_metadata(&mut new_blueprint, &query_result.original_schema);
let flagging_enabled = Self::is_flagging_available(
&new_blueprint,
&columns,
self.remote_table.as_ref(),
ctx.app_ctx.connection_registry,
);
let visible_columns = table_config.visible_columns().count();
let total_columns = columns.columns.len();
let action = Self::bottom_bar_ui(
ui,
ctx,
session_id,
num_rows,
visible_columns,
total_columns,
queried_at,
);
match action {
Some(BottomBarAction::Refresh) => {
Self::refresh(
runtime,
ui.ctx().clone(),
Arc::clone(&self.session_ctx),
self.table_ref.clone(),
);
}
None => {}
}
let mut flag_changes = Vec::new();
match view_mode {
TableViewMode::Table => {
let mut table_delegate = DataFusionTableDelegate {
session_id,
ctx,
table_style,
query_result,
migrated_fields: &migrated_fields,
display_record_batches: &display_record_batches,
columns: &columns,
blueprint: table_blueprint,
new_blueprint: &mut new_blueprint,
filter_state: &mut filter_state,
row_height,
};
let mut re_table = ReTable::new(
ui.ctx(),
session_id,
&mut table_delegate,
&table_config,
num_rows,
);
re_table.show(ui);
}
TableViewMode::Grid => {
flag_changes = crate::grid_view::grid_ui(
ctx,
ui,
&columns,
&display_record_batches,
&table_config,
&new_blueprint,
num_rows,
flagging_enabled,
);
}
}
table_config.store(ui.ctx());
filter_state.store(ui.ctx(), session_id);
ui.ctx()
.data_mut(|d| d.insert_temp(view_mode_id, view_mode));
TableUiOutput {
blueprint: new_blueprint,
flag_changes,
}
}
fn bottom_bar_ui(
ui: &mut Ui,
ctx: &StoreViewContext<'_>,
session_id: Id,
total_rows: u64,
visible_columns: usize,
total_columns: usize,
queried_at: Timestamp,
) -> Option<BottomBarAction> {
let mut action = None;
let frame = Frame::new()
.fill(ui.tokens().table_header_bg_fill)
.inner_margin(Margin::symmetric(12, 0));
Panel::bottom(session_id.with("bottom_bar"))
.frame(frame)
.show_separator_line(false)
.show_inside(ui, |ui| {
let height = 24.0;
ui.set_height(height);
ui.horizontal_centered(|ui| {
ui.visuals_mut().widgets.noninteractive.fg_stroke.color =
ui.tokens().text_subdued;
ui.visuals_mut().widgets.active.fg_stroke.color = ui.tokens().text_default;
egui::Sides::new().show(
ui,
|ui| {
ui.set_height(height);
ui.label("rows:");
ui.strong(format_uint(total_rows));
ui.add_space(16.0);
ui.label("columns:");
ui.strong(format!(
"{} out of {}",
format_uint(visible_columns),
format_uint(total_columns),
));
},
|ui| {
ui.set_height(height);
if ui
.small_icon_button(&icons::RESET, "Refresh table")
.clicked()
{
action = Some(BottomBarAction::Refresh);
}
re_ui::time::short_duration_ui(
ui,
queried_at,
ctx.app_options().timestamp_format,
Ui::strong,
);
ui.label("Last updated:");
},
);
});
});
action
}
}
fn id_from_session_context_and_table(
session_ctx: &SessionContext,
table_ref: &TableReference,
) -> Id {
egui::Id::new((session_ctx.session_id(), table_ref))
}
fn title_ui(
ui: &mut egui::Ui,
ctx: &StoreViewContext<'_>,
table_config: Option<&mut TableConfig>,
title: &str,
url: Option<&str>,
should_show_loading_indicator: bool,
view_mode: Option<&mut TableViewMode>,
) {
Frame::new()
.inner_margin(Margin {
top: 16,
bottom: 12,
left: 16,
right: 16,
})
.show(ui, |ui| {
egui::Sides::new().show(
ui,
|ui| {
ui.heading(RichText::new(title).strong());
if let Some(url) = url
&& ui
.small_icon_button(&re_ui::icons::COPY, "Copy URL")
.on_hover_text(url)
.clicked()
{
ctx.command_sender()
.send_system(SystemCommand::CopyViewerUrl(url.to_owned()));
}
if should_show_loading_indicator {
ui.loading_indicator("Fetching table data");
}
},
|ui| {
ui.horizontal_centered(|ui| {
if let Some(view_mode) = view_mode {
ui.selectable_toggle(|ui| {
ui.icon_selectable_value(
&icons::TABLE_ROW_VIEW,
"Table view",
view_mode,
TableViewMode::Table,
);
ui.icon_selectable_value(
&icons::TABLE_GRID_VIEW,
"Grid view",
view_mode,
TableViewMode::Grid,
);
});
}
if let Some(table_config) = table_config {
table_config.button_ui(ui);
}
});
},
);
});
}
pub fn find_row_batch(
batches: &[DisplayRecordBatch],
mut row_index: usize,
) -> Option<(&DisplayRecordBatch, usize)> {
for batch in batches {
let row_count = batch.num_rows();
if row_index < row_count {
return Some((batch, row_index));
} else {
row_index -= row_count;
}
}
None
}
pub fn value_at(
columns: &Columns<'_>,
display_record_batches: &[DisplayRecordBatch],
row: u64,
column_name: &str,
) -> Option<arrow::array::ArrayRef> {
let (display_record_batch, local_row_index) =
find_row_batch(display_record_batches, row as usize)?;
let column_index = columns
.iter()
.position(|col| col.display_name() == column_name)?;
let column = display_record_batch.columns().get(column_index)?;
match column {
DisplayColumn::RowId { .. } | DisplayColumn::Timeline { .. } => None,
DisplayColumn::Component(col) => col.row_value_at(local_row_index),
}
}
pub fn string_value_at(
columns: &Columns<'_>,
display_record_batches: &[DisplayRecordBatch],
row: u64,
column_name: &str,
) -> Option<String> {
let data = value_at(columns, display_record_batches, row, column_name)?;
let string_array = data.downcast_array_ref::<arrow::array::StringArray>()?;
if string_array.is_empty() {
return None;
}
Some(string_array.value(0).to_owned())
}
pub fn bool_value_at(
columns: &Columns<'_>,
display_record_batches: &[DisplayRecordBatch],
row: u64,
column_name: &str,
) -> Option<bool> {
let data = value_at(columns, display_record_batches, row, column_name)?;
let bool_array = data.downcast_array_ref::<arrow::array::BooleanArray>()?;
if bool_array.is_empty() {
return None;
}
Some(bool_array.value(0))
}
fn populate_blueprint_from_field_metadata(
blueprint: &mut TableBlueprint,
schema: &arrow::datatypes::Schema,
) {
fn read_field_flag<'a>(schema: &'a arrow::datatypes::Schema, key: &str) -> Option<&'a str> {
let mut found: Option<&str> = None;
for field in schema.fields() {
if field.metadata().get(key).map(|v| v.as_str()) == Some("true") {
if found.is_some() {
re_log::warn_once!(
"Multiple fields have {key:?} metadata set; using the first one"
);
break;
}
found = Some(field.name());
}
}
found
}
if blueprint.flag_column.is_none() {
blueprint.flag_column =
read_field_flag(schema, crate::experimental_field_metadata::IS_FLAG_COLUMN)
.map(str::to_owned);
}
if blueprint.grid_view_card_title.is_none() {
blueprint.grid_view_card_title = read_field_flag(
schema,
crate::experimental_field_metadata::IS_GRID_VIEW_CARD_TITLE,
)
.map(str::to_owned);
}
}
fn upsert_flag_changes(
ctx: &StoreViewContext<'_>,
runtime: &AsyncRuntimeHandle,
remote: re_uri::EntryUri,
results: &crate::datafusion_adapter::DataFusionQueryResult,
flag_column_name: &str,
changes: &[crate::grid_view::FlagChangeEvent],
) {
let Some(index_col_idx) = results.original_schema.fields.iter().position(|f| {
f.metadata()
.get(re_sorbet::metadata::SORBET_IS_TABLE_INDEX)
.map(|v| v.as_str())
== Some("true")
}) else {
return;
};
let mut index_arrays = Vec::new();
let mut flag_values = Vec::new();
for change in changes {
if let Some((batch, row_offset)) = results.find_row_batch(change.row) {
index_arrays.push(batch.column(index_col_idx).slice(row_offset, 1));
flag_values.push(change.new_value);
}
}
if index_arrays.is_empty() {
return;
}
let build_upsert_batch = || -> Option<arrow::array::RecordBatch> {
let index_refs: Vec<_> = index_arrays.iter().map(|a| a.as_ref()).collect();
let index_array = re_arrow_util::concat_arrays(&index_refs).ok()?;
let flag_array = Arc::new(BooleanArray::from(flag_values));
let schema = arrow::datatypes::Schema::new_with_metadata(
vec![
Arc::new(results.original_schema.field(index_col_idx).clone()),
Arc::new(Field::new(
flag_column_name,
arrow::datatypes::DataType::Boolean,
true,
)),
],
Default::default(),
);
let num_rows = index_array.len();
arrow::array::RecordBatch::try_new_with_options(
Arc::new(schema),
vec![index_array, flag_array],
&arrow::array::RecordBatchOptions::new().with_row_count(Some(num_rows)),
)
.ok()
};
let Some(upsert_batch) = build_upsert_batch() else {
re_log::warn_once!("Failed to build upsert RecordBatch for flag changes");
return;
};
let connection_registry = ctx.app_ctx.connection_registry.clone();
runtime.spawn_future(async move {
let result = async {
let mut client = connection_registry.client(remote.origin).await?;
client
.write_table(
futures::stream::once(async { upsert_batch }),
remote.entry_id,
re_protos::cloud::v1alpha1::ext::TableInsertMode::Replace,
)
.await
}
.await;
if let Err(err) = result {
re_log::warn_once!("Failed to upsert flag changes: {err}");
} else {
re_log::debug!("Successfully upserted flag changes");
}
});
}
enum BottomBarAction {
Refresh,
}
struct DataFusionTableDelegate<'a> {
session_id: Id,
ctx: &'a StoreViewContext<'a>,
table_style: re_ui::TableStyle,
query_result: &'a DataFusionQueryResult,
migrated_fields: &'a Vec<Field>,
display_record_batches: &'a Vec<DisplayRecordBatch>,
columns: &'a Columns<'a>,
blueprint: &'a TableBlueprint,
new_blueprint: &'a mut TableBlueprint,
filter_state: &'a mut FilterState,
row_height: f32,
}
impl DataFusionTableDelegate<'_> {
fn segment_link_for_row(&self, row: u64, spec: &SegmentLinksSpec) -> Option<String> {
string_value_at(
self.columns,
self.display_record_batches,
row,
&spec.column_name,
)
}
pub fn row_context_menu(&self, ui: &Ui, _row_number: u64) {
let has_context_menu = self.blueprint.segment_links.is_some();
if !has_context_menu {
return;
}
ui.response().container_context_menu(|ui| {
let selection = TableSelectionState::load(ui.ctx(), self.session_id);
let selected_rows = selection.selected_rows;
if let Some(segment_links_spec) = &self.blueprint.segment_links {
let label = format!("Open {}", format_plural_s(selected_rows.len(), "segment"));
let response =
ui.add(icons::OPEN_RECORDING.as_button_with_label(ui.tokens(), label));
let open = |new_tab| {
for row in selected_rows.iter().copied().sorted() {
if let Some(segment_link) =
self.segment_link_for_row(row, segment_links_spec)
{
ui.open_url(OpenUrl {
url: segment_link,
new_tab,
});
} else {
error!("Could not get segment link for row {}", row);
}
}
};
if response.clicked_with_open_in_background() {
open(true);
} else if response.clicked() {
open(false);
}
}
});
}
}
impl egui_table::TableDelegate for DataFusionTableDelegate<'_> {
fn header_cell_ui(&mut self, ui: &mut egui::Ui, cell: &HeaderCellInfo) {
let tokens = ui.tokens();
let table_style = self.table_style;
let col_index = cell.group_index;
if let Some(column) = self.columns.columns.get(col_index) {
let column_field = &self.query_result.original_schema.fields[col_index];
let column_physical_name = column_field.name();
let column_display_name = column.display_name();
let current_sort_direction = self.blueprint.sort_by.as_ref().and_then(|sort_by| {
(sort_by.column_physical_name.as_str() == column_physical_name)
.then_some(&sort_by.direction)
});
egui::Sides::new()
.shrink_left()
.show(
ui,
|ui| {
ui.set_height(ui.tokens().table_content_height(table_style));
let response = ui.label(
egui::RichText::new(column_display_name)
.strong()
.monospace(),
);
if let Some(dir_icon) = current_sort_direction.map(SortDirection::icon) {
ui.add_space(-5.0);
ui.small_icon(dir_icon, Some(tokens.table_sort_icon_color));
}
response
},
|ui| {
ui.set_height(ui.tokens().table_content_height(table_style));
egui::containers::menu::MenuButton::from_button(
ui.small_icon_button_widget(&re_ui::icons::MORE, "More options"),
)
.config(MenuConfig::new().style(menu_style()))
.ui(ui, |ui| {
for sort_direction in SortDirection::iter() {
let already_sorted =
Some(&sort_direction) == current_sort_direction;
if ui
.add_enabled_ui(!already_sorted, |ui| {
sort_direction.menu_item_ui(ui)
})
.inner
.clicked()
{
self.new_blueprint.sort_by = Some(SortBy {
column_physical_name: column_physical_name.to_owned(),
direction: sort_direction,
});
ui.close();
}
}
#[expect(clippy::collapsible_if)]
if column.blueprint.variant_ui.is_none()
&& let Some(column_filter) =
ColumnFilter::default_for_column(Arc::clone(column_field))
{
if ui
.icon_and_text_menu_item(&re_ui::icons::FILTER, "Filter")
.clicked()
{
self.filter_state.push_new_filter(column_filter);
}
}
});
},
)
.0
.on_hover_ui(|ui| {
ui.with_optional_extras(|ui, show_extras| {
column_header_tooltip_ui(
ui,
&column.desc,
column_field,
&self.migrated_fields[col_index],
show_extras,
);
});
});
}
}
fn cell_ui(&mut self, ui: &mut egui::Ui, cell: &CellInfo) {
let col_index = cell.col_nr;
if let Some((display_record_batch, batch_index)) =
find_row_batch(self.display_record_batches, cell.row_nr as usize)
{
let column = &display_record_batch.columns()[col_index];
column.data_ui(self.ctx, ui, batch_index, None);
}
}
fn row_ui(&mut self, ui: &mut Ui, row_nr: u64) {
self.row_context_menu(ui, row_nr);
}
fn default_row_height(&self) -> f32 {
self.row_height
}
}