use std::{cmp, mem};
use crate::numeric_id::NumericId;
use rayon::prelude::*;
use crate::{
ColumnId, ExecutionState, Offset, RowId, Subset, Table, TableId, TaggedRowBuffer, Value,
WrappedTable,
hash_index::{ColumnIndex, Index},
parallel_heuristics::parallelize_rebuild,
table_spec::{Rebuilder, WrappedTableRef},
};
use super::SortedWritesTable;
/// Stage `$row` for insertion into `$mutation_buf`, stamping it with the
/// current timestamp first.
///
/// If the table is sorted by a column (`this.sort_by`), that column of the row
/// is overwritten with `$next_ts` before staging, so rebuilt rows sort as if
/// they were written at the current timestamp. `$row` must be a mutable row
/// slice (call sites pass rows obtained from `TaggedRowBuffer::non_stale_mut`).
macro_rules! insert_row {
    ($this: expr, $mutation_buf: expr, $row:expr, $next_ts:expr) => {{
        let row = $row;
        // Reborrow: accept any expression that derefs to the table without
        // moving it into the macro body.
        let this = &*$this;
        let next_ts = $next_ts;
        // Overwrite the sort column (if any) with the rebuild timestamp.
        if let Some(sort_by) = this.sort_by {
            row[sort_by.index()] = next_ts;
        }
        $mutation_buf.stage_insert(row);
    }};
}
impl SortedWritesTable {
    /// Canonicalize this table's rows after updates to `table` (the table of
    /// displaced/union-found values), returning `true` if any row changed.
    ///
    /// Does nothing when no columns are registered for rebuilding
    /// (`self.to_rebuild`) or when `table` cannot produce a [`Rebuilder`] for
    /// those columns. When the rebuilder exposes a hint column, the
    /// `incremental_rebuild` heuristic picks between rebuilding only rows that
    /// reference recently-updated ids and rescanning the whole table.
    pub(super) fn do_rebuild(
        &mut self,
        table_id: TableId,
        table: &WrappedTable,
        next_ts: Value,
        exec_state: &mut ExecutionState,
    ) -> bool {
        if self.to_rebuild.is_empty() {
            return false;
        }
        let Some(rebuilder) = table.rebuilder(&self.to_rebuild) else {
            return false;
        };
        if let Some(hint_col) = rebuilder.hint_col() {
            // Only entries of `table` updated since the last rebuild matter
            // for the incremental strategy.
            // NOTE(review): `recent_updates` is called (and presumably
            // advances the tracker's cursor) even when the nonincremental
            // branch is taken below — confirm that is intentional.
            let to_scan = self.subset_tracker.recent_updates(table_id, table);
            if incremental_rebuild(
                to_scan.size(),
                self.data.next_row().index(),
                parallelize_rebuild(to_scan.size()),
            ) {
                self.rebuild_incremental(table, &*rebuilder, hint_col, to_scan, next_ts, exec_state)
            } else {
                self.rebuild_nonincremental(&*rebuilder, next_ts, exec_state)
            }
        } else {
            self.rebuild_nonincremental(&*rebuilder, next_ts, exec_state)
        }
    }

    /// Rebuild only the rows of this table that mention a recently-updated id.
    ///
    /// * `to_scan` — subset of `table` updated since the last rebuild.
    /// * `search_col` — the column of `table` holding the updated ids.
    ///
    /// For each updated id, `self.rebuild_index` maps it to the subset of this
    /// table's rows containing it; those rows are canonicalized via
    /// `rebuilder.rebuild_subset`, the old row (looked up by its original
    /// `RowId`) is staged for removal, and the rewritten row is staged for
    /// insertion stamped with `next_ts`. Returns `true` if anything changed.
    fn rebuild_incremental(
        &mut self,
        table: &WrappedTable,
        rebuilder: &dyn Rebuilder,
        search_col: ColumnId,
        to_scan: Subset,
        next_ts: Value,
        exec_state: &mut ExecutionState,
    ) -> bool {
        // Temporarily move the index out of `self` so it can be refreshed
        // against a wrapped (shared) view of `self` without holding a mutable
        // and a shared borrow of `self` at once.
        let mut index = mem::replace(
            &mut self.rebuild_index,
            Index::new(vec![], ColumnIndex::new()),
        );
        WrappedTableRef::with_wrapper(self, |wrapped| {
            index.refresh(wrapped);
        });
        self.rebuild_index = index;
        // Project just the hint column of the recently-updated entries into a
        // single-column buffer.
        let mut buf = TaggedRowBuffer::new(1);
        table.scan_project(
            to_scan.as_ref(),
            &[search_col],
            Offset::new(0),
            usize::MAX,
            &[],
            &mut buf,
        );
        if parallelize_rebuild(to_scan.size()) {
            WrappedTableRef::with_wrapper(self, |wrapped| {
                // Each rayon worker folds with its own (mutation buffer,
                // cloned ExecutionState, changed flag); `max()` over the
                // per-thread flags is a logical OR (true > false).
                // NOTE(review): staged mutations in the per-thread buffers are
                // presumably applied when the buffers are dropped/flushed by
                // `new_buffer`'s machinery — confirm there.
                buf.par_iter()
                    .fold(
                        || (self.new_buffer(), exec_state.clone(), false),
                        |(mut mutation_buf, mut exec_state, mut changed), (_, row)| {
                            // `row[0]` is the projected hint-column id; skip
                            // ids that no row of this table references.
                            let Some(subset) = self.rebuild_index.get_subset(&row[0]) else {
                                return (mutation_buf, exec_state, changed);
                            };
                            let mut scanned = TaggedRowBuffer::new(self.n_columns);
                            rebuilder.rebuild_subset(
                                wrapped,
                                subset,
                                &mut scanned,
                                &mut exec_state,
                            );
                            for (row_id, row) in scanned.non_stale_mut() {
                                // Stage removal of the pre-rebuild row (keyed
                                // by its first `n_keys` columns) before
                                // staging the canonicalized replacement.
                                let to_remove =
                                    self.data.get_row(row_id).map(|x| &x[0..self.n_keys]);
                                if let Some(key) = to_remove {
                                    mutation_buf.stage_remove(key);
                                }
                                changed = true;
                                insert_row!(self, mutation_buf, row, next_ts);
                            }
                            (mutation_buf, exec_state, changed)
                        },
                    )
                    .map(|(_, _, changed)| changed)
                    .max()
                    .unwrap_or(false)
            })
        } else {
            // Sequential path: accumulate all rewritten rows into `scratch`,
            // then stage removals/insertions in one pass.
            let mut scratch = TaggedRowBuffer::new(self.n_columns);
            let mut changed = false;
            for (_, id) in buf.iter() {
                let Some(subset) = self.rebuild_index.get_subset(&id[0]) else {
                    continue;
                };
                WrappedTableRef::with_wrapper(self, |wrapped| {
                    rebuilder.rebuild_subset(wrapped, subset, &mut scratch, exec_state);
                });
                // NOTE(review): here `changed` is set per non-empty subset,
                // while the parallel path above sets it per non-stale rewritten
                // row — confirm the two notions of "changed" are meant to
                // coincide.
                changed |= subset.size() > 0;
            }
            if !scratch.is_empty() {
                let mut write_buf = self.new_buffer();
                for (row_id, row) in scratch.non_stale_mut() {
                    if let Some(to_remove) = self.data.get_row(row_id).map(|x| &x[0..self.n_keys]) {
                        write_buf.stage_remove(to_remove);
                    }
                    insert_row!(self, write_buf, row, next_ts);
                }
            }
            changed
        }
    }

    /// Canonicalize the entire table, scanning it in `STEP_SIZE`-row chunks.
    ///
    /// Each chunk is passed to `rebuilder.rebuild_buf`; rewritten (non-stale)
    /// rows have their old version staged for removal and the new version
    /// staged for insertion stamped with `next_ts`. Returns `true` if any row
    /// changed. Runs the chunks in parallel via rayon when the table is large
    /// enough per `parallelize_rebuild`.
    fn rebuild_nonincremental(
        &mut self,
        rebuilder: &dyn Rebuilder,
        next_ts: Value,
        exec_state: &mut ExecutionState,
    ) -> bool {
        const STEP_SIZE: usize = 2048;
        if parallelize_rebuild(self.data.next_row().index()) {
            // One chunk of STEP_SIZE row-ids per fold step; per-thread state is
            // (mutation buffer, scratch row buffer, cloned ExecutionState,
            // changed flag). `max()` ORs the per-thread flags.
            (0..self.data.next_row().index())
                .into_par_iter()
                .step_by(STEP_SIZE)
                .fold(
                    || {
                        (
                            self.new_buffer(),
                            TaggedRowBuffer::new(self.n_columns),
                            exec_state.clone(),
                            false,
                        )
                    },
                    |(mut mutation_buf, mut buf, mut exec_state, mut changed), start| {
                        rebuilder.rebuild_buf(
                            &self.data.data,
                            RowId::from_usize(start),
                            RowId::from_usize(cmp::min(
                                start + STEP_SIZE,
                                self.data.next_row().index(),
                            )),
                            &mut buf,
                            &mut exec_state,
                        );
                        for (row_id, row) in buf.non_stale_mut() {
                            // Remove the old row (by key prefix), then insert
                            // the canonicalized one.
                            let to_remove = self.data.get_row(row_id).map(|x| &x[0..self.n_keys]);
                            changed = true;
                            if let Some(key) = to_remove {
                                mutation_buf.stage_remove(key);
                            }
                            insert_row!(self, mutation_buf, row, next_ts);
                        }
                        // Reuse the scratch buffer for the next chunk.
                        buf.clear();
                        (mutation_buf, buf, exec_state, changed)
                    },
                )
                .map(|(_, _, _, changed)| changed)
                .max()
                .unwrap_or(false)
        } else {
            // Sequential path: rebuild every chunk into one buffer, then stage
            // all removals/insertions in a single pass.
            let mut buf = TaggedRowBuffer::new(self.n_columns);
            let mut changed = false;
            let max_row = self.data.next_row().index();
            for start in (0..max_row).step_by(STEP_SIZE) {
                rebuilder.rebuild_buf(
                    &self.data.data,
                    RowId::from_usize(start),
                    RowId::from_usize(cmp::min(start + STEP_SIZE, max_row)),
                    &mut buf,
                    exec_state,
                );
            }
            if !buf.is_empty() {
                let mut write_buf = self.new_buffer();
                for (row_id, row) in buf.non_stale_mut() {
                    if let Some(to_remove) = self.data.get_row(row_id).map(|x| &x[0..self.n_keys]) {
                        write_buf.stage_remove(to_remove);
                    }
                    insert_row!(self, write_buf, row, next_ts);
                    changed = true;
                }
            }
            changed
        }
    }
}
/// Decide whether `SortedWritesTable::do_rebuild` should use the incremental
/// strategy rather than rescanning the whole table.
///
/// * `_uf_size` — number of recently-updated ids that an incremental rebuild
///   would have to scan.
/// * `_table_size` — current number of rows in the table.
/// * `_parallel` — whether the rebuild will run in parallel.
///
/// Incremental rebuilds pay off only when the recent-update set is small
/// relative to the table; the parallel case demands a much stricter ratio,
/// since the nonincremental path scans the whole table efficiently in
/// parallel chunks. In debug builds the choice is randomized so that tests
/// exercise both code paths.
fn incremental_rebuild(_uf_size: usize, _table_size: usize, _parallel: bool) -> bool {
    #[cfg(debug_assertions)]
    {
        use rand::{Rng, rng};
        rng().random::<bool>()
    }
    #[cfg(not(debug_assertions))]
    {
        // Below this size a full rescan is cheap enough to always prefer.
        // (Single named constant; previously duplicated as `10_000`/`10000`.)
        const MIN_TABLE_SIZE: usize = 10_000;
        if _parallel {
            _table_size > MIN_TABLE_SIZE && _uf_size * 8192 <= _table_size
        } else {
            _table_size > MIN_TABLE_SIZE && _uf_size * 8 <= _table_size
        }
    }
}