use std::sync::Arc;
use once_cell::sync::Lazy;
use crate::spec::{NestedField, NestedFieldRef, PrimitiveType, Type};
use crate::{Error, ErrorKind, Result};
// Reserved field IDs for metadata columns. All are expressed as offsets below `i32::MAX`.
// NOTE(review): `i32::MAX - 103` is not assigned in this file — presumably reserved for a
// column that is not modelled here; confirm against the Iceberg spec before reusing it.

/// Reserved field ID of the `_file` metadata column.
pub const RESERVED_FIELD_ID_FILE: i32 = i32::MAX - 1;
/// Reserved field ID of the `_pos` metadata column.
pub const RESERVED_FIELD_ID_POS: i32 = i32::MAX - 2;
/// Reserved field ID of the `_deleted` metadata column.
pub const RESERVED_FIELD_ID_DELETED: i32 = i32::MAX - 3;
/// Reserved field ID of the `_spec_id` metadata column.
pub const RESERVED_FIELD_ID_SPEC_ID: i32 = i32::MAX - 4;
/// Reserved field ID of the `_partition` metadata column.
pub const RESERVED_FIELD_ID_PARTITION: i32 = i32::MAX - 5;
/// Reserved field ID of the `file_path` column used in position-based delete files.
pub const RESERVED_FIELD_ID_DELETE_FILE_PATH: i32 = i32::MAX - 101;
/// Reserved field ID of the `pos` column used in position-based delete files.
pub const RESERVED_FIELD_ID_DELETE_FILE_POS: i32 = i32::MAX - 102;
/// Reserved field ID of the `_change_type` changelog column.
pub const RESERVED_FIELD_ID_CHANGE_TYPE: i32 = i32::MAX - 104;
/// Reserved field ID of the `_change_ordinal` changelog column.
pub const RESERVED_FIELD_ID_CHANGE_ORDINAL: i32 = i32::MAX - 105;
/// Reserved field ID of the `_commit_snapshot_id` changelog column.
pub const RESERVED_FIELD_ID_COMMIT_SNAPSHOT_ID: i32 = i32::MAX - 106;
/// Reserved field ID of the `_row_id` row-lineage column.
pub const RESERVED_FIELD_ID_ROW_ID: i32 = i32::MAX - 107;
/// Reserved field ID of the `_last_updated_sequence_number` column.
pub const RESERVED_FIELD_ID_LAST_UPDATED_SEQUENCE_NUMBER: i32 = i32::MAX - 108;
// Reserved column names. Each name is paired with the `RESERVED_FIELD_ID_*` constant that has
// the same suffix; `get_metadata_field_id` performs that name -> ID mapping.

/// Name of the metadata column holding the path of the file in which a row is stored.
pub const RESERVED_COL_NAME_FILE: &str = "_file";
/// Name of the metadata column holding a row's ordinal position in its source data file.
pub const RESERVED_COL_NAME_POS: &str = "_pos";
/// Name of the metadata column flagging whether a row has been deleted.
pub const RESERVED_COL_NAME_DELETED: &str = "_deleted";
/// Name of the metadata column holding the spec ID of the file containing a row.
pub const RESERVED_COL_NAME_SPEC_ID: &str = "_spec_id";
/// Name of the metadata column holding the partition to which a row belongs.
pub const RESERVED_COL_NAME_PARTITION: &str = "_partition";
/// Name of the file-path column in position-based delete files (note: no leading underscore).
pub const RESERVED_COL_NAME_DELETE_FILE_PATH: &str = "file_path";
/// Name of the position column in position-based delete files (note: no leading underscore).
pub const RESERVED_COL_NAME_DELETE_FILE_POS: &str = "pos";
/// Name of the changelog column holding the record type of a change.
pub const RESERVED_COL_NAME_CHANGE_TYPE: &str = "_change_type";
/// Name of the changelog column holding the order of a change.
pub const RESERVED_COL_NAME_CHANGE_ORDINAL: &str = "_change_ordinal";
/// Name of the changelog column holding the snapshot ID in which a change occurred.
pub const RESERVED_COL_NAME_COMMIT_SNAPSHOT_ID: &str = "_commit_snapshot_id";
/// Name of the row-lineage column holding a row's unique ID.
pub const RESERVED_COL_NAME_ROW_ID: &str = "_row_id";
/// Name of the column holding the sequence number which last updated a row.
pub const RESERVED_COL_NAME_LAST_UPDATED_SEQUENCE_NUMBER: &str = "_last_updated_sequence_number";
/// Shared definition of the `_file` metadata column: a required `string` field.
///
/// Built on first access and reused afterwards; exposed through [`file_field`].
static FILE_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field = NestedField::required(
        RESERVED_FIELD_ID_FILE,
        RESERVED_COL_NAME_FILE,
        Type::Primitive(PrimitiveType::String),
    )
    .with_doc("Path of the file in which a row is stored");
    Arc::new(field)
});
/// Shared definition of the `_pos` metadata column: a required `long` field.
///
/// Built on first access and reused afterwards; exposed through [`pos_field`].
static POS_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field = NestedField::required(
        RESERVED_FIELD_ID_POS,
        RESERVED_COL_NAME_POS,
        Type::Primitive(PrimitiveType::Long),
    )
    .with_doc("Ordinal position of a row in the source data file");
    Arc::new(field)
});
/// Shared definition of the `_deleted` metadata column: a required `boolean` field.
///
/// Built on first access and reused afterwards; exposed through [`deleted_field`].
static DELETED_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field = NestedField::required(
        RESERVED_FIELD_ID_DELETED,
        RESERVED_COL_NAME_DELETED,
        Type::Primitive(PrimitiveType::Boolean),
    )
    .with_doc("Whether the row has been deleted");
    Arc::new(field)
});
/// Shared definition of the `_spec_id` metadata column: a required `int` field.
///
/// Built on first access and reused afterwards; exposed through [`spec_id_field`].
static SPEC_ID_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field = NestedField::required(
        RESERVED_FIELD_ID_SPEC_ID,
        RESERVED_COL_NAME_SPEC_ID,
        Type::Primitive(PrimitiveType::Int),
    )
    .with_doc("Spec ID used to track the file containing a row");
    Arc::new(field)
});
/// Shared definition of the `file_path` column of position-based delete files: a required
/// `string` field.
///
/// Built on first access and reused afterwards; exposed through [`delete_file_path_field`].
static DELETE_FILE_PATH_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field = NestedField::required(
        RESERVED_FIELD_ID_DELETE_FILE_PATH,
        RESERVED_COL_NAME_DELETE_FILE_PATH,
        Type::Primitive(PrimitiveType::String),
    )
    .with_doc("Path of a file, used in position-based delete files");
    Arc::new(field)
});
/// Shared definition of the `pos` column of position-based delete files: a required `long`
/// field.
///
/// Built on first access and reused afterwards; exposed through [`delete_file_pos_field`].
static DELETE_FILE_POS_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field = NestedField::required(
        RESERVED_FIELD_ID_DELETE_FILE_POS,
        RESERVED_COL_NAME_DELETE_FILE_POS,
        Type::Primitive(PrimitiveType::Long),
    )
    .with_doc("Ordinal position of a row, used in position-based delete files");
    Arc::new(field)
});
/// Shared definition of the `_change_type` changelog column: a required `string` field.
///
/// Built on first access and reused afterwards; exposed through [`change_type_field`].
static CHANGE_TYPE_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field = NestedField::required(
        RESERVED_FIELD_ID_CHANGE_TYPE,
        RESERVED_COL_NAME_CHANGE_TYPE,
        Type::Primitive(PrimitiveType::String),
    )
    .with_doc(
        "The record type in the changelog (INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER)",
    );
    Arc::new(field)
});
/// Shared definition of the `_change_ordinal` changelog column: a required `int` field.
///
/// Built on first access and reused afterwards; exposed through [`change_ordinal_field`].
static CHANGE_ORDINAL_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field = NestedField::required(
        RESERVED_FIELD_ID_CHANGE_ORDINAL,
        RESERVED_COL_NAME_CHANGE_ORDINAL,
        Type::Primitive(PrimitiveType::Int),
    )
    .with_doc("The order of the change");
    Arc::new(field)
});
/// Shared definition of the `_commit_snapshot_id` changelog column: a required `long` field.
///
/// Built on first access and reused afterwards; exposed through
/// [`commit_snapshot_id_field`].
static COMMIT_SNAPSHOT_ID_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field = NestedField::required(
        RESERVED_FIELD_ID_COMMIT_SNAPSHOT_ID,
        RESERVED_COL_NAME_COMMIT_SNAPSHOT_ID,
        Type::Primitive(PrimitiveType::Long),
    )
    .with_doc("The snapshot ID in which the change occurred");
    Arc::new(field)
});
/// Shared definition of the `_row_id` row-lineage column: a required `long` field.
///
/// Built on first access and reused afterwards; exposed through [`row_id_field`].
// NOTE(review): declared as required here — confirm against the Iceberg row-lineage rules
// whether this column is allowed to be null/optional.
static ROW_ID_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field = NestedField::required(
        RESERVED_FIELD_ID_ROW_ID,
        RESERVED_COL_NAME_ROW_ID,
        Type::Primitive(PrimitiveType::Long),
    )
    .with_doc("A unique long assigned for row lineage");
    Arc::new(field)
});
/// Shared definition of the `_last_updated_sequence_number` column: a required `long` field.
///
/// Built on first access and reused afterwards; exposed through
/// [`last_updated_sequence_number_field`].
// NOTE(review): declared as required here — confirm against the Iceberg row-lineage rules
// whether this column is allowed to be null/optional.
static LAST_UPDATED_SEQUENCE_NUMBER_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field = NestedField::required(
        RESERVED_FIELD_ID_LAST_UPDATED_SEQUENCE_NUMBER,
        RESERVED_COL_NAME_LAST_UPDATED_SEQUENCE_NUMBER,
        Type::Primitive(PrimitiveType::Long),
    )
    .with_doc("The sequence number which last updated this row");
    Arc::new(field)
});
/// Returns the shared definition of the `_file` metadata column.
///
/// The field is a required `string` documented as the path of the file in which a row is
/// stored. The same static instance is returned on every call.
pub fn file_field() -> &'static NestedFieldRef {
&FILE_FIELD
}
/// Returns the shared definition of the `_pos` metadata column.
///
/// The field is a required `long` documented as the ordinal position of a row in the source
/// data file. The same static instance is returned on every call.
pub fn pos_field() -> &'static NestedFieldRef {
&POS_FIELD
}
/// Returns the shared definition of the `_deleted` metadata column.
///
/// The field is a required `boolean` documented as whether the row has been deleted. The same
/// static instance is returned on every call.
pub fn deleted_field() -> &'static NestedFieldRef {
&DELETED_FIELD
}
/// Returns the shared definition of the `_spec_id` metadata column.
///
/// The field is a required `int` documented as the spec ID used to track the file containing
/// a row. The same static instance is returned on every call.
pub fn spec_id_field() -> &'static NestedFieldRef {
&SPEC_ID_FIELD
}
/// Returns the shared definition of the `file_path` column of position-based delete files.
///
/// The field is a required `string`. The same static instance is returned on every call.
pub fn delete_file_path_field() -> &'static NestedFieldRef {
&DELETE_FILE_PATH_FIELD
}
/// Returns the shared definition of the `pos` column of position-based delete files.
///
/// The field is a required `long`. The same static instance is returned on every call.
pub fn delete_file_pos_field() -> &'static NestedFieldRef {
&DELETE_FILE_POS_FIELD
}
/// Returns the shared definition of the `_change_type` changelog column.
///
/// The field is a required `string` documented as the record type in the changelog. The same
/// static instance is returned on every call.
pub fn change_type_field() -> &'static NestedFieldRef {
&CHANGE_TYPE_FIELD
}
/// Returns the shared definition of the `_change_ordinal` changelog column.
///
/// The field is a required `int` documented as the order of the change. The same static
/// instance is returned on every call.
pub fn change_ordinal_field() -> &'static NestedFieldRef {
&CHANGE_ORDINAL_FIELD
}
/// Returns the shared definition of the `_commit_snapshot_id` changelog column.
///
/// The field is a required `long` documented as the snapshot ID in which the change occurred.
/// The same static instance is returned on every call.
pub fn commit_snapshot_id_field() -> &'static NestedFieldRef {
&COMMIT_SNAPSHOT_ID_FIELD
}
/// Returns the shared definition of the `_row_id` row-lineage column.
///
/// The field is a required `long` documented as a unique long assigned for row lineage. The
/// same static instance is returned on every call.
pub fn row_id_field() -> &'static NestedFieldRef {
&ROW_ID_FIELD
}
/// Returns the shared definition of the `_last_updated_sequence_number` column.
///
/// The field is a required `long` documented as the sequence number which last updated the
/// row. The same static instance is returned on every call.
pub fn last_updated_sequence_number_field() -> &'static NestedFieldRef {
&LAST_UPDATED_SEQUENCE_NUMBER_FIELD
}
pub fn partition_field(partition_fields: Vec<NestedFieldRef>) -> NestedFieldRef {
use crate::spec::StructType;
Arc::new(
NestedField::required(
RESERVED_FIELD_ID_PARTITION,
RESERVED_COL_NAME_PARTITION,
Type::Struct(StructType::new(partition_fields)),
)
.with_doc("Partition to which a row belongs"),
)
}
pub fn get_metadata_field(field_id: i32) -> Result<&'static NestedFieldRef> {
match field_id {
RESERVED_FIELD_ID_FILE => Ok(file_field()),
RESERVED_FIELD_ID_POS => Ok(pos_field()),
RESERVED_FIELD_ID_DELETED => Ok(deleted_field()),
RESERVED_FIELD_ID_SPEC_ID => Ok(spec_id_field()),
RESERVED_FIELD_ID_PARTITION => Err(Error::new(
ErrorKind::Unexpected,
"The _partition field must be created using partition_field() with appropriate partition fields",
)),
RESERVED_FIELD_ID_DELETE_FILE_PATH => Ok(delete_file_path_field()),
RESERVED_FIELD_ID_DELETE_FILE_POS => Ok(delete_file_pos_field()),
RESERVED_FIELD_ID_CHANGE_TYPE => Ok(change_type_field()),
RESERVED_FIELD_ID_CHANGE_ORDINAL => Ok(change_ordinal_field()),
RESERVED_FIELD_ID_COMMIT_SNAPSHOT_ID => Ok(commit_snapshot_id_field()),
RESERVED_FIELD_ID_ROW_ID => Ok(row_id_field()),
RESERVED_FIELD_ID_LAST_UPDATED_SEQUENCE_NUMBER => Ok(last_updated_sequence_number_field()),
_ if is_metadata_field(field_id) => {
Err(Error::new(
ErrorKind::Unexpected,
format!(
"Metadata field ID {field_id} recognized but field definition not implemented"
),
))
}
_ => Err(Error::new(
ErrorKind::Unexpected,
format!("Field ID {field_id} is not a metadata field"),
)),
}
}
pub fn get_metadata_field_id(column_name: &str) -> Result<i32> {
match column_name {
RESERVED_COL_NAME_FILE => Ok(RESERVED_FIELD_ID_FILE),
RESERVED_COL_NAME_POS => Ok(RESERVED_FIELD_ID_POS),
RESERVED_COL_NAME_DELETED => Ok(RESERVED_FIELD_ID_DELETED),
RESERVED_COL_NAME_SPEC_ID => Ok(RESERVED_FIELD_ID_SPEC_ID),
RESERVED_COL_NAME_PARTITION => Ok(RESERVED_FIELD_ID_PARTITION),
RESERVED_COL_NAME_DELETE_FILE_PATH => Ok(RESERVED_FIELD_ID_DELETE_FILE_PATH),
RESERVED_COL_NAME_DELETE_FILE_POS => Ok(RESERVED_FIELD_ID_DELETE_FILE_POS),
RESERVED_COL_NAME_CHANGE_TYPE => Ok(RESERVED_FIELD_ID_CHANGE_TYPE),
RESERVED_COL_NAME_CHANGE_ORDINAL => Ok(RESERVED_FIELD_ID_CHANGE_ORDINAL),
RESERVED_COL_NAME_COMMIT_SNAPSHOT_ID => Ok(RESERVED_FIELD_ID_COMMIT_SNAPSHOT_ID),
RESERVED_COL_NAME_ROW_ID => Ok(RESERVED_FIELD_ID_ROW_ID),
RESERVED_COL_NAME_LAST_UPDATED_SEQUENCE_NUMBER => {
Ok(RESERVED_FIELD_ID_LAST_UPDATED_SEQUENCE_NUMBER)
}
_ => Err(Error::new(
ErrorKind::Unexpected,
format!("Unknown/unsupported metadata column name: {column_name}"),
)),
}
}
pub fn is_metadata_field(field_id: i32) -> bool {
matches!(
field_id,
RESERVED_FIELD_ID_FILE
| RESERVED_FIELD_ID_POS
| RESERVED_FIELD_ID_DELETED
| RESERVED_FIELD_ID_SPEC_ID
| RESERVED_FIELD_ID_PARTITION
| RESERVED_FIELD_ID_DELETE_FILE_PATH
| RESERVED_FIELD_ID_DELETE_FILE_POS
| RESERVED_FIELD_ID_CHANGE_TYPE
| RESERVED_FIELD_ID_CHANGE_ORDINAL
| RESERVED_FIELD_ID_COMMIT_SNAPSHOT_ID
| RESERVED_FIELD_ID_ROW_ID
| RESERVED_FIELD_ID_LAST_UPDATED_SEQUENCE_NUMBER
)
}
/// Reports whether `column_name` is one of the reserved metadata column names.
///
/// Delegates to [`get_metadata_field_id`] and returns only whether that lookup succeeded; the
/// error built for an unknown name is discarded. `_partition` is therefore reported as a
/// metadata column name.
pub fn is_metadata_column_name(column_name: &str) -> bool {
get_metadata_field_id(column_name).is_ok()
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::spec::PrimitiveType;

    /// Every metadata column that has a static definition, as `(field ID, column name)`.
    /// `_partition` is deliberately absent: it is built on demand by `partition_field`.
    const STATIC_COLUMNS: [(i32, &str); 11] = [
        (RESERVED_FIELD_ID_FILE, RESERVED_COL_NAME_FILE),
        (RESERVED_FIELD_ID_POS, RESERVED_COL_NAME_POS),
        (RESERVED_FIELD_ID_DELETED, RESERVED_COL_NAME_DELETED),
        (RESERVED_FIELD_ID_SPEC_ID, RESERVED_COL_NAME_SPEC_ID),
        (
            RESERVED_FIELD_ID_DELETE_FILE_PATH,
            RESERVED_COL_NAME_DELETE_FILE_PATH,
        ),
        (
            RESERVED_FIELD_ID_DELETE_FILE_POS,
            RESERVED_COL_NAME_DELETE_FILE_POS,
        ),
        (RESERVED_FIELD_ID_CHANGE_TYPE, RESERVED_COL_NAME_CHANGE_TYPE),
        (
            RESERVED_FIELD_ID_CHANGE_ORDINAL,
            RESERVED_COL_NAME_CHANGE_ORDINAL,
        ),
        (
            RESERVED_FIELD_ID_COMMIT_SNAPSHOT_ID,
            RESERVED_COL_NAME_COMMIT_SNAPSHOT_ID,
        ),
        (RESERVED_FIELD_ID_ROW_ID, RESERVED_COL_NAME_ROW_ID),
        (
            RESERVED_FIELD_ID_LAST_UPDATED_SEQUENCE_NUMBER,
            RESERVED_COL_NAME_LAST_UPDATED_SEQUENCE_NUMBER,
        ),
    ];

    /// Builds a required `int` field to use as a member of the `_partition` struct.
    fn int_field(id: i32, name: &'static str) -> NestedFieldRef {
        Arc::new(NestedField::required(
            id,
            name,
            Type::Primitive(PrimitiveType::Int),
        ))
    }

    #[test]
    fn test_partition_field_creation() {
        let partition = partition_field(vec![int_field(1000, "year"), int_field(1001, "month")]);

        assert_eq!(partition.id, RESERVED_FIELD_ID_PARTITION);
        assert_eq!(partition.name, RESERVED_COL_NAME_PARTITION);
        assert!(partition.required);

        match partition.field_type.as_ref() {
            Type::Struct(struct_type) => {
                let members = struct_type.fields();
                assert_eq!(members.len(), 2);
                assert_eq!(members[0].name, "year");
                assert_eq!(members[1].name, "month");
            }
            _ => panic!("Expected struct type for _partition field"),
        }
    }

    #[test]
    fn test_partition_field_id_recognized() {
        assert!(is_metadata_field(RESERVED_FIELD_ID_PARTITION));
    }

    #[test]
    fn test_partition_field_name_recognized() {
        assert_eq!(
            get_metadata_field_id(RESERVED_COL_NAME_PARTITION).unwrap(),
            RESERVED_FIELD_ID_PARTITION
        );
    }

    #[test]
    fn test_get_metadata_field_returns_error_for_partition() {
        let err = get_metadata_field(RESERVED_FIELD_ID_PARTITION).unwrap_err();
        assert!(err.to_string().contains("partition_field()"));
    }

    #[test]
    fn test_all_metadata_field_ids() {
        for &(id, name) in STATIC_COLUMNS.iter() {
            let field = get_metadata_field(id)
                .unwrap_or_else(|e| panic!("no definition for {name} (ID {id}): {e}"));

            // Checking only `is_ok()` would miss an ID wired to the wrong static field, so
            // verify that the returned definition is the one that was asked for.
            assert_eq!(field.id, id, "wrong field returned for {name}");
            assert_eq!(field.name, name);

            // The ID and name predicates and the name -> ID lookup must all agree.
            assert!(is_metadata_field(id));
            assert!(is_metadata_column_name(name));
            assert_eq!(get_metadata_field_id(name).unwrap(), id);
        }
    }

    #[test]
    fn test_non_metadata_lookups_are_rejected() {
        assert!(!is_metadata_field(1));
        assert!(get_metadata_field(1).is_err());
        assert!(!is_metadata_column_name("id"));
        assert!(get_metadata_field_id("id").is_err());
    }
}