re_components 0.8.2

The standard rerun data types, component types, and archetypes
Documentation
use crossbeam::{channel::TryRecvError, select};
use itertools::Itertools as _;

use re_components::{ColorRGBA, Label, Point2D};
use re_log_types::{
    DataRow, DataTableBatcher, DataTableBatcherConfig, RowId, SizeBytes, TimePoint, Timeline,
};
use re_log_types::{DataTable, TableId, Time};

#[test]
fn manual_trigger() {
    let batcher = DataTableBatcher::new(DataTableBatcherConfig::NEVER).unwrap();
    let tables = batcher.tables();

    let mut expected = create_table();
    expected.compute_all_size_bytes();

    for _ in 0..3 {
        assert_eq!(Err(TryRecvError::Empty), tables.try_recv());

        for row in expected.to_rows() {
            batcher.push_row(row);
        }

        assert_eq!(Err(TryRecvError::Empty), tables.try_recv());

        batcher.flush_blocking();

        {
            let mut table = tables.recv().unwrap();
            // NOTE: Override the resulting table's ID so they can be compared.
            table.table_id = expected.table_id;

            similar_asserts::assert_eq!(expected, table);
        }

        assert_eq!(Err(TryRecvError::Empty), tables.try_recv());
    }

    drop(batcher);

    assert_eq!(Err(TryRecvError::Disconnected), tables.try_recv());
}

#[test]
fn shutdown_trigger() {
    let batcher = DataTableBatcher::new(DataTableBatcherConfig::NEVER).unwrap();
    let tables = batcher.tables();

    let table = create_table();
    let rows = table.to_rows().collect_vec();

    for _ in 0..3 {
        assert_eq!(Err(TryRecvError::Empty), tables.try_recv());

        for row in rows.clone() {
            batcher.push_row(row);
        }

        assert_eq!(Err(TryRecvError::Empty), tables.try_recv());
    }

    drop(batcher);

    let expected = DataTable::from_rows(
        TableId::ZERO,
        std::iter::repeat_with(|| rows.clone()).take(3).flatten(),
    );

    select! {
            recv(tables) -> batch => {
            let mut table = batch.unwrap();
            // NOTE: Override the resulting table's ID so they can be compared.
            table.table_id = expected.table_id;

            similar_asserts::assert_eq!(expected, table);
        }
        default(std::time::Duration::from_millis(50)) => {
            panic!("output channel never yielded any table");
        }
    }

    assert_eq!(Err(TryRecvError::Disconnected), tables.try_recv());
}

#[test]
fn num_bytes_trigger() {
    let table = create_table();
    let rows = table.to_rows().collect_vec();
    let flush_duration = std::time::Duration::from_millis(50);
    let flush_num_bytes = rows
        .iter()
        .take(rows.len() - 1)
        .map(|row| row.total_size_bytes())
        .sum::<u64>();

    let batcher = DataTableBatcher::new(DataTableBatcherConfig {
        flush_num_bytes,
        flush_tick: flush_duration,
        ..DataTableBatcherConfig::NEVER
    })
    .unwrap();
    let tables = batcher.tables();

    assert_eq!(Err(TryRecvError::Empty), tables.try_recv());

    for row in table.to_rows() {
        batcher.push_row(row);
    }

    // Expect all rows except for the last one (num_bytes trigger).
    select! {
            recv(tables) -> batch => {
            let table = batch.unwrap();
            let expected = DataTable::from_rows(
                table.table_id,
                rows.clone().into_iter().take(rows.len() - 1),
            );
            similar_asserts::assert_eq!(expected, table);
        }
        default(flush_duration) => {
            panic!("output channel never yielded any table");
        }
    }

    // Expect just the last row (duration trigger).
    select! {
            recv(tables) -> batch => {
            let table = batch.unwrap();
            let expected = DataTable::from_rows(
                table.table_id,
                rows.last().cloned(),
            );
            similar_asserts::assert_eq!(expected, table);
        }
        default(flush_duration * 2) => {
            panic!("output channel never yielded any table");
        }
    }

    assert_eq!(Err(TryRecvError::Empty), tables.try_recv());

    drop(batcher);

    assert_eq!(Err(TryRecvError::Disconnected), tables.try_recv());
}

#[test]
fn num_rows_trigger() {
    let table = create_table();
    let rows = table.to_rows().collect_vec();
    let flush_duration = std::time::Duration::from_millis(50);
    let flush_num_rows = rows.len() as u64 - 1;

    let batcher = DataTableBatcher::new(DataTableBatcherConfig {
        flush_num_rows,
        flush_tick: flush_duration,
        ..DataTableBatcherConfig::NEVER
    })
    .unwrap();
    let tables = batcher.tables();

    assert_eq!(Err(TryRecvError::Empty), tables.try_recv());

    for row in table.to_rows() {
        batcher.push_row(row);
    }

    // Expect all rows except for the last one.
    select! {
            recv(tables) -> batch => {
            let table = batch.unwrap();
            let expected = DataTable::from_rows(
                table.table_id,
                rows.clone().into_iter().take(rows.len() - 1),
            );
            similar_asserts::assert_eq!(expected, table);
        }
        default(flush_duration) => {
            panic!("output channel never yielded any table");
        }
    }

    // Expect just the last row.
    select! {
            recv(tables) -> batch => {
            let table = batch.unwrap();
            let expected = DataTable::from_rows(
                table.table_id,
                rows.last().cloned(),
            );
            similar_asserts::assert_eq!(expected, table);
        }
        default(flush_duration * 2) => {
            panic!("output channel never yielded any table");
        }
    }

    assert_eq!(Err(TryRecvError::Empty), tables.try_recv());

    drop(batcher);

    assert_eq!(Err(TryRecvError::Disconnected), tables.try_recv());
}

#[test]
fn duration_trigger() {
    let table = create_table();
    let rows = table.to_rows().collect_vec();

    let flush_duration = std::time::Duration::from_millis(50);

    let batcher = DataTableBatcher::new(DataTableBatcherConfig {
        flush_tick: flush_duration,
        ..DataTableBatcherConfig::NEVER
    })
    .unwrap();
    let tables = batcher.tables();

    assert_eq!(Err(TryRecvError::Empty), tables.try_recv());

    _ = std::thread::Builder::new().spawn({
        let mut rows = rows.clone();
        let batcher = batcher.clone();
        move || {
            for row in rows.drain(..rows.len() - 1) {
                batcher.push_row(row);
            }

            std::thread::sleep(flush_duration * 2);

            let row = rows.last().cloned().unwrap();
            batcher.push_row(row);
        }
    });

    // Expect all rows except for the last one.
    select! {
            recv(tables) -> batch => {
            let table = batch.unwrap();
            let expected = DataTable::from_rows(
                table.table_id,
                rows.clone().into_iter().take(rows.len() - 1),
            );
            similar_asserts::assert_eq!(expected, table);
        }
        default(flush_duration * 2) => {
            panic!("output channel never yielded any table");
        }
    }

    // Expect just the last row.
    select! {
            recv(tables) -> batch => {
            let table = batch.unwrap();
            let expected = DataTable::from_rows(
                table.table_id,
                rows.last().cloned(),
            );
            similar_asserts::assert_eq!(expected, table);
        }
        default(flush_duration * 4) => {
            panic!("output channel never yielded any table");
        }
    }

    assert_eq!(Err(TryRecvError::Empty), tables.try_recv());

    drop(batcher);

    assert_eq!(Err(TryRecvError::Disconnected), tables.try_recv());
}

fn create_table() -> DataTable {
    let timepoint = |frame_nr: i64| {
        TimePoint::from([
            (Timeline::log_time(), Time::now().into()),
            (Timeline::new_sequence("frame_nr"), frame_nr.into()),
        ])
    };

    let row0 = {
        let num_instances = 2;
        let points: &[Point2D] = &[[10.0, 10.0].into(), [20.0, 20.0].into()];
        let colors: &[_] = &[ColorRGBA::from_rgb(128, 128, 128)];
        let labels: &[Label] = &[];

        DataRow::from_cells3(
            RowId::random(),
            "a",
            timepoint(1),
            num_instances,
            (points, colors, labels),
        )
    };

    let row1 = {
        let num_instances = 0;
        let colors: &[ColorRGBA] = &[];

        DataRow::from_cells1(RowId::random(), "b", timepoint(1), num_instances, colors)
    };

    let row2 = {
        let num_instances = 1;
        let colors: &[_] = &[ColorRGBA::from_rgb(255, 255, 255)];
        let labels: &[_] = &[Label("hey".into())];

        DataRow::from_cells2(
            RowId::random(),
            "c",
            timepoint(2),
            num_instances,
            (colors, labels),
        )
    };

    let mut table = DataTable::from_rows(TableId::ZERO, [row0, row1, row2]);
    table.compute_all_size_bytes();
    table
}