use std::sync::Arc;
use arrow::array::{new_null_array, ArrayRef, Int64Builder, StringArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use crate::converter::{build_pk_where, cdc_events_to_batch, json_value_to_sql};
use crate::error::SyncError;
use rhei_core::types::{CdcEvent, CdcOperation};
use rhei_core::TableSchema;
/// Synthetic column: timestamp at which a row version became valid.
pub const VALID_FROM_COL: &str = "_rhei_valid_from";
/// Synthetic column: timestamp at which a row version was superseded.
/// NULL while the version is the current one.
pub const VALID_TO_COL: &str = "_rhei_valid_to";
/// Synthetic column: single-character code of the CDC operation that
/// produced the row version ('I', 'U' or 'D').
pub const OPERATION_COL: &str = "_rhei_operation";
// Operation codes pre-quoted as SQL string literals, ready to splice into DML.
const OP_INSERT: &str = "'I'";
const OP_UPDATE: &str = "'U'";
const OP_DELETE: &str = "'D'";
/// Return a copy of `schema` extended with the three temporal bookkeeping
/// columns: `_rhei_valid_from` (non-null Int64), `_rhei_valid_to`
/// (nullable Int64) and `_rhei_operation` (non-null Utf8), in that order.
pub fn temporalize_schema(schema: &SchemaRef) -> SchemaRef {
    let temporal_fields = [
        Field::new(VALID_FROM_COL, DataType::Int64, false),
        Field::new(VALID_TO_COL, DataType::Int64, true),
        Field::new(OPERATION_COL, DataType::Utf8, false),
    ];
    let fields: Vec<Arc<Field>> = schema
        .fields()
        .iter()
        .cloned()
        .chain(temporal_fields.into_iter().map(Arc::new))
        .collect();
    Arc::new(Schema::new(fields))
}
/// Translate one CDC event into the SQL statement(s) that apply it to a
/// temporal (history-keeping) table.
///
/// INSERT yields a single versioned INSERT. UPDATE and DELETE each yield two
/// statements: first close the currently-open row version, then insert the
/// new version (UPDATE) or a tombstone row (DELETE).
///
/// # Errors
/// Returns `SyncError::Conversion` when the table name fails identifier
/// validation or the event payload is malformed.
pub fn cdc_event_to_temporal_dml(
    event: &CdcEvent,
    schema: &Arc<TableSchema>,
) -> Result<Vec<String>, SyncError> {
    // Validate the table name once, up front, before any SQL is assembled.
    rhei_core::validate_identifier(&event.table)
        .map_err(|e| SyncError::Conversion(e.to_string()))?;
    let statements = match event.operation {
        CdcOperation::Insert => vec![build_temporal_insert(event, schema, OP_INSERT)?],
        CdcOperation::Update => vec![
            build_close_version(event, schema)?,
            build_temporal_insert(event, schema, OP_UPDATE)?,
        ],
        CdcOperation::Delete => vec![
            build_close_version(event, schema)?,
            build_temporal_tombstone(event, schema)?,
        ],
    };
    Ok(statements)
}
fn build_temporal_insert(
event: &CdcEvent,
_schema: &Arc<TableSchema>,
op_code: &str,
) -> Result<String, SyncError> {
let data = event
.new_data
.as_ref()
.ok_or_else(|| SyncError::Conversion("event missing new_data".into()))?;
let obj = data
.as_object()
.ok_or_else(|| SyncError::Conversion("new_data is not a JSON object".into()))?;
for key in obj.keys() {
rhei_core::validate_identifier(key).map_err(|e| SyncError::Conversion(e.to_string()))?;
}
let mut columns: Vec<&str> = obj.keys().map(|k| k.as_str()).collect();
let mut values: Vec<String> = obj
.values()
.map(json_value_to_sql)
.collect::<Result<Vec<_>, _>>()?;
columns.push(VALID_FROM_COL);
values.push(event.timestamp.to_string());
columns.push(VALID_TO_COL);
values.push("NULL".to_string());
columns.push(OPERATION_COL);
values.push(op_code.to_string());
Ok(format!(
"INSERT INTO {} ({}) VALUES ({})",
event.table,
columns.join(", "),
values.join(", ")
))
}
/// Build the `UPDATE` that closes the currently-open version of the event's
/// row: the row matching the primary-key predicate whose `_rhei_valid_to`
/// is still NULL gets its valid-to set to the event timestamp.
fn build_close_version(event: &CdcEvent, schema: &Arc<TableSchema>) -> Result<String, SyncError> {
    let predicate = build_pk_where(event, schema)?;
    let sql = format!(
        "UPDATE {table} SET {col} = {ts} WHERE {predicate} AND {col} IS NULL",
        table = event.table,
        col = VALID_TO_COL,
        ts = event.timestamp,
    );
    Ok(sql)
}
/// Build the tombstone `INSERT` for a DELETE: a full-schema snapshot of the
/// deleted row (taken from `old_data`, falling back to `new_data`) marked
/// with operation 'D' and an open validity interval starting at the event
/// timestamp. Schema columns absent from the snapshot become NULL.
///
/// Every primary-key column must be present and non-NULL in the snapshot;
/// otherwise the tombstone could not be matched back to the logical row.
fn build_temporal_tombstone(
    event: &CdcEvent,
    schema: &Arc<TableSchema>,
) -> Result<String, SyncError> {
    let snapshot = event
        .old_data
        .as_ref()
        .or(event.new_data.as_ref())
        .ok_or_else(|| {
            SyncError::Conversion("DELETE event has neither old_data nor new_data".into())
        })?
        .as_object()
        .ok_or_else(|| SyncError::Conversion("data is not a JSON object".into()))?;
    // Fail fast on a missing or NULL primary-key value.
    for pk in &schema.primary_key {
        match snapshot.get(pk) {
            None => {
                return Err(SyncError::Conversion(format!(
                    "primary key column '{}' missing from DELETE event for table '{}'",
                    pk, event.table
                )))
            }
            Some(v) if v.is_null() => {
                return Err(SyncError::Conversion(format!(
                    "primary key column '{}' is NULL in DELETE event for table '{}' (cannot build tombstone)",
                    pk, event.table
                )))
            }
            Some(_) => {}
        }
    }
    let total = schema.arrow_schema.fields().len() + 3;
    let mut columns: Vec<&str> = Vec::with_capacity(total);
    let mut values: Vec<String> = Vec::with_capacity(total);
    // Emit columns in schema order so the tombstone covers the whole row.
    for field in schema.arrow_schema.fields() {
        let name = field.name().as_str();
        columns.push(name);
        let sql_value = match snapshot.get(name) {
            Some(v) => json_value_to_sql(v)?,
            None => "NULL".to_string(),
        };
        values.push(sql_value);
    }
    columns.extend([VALID_FROM_COL, VALID_TO_COL, OPERATION_COL]);
    values.push(event.timestamp.to_string());
    values.push("NULL".to_string());
    values.push(OP_DELETE.to_string());
    Ok(format!(
        "INSERT INTO {} ({}) VALUES ({})",
        event.table,
        columns.join(", "),
        values.join(", ")
    ))
}
pub fn cdc_events_to_temporal_batch(
events: &[&CdcEvent],
schema: &Arc<TableSchema>,
) -> Result<RecordBatch, SyncError> {
if events.is_empty() {
return Err(SyncError::Conversion("empty event batch".into()));
}
let base = cdc_events_to_batch(events, schema)?;
let n = base.num_rows();
let mut valid_from_builder = Int64Builder::with_capacity(n);
for ev in events {
valid_from_builder.append_value(ev.timestamp);
}
let valid_from: ArrayRef = Arc::new(valid_from_builder.finish());
let valid_to: ArrayRef = new_null_array(&DataType::Int64, n);
let operation: ArrayRef = Arc::new(StringArray::from_iter_values(std::iter::repeat_n("I", n)));
let temporal_schema = temporalize_schema(&base.schema());
let mut columns: Vec<ArrayRef> = base.columns().to_vec();
columns.push(valid_from);
columns.push(valid_to);
columns.push(operation);
RecordBatch::try_new(temporal_schema, columns)
.map_err(|e| SyncError::Conversion(format!("failed to build temporal RecordBatch: {e}")))
}
/// Build one multi-row `INSERT ... VALUES (...), (...)` statement covering a
/// batch of INSERT events for a single table.
///
/// The column list is taken from the first event's `new_data` keys (sorted);
/// every subsequent event must carry exactly the same key set. The three
/// temporal columns are appended to each row: valid-from = the event's
/// timestamp, valid-to = NULL, operation = 'I'.
///
/// # Errors
/// Returns `SyncError::Conversion` when the batch is empty, when events
/// target different tables, when an identifier fails validation, when
/// `new_data` is missing or not an object, or when an event's key set
/// differs from the first event's.
pub fn build_temporal_batch_insert(
    events: &[&CdcEvent],
    _schema: &Arc<TableSchema>,
) -> Result<String, SyncError> {
    if events.is_empty() {
        return Err(SyncError::Conversion("empty batch".into()));
    }
    let table = &events[0].table;
    rhei_core::validate_identifier(table).map_err(|e| SyncError::Conversion(e.to_string()))?;
    let first_data = events[0]
        .new_data
        .as_ref()
        .ok_or_else(|| SyncError::Conversion("INSERT event missing new_data".into()))?;
    let first_obj = first_data
        .as_object()
        .ok_or_else(|| SyncError::Conversion("new_data is not a JSON object".into()))?;
    // Sort for a deterministic column order regardless of JSON key order.
    let mut data_columns: Vec<&str> = first_obj.keys().map(|k| k.as_str()).collect();
    data_columns.sort();
    for col in &data_columns {
        rhei_core::validate_identifier(col).map_err(|e| SyncError::Conversion(e.to_string()))?;
    }
    let mut columns: Vec<&str> = data_columns.clone();
    columns.push(VALID_FROM_COL);
    columns.push(VALID_TO_COL);
    columns.push(OPERATION_COL);
    let mut value_rows: Vec<String> = Vec::with_capacity(events.len());
    for event in events {
        // Every row is emitted against `table`; previously an event for a
        // different table was silently inserted into the first event's table.
        if &event.table != table {
            return Err(SyncError::Conversion(format!(
                "table mismatch in temporal batch INSERT: expected '{}', got '{}'",
                table, event.table
            )));
        }
        let data = event
            .new_data
            .as_ref()
            .ok_or_else(|| SyncError::Conversion("INSERT event missing new_data".into()))?;
        let obj = data
            .as_object()
            .ok_or_else(|| SyncError::Conversion("new_data is not a JSON object".into()))?;
        let mut event_cols: Vec<&str> = obj.keys().map(|k| k.as_str()).collect();
        event_cols.sort();
        if event_cols != data_columns {
            return Err(SyncError::Conversion(format!(
                "column mismatch in temporal batch INSERT for table '{}': expected {:?}, got {:?}",
                event.table, data_columns, event_cols
            )));
        }
        let mut values: Vec<String> = data_columns
            .iter()
            .map(|col| match obj.get(*col) {
                Some(v) => json_value_to_sql(v),
                None => Ok("NULL".to_string()),
            })
            .collect::<Result<Vec<_>, _>>()?;
        values.push(event.timestamp.to_string());
        values.push("NULL".to_string());
        values.push(OP_INSERT.to_string());
        value_rows.push(format!("({})", values.join(", ")));
    }
    Ok(format!(
        "INSERT INTO {} ({}) VALUES {}",
        table,
        columns.join(", "),
        value_rows.join(", ")
    ))
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Three-column `users` table (non-null `id` as primary key, nullable
    /// `name` and `age`) shared by all tests below.
    fn test_schema() -> Arc<TableSchema> {
        use arrow::datatypes::{DataType, Field, Schema};
        Arc::new(TableSchema::new(
            "users",
            Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("name", DataType::Utf8, true),
                Field::new("age", DataType::Int64, true),
            ])),
            vec!["id".to_string()],
        ))
    }

    /// Convenience constructor for a `users`-table CDC event.
    fn make_event(
        seq: i64,
        ts: i64,
        op: CdcOperation,
        old_data: Option<serde_json::Value>,
        new_data: Option<serde_json::Value>,
    ) -> CdcEvent {
        CdcEvent {
            seq,
            timestamp: ts,
            operation: op,
            table: "users".into(),
            row_id: Some(1),
            old_data,
            new_data,
        }
    }

    // INSERT → exactly one INSERT statement carrying the temporal columns.
    #[test]
    fn test_temporal_insert_dml() {
        let event = make_event(
            1,
            1000,
            CdcOperation::Insert,
            None,
            Some(json!({"id": 1, "name": "Alice", "age": 30})),
        );
        let stmts = cdc_event_to_temporal_dml(&event, &test_schema()).unwrap();
        assert_eq!(stmts.len(), 1);
        assert!(stmts[0].starts_with("INSERT INTO users"));
        assert!(stmts[0].contains("_rhei_valid_from"));
        assert!(stmts[0].contains("1000"));
        assert!(stmts[0].contains("NULL"));
        assert!(stmts[0].contains("'I'"));
        assert!(stmts[0].contains("Alice"));
    }

    // UPDATE → close the open version, then insert the new one marked 'U'.
    #[test]
    fn test_temporal_update_dml() {
        let event = make_event(
            2,
            2000,
            CdcOperation::Update,
            Some(json!({"id": 1, "name": "Alice", "age": 30})),
            Some(json!({"id": 1, "name": "Bob", "age": 31})),
        );
        let stmts = cdc_event_to_temporal_dml(&event, &test_schema()).unwrap();
        assert_eq!(stmts.len(), 2);
        assert!(stmts[0].starts_with("UPDATE users SET _rhei_valid_to = 2000"));
        assert!(stmts[0].contains("WHERE id = 1"));
        assert!(stmts[0].contains("_rhei_valid_to IS NULL"));
        assert!(stmts[1].starts_with("INSERT INTO users"));
        assert!(stmts[1].contains("Bob"));
        assert!(stmts[1].contains("'U'"));
        assert!(stmts[1].contains("2000"));
    }

    // DELETE → close the open version, then insert a tombstone marked 'D'
    // that covers every schema column.
    #[test]
    fn test_temporal_delete_dml() {
        let event = make_event(
            3,
            3000,
            CdcOperation::Delete,
            Some(json!({"id": 1, "name": "Alice", "age": 30})),
            None,
        );
        let stmts = cdc_event_to_temporal_dml(&event, &test_schema()).unwrap();
        assert_eq!(stmts.len(), 2);
        assert!(stmts[0].starts_with("UPDATE users SET _rhei_valid_to = 3000"));
        assert!(stmts[0].contains("WHERE id = 1"));
        assert!(stmts[1].starts_with("INSERT INTO users"));
        assert!(stmts[1].contains("'D'"));
        assert!(stmts[1].contains("3000"));
        assert!(stmts[1].contains("id"));
        assert!(stmts[1].contains("name"));
        assert!(stmts[1].contains("age"));
    }

    // Two INSERT events collapse into one multi-row INSERT statement:
    // one '(' for the column list plus one per value row.
    #[test]
    fn test_temporal_batch_insert() {
        let events = vec![
            make_event(
                1,
                1000,
                CdcOperation::Insert,
                None,
                Some(json!({"id": 1, "name": "Alice", "age": 30})),
            ),
            make_event(
                2,
                1001,
                CdcOperation::Insert,
                None,
                Some(json!({"id": 2, "name": "Bob", "age": 25})),
            ),
        ];
        let refs: Vec<&CdcEvent> = events.iter().collect();
        let sql = build_temporal_batch_insert(&refs, &test_schema()).unwrap();
        assert!(sql.starts_with("INSERT INTO users"));
        assert!(sql.contains("_rhei_valid_from"));
        assert!(sql.contains("_rhei_valid_to"));
        assert!(sql.contains("_rhei_operation"));
        assert!(sql.contains("Alice"));
        assert!(sql.contains("Bob"));
        assert!(sql.contains("'I'"));
        assert_eq!(sql.matches('(').count(), 3);
    }

    // An UPDATE with no old_data still works: the close statement only needs
    // the primary key, which is read from new_data via build_pk_where.
    #[test]
    fn test_temporal_update_no_old_data() {
        let event = make_event(
            2,
            2000,
            CdcOperation::Update,
            None,
            Some(json!({"id": 1, "name": "Bob", "age": 31})),
        );
        let stmts = cdc_event_to_temporal_dml(&event, &test_schema()).unwrap();
        assert_eq!(stmts.len(), 2);
        assert!(stmts[0].contains("WHERE id = 1"));
        assert!(stmts[1].contains("Bob"));
    }

    // The three temporal fields are appended after the base fields, with the
    // expected names, types and nullability.
    #[test]
    fn test_temporalize_schema() {
        use arrow::datatypes::{DataType, Field, Schema};
        let base = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let temporal = temporalize_schema(&base);
        assert_eq!(temporal.fields().len(), 5);
        assert_eq!(temporal.field(2).name(), "_rhei_valid_from");
        assert_eq!(temporal.field(2).data_type(), &DataType::Int64);
        assert!(!temporal.field(2).is_nullable());
        assert_eq!(temporal.field(3).name(), "_rhei_valid_to");
        assert!(temporal.field(3).is_nullable());
        assert_eq!(temporal.field(4).name(), "_rhei_operation");
        assert_eq!(temporal.field(4).data_type(), &DataType::Utf8);
    }

    // The batch conversion appends per-event valid-from values, an all-NULL
    // valid-to column, and the operation code column.
    #[test]
    fn test_temporal_events_to_batch_columns_appended() {
        let events = vec![
            make_event(
                1,
                1000,
                CdcOperation::Insert,
                None,
                Some(json!({"id": 1, "name": "Alice", "age": 30})),
            ),
            make_event(
                2,
                2000,
                CdcOperation::Insert,
                None,
                Some(json!({"id": 2, "name": "Bob", "age": 25})),
            ),
        ];
        let refs: Vec<&CdcEvent> = events.iter().collect();
        let schema = test_schema();
        let batch = cdc_events_to_temporal_batch(&refs, &schema).unwrap();
        assert_eq!(batch.num_columns(), 6);
        assert_eq!(batch.num_rows(), 2);
        let s = batch.schema();
        assert_eq!(s.field(3).name(), VALID_FROM_COL);
        assert_eq!(s.field(4).name(), VALID_TO_COL);
        assert_eq!(s.field(5).name(), OPERATION_COL);
        assert!(batch.column(4).is_null(0));
        assert!(batch.column(4).is_null(1));
        use arrow::array::Int64Array;
        let vf = batch
            .column(3)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(vf.value(0), 1000);
        assert_eq!(vf.value(1), 2000);
        use arrow::array::StringArray;
        let op = batch
            .column(5)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(op.value(0), "I");
        assert_eq!(op.value(1), "I");
    }

    // Null JSON values map to nulls in the nullable base columns, while the
    // temporal columns keep their own nullability (valid-from and operation
    // are always set; valid-to is always NULL on insert).
    #[test]
    fn test_temporal_events_to_batch_nullable_base_cols() {
        let events = vec![make_event(
            1,
            5000,
            CdcOperation::Insert,
            None,
            Some(json!({"id": 1, "name": null, "age": null})),
        )];
        let refs: Vec<&CdcEvent> = events.iter().collect();
        let batch = cdc_events_to_temporal_batch(&refs, &test_schema()).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert!(batch.column(1).is_null(0));
        assert!(batch.column(2).is_null(0));
        assert!(!batch.column(3).is_null(0));
        assert!(batch.column(4).is_null(0));
        assert!(!batch.column(5).is_null(0));
    }

    // An empty slice of events is rejected with a Conversion error rather
    // than producing a zero-row batch.
    #[test]
    fn test_temporal_events_to_batch_empty_returns_error() {
        let refs: Vec<&CdcEvent> = vec![];
        let err = cdc_events_to_temporal_batch(&refs, &test_schema()).unwrap_err();
        assert!(
            matches!(err, SyncError::Conversion(_)),
            "expected Conversion error for empty batch"
        );
    }

    // The batch's schema must equal what temporalize_schema produces from the
    // table's base Arrow schema — the two code paths must not drift apart.
    #[test]
    fn test_temporal_events_to_batch_schema_matches_temporalize() {
        let events = vec![make_event(
            1,
            1000,
            CdcOperation::Insert,
            None,
            Some(json!({"id": 1, "name": "X", "age": 10})),
        )];
        let refs: Vec<&CdcEvent> = events.iter().collect();
        let schema = test_schema();
        let batch = cdc_events_to_temporal_batch(&refs, &schema).unwrap();
        let expected_schema = temporalize_schema(&schema.arrow_schema);
        assert_eq!(
            batch.schema(),
            expected_schema,
            "batch schema must match temporalize_schema output"
        );
    }
}