// rustsim-io 0.0.1
//
// Arrow batch builders, CSV bridge, and ClickHouse writer for rustsim.
use arrow_schema::{DataType, Field};
use rustsim_io::arrow::{schema_from_fields, ArrowValue};
use rustsim_io::bridge::{BridgeError, CollectArrowBridge};

/// Rows stay pending until `batch_size` (3) is reached, at which point the
/// bridge seals them into a completed batch; `take_batches` then returns the
/// sealed batch followed by the partial remainder.
#[test]
fn bridge_auto_flushes_at_batch_size() {
    let schema = schema_from_fields(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Float64, false),
    ]);

    let mut bridge = CollectArrowBridge::new(schema, 3).unwrap();

    bridge
        .push_row(&[ArrowValue::Int64(1), ArrowValue::Float64(1.0)])
        .unwrap();
    bridge
        .push_row(&[ArrowValue::Int64(2), ArrowValue::Float64(2.0)])
        .unwrap();
    assert_eq!(bridge.total_rows(), 2);
    // Still below batch_size: nothing may have been flushed yet.
    assert!(bridge.batches().is_empty());

    bridge
        .push_row(&[ArrowValue::Int64(3), ArrowValue::Float64(3.0)])
        .unwrap();
    // Should have auto-flushed at batch_size=3
    assert_eq!(bridge.batches().len(), 1);
    assert_eq!(bridge.batches()[0].num_rows(), 3);

    bridge
        .push_row(&[ArrowValue::Int64(4), ArrowValue::Float64(4.0)])
        .unwrap();
    let batches = bridge.take_batches().unwrap();
    // The full auto-flushed batch comes first, then the 1-row remainder.
    assert_eq!(batches.len(), 2);
    assert_eq!(batches[0].num_rows(), 3);
    assert_eq!(batches[1].num_rows(), 1);
}

/// `take_batches` must seal a partially filled batch even when `batch_size`
/// (here 100) was never reached.
#[test]
fn bridge_take_batches_flushes_remainder() {
    let schema = schema_from_fields(vec![Field::new("x", DataType::Int64, false)]);
    let mut bridge = CollectArrowBridge::new(schema, 100).unwrap();

    for value in [42, 43] {
        bridge.push_row(&[ArrowValue::Int64(value)]).unwrap();
    }

    let taken = bridge.take_batches().unwrap();
    assert_eq!(taken.len(), 1);
    assert_eq!(taken[0].num_rows(), 2);
}

/// Snapshots from `metrics()` must track rows as they move from pending
/// (below `batch_size`) to completed (after the auto-flush at `batch_size`).
#[test]
fn bridge_metrics_report_pending_and_completed_rows() {
    let schema = schema_from_fields(vec![Field::new("x", DataType::Int64, false)]);

    let mut bridge = CollectArrowBridge::new(schema, 2).unwrap();
    let empty = bridge.metrics();
    assert_eq!(empty.batch_size, 2);
    assert_eq!(empty.pending_rows, 0);
    assert_eq!(empty.completed_batches, 0);
    assert_eq!(empty.completed_rows, 0);
    assert_eq!(empty.total_rows, 0);

    // One row: below batch_size, so it is pending and nothing is completed.
    bridge.push_row(&[ArrowValue::Int64(1)]).unwrap();
    let one = bridge.metrics();
    assert_eq!(one.batch_size, 2);
    assert_eq!(one.pending_rows, 1);
    assert_eq!(one.completed_batches, 0);
    assert_eq!(one.completed_rows, 0);
    assert_eq!(one.total_rows, 1);

    // Second row reaches batch_size: both rows move into one completed batch.
    bridge.push_row(&[ArrowValue::Int64(2)]).unwrap();
    let flushed = bridge.metrics();
    assert_eq!(flushed.batch_size, 2);
    assert_eq!(flushed.pending_rows, 0);
    assert_eq!(flushed.completed_batches, 1);
    assert_eq!(flushed.completed_rows, 2);
    assert_eq!(flushed.total_rows, 2);
}

/// Constructing a bridge with `batch_size == 0` must fail with
/// `BridgeError::InvalidBatchSize`.
#[test]
fn bridge_rejects_zero_batch_size() {
    let schema = schema_from_fields(vec![Field::new("x", DataType::Int64, false)]);
    // Match explicitly instead of `.err().unwrap()` so an unexpected `Ok`
    // produces a descriptive panic rather than "unwrap on a None value".
    let err = match CollectArrowBridge::new(schema, 0) {
        Ok(_) => panic!("CollectArrowBridge::new accepted batch_size = 0"),
        Err(err) => err,
    };
    assert!(
        matches!(err, BridgeError::InvalidBatchSize),
        "expected BridgeError::InvalidBatchSize, got {:?}",
        err
    );
}

/// Simulates a downstream write failure: completed batches are drained for
/// delivery, then handed back via `restore_completed`, and must be visible
/// again both through the counters and through `batches()`.
#[test]
fn bridge_can_restore_completed_batches_after_downstream_failure() {
    let schema = schema_from_fields(vec![Field::new("x", DataType::Int64, false)]);
    let mut bridge = CollectArrowBridge::new(schema, 2).unwrap();

    // Two rows at batch_size=2 produce exactly one completed batch.
    bridge.push_row(&[ArrowValue::Int64(1)]).unwrap();
    bridge.push_row(&[ArrowValue::Int64(2)]).unwrap();
    let drained = bridge.drain_completed();
    assert_eq!(drained.len(), 1);
    assert!(bridge.batches().is_empty());

    // Hand the batches back as if the downstream write had failed.
    bridge.restore_completed(drained);
    assert_eq!(bridge.completed_batches(), 1);
    assert_eq!(bridge.completed_rows(), 2);
    assert_eq!(bridge.batches().len(), 1);
    assert_eq!(bridge.batches()[0].num_rows(), 2);
}