use std::collections::BTreeMap;
use arrow2::Either;
use re_log_types::{
    DataCellColumn, DataTable, ErasedTimeVec, RowIdVec, TableId, TimeRange, Timeline,
};
use crate::{
    store::{IndexedBucketInner, PersistentIndexedTable},
    DataStore, IndexedBucket,
};
impl DataStore {
                        pub fn to_data_table(&self) -> re_log_types::DataReadResult<DataTable> {
        use re_log_types::{DataRow, RowId};
        let mut rows = ahash::HashMap::<RowId, DataRow>::default();
        for table in self.to_data_tables(None) {
            for row in table.to_rows().collect::<Vec<_>>() {
                let row = row?;
                match rows.entry(row.row_id()) {
                    std::collections::hash_map::Entry::Occupied(mut entry) => {
                        for (timeline, time) in row.timepoint() {
                            entry.get_mut().timepoint.insert(*timeline, *time);
                        }
                    }
                    std::collections::hash_map::Entry::Vacant(entry) => {
                        entry.insert(row);
                    }
                }
            }
        }
        let mut rows = rows.into_values().collect::<Vec<_>>();
        rows.sort_by_key(|row| (row.timepoint.clone(), row.row_id));
        Ok(re_log_types::DataTable::from_rows(
            re_log_types::TableId::random(),
            rows,
        ))
    }
                /// Dumps the store as a lazy sequence of [`DataTable`]s.
    ///
    /// The timeless tables always come first and are never filtered. They are followed by the
    /// temporal data:
    /// - with `time_filter` set to `Some((timeline, range))`, only the data indexed on that
    ///   timeline and falling within that range is dumped;
    /// - with `None`, all the temporal data is dumped.
    pub fn to_data_tables(
        &self,
        time_filter: Option<(Timeline, TimeRange)>,
    ) -> impl Iterator<Item = DataTable> + '_ {
        // The two temporal dumps have different concrete iterator types: `Either` unifies them.
        let temporal = match time_filter {
            Some(filter) => Either::Left(self.dump_temporal_tables_filtered(filter)),
            None => Either::Right(self.dump_temporal_tables()),
        };

        self.dump_timeless_tables().chain(temporal)
    }
    /// Lazily converts every timeless table of the store into a [`DataTable`].
    ///
    /// The resulting tables have no timeline columns, and their entity path column repeats
    /// the entity path of the source table once per row.
    fn dump_timeless_tables(&self) -> impl Iterator<Item = DataTable> + '_ {
        self.timeless_tables.values().map(|table| {
            re_tracing::profile_scope!("timeless_table");

            // Every field is spelled out so that a newly added one cannot be silently left out
            // of the dump.
            let PersistentIndexedTable {
                ent_path,
                cluster_key: _,
                col_insert_id: _,
                col_row_id,
                col_num_instances,
                columns,
            } = table;

            let num_rows: usize = table.num_rows() as _;
            let col_entity_path = (0..num_rows).map(|_| ent_path.clone()).collect();
            let dumped_columns = columns
                .iter()
                .map(|(component, column)| (*component, column.clone()))
                .collect();

            DataTable {
                table_id: TableId::random(),
                col_row_id: col_row_id.clone(),
                col_timelines: Default::default(),
                col_entity_path,
                col_num_instances: col_num_instances.clone(),
                columns: dumped_columns,
            }
        })
    }
    /// Lazily converts every bucket of every temporal table into its own [`DataTable`].
    ///
    /// Each resulting table has a single timeline column (the bucket's timeline), and its
    /// entity path column repeats the entity path of the parent table once per row.
    fn dump_temporal_tables(&self) -> impl Iterator<Item = DataTable> + '_ {
        self.tables.values().flat_map(|table| {
            re_tracing::profile_scope!("temporal_table");

            table.buckets.values().map(move |bucket| {
                re_tracing::profile_scope!("temporal_bucket");

                // Must happen before the bucket's contents are read below.
                bucket.sort_indices_if_needed();

                let IndexedBucket {
                    timeline,
                    cluster_key: _,
                    inner,
                } = bucket;

                let guard = inner.read();
                let IndexedBucketInner {
                    is_sorted,
                    time_range: _,
                    col_time,
                    col_insert_id: _,
                    col_row_id,
                    col_num_instances,
                    columns,
                    size_bytes: _,
                } = &*guard;
                debug_assert!(is_sorted);

                let col_timelines =
                    [(*timeline, col_time.iter().map(|time| Some(*time)).collect())].into();
                let col_entity_path = (0..col_row_id.len())
                    .map(|_| table.ent_path.clone())
                    .collect();
                let dumped_columns = columns
                    .iter()
                    .map(|(component, column)| (*component, column.clone()))
                    .collect();

                DataTable {
                    table_id: TableId::random(),
                    col_row_id: col_row_id.clone(),
                    col_timelines,
                    col_entity_path,
                    col_num_instances: col_num_instances.clone(),
                    columns: dumped_columns,
                }
            })
        })
    }
    /// Lazily dumps the temporal data indexed on `timeline_filter`, restricted to the rows
    /// whose time falls within `time_filter`.
    ///
    /// Tables indexed on another timeline are skipped entirely. For the remaining tables, each
    /// bucket yields at most one [`DataTable`]: buckets whose time range does not intersect
    /// `time_filter`, or that end up with no matching row, yield nothing.
    fn dump_temporal_tables_filtered(
        &self,
        (timeline_filter, time_filter): (Timeline, TimeRange),
    ) -> impl Iterator<Item = DataTable> + '_ {
        self.tables
            .values()
            .filter_map(move |table| {
                re_tracing::profile_scope!("temporal_table_filtered");

                if table.timeline != timeline_filter {
                    return None;
                }

                Some(table.buckets.values().filter_map(move |bucket| {
                    re_tracing::profile_scope!("temporal_bucket_filtered");

                    // Must happen before the bucket's contents are read below.
                    bucket.sort_indices_if_needed();

                    let IndexedBucket {
                        timeline,
                        cluster_key: _,
                        inner,
                    } = bucket;

                    let IndexedBucketInner {
                        is_sorted,
                        time_range,
                        col_time,
                        col_insert_id: _,
                        col_row_id,
                        col_num_instances,
                        columns,
                        size_bytes: _,
                    } = &*inner.read();
                    debug_assert!(is_sorted);

                    // Cheap early-out: no row of this bucket can possibly match.
                    if !time_range.intersects(time_filter) {
                        return None;
                    }

                    let col_row_id: RowIdVec =
                        filter_column(col_time, col_row_id.iter(), time_filter).collect();

                    // NOTE: Overlapping ranges do not imply that a row actually lies within the
                    // filter (e.g. rows on both sides of it, none inside), so an empty
                    // selection is a legitimate outcome and must not be asserted against.
                    if col_row_id.is_empty() {
                        return None;
                    }

                    let col_timelines = [(
                        *timeline,
                        filter_column(col_time, col_time.iter(), time_filter)
                            .map(Some)
                            .collect(),
                    )]
                    .into();

                    let col_entity_path = std::iter::repeat_with(|| table.ent_path.clone())
                        .take(col_row_id.len())
                        .collect();

                    let col_num_instances =
                        filter_column(col_time, col_num_instances.iter(), time_filter).collect();

                    // Apply the very same time-based selection to every component column.
                    let mut columns2 = BTreeMap::default();
                    for (component, column) in columns {
                        let column = filter_column(col_time, column.iter(), time_filter).collect();
                        columns2.insert(*component, DataCellColumn(column));
                    }

                    Some(DataTable {
                        table_id: TableId::random(),
                        col_row_id,
                        col_timelines,
                        col_entity_path,
                        col_num_instances,
                        columns: columns2,
                    })
                }))
            })
            .flatten()
    }
}
/// Yields a clone of every entry of `column` whose matching entry in `col_time` is contained
/// in `time_filter`.
///
/// `col_time` and `column` are walked in lockstep, so the iteration ends as soon as the
/// shorter of the two is exhausted.
fn filter_column<'a, T: 'a + Clone>(
    col_time: &'a ErasedTimeVec,
    column: impl Iterator<Item = &'a T> + 'a,
    time_filter: TimeRange,
) -> impl Iterator<Item = T> + 'a {
    col_time.iter().zip(column).filter_map(move |(time, value)| {
        // Only the entries that pass the time check get cloned.
        time_filter.contains((*time).into()).then(|| value.clone())
    })
}