use std::collections::{hash_map, HashMap};
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use crate::expressions::{ColumnName, Expression, ExpressionRef};
use crate::schema::{DataType, SchemaRef, StructField, StructType};
use crate::utils::CollectInto;
use crate::{DeltaResult, Error};
/// Sparse instructions for one input field of an [`ExpressionStructPatch`].
///
/// Produced when a `StructPatchBuilder` is lowered into expressions: each input field that
/// the builder touched gets one of these, keyed by field name in
/// [`ExpressionStructPatch::field_patches`].
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
pub struct ExpressionFieldPatch {
/// `true` when the input field itself is kept as-is (no drop, replace, or nested patch).
pub keep_input: bool,
/// Expressions emitted for this field, in order: the replacement or nested struct patch
/// (if any), followed by every item inserted after the field.
pub insertions: Vec<ExpressionRef>,
/// Set only for drops requested via `drop_if_exists`; presumably tells evaluation to
/// tolerate the field being absent from the input — NOTE(review): confirm with evaluator.
pub optional: bool,
}
/// A sparse, expression-level patch of one struct: which input fields change, plus fields
/// added before and after all input fields.
///
/// Input fields with no entry in `field_patches` are not mentioned by the patch (presumably
/// passed through unchanged by the evaluator — NOTE(review): confirm).
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
pub struct ExpressionStructPatch {
/// Path of the patched struct inside the input; `None` targets the top level.
pub input_path: Option<ColumnName>,
/// Per-field patches keyed by input field name.
pub field_patches: HashMap<String, ExpressionFieldPatch>,
/// Expressions for fields added with `prepend`; the matching schema build places these
/// before all input fields.
pub prepended_fields: Vec<ExpressionRef>,
/// Expressions for fields added with `append`; the matching schema build places these
/// after all input fields.
pub appended_fields: Vec<ExpressionRef>,
}
impl ExpressionStructPatch {
    /// Returns `true` when the patch adds no fields and patches no input field.
    ///
    /// `input_path` is not consulted, so a patch that merely selects a nested struct
    /// still counts as empty.
    pub fn is_empty(&self) -> bool {
        let Self {
            field_patches,
            prepended_fields,
            appended_fields,
            ..
        } = self;
        field_patches.is_empty() && prepended_fields.is_empty() && appended_fields.is_empty()
    }

    /// Borrows the path of the patched struct inside the input, if one was set.
    pub fn input_path(&self) -> Option<&ColumnName> {
        Option::as_ref(&self.input_path)
    }
}
/// Fluent builder for struct patches, generic over the item type it inserts
/// (`ExpressionRef`, `StructField`, or a `(StructField, ExpressionRef)` projection item).
///
/// Individual operations do not return errors. The first failure is stored in `error`, later
/// operations are skipped, and the error surfaces when the builder is built.
#[derive(Debug)]
pub struct StructPatchBuilder<Item> {
/// Path of the targeted struct inside the input; `None` targets the top level.
input_path: Option<ColumnName>,
/// Patch tree for the targeted struct.
root: StructPatchNode<Item>,
/// First error recorded by a builder operation (`Ok(())` while none has occurred).
error: DeltaResult<()>,
}
/// One struct level of a patch tree.
#[derive(Debug)]
struct StructPatchNode<Item> {
/// Items emitted before every input field of this struct, in insertion order.
prepended_fields: Vec<Item>,
/// Items emitted after every input field of this struct, in insertion order.
appended_fields: Vec<Item>,
/// Patches for individual input fields, keyed by field name.
fields: HashMap<String, FieldPatchNode<Item>>,
}
// Hand-written so that `Default` does not require `Item: Default`.
impl<Item> Default for StructPatchNode<Item> {
    /// An empty node: nothing prepended or appended and no per-field patches.
    fn default() -> Self {
        Self {
            fields: HashMap::default(),
            appended_fields: Vec::default(),
            prepended_fields: Vec::default(),
        }
    }
}
/// Patch for a single input field: the action applied to the field itself, plus items
/// inserted at the field's position right after it.
#[derive(Debug)]
struct FieldPatchNode<Item> {
/// What happens to the input field itself (defaults to `Keep`).
action: FieldPatchOp<Item>,
/// Items emitted right after the field (or after its replacement), in insertion order.
insert_after: Vec<Item>,
}
// Hand-written so that `Default` does not require `Item: Default`.
impl<Item> Default for FieldPatchNode<Item> {
    /// A patch that keeps the field and inserts nothing after it.
    fn default() -> Self {
        let action = FieldPatchOp::default();
        Self {
            insert_after: Vec::default(),
            action,
        }
    }
}
/// Action applied to one input field.
#[derive(Debug)]
enum FieldPatchOp<Item> {
/// Leave the field unchanged (the default).
Keep,
/// Remove the field; with `optional: true` a missing field is not an error.
Drop { optional: bool },
/// Substitute the field with the given item at the same position.
Replace(Item),
/// Descend into the field (which must be a struct) and patch its children.
Nested(Box<StructPatchNode<Item>>),
}
#[allow(clippy::derivable_impls)]
impl<Item> Default for FieldPatchOp<Item> {
fn default() -> Self {
FieldPatchOp::Keep
}
}
impl<Item> FieldPatchOp<Item> {
fn is_keep(&self) -> bool {
matches!(self, FieldPatchOp::Keep)
}
fn is_optional_drop(&self) -> bool {
matches!(self, FieldPatchOp::Drop { optional: true })
}
}
/// Empty struct path: addresses the builder's root struct (used by the non-`_at` methods).
const TOP_LEVEL: &[String] = &[];
impl<Item> StructPatchBuilder<Item> {
    /// Creates a builder that patches the top-level struct of its input.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        Self {
            error: Ok(()),
            root: StructPatchNode::default(),
            input_path: None,
        }
    }

    /// Creates a builder whose patch targets the struct at `path` inside the input.
    ///
    /// When the input schema is resolved, an empty `path` selects the top-level struct.
    pub fn new_nested(path: impl CollectInto<ColumnName>) -> Self {
        let input_path: ColumnName = path.collect_into();
        Self {
            error: Ok(()),
            root: StructPatchNode::default(),
            input_path: Some(input_path),
        }
    }

    /// Drops the top-level input field `field_name`.
    ///
    /// Schema-producing builds fail if the field is absent from the input.
    pub fn drop(self, field_name: impl Into<String>) -> Self {
        self.drop_at(TOP_LEVEL, field_name)
    }

    /// Drops the input field `field_name` of the struct at `struct_path`.
    ///
    /// `struct_path` is relative to this builder's target struct.
    pub fn drop_at(
        self,
        struct_path: impl CollectInto<ColumnName>,
        field_name: impl Into<String>,
    ) -> Self {
        let optional = false;
        self.apply_at(struct_path, move |node| node.drop(field_name, optional))
    }

    /// Drops the top-level input field `field_name` when present; absence is not an error.
    pub fn drop_if_exists(self, field_name: impl Into<String>) -> Self {
        self.drop_if_exists_at(TOP_LEVEL, field_name)
    }

    /// Like [`Self::drop_at`], but absence of the field is not an error.
    pub fn drop_if_exists_at(
        self,
        struct_path: impl CollectInto<ColumnName>,
        field_name: impl Into<String>,
    ) -> Self {
        let optional = true;
        self.apply_at(struct_path, move |node| node.drop(field_name, optional))
    }

    /// Replaces the top-level input field `field_name` with `item`, keeping its position.
    pub fn replace(self, field_name: impl Into<String>, item: impl Into<Item>) -> Self {
        self.replace_at(TOP_LEVEL, field_name, item)
    }

    /// Replaces the input field `field_name` of the struct at `struct_path` with `item`.
    pub fn replace_at(
        self,
        struct_path: impl CollectInto<ColumnName>,
        field_name: impl Into<String>,
        item: impl Into<Item>,
    ) -> Self {
        self.apply_at(struct_path, move |node| {
            let replacement = FieldPatchOp::Replace(item.into());
            node.set_action(field_name, replacement)
        })
    }

    /// Adds `item` before every input field of the top-level struct.
    pub fn prepend(self, item: impl Into<Item>) -> Self {
        self.prepend_at(TOP_LEVEL, item)
    }

    /// Adds `item` before every input field of the struct at `struct_path`.
    pub fn prepend_at(
        self,
        struct_path: impl CollectInto<ColumnName>,
        item: impl Into<Item>,
    ) -> Self {
        self.apply_at(struct_path, move |node| {
            let new_item: Item = item.into();
            node.prepended_fields.push(new_item);
            Ok(())
        })
    }

    /// Adds `item` right after the top-level input field `field_name`.
    pub fn insert_after(self, field_name: impl Into<String>, item: impl Into<Item>) -> Self {
        self.insert_after_at(TOP_LEVEL, field_name, item)
    }

    /// Adds `item` right after the input field `field_name` of the struct at `struct_path`.
    pub fn insert_after_at(
        self,
        struct_path: impl CollectInto<ColumnName>,
        field_name: impl Into<String>,
        item: impl Into<Item>,
    ) -> Self {
        self.apply_at(struct_path, move |node| {
            let new_item: Item = item.into();
            node.insert_after(field_name, new_item)
        })
    }

    /// Adds `item` after every input field of the top-level struct.
    pub fn append(self, item: impl Into<Item>) -> Self {
        self.append_at(TOP_LEVEL, item)
    }

    /// Adds `item` after every input field of the struct at `struct_path`.
    pub fn append_at(
        self,
        struct_path: impl CollectInto<ColumnName>,
        item: impl Into<Item>,
    ) -> Self {
        self.apply_at(struct_path, move |node| {
            let new_item: Item = item.into();
            node.appended_fields.push(new_item);
            Ok(())
        })
    }

    /// Runs `op` on the node addressed by `struct_path` and records its outcome.
    ///
    /// Once an error has been recorded, later operations are skipped entirely. The first
    /// failure is what `build` reports.
    fn apply_at(
        mut self,
        struct_path: impl CollectInto<ColumnName>,
        op: impl FnOnce(&mut StructPatchNode<Item>) -> DeltaResult<()>,
    ) -> Self {
        if self.error.is_err() {
            return self;
        }
        let path: ColumnName = struct_path.collect_into();
        self.error = match self.root.child_at_mut(&path) {
            Ok(node) => op(node),
            Err(error) => Err(error),
        };
        self
    }

    /// Consumes the builder for a schema-aware build.
    ///
    /// Returns the patch tree, the input path, and the struct inside `input_schema` that the
    /// patch targets.
    ///
    /// # Errors
    /// Returns the recorded builder error, or an error if the input path cannot be resolved
    /// to a struct.
    fn begin_build(
        self,
        input_schema: &StructType,
    ) -> DeltaResult<(StructPatchNode<Item>, Option<ColumnName>, &StructType)> {
        let Self {
            input_path,
            root,
            error,
        } = self;
        error?;
        let source_schema = resolve_input_schema(input_schema, input_path.as_ref())?;
        Ok((root, input_path, source_schema))
    }
}
impl<Item> StructPatchNode<Item> {
fn insert_after(
&mut self,
field_name: impl Into<String>,
item: impl Into<Item>,
) -> DeltaResult<()> {
let entry = self.field_patch_mut(field_name.into(), |field_name, entry| {
if entry.action.is_optional_drop() {
return Err(Error::generic(format!(
"Field '{field_name}' cannot combine optional drop with insert-after"
)));
}
Ok(())
})?;
entry.insert_after.push(item.into());
Ok(())
}
fn drop(&mut self, field_name: impl Into<String>, optional: bool) -> DeltaResult<()> {
self.set_action(field_name, FieldPatchOp::Drop { optional })
}
fn set_action(
&mut self,
field_name: impl Into<String>,
action: FieldPatchOp<Item>,
) -> DeltaResult<()> {
let entry = self.field_patch_mut(field_name.into(), |field_name, entry| {
if !entry.action.is_keep() {
return Err(Error::generic(format!(
"Field '{field_name}' has multiple input field actions"
)));
}
if action.is_optional_drop() && !entry.insert_after.is_empty() {
return Err(Error::generic(format!(
"Field '{field_name}' cannot combine optional drop with insert-after"
)));
}
Ok(())
})?;
entry.action = action;
Ok(())
}
fn child_at_mut(&mut self, path: &[String]) -> DeltaResult<&mut Self> {
let Some((field_name, remaining)) = path.split_first() else {
return Ok(self);
};
let state = self.fields.entry(field_name.to_string()).or_default();
if state.action.is_keep() {
state.action = FieldPatchOp::Nested(Box::default());
}
let FieldPatchOp::Nested(node) = &mut state.action else {
return Err(Error::generic(format!(
"Cannot patch nested fields under dropped/replaced field '{field_name}'"
)));
};
node.child_at_mut(remaining)
}
fn field_patch_mut(
&mut self,
field_name: String,
validate_existing: impl FnOnce(&str, &FieldPatchNode<Item>) -> DeltaResult<()>,
) -> DeltaResult<&mut FieldPatchNode<Item>> {
match self.fields.entry(field_name) {
hash_map::Entry::Vacant(entry) => Ok(entry.insert(FieldPatchNode::default())),
hash_map::Entry::Occupied(entry) => {
validate_existing(entry.key(), entry.get())?;
Ok(entry.into_mut())
}
}
}
}
fn resolve_input_schema<'a>(
input_schema: &'a StructType,
input_path: Option<&ColumnName>,
) -> DeltaResult<&'a StructType> {
let input_path = match input_path {
Some(input_path) if !input_path.path().is_empty() => input_path,
_ => return Ok(input_schema),
};
let field = input_schema.field_at(input_path)?;
let DataType::Struct(nested_schema) = field.data_type() else {
return Err(Error::generic(format!(
"Patching failed: input path '{input_path}' references a non-struct field"
)));
};
Ok(nested_schema)
}
impl StructPatchBuilder<ExpressionRef> {
    /// Lowers the builder into a sparse [`ExpressionStructPatch`].
    ///
    /// No input schema is consulted here, so patches naming missing fields are not detected.
    ///
    /// # Errors
    /// Returns the first error recorded while configuring the builder.
    pub fn build(self) -> DeltaResult<ExpressionStructPatch> {
        let Self {
            input_path,
            root,
            error,
        } = self;
        error.map(|()| root.to_expr_patch(input_path))
    }
}
impl TryFrom<StructPatchBuilder<ExpressionRef>> for ExpressionStructPatch {
type Error = Error;
fn try_from(builder: StructPatchBuilder<ExpressionRef>) -> DeltaResult<Self> {
builder.build()
}
}
/// Builder items that carry the expression placed into an [`ExpressionStructPatch`].
trait ExpressionItem: Sized {
    /// Borrows the expression held by this item.
    fn expr(&self) -> &ExpressionRef;

    /// Clones the expression out of each item in `items`, preserving order.
    fn exprs(items: &[Self]) -> impl Iterator<Item = ExpressionRef> {
        items.iter().map(|item| item.expr().clone())
    }
}

/// A bare expression is its own expression.
impl ExpressionItem for ExpressionRef {
    fn expr(&self) -> &ExpressionRef {
        self
    }
}

/// A projection item's expression is the second half of the pair.
impl ExpressionItem for ProjectionItem {
    fn expr(&self) -> &ExpressionRef {
        let (_, expr) = self;
        expr
    }
}
impl<Item: ExpressionItem> StructPatchNode<Item> {
fn to_expr_patch(&self, input_path: Option<ColumnName>) -> ExpressionStructPatch {
let mut field_patches = HashMap::with_capacity(self.fields.len());
for (field_name, state) in &self.fields {
let patch = state.to_expr_field_patch(input_path.as_ref(), field_name);
field_patches.insert(field_name.clone(), patch);
}
ExpressionStructPatch {
field_patches,
prepended_fields: Item::exprs(&self.prepended_fields).collect(),
appended_fields: Item::exprs(&self.appended_fields).collect(),
input_path,
}
}
}
impl<Item: ExpressionItem> FieldPatchNode<Item> {
fn to_expr_field_patch(
&self,
parent_input_path: Option<&ColumnName>,
field_name: &str,
) -> ExpressionFieldPatch {
let mut field_patch = ExpressionFieldPatch::default();
match &self.action {
FieldPatchOp::Keep => {
field_patch.keep_input = true;
}
FieldPatchOp::Drop { optional } => {
field_patch.optional = *optional;
}
FieldPatchOp::Replace(expr) => {
field_patch.insertions.push(expr.expr().clone());
}
FieldPatchOp::Nested(node) => {
let child_input_path = join_prefix(parent_input_path, field_name);
let child_patch = node.to_expr_patch(Some(child_input_path));
let child_patch = Arc::new(Expression::StructPatch(child_patch));
field_patch.insertions.push(child_patch);
}
}
let insert_after = Item::exprs(&self.insert_after);
field_patch.insertions.extend(insert_after);
field_patch
}
}
impl StructPatchBuilder<StructField> {
    /// Applies the patch to `input_schema` and returns the patched struct type.
    ///
    /// For a builder created with `new_nested`, both the patch and the result describe the
    /// struct at that path rather than the whole input.
    ///
    /// # Errors
    /// - any error recorded while configuring the builder;
    /// - the input path does not resolve to a struct;
    /// - a non-optional patch targets a field missing from the input;
    /// - the resulting field list is rejected by `StructType::try_new`.
    pub fn build(self, input_schema: &StructType) -> DeltaResult<StructType> {
        let (root, _, target_schema) = self.begin_build(input_schema)?;
        let patched_fields = schema_walk(root, target_schema)?;
        StructType::try_new(patched_fields)
    }
}
/// Item type of the projection builder: an output field paired with the expression that
/// computes it.
type ProjectionItem = (StructField, ExpressionRef);

/// Builder items that contribute a `StructField` to the patched schema.
trait SchemaPatchItem {
    /// Consumes the item, yielding its output field.
    fn into_field(self) -> StructField;

    /// Converts each of `items` into its output field, preserving order.
    fn into_fields(items: Vec<Self>) -> impl Iterator<Item = StructField>
    where
        Self: Sized,
    {
        items.into_iter().map(|item| item.into_field())
    }
}

/// A bare field is its own output field.
impl SchemaPatchItem for StructField {
    fn into_field(self) -> StructField {
        self
    }
}

/// A projection item's output field is the first half of the pair.
impl SchemaPatchItem for ProjectionItem {
    fn into_field(self) -> StructField {
        let (field, _expr) = self;
        field
    }
}
fn schema_walk<Item: SchemaPatchItem>(
node: StructPatchNode<Item>,
input_schema: &StructType,
) -> DeltaResult<Vec<StructField>> {
let mut fields = node.fields;
let mut output: Vec<_> = Item::into_fields(node.prepended_fields).collect();
output.reserve(input_schema.num_fields() + fields.len());
for input_field in input_schema.fields() {
let field_name = input_field.name();
let field_patch = fields.remove(field_name).unwrap_or_default();
match field_patch.action {
FieldPatchOp::Drop { .. } => {}
FieldPatchOp::Keep => output.push(input_field.clone()),
FieldPatchOp::Replace(item) => output.push(item.into_field()),
FieldPatchOp::Nested(node) => {
let DataType::Struct(nested_schema) = input_field.data_type() else {
return Err(Error::generic(format!(
"Cannot patch nested fields under non-struct field '{}'",
input_field.name()
)));
};
let children = schema_walk(*node, nested_schema)?;
let field = StructField::new(
input_field.name(),
StructType::try_new(children)?,
input_field.nullable,
);
output.push(field.with_metadata(input_field.metadata.clone()));
}
}
output.extend(Item::into_fields(field_patch.insert_after));
}
if let Some((field_name, _)) = fields
.iter()
.find(|(_, state)| !state.action.is_optional_drop())
{
return Err(Error::generic(format!(
"Field to patch does not exist: {field_name}"
)));
}
output.extend(Item::into_fields(node.appended_fields));
Ok(output)
}
/// Builder that produces both a patched output schema and the matching struct-patch
/// expression from a single sequence of edits.
///
/// Every inserted item pairs the output `StructField` with the expression that computes it.
#[derive(Debug)]
pub struct ProjectionStructPatchBuilder<'a> {
/// Schema being patched; used by the `replace_expr*` lookups and at build time.
input_schema: &'a StructType,
/// Generic builder carrying `(StructField, ExpressionRef)` items.
inner: StructPatchBuilder<ProjectionItem>,
}
// Forwards to `$self.inner.$method(...)` and yields `$self` for chaining. The extra `$arg`s
// (struct path / field name) go first and the `($field, $expr.into())` projection item goes
// last, matching `StructPatchBuilder`'s parameter order. `$self` must be a `mut` binding.
macro_rules! delegate {
($self:ident, $method:ident, $field:expr, $expr:expr $(, $arg:expr)*) => {{
$self.inner = $self.inner.$method($($arg,)* ($field, $expr.into()));
$self
}};
}
impl<'a> ProjectionStructPatchBuilder<'a> {
fn existing_input_field(
&self,
struct_path: &ColumnName,
field_name: &str,
) -> DeltaResult<StructField> {
let field_path: ColumnName = [
self.inner.input_path.clone().unwrap_or_default(),
struct_path.clone(),
ColumnName::new([field_name]),
]
.into_iter()
.collect();
let field = self.input_schema.field_at(&field_path)?;
Ok(field.clone())
}
pub fn new(input_schema: &'a StructType) -> Self {
Self {
input_schema,
inner: StructPatchBuilder::new(),
}
}
pub fn new_nested(input_schema: &'a StructType, path: impl CollectInto<ColumnName>) -> Self {
Self {
input_schema,
inner: StructPatchBuilder::new_nested(path),
}
}
pub fn drop(mut self, field_name: impl Into<String>) -> Self {
self.inner = self.inner.drop(field_name);
self
}
pub fn drop_at(
mut self,
struct_path: impl CollectInto<ColumnName>,
field_name: impl Into<String>,
) -> Self {
self.inner = self.inner.drop_at(struct_path, field_name);
self
}
pub fn drop_if_exists(mut self, field_name: impl Into<String>) -> Self {
self.inner = self.inner.drop_if_exists(field_name);
self
}
pub fn drop_if_exists_at(
mut self,
struct_path: impl CollectInto<ColumnName>,
field_name: impl Into<String>,
) -> Self {
self.inner = self.inner.drop_if_exists_at(struct_path, field_name);
self
}
pub fn replace(
mut self,
field_name: impl Into<String>,
field: StructField,
expr: impl Into<ExpressionRef>,
) -> Self {
delegate!(self, replace, field, expr, field_name)
}
pub fn replace_at(
mut self,
struct_path: impl CollectInto<ColumnName>,
field_name: impl Into<String>,
field: StructField,
expr: impl Into<ExpressionRef>,
) -> Self {
delegate!(self, replace_at, field, expr, struct_path, field_name)
}
pub fn replace_expr(
self,
field_name: impl Into<String>,
expr: impl Into<ExpressionRef>,
) -> Self {
self.replace_expr_at(TOP_LEVEL, field_name, expr)
}
pub fn replace_expr_at(
mut self,
struct_path: impl CollectInto<ColumnName>,
field_name: impl Into<String>,
expr: impl Into<ExpressionRef>,
) -> Self {
if self.inner.error.is_err() {
return self;
}
let struct_path = struct_path.collect_into();
let field_name = field_name.into();
match self.existing_input_field(&struct_path, &field_name) {
Ok(field) => self.replace_at(struct_path, field_name, field, expr),
Err(error) => {
self.inner.error = Err(error);
self
}
}
}
pub fn prepend(mut self, field: StructField, expr: impl Into<ExpressionRef>) -> Self {
delegate!(self, prepend, field, expr)
}
pub fn prepend_at(
mut self,
struct_path: impl CollectInto<ColumnName>,
field: StructField,
expr: impl Into<ExpressionRef>,
) -> Self {
delegate!(self, prepend_at, field, expr, struct_path)
}
pub fn insert_after(
mut self,
field_name: impl Into<String>,
field: StructField,
expr: impl Into<ExpressionRef>,
) -> Self {
delegate!(self, insert_after, field, expr, field_name)
}
pub fn insert_after_at(
mut self,
struct_path: impl CollectInto<ColumnName>,
field_name: impl Into<String>,
field: StructField,
expr: impl Into<ExpressionRef>,
) -> Self {
delegate!(self, insert_after_at, field, expr, struct_path, field_name)
}
pub fn append(mut self, field: StructField, expr: impl Into<ExpressionRef>) -> Self {
delegate!(self, append, field, expr)
}
pub fn append_at(
mut self,
struct_path: impl CollectInto<ColumnName>,
field: StructField,
expr: impl Into<ExpressionRef>,
) -> Self {
delegate!(self, append_at, field, expr, struct_path)
}
pub fn build(self) -> DeltaResult<(SchemaRef, ExpressionRef)> {
let (root, input_path, source_schema) = self.inner.begin_build(self.input_schema)?;
let patch = root.to_expr_patch(input_path);
let schema = StructType::try_new(schema_walk(root, source_schema)?)?;
Ok((Arc::new(schema), Arc::new(Expression::StructPatch(patch))))
}
}
/// Returns `prefix` extended by the single segment `name`.
///
/// With no prefix, the result is just `name`.
fn join_prefix(prefix: Option<&ColumnName>, name: &str) -> ColumnName {
    let leaf = ColumnName::new([name]);
    if let Some(parent) = prefix {
        parent.join(&leaf)
    } else {
        leaf
    }
}
// Unit tests covering three areas:
// - expression lowering (`ExpressionStructPatchBuilder`);
// - schema patching (`SchemaStructPatchBuilder`);
// - the combined `ProjectionStructPatchBuilder`.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use rstest::rstest;
use crate::expressions::{
lit, Expression as Expr, ExpressionRef, ExpressionStructPatch, ExpressionStructPatchBuilder,
};
use crate::schema::{DataType, SchemaStructPatchBuilder, StructField, StructType};
use crate::struct_patch::ProjectionStructPatchBuilder;
use crate::utils::test_utils::assert_result_error_with_message;
// Unwraps the `StructPatch` payload of a projection build's expression; panics otherwise.
fn projection_patch(expr: &ExpressionRef) -> &ExpressionStructPatch {
let Expr::StructPatch(patch) = expr.as_ref() else {
panic!("Expected struct patch expression");
};
patch
}
#[test]
fn struct_patch_builder_lowers_nested_paths_to_raw_patches() {
let patch = ExpressionStructPatchBuilder::new()
.drop_at(["add"], "gone")
.replace_at(["add"], "stub", lit("replaced"))
.insert_after_at(["add"], "x", lit(true))
.insert_after("add", lit("after_add"))
.build()
.unwrap();
let add_patch = patch.field_patches.get("add").unwrap();
assert!(!add_patch.keep_input);
// The nested struct patch comes first, followed by the top-level insert-after item.
assert_eq!(add_patch.insertions.len(), 2);
let Expr::StructPatch(inner) = add_patch.insertions[0].as_ref() else {
panic!("Expected nested struct patch");
};
assert_eq!(
inner.input_path.as_ref().map(|p| p.to_string()).as_deref(),
Some("add")
);
let gone = inner.field_patches.get("gone").unwrap();
assert!(!gone.keep_input);
assert!(gone.insertions.is_empty());
let stub = inner.field_patches.get("stub").unwrap();
assert!(!stub.keep_input);
assert_eq!(stub.insertions, vec![Arc::new(lit("replaced"))]);
let x = inner.field_patches.get("x").unwrap();
assert!(x.keep_input);
assert_eq!(x.insertions, vec![Arc::new(lit(true))]);
assert_eq!(add_patch.insertions[1], Arc::new(lit("after_add")));
}
#[test]
fn struct_patch_builder_allows_empty_root_patches() {
let patch = ExpressionStructPatchBuilder::new().build().unwrap();
assert!(patch.is_empty());
assert!(patch.input_path().is_none());
// A nested builder with no edits is still empty; only its input path is set.
let nested_patch = ExpressionStructPatchBuilder::new_nested(["nested"])
.build()
.unwrap();
assert!(nested_patch.is_empty());
assert_eq!(
nested_patch
.input_path()
.map(ToString::to_string)
.as_deref(),
Some("nested")
);
}
// Conflicting actions on one field are recorded as builder errors and surface at build time.
#[rstest]
#[case::drop_then_replace(
ExpressionStructPatchBuilder::new().drop("a").replace("a", lit(1)),
"multiple input field actions")]
#[case::replace_then_drop(
ExpressionStructPatchBuilder::new().replace("a", lit(1)).drop("a"),
"multiple input field actions")]
#[case::drop_with_nested_insert(
ExpressionStructPatchBuilder::new().drop("add").insert_after_at(["add"], "x", lit(true)),
"nested fields")]
#[case::nested_replace_then_drop(
ExpressionStructPatchBuilder::new().replace_at(["add"], "x", lit("one")).drop_at(["add"], "x"),
"multiple input field actions")]
fn expression_build_rejects_conflicting_field_actions(
#[case] builder: ExpressionStructPatchBuilder,
#[case] expected_msg: &str,
) {
assert_result_error_with_message(builder.build(), expected_msg);
}
// Nullable INTEGER field named `name`.
fn field(name: impl Into<String>) -> StructField {
StructField::nullable(name, DataType::INTEGER)
}
// Flat schema of nullable INTEGER fields with the given names, in order.
fn schema(names: &[&str]) -> StructType {
StructType::new_unchecked(names.iter().map(|name| field(*name)).collect::<Vec<_>>())
}
// Field names of `schema`, in order.
fn field_names(schema: &StructType) -> Vec<String> {
schema
.fields()
.map(|field| field.name().to_string())
.collect()
}
// `{ nested: { nested_a, nested_b }, top }`.
fn nested_input_schema() -> StructType {
StructType::new_unchecked(vec![
StructField::nullable("nested", schema(&["nested_a", "nested_b"])),
field("top"),
])
}
#[rstest]
#[case::empty_patch(SchemaStructPatchBuilder::new())]
#[case::optional_missing_drop(SchemaStructPatchBuilder::new().drop_if_exists("missing"))]
fn schema_build_preserves_input_schema(#[case] builder: SchemaStructPatchBuilder) {
let input_schema = schema(&["a", "b"]);
assert_eq!(builder.build(&input_schema).unwrap(), input_schema);
}
#[rstest]
#[case::empty_nested_path_targets_top_level(
schema(&["a", "b"]),
SchemaStructPatchBuilder::new_nested(Vec::<String>::new()).insert_after("a", field("after_a")),
&["a", "after_a", "b"])]
#[case::inserts_before_and_after(
schema(&["a", "b"]),
SchemaStructPatchBuilder::new().prepend(field("prepended")).insert_after("a", field("after_a")),
&["prepended", "a", "after_a", "b"])]
#[case::appends_after_all_input_fields(
schema(&["a", "b"]),
SchemaStructPatchBuilder::new().append(field("appended_1")).append(field("appended_2")),
&["a", "b", "appended_1", "appended_2"])]
#[case::appends_to_empty_input(
StructType::new_unchecked(Vec::<StructField>::new()),
SchemaStructPatchBuilder::new().append(field("only")),
&["only"])]
#[case::replaces_field_at_input_position(
schema(&["a", "b", "c"]),
SchemaStructPatchBuilder::new().replace("b", field("bb")),
&["a", "bb", "c"])]
#[case::drops_field(
schema(&["a", "b", "c"]),
SchemaStructPatchBuilder::new().drop("b"),
&["a", "c"])]
#[case::preserves_patch_ordering(
schema(&["a", "b", "c"]),
SchemaStructPatchBuilder::new()
.prepend(field("prepended"))
.insert_after("a", field("after_a"))
.replace("b", field("bb"))
.drop("c")
.append(field("appended")),
&["prepended", "a", "after_a", "bb", "appended"])]
fn schema_build_produces_expected_field_order(
#[case] input_schema: StructType,
#[case] builder: SchemaStructPatchBuilder,
#[case] expected_names: &[&str],
) {
let output_schema = builder.build(&input_schema).unwrap();
assert_eq!(field_names(&output_schema), expected_names);
}
// A nested builder returns only the patched inner struct, not the whole input schema.
#[test]
fn schema_build_nested_path_targets_nested_struct_schema() {
let input_schema = nested_input_schema();
let output_schema = SchemaStructPatchBuilder::new_nested(["nested"])
.insert_after("nested_a", field("nested_inserted"))
.build(&input_schema)
.unwrap();
assert_eq!(
field_names(&output_schema),
["nested_a", "nested_inserted", "nested_b"]
);
}
// An `_at` patch rewrites the nested struct in place and keeps the outer schema.
#[test]
fn schema_build_nested_field_patch_patches_in_place() {
let input_schema = nested_input_schema();
let output_schema = SchemaStructPatchBuilder::new()
.insert_after_at(["nested"], "nested_a", field("nested_inserted"))
.build(&input_schema)
.unwrap();
assert_eq!(field_names(&output_schema), ["nested", "top"]);
let DataType::Struct(nested) = output_schema.field("nested").unwrap().data_type() else {
panic!("Expected nested struct field");
};
assert_eq!(
field_names(nested),
["nested_a", "nested_inserted", "nested_b"]
);
}
#[rstest]
#[case::required_missing(SchemaStructPatchBuilder::new().drop("missing"))]
#[case::required_missing_with_optional_match(
SchemaStructPatchBuilder::new().drop_if_exists("a").drop("missing"))]
fn schema_build_required_missing_field_errors(#[case] builder: SchemaStructPatchBuilder) {
let result = builder.build(&schema(&["a", "b"]));
assert_result_error_with_message(result, "Field to patch does not exist: missing");
}
// The projection builder yields a full output schema but a sparse patch: untouched fields
// have no entry in `field_patches`.
#[test]
fn projection_sparse_build_produces_schema_and_sparse_patch() {
let input_schema = schema(&["a", "b", "c", "untouched"]);
let (output_schema, patch) = ProjectionStructPatchBuilder::new(&input_schema)
.prepend(field("prepended"), lit(0))
.insert_after("a", field("after_a"), lit(1))
.replace("b", field("bb"), lit(2))
.drop("c")
.append(field("appended"), lit(3))
.build()
.unwrap();
let patch = projection_patch(&patch);
assert_eq!(
field_names(&output_schema),
["prepended", "a", "after_a", "bb", "untouched", "appended"]
);
assert_eq!(patch.prepended_fields, vec![Arc::new(lit(0))]);
assert_eq!(patch.appended_fields, vec![Arc::new(lit(3))]);
assert!(patch.input_path().is_none());
assert!(!patch.field_patches.contains_key("untouched"));
let a = patch.field_patches.get("a").unwrap();
assert!(a.keep_input);
assert_eq!(a.insertions, vec![Arc::new(lit(1))]);
let b = patch.field_patches.get("b").unwrap();
assert!(!b.keep_input);
assert_eq!(b.insertions, vec![Arc::new(lit(2))]);
let c = patch.field_patches.get("c").unwrap();
assert!(!c.keep_input);
assert!(c.insertions.is_empty());
}
#[test]
fn projection_sparse_build_lowers_nested_patch_to_sparse_struct_patch() {
let input_schema = nested_input_schema();
let (output_schema, patch) = ProjectionStructPatchBuilder::new(&input_schema)
.insert_after_at(["nested"], "nested_a", field("nested_inserted"), lit(9))
.build()
.unwrap();
let patch = projection_patch(&patch);
assert_eq!(field_names(&output_schema), ["nested", "top"]);
let DataType::Struct(nested_schema) = output_schema.field("nested").unwrap().data_type()
else {
panic!("Expected nested struct field");
};
assert_eq!(
field_names(nested_schema),
["nested_a", "nested_inserted", "nested_b"]
);
assert!(!patch.field_patches.contains_key("top"));
let nested_patch = patch.field_patches.get("nested").unwrap();
assert!(!nested_patch.keep_input);
assert_eq!(nested_patch.insertions.len(), 1);
let Expr::StructPatch(inner) = nested_patch.insertions[0].as_ref() else {
panic!("Expected nested struct patch");
};
assert_eq!(
inner.input_path().map(ToString::to_string).as_deref(),
Some("nested")
);
assert!(!inner.field_patches.contains_key("nested_b"));
let nested_a = inner.field_patches.get("nested_a").unwrap();
assert!(nested_a.keep_input);
assert_eq!(nested_a.insertions, vec![Arc::new(lit(9))]);
}
// `replace_expr_at` reuses the input field definition, so the output schema is unchanged.
#[test]
fn projection_sparse_build_replace_expr_at_preserves_nested_field() {
let input_schema = nested_input_schema();
let (output_schema, patch) = ProjectionStructPatchBuilder::new(&input_schema)
.replace_expr_at(["nested"], "nested_b", lit(9))
.build()
.unwrap();
let patch = projection_patch(&patch);
assert_eq!(field_names(&output_schema), ["nested", "top"]);
let DataType::Struct(output_nested_schema) =
output_schema.field("nested").unwrap().data_type()
else {
panic!("Expected nested struct field");
};
let DataType::Struct(input_nested_schema) =
input_schema.field("nested").unwrap().data_type()
else {
panic!("Expected nested struct field");
};
assert_eq!(
output_nested_schema.field("nested_b"),
input_nested_schema.field("nested_b")
);
let nested_patch = patch.field_patches.get("nested").unwrap();
let Expr::StructPatch(inner) = nested_patch.insertions[0].as_ref() else {
panic!("Expected nested struct patch");
};
let nested_b = inner.field_patches.get("nested_b").unwrap();
assert!(!nested_b.keep_input);
assert_eq!(nested_b.insertions, vec![Arc::new(lit(9))]);
}
// On a nested builder, `replace_expr` resolves the field relative to the builder's input path.
#[test]
fn projection_sparse_build_replace_expr_uses_nested_builder_input_path() {
let input_schema = nested_input_schema();
let (output_schema, patch) =
ProjectionStructPatchBuilder::new_nested(&input_schema, ["nested"])
.replace_expr("nested_b", lit(9))
.build()
.unwrap();
let patch = projection_patch(&patch);
assert_eq!(field_names(&output_schema), ["nested_a", "nested_b"]);
assert_eq!(
patch.input_path().map(ToString::to_string).as_deref(),
Some("nested")
);
let nested_b = patch.field_patches.get("nested_b").unwrap();
assert!(!nested_b.keep_input);
assert_eq!(nested_b.insertions, vec![Arc::new(lit(9))]);
}
#[test]
fn projection_sparse_build_required_missing_field_errors() {
let input_schema = schema(&["a", "b"]);
let result = ProjectionStructPatchBuilder::new(&input_schema)
.drop("missing")
.build();
assert_result_error_with_message(result, "Field to patch does not exist: missing");
}
}