use std::{
sync::{Arc, Mutex, OnceLock},
thread,
};
use arrow::{
array::{
Array, ArrayData, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, Decimal256Builder, FixedSizeBinaryArray, Float16Array, Float32Array,
Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, StringArray,
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray, UInt8Array, timezone::Tz,
},
buffer::Buffer,
datatypes::{
ArrowPrimitiveType, DataType, Decimal256Type, Field, Float16Type, Schema, SchemaRef,
TimeUnit,
},
error::ArrowError,
record_batch::{RecordBatch, RecordBatchReader},
};
use chrono::{NaiveDate, NaiveTime, TimeZone};
use float_eq::assert_float_eq;
type F16 = <Float16Type as ArrowPrimitiveType>::Native;
use arrow_odbc::{
ColumnFailure, Error, OdbcReaderBuilder, OdbcWriter, TextEncoding, WriterError,
arrow::array::Float64Array,
arrow_schema_from, insert_into_table,
odbc_api::{
Connection, ConnectionOptions, Cursor, CursorImpl, Environment, IntoParameter,
buffers::TextRowSet,
handles::StatementConnection,
sys::{AttrConnectionPooling, AttrCpMatch},
},
};
use stdext::function_name;
// Connection string for the Microsoft SQL Server test instance.
// NOTE(review): credentials are test-only fixtures for a local container.
const MSSQL: &str = "Driver={ODBC Driver 18 for SQL Server};\
    Server=localhost;\
    UID=SA;\
    PWD=My@Test@Password1;\
    TrustServerCertificate=yes;";
// Connection string for the PostgreSQL test instance (UNICODE driver, so
// text columns are reported as wide characters).
const POSTGRES: &str = "Driver={PostgreSQL UNICODE};\
    Server=localhost;\
    Port=5432;\
    Database=test;\
    Uid=test;\
    Pwd=test;";
// Connection string for the IBM DB2 test instance.
const DB2: &str = "Driver={IBM DB2 ODBC DRIVER};\
    Database=testdb;\
    Hostname=localhost;\
    Port=50000;\
    Protocol=TCPIP;\
    Uid=db2inst1;\
    Pwd=password;";
/// Returns a lazily initialized, process-wide ODBC environment.
///
/// Connection pooling is configured before the `Environment` is created,
/// inside the same one-shot closure, because the pooling attribute applies
/// process-globally.
fn env() -> &'static Environment {
    static ENV: OnceLock<Environment> = OnceLock::new();
    ENV.get_or_init(|| {
        // SAFETY: `OnceLock` guarantees this closure runs at most once, so
        // the global pooling attribute is set exactly once and before any
        // environment (and therefore any connection) exists.
        unsafe {
            Environment::set_connection_pooling(AttrConnectionPooling::DriverAware).unwrap();
            let mut env = Environment::new().unwrap();
            env.set_connection_pooling_matching(AttrCpMatch::Strict)
                .unwrap();
            env
        }
    })
}
/// A nullable `INTEGER` column must arrive as an `Int32Array` whose validity
/// bitmap reflects the NULL row; the backing value for the NULL slot is 0.
#[test]
fn fetch_nullable_32bit_integer() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let column = fetch_arrow_data(table_name, "INTEGER", "(1),(NULL),(3)").unwrap();
    let ints = column.as_any().downcast_ref::<Int32Array>().unwrap();
    // Validity must be true, false, true for the three inserted rows.
    let validity: Vec<bool> = (0..3).map(|index| ints.is_valid(index)).collect();
    assert_eq!(vec![true, false, true], validity);
    assert_eq!([1, 0, 3], *ints.values());
}
/// A `NOT NULL INTEGER` column maps to a dense `Int32Array`.
#[test]
fn fetch_32bit_integer() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let column = fetch_arrow_data(table_name, "INTEGER NOT NULL", "(1),(2),(3)").unwrap();
    let ints = column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!([1, 2, 3], *ints.values());
}
/// A `NOT NULL SMALLINT` column maps to a dense `Int16Array`.
#[test]
fn fetch_16bit_integer() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let column = fetch_arrow_data(table_name, "SMALLINT NOT NULL", "(1),(2),(3)").unwrap();
    let shorts = column.as_any().downcast_ref::<Int16Array>().unwrap();
    assert_eq!([1, 2, 3], *shorts.values());
}
/// MSSQL `TINYINT` is unsigned (0-255), so it must map to an Arrow
/// `UInt8Array`.
#[test]
fn fetch_unsigned_8bit_integer() {
    // Fixed typo in the test name ("unsigend" -> "unsigned"). The backing
    // table is derived from the function name and created per test, so the
    // rename is self-contained.
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    // 255 exercises the upper bound of the unsigned byte range.
    let array_any = fetch_arrow_data(table_name, "TINYINT NOT NULL", "(1),(0),(255)").unwrap();
    let array_vals = array_any.as_any().downcast_ref::<UInt8Array>().unwrap();
    assert_eq!([1, 0, 255], *array_vals.values());
}
/// Supplying an explicit Arrow schema (`UInt8`) overrides schema inference
/// when reading a `TINYINT` column.
#[test]
fn fetch_8bit_unsigned_integer_explicit_schema() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["TINYINT NOT NULL"]).unwrap();
    let sql = format!("INSERT INTO {table_name} (a) VALUES (1),(2),(3)");
    conn.execute(&sql, (), None).unwrap();
    let sql = format!("SELECT a FROM {table_name}");
    let cursor = conn.execute(&sql, (), None).unwrap().unwrap();
    // Explicit schema: column `a` as non-nullable UInt8.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::UInt8, false)]));
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .with_schema(schema)
        .build(cursor)
        .unwrap();
    let arrow_batch = reader.next().unwrap().unwrap();
    let array_vals = arrow_batch
        .column(0)
        .as_any()
        .downcast_ref::<UInt8Array>()
        .unwrap();
    assert_eq!([1, 2, 3], *array_vals.values());
}
/// Requesting a `Decimal128` with a negative scale must fail with
/// `ColumnFailure::UnsupportedArrowType` at reader-build time.
#[test]
fn fetch_decimal128_negative_scale_unsupported() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["NUMERIC(5,0) NOT NULL"]).unwrap();
    let sql = format!("INSERT INTO {table_name} (a) VALUES (12300)");
    conn.execute(&sql, (), None).unwrap();
    let sql = format!("SELECT a FROM {table_name}");
    let cursor = conn.execute(&sql, (), None).unwrap().unwrap();
    // Scale of -2 is valid Arrow metadata but unsupported by arrow-odbc.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Decimal128(3, -2),
        false,
    )]));
    let result = OdbcReaderBuilder::new().with_schema(schema).build(cursor);
    assert!(matches!(
        result,
        Err(Error::ColumnFailure {
            source: ColumnFailure::UnsupportedArrowType(DataType::Decimal128(3, -2)),
            index: 0,
            name: _
        })
    ))
}
/// An explicit `UInt16` schema is rejected (no ODBC buffer type for it):
/// building the reader fails with `ColumnFailure::UnsupportedArrowType`.
#[test]
fn unsupported_16bit_unsigned_integer() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["SMALLINT NOT NULL"]).unwrap();
    let sql = format!("INSERT INTO {table_name} (a) VALUES (1),(2),(3)");
    conn.execute(&sql, (), None).unwrap();
    let sql = format!("SELECT a FROM {table_name}");
    let cursor = conn.execute(&sql, (), None).unwrap().unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::UInt16, false)]));
    let result = OdbcReaderBuilder::new().with_schema(schema).build(cursor);
    assert!(matches!(
        result,
        Err(Error::ColumnFailure {
            source: ColumnFailure::UnsupportedArrowType(DataType::UInt16),
            index: 0,
            name: _
        })
    ))
}
/// A `NOT NULL BIT` column maps to a `BooleanArray` with 1 -> true, 0 -> false.
#[test]
fn fetch_boolean() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let column = fetch_arrow_data(table_name, "BIT NOT NULL", "(1),(0),(1)").unwrap();
    let bools = column.as_any().downcast_ref::<BooleanArray>().unwrap();
    let actual: Vec<bool> = (0..3).map(|index| bools.value(index)).collect();
    assert_eq!(vec![true, false, true], actual);
}
/// A nullable `BIT` column maps to a `BooleanArray` with a validity bitmap
/// for the NULL row.
#[test]
fn fetch_nullable_boolean() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let column = fetch_arrow_data(table_name, "BIT", "(1),(NULL),(0)").unwrap();
    let bools = column.as_any().downcast_ref::<BooleanArray>().unwrap();
    // First row: valid and true.
    assert!(bools.is_valid(0) && bools.value(0));
    // Second row: NULL.
    assert!(bools.is_null(1));
    // Third row: valid and false.
    assert!(bools.is_valid(2) && !bools.value(2));
}
/// A `NOT NULL REAL` column maps to a `Float32Array`.
#[test]
fn fetch_32bit_floating_point() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let column = fetch_arrow_data(table_name, "REAL NOT NULL", "(1),(2),(3)").unwrap();
    let floats = column.as_any().downcast_ref::<Float32Array>().unwrap();
    assert_float_eq!([1., 2., 3.][..], floats.values(), abs_all <= 0.1);
}
/// A `NOT NULL DOUBLE PRECISION` column maps to a `Float64Array`.
#[test]
fn fetch_64bit_floating_point() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let column =
        fetch_arrow_data(table_name, "DOUBLE PRECISION NOT NULL", "(1),(2),(3)").unwrap();
    let doubles = column.as_any().downcast_ref::<Float64Array>().unwrap();
    assert_float_eq!([1., 2., 3.][..], doubles.values(), abs_all <= 0.1);
}
/// A `NOT NULL BIGINT` column maps to a dense `Int64Array`.
#[test]
fn fetch_64bit_integer() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let column = fetch_arrow_data(table_name, "BIGINT NOT NULL", "(1),(2),(3)").unwrap();
    let longs = column.as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!([1, 2, 3], *longs.values());
}
/// A nullable `VARCHAR` column maps to a `StringArray`, NULLs included.
#[test]
fn fetch_varchar() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let column =
        fetch_arrow_data(table_name, "VARCHAR(50)", "('Hello'),('Bonjour'),(NULL)").unwrap();
    let strings = column.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!("Hello", strings.value(0));
    assert_eq!("Bonjour", strings.value(1));
    assert!(strings.is_null(2));
}
/// With `trim_fixed_sized_characters(true)` the reader strips padding from
/// `CHAR(n)` values. Note: leading as well as trailing spaces are removed
/// (' 123' -> "123"), as asserted below.
#[test]
fn trim_fixed_sized_character_data() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let cursor = cursor_over_literals(table_name, "CHAR(4)", "('1234'),(' 123'),('123 ')");
    // Also exercises the concurrent reader wrapper.
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(4)
        .trim_fixed_sized_characters(true)
        .build(cursor)
        .unwrap()
        .into_concurrent()
        .unwrap();
    let record_batch = reader.next().unwrap().unwrap();
    let array_any = record_batch.column(0).clone();
    let array_vals = array_any.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!("1234", array_vals.value(0));
    assert_eq!("123", array_vals.value(1));
    assert_eq!("123", array_vals.value(2));
}
/// A nullable `NVARCHAR` (wide text) column also maps to a `StringArray`.
#[test]
fn fetch_nvarchar() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let column =
        fetch_arrow_data(table_name, "NVARCHAR(50)", "('Hello'),('Bonjour'),(NULL)").unwrap();
    let strings = column.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!("Hello", strings.value(0));
    assert_eq!("Bonjour", strings.value(1));
    assert!(strings.is_null(2));
}
/// A nullable `DATE` column maps to a `Date32Array` (days since epoch),
/// convertible back to `NaiveDate`.
#[test]
fn fetch_dates() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let array_any =
        fetch_arrow_data(table_name, "DATE", "('2021-04-09'),(NULL),('2002-09-30')").unwrap();
    let array_vals = array_any.as_any().downcast_ref::<Date32Array>().unwrap();
    assert_eq!(
        Some(NaiveDate::from_ymd_opt(2021, 4, 9).unwrap()),
        array_vals.value_as_date(0)
    );
    assert!(array_vals.is_null(1));
    assert_eq!(
        Some(NaiveDate::from_ymd_opt(2002, 9, 30).unwrap()),
        array_vals.value_as_date(2)
    );
}
/// A `DATE NOT NULL` column maps to a dense `Date32Array`.
#[test]
fn fetch_non_null_dates() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let array_any =
        fetch_arrow_data(table_name, "DATE NOT NULL", "('2021-04-09'),('2002-09-30')").unwrap();
    let array_vals = array_any.as_any().downcast_ref::<Date32Array>().unwrap();
    assert_eq!(
        Some(NaiveDate::from_ymd_opt(2021, 4, 9).unwrap()),
        array_vals.value_as_date(0)
    );
    assert_eq!(
        Some(NaiveDate::from_ymd_opt(2002, 9, 30).unwrap()),
        array_vals.value_as_date(1)
    );
}
/// MSSQL `DATETIME` has ~3.33ms precision; it maps to a
/// `TimestampMillisecondArray`.
#[test]
fn fetch_non_null_date_time() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let array_any = fetch_arrow_data(
        table_name,
        "DATETIME NOT NULL",
        "('2021-04-09 18:57:50.12'),('2002-09-30 12:43:17.45')",
    )
    .unwrap();
    let array_vals = array_any
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .unwrap();
    // '.12' seconds round-trips as 120 milliseconds.
    assert_eq!(
        Some(
            NaiveDate::from_ymd_opt(2021, 4, 9)
                .unwrap()
                .and_hms_milli_opt(18, 57, 50, 120)
                .unwrap()
        ),
        array_vals.value_as_datetime(0)
    );
    assert_eq!(
        Some(
            NaiveDate::from_ymd_opt(2002, 9, 30)
                .unwrap()
                .and_hms_milli_opt(12, 43, 17, 450)
                .unwrap()
        ),
        array_vals.value_as_datetime(1)
    );
}
/// `DATETIME2(6)` (microsecond precision) maps to a
/// `TimestampMicrosecondArray`; NULLs are preserved.
#[test]
fn fetch_date_time_us() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let array_any = fetch_arrow_data(
        table_name,
        "DATETIME2(6)",
        "('2021-04-09 18:57:50'),(NULL),('2002-09-30 12:43:17')",
    )
    .unwrap();
    let array_vals = array_any
        .as_any()
        .downcast_ref::<TimestampMicrosecondArray>()
        .unwrap();
    assert_eq!(
        Some(
            NaiveDate::from_ymd_opt(2021, 4, 9)
                .unwrap()
                .and_hms_milli_opt(18, 57, 50, 0)
                .unwrap()
        ),
        array_vals.value_as_datetime(0)
    );
    assert!(array_vals.is_null(1));
    assert_eq!(
        Some(
            NaiveDate::from_ymd_opt(2002, 9, 30)
                .unwrap()
                .and_hms_milli_opt(12, 43, 17, 00)
                .unwrap()
        ),
        array_vals.value_as_datetime(2)
    );
}
/// Nullable `DATETIME` maps to a `TimestampMillisecondArray`; NULLs are
/// preserved.
#[test]
fn fetch_date_time_ms() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let array_any = fetch_arrow_data(
        table_name,
        "DATETIME",
        "('2021-04-09 18:57:50'),(NULL),('2002-09-30 12:43:17')",
    )
    .unwrap();
    let array_vals = array_any
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .unwrap();
    assert_eq!(
        Some(
            NaiveDate::from_ymd_opt(2021, 4, 9)
                .unwrap()
                .and_hms_milli_opt(18, 57, 50, 0)
                .unwrap()
        ),
        array_vals.value_as_datetime(0)
    );
    assert!(array_vals.is_null(1));
    assert_eq!(
        Some(
            NaiveDate::from_ymd_opt(2002, 9, 30)
                .unwrap()
                .and_hms_milli_opt(12, 43, 17, 00)
                .unwrap()
        ),
        array_vals.value_as_datetime(2)
    );
}
/// Millisecond timestamps before the Unix epoch (negative values) must
/// round-trip correctly.
#[test]
fn fetch_date_time_ms_before_epoch() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let array_any =
        fetch_arrow_data(table_name, "DATETIME", "('1900-01-01 12:43:17.123')").unwrap();
    let array_vals = array_any
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .unwrap();
    assert_eq!(
        Some(
            NaiveDate::from_ymd_opt(1900, 1, 1)
                .unwrap()
                .and_hms_milli_opt(12, 43, 17, 123)
                .unwrap()
        ),
        array_vals.value_as_datetime(0)
    );
}
/// Year 1600 is outside the i64-nanosecond range but fits in milliseconds;
/// `DATETIME2(3)` must therefore be fetched as milliseconds and succeed.
#[test]
fn fetch_timestamp_ms_which_could_not_be_represented_as_i64_ns() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let array_any =
        fetch_arrow_data(table_name, "DATETIME2(3)", "('1600-06-18T23:12:44.123Z')").unwrap();
    let array_vals = array_any
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .unwrap();
    assert_eq!(
        Some(
            NaiveDate::from_ymd_opt(1600, 6, 18)
                .unwrap()
                .and_hms_milli_opt(23, 12, 44, 123)
                .unwrap()
        ),
        array_vals.value_as_datetime(0)
    );
}
/// `DATETIME2` (100ns precision) maps to a `TimestampNanosecondArray`;
/// seven fractional digits round-trip exactly.
#[test]
fn fetch_non_null_date_time_ns() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let array_any = fetch_arrow_data(
        table_name,
        "DATETIME2 NOT NULL",
        "('2021-04-09 18:57:50.1234567'),('2002-09-30 12:43:17.456')",
    )
    .unwrap();
    let array_vals = array_any
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .unwrap();
    // '.1234567' seconds == 123_456_700 nanoseconds.
    assert_eq!(
        Some(
            NaiveDate::from_ymd_opt(2021, 4, 9)
                .unwrap()
                .and_hms_nano_opt(18, 57, 50, 123_456_700)
                .unwrap()
        ),
        array_vals.value_as_datetime(0)
    );
    assert_eq!(
        Some(
            NaiveDate::from_ymd_opt(2002, 9, 30)
                .unwrap()
                .and_hms_nano_opt(12, 43, 17, 456_000_000)
                .unwrap()
        ),
        array_vals.value_as_datetime(1)
    );
}
/// A timestamp beyond the i64-nanosecond range (year 2300) must surface as a
/// descriptive error when fetched with nanosecond precision.
/// The asserted message text is part of the crate's public error output.
#[test]
fn fetch_out_of_range_date_time_ns() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let error = fetch_arrow_data(
        table_name,
        "DATETIME2 NOT NULL",
        "('2300-01-01 00:00:00.1234567')",
    )
    .unwrap_err();
    assert_eq!(
        "External error: Timestamp is not representable in arrow: 2300-01-01 00:00:00.123456700\n\
        Timestamps with nanoseconds precision are represented using a signed 64 Bit integer. This \
        limits their range to values between 1677-09-21 00:12:44 and \
        2262-04-11 23:47:16.854775807. The value returned from the database is outside of this \
        range. Suggestions to fix this error either reduce the precision or fetch the values as \
        text.",
        error.to_string()
    )
}
/// With `value_errors_as_null(true)` an out-of-range timestamp becomes NULL
/// instead of an error, while in-range rows are still converted.
#[test]
fn map_out_of_range_date_time_to_null() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let cursor = cursor_over_literals(
        table_name,
        "DATETIME2 NOT NULL",
        "('2300-01-01 00:00:00.1234567'),('2002-09-30 12:43:17.456')",
    );
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .value_errors_as_null(true)
        .build(cursor)
        .unwrap()
        .into_concurrent()
        .unwrap();
    let record_batch = reader.next().unwrap().unwrap();
    let array = record_batch.column(0).clone();
    let array_vals = array
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .unwrap();
    // Row 0 (year 2300) is unrepresentable in i64 nanoseconds -> NULL.
    assert!(array_vals.is_null(0));
    assert_eq!(
        Some(
            NaiveDate::from_ymd_opt(2002, 9, 30)
                .unwrap()
                .and_hms_nano_opt(12, 43, 17, 456_000_000)
                .unwrap()
        ),
        array_vals.value_as_datetime(1)
    );
}
/// `DECIMAL(5,2)` maps to a `Decimal128Array` preserving precision and scale.
#[test]
fn fetch_decimals() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let cursor = cursor_over_literals(table_name, "DECIMAL(5,2) NOT NULL", "(123.45),(678.90)");
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(5)
        .build(cursor)
        .unwrap();
    let batch = reader.next().unwrap().unwrap();
    let column = batch.column(0).clone();
    let decimals = column.as_any().downcast_ref::<Decimal128Array>().unwrap();
    // Compare via the string rendering so scale is checked, too.
    assert_eq!("123.45", decimals.value_as_string(0));
    assert_eq!("678.90", decimals.value_as_string(1));
}
/// Negative decimals must keep their sign through the `Decimal128` conversion.
#[test]
fn fetch_negative_decimal() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let cursor = cursor_over_value(table_name, "DECIMAL(5,2) NOT NULL", -123.45);
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(5)
        .build(cursor)
        .unwrap();
    let batch = reader.next().unwrap().unwrap();
    let column = batch.column(0).clone();
    let decimals = column.as_any().downcast_ref::<Decimal128Array>().unwrap();
    assert_eq!("-123.45", decimals.value_as_string(0));
}
/// `VARBINARY` columns map to a `BinaryArray`; values are inserted via a
/// prepared statement with binary parameters.
#[test]
fn fetch_varbinary_data() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["VARBINARY(30) NOT NULL"]).unwrap();
    let sql = format!("INSERT INTO {table_name} (a) VALUES (?)");
    let mut insert = conn.prepare(&sql).unwrap();
    insert.execute(&b"Hello".into_parameter()).unwrap();
    insert.execute(&b"World".into_parameter()).unwrap();
    // ORDER BY id keeps insertion order deterministic for the asserts.
    let sql = format!("SELECT a FROM {table_name} ORDER BY id");
    let cursor = conn.execute(&sql, (), None).unwrap().unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .build(cursor)
        .unwrap();
    let arrow_batch = reader.next().unwrap().unwrap();
    let array_vals = arrow_batch
        .column(0)
        .as_any()
        .downcast_ref::<BinaryArray>()
        .unwrap();
    assert_eq!(b"Hello", array_vals.value(0));
    assert_eq!(b"World", array_vals.value(1));
}
/// Fixed-width `BINARY(5)` columns map to a `FixedSizeBinaryArray`.
#[test]
fn fetch_fixed_sized_binary_data() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["BINARY(5) NOT NULL"]).unwrap();
    let sql = format!("INSERT INTO {table_name} (a) VALUES (?)");
    let mut insert = conn.prepare(&sql).unwrap();
    // Both payloads are exactly 5 bytes, matching the column width.
    insert.execute(&b"Hello".into_parameter()).unwrap();
    insert.execute(&b"World".into_parameter()).unwrap();
    let sql = format!("SELECT a FROM {table_name} ORDER BY id");
    let cursor = conn.execute(&sql, (), None).unwrap().unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .build(cursor)
        .unwrap();
    let arrow_batch = reader.next().unwrap().unwrap();
    let array_vals = arrow_batch
        .column(0)
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    assert_eq!(b"Hello", array_vals.value(0));
    assert_eq!(b"World", array_vals.value(1));
}
/// Without a DBMS hint, MSSQL `TIME` (7 fractional digits) falls back to a
/// text representation and is fetched as a `StringArray`.
#[test]
fn fetch_time() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["TIME NOT NULL"]).unwrap();
    let sql = format!("INSERT INTO {table_name} (a) VALUES ('12:34:56')");
    conn.execute(&sql, (), None).unwrap();
    let sql = format!("SELECT a FROM {table_name} ORDER BY id");
    let cursor = conn.execute(&sql, (), None).unwrap().unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .build(cursor)
        .unwrap();
    let arrow_batch = reader.next().unwrap().unwrap();
    let array_vals = arrow_batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!("12:34:56.0000000", array_vals.value(0));
}
/// PostgreSQL `TIME(0)` (whole seconds) maps to a `Time32SecondArray`.
#[test]
fn fetch_time_0_psql() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(POSTGRES, Default::default())
        .unwrap();
    setup_empty_table::<PostgreSql>(&conn, table_name, &["TIME(0) NOT NULL"]).unwrap();
    let sql = format!("INSERT INTO {table_name} (a) VALUES ('12:34:56')");
    conn.execute(&sql, (), None).unwrap();
    let sql = format!("SELECT a FROM {table_name} ORDER BY id");
    let cursor = conn.execute(&sql, (), None).unwrap().unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .build(cursor)
        .unwrap();
    let arrow_batch = reader.next().unwrap().unwrap();
    let array_vals = arrow_batch
        .column(0)
        .as_any()
        .downcast_ref::<Time32SecondArray>()
        .unwrap();
    // 12h*3600 + 34m*60 + 56s = 45_296 seconds since midnight.
    assert_eq!(45_296, array_vals.value(0));
}
/// PostgreSQL `TIME(1)` maps to a `Time32MillisecondArray`.
#[test]
fn fetch_time_1_psql() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(POSTGRES, Default::default())
        .unwrap();
    setup_empty_table::<PostgreSql>(&conn, table_name, &["TIME(1) NOT NULL"]).unwrap();
    let sql = format!("INSERT INTO {table_name} (a) VALUES ('12:34:56.7')");
    conn.execute(&sql, (), None).unwrap();
    let sql = format!("SELECT a FROM {table_name} ORDER BY id");
    let cursor = conn.execute(&sql, (), None).unwrap().unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .build(cursor)
        .unwrap();
    let arrow_batch = reader.next().unwrap().unwrap();
    let array_vals = arrow_batch
        .column(0)
        .as_any()
        .downcast_ref::<Time32MillisecondArray>()
        .unwrap();
    // Reconstruct a NaiveTime from milliseconds-since-midnight for the assert.
    let ms_since_midnight = array_vals.value(0) as u32;
    let sec = ms_since_midnight / 1000;
    let nano = (ms_since_midnight % 1000) * 1_000_000;
    let naive_time = NaiveTime::from_num_seconds_from_midnight_opt(sec, nano).unwrap();
    assert_eq!("12:34:56.700", naive_time.to_string());
}
/// PostgreSQL `TIME(2)` maps to a `Time32MillisecondArray`.
#[test]
fn fetch_time_2_psql() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(POSTGRES, Default::default())
        .unwrap();
    setup_empty_table::<PostgreSql>(&conn, table_name, &["TIME(2) NOT NULL"]).unwrap();
    let sql = format!("INSERT INTO {table_name} (a) VALUES ('12:34:56.78')");
    conn.execute(&sql, (), None).unwrap();
    let sql = format!("SELECT a FROM {table_name} ORDER BY id");
    let cursor = conn.execute(&sql, (), None).unwrap().unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .build(cursor)
        .unwrap();
    let arrow_batch = reader.next().unwrap().unwrap();
    let array_vals = arrow_batch
        .column(0)
        .as_any()
        .downcast_ref::<Time32MillisecondArray>()
        .unwrap();
    // Reconstruct a NaiveTime from milliseconds-since-midnight for the assert.
    let ms_since_midnight = array_vals.value(0) as u32;
    let sec = ms_since_midnight / 1000;
    let nano = (ms_since_midnight % 1000) * 1_000_000;
    let naive_time = NaiveTime::from_num_seconds_from_midnight_opt(sec, nano).unwrap();
    assert_eq!("12:34:56.780", naive_time.to_string());
}
/// PostgreSQL `TIME(3)` maps to a `Time32MillisecondArray` with exact
/// millisecond values.
#[test]
fn fetch_time_3_psql() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(POSTGRES, Default::default())
        .unwrap();
    setup_empty_table::<PostgreSql>(&conn, table_name, &["TIME(3) NOT NULL"]).unwrap();
    let sql = format!("INSERT INTO {table_name} (a) VALUES ('12:34:56.789')");
    conn.execute(&sql, (), None).unwrap();
    let sql = format!("SELECT a FROM {table_name} ORDER BY id");
    let cursor = conn.execute(&sql, (), None).unwrap().unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .build(cursor)
        .unwrap();
    let arrow_batch = reader.next().unwrap().unwrap();
    let array_vals = arrow_batch
        .column(0)
        .as_any()
        .downcast_ref::<Time32MillisecondArray>()
        .unwrap();
    // Reconstruct a NaiveTime from milliseconds-since-midnight for the assert.
    let ms_since_midnight = array_vals.value(0) as u32;
    let sec = ms_since_midnight / 1000;
    let nano = (ms_since_midnight % 1000) * 1_000_000;
    let naive_time = NaiveTime::from_num_seconds_from_midnight_opt(sec, nano).unwrap();
    assert_eq!("12:34:56.789", naive_time.to_string());
}
/// PostgreSQL `TIME(4)` exceeds millisecond precision and therefore maps to
/// a `Time64MicrosecondArray`.
#[test]
fn fetch_time_4_psql() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(POSTGRES, Default::default())
        .unwrap();
    setup_empty_table::<PostgreSql>(&conn, table_name, &["TIME(4) NOT NULL"]).unwrap();
    let sql = format!("INSERT INTO {table_name} (a) VALUES ('12:34:56.7891')");
    conn.execute(&sql, (), None).unwrap();
    let sql = format!("SELECT a FROM {table_name} ORDER BY id");
    let cursor = conn.execute(&sql, (), None).unwrap().unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .build(cursor)
        .unwrap();
    let arrow_batch = reader.next().unwrap().unwrap();
    let array_vals = arrow_batch
        .column(0)
        .as_any()
        .downcast_ref::<Time64MicrosecondArray>()
        .unwrap();
    // Reconstruct a NaiveTime from microseconds-since-midnight for the assert.
    let us_since_midnight = array_vals.value(0);
    let sec = (us_since_midnight / 1_000_000) as u32;
    let nano = ((us_since_midnight % 1_000_000) * 1_000) as u32;
    let naive_time = NaiveTime::from_num_seconds_from_midnight_opt(sec, nano).unwrap();
    assert_eq!("12:34:56.789100", naive_time.to_string());
}
/// With the DBMS name supplied, MSSQL `TIME` (7 fractional digits) maps to a
/// `Time64NanosecondArray` instead of falling back to text.
#[test]
fn fetch_time_7_mssql() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["TIME"]).unwrap();
    let sql = format!("INSERT INTO {table_name} (a) VALUES ('12:34:56.7891234')");
    conn.execute(&sql, (), None).unwrap();
    let sql = format!("SELECT a FROM {table_name} ORDER BY id");
    let cursor = conn.execute(&sql, (), None).unwrap().unwrap();
    // The DBMS name enables the MSSQL-specific TIME -> nanoseconds mapping.
    let dbms_name = conn.database_management_system_name().unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .with_dbms_name(dbms_name)
        .build(cursor)
        .unwrap();
    let arrow_batch = reader.next().unwrap().unwrap();
    let array_vals = arrow_batch
        .column(0)
        .as_any()
        .downcast_ref::<Time64NanosecondArray>()
        .unwrap();
    // Renamed from `us_since_midnight`: a Time64NanosecondArray stores
    // nanoseconds, as the divisions by 1e9 below show.
    let ns_since_midnight = array_vals.value(0);
    let sec = (ns_since_midnight / 1_000_000_000) as u32;
    let nano = (ns_since_midnight % 1_000_000_000) as u32;
    let naive_time = NaiveTime::from_num_seconds_from_midnight_opt(sec, nano).unwrap();
    assert_eq!("12:34:56.789123400", naive_time.to_string());
}
/// A cursor obtained from a prepared statement works with the reader just
/// like one from a direct `execute`.
#[test]
fn prepared_query() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["REAL NOT NULL"]).unwrap();
    let sql = format!("INSERT INTO {table_name} (a) VALUES (1),(2),(3)");
    conn.execute(&sql, (), None).unwrap();
    let sql = format!("SELECT a FROM {table_name}");
    let mut prepared = conn.prepare(&sql).unwrap();
    let cursor = prepared.execute(()).unwrap().unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .build(cursor)
        .unwrap();
    let arrow_batch = reader.next().unwrap().unwrap();
    let array_vals = arrow_batch
        .column(0)
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    assert_float_eq!([1., 2., 3.][..], array_vals.values(), abs_all <= 000.1);
}
/// Without an explicit schema, the reader infers one from the cursor's
/// result-set metadata (REAL NOT NULL -> non-nullable Float32).
#[test]
fn infer_schema() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["REAL NOT NULL"]).unwrap();
    let sql = format!("SELECT a FROM {table_name}");
    let cursor = conn.execute(&sql, (), None).unwrap().unwrap();
    let reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(1)
        .build(cursor)
        .unwrap();
    let actual = reader.schema();
    let expected = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
    assert_eq!(expected, actual)
}
/// `arrow_schema_from` derives an Arrow schema from a prepared statement
/// without executing it.
#[test]
fn fetch_schema_for_table() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["REAL NOT NULL"]).unwrap();
    let sql = format!("SELECT a FROM {table_name}");
    let mut prepared = conn.prepare(&sql).unwrap();
    let schema = arrow_schema_from(&mut prepared, None, false).unwrap();
    assert_eq!(r#"Field { "a": Float32 }"#, schema.to_string())
}
/// An `NCHAR(1)` cell holding a character outside Latin-1 needs more than one
/// byte once transcoded to UTF-8; the reader must size its buffers for that.
#[test]
fn should_allocate_enough_memory_for_wchar_column_bound_to_u8() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    // Fixed mojibake: the literal was "â„¢" (UTF-8 bytes of '™' mis-decoded
    // as Latin-1), which cannot fit an NCHAR(1) column. The intended single
    // character is '™' (3 bytes in UTF-8).
    let cursor = query_single_value(table_name, "NCHAR(1) NOT NULL", "™");
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(1)
        .build(cursor)
        .unwrap();
    let record_batch = reader.next().unwrap().unwrap();
    let array_any = record_batch.column(0).clone();
    let array_vals = array_any.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!("™", array_vals.value(0));
}
/// A narrow `CHAR(1)` cell holding a non-ASCII character must survive the
/// round trip when bound to wide (u16) buffers.
#[test]
fn should_allocate_enough_memory_for_varchar_column_bound_to_u16() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    // Fixed mojibake: the literal was "Ü" (UTF-8 bytes of 'Ü' mis-decoded
    // as Latin-1), which is two characters and cannot fit CHAR(1). The
    // intended single character is 'Ü'.
    let cursor = query_single_value(table_name, "CHAR(1) NOT NULL", "Ü");
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(1)
        .build(cursor)
        .unwrap();
    let record_batch = reader.next().unwrap().unwrap();
    let array_any = record_batch.column(0).clone();
    let array_vals = array_any.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!("Ü", array_vals.value(0));
}
/// `VARCHAR(MAX)` reports no upper bound; providing `with_max_text_size`
/// makes the reader buildable anyway.
#[test]
fn should_allow_to_fetch_from_varchar_max() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["VARCHAR(MAX)"]).unwrap();
    let sql = format!("SELECT a FROM {table_name}");
    let cursor = conn.execute(&sql, (), None).unwrap().unwrap();
    let result = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .with_max_text_size(1024)
        .build(cursor);
    assert!(result.is_ok())
}
/// A value longer than `with_max_text_size` must surface as an error on
/// `next()`, not as silently truncated data.
#[test]
fn should_error_for_truncation() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["VARCHAR(MAX)"]).unwrap();
    // Nine characters, but the reader below only allows five.
    let sql = format!("INSERT INTO {table_name} (a) VALUES ('123456789')");
    conn.execute(&sql, (), None).unwrap();
    let sql = format!("SELECT a FROM {table_name}");
    let cursor = conn.execute(&sql, (), None).unwrap().unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(1)
        .with_max_text_size(5)
        .build(cursor)
        .unwrap();
    let result = reader.next().unwrap();
    assert!(result.is_err())
}
/// Same as the VARCHAR(MAX) case: `with_max_binary_size` makes unbounded
/// `VARBINARY(MAX)` columns buildable.
#[test]
fn should_allow_to_fetch_from_varbinary_max() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["VARBINARY(MAX)"]).unwrap();
    let sql = format!("SELECT a FROM {table_name}");
    let cursor = conn.execute(&sql, (), None).unwrap().unwrap();
    let result = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .with_max_binary_size(1024)
        .build(cursor);
    assert!(result.is_ok())
}
/// With fallible allocations enabled, an absurdly large buffer request fails
/// with `ColumnFailure::TooLarge` instead of aborting the process.
/// NOTE(review): "fallibale" mirrors the spelling of the crate's own
/// `with_fallibale_allocations` builder method; renaming only the test would
/// not fix the API typo, so both are left as-is.
#[test]
#[ignore = "This tests allocates too much memory under WSL"]
fn fallibale_allocations() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["VARBINARY(4096)"]).unwrap();
    let sql = format!("SELECT a FROM {table_name}");
    let cursor = conn.execute(&sql, (), None).unwrap().unwrap();
    // 100 million rows x 4096 bytes deliberately exceeds what can be allocated.
    let result = OdbcReaderBuilder::new()
        .with_max_bytes_per_batch(usize::MAX)
        .with_max_num_rows_per_batch(100_000_000)
        .with_fallibale_allocations(true)
        .build(cursor);
    assert!(result.is_err());
    assert!(matches!(
        result.err().unwrap(),
        Error::ColumnFailure {
            name: _,
            index: 0,
            source: ColumnFailure::TooLarge {
                num_elements: 100_000_000,
                element_size: 4096
            }
        }
    ));
}
/// A reader can be torn down with `into_cursor`, advanced to the next result
/// set via `more_results`, and rebuilt to read the second result set.
#[test]
fn read_multiple_result_sets() {
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    let cursor = conn
        .execute("SELECT 1 AS A; SELECT 2 AS B;", (), None)
        .unwrap()
        .unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(1)
        .build(cursor)
        .unwrap();
    let first = reader.next().unwrap().unwrap();
    // Recover the cursor from the reader and move to the second result set.
    let cursor = reader.into_cursor().unwrap();
    let cursor = cursor.more_results().unwrap().unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(1)
        .build(cursor)
        .unwrap();
    let second = reader.next().unwrap().unwrap();
    let first_vals = first
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(1, first_vals.value(0));
    let second_vals = second
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(2, second_vals.value(0));
}
/// A statement in the batch that produces no result set (SELECT ... INTO)
/// yields an empty schema; the reader must still advance past it to the
/// third result set.
#[test]
fn read_multiple_result_sets_with_second_no_schema() {
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    let cursor = conn
        .execute(
            "SELECT 1 AS A; SELECT 1 AS A INTO #local_temp_table; SELECT A FROM #local_temp_table;",
            (),
            None,
        )
        .unwrap()
        .unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(1)
        .build(cursor)
        .unwrap();
    let first = reader.next().unwrap().unwrap();
    let cursor = reader.into_cursor().unwrap();
    let cursor = cursor.more_results().unwrap().unwrap();
    // Second "result set": SELECT INTO returns no columns.
    let reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(1)
        .build(cursor)
        .unwrap();
    let second_schema = reader.schema();
    let cursor = reader.into_cursor().unwrap();
    let cursor = cursor.more_results().unwrap().unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(1)
        .build(cursor)
        .unwrap();
    let third = reader.next().unwrap().unwrap();
    let first_vals = first
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(1, first_vals.value(0));
    // The column-less middle statement manifests as an empty schema.
    let second_column_count = second_schema.fields().len();
    assert_eq!(0, second_column_count);
    let third_vals = third
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(1, third_vals.value(0));
}
/// Without an explicit memory limit the builder falls back to the default
/// cap of 65535 rows per batch.
#[test]
fn applies_row_limit_for_default_constructed_readers() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let cursor = cursor_over_value(table, "INTEGER", "42");
    let reader = OdbcReaderBuilder::new().build(cursor).unwrap();
    assert_eq!(65535, reader.max_rows_per_batch())
}
/// A 10 MiB memory budget over a 512 byte text column must push the batch
/// size below the 65535 row default.
#[test]
fn applies_memory_size_limit() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let cursor = cursor_over_value(table, "VARCHAR(512)", "Hello");
    let ten_mib = 10 * 1024 * 1024;
    let reader = OdbcReaderBuilder::new()
        .with_max_bytes_per_batch(ten_mib)
        .build(cursor)
        .unwrap();
    assert!(reader.max_rows_per_batch() < 65535)
}
/// If not even a single row fits into the memory budget, building the reader
/// must fail with `OdbcBufferTooSmall` rather than silently truncating.
#[test]
fn memory_size_limit_can_not_hold_a_single_row() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let cursor = cursor_over_value(table, "VARCHAR(512)", "Hello");
    let result = OdbcReaderBuilder::new()
        .with_max_bytes_per_batch(1)
        .build(cursor);
    let is_buffer_too_small = matches!(
        result,
        Err(Error::OdbcBufferTooSmall {
            max_bytes_per_batch: 1,
            bytes_per_row: _
        })
    );
    assert!(is_buffer_too_small)
}
/// Forcing UTF-16 ("wide") payload transfer must deliver non-ASCII text
/// intact.
#[test]
fn fetch_wide_data() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let cursor = cursor_over_value(table, "VARCHAR(30)", "Hällö, Wörld!");
    let mut batch_reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(1)
        .with_payload_text_encoding(TextEncoding::Utf16)
        .build(cursor)
        .unwrap();
    let batch = batch_reader.next().unwrap().unwrap();
    let column = batch.column(0).clone();
    let strings = column.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!("Hällö, Wörld!", strings.value(0));
}
/// Forcing UTF-8 ("narrow") payload transfer must deliver text intact.
#[test]
fn fetch_narrow_data() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let cursor = cursor_over_value(table, "VARCHAR(15)", "Hello, World!");
    let mut batch_reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(1)
        .with_payload_text_encoding(TextEncoding::Utf8)
        .build(cursor)
        .unwrap();
    let batch = batch_reader.next().unwrap().unwrap();
    let column = batch.column(0).clone();
    let strings = column.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!("Hello, World!", strings.value(0));
}
/// Arrow list types have no ODBC mapping, so constructing a writer over such
/// a schema must fail with `UnsupportedArrowDataType`.
#[test]
fn insert_does_not_support_list_type() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["VARCHAR(4096)"]).unwrap();
    let item = Field::new("b", DataType::Utf8, true);
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::List(Arc::new(item)),
        true,
    )]));
    let prepared = connection
        .prepare(&format!("INSERT INTO {table} (a) VALUES (?)"))
        .unwrap();
    let result = OdbcWriter::new(10, schema.as_ref(), prepared);
    assert!(matches!(
        result,
        Err(WriterError::UnsupportedArrowDataType(_))
    ))
}
/// Strings and NULLs survive a round trip into a VARCHAR column.
#[test]
fn insert_text() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["VARCHAR(4096)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
    let values = StringArray::from(vec![Some("Hello"), None, Some("World")]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("Hello\nNULL\nWorld", actual);
}
/// Batches smaller than the insertion buffer are accumulated; all rows end
/// up in the table in order.
#[test]
fn insert_multiple_small_batches() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["VARCHAR(10)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
    let first = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(StringArray::from(vec![Some("a")]))],
    )
    .unwrap();
    let second = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(StringArray::from(vec![Some("bc")]))],
    )
    .unwrap();
    let mut reader = StubBatchReader::new(schema, vec![first, second]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("a\nbc", actual);
}
/// Non-ASCII characters survive insertion into a VARCHAR column.
#[test]
fn insert_non_ascii_text() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["VARCHAR(50)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
    let values = StringArray::from(vec![Some("Frühstück µ")]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("Frühstück µ", actual);
}
/// Booleans including NULL round trip into a BIT column; the same batch is
/// inserted twice.
#[test]
fn insert_nullable_booleans() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["BIT"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, true)]));
    let values = BooleanArray::from(vec![Some(true), None, Some(false)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch.clone(), batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1\nNULL\n0\n1\nNULL\n0", actual);
}
/// Booleans from a non-nullable field round trip into a BIT column; the same
/// batch is inserted twice.
#[test]
fn insert_non_nullable_booleans() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["BIT"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
    let values = BooleanArray::from(vec![Some(true), Some(false), Some(false)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch.clone(), batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1\n0\n0\n1\n0\n0", actual);
}
/// Nullable 8 bit integers round trip into a TINYINT column across two
/// batches.
#[test]
fn insert_nullable_int8() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["TINYINT"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int8, true)]));
    let first_values = Int8Array::from(vec![Some(1), None, Some(3)]);
    let first = RecordBatch::try_new(schema.clone(), vec![Arc::new(first_values)]).unwrap();
    let second_values = Int8Array::from(vec![Some(4), None, Some(6)]);
    let second = RecordBatch::try_new(schema.clone(), vec![Arc::new(second_values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![first, second]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1\nNULL\n3\n4\nNULL\n6", actual);
}
/// Non-nullable 8 bit integers round trip into a TINYINT column across two
/// batches.
#[test]
fn insert_non_nullable_int8() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["TINYINT"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int8, false)]));
    let first_values = Int8Array::from(vec![Some(1), Some(2), Some(3)]);
    let first = RecordBatch::try_new(schema.clone(), vec![Arc::new(first_values)]).unwrap();
    let second_values = Int8Array::from(vec![Some(4), Some(5), Some(6)]);
    let second = RecordBatch::try_new(schema.clone(), vec![Arc::new(second_values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![first, second]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1\n2\n3\n4\n5\n6", actual);
}
/// Nullable 16 bit integers round trip into a SMALLINT column.
#[test]
fn insert_nullable_int16() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["SMALLINT"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int16, true)]));
    let values = Int16Array::from(vec![Some(1), None, Some(3)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1\nNULL\n3", actual);
}
/// Nullable 32 bit integers round trip into an INTEGER column.
#[test]
fn insert_nullable_int32() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["INTEGER"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let values = Int32Array::from(vec![Some(1), None, Some(3)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1\nNULL\n3", actual);
}
/// Nullable 64 bit integers round trip into a BIGINT column.
#[test]
fn insert_nullable_int64() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["BIGINT"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    let values = Int64Array::from(vec![Some(1), None, Some(3)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1\nNULL\n3", actual);
}
/// Unsigned 8 bit integers fit into SMALLINT (no unsigned SQL type needed).
#[test]
fn insert_non_nullable_unsigned_int8() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["SMALLINT"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::UInt8, false)]));
    let values = UInt8Array::from(vec![Some(1), Some(2), Some(3)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1\n2\n3", actual);
}
/// Nullable 32 bit floats round trip into a REAL column.
#[test]
fn insert_nullable_f32() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["REAL"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
    let values = Float32Array::from(vec![Some(1.), None, Some(3.)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1.0\nNULL\n3.0", actual);
}
/// Nullable 64 bit floats round trip into a FLOAT(25) column.
#[test]
fn insert_nullable_f64() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["FLOAT(25)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)]));
    let values = Float64Array::from(vec![Some(1.), None, Some(3.)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1.0\nNULL\n3.0", actual);
}
/// Nullable 16 bit floats round trip into a REAL column.
#[test]
fn insert_nullable_f16() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["REAL"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float16, true)]));
    let values =
        Float16Array::from(vec![Some(F16::from_f32(1.0)), None, Some(F16::from_f32(3.0))]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1.0\nNULL\n3.0", actual);
}
/// Non-nullable 16 bit floats round trip into a REAL column.
#[test]
fn insert_non_nullable_f16() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["REAL"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float16, false)]));
    let values: Float16Array = [1.0f32, 2.0, 3.0]
        .into_iter()
        .map(|v| Some(F16::from_f32(v)))
        .collect();
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1.0\n2.0\n3.0", actual);
}
/// Epoch seconds map onto a DATETIME2(0) column without fractional digits.
#[test]
fn insert_timestamp_with_seconds_precisions() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["DATETIME2(0)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Timestamp(TimeUnit::Second, None),
        false,
    )]));
    let values = TimestampSecondArray::from(vec![11111111]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1970-05-09 14:25:11", actual);
}
#[test]
fn insert_berlin_time_to_daytime_offset_sec_precision() {
    // Timestamps carrying an IANA time zone are written to DATETIMEOFFSET
    // including the local UTC offset. Two instants are chosen so that both
    // the summer (+02:00) and winter (+01:00) offsets of Europe/Berlin are
    // exercised.
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["DATETIMEOFFSET(0)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Timestamp(TimeUnit::Second, Some(Arc::from("Europe/Berlin"))),
        false,
    )]));
    let tz: Tz = "Europe/Berlin".parse().unwrap();
    // One instant during DST, one outside of it.
    let dt = tz.with_ymd_and_hms(2025, 6, 22, 12, 0, 0).single().unwrap();
    let dt2 = tz.with_ymd_and_hms(2025, 2, 1, 12, 0, 0).single().unwrap();
    let timestamp = [dt.timestamp(), dt2.timestamp()];
    // Build the array manually from a raw buffer so the timezone-annotated
    // data type can be attached to plain epoch seconds.
    let buffer = Buffer::from_slice_ref(&timestamp);
    let data = ArrayData::builder(DataType::Timestamp(
        TimeUnit::Second,
        Some(Arc::from("Europe/Berlin")),
    ))
    .len(2)
    .add_buffer(buffer)
    .build()
    .unwrap();
    let array = TimestampSecondArray::from(data);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&conn, &mut reader, table_name, 5).unwrap();
    let actual = table_to_string(&conn, table_name, &["a"]);
    // Local wall clock time plus the offset in effect at each instant.
    let expected = "2025-06-22 12:00:00 +02:00\n2025-02-01 12:00:00 +01:00";
    assert_eq!(expected, actual);
}
#[test]
fn insert_berlin_time_to_daytime_offset_ms_precision() {
    // Millisecond-precision variant: a 123 ms fraction must appear verbatim
    // in the DATETIMEOFFSET(3) column together with the +02:00 DST offset.
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["DATETIMEOFFSET(3)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Timestamp(TimeUnit::Millisecond, Some(Arc::from("Europe/Berlin"))),
        false,
    )]));
    let tz: Tz = "Europe/Berlin".parse().unwrap();
    let dt = tz.with_ymd_and_hms(2025, 6, 22, 12, 0, 0).single().unwrap();
    // Epoch milliseconds with a 123 ms fraction added.
    let timestamp = [dt.timestamp() * 1000i64 + 123i64];
    // Build the array manually from a raw buffer so the timezone-annotated
    // data type can be attached to the raw epoch values.
    let buffer = Buffer::from_slice_ref(&timestamp);
    let data = ArrayData::builder(DataType::Timestamp(
        TimeUnit::Millisecond,
        Some(Arc::from("Europe/Berlin")),
    ))
    .len(1)
    .add_buffer(buffer)
    .build()
    .unwrap();
    let array = TimestampMillisecondArray::from(data);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&conn, &mut reader, table_name, 5).unwrap();
    let actual = table_to_string(&conn, table_name, &["a"]);
    let expected = "2025-06-22 12:00:00.123 +02:00";
    assert_eq!(expected, actual);
}
#[test]
fn insert_berlin_time_to_daytime_offset_us_precision() {
    // Microsecond-precision variant: a 123456 µs fraction must appear
    // verbatim in the DATETIMEOFFSET(6) column with the +02:00 DST offset.
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["DATETIMEOFFSET(6)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("Europe/Berlin"))),
        false,
    )]));
    let tz: Tz = "Europe/Berlin".parse().unwrap();
    let dt = tz.with_ymd_and_hms(2025, 6, 22, 12, 0, 0).single().unwrap();
    // Epoch microseconds with a 123456 µs fraction added.
    let timestamp = [dt.timestamp() * 1_000_000i64 + 123456i64];
    // Build the array manually from a raw buffer so the timezone-annotated
    // data type can be attached to the raw epoch values.
    let buffer = Buffer::from_slice_ref(&timestamp);
    let data = ArrayData::builder(DataType::Timestamp(
        TimeUnit::Microsecond,
        Some(Arc::from("Europe/Berlin")),
    ))
    .len(1)
    .add_buffer(buffer)
    .build()
    .unwrap();
    let array = TimestampMicrosecondArray::from(data);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&conn, &mut reader, table_name, 5).unwrap();
    let actual = table_to_string(&conn, table_name, &["a"]);
    let expected = "2025-06-22 12:00:00.123456 +02:00";
    assert_eq!(expected, actual);
}
#[test]
fn insert_berlin_time_to_daytime_offset_ns_precision() {
    // Nanosecond-precision variant. DATETIMEOFFSET(7) stores 100 ns ticks,
    // so the 123456789 ns fraction is rounded to seven digits (.1234568).
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["DATETIMEOFFSET(7)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Timestamp(TimeUnit::Nanosecond, Some(Arc::from("Europe/Berlin"))),
        false,
    )]));
    let tz: Tz = "Europe/Berlin".parse().unwrap();
    let dt = tz.with_ymd_and_hms(2025, 6, 22, 12, 0, 0).single().unwrap();
    // Epoch nanoseconds with a 123456789 ns fraction added.
    let timestamp = [dt.timestamp() * 1_000_000_000i64 + 123456789i64];
    // Build the array manually from a raw buffer so the timezone-annotated
    // data type can be attached to the raw epoch values.
    let buffer = Buffer::from_slice_ref(&timestamp);
    let data = ArrayData::builder(DataType::Timestamp(
        TimeUnit::Nanosecond,
        Some(Arc::from("Europe/Berlin")),
    ))
    .len(1)
    .add_buffer(buffer)
    .build()
    .unwrap();
    let array = TimestampNanosecondArray::from(data);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&conn, &mut reader, table_name, 5).unwrap();
    let actual = table_to_string(&conn, table_name, &["a"]);
    let expected = "2025-06-22 12:00:00.1234568 +02:00";
    assert_eq!(expected, actual);
}
#[test]
fn insert_timestamp_with_foobar_timezone() {
    // A timestamp column annotated with an unparseable time zone name must
    // surface as `WriterError::InvalidTimeZone` during insertion, with a
    // readable error message naming the offending zone.
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["DATETIMEOFFSET(0)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Timestamp(TimeUnit::Second, Some(Arc::from("Foobar"))),
        false,
    )]));
    // Build the array manually so the bogus "Foobar" zone can be attached;
    // the high-level constructors would not validate it either, the failure
    // must come from the writer.
    let timestamp = [11111111i64];
    let buffer = Buffer::from_slice_ref(&timestamp);
    let data = ArrayData::builder(DataType::Timestamp(
        TimeUnit::Second,
        Some(Arc::from("Foobar")),
    ))
    .len(1)
    .add_buffer(buffer)
    .build()
    .unwrap();
    let array = TimestampSecondArray::from(data);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    let result = insert_into_table(&conn, &mut reader, table_name, 5);
    assert!(result.is_err());
    let error = result.unwrap_err();
    assert!(matches!(
        error,
        WriterError::InvalidTimeZone { time_zone: _ }
    ));
    assert_eq!(
        "Unable to parse 'Foobar' into a valid IANA time zone.",
        error.to_string()
    );
}
/// Epoch milliseconds map onto a DATETIME2(3) column with three fractional
/// digits.
#[test]
fn insert_timestamp_with_milliseconds_precisions() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["DATETIME2(3)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        false,
    )]));
    let values = TimestampMillisecondArray::from(vec![11111111111]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1970-05-09 14:25:11.111", actual);
}
/// A date in 1600 lies outside the range representable as i64 nanoseconds;
/// millisecond timestamps must still round trip without overflowing.
#[test]
fn insert_timestamp_with_milliseconds_precisions_which_is_not_representable_as_i64_ns() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["DATETIME2(3)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        false,
    )]));
    let instant = NaiveDate::from_ymd_opt(1600, 6, 18)
        .unwrap()
        .and_hms_milli_opt(23, 12, 44, 123)
        .unwrap();
    let values = TimestampMillisecondArray::from(vec![instant.and_utc().timestamp_millis()]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1600-06-18 23:12:44.123", actual);
}
/// Epoch microseconds map onto a DATETIME2(6) column with six fractional
/// digits.
#[test]
fn insert_timestamp_with_microseconds_precisions() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["DATETIME2(6)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Timestamp(TimeUnit::Microsecond, None),
        false,
    )]));
    let values = TimestampMicrosecondArray::from(vec![11111111111111]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1970-05-09 14:25:11.111111", actual);
}
/// Epoch nanoseconds map onto a DATETIME2(7) column, which stores 100 ns
/// ticks (seven fractional digits).
#[test]
fn insert_timestamp_with_nanoseconds_precisions() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["DATETIME2(7)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        false,
    )]));
    let values = TimestampNanosecondArray::from(vec![11111111111111111]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1970-05-09 14:25:11.1111111", actual);
}
/// Day zero of a `Date32Array` (days since epoch) maps to 1970-01-01.
#[test]
fn insert_date32_array() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["DATE"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Date32, false)]));
    let values = Date32Array::from(vec![Some(0)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1970-01-01", actual);
}
/// Millisecond zero of a `Date64Array` maps to 1970-01-01.
#[test]
fn insert_date64_array() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["DATE"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Date64, false)]));
    let values = Date64Array::from(vec![Some(0)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("1970-01-01", actual);
}
/// Seconds since midnight (11111 s = 03:05:11) map onto a TIME(0) column.
#[test]
fn insert_time32_second_array() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["TIME(0)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Time32(TimeUnit::Second),
        false,
    )]));
    let values = Time32SecondArray::from(vec![Some(11_111)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("03:05:11", actual);
}
/// Milliseconds since midnight map onto a TIME(3) column.
#[test]
fn insert_time32_ms_array() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["TIME(3)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Time32(TimeUnit::Millisecond),
        false,
    )]));
    let values = Time32MillisecondArray::from(vec![Some(11_111_111)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("03:05:11.111", actual);
}
/// Microseconds since midnight map onto a TIME(6) column.
#[test]
fn insert_time64_us_array() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["TIME(6)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Time64(TimeUnit::Microsecond),
        false,
    )]));
    let values = Time64MicrosecondArray::from(vec![Some(11_111_111_111)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("03:05:11.111111", actual);
}
/// Nanoseconds since midnight map onto a TIME(7) column (100 ns resolution).
#[test]
fn insert_time64_ns_array() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["TIME(7)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Time64(TimeUnit::Nanosecond),
        false,
    )]));
    let values = Time64NanosecondArray::from(vec![Some(11_111_111_111_111)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("03:05:11.1111111", actual);
}
/// Variable-length binary values including NULL round trip into a VARBINARY
/// column; the probe renders them as hex.
#[test]
fn insert_binary() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["VARBINARY(4096)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Binary, true)]));
    let values: Vec<Option<&[u8]>> = vec![Some(&[1, 2]), None, Some(&[3, 4, 5, 6, 7])];
    let batch =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(BinaryArray::from(values))]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 5).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("0102\nNULL\n0304050607", actual);
}
/// Fixed-size binary values round trip into a VARBINARY column.
///
/// Fix: this test was a byte-for-byte copy of `insert_binary` and used
/// `BinaryArray` / `DataType::Binary`, so `FixedSizeBinaryArray` support was
/// never exercised despite the test's name. It now builds a
/// `FixedSizeBinaryArray` (every element shares the same byte width) with a
/// `DataType::FixedSizeBinary` schema.
#[test]
fn insert_fixed_binary() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["VARBINARY(4096)"]).unwrap();
    // All elements of a FixedSizeBinaryArray must have the same length (2).
    let array = FixedSizeBinaryArray::try_from_sparse_iter_with_size(
        [Some([1u8, 2]), None, Some([3, 4])].into_iter(),
        2,
    )
    .unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::FixedSizeBinary(2),
        true,
    )]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&conn, &mut reader, table_name, 5).unwrap();
    let actual = table_to_string(&conn, table_name, &["a"]);
    let expected = "0102\nNULL\n0304";
    assert_eq!(expected, actual);
}
/// Decimal128 values at precision 5, scale 3 round trip into NUMERIC(5,3),
/// including NULL and values needing leading zeros in the fraction.
#[test]
fn insert_decimal_128() {
    let table = function_name!().rsplit_once(':').unwrap().1;
    let connection = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&connection, table, &["NUMERIC(5,3)"]).unwrap();
    // Unscaled integers; 12345 at scale 3 reads back as 12.345.
    let values = Decimal128Array::from(vec![Some(12345), None, Some(67891), Some(1), Some(1000)])
        .with_precision_and_scale(5, 3)
        .unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Decimal128(5, 3),
        true,
    )]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&connection, &mut reader, table, 2).unwrap();
    let actual = table_to_string(&connection, table, &["a"]);
    assert_eq!("12.345\nNULL\n67.891\n.001\n1.000", actual);
}
#[test]
fn insert_decimal_256() {
    type I256 = <Decimal256Type as ArrowPrimitiveType>::Native;
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["NUMERIC(5,3)"]).unwrap();
    // Encode 12345 (i.e. 12.345 at scale 3) as a little-endian 256-bit value.
    let mut le_bytes = [0u8; 32];
    le_bytes[..4].copy_from_slice(&12345i32.to_le_bytes());
    let mut builder = Decimal256Builder::new();
    builder.append_value(I256::from_le_bytes(le_bytes));
    builder.append_null();
    let decimals = builder.finish().with_precision_and_scale(5, 3).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Decimal256(5, 3),
        true,
    )]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(decimals)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&conn, &mut reader, table_name, 5).unwrap();
    assert_eq!("12.345\nNULL", table_to_string(&conn, table_name, &["a"]));
}
#[test]
fn insert_decimal_128_with_negative_scale_psql() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(POSTGRES, Default::default())
        .unwrap();
    setup_empty_table::<PostgreSql>(&conn, table_name, &["NUMERIC(5,-2)"]).unwrap();
    // At scale -2 every stored unit stands for a multiple of 100.
    let raw = [Some(123), None, Some(456), Some(1), Some(10)];
    let decimals = raw
        .into_iter()
        .collect::<Decimal128Array>()
        .with_precision_and_scale(3, -2)
        .unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Decimal128(3, -2),
        true,
    )]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(decimals)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    // Small batch size so the five rows span multiple insert round trips.
    insert_into_table(&conn, &mut reader, table_name, 2).unwrap();
    assert_eq!(
        "12300\nNULL\n45600\n100\n1000",
        table_to_string(&conn, table_name, &["a"])
    );
}
#[test]
fn insert_decimal_256_with_negative_scale_psql() {
    type I256 = <Decimal256Type as ArrowPrimitiveType>::Native;
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(POSTGRES, Default::default())
        .unwrap();
    setup_empty_table::<PostgreSql>(&conn, table_name, &["NUMERIC(5,-2)"]).unwrap();
    // 123 at scale -2 represents 12300.
    let mut le_bytes = [0u8; 32];
    le_bytes[..4].copy_from_slice(&123i32.to_le_bytes());
    let mut builder = Decimal256Builder::new();
    builder.append_value(I256::from_le_bytes(le_bytes));
    builder.append_null();
    let decimals = builder.finish().with_precision_and_scale(3, -2).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Decimal256(3, -2),
        true,
    )]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(decimals)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&conn, &mut reader, table_name, 5).unwrap();
    assert_eq!("12300\nNULL", table_to_string(&conn, table_name, &["a"]));
}
#[test]
fn insert_taking_ownership_of_connection() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    // This first connection is only used for setup and verification.
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["VARCHAR(4096)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
    let column = StringArray::from(vec![Some("Hello"), None, Some("World")]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(column)]).unwrap();
    let reader = StubBatchReader::new(schema.clone(), vec![batch]);
    // The writer takes ownership of a second, dedicated connection.
    let writer_conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    let row_capacity = 50;
    let mut inserter =
        OdbcWriter::from_connection(writer_conn, &schema, table_name, row_capacity).unwrap();
    inserter.write_all(reader).unwrap();
    assert_eq!(
        "Hello\nNULL\nWorld",
        table_to_string(&conn, table_name, &["a"])
    );
}
#[test]
fn insert_large_text() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["VARCHAR(4096)"]).unwrap();
    // LargeUtf8 uses 64-bit offsets and therefore a different writer path
    // than plain Utf8.
    let column = LargeStringArray::from(vec![Some("Hello"), None, Some("World")]);
    let field = Field::new("a", DataType::LargeUtf8, true);
    let schema = Arc::new(Schema::new(vec![field]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(column)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&conn, &mut reader, table_name, 5).unwrap();
    assert_eq!(
        "Hello\nNULL\nWorld",
        table_to_string(&conn, table_name, &["a"])
    );
}
#[test]
fn insert_into_column_named_like_a_resevered_keyword() {
    // NOTE(review): "resevered" is a typo for "reserved" in the test name; it
    // is kept so existing test filters keep matching.
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    conn.execute(&format!("DROP TABLE IF EXISTS {table_name}"), (), None)
        .unwrap();
    // "values" is a reserved SQL keyword and must be quoted in DDL and DML.
    let create_table = format!("CREATE TABLE {table_name} (id int IDENTITY(1,1),\"values\" int);");
    conn.execute(&create_table, (), None).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "\"values\"",
        DataType::Int32,
        true,
    )]));
    let column = Int32Array::from(vec![Some(42)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(column)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    insert_into_table(&conn, &mut reader, table_name, 1).unwrap();
    assert_eq!("42", table_to_string(&conn, table_name, &["\"values\""]));
}
#[test]
fn sanatize_column_names() {
    // NOTE(review): "sanatize" is a typo for "sanitize"; the name is kept so
    // CI references and test filters stay stable.
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    conn.execute(&format!("DROP TABLE IF EXISTS {table_name}"), (), None)
        .unwrap();
    let create_table = format!(
        "CREATE TABLE {table_name} (id int IDENTITY(1,1),\"column name with spaces\" INTEGER);"
    );
    conn.execute(&create_table, (), None).unwrap();
    // The arrow field name contains spaces, so the writer has to quote it.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "column name with spaces",
        DataType::Int32,
        true,
    )]));
    let column = Int32Array::from(vec![Some(42)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(column)]).unwrap();
    let mut reader = StubBatchReader::new(schema, vec![batch]);
    let result = insert_into_table(&conn, &mut reader, table_name, 1);
    assert!(result.is_ok());
    let actual = table_to_string(&conn, table_name, &["\"column name with spaces\""]);
    assert_eq!("42", actual);
}
#[test]
fn fetch_integer_concurrently() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let cursor = cursor_over_literals(table_name, "INTEGER", "(1),(NULL),(3)");
    // Fix: despite its name, this test previously used the sequential reader.
    // `into_concurrent` promotes it, matching the sibling concurrency tests,
    // so fetching actually happens on a dedicated system thread.
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .build(cursor)
        .unwrap()
        .into_concurrent()
        .unwrap();
    let record_batch = reader.next().unwrap().unwrap();
    let array_any = record_batch.column(0).clone();
    let array_vals = array_any.as_any().downcast_ref::<Int32Array>().unwrap();
    assert!(array_vals.is_valid(0));
    // NULL slots carry an arbitrary placeholder value (0) in `values()`.
    assert!(array_vals.is_null(1));
    assert!(array_vals.is_valid(2));
    assert_eq!([1, 0, 3], *array_vals.values());
}
#[test]
fn fetch_empty_cursor_concurrently() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let cursor = empty_cursor(table_name, "INTEGER");
    // Fix: the test name promises a concurrent reader, but the sequential one
    // was used before. `into_concurrent` moves fetching to its own thread.
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .build(cursor)
        .unwrap()
        .into_concurrent()
        .unwrap();
    // An empty result set must immediately yield `None`.
    let record_batch = reader.next();
    assert!(record_batch.is_none())
}
#[test]
fn fetch_with_error_concurrently() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let cursor = cursor_over_value(table_name, "VARCHAR(50)", "Hello, World!");
    // A one-byte text limit guarantees truncation, which the concurrent
    // reader must surface as an error rather than silently dropping data.
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .with_max_text_size(1)
        .build(cursor)
        .unwrap()
        .into_concurrent()
        .unwrap();
    let outcome = reader.next().unwrap();
    assert!(outcome.is_err())
}
#[test]
fn fetch_row_groups_repeatedly_concurrently() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let cursor = cursor_over_literals(table_name, "INTEGER", "(1),(NULL),(3)");
    // With a batch size of 1 each row arrives in its own record batch.
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(1)
        .build(cursor)
        .unwrap()
        .into_concurrent()
        .unwrap();
    // Pull the first (and only) column out of the next batch.
    let mut next_column = || {
        let record_batch = reader.next().unwrap().unwrap();
        record_batch.column(0).clone()
    };
    let column_1 = next_column();
    let column_2 = next_column();
    let column_3 = next_column();
    let vals_1 = column_1.as_any().downcast_ref::<Int32Array>().unwrap();
    let vals_2 = column_2.as_any().downcast_ref::<Int32Array>().unwrap();
    let vals_3 = column_3.as_any().downcast_ref::<Int32Array>().unwrap();
    assert!(vals_1.is_valid(0));
    assert_eq!([1], *vals_1.values());
    assert!(vals_2.is_null(0));
    assert!(vals_3.is_valid(0));
    assert_eq!([3], *vals_3.values());
}
#[test]
fn fetch_empty_cursor_concurrently_twice() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let cursor = empty_cursor(table_name, "INTEGER");
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .build(cursor)
        .unwrap()
        .into_concurrent()
        .unwrap();
    // Exhaust the reader once, then verify a second poll still yields `None`
    // instead of panicking or blocking.
    let _ = reader.next();
    assert!(reader.next().is_none())
}
#[test]
fn read_multiple_result_sets_using_concurrent_cursor() {
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    // One statement that produces two result sets.
    let cursor = conn
        .into_cursor("SELECT 1 AS A; SELECT 2 AS B;", (), None)
        .unwrap()
        .unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(1)
        .build(cursor)
        .unwrap()
        .into_concurrent()
        .unwrap();
    let batch_a = reader.next().unwrap().unwrap();
    let a_vals = batch_a
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(1, a_vals.value(0));
    // Reclaim the cursor from the reader and advance to the second set.
    let cursor = reader.into_cursor().unwrap();
    let cursor = cursor.more_results().unwrap().unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(1)
        .build(cursor)
        .unwrap()
        .into_concurrent()
        .unwrap();
    let batch_b = reader.next().unwrap().unwrap();
    let b_vals = batch_b
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(2, b_vals.value(0));
}
#[test]
fn promote_sequential_to_concurrent_cursor() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let cursor = cursor_over_value(table_name, "INTEGER", 42);
    // Build the plain sequential reader first, then upgrade it in a separate
    // step to make the promotion explicit.
    let sequential = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .build(cursor)
        .unwrap();
    let mut reader = sequential.into_concurrent().unwrap();
    let batch = reader.next().unwrap().unwrap();
    let column = batch.column(0).clone();
    let values = column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!([42], *values.values());
}
#[test]
fn concurrent_reader_is_send() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let cursor = cursor_over_value(table_name, "INTEGER", 42);
    let mut concurrent_reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .build(cursor)
        .unwrap()
        .into_concurrent()
        .unwrap();
    // Moving the reader into another thread only compiles if it is `Send`.
    let handle = thread::spawn(move || concurrent_reader.next().unwrap().unwrap());
    let record_batch = handle.join().unwrap();
    let column = record_batch.column(0).clone();
    let values = column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!([42], *values.values());
}
#[test]
fn support_shared_ownership_of_connections_for_writer() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &["VARCHAR(50)"]).unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
    let column = StringArray::from(vec![Some("Hello"), None, Some("World")]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(column)]).unwrap();
    let reader = StubBatchReader::new(schema.clone(), vec![batch]);
    // The writer also accepts an `Arc<Mutex<Connection>>`, so several owners
    // can share a single connection.
    let writer_conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    let shared_connection = Arc::new(Mutex::new(writer_conn));
    let row_capacity = 50;
    let mut inserter =
        OdbcWriter::from_connection(shared_connection, &schema, table_name, row_capacity).unwrap();
    inserter.write_all(reader).unwrap();
    assert_eq!(
        "Hello\nNULL\nWorld",
        table_to_string(&conn, table_name, &["a"])
    );
}
#[test]
fn varchar_1000_psql() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(POSTGRES, ConnectionOptions::default())
        .unwrap();
    setup_empty_table::<PostgreSql>(&conn, table_name, &["VARCHAR(1000)"]).unwrap();
    // Repeat a multi-byte sequence so the payload stresses non-ASCII buffer
    // sizing in the PostgreSQL unicode driver.
    let long_text_with_special_characters = "ê°€".repeat(1000);
    let insert = format!("INSERT INTO {table_name} (a) VALUES(?);");
    conn.execute(
        &insert,
        &long_text_with_special_characters.as_str().into_parameter(),
        None,
    )
    .unwrap();
    let select = format!("SELECT a FROM {table_name}");
    let cursor = conn.into_cursor(&select, (), None).unwrap().unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(1)
        .build(cursor)
        .unwrap();
    let batch = reader.next().unwrap().unwrap();
    let column = batch.column(0).clone();
    let values = column.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(1, values.len());
    assert_eq!(long_text_with_special_characters, values.value(0));
}
#[test]
#[ignore = "Currently trouble with automating setting up DB2 on CI and local development"]
fn blob_on_db2() {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let conn = env()
        .connect_with_connection_string(DB2, ConnectionOptions::default())
        .unwrap();
    setup_empty_table::<Db2>(&conn, table_name, &["BLOB"]).unwrap();
    // Round-trip all 256 possible byte values through the BLOB column.
    let blob_data: Vec<u8> = (0..=255).collect();
    let insert = format!("INSERT INTO {table_name} (a) VALUES (?)");
    conn.execute(&insert, &blob_data.as_slice().into_parameter(), None)
        .unwrap();
    // The DBMS name is passed to the builder so it can apply DB2-specific
    // handling while reading.
    let dbms_name = conn.database_management_system_name().unwrap();
    eprintln!("DB2 DBMS Name: {dbms_name}");
    let select = format!("SELECT a FROM {table_name}");
    let cursor = conn.into_cursor(&select, (), None).unwrap().unwrap();
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(1)
        .with_dbms_name(dbms_name)
        .build(cursor)
        .unwrap();
    let batch = reader.next().unwrap().unwrap();
    let column = batch.column(0).clone();
    let values = column.as_any().downcast_ref::<BinaryArray>().unwrap();
    assert_eq!(1, values.len());
    assert_eq!(&blob_data, values.value(0));
}
/// Convenience wrapper around [`setup_empty_table`] for the Microsoft SQL
/// Server test database.
fn setup_empty_table_mssql(
    conn: &Connection,
    table_name: &str,
    column_types: &[&str],
) -> Result<(), odbc_api::Error> {
    setup_empty_table::<MsSql>(conn, table_name, column_types)
}
/// Drops `table_name` if it exists and recreates it with an identity `id`
/// column (DBMS specific) followed by one column per entry in
/// `column_types`, named "a", "b", "c", ... in order.
fn setup_empty_table<D: Dbms>(
    conn: &Connection,
    table_name: &str,
    column_types: &[&str],
) -> Result<(), odbc_api::Error> {
    // Remove any leftover table from a previous test run.
    conn.execute(&format!("DROP TABLE IF EXISTS {table_name}"), (), None)?;
    // Up to twelve columns are supported, named alphabetically.
    let names = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"];
    let mut column_defs = Vec::with_capacity(column_types.len());
    for (ty, name) in column_types.iter().zip(names) {
        column_defs.push(format!("{name} {ty}"));
    }
    let cols = column_defs.join(", ");
    let identity = D::identity_column();
    let create_table = format!("CREATE TABLE {table_name} ({identity},{cols});");
    conn.execute(&create_table, (), None)?;
    Ok(())
}
/// Abstraction over the DBMS-specific SQL fragments the test helpers need.
trait Dbms {
    /// SQL column definition for an auto-incrementing `id` column.
    fn identity_column() -> &'static str;
}
/// Marker type for Microsoft SQL Server.
struct MsSql;
impl Dbms for MsSql {
    fn identity_column() -> &'static str {
        "id int IDENTITY(1,1)"
    }
}
/// Marker type for PostgreSQL.
struct PostgreSql;
impl Dbms for PostgreSql {
    fn identity_column() -> &'static str {
        "id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY"
    }
}
/// Marker type for IBM DB2.
struct Db2;
impl Dbms for Db2 {
    fn identity_column() -> &'static str {
        "id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY"
    }
}
/// Selects the given columns from `table_name` and renders the result as
/// text via [`cursor_to_string`].
pub fn table_to_string(conn: &Connection<'_>, table_name: &str, column_names: &[&str]) -> String {
    let cols = column_names.join(", ");
    let select = format!("SELECT {cols} FROM {table_name}");
    let cursor = conn.execute(&select, (), None).unwrap().unwrap();
    cursor_to_string(cursor)
}
/// Renders every row of a cursor as text: one line per row, columns separated
/// by commas, NULL values rendered as the literal "NULL".
pub fn cursor_to_string(mut cursor: impl Cursor) -> String {
    let batch_size = 20;
    let mut buffer = TextRowSet::for_cursor(batch_size, &mut cursor, Some(8192)).unwrap();
    let mut row_set_cursor = cursor.bind_buffer(&mut buffer).unwrap();
    let mut text = String::new();
    // Fix: the newline used to be keyed off the row index *within* a row set,
    // so the first row of every subsequent batch (rows 21, 41, ...) was glued
    // onto the previous line without a separator. Track globally instead.
    let mut first_row = true;
    while let Some(row_set) = row_set_cursor.fetch().unwrap() {
        for row_index in 0..row_set.num_rows() {
            if !first_row {
                text.push('\n');
            }
            first_row = false;
            for col_index in 0..row_set.num_cols() {
                if col_index != 0 {
                    text.push(',');
                }
                text.push_str(
                    row_set
                        .at_as_str(col_index, row_index)
                        .unwrap()
                        .unwrap_or("NULL"),
                );
            }
        }
    }
    text
}
/// Inserts `literal` into a fresh single-column table and reads the column
/// back through a concurrent arrow reader.
///
/// Returns the first column of the first record batch, or an error if the
/// reader could not be built or fetching failed.
fn fetch_arrow_data(
    table_name: &str,
    column_type: &str,
    literal: &str,
) -> Result<ArrayRef, anyhow::Error> {
    let cursor = cursor_over_literals(table_name, column_type, literal);
    // Consistency fix: the function already returns a Result, so builder and
    // promotion errors are propagated with `?` instead of panicking.
    let mut reader = OdbcReaderBuilder::new()
        .with_max_num_rows_per_batch(100)
        .build(cursor)?
        .into_concurrent()?;
    // The literal always produces at least one row, so an empty stream here
    // would be a broken test invariant.
    let record_batch = reader
        .next()
        .expect("query over inserted literal must yield a batch")?;
    Ok(record_batch.column(0).clone())
}
/// Creates a fresh single-column table, fills column "a" from the given SQL
/// VALUES literal list and returns a cursor selecting that column.
fn cursor_over_literals(
    table_name: &str,
    column_type: &str,
    literal: &str,
) -> CursorImpl<StatementConnection<Connection<'static>>> {
    let conn = env()
        .connect_with_connection_string(MSSQL, ConnectionOptions::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &[column_type]).unwrap();
    let insert = format!("INSERT INTO {table_name} (a) VALUES {literal}");
    conn.execute(&insert, (), None).unwrap();
    // `into_cursor` consumes the connection, giving the cursor a 'static
    // lifetime independent of this stack frame.
    conn.into_cursor(&format!("SELECT a FROM {table_name}"), (), None)
        .unwrap()
        .unwrap()
}
/// Creates a fresh single-column table, inserts `value` through a bound
/// parameter and returns a cursor selecting that column.
fn cursor_over_value(
    table_name: &str,
    column_type: &str,
    value: impl IntoParameter,
) -> CursorImpl<StatementConnection<Connection<'static>>> {
    let conn = env()
        .connect_with_connection_string(MSSQL, ConnectionOptions::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &[column_type]).unwrap();
    let insert = format!("INSERT INTO {table_name} (a) VALUES (?)");
    conn.execute(&insert, &value.into_parameter(), None).unwrap();
    // The cursor takes ownership of the connection.
    conn.into_cursor(&format!("SELECT a FROM {table_name}"), (), None)
        .unwrap()
        .unwrap()
}
/// Creates a fresh, empty single-column table and returns a cursor over it.
fn empty_cursor(
    table_name: &str,
    column_type: &str,
) -> CursorImpl<StatementConnection<Connection<'static>>> {
    let conn = env()
        .connect_with_connection_string(MSSQL, ConnectionOptions::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &[column_type]).unwrap();
    // No rows are inserted; the resulting cursor yields an empty result set.
    conn.into_cursor(&format!("SELECT a FROM {table_name}"), (), None)
        .unwrap()
        .unwrap()
}
/// Like [`cursor_over_value`], but returns the cursor as an opaque
/// `impl Cursor` rather than the concrete type.
fn query_single_value(
    table_name: &str,
    column_type: &str,
    value: impl IntoParameter,
) -> impl Cursor {
    let conn = env()
        .connect_with_connection_string(MSSQL, Default::default())
        .unwrap();
    setup_empty_table_mssql(&conn, table_name, &[column_type]).unwrap();
    let insert = format!("INSERT INTO {table_name} (a) VALUES (?)");
    conn.execute(&insert, &value.into_parameter(), None).unwrap();
    conn.into_cursor(&format!("SELECT a FROM {table_name}"), (), None)
        .unwrap()
        .unwrap()
}
/// A `RecordBatchReader` over a fixed, in-memory list of batches. Serves as a
/// stub data source for the writer tests.
struct StubBatchReader {
    // Schema reported to consumers; expected to match the stored batches.
    schema: SchemaRef,
    // Remaining batches, kept in reverse so `pop` yields insertion order.
    batches: Vec<RecordBatch>,
}
impl StubBatchReader {
    /// Creates a reader that yields `batches` in the given order.
    pub fn new(schema: SchemaRef, mut batches: Vec<RecordBatch>) -> Self {
        // Reversed here so `next` can cheaply `pop` from the back.
        batches.reverse();
        Self { schema, batches }
    }
}
impl Iterator for StubBatchReader {
    type Item = Result<RecordBatch, ArrowError>;
    fn next(&mut self) -> Option<Self::Item> {
        // In-memory batches cannot fail; wrapped in `Ok` to satisfy the
        // `RecordBatchReader` item contract.
        self.batches.pop().map(Ok)
    }
}
impl RecordBatchReader for StubBatchReader {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}