use arrow_schema::{DataType, Field};
use rustsim_io::arrow::{schema_from_fields, ArrowValue};
use rustsim_io::bridge::{BridgeError, CollectArrowBridge};
/// Checks that a bridge created with batch size 3 exposes one completed 3-row
/// batch once the third row is pushed, and that `take_batches` afterwards
/// returns that batch plus a second batch holding the single leftover row.
#[test]
fn bridge_auto_flushes_at_batch_size() {
    let columns = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Float64, false),
    ];
    let mut sink = CollectArrowBridge::new(schema_from_fields(columns), 3).unwrap();

    // Two rows: still below the configured batch size of 3.
    for (id, value) in [(1, 1.0), (2, 2.0)] {
        sink.push_row(&[ArrowValue::Int64(id), ArrowValue::Float64(value)])
            .unwrap();
    }
    assert_eq!(sink.total_rows(), 2);

    // The third row reaches the batch size; a full batch must now be visible.
    sink.push_row(&[ArrowValue::Int64(3), ArrowValue::Float64(3.0)])
        .unwrap();
    {
        let completed = sink.batches();
        assert_eq!(completed.len(), 1);
        assert_eq!(completed[0].num_rows(), 3);
    }

    // A fourth row is pushed and then collected through `take_batches`.
    sink.push_row(&[ArrowValue::Int64(4), ArrowValue::Float64(4.0)])
        .unwrap();
    let collected = sink.take_batches().unwrap();
    assert_eq!(collected.len(), 2);
    assert_eq!(collected[1].num_rows(), 1);
}
/// Checks that `take_batches` returns a single 2-row batch when only two rows
/// were pushed into a bridge whose batch size (100) was never reached.
#[test]
fn bridge_take_batches_flushes_remainder() {
    let mut sink = CollectArrowBridge::new(
        schema_from_fields(vec![Field::new("x", DataType::Int64, false)]),
        100,
    )
    .unwrap();

    for value in [42, 43] {
        sink.push_row(&[ArrowValue::Int64(value)]).unwrap();
    }

    let collected = sink.take_batches().unwrap();
    assert_eq!(collected.len(), 1);
    assert_eq!(collected[0].num_rows(), 2);
}
/// Walks a bridge with batch size 2 through three states (no rows, one row,
/// two rows) and checks the `metrics()` snapshot taken in each state.
#[test]
fn bridge_metrics_report_pending_and_completed_rows() {
    let mut sink = CollectArrowBridge::new(
        schema_from_fields(vec![Field::new("x", DataType::Int64, false)]),
        2,
    )
    .unwrap();

    // Fresh bridge: only the configured batch size is non-zero.
    let initial = sink.metrics();
    assert_eq!(initial.batch_size, 2);
    assert_eq!(initial.pending_rows, 0);
    assert_eq!(initial.completed_batches, 0);
    assert_eq!(initial.completed_rows, 0);
    assert_eq!(initial.total_rows, 0);

    // One row pushed: it is pending and no batch has completed yet.
    sink.push_row(&[ArrowValue::Int64(1)]).unwrap();
    let after_first = sink.metrics();
    assert_eq!(after_first.pending_rows, 1);
    assert_eq!(after_first.completed_batches, 0);
    assert_eq!(after_first.total_rows, 1);

    // Second row pushed: both rows are expected to be in one completed batch.
    sink.push_row(&[ArrowValue::Int64(2)]).unwrap();
    let after_second = sink.metrics();
    assert_eq!(after_second.pending_rows, 0);
    assert_eq!(after_second.completed_batches, 1);
    assert_eq!(after_second.completed_rows, 2);
    assert_eq!(after_second.total_rows, 2);
}
/// Checks that constructing a bridge with a batch size of 0 fails with
/// `BridgeError::InvalidBatchSize`.
#[test]
fn bridge_rejects_zero_batch_size() {
    let outcome = CollectArrowBridge::new(
        schema_from_fields(vec![Field::new("x", DataType::Int64, false)]),
        0,
    );

    // `.err().unwrap()` panics if construction unexpectedly succeeded.
    let rejection = outcome.err().unwrap();
    assert!(matches!(rejection, BridgeError::InvalidBatchSize));
}
/// Checks that batches removed with `drain_completed` can be handed back via
/// `restore_completed`, after which the bridge again reports one completed
/// batch containing two rows.
#[test]
fn bridge_can_restore_completed_batches_after_downstream_failure() {
    let mut sink = CollectArrowBridge::new(
        schema_from_fields(vec![Field::new("x", DataType::Int64, false)]),
        2,
    )
    .unwrap();

    // Two rows with batch size 2.
    for value in [1, 2] {
        sink.push_row(&[ArrowValue::Int64(value)]).unwrap();
    }

    // Draining hands the completed batch to the caller and empties the bridge's list.
    let handed_off = sink.drain_completed();
    assert_eq!(handed_off.len(), 1);
    assert!(sink.batches().is_empty());

    // Returning the drained batches must bring the completed counters back.
    sink.restore_completed(handed_off);
    assert_eq!(sink.completed_batches(), 1);
    assert_eq!(sink.completed_rows(), 2);
}