buoyant_kernel 0.22.2

Buoyant Data distribution of delta-kernel
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
//! Integration tests for column-mapping-aware write paths (stats, partition values, removes).

use std::collections::HashMap;
use std::sync::Arc;

use buoyant_kernel as delta_kernel;
use delta_kernel::actions::{MAX_VALUES, MIN_VALUES};
use delta_kernel::arrow::array::{
    Array, ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
};
use delta_kernel::arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema};
use delta_kernel::committer::FileSystemCommitter;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioMultiThreadExecutor;
use delta_kernel::engine::default::DefaultEngineBuilder;
use delta_kernel::expressions::{ColumnName, Scalar};
use delta_kernel::object_store::local::LocalFileSystem;
use delta_kernel::object_store::path::Path;
use delta_kernel::object_store::{DynObjectStore, ObjectStoreExt as _};
use delta_kernel::schema::{DataType, StructField, StructType};
use delta_kernel::table_features::{get_any_level_column_physical_name, ColumnMappingMode};
use delta_kernel::transaction::create_table::create_table;
use delta_kernel::{Engine, FileMeta, Snapshot};
use test_utils::{
    add_commit, assert_partition_values, assert_schema_has_field,
    column_mapping_fixtures as fixtures, copy_directory, create_table_and_load_snapshot,
    engine_store_setup, nested_batches, nested_schema, read_actions_from_commit, read_add_infos,
    read_scan, remove_all_and_get_remove_actions, resolve_field, test_table_setup,
    write_batch_to_table,
};
use url::Url;

use crate::common::write_utils::{
    assert_min_max_stats, get_parquet_field_id, resolve_struct_field, set_table_properties,
};

/// 1. Creates a table with the given column mapping mode
/// 2. Writes two batches of data
/// 3. Checkpoints and verifies add.stats uses physical column names in the checkpoint
/// 4. Reads a parquet footer to verify physical names/IDs
/// 5. Reads data back to verify correctness
/// 6. Removes files and verifies remove.stats matches the original add.stats
#[rstest::rstest]
#[case::cm_none(ColumnMappingMode::None)]
#[case::cm_id(ColumnMappingMode::Id)]
#[case::cm_name(ColumnMappingMode::Name)]
#[tokio::test(flavor = "multi_thread")]
async fn test_column_mapping_write(
    #[case] cm_mode: ColumnMappingMode,
) -> Result<(), Box<dyn std::error::Error>> {
    let _ = tracing_subscriber::fmt::try_init();

    let schema = nested_schema()?;

    let (_tmp_dir, table_path, _) = test_table_setup()?;
    let table_url = Url::from_directory_path(&table_path).unwrap();
    let store: Arc<DynObjectStore> = Arc::new(LocalFileSystem::new());
    let engine = Arc::new(
        DefaultEngineBuilder::new(store.clone())
            .with_task_executor(Arc::new(TokioMultiThreadExecutor::new(
                tokio::runtime::Handle::current(),
            )))
            .build(),
    );

    // Step 1: Create table
    let mode_str = match cm_mode {
        ColumnMappingMode::None => "none",
        ColumnMappingMode::Id => "id",
        ColumnMappingMode::Name => "name",
    };
    let mut latest_snapshot = create_table_and_load_snapshot(
        &table_path,
        schema.clone(),
        engine.as_ref(),
        &[("delta.columnMapping.mode", mode_str)],
    )?;

    // Get physical field paths for stats verification (top-level and nested)
    let cm = latest_snapshot
        .table_properties()
        .column_mapping_mode
        .unwrap_or(ColumnMappingMode::None);
    let row_number_physical = get_any_level_column_physical_name(
        latest_snapshot.schema().as_ref(),
        &ColumnName::new(["row_number"]),
        cm,
    )?
    .into_inner();
    let street_physical = get_any_level_column_physical_name(
        latest_snapshot.schema().as_ref(),
        &ColumnName::new(["address", "street"]),
        cm,
    )?
    .into_inner();

    // Step 2: Write two batches
    for data in nested_batches()? {
        latest_snapshot =
            write_batch_to_table(&latest_snapshot, engine.as_ref(), data, HashMap::new()).await?;
    }

    // Enable writeStatsAsStruct so the checkpoint contains native stats_parsed.
    // CREATE TABLE doesn't allow this property yet, so we write a metadata-update commit directly.
    latest_snapshot = set_table_properties(
        &table_path,
        &table_url,
        engine.as_ref(),
        latest_snapshot.version(),
        &[("delta.checkpoint.writeStatsAsStruct", "true")],
    )?;

    // Step 3: Checkpoint and verify add.stats uses correct column names
    let snapshot_for_checkpoint = latest_snapshot.clone();
    snapshot_for_checkpoint.checkpoint(engine.as_ref(), None)?;
    let ckpt_snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
    let add_actions = read_add_infos(&ckpt_snapshot, engine.as_ref())?;
    let mut all_stats: Vec<_> = add_actions
        .iter()
        .filter_map(|a| a.stats.as_ref())
        .filter(|s| s.get(MIN_VALUES).is_some())
        .collect();
    assert_eq!(all_stats.len(), 2, "should have stats for 2 files");
    all_stats.sort_by_key(|s| s[MIN_VALUES][&row_number_physical[0]].as_i64().unwrap());

    // Batch 1: row_number 1..3, address.street "st1".."st3"
    assert_min_max_stats(all_stats[0], &row_number_physical, 1, 3);
    assert_min_max_stats(all_stats[0], &street_physical, "st1", "st3");

    // Batch 2: row_number 4..6, address.street "st4".."st6"
    assert_min_max_stats(all_stats[1], &row_number_physical, 4, 6);
    assert_min_max_stats(all_stats[1], &street_physical, "st4", "st6");

    // Step 3b: Verify stats_parsed in scan metadata uses correct physical column names
    {
        let scan = ckpt_snapshot
            .scan_builder()
            .include_all_stats_columns()
            .build()?;
        let scan_metadata_results: Vec<_> = scan
            .scan_metadata(engine.as_ref())?
            .collect::<Result<Vec<_>, _>>()?;

        let mut stats_rows: Vec<(i64, i64, String, String)> = Vec::new();
        for sm in scan_metadata_results {
            let (data, sel) = sm.scan_files.into_parts();
            let batch: RecordBatch = ArrowEngineData::try_from_engine_data(data)?.into();

            let batch_struct = StructArray::from(batch.clone());
            let stats_parsed: &StructArray =
                resolve_struct_field(&batch_struct, &["stats_parsed".into()]);

            let min_path = |field: &[String]| -> Vec<String> {
                [&["stats_parsed".into(), MIN_VALUES.into()], field].concat()
            };
            let max_path = |field: &[String]| -> Vec<String> {
                [&["stats_parsed".into(), MAX_VALUES.into()], field].concat()
            };
            let min_row_num: &Int64Array =
                resolve_struct_field(&batch_struct, &min_path(&row_number_physical));
            let max_row_num: &Int64Array =
                resolve_struct_field(&batch_struct, &max_path(&row_number_physical));
            let min_st: &StringArray =
                resolve_struct_field(&batch_struct, &min_path(&street_physical));
            let max_st: &StringArray =
                resolve_struct_field(&batch_struct, &max_path(&street_physical));

            for (i, &selected) in sel.iter().enumerate().take(batch.num_rows()) {
                if selected && !stats_parsed.is_null(i) {
                    stats_rows.push((
                        min_row_num.value(i),
                        max_row_num.value(i),
                        min_st.value(i).to_string(),
                        max_st.value(i).to_string(),
                    ));
                }
            }
        }

        stats_rows.sort_by_key(|r| r.0);
        assert_eq!(stats_rows.len(), 2, "should have stats_parsed for 2 files");
        assert_eq!(stats_rows[0], (1, 3, "st1".to_string(), "st3".to_string()));
        assert_eq!(stats_rows[1], (4, 6, "st4".to_string(), "st6".to_string()));
    }

    // Step 4: Read parquet footer to verify physical names and native field_id
    {
        let parquet_path = &add_actions
            .first()
            .expect("should have at least one add file")
            .path;
        let parquet_url = table_url.join(parquet_path)?;
        let local_path = parquet_url.to_file_path().unwrap();

        let obj_meta = store
            .head(&Path::from_url_path(parquet_url.path())?)
            .await?;
        let file_meta = FileMeta::new(
            parquet_url,
            0, /* last_modified */
            obj_meta.size as u64,
        );
        let footer = engine.parquet_handler().read_parquet_footer(&file_meta)?;
        let footer_schema = footer.schema;

        let logical_schema = latest_snapshot.schema();
        for logical_path in [&["row_number"][..], &["address", "street"]] {
            let col = ColumnName::new(logical_path.iter().copied());
            let physical =
                get_any_level_column_physical_name(logical_schema.as_ref(), &col, cm)?.into_inner();
            assert_schema_has_field(&footer_schema, &physical);

            let field_id = get_parquet_field_id(&local_path, &physical);
            let logical_field = resolve_field(logical_schema.as_ref(), logical_path).unwrap();
            match cm_mode {
                ColumnMappingMode::Id | ColumnMappingMode::Name => {
                    let expected_id = logical_field
                        .column_mapping_id()
                        .expect("expected ColumnMappingId number")
                        as i32;
                    assert_eq!(
                        field_id,
                        Some(expected_id),
                        "parquet field_id mismatch for {logical_path:?}"
                    );
                }
                ColumnMappingMode::None => {
                    assert_eq!(
                        field_id, None,
                        "parquet field_id should not be set in None column mapping mode"
                    );
                }
            }
        }
    }

    // Step 5: Read data back to verify correctness
    {
        let post_ckpt_snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
        let scan = post_ckpt_snapshot.scan_builder().build()?;
        let batches: Vec<RecordBatch> = scan
            .execute(engine.clone())?
            .map(|r| {
                let data = r.unwrap();
                let arrow = ArrowEngineData::try_from_engine_data(data).unwrap();
                arrow.record_batch().clone()
            })
            .collect();

        let result_schema = batches[0].schema();
        let combined = delta_kernel::arrow::compute::concat_batches(&result_schema, &batches)?;
        assert_eq!(
            combined.num_rows(),
            6,
            "Should have 6 rows from two written batches"
        );

        // Verify logical column names and data values
        let combined_struct = StructArray::from(combined);

        // Top-level: row_number should contain [1..=6]
        let row_numbers: &Int64Array =
            resolve_struct_field(&combined_struct, &["row_number".into()]);
        let mut vals: Vec<i64> = (0..row_numbers.len())
            .map(|i| row_numbers.value(i))
            .collect();
        vals.sort();
        assert_eq!(vals, vec![1, 2, 3, 4, 5, 6]);

        // Nested: address.street should contain ["st1"..="st6"]
        let streets: &StringArray =
            resolve_struct_field(&combined_struct, &["address".into(), "street".into()]);
        let mut street_vals: Vec<&str> = (0..streets.len()).map(|i| streets.value(i)).collect();
        street_vals.sort();
        assert_eq!(street_vals, vec!["st1", "st2", "st3", "st4", "st5", "st6"]);
    }

    // Step 6: Remove files and verify remove.stats matches original add.stats
    {
        let original_add_stats: Vec<serde_json::Value> =
            add_actions.iter().filter_map(|a| a.stats.clone()).collect();
        assert!(
            !original_add_stats.is_empty(),
            "should have at least one add with stats"
        );

        let remove_actions =
            remove_all_and_get_remove_actions(&latest_snapshot, &table_url, engine.as_ref())?;
        assert!(
            !remove_actions.is_empty(),
            "Expected at least one remove action"
        );

        let remove_stats: Vec<serde_json::Value> = remove_actions
            .iter()
            .filter_map(|r| {
                r["stats"]
                    .as_str()
                    .map(|s| serde_json::from_str(s).unwrap())
            })
            .collect();
        assert_eq!(
            remove_stats, original_add_stats,
            "remove.stats should match original add.stats"
        );
    }

    Ok(())
}

/// Verifies that partitioned writes use physical column names in add.partitionValues.
#[rstest::rstest]
#[case::cm_none("./tests/data/partition_cm/none")]
#[case::cm_id("./tests/data/partition_cm/id")]
#[case::cm_name("./tests/data/partition_cm/name")]
#[tokio::test(flavor = "multi_thread")]
async fn test_column_mapping_partitioned_write(
    #[case] table_dir: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let _ = tracing_subscriber::fmt::try_init();

    // Copy test data to a temp dir so we can write to it
    let tmp_dir = tempfile::tempdir()?;
    copy_directory(std::path::Path::new(table_dir), tmp_dir.path())?;
    let table_url = Url::from_directory_path(tmp_dir.path()).unwrap();
    let store: Arc<DynObjectStore> = Arc::new(LocalFileSystem::new());
    let engine = Arc::new(
        DefaultEngineBuilder::new(store.clone())
            .with_task_executor(Arc::new(TokioMultiThreadExecutor::new(
                tokio::runtime::Handle::current(),
            )))
            .build(),
    );

    let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
    let cm = snapshot
        .table_properties()
        .column_mapping_mode
        .unwrap_or(ColumnMappingMode::None);
    let physical_name = get_any_level_column_physical_name(
        snapshot.schema().as_ref(),
        &ColumnName::new(["category"]),
        cm,
    )?
    .into_inner()
    .remove(0);

    // Verify physical name for column mapping mode
    if table_dir.ends_with("none") {
        assert_eq!(physical_name, "category");
    } else {
        assert_ne!(
            physical_name, "category",
            "physical name should differ from logical name under column mapping"
        );
    }

    // Write data with partition value
    let data_schema = Arc::new(StructType::try_new(vec![StructField::nullable(
        "value",
        DataType::INTEGER,
    )])?);
    let batch = RecordBatch::try_new(
        Arc::new(data_schema.as_ref().try_into_arrow()?),
        vec![Arc::new(Int32Array::from(vec![1, 2]))],
    )?;
    let partition_values = HashMap::from([("category".to_string(), Scalar::String("A".into()))]);
    write_batch_to_table(&snapshot, engine.as_ref(), batch, partition_values).await?;

    // Read commit log and verify add.partitionValues key uses physical name
    let add_actions = read_actions_from_commit(&table_url, 1, "add")?;
    assert!(!add_actions.is_empty(), "no add action found in commit log");
    for add in &add_actions {
        assert_partition_values(add, &physical_name, "A");
    }

    // Remove the written file and verify remove action preserves physical names
    let post_write_snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
    let remove_actions =
        remove_all_and_get_remove_actions(&post_write_snapshot, &table_url, engine.as_ref())?;
    assert!(
        !remove_actions.is_empty(),
        "no remove action found in commit log"
    );
    for remove in &remove_actions {
        assert_partition_values(remove, &physical_name, "A");
    }

    Ok(())
}

// Two fields with same physical name at different physical paths is valid. Write
// should succeed.
#[rstest::rstest]
#[tokio::test(flavor = "multi_thread")]
async fn test_same_phy_name_different_path(
    #[values("name", "id")] cm_mode: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, _) = test_table_setup()?;
    let table_url = Url::from_directory_path(&table_path).unwrap();
    let store: Arc<DynObjectStore> = Arc::new(LocalFileSystem::new());
    let engine = Arc::new(
        DefaultEngineBuilder::new(store.clone())
            .with_task_executor(Arc::new(TokioMultiThreadExecutor::new(
                tokio::runtime::Handle::current(),
            )))
            .build(),
    );

    let logical_schema = Arc::new(fixtures::same_leaf_phy_name_under_different_parents());
    let snapshot = create_table(table_url.as_str(), logical_schema.clone(), "Test/1.0")
        .with_table_properties([("delta.columnMapping.mode", cm_mode)])
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
        .commit(engine.as_ref())?
        .unwrap_post_commit_snapshot();

    // Build a RecordBatch matching the logical schema.
    let arrow_schema: ArrowSchema = logical_schema.as_ref().try_into_arrow()?;
    let inner_fields_of =
        |outer: &str| match arrow_schema.field_with_name(outer).unwrap().data_type() {
            ArrowDataType::Struct(fields) => fields.clone(),
            _ => panic!("expected struct for {outer}"),
        };
    let outer1_array = StructArray::new(
        inner_fields_of("outer1"),
        vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
        None,
    );
    let outer2_array = StructArray::new(
        inner_fields_of("outer2"),
        vec![Arc::new(Int32Array::from(vec![100, 200])) as ArrayRef],
        None,
    );
    let batch = RecordBatch::try_new(
        Arc::new(arrow_schema),
        vec![Arc::new(outer1_array), Arc::new(outer2_array)],
    )?;

    let snapshot = write_batch_to_table(&snapshot, engine.as_ref(), batch, HashMap::new()).await?;

    // Scan back and verify the round-tripped rows.
    let scan = snapshot.scan_builder().build()?;
    let batches = read_scan(&scan, engine.clone())?;
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 2, "expected 2 rows back");
    let combined = StructArray::from(batches[0].clone());
    let outer1_a: &Int32Array = resolve_struct_field(&combined, &["outer1".into(), "a".into()]);
    let outer2_a: &Int32Array = resolve_struct_field(&combined, &["outer2".into(), "a".into()]);
    assert_eq!(outer1_a.values(), &[10, 20]);
    assert_eq!(outer2_a.values(), &[100, 200]);
    Ok(())
}

/// Loading a snapshot must fail when the table schema contains two fields that share the same
/// physical path, and the error must name both colliding leaves.
///
/// Runs once for each of the `"name"` and `"id"` column mapping modes.
#[rstest::rstest]
#[tokio::test]
async fn test_duplicated_phy_path_rejected(
    #[values("name", "id")] cm_mode: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let (store, engine, table_url) = engine_store_setup("dup_phys_path", None);

    // `schemaString` is a JSON string whose content is itself JSON, so the schema is serialized
    // twice: first to JSON, then again to obtain a quoted, escaped string literal.
    let schema_json = serde_json::to_string(&fixtures::nested_field_with_same_phy_path())?;
    let escaped = serde_json::to_string(&schema_json)?;

    // Hand-write commit 0 (protocol + metaData) so the invalid schema bypasses the validation
    // that create_table would otherwise apply.
    let v0 = format!(
        r#"{{"protocol":{{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["columnMapping"],"writerFeatures":["columnMapping"]}}}}
{{"metaData":{{"id":"test-id","format":{{"provider":"parquet","options":{{}}}},"schemaString":{escaped},"partitionColumns":[],"configuration":{{"delta.columnMapping.mode":"{cm_mode}","delta.columnMapping.maxColumnId":"4"}},"createdTime":1700000000000}}}}
"#
    );
    add_commit(table_url.as_str(), store.as_ref(), 0, v0).await?;

    let load_error = Snapshot::builder_for(table_url)
        .build(&engine)
        .expect_err("dup physicalName must be rejected at snapshot load");
    let msg = load_error.to_string();

    // The message must identify the error kind and mention both colliding leaf paths.
    let required_fragments = ["Duplicate `delta.columnMapping.physicalName`", ".a'", ".b'"];
    let names_both_leaves = required_fragments
        .iter()
        .all(|fragment| msg.contains(*fragment));
    assert!(
        names_both_leaves,
        "expected path-aware dedup error naming colliding leaves, got: {msg}"
    );
    Ok(())
}