use crossbeam::{channel::TryRecvError, select};
use itertools::Itertools as _;
use re_components::{ColorRGBA, Label, Point2D};
use re_log_types::{
DataRow, DataTableBatcher, DataTableBatcherConfig, RowId, SizeBytes, TimePoint, Timeline,
};
use re_log_types::{DataTable, TableId, Time};
#[test]
fn manual_trigger() {
let batcher = DataTableBatcher::new(DataTableBatcherConfig::NEVER).unwrap();
let tables = batcher.tables();
let mut expected = create_table();
expected.compute_all_size_bytes();
for _ in 0..3 {
assert_eq!(Err(TryRecvError::Empty), tables.try_recv());
for row in expected.to_rows() {
batcher.push_row(row);
}
assert_eq!(Err(TryRecvError::Empty), tables.try_recv());
batcher.flush_blocking();
{
let mut table = tables.recv().unwrap();
table.table_id = expected.table_id;
similar_asserts::assert_eq!(expected, table);
}
assert_eq!(Err(TryRecvError::Empty), tables.try_recv());
}
drop(batcher);
assert_eq!(Err(TryRecvError::Disconnected), tables.try_recv());
}
#[test]
fn shutdown_trigger() {
let batcher = DataTableBatcher::new(DataTableBatcherConfig::NEVER).unwrap();
let tables = batcher.tables();
let table = create_table();
let rows = table.to_rows().collect_vec();
for _ in 0..3 {
assert_eq!(Err(TryRecvError::Empty), tables.try_recv());
for row in rows.clone() {
batcher.push_row(row);
}
assert_eq!(Err(TryRecvError::Empty), tables.try_recv());
}
drop(batcher);
let expected = DataTable::from_rows(
TableId::ZERO,
std::iter::repeat_with(|| rows.clone()).take(3).flatten(),
);
select! {
recv(tables) -> batch => {
let mut table = batch.unwrap();
table.table_id = expected.table_id;
similar_asserts::assert_eq!(expected, table);
}
default(std::time::Duration::from_millis(50)) => {
panic!("output channel never yielded any table");
}
}
assert_eq!(Err(TryRecvError::Disconnected), tables.try_recv());
}
#[test]
fn num_bytes_trigger() {
let table = create_table();
let rows = table.to_rows().collect_vec();
let flush_duration = std::time::Duration::from_millis(50);
let flush_num_bytes = rows
.iter()
.take(rows.len() - 1)
.map(|row| row.total_size_bytes())
.sum::<u64>();
let batcher = DataTableBatcher::new(DataTableBatcherConfig {
flush_num_bytes,
flush_tick: flush_duration,
..DataTableBatcherConfig::NEVER
})
.unwrap();
let tables = batcher.tables();
assert_eq!(Err(TryRecvError::Empty), tables.try_recv());
for row in table.to_rows() {
batcher.push_row(row);
}
select! {
recv(tables) -> batch => {
let table = batch.unwrap();
let expected = DataTable::from_rows(
table.table_id,
rows.clone().into_iter().take(rows.len() - 1),
);
similar_asserts::assert_eq!(expected, table);
}
default(flush_duration) => {
panic!("output channel never yielded any table");
}
}
select! {
recv(tables) -> batch => {
let table = batch.unwrap();
let expected = DataTable::from_rows(
table.table_id,
rows.last().cloned(),
);
similar_asserts::assert_eq!(expected, table);
}
default(flush_duration * 2) => {
panic!("output channel never yielded any table");
}
}
assert_eq!(Err(TryRecvError::Empty), tables.try_recv());
drop(batcher);
assert_eq!(Err(TryRecvError::Disconnected), tables.try_recv());
}
#[test]
fn num_rows_trigger() {
let table = create_table();
let rows = table.to_rows().collect_vec();
let flush_duration = std::time::Duration::from_millis(50);
let flush_num_rows = rows.len() as u64 - 1;
let batcher = DataTableBatcher::new(DataTableBatcherConfig {
flush_num_rows,
flush_tick: flush_duration,
..DataTableBatcherConfig::NEVER
})
.unwrap();
let tables = batcher.tables();
assert_eq!(Err(TryRecvError::Empty), tables.try_recv());
for row in table.to_rows() {
batcher.push_row(row);
}
select! {
recv(tables) -> batch => {
let table = batch.unwrap();
let expected = DataTable::from_rows(
table.table_id,
rows.clone().into_iter().take(rows.len() - 1),
);
similar_asserts::assert_eq!(expected, table);
}
default(flush_duration) => {
panic!("output channel never yielded any table");
}
}
select! {
recv(tables) -> batch => {
let table = batch.unwrap();
let expected = DataTable::from_rows(
table.table_id,
rows.last().cloned(),
);
similar_asserts::assert_eq!(expected, table);
}
default(flush_duration * 2) => {
panic!("output channel never yielded any table");
}
}
assert_eq!(Err(TryRecvError::Empty), tables.try_recv());
drop(batcher);
assert_eq!(Err(TryRecvError::Disconnected), tables.try_recv());
}
#[test]
fn duration_trigger() {
let table = create_table();
let rows = table.to_rows().collect_vec();
let flush_duration = std::time::Duration::from_millis(50);
let batcher = DataTableBatcher::new(DataTableBatcherConfig {
flush_tick: flush_duration,
..DataTableBatcherConfig::NEVER
})
.unwrap();
let tables = batcher.tables();
assert_eq!(Err(TryRecvError::Empty), tables.try_recv());
_ = std::thread::Builder::new().spawn({
let mut rows = rows.clone();
let batcher = batcher.clone();
move || {
for row in rows.drain(..rows.len() - 1) {
batcher.push_row(row);
}
std::thread::sleep(flush_duration * 2);
let row = rows.last().cloned().unwrap();
batcher.push_row(row);
}
});
select! {
recv(tables) -> batch => {
let table = batch.unwrap();
let expected = DataTable::from_rows(
table.table_id,
rows.clone().into_iter().take(rows.len() - 1),
);
similar_asserts::assert_eq!(expected, table);
}
default(flush_duration * 2) => {
panic!("output channel never yielded any table");
}
}
select! {
recv(tables) -> batch => {
let table = batch.unwrap();
let expected = DataTable::from_rows(
table.table_id,
rows.last().cloned(),
);
similar_asserts::assert_eq!(expected, table);
}
default(flush_duration * 4) => {
panic!("output channel never yielded any table");
}
}
assert_eq!(Err(TryRecvError::Empty), tables.try_recv());
drop(batcher);
assert_eq!(Err(TryRecvError::Disconnected), tables.try_recv());
}
fn create_table() -> DataTable {
let timepoint = |frame_nr: i64| {
TimePoint::from([
(Timeline::log_time(), Time::now().into()),
(Timeline::new_sequence("frame_nr"), frame_nr.into()),
])
};
let row0 = {
let num_instances = 2;
let points: &[Point2D] = &[[10.0, 10.0].into(), [20.0, 20.0].into()];
let colors: &[_] = &[ColorRGBA::from_rgb(128, 128, 128)];
let labels: &[Label] = &[];
DataRow::from_cells3(
RowId::random(),
"a",
timepoint(1),
num_instances,
(points, colors, labels),
)
};
let row1 = {
let num_instances = 0;
let colors: &[ColorRGBA] = &[];
DataRow::from_cells1(RowId::random(), "b", timepoint(1), num_instances, colors)
};
let row2 = {
let num_instances = 1;
let colors: &[_] = &[ColorRGBA::from_rgb(255, 255, 255)];
let labels: &[_] = &[Label("hey".into())];
DataRow::from_cells2(
RowId::random(),
"c",
timepoint(2),
num_instances,
(colors, labels),
)
};
let mut table = DataTable::from_rows(TableId::ZERO, [row0, row1, row2]);
table.compute_all_size_bytes();
table
}