use std::sync::Arc;
use crate::actions::{get_log_commit_info_schema, CommitInfo, COMMIT_INFO_NAME};
use crate::expressions::{MapData, Scalar, Transform};
use crate::schema::{MapType, StructField, StructType, ToSchema};
use crate::{DataType, Engine, EngineData, Error, Expression, ExpressionRef, IntoEngineData};
use super::Transaction;
fn commit_info_literal_exprs(
commit_info: CommitInfo,
) -> Result<Vec<(&'static str, ExpressionRef)>, Error> {
let op_params_map_type = MapType::new(DataType::STRING, DataType::STRING, true);
let literal_exprs = vec![
(
"timestamp",
Arc::new(Expression::literal(commit_info.timestamp)),
),
(
"inCommitTimestamp",
Arc::new(Expression::literal(commit_info.in_commit_timestamp)),
),
(
"operation",
Arc::new(Expression::literal(commit_info.operation)),
),
(
"operationParameters",
Arc::new(Expression::literal(
match commit_info.operation_parameters {
Some(map) => Scalar::Map(MapData::try_new(
op_params_map_type,
map.into_iter()
.map(|(k, v)| (Scalar::String(k), Scalar::String(v))),
)?),
None => Scalar::Null(DataType::Map(Box::new(op_params_map_type))),
},
)),
),
(
"kernelVersion",
Arc::new(Expression::literal(commit_info.kernel_version)),
),
(
"isBlindAppend",
Arc::new(Expression::literal(commit_info.is_blind_append)),
),
(
"engineInfo",
Arc::new(Expression::literal(commit_info.engine_info)),
),
("txnId", Arc::new(Expression::literal(commit_info.txn_id))),
];
let expected_expr_len = CommitInfo::to_schema().fields().len();
if literal_exprs.len() != expected_expr_len {
return Err(Error::Generic(format!("expect the commit_info_literal_exprs return {expected_expr_len} expressions, but only get {} expressions. \
If CommitInfo field was added/removed, please update Expression::Literal in this function and update the with_commit_info doc comment", literal_exprs.len())));
}
Ok(literal_exprs)
}
impl<S> Transaction<S> {
pub(super) fn generate_commit_info(
&self,
engine: &dyn Engine,
kernel_commit_info: CommitInfo,
) -> Result<Box<dyn EngineData>, Error> {
match &self.engine_commit_info {
Some((engine_commit_info, engine_commit_info_schema)) => {
let kernel_schema = CommitInfo::to_schema();
let output_fields: Vec<_> = engine_commit_info_schema
.fields()
.map(|field| kernel_schema.field(field.name()).unwrap_or(field))
.cloned()
.chain(
kernel_schema
.fields()
.filter(|field| !engine_commit_info_schema.contains(field.name()))
.cloned(),
)
.collect();
let output_schema = StructType::new_unchecked(output_fields);
let literal_exprs = commit_info_literal_exprs(kernel_commit_info)?;
let last_engine_field = engine_commit_info_schema.field_names().last().cloned();
let mut transform = Transform::new_top_level();
for (field_name, expr_ref) in &literal_exprs {
if engine_commit_info_schema.contains(*field_name) {
transform = transform.with_replaced_field(*field_name, expr_ref.clone());
}
}
for (field_name, expr_ref) in &literal_exprs {
if !engine_commit_info_schema.contains(*field_name) {
transform = transform
.with_inserted_field(last_engine_field.as_deref(), expr_ref.clone());
}
}
let wrapped_expr =
Expression::struct_from([Arc::new(Expression::transform(transform))]);
let wrapped_schema = Arc::new(StructType::new_unchecked([StructField::nullable(
COMMIT_INFO_NAME,
output_schema,
)]));
let evaluator = engine.evaluation_handler().new_expression_evaluator(
engine_commit_info_schema.clone(),
Arc::new(wrapped_expr),
wrapped_schema.into(),
)?;
evaluator.evaluate(engine_commit_info.as_ref())
}
None => {
kernel_commit_info.into_engine_data(get_log_commit_info_schema().clone(), engine)
}
}
}
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use crate::actions::CommitInfo;
    use crate::arrow::array::{
        Array, ArrayRef, BooleanArray, Int64Array, MapArray, MapBuilder, StringArray,
        StringBuilder, StructArray,
    };
    use crate::arrow::datatypes::{
        DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
    };
    use crate::arrow::record_batch::RecordBatch;
    use crate::committer::FileSystemCommitter;
    use crate::engine::arrow_conversion::TryIntoKernel;
    use crate::engine::arrow_data::ArrowEngineData;
    use crate::schema::{Schema, SchemaRef, StructField, StructType, ToSchema};
    use crate::transaction::Transaction;
    use crate::utils::test_utils::load_test_table;
    use crate::{DeltaResult, Engine, EngineData};

    /// Kernel-side commit info shared by all tests. The assertions below expect operation
    /// "WRITE", engine info "test_engine/1.0", in-commit timestamp 134_000_000, and
    /// `isBlindAppend == false`.
    fn make_kernel_commit_info() -> CommitInfo {
        CommitInfo::new(
            1_700_000_000_000i64,
            Some(134_000_000i64),
            Some("WRITE".to_string()),
            Some("test_engine/1.0".to_string()),
            false,
        )
    }

    /// Builds single-batch engine commit info from arrow fields/columns, together with the
    /// equivalent kernel schema.
    fn make_engine_commit_info(
        arrow_fields: Vec<ArrowField>,
        columns: Vec<ArrayRef>,
    ) -> (Box<dyn EngineData>, SchemaRef) {
        let arrow_schema = ArrowSchema::new(arrow_fields);
        let kernel_schema: Schema = arrow_schema.as_ref().try_into_kernel().unwrap();
        let batch =
            RecordBatch::try_new(Arc::new(arrow_schema), columns).expect("valid RecordBatch");
        (
            Box::new(ArrowEngineData::new(batch)),
            Arc::new(kernel_schema),
        )
    }

    /// Asserts the result has exactly one `commitInfo` column and returns it as a struct array.
    fn commit_info_struct(result: &ArrowEngineData) -> &StructArray {
        let batch = result.record_batch();
        assert_eq!(
            batch.num_columns(),
            1,
            "expected single 'commitInfo' column"
        );
        assert_eq!(batch.schema().field(0).name(), "commitInfo");
        batch
            .column(0)
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("commitInfo column should be a StructArray")
    }

    /// Looks up column `col` of `s` and downcasts it to the concrete arrow array type `T`.
    ///
    /// Panics with a message naming the column (and the expected type) if the column is
    /// missing or has a different type.
    fn column<'a, T: 'static>(s: &'a StructArray, col: &str) -> &'a T {
        s.column_by_name(col)
            .unwrap_or_else(|| panic!("field '{col}' not found"))
            .as_any()
            .downcast_ref::<T>()
            .unwrap_or_else(|| panic!("field '{col}' is not a {}", std::any::type_name::<T>()))
    }

    /// Row-0 value of string column `col`.
    fn get_str<'a>(s: &'a StructArray, col: &str) -> &'a str {
        column::<StringArray>(s, col).value(0)
    }

    /// Row-0 value of int64 column `col`.
    fn get_i64(s: &StructArray, col: &str) -> i64 {
        column::<Int64Array>(s, col).value(0)
    }

    /// Row-0 entries of map column `col`, as the map's key/value struct array.
    fn get_map(s: &StructArray, col: &str) -> StructArray {
        column::<MapArray>(s, col).value(0)
    }

    /// Row-0 value of boolean column `col`.
    fn get_bool(s: &StructArray, col: &str) -> bool {
        column::<BooleanArray>(s, col).value(0)
    }

    /// Creates a "WRITE" transaction on the `table-without-dv-small` test table, optionally
    /// attaching engine-provided commit info.
    fn make_txn(
        engine_commit_info: Option<(Box<dyn EngineData>, SchemaRef)>,
    ) -> DeltaResult<(Arc<dyn Engine>, Transaction)> {
        // NOTE(review): `_tempdir` (presumably the table's temp copy) is dropped when this
        // helper returns. That is fine for these tests, which only call
        // `generate_commit_info` afterwards, but keep it alive if a test starts touching
        // table files through the returned transaction.
        let (engine, snapshot, _tempdir) = load_test_table("table-without-dv-small")?;
        let mut txn = snapshot
            .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
            .with_operation("WRITE".to_string());
        if let Some((engine_commit_info_data, engine_commit_info_schema)) = engine_commit_info {
            txn = txn.with_commit_info(engine_commit_info_data, engine_commit_info_schema);
        }
        Ok((engine, txn))
    }

    /// Without engine commit info, the output contains exactly the kernel fields.
    #[test]
    fn test_build_commit_info_none_branch() -> DeltaResult<()> {
        let (engine, txn) = make_txn(None)?;
        let result = ArrowEngineData::try_from_engine_data(
            txn.generate_commit_info(engine.as_ref(), make_kernel_commit_info())?,
        )?;
        let ci = commit_info_struct(&result);
        let kernel_schema = CommitInfo::to_schema();
        assert_eq!(ci.num_columns(), kernel_schema.fields().count());
        assert_eq!(get_str(ci, "operation"), "WRITE");
        assert!(!get_str(ci, "kernelVersion").is_empty());
        assert!(!get_str(ci, "txnId").is_empty());
        Ok(())
    }

    /// Engine fields with no kernel overlap are kept first, and all kernel fields follow.
    #[test]
    fn test_build_commit_info_disjoint_schemas() -> DeltaResult<()> {
        let (data, schema) = make_engine_commit_info(
            vec![
                ArrowField::new("customApp", ArrowDataType::Utf8, false),
                ArrowField::new("customVersion", ArrowDataType::Int64, false),
            ],
            vec![
                Arc::new(StringArray::from(vec!["myApp"])) as ArrayRef,
                Arc::new(Int64Array::from(vec![42i64])) as ArrayRef,
            ],
        );
        let (engine, txn) = make_txn(Some((data, schema)))?;
        let result = ArrowEngineData::try_from_engine_data(
            txn.generate_commit_info(engine.as_ref(), make_kernel_commit_info())?,
        )?;
        let commit_info = commit_info_struct(&result);
        assert_eq!(
            commit_info.num_columns(),
            2 + CommitInfo::to_schema().fields().count()
        );
        assert_eq!(commit_info.fields()[0].name(), "customApp");
        assert_eq!(commit_info.fields()[1].name(), "customVersion");
        assert_eq!(get_str(commit_info, "customApp"), "myApp");
        assert_eq!(get_i64(commit_info, "customVersion"), 42);
        assert_eq!(get_str(commit_info, "operation"), "WRITE");
        assert!(!get_str(commit_info, "kernelVersion").is_empty());
        assert_eq!(get_map(commit_info, "operationParameters").len(), 0);
        assert!(uuid::Uuid::parse_str(get_str(commit_info, "txnId")).is_ok());
        assert!(get_i64(commit_info, "timestamp") > 0);
        assert_eq!(get_i64(commit_info, "inCommitTimestamp"), 134_000_000);
        assert_eq!(get_str(commit_info, "engineInfo"), "test_engine/1.0");
        assert!(!get_bool(commit_info, "isBlindAppend"));
        Ok(())
    }

    /// When the engine supplies every kernel field, all of its (stale) values are overwritten.
    #[test]
    fn test_build_commit_info_full_overlap() -> DeltaResult<()> {
        let mut map_builder = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
        map_builder.keys().append_value("stale_key");
        map_builder.values().append_value("stale_value");
        map_builder.append(true).unwrap();
        let stale_op_params = Arc::new(map_builder.finish()) as ArrayRef;
        let (data, schema) = make_engine_commit_info(
            vec![
                ArrowField::new("timestamp", ArrowDataType::Int64, true),
                ArrowField::new("inCommitTimestamp", ArrowDataType::Int64, true),
                ArrowField::new("operation", ArrowDataType::Utf8, true),
                ArrowField::new(
                    "operationParameters",
                    stale_op_params.data_type().clone(),
                    true,
                ),
                ArrowField::new("kernelVersion", ArrowDataType::Utf8, true),
                ArrowField::new("isBlindAppend", ArrowDataType::Boolean, true),
                ArrowField::new("engineInfo", ArrowDataType::Utf8, true),
                ArrowField::new("txnId", ArrowDataType::Utf8, true),
            ],
            vec![
                Arc::new(Int64Array::from(vec![Some(0i64)])) as ArrayRef,
                Arc::new(Int64Array::from(vec![None::<i64>])) as ArrayRef,
                Arc::new(StringArray::from(vec!["STALE_OP"])) as ArrayRef,
                stale_op_params,
                Arc::new(StringArray::from(vec!["v0.0.0"])) as ArrayRef,
                Arc::new(BooleanArray::from(vec![None::<bool>])) as ArrayRef,
                Arc::new(StringArray::from(vec!["stale_engine"])) as ArrayRef,
                Arc::new(StringArray::from(vec!["stale_txn"])) as ArrayRef,
            ],
        );
        let (engine, txn) = make_txn(Some((data, schema)))?;
        let result = ArrowEngineData::try_from_engine_data(
            txn.generate_commit_info(engine.as_ref(), make_kernel_commit_info())?,
        )?;
        let commit_info = commit_info_struct(&result);
        // No columns beyond the kernel schema: every engine column collided with a kernel field.
        assert_eq!(
            commit_info.num_columns(),
            CommitInfo::to_schema().fields().count()
        );
        assert_eq!(get_str(commit_info, "operation"), "WRITE");
        assert!(!get_str(commit_info, "kernelVersion").is_empty());
        assert_eq!(get_map(commit_info, "operationParameters").len(), 0);
        assert!(uuid::Uuid::parse_str(get_str(commit_info, "txnId")).is_ok());
        assert!(get_i64(commit_info, "timestamp") > 0);
        assert_eq!(get_i64(commit_info, "inCommitTimestamp"), 134_000_000);
        assert_eq!(get_str(commit_info, "engineInfo"), "test_engine/1.0");
        assert!(!get_bool(commit_info, "isBlindAppend"));
        Ok(())
    }

    /// Overlapping fields are replaced in place, custom fields survive, and the missing kernel
    /// fields are appended.
    #[test]
    fn test_build_commit_info_partial_overlap() -> DeltaResult<()> {
        let (data, schema) = make_engine_commit_info(
            vec![
                ArrowField::new("timestamp", ArrowDataType::Int64, true),
                ArrowField::new("operation", ArrowDataType::Utf8, true),
                ArrowField::new("myCustomField", ArrowDataType::Utf8, false),
            ],
            vec![
                Arc::new(Int64Array::from(vec![Some(0i64)])) as ArrayRef,
                Arc::new(StringArray::from(vec!["STALE_OP"])) as ArrayRef,
                Arc::new(StringArray::from(vec!["keep_me"])) as ArrayRef,
            ],
        );
        let (engine, txn) = make_txn(Some((data, schema)))?;
        let result = ArrowEngineData::try_from_engine_data(
            txn.generate_commit_info(engine.as_ref(), make_kernel_commit_info())?,
        )?;
        let ci = commit_info_struct(&result);
        assert_eq!(get_str(ci, "myCustomField"), "keep_me");
        assert_ne!(get_str(ci, "operation"), "STALE_OP");
        assert_eq!(get_str(ci, "operation"), "WRITE");
        assert_eq!(ci.fields()[0].name(), "timestamp");
        assert_eq!(ci.fields()[1].name(), "operation");
        assert_eq!(ci.fields()[2].name(), "myCustomField");
        assert_eq!(
            ci.num_columns(),
            3 + CommitInfo::to_schema().fields().count() - 2
        );
        Ok(())
    }

    /// Engine columns that collide with kernel fields but have the wrong type take the
    /// kernel's type.
    #[test]
    fn test_build_commit_info_type_conflict_replaced_by_kernel() -> DeltaResult<()> {
        let (data, schema) = make_engine_commit_info(
            vec![
                ArrowField::new("timestamp", ArrowDataType::Utf8, true),
                ArrowField::new("inCommitTimestamp", ArrowDataType::Utf8, true),
                ArrowField::new("operation", ArrowDataType::Int64, true),
                ArrowField::new("isBlindAppend", ArrowDataType::Utf8, true),
                ArrowField::new("myCustomField", ArrowDataType::Utf8, false),
            ],
            vec![
                Arc::new(StringArray::from(vec!["not-a-timestamp"])) as ArrayRef,
                Arc::new(StringArray::from(vec!["not-a-timestamp"])) as ArrayRef,
                Arc::new(Int64Array::from(vec![0i64])) as ArrayRef,
                Arc::new(StringArray::from(vec!["not-a-bool"])) as ArrayRef,
                Arc::new(StringArray::from(vec!["keep_me"])) as ArrayRef,
            ],
        );
        let (engine, txn) = make_txn(Some((data, schema)))?;
        let result = ArrowEngineData::try_from_engine_data(
            txn.generate_commit_info(engine.as_ref(), make_kernel_commit_info())?,
        )?;
        let ci = commit_info_struct(&result);
        let field_type = |name: &str| {
            ci.fields()
                .iter()
                .find(|f| f.name() == name)
                .unwrap_or_else(|| panic!("field '{name}' must be present"))
                .data_type()
                .clone()
        };
        assert_eq!(field_type("timestamp"), ArrowDataType::Int64);
        assert_eq!(field_type("inCommitTimestamp"), ArrowDataType::Int64);
        assert_eq!(field_type("operation"), ArrowDataType::Utf8);
        assert_eq!(field_type("isBlindAppend"), ArrowDataType::Boolean);
        assert_eq!(field_type("myCustomField"), ArrowDataType::Utf8);
        assert_eq!(get_str(ci, "myCustomField"), "keep_me");
        Ok(())
    }

    /// An empty engine schema yields exactly the kernel fields, in kernel schema order.
    #[test]
    fn test_build_commit_info_empty_engine_schema() -> DeltaResult<()> {
        let empty_batch = RecordBatch::new_empty(Arc::new(ArrowSchema::empty()));
        let empty_schema = Arc::new(StructType::new_unchecked(Vec::<StructField>::new()));
        let (engine, txn) = make_txn(Some((
            Box::new(ArrowEngineData::new(empty_batch)),
            empty_schema,
        )))?;
        let result = ArrowEngineData::try_from_engine_data(
            txn.generate_commit_info(engine.as_ref(), make_kernel_commit_info())?,
        )?;
        let ci = commit_info_struct(&result);
        let kernel_schema = CommitInfo::to_schema();
        assert_eq!(ci.num_columns(), kernel_schema.fields().count());
        for (i, field) in kernel_schema.fields().enumerate() {
            assert_eq!(ci.fields()[i].name(), field.name());
        }
        Ok(())
    }
}