use std::sync::Arc;
use arrow::array::{
ArrayRef, BinaryBuilder, BooleanBuilder, Float32Builder, Float64Builder, Int16Builder,
Int32Builder, Int64Builder, Int8Builder, LargeBinaryBuilder, LargeStringBuilder, StringBuilder,
UInt16Builder, UInt32Builder, UInt64Builder, UInt8Builder,
};
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
use crate::error::SyncError;
use rhei_core::types::{CdcEvent, CdcOperation};
use rhei_core::TableSchema;
pub fn cdc_event_to_dml(event: &CdcEvent, schema: &Arc<TableSchema>) -> Result<String, SyncError> {
rhei_core::validate_identifier(&event.table)
.map_err(|e| SyncError::Conversion(e.to_string()))?;
match event.operation {
CdcOperation::Insert => build_insert(event, schema),
CdcOperation::Update => build_update(event, schema),
CdcOperation::Delete => build_delete(event, schema),
}
}
fn build_insert(event: &CdcEvent, _schema: &Arc<TableSchema>) -> Result<String, SyncError> {
let data = event
.new_data
.as_ref()
.ok_or_else(|| SyncError::Conversion("INSERT event missing new_data".into()))?;
let obj = data
.as_object()
.ok_or_else(|| SyncError::Conversion("new_data is not a JSON object".into()))?;
for key in obj.keys() {
rhei_core::validate_identifier(key).map_err(|e| SyncError::Conversion(e.to_string()))?;
}
let columns: Vec<&str> = obj.keys().map(|k| k.as_str()).collect();
let values: Vec<String> = obj
.values()
.map(json_value_to_sql)
.collect::<Result<Vec<_>, _>>()?;
Ok(format!(
"INSERT INTO {} ({}) VALUES ({})",
event.table,
columns.join(", "),
values.join(", ")
))
}
fn build_update(event: &CdcEvent, schema: &Arc<TableSchema>) -> Result<String, SyncError> {
let new_data = event
.new_data
.as_ref()
.ok_or_else(|| SyncError::Conversion("UPDATE event missing new_data".into()))?;
let obj = new_data
.as_object()
.ok_or_else(|| SyncError::Conversion("new_data is not a JSON object".into()))?;
let set_parts: Vec<String> = obj
.iter()
.filter(|(k, _)| !schema.primary_key.contains(k))
.map(|(k, v)| json_value_to_sql(v).map(|sql| format!("{} = {}", k, sql)))
.collect::<Result<Vec<_>, _>>()?;
if set_parts.is_empty() {
return Ok(String::new());
}
let where_parts = build_pk_where(event, schema)?;
Ok(format!(
"UPDATE {} SET {} WHERE {}",
event.table,
set_parts.join(", "),
where_parts
))
}
/// Builds a `DELETE FROM … WHERE …` statement keyed on the primary key.
///
/// # Errors
/// Propagates any error from [`build_pk_where`].
fn build_delete(event: &CdcEvent, schema: &Arc<TableSchema>) -> Result<String, SyncError> {
    let predicate = build_pk_where(event, schema)?;
    Ok(format!("DELETE FROM {} WHERE {}", event.table, predicate))
}
/// Renders the `WHERE` predicate that matches the event's row by primary
/// key.
///
/// Values are taken from `old_data` when present, falling back to
/// `new_data` otherwise.
///
/// # Errors
/// [`SyncError::Conversion`] when the event carries no data, the data is
/// not a JSON object, or any primary-key column is absent or NULL.
pub(crate) fn build_pk_where(
    event: &CdcEvent,
    schema: &Arc<TableSchema>,
) -> Result<String, SyncError> {
    let data = event
        .old_data
        .as_ref()
        .or(event.new_data.as_ref())
        .ok_or_else(|| SyncError::Conversion("event has neither old_data nor new_data".into()))?;
    let obj = data
        .as_object()
        .ok_or_else(|| SyncError::Conversion("data is not a JSON object".into()))?;
    // Build one `pk = value` clause per key column, short-circuiting on the
    // first error.
    let parts = schema
        .primary_key
        .iter()
        .map(|pk| {
            let val = obj.get(pk).ok_or_else(|| {
                SyncError::Conversion(format!(
                    "primary key column '{}' missing from CDC event for table '{}'",
                    pk, event.table
                ))
            })?;
            if val.is_null() {
                // A NULL key cannot identify a row via `=` comparison.
                return Err(SyncError::Conversion(format!(
                    "primary key column '{}' is NULL in CDC event for table '{}' (cannot build WHERE clause)",
                    pk, event.table
                )));
            }
            Ok(format!("{} = {}", pk, json_value_to_sql(val)?))
        })
        .collect::<Result<Vec<String>, SyncError>>()?;
    Ok(parts.join(" AND "))
}
/// Builds one multi-row `INSERT INTO … VALUES (…), (…)` statement from a
/// batch of INSERT events for the same table.
///
/// Columns are taken from the first event's `new_data` (sorted for a
/// deterministic order); every subsequent event must target the same table
/// and provide exactly the same column set.
///
/// # Errors
/// * [`SyncError::Conversion`] — empty batch, missing/non-object
///   `new_data`, identifier validation failure, or a table/column mismatch
///   between events.
/// * [`SyncError::UnsupportedType`] — a value is a JSON array or object.
pub fn build_batch_insert(
    events: &[&CdcEvent],
    _schema: &Arc<TableSchema>,
) -> Result<String, SyncError> {
    if events.is_empty() {
        return Err(SyncError::Conversion("empty batch".into()));
    }
    let table = &events[0].table;
    rhei_core::validate_identifier(table).map_err(|e| SyncError::Conversion(e.to_string()))?;
    let first_data = events[0]
        .new_data
        .as_ref()
        .ok_or_else(|| SyncError::Conversion("INSERT event missing new_data".into()))?;
    let first_obj = first_data
        .as_object()
        .ok_or_else(|| SyncError::Conversion("new_data is not a JSON object".into()))?;
    // Sort for a deterministic column order regardless of JSON map order.
    let columns: Vec<&str> = {
        let mut cols: Vec<&str> = first_obj.keys().map(|k| k.as_str()).collect();
        cols.sort();
        cols
    };
    for col in &columns {
        rhei_core::validate_identifier(col).map_err(|e| SyncError::Conversion(e.to_string()))?;
    }
    let mut value_rows: Vec<String> = Vec::with_capacity(events.len());
    for event in events {
        // Previously unchecked: a mixed-table batch would silently insert
        // every row into the first event's table.
        if event.table != *table {
            return Err(SyncError::Conversion(format!(
                "table mismatch in batch INSERT: expected '{}', got '{}'",
                table, event.table
            )));
        }
        let data = event
            .new_data
            .as_ref()
            .ok_or_else(|| SyncError::Conversion("INSERT event missing new_data".into()))?;
        let obj = data
            .as_object()
            .ok_or_else(|| SyncError::Conversion("new_data is not a JSON object".into()))?;
        let mut event_cols: Vec<&str> = obj.keys().map(|k| k.as_str()).collect();
        event_cols.sort();
        if event_cols != columns {
            return Err(SyncError::Conversion(format!(
                "column mismatch in batch INSERT for table '{}': expected {:?}, got {:?}",
                event.table, columns, event_cols
            )));
        }
        let values: Vec<String> = columns
            .iter()
            .map(|col| match obj.get(*col) {
                Some(v) => json_value_to_sql(v),
                // Defensive: unreachable after the equality check above, but
                // a missing key degrades to NULL rather than panicking.
                None => Ok("NULL".to_string()),
            })
            .collect::<Result<Vec<_>, _>>()?;
        value_rows.push(format!("({})", values.join(", ")));
    }
    Ok(format!(
        "INSERT INTO {} ({}) VALUES {}",
        table,
        columns.join(", "),
        value_rows.join(", ")
    ))
}
pub fn cdc_events_to_batch(
events: &[&CdcEvent],
schema: &Arc<TableSchema>,
) -> Result<RecordBatch, SyncError> {
if events.is_empty() {
return Err(SyncError::Conversion("empty event batch".into()));
}
let n = events.len();
let arrow_schema = &schema.arrow_schema;
let mut columns: Vec<ArrayRef> = Vec::with_capacity(arrow_schema.fields().len());
for field in arrow_schema.fields() {
let col_name = field.name().as_str();
let array: ArrayRef = match field.data_type() {
DataType::Int8 => {
let mut b = Int8Builder::with_capacity(n);
for ev in events {
match ev.new_data.as_ref().and_then(|d| d.get(col_name)) {
None | Some(serde_json::Value::Null) => b.append_null(),
Some(serde_json::Value::Number(num)) => {
let v = num.as_i64().ok_or_else(|| {
SyncError::Conversion(format!(
"cannot coerce {num} to Int8 for column '{col_name}'"
))
})?;
let narrow = i8::try_from(v).map_err(|_| {
SyncError::Conversion(format!(
"value {v} out of range for Int8 column '{col_name}' \
(valid range: {} to {})",
i8::MIN,
i8::MAX
))
})?;
b.append_value(narrow)
}
Some(other) => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': expected number, got {other:?}"
)))
}
}
}
Arc::new(b.finish())
}
DataType::Int16 => {
let mut b = Int16Builder::with_capacity(n);
for ev in events {
match ev.new_data.as_ref().and_then(|d| d.get(col_name)) {
None | Some(serde_json::Value::Null) => b.append_null(),
Some(serde_json::Value::Number(num)) => {
let v = num.as_i64().ok_or_else(|| {
SyncError::Conversion(format!(
"cannot coerce {num} to Int16 for column '{col_name}'"
))
})?;
let narrow = i16::try_from(v).map_err(|_| {
SyncError::Conversion(format!(
"value {v} out of range for Int16 column '{col_name}' \
(valid range: {} to {})",
i16::MIN,
i16::MAX
))
})?;
b.append_value(narrow)
}
Some(other) => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': expected number, got {other:?}"
)))
}
}
}
Arc::new(b.finish())
}
DataType::Int32 => {
let mut b = Int32Builder::with_capacity(n);
for ev in events {
match ev.new_data.as_ref().and_then(|d| d.get(col_name)) {
None | Some(serde_json::Value::Null) => b.append_null(),
Some(serde_json::Value::Number(num)) => {
let v = num.as_i64().ok_or_else(|| {
SyncError::Conversion(format!(
"cannot coerce {num} to Int32 for column '{col_name}'"
))
})?;
let narrow = i32::try_from(v).map_err(|_| {
SyncError::Conversion(format!(
"value {v} out of range for Int32 column '{col_name}' \
(valid range: {} to {})",
i32::MIN,
i32::MAX
))
})?;
b.append_value(narrow)
}
Some(other) => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': expected number, got {other:?}"
)))
}
}
}
Arc::new(b.finish())
}
DataType::Int64 => {
let mut b = Int64Builder::with_capacity(n);
for ev in events {
match ev.new_data.as_ref().and_then(|d| d.get(col_name)) {
None | Some(serde_json::Value::Null) => b.append_null(),
Some(serde_json::Value::Number(num)) => {
b.append_value(num.as_i64().ok_or_else(|| {
SyncError::Conversion(format!(
"cannot coerce {} to Int64 for column '{col_name}'",
num
))
})?)
}
Some(other) => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': expected number, got {other:?}"
)))
}
}
}
Arc::new(b.finish())
}
DataType::UInt8 => {
let mut b = UInt8Builder::with_capacity(n);
for ev in events {
match ev.new_data.as_ref().and_then(|d| d.get(col_name)) {
None | Some(serde_json::Value::Null) => b.append_null(),
Some(serde_json::Value::Number(num)) => {
let v = num.as_u64().ok_or_else(|| {
SyncError::Conversion(format!(
"cannot coerce {num} to UInt8 for column '{col_name}'"
))
})?;
let narrow = u8::try_from(v).map_err(|_| {
SyncError::Conversion(format!(
"value {v} out of range for UInt8 column '{col_name}' \
(valid range: 0 to {})",
u8::MAX
))
})?;
b.append_value(narrow)
}
Some(other) => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': expected number, got {other:?}"
)))
}
}
}
Arc::new(b.finish())
}
DataType::UInt16 => {
let mut b = UInt16Builder::with_capacity(n);
for ev in events {
match ev.new_data.as_ref().and_then(|d| d.get(col_name)) {
None | Some(serde_json::Value::Null) => b.append_null(),
Some(serde_json::Value::Number(num)) => {
let v = num.as_u64().ok_or_else(|| {
SyncError::Conversion(format!(
"cannot coerce {num} to UInt16 for column '{col_name}'"
))
})?;
let narrow = u16::try_from(v).map_err(|_| {
SyncError::Conversion(format!(
"value {v} out of range for UInt16 column '{col_name}' \
(valid range: 0 to {})",
u16::MAX
))
})?;
b.append_value(narrow)
}
Some(other) => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': expected number, got {other:?}"
)))
}
}
}
Arc::new(b.finish())
}
DataType::UInt32 => {
let mut b = UInt32Builder::with_capacity(n);
for ev in events {
match ev.new_data.as_ref().and_then(|d| d.get(col_name)) {
None | Some(serde_json::Value::Null) => b.append_null(),
Some(serde_json::Value::Number(num)) => {
let v = num.as_u64().ok_or_else(|| {
SyncError::Conversion(format!(
"cannot coerce {num} to UInt32 for column '{col_name}'"
))
})?;
let narrow = u32::try_from(v).map_err(|_| {
SyncError::Conversion(format!(
"value {v} out of range for UInt32 column '{col_name}' \
(valid range: 0 to {})",
u32::MAX
))
})?;
b.append_value(narrow)
}
Some(other) => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': expected number, got {other:?}"
)))
}
}
}
Arc::new(b.finish())
}
DataType::UInt64 => {
let mut b = UInt64Builder::with_capacity(n);
for ev in events {
match ev.new_data.as_ref().and_then(|d| d.get(col_name)) {
None | Some(serde_json::Value::Null) => b.append_null(),
Some(serde_json::Value::Number(num)) => {
b.append_value(num.as_u64().ok_or_else(|| {
SyncError::Conversion(format!(
"cannot coerce {} to UInt64 for column '{col_name}'",
num
))
})?)
}
Some(other) => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': expected number, got {other:?}"
)))
}
}
}
Arc::new(b.finish())
}
DataType::Float32 => {
let mut b = Float32Builder::with_capacity(n);
for ev in events {
match ev.new_data.as_ref().and_then(|d| d.get(col_name)) {
None | Some(serde_json::Value::Null) => b.append_null(),
Some(serde_json::Value::Number(num)) => {
b.append_value(num.as_f64().ok_or_else(|| {
SyncError::Conversion(format!(
"cannot coerce {} to Float32 for column '{col_name}'",
num
))
})? as f32)
}
Some(other) => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': expected number, got {other:?}"
)))
}
}
}
Arc::new(b.finish())
}
DataType::Float64 => {
let mut b = Float64Builder::with_capacity(n);
for ev in events {
match ev.new_data.as_ref().and_then(|d| d.get(col_name)) {
None | Some(serde_json::Value::Null) => b.append_null(),
Some(serde_json::Value::Number(num)) => {
b.append_value(num.as_f64().ok_or_else(|| {
SyncError::Conversion(format!(
"cannot coerce {} to Float64 for column '{col_name}'",
num
))
})?)
}
Some(other) => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': expected number, got {other:?}"
)))
}
}
}
Arc::new(b.finish())
}
DataType::Boolean => {
let mut b = BooleanBuilder::with_capacity(n);
for ev in events {
match ev.new_data.as_ref().and_then(|d| d.get(col_name)) {
None | Some(serde_json::Value::Null) => b.append_null(),
Some(serde_json::Value::Bool(v)) => b.append_value(*v),
Some(other) => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': expected boolean, got {other:?}"
)))
}
}
}
Arc::new(b.finish())
}
DataType::Utf8 => {
let mut b = StringBuilder::with_capacity(n, n * 16);
for ev in events {
match ev.new_data.as_ref().and_then(|d| d.get(col_name)) {
None | Some(serde_json::Value::Null) => b.append_null(),
Some(serde_json::Value::String(s)) => b.append_value(s),
Some(serde_json::Value::Number(num)) => b.append_value(num.to_string()),
Some(serde_json::Value::Bool(v)) => {
b.append_value(if *v { "true" } else { "false" })
}
Some(serde_json::Value::Array(_)) => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': JSON array cannot be stored as Utf8"
)))
}
Some(serde_json::Value::Object(_)) => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': JSON object cannot be stored as Utf8"
)))
}
}
}
Arc::new(b.finish())
}
DataType::LargeUtf8 => {
let mut b = LargeStringBuilder::with_capacity(n, n * 16);
for ev in events {
match ev.new_data.as_ref().and_then(|d| d.get(col_name)) {
None | Some(serde_json::Value::Null) => b.append_null(),
Some(serde_json::Value::String(s)) => b.append_value(s),
Some(serde_json::Value::Number(num)) => b.append_value(num.to_string()),
Some(serde_json::Value::Bool(v)) => {
b.append_value(if *v { "true" } else { "false" })
}
Some(serde_json::Value::Array(_)) => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': JSON array cannot be stored as LargeUtf8"
)))
}
Some(serde_json::Value::Object(_)) => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': JSON object cannot be stored as LargeUtf8"
)))
}
}
}
Arc::new(b.finish())
}
DataType::Binary => {
let mut b = BinaryBuilder::with_capacity(n, n * 16);
for ev in events {
match ev.new_data.as_ref().and_then(|d| d.get(col_name)) {
None | Some(serde_json::Value::Null) => b.append_null(),
Some(serde_json::Value::String(s)) => b.append_value(s.as_bytes()),
Some(other) => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': expected string for Binary, got {other:?}"
)))
}
}
}
Arc::new(b.finish())
}
DataType::LargeBinary => {
let mut b = LargeBinaryBuilder::with_capacity(n, n * 16);
for ev in events {
match ev.new_data.as_ref().and_then(|d| d.get(col_name)) {
None | Some(serde_json::Value::Null) => b.append_null(),
Some(serde_json::Value::String(s)) => b.append_value(s.as_bytes()),
Some(other) => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': expected string for LargeBinary, got \
{other:?}"
)))
}
}
}
Arc::new(b.finish())
}
unsupported => {
return Err(SyncError::UnsupportedType(format!(
"column '{col_name}': Arrow type {unsupported:?} is not supported \
by cdc_events_to_batch — use the SQL path instead"
)))
}
};
columns.push(array);
}
RecordBatch::try_new(arrow_schema.clone(), columns)
.map_err(|e| SyncError::Conversion(format!("failed to build RecordBatch: {e}")))
}
pub(crate) fn json_value_to_sql(val: &serde_json::Value) -> Result<String, SyncError> {
match val {
serde_json::Value::Null => Ok("NULL".to_string()),
serde_json::Value::Bool(b) => Ok(if *b { "TRUE" } else { "FALSE" }.to_string()),
serde_json::Value::Number(n) => Ok(n.to_string()),
serde_json::Value::String(s) => Ok(format!("'{}'", s.replace('\'', "''"))),
serde_json::Value::Array(_) => Err(SyncError::UnsupportedType(
"JSON array (nested values not supported in DML generation)".into(),
)),
serde_json::Value::Object(_) => Err(SyncError::UnsupportedType(
"JSON object (nested values not supported in DML generation)".into(),
)),
}
}
// Unit tests covering the SQL-DML builders (insert/update/delete/batch)
// and the Arrow RecordBatch conversion path.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::Arc;

    // Shared fixture: a `users` table with a non-null Int64 primary key
    // and two nullable columns (Utf8 name, Int64 age).
    fn test_schema() -> Arc<TableSchema> {
        use arrow::datatypes::{DataType, Field, Schema};
        Arc::new(TableSchema::new(
            "users",
            Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("name", DataType::Utf8, true),
                Field::new("age", DataType::Int64, true),
            ])),
            vec!["id".to_string()],
        ))
    }

    // INSERT event produces an INSERT statement containing the values.
    #[test]
    fn test_insert_dml() {
        let event = CdcEvent {
            seq: 1,
            timestamp: 1000,
            operation: CdcOperation::Insert,
            table: "users".into(),
            row_id: Some(1),
            old_data: None,
            new_data: Some(json!({"id": 1, "name": "Alice", "age": 30})),
        };
        let sql = cdc_event_to_dml(&event, &test_schema()).unwrap();
        assert!(sql.starts_with("INSERT INTO users"));
        assert!(sql.contains("Alice"));
    }

    // UPDATE event keys the WHERE clause on the primary key column.
    #[test]
    fn test_update_dml() {
        let event = CdcEvent {
            seq: 2,
            timestamp: 1001,
            operation: CdcOperation::Update,
            table: "users".into(),
            row_id: Some(1),
            old_data: Some(json!({"id": 1, "name": "Alice", "age": 30})),
            new_data: Some(json!({"id": 1, "name": "Bob", "age": 31})),
        };
        let sql = cdc_event_to_dml(&event, &test_schema()).unwrap();
        assert!(sql.starts_with("UPDATE users SET"));
        assert!(sql.contains("WHERE id = 1"));
    }

    // Two INSERT events collapse into a single multi-row VALUES statement;
    // 3 opening parens = 1 column list + 2 value rows.
    #[test]
    fn test_batch_insert() {
        let events = vec![
            CdcEvent {
                seq: 1,
                timestamp: 1000,
                operation: CdcOperation::Insert,
                table: "users".into(),
                row_id: Some(1),
                old_data: None,
                new_data: Some(json!({"id": 1, "name": "Alice", "age": 30})),
            },
            CdcEvent {
                seq: 2,
                timestamp: 1001,
                operation: CdcOperation::Insert,
                table: "users".into(),
                row_id: Some(2),
                old_data: None,
                new_data: Some(json!({"id": 2, "name": "Bob", "age": 25})),
            },
        ];
        let refs: Vec<&CdcEvent> = events.iter().collect();
        let sql = build_batch_insert(&refs, &test_schema()).unwrap();
        assert!(sql.starts_with("INSERT INTO users"));
        assert_eq!(sql.matches('(').count(), 3);
        assert!(sql.contains("Alice"));
        assert!(sql.contains("Bob"));
    }

    // A JSON array value cannot be rendered as a SQL literal.
    #[test]
    fn test_unsupported_json_array_errors() {
        let event = CdcEvent {
            seq: 1,
            timestamp: 1000,
            operation: CdcOperation::Insert,
            table: "users".into(),
            row_id: Some(1),
            old_data: None,
            new_data: Some(json!({"id": 1, "name": "Alice", "age": 30, "tags": ["a", "b"]})),
        };
        let err = cdc_event_to_dml(&event, &test_schema()).unwrap_err();
        match err {
            SyncError::UnsupportedType(msg) => assert!(msg.contains("array")),
            other => panic!("expected UnsupportedType, got {other:?}"),
        }
    }

    // A JSON object value is rejected directly by json_value_to_sql.
    #[test]
    fn test_unsupported_json_object_errors() {
        let val = serde_json::json!({"nested": "object"});
        let err = json_value_to_sql(&val).unwrap_err();
        match err {
            SyncError::UnsupportedType(msg) => assert!(msg.contains("object")),
            other => panic!("expected UnsupportedType, got {other:?}"),
        }
    }

    // DELETE uses old_data to build the primary-key WHERE clause.
    #[test]
    fn test_delete_dml() {
        let event = CdcEvent {
            seq: 3,
            timestamp: 1002,
            operation: CdcOperation::Delete,
            table: "users".into(),
            row_id: Some(1),
            old_data: Some(json!({"id": 1, "name": "Alice", "age": 30})),
            new_data: None,
        };
        let sql = cdc_event_to_dml(&event, &test_schema()).unwrap();
        assert_eq!(sql, "DELETE FROM users WHERE id = 1");
    }

    // Happy path: two events become a 2-row, 3-column RecordBatch matching
    // the schema.
    #[test]
    fn test_events_to_batch_basic() {
        let events = vec![
            CdcEvent {
                seq: 1,
                timestamp: 1000,
                operation: CdcOperation::Insert,
                table: "users".into(),
                row_id: Some(1),
                old_data: None,
                new_data: Some(json!({"id": 1, "name": "Alice", "age": 30})),
            },
            CdcEvent {
                seq: 2,
                timestamp: 1001,
                operation: CdcOperation::Insert,
                table: "users".into(),
                row_id: Some(2),
                old_data: None,
                new_data: Some(json!({"id": 2, "name": "Bob", "age": 25})),
            },
        ];
        let refs: Vec<&CdcEvent> = events.iter().collect();
        let batch = cdc_events_to_batch(&refs, &test_schema()).unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.schema(), test_schema().arrow_schema);
    }

    // Explicit JSON nulls map to Arrow NULLs in the right positions.
    #[test]
    fn test_events_to_batch_nullable_columns() {
        let events = vec![
            CdcEvent {
                seq: 1,
                timestamp: 1000,
                operation: CdcOperation::Insert,
                table: "users".into(),
                row_id: Some(1),
                old_data: None,
                new_data: Some(json!({"id": 1, "name": null, "age": 30})),
            },
            CdcEvent {
                seq: 2,
                timestamp: 1001,
                operation: CdcOperation::Insert,
                table: "users".into(),
                row_id: Some(2),
                old_data: None,
                new_data: Some(json!({"id": 2, "name": "Bob", "age": null})),
            },
        ];
        let refs: Vec<&CdcEvent> = events.iter().collect();
        let batch = cdc_events_to_batch(&refs, &test_schema()).unwrap();
        assert_eq!(batch.num_rows(), 2);
        let name_col = batch.column(1);
        assert!(name_col.is_null(0));
        assert!(!name_col.is_null(1));
        let age_col = batch.column(2);
        assert!(!age_col.is_null(0));
        assert!(age_col.is_null(1));
    }

    // A key absent from new_data (not just null) also becomes Arrow NULL.
    #[test]
    fn test_events_to_batch_missing_column_becomes_null() {
        let events = vec![CdcEvent {
            seq: 1,
            timestamp: 1000,
            operation: CdcOperation::Insert,
            table: "users".into(),
            row_id: Some(1),
            old_data: None,
            new_data: Some(json!({"id": 1, "name": "Alice"})),
        }];
        let refs: Vec<&CdcEvent> = events.iter().collect();
        let batch = cdc_events_to_batch(&refs, &test_schema()).unwrap();
        assert_eq!(batch.num_rows(), 1);
        let age_col = batch.column(2);
        assert!(age_col.is_null(0), "missing column should produce NULL");
    }

    // A JSON array cannot be stored in a Utf8 column.
    #[test]
    fn test_events_to_batch_unsupported_json_array() {
        use arrow::datatypes::{DataType, Field, Schema};
        let schema = Arc::new(TableSchema::new(
            "t",
            Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("tags", DataType::Utf8, true),
            ])),
            vec!["id".to_string()],
        ));
        let events = vec![CdcEvent {
            seq: 1,
            timestamp: 1000,
            operation: CdcOperation::Insert,
            table: "t".into(),
            row_id: Some(1),
            old_data: None,
            new_data: Some(json!({"id": 1, "tags": ["a", "b"]})),
        }];
        let refs: Vec<&CdcEvent> = events.iter().collect();
        let err = cdc_events_to_batch(&refs, &schema).unwrap_err();
        match err {
            SyncError::UnsupportedType(msg) => {
                assert!(msg.contains("array"), "expected 'array' in: {msg}")
            }
            other => panic!("expected UnsupportedType, got {other:?}"),
        }
    }

    // Arrow types outside the supported set (e.g. Timestamp) are rejected.
    #[test]
    fn test_events_to_batch_unsupported_arrow_type() {
        use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
        let schema = Arc::new(TableSchema::new(
            "t",
            Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new(
                    "created_at",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    true,
                ),
            ])),
            vec!["id".to_string()],
        ));
        let events = vec![CdcEvent {
            seq: 1,
            timestamp: 1000,
            operation: CdcOperation::Insert,
            table: "t".into(),
            row_id: Some(1),
            old_data: None,
            new_data: Some(json!({"id": 1, "created_at": 1234567890})),
        }];
        let refs: Vec<&CdcEvent> = events.iter().collect();
        let err = cdc_events_to_batch(&refs, &schema).unwrap_err();
        assert!(
            matches!(err, SyncError::UnsupportedType(_)),
            "expected UnsupportedType, got {err:?}"
        );
    }

    // An empty batch is a Conversion error, not an empty RecordBatch.
    #[test]
    fn test_events_to_batch_empty_returns_error() {
        let refs: Vec<&CdcEvent> = vec![];
        let err = cdc_events_to_batch(&refs, &test_schema()).unwrap_err();
        assert!(
            matches!(err, SyncError::Conversion(_)),
            "expected Conversion error for empty batch"
        );
    }

    // Float64 and Boolean columns round-trip non-null values.
    #[test]
    fn test_events_to_batch_float_and_bool() {
        use arrow::datatypes::{DataType, Field, Schema};
        let schema = Arc::new(TableSchema::new(
            "metrics",
            Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("score", DataType::Float64, true),
                Field::new("active", DataType::Boolean, true),
            ])),
            vec!["id".to_string()],
        ));
        let events = vec![CdcEvent {
            seq: 1,
            timestamp: 1000,
            operation: CdcOperation::Insert,
            table: "metrics".into(),
            row_id: Some(1),
            old_data: None,
            new_data: Some(json!({"id": 1, "score": 3.14, "active": true})),
        }];
        let refs: Vec<&CdcEvent> = events.iter().collect();
        let batch = cdc_events_to_batch(&refs, &schema).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert!(!batch.column(1).is_null(0));
        assert!(!batch.column(2).is_null(0));
    }

    // 128 > i8::MAX, so narrowing to Int8 must fail with Conversion.
    #[test]
    fn test_int8_overflow_returns_conversion_error() {
        use arrow::datatypes::{DataType, Field, Schema};
        let schema = Arc::new(TableSchema::new(
            "t",
            Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("byte_col", DataType::Int8, true),
            ])),
            vec!["id".to_string()],
        ));
        let events = vec![CdcEvent {
            seq: 1,
            timestamp: 1000,
            operation: CdcOperation::Insert,
            table: "t".into(),
            row_id: Some(1),
            old_data: None,
            new_data: Some(json!({"id": 1, "byte_col": 128})),
        }];
        let refs: Vec<&CdcEvent> = events.iter().collect();
        let err = cdc_events_to_batch(&refs, &schema).unwrap_err();
        match err {
            SyncError::Conversion(msg) => {
                assert!(
                    msg.contains("out of range") || msg.contains("Int8"),
                    "expected out-of-range message, got: {msg}"
                );
            }
            other => panic!("expected Conversion error, got {other:?}"),
        }
    }

    // 127 == i8::MAX, the boundary value that must still succeed.
    #[test]
    fn test_int8_in_range_succeeds() {
        use arrow::datatypes::{DataType, Field, Schema};
        let schema = Arc::new(TableSchema::new(
            "t",
            Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("byte_col", DataType::Int8, true),
            ])),
            vec!["id".to_string()],
        ));
        let events = vec![CdcEvent {
            seq: 1,
            timestamp: 1000,
            operation: CdcOperation::Insert,
            table: "t".into(),
            row_id: Some(1),
            old_data: None,
            new_data: Some(json!({"id": 1, "byte_col": 127})),
        }];
        let refs: Vec<&CdcEvent> = events.iter().collect();
        let batch = cdc_events_to_batch(&refs, &schema).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert!(!batch.column(1).is_null(0));
    }

    // 256 > u8::MAX, so narrowing to UInt8 must fail with Conversion.
    #[test]
    fn test_uint8_overflow_returns_conversion_error() {
        use arrow::datatypes::{DataType, Field, Schema};
        let schema = Arc::new(TableSchema::new(
            "t",
            Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("ubyte_col", DataType::UInt8, true),
            ])),
            vec!["id".to_string()],
        ));
        let events = vec![CdcEvent {
            seq: 1,
            timestamp: 1000,
            operation: CdcOperation::Insert,
            table: "t".into(),
            row_id: Some(1),
            old_data: None,
            new_data: Some(json!({"id": 1, "ubyte_col": 256})),
        }];
        let refs: Vec<&CdcEvent> = events.iter().collect();
        let err = cdc_events_to_batch(&refs, &schema).unwrap_err();
        match err {
            SyncError::Conversion(msg) => {
                assert!(
                    msg.contains("out of range") || msg.contains("UInt8"),
                    "expected out-of-range message, got: {msg}"
                );
            }
            other => panic!("expected Conversion error, got {other:?}"),
        }
    }
}