buoyant_kernel 0.21.103

Buoyant Data distribution of delta-kernel
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
use std::sync::Arc;

use super::Transaction;
use crate::actions::{get_log_commit_info_schema, CommitInfo, COMMIT_INFO_NAME};
use crate::expressions::{MapData, Scalar, Transform};
use crate::schema::{MapType, StructField, StructType, ToSchema};
use crate::{DataType, Engine, EngineData, Error, Expression, ExpressionRef, IntoEngineData};

/// Builds a list of `(field_name, literal_expression)` pairs covering every [`CommitInfo`]
/// field. Field names match the camelCase schema names produced by the `ToSchema` derive macro.
/// The returned vec preserves CommitInfo schema field order, which callers rely on when
/// inserting kernel-only fields after the last engine field.
fn commit_info_literal_exprs(
    commit_info: CommitInfo,
) -> Result<Vec<(&'static str, ExpressionRef)>, Error> {
    let op_params_map_type = MapType::new(DataType::STRING, DataType::STRING, true);
    let literal_exprs = vec![
        (
            "timestamp",
            Arc::new(Expression::literal(commit_info.timestamp)),
        ),
        (
            "inCommitTimestamp",
            Arc::new(Expression::literal(commit_info.in_commit_timestamp)),
        ),
        (
            "operation",
            Arc::new(Expression::literal(commit_info.operation)),
        ),
        (
            "operationParameters",
            Arc::new(Expression::literal(
                match commit_info.operation_parameters {
                    Some(map) => Scalar::Map(MapData::try_new(
                        op_params_map_type,
                        map.into_iter()
                            .map(|(k, v)| (Scalar::String(k), Scalar::String(v))),
                    )?),
                    None => Scalar::Null(DataType::Map(Box::new(op_params_map_type))),
                },
            )),
        ),
        (
            "kernelVersion",
            Arc::new(Expression::literal(commit_info.kernel_version)),
        ),
        (
            "isBlindAppend",
            Arc::new(Expression::literal(commit_info.is_blind_append)),
        ),
        (
            "engineInfo",
            Arc::new(Expression::literal(commit_info.engine_info)),
        ),
        ("txnId", Arc::new(Expression::literal(commit_info.txn_id))),
    ];
    let expected_expr_len = CommitInfo::to_schema().fields().len();
    if literal_exprs.len() != expected_expr_len {
        return Err(Error::Generic(format!("expect the commit_info_literal_exprs return {expected_expr_len} expressions, but only get {} expressions. \
            If CommitInfo field was added/removed, please update Expression::Literal in this function and update the with_commit_info doc comment", literal_exprs.len())));
    }
    Ok(literal_exprs)
}

impl<S> Transaction<S> {
    /// Produces the engine data for this transaction's `commitInfo` action.
    ///
    /// When no engine commit info was supplied, `kernel_commit_info` is converted directly using
    /// `get_log_commit_info_schema()`. Otherwise the engine-supplied row is merged with the
    /// kernel values:
    ///
    /// * engine fields keep their position; a field whose name also exists in the kernel
    ///   `CommitInfo` schema takes the kernel's field definition and the kernel's value,
    /// * kernel fields missing from the engine schema are added after the last engine field, in
    ///   `CommitInfo` schema order.
    ///
    /// In both cases the result is shaped as `{ "commitInfo": { ... } }`.
    ///
    /// # Errors
    ///
    /// Propagates failures from building the literal expressions, creating the expression
    /// evaluator, evaluating it, or converting `kernel_commit_info` into engine data.
    pub(super) fn generate_commit_info(
        &self,
        engine: &dyn Engine,
        kernel_commit_info: CommitInfo,
    ) -> Result<Box<dyn EngineData>, Error> {
        // No engine-provided commit info: emit the kernel's CommitInfo as-is.
        let (engine_data, engine_schema) = match &self.engine_commit_info {
            Some((data, schema)) => (data, schema),
            None => {
                return kernel_commit_info
                    .into_engine_data(get_log_commit_info_schema().clone(), engine);
            }
        };

        let kernel_schema = CommitInfo::to_schema();

        // Output schema: every engine field first (swapped for the kernel's definition when the
        // names collide), followed by the kernel fields the engine schema does not have.
        let engine_side = engine_schema
            .fields()
            .map(|engine_field| {
                kernel_schema
                    .field(engine_field.name())
                    .unwrap_or(engine_field)
            })
            .cloned();
        let kernel_only = kernel_schema
            .fields()
            .filter(|kernel_field| !engine_schema.contains(kernel_field.name()))
            .cloned();
        let merged_fields: Vec<_> = engine_side.chain(kernel_only).collect();
        let merged_schema = StructType::new_unchecked(merged_fields);

        // Split the kernel literals into those overriding an engine field and those that must be
        // inserted. `partition` keeps the CommitInfo schema order within each group.
        let (replacements, insertions): (Vec<_>, Vec<_>) =
            commit_info_literal_exprs(kernel_commit_info)?
                .into_iter()
                .partition(|(field_name, _)| engine_schema.contains(*field_name));

        // Replacements are registered before insertions. The last engine field may itself be
        // replaced, and its expressions must then be ordered [replace_expr, insert_exprs...]
        // because the evaluator emits them in declaration order.
        let insert_after = engine_schema.field_names().last().cloned();
        let transform = replacements.into_iter().fold(
            Transform::new_top_level(),
            |acc, (field_name, expr)| acc.with_replaced_field(field_name, expr),
        );
        let transform = insertions.into_iter().fold(transform, |acc, (_, expr)| {
            acc.with_inserted_field(insert_after.as_deref(), expr)
        });

        // Nest the merged struct under a single `commitInfo` field so the result has the same
        // `{ "commitInfo": { ... } }` shape as the branch without engine commit info.
        let commit_info_expr = Arc::new(Expression::transform(transform));
        let action_expr = Arc::new(Expression::struct_from([commit_info_expr]));
        let action_schema = Arc::new(StructType::new_unchecked([StructField::nullable(
            COMMIT_INFO_NAME,
            merged_schema,
        )]));

        let evaluator = engine.evaluation_handler().new_expression_evaluator(
            engine_schema.clone(),
            action_expr,
            action_schema.into(),
        )?;
        evaluator.evaluate(engine_data.as_ref())
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::actions::CommitInfo;
    use crate::arrow::array::{
        Array, ArrayRef, BooleanArray, Int64Array, MapArray, MapBuilder, StringArray,
        StringBuilder, StructArray,
    };
    use crate::arrow::datatypes::{
        DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
    };
    use crate::arrow::record_batch::RecordBatch;
    use crate::committer::FileSystemCommitter;
    use crate::engine::arrow_conversion::TryIntoKernel;
    use crate::engine::arrow_data::ArrowEngineData;
    use crate::schema::{Schema, SchemaRef, StructField, StructType, ToSchema};
    use crate::transaction::Transaction;
    use crate::utils::test_utils::load_test_table;
    use crate::{DeltaResult, Engine, EngineData};

    // ── generate_commit_info tests ────────────────────────────────────────────────

    /// Helper: a fixed kernel `CommitInfo`, standing in for the one `Transaction::commit`
    /// would produce. The tests below assert against these exact values.
    fn make_kernel_commit_info() -> CommitInfo {
        let timestamp = 1_700_000_000_000i64;
        let in_commit_timestamp = Some(134_000_000i64);
        let operation = Some(String::from("WRITE"));
        let engine_info = Some(String::from("test_engine/1.0"));
        let is_blind_append = false;
        CommitInfo::new(
            timestamp,
            in_commit_timestamp,
            operation,
            engine_info,
            is_blind_append,
        )
    }

    /// Helper: turn Arrow fields plus matching columns into the `(data, schema)` pair that is
    /// handed to the transaction as engine_commit_info.
    ///
    /// Panics if the Arrow schema cannot be converted to a kernel schema or if the columns do
    /// not form a valid `RecordBatch` for it.
    fn make_engine_commit_info(
        arrow_fields: Vec<ArrowField>,
        columns: Vec<ArrayRef>,
    ) -> (Box<dyn EngineData>, SchemaRef) {
        let arrow_schema = ArrowSchema::new(arrow_fields);
        // Derive the kernel schema before the Arrow schema is moved into the batch.
        let kernel_schema: Schema = arrow_schema.as_ref().try_into_kernel().unwrap();

        let record_batch =
            RecordBatch::try_new(Arc::new(arrow_schema), columns).expect("valid RecordBatch");
        let engine_data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(record_batch));
        let schema: SchemaRef = Arc::new(kernel_schema);
        (engine_data, schema)
    }

    /// Helper: return the inner "commitInfo" StructArray of a top-level RecordBatch.
    ///
    /// Both branches of `generate_commit_info` produce `{ "commitInfo": { ... } }`, so the batch
    /// must have exactly one column, named "commitInfo", holding a struct. Panics otherwise.
    fn commit_info_struct(result: &ArrowEngineData) -> &StructArray {
        let batch = result.record_batch();
        let column_count = batch.num_columns();
        assert_eq!(column_count, 1, "expected single 'commitInfo' column");

        let schema = batch.schema();
        assert_eq!(schema.field(0).name(), "commitInfo");

        let commit_info_column = batch.column(0).as_any();
        commit_info_column
            .downcast_ref::<StructArray>()
            .expect("commitInfo column should be a StructArray")
    }

    /// Helper: read row 0 of the string column `col` of a StructArray.
    ///
    /// Panics if the column is missing or is not a `StringArray`.
    fn get_str<'a>(s: &'a StructArray, col: &str) -> &'a str {
        let column = match s.column_by_name(col) {
            Some(column) => column,
            None => panic!("field '{col}' not found"),
        };
        match column.as_any().downcast_ref::<StringArray>() {
            Some(strings) => strings.value(0),
            None => panic!("field '{col}' is not a StringArray"),
        }
    }

    /// Helper: read row 0 of the i64 column `col` of a StructArray.
    ///
    /// Panics if the column is missing or is not an `Int64Array`.
    fn get_i64(s: &StructArray, col: &str) -> i64 {
        let column = match s.column_by_name(col) {
            Some(column) => column,
            None => panic!("field '{col}' not found"),
        };
        match column.as_any().downcast_ref::<Int64Array>() {
            Some(ints) => ints.value(0),
            None => panic!("field '{col}' is not an Int64Array"),
        }
    }

    /// Helper: read row 0 of the map column `col` of a StructArray.
    ///
    /// The entries of that row come back as a StructArray of key-value pairs. Panics if the
    /// column is missing or is not a `MapArray`.
    fn get_map(s: &StructArray, col: &str) -> StructArray {
        let column = match s.column_by_name(col) {
            Some(column) => column,
            None => panic!("field '{col}' not found"),
        };
        match column.as_any().downcast_ref::<MapArray>() {
            Some(maps) => maps.value(0),
            None => panic!("field '{col}' is not a MapArray"),
        }
    }

    /// Helper: pull a non-null boolean value from a named column in a StructArray.
    ///
    /// Reads row 0. Panics if the column is missing or is not a `BooleanArray`.
    fn get_bool(s: &StructArray, col: &str) -> bool {
        s.column_by_name(col)
            .unwrap_or_else(|| panic!("field '{col}' not found"))
            .as_any()
            .downcast_ref::<BooleanArray>()
            // The message names the type actually expected here; it previously said
            // "Int64Array", which would have misdirected anyone debugging a failure.
            .unwrap_or_else(|| panic!("field '{col}' is not a BooleanArray"))
            .value(0)
    }

    /// Build a "WRITE" transaction on the shared "table-without-dv-small" test table,
    /// optionally attaching the given engine_commit_info `(data, schema)` pair.
    ///
    /// Returns the engine together with the transaction.
    fn make_txn(
        engine_commit_info: Option<(Box<dyn EngineData>, SchemaRef)>,
    ) -> DeltaResult<(Arc<dyn Engine>, Transaction)> {
        // NOTE(review): `_tempdir` goes out of scope when this helper returns. If it is a
        // temp-dir guard, the table files may no longer exist afterwards. These tests only call
        // `generate_commit_info`; confirm before reusing this helper for anything that reads
        // the table.
        let (engine, snapshot, _tempdir) = load_test_table("table-without-dv-small")?;
        let committer = Box::new(FileSystemCommitter::new());
        let base_txn = snapshot
            .transaction(committer, engine.as_ref())?
            .with_operation("WRITE".to_string());
        let txn = match engine_commit_info {
            Some((data, schema)) => base_txn.with_commit_info(data, schema),
            None => base_txn,
        };
        Ok((engine, txn))
    }

    /// Without engine_commit_info, the output is the kernel CommitInfo nested under a
    /// "commitInfo" outer struct, i.e. the Delta log action format described by
    /// `get_log_commit_info_schema`.
    #[test]
    fn test_build_commit_info_none_branch() -> DeltaResult<()> {
        let (engine, txn) = make_txn(None)?;
        let generated = txn.generate_commit_info(engine.as_ref(), make_kernel_commit_info())?;
        let result = ArrowEngineData::try_from_engine_data(generated)?;
        let ci = commit_info_struct(&result);

        // One column per kernel CommitInfo field.
        let expected_columns = CommitInfo::to_schema().fields().count();
        assert_eq!(ci.num_columns(), expected_columns);

        assert_eq!(get_str(ci, "operation"), "WRITE");
        // These are filled in by the kernel, so only non-emptiness is checked.
        for kernel_filled in ["kernelVersion", "txnId"] {
            assert!(!get_str(ci, kernel_filled).is_empty());
        }
        Ok(())
    }

    /// engine schema has fields that are fully disjoint from CommitInfo -- all CommitInfo
    /// fields are appended after the engine-only fields, in CommitInfo schema order.
    #[test]
    fn test_build_commit_info_disjoint_schemas() -> DeltaResult<()> {
        let (data, schema) = make_engine_commit_info(
            vec![
                ArrowField::new("customApp", ArrowDataType::Utf8, false),
                ArrowField::new("customVersion", ArrowDataType::Int64, false),
            ],
            vec![
                Arc::new(StringArray::from(vec!["myApp"])) as ArrayRef,
                Arc::new(Int64Array::from(vec![42i64])) as ArrayRef,
            ],
        );
        let (engine, txn) = make_txn(Some((data, schema)))?;

        let result = ArrowEngineData::try_from_engine_data(
            txn.generate_commit_info(engine.as_ref(), make_kernel_commit_info())?,
        )?;
        let commit_info = commit_info_struct(&result);

        // All CommitInfo fields are appended -- total = 2 engine fields + one column per
        // CommitInfo schema field.
        assert_eq!(
            commit_info.num_columns(),
            2 + CommitInfo::to_schema().fields().count()
        );

        // Engine fields are first and their values pass through unchanged.
        assert_eq!(commit_info.fields()[0].name(), "customApp");
        assert_eq!(commit_info.fields()[1].name(), "customVersion");
        assert_eq!(get_str(commit_info, "customApp"), "myApp");
        assert_eq!(get_i64(commit_info, "customVersion"), 42);

        // Appended kernel fields carry the values from `make_kernel_commit_info`.
        assert_eq!(get_str(commit_info, "operation"), "WRITE");
        assert!(!get_str(commit_info, "kernelVersion").is_empty());
        // `assert_eq!` (as in the full-overlap test) reports the actual length on failure.
        assert_eq!(get_map(commit_info, "operationParameters").len(), 0);
        assert!(uuid::Uuid::parse_str(get_str(commit_info, "txnId")).is_ok());
        assert!(get_i64(commit_info, "timestamp") > 0);
        assert_eq!(get_i64(commit_info, "inCommitTimestamp"), 134_000_000);
        assert_eq!(get_str(commit_info, "engineInfo"), "test_engine/1.0");
        assert!(!get_bool(commit_info, "isBlindAppend"));

        Ok(())
    }

    /// The engine schema already contains every kernel CommitInfo field: each one must be
    /// overwritten with the kernel's value and no extra field may be added.
    #[test]
    fn test_build_commit_info_full_overlap() -> DeltaResult<()> {
        // A one-entry map the engine "supplies"; the kernel value must replace it.
        let mut stale_map = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
        stale_map.keys().append_value("stale_key");
        stale_map.values().append_value("stale_value");
        stale_map.append(true).unwrap();
        let stale_op_params = Arc::new(stale_map.finish()) as ArrayRef;

        let engine_fields = vec![
            ArrowField::new("timestamp", ArrowDataType::Int64, true),
            ArrowField::new("inCommitTimestamp", ArrowDataType::Int64, true),
            ArrowField::new("operation", ArrowDataType::Utf8, true),
            ArrowField::new(
                "operationParameters",
                stale_op_params.data_type().clone(),
                true,
            ),
            ArrowField::new("kernelVersion", ArrowDataType::Utf8, true),
            ArrowField::new("isBlindAppend", ArrowDataType::Boolean, true),
            ArrowField::new("engineInfo", ArrowDataType::Utf8, true),
            ArrowField::new("txnId", ArrowDataType::Utf8, true),
        ];
        // Stale or null values, one column per field above, in the same order.
        let engine_columns = vec![
            Arc::new(Int64Array::from(vec![Some(0i64)])) as ArrayRef,
            Arc::new(Int64Array::from(vec![None::<i64>])) as ArrayRef,
            Arc::new(StringArray::from(vec!["STALE_OP"])) as ArrayRef,
            stale_op_params,
            Arc::new(StringArray::from(vec!["v0.0.0"])) as ArrayRef,
            Arc::new(BooleanArray::from(vec![None::<bool>])) as ArrayRef,
            Arc::new(StringArray::from(vec!["stale_engine"])) as ArrayRef,
            Arc::new(StringArray::from(vec!["stale_txn"])) as ArrayRef,
        ];
        let engine_commit_info = make_engine_commit_info(engine_fields, engine_columns);
        let (engine, txn) = make_txn(Some(engine_commit_info))?;

        let generated = txn.generate_commit_info(engine.as_ref(), make_kernel_commit_info())?;
        let result = ArrowEngineData::try_from_engine_data(generated)?;
        let commit_info = commit_info_struct(&result);

        // The engine schema had all 8 CommitInfo fields, so the column count stays at 8.
        assert_eq!(commit_info.num_columns(), 8);

        // Every value now comes from the kernel CommitInfo, not from the stale engine row.
        assert_eq!(get_str(commit_info, "operation"), "WRITE");
        assert!(!get_str(commit_info, "kernelVersion").is_empty());
        assert_eq!(get_map(commit_info, "operationParameters").len(), 0);
        assert!(uuid::Uuid::parse_str(get_str(commit_info, "txnId")).is_ok());
        assert!(get_i64(commit_info, "timestamp") > 0);
        assert_eq!(get_i64(commit_info, "inCommitTimestamp"), 134_000_000);
        assert_eq!(get_str(commit_info, "engineInfo"), "test_engine/1.0");
        assert!(!get_bool(commit_info, "isBlindAppend"));

        Ok(())
    }

    /// The engine schema shares only some fields with CommitInfo: shared fields are
    /// overwritten, engine-only fields pass through, and the remaining CommitInfo fields are
    /// added after the last engine field.
    #[test]
    fn test_build_commit_info_partial_overlap() -> DeltaResult<()> {
        let engine_fields = vec![
            ArrowField::new("timestamp", ArrowDataType::Int64, true),
            ArrowField::new("operation", ArrowDataType::Utf8, true),
            ArrowField::new("myCustomField", ArrowDataType::Utf8, false),
        ];
        let engine_columns = vec![
            Arc::new(Int64Array::from(vec![Some(0i64)])) as ArrayRef,
            Arc::new(StringArray::from(vec!["STALE_OP"])) as ArrayRef,
            Arc::new(StringArray::from(vec!["keep_me"])) as ArrayRef,
        ];
        let engine_commit_info = make_engine_commit_info(engine_fields, engine_columns);
        let (engine, txn) = make_txn(Some(engine_commit_info))?;

        let generated = txn.generate_commit_info(engine.as_ref(), make_kernel_commit_info())?;
        let result = ArrowEngineData::try_from_engine_data(generated)?;
        let ci = commit_info_struct(&result);

        // The engine-only field is untouched.
        assert_eq!(get_str(ci, "myCustomField"), "keep_me");

        // The shared `operation` field now holds the kernel value instead of the stale one.
        let operation = get_str(ci, "operation");
        assert_ne!(operation, "STALE_OP");
        assert_eq!(operation, "WRITE");

        // The three engine fields still occupy the first three positions, in engine order.
        for (index, expected_name) in ["timestamp", "operation", "myCustomField"]
            .into_iter()
            .enumerate()
        {
            assert_eq!(ci.fields()[index].name(), expected_name);
        }

        // Two CommitInfo fields were already in the engine schema; the rest are appended.
        // Total = 3 engine fields + (CommitInfo field count - 2) kernel-only fields.
        let kernel_field_count = CommitInfo::to_schema().fields().count();
        assert_eq!(ci.num_columns(), 3 + kernel_field_count - 2);
        Ok(())
    }

    /// The engine schema declares some CommitInfo fields with DataTypes other than the
    /// kernel's. The kernel replacement must win: each such output field has the kernel's type.
    #[test]
    fn test_build_commit_info_type_conflict_replaced_by_kernel() -> DeltaResult<()> {
        /// Arrow data type of the named field of `ci`; panics if the field is absent.
        fn field_type(ci: &StructArray, name: &str) -> ArrowDataType {
            match ci.fields().iter().find(|f| f.name() == name) {
                Some(field) => field.data_type().clone(),
                None => panic!("field '{name}' must be present"),
            }
        }

        // Every kernel-owned field below is deliberately given the wrong Arrow type.
        let engine_fields = vec![
            ArrowField::new("timestamp", ArrowDataType::Utf8, true),
            ArrowField::new("inCommitTimestamp", ArrowDataType::Utf8, true),
            ArrowField::new("operation", ArrowDataType::Int64, true),
            ArrowField::new("isBlindAppend", ArrowDataType::Utf8, true),
            ArrowField::new("myCustomField", ArrowDataType::Utf8, false),
        ];
        let engine_columns = vec![
            Arc::new(StringArray::from(vec!["not-a-timestamp"])) as ArrayRef,
            Arc::new(StringArray::from(vec!["not-a-timestamp"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![0i64])) as ArrayRef,
            Arc::new(StringArray::from(vec!["not-a-bool"])) as ArrayRef,
            Arc::new(StringArray::from(vec!["keep_me"])) as ArrayRef,
        ];
        let engine_commit_info = make_engine_commit_info(engine_fields, engine_columns);
        let (engine, txn) = make_txn(Some(engine_commit_info))?;

        let generated = txn.generate_commit_info(engine.as_ref(), make_kernel_commit_info())?;
        let result = ArrowEngineData::try_from_engine_data(generated)?;
        let ci = commit_info_struct(&result);

        // Kernel-owned fields come out with the kernel's type, not the engine's.
        assert_eq!(field_type(ci, "timestamp"), ArrowDataType::Int64);
        assert_eq!(field_type(ci, "inCommitTimestamp"), ArrowDataType::Int64);
        assert_eq!(field_type(ci, "operation"), ArrowDataType::Utf8);
        assert_eq!(field_type(ci, "isBlindAppend"), ArrowDataType::Boolean);

        // The engine-only field keeps both its original type and its value.
        assert_eq!(field_type(ci, "myCustomField"), ArrowDataType::Utf8);
        assert_eq!(get_str(ci, "myCustomField"), "keep_me");
        Ok(())
    }

    /// The engine schema is empty: every CommitInfo field is prepended, and with no engine
    /// field ahead of them the result is simply the full CommitInfo schema.
    #[test]
    fn test_build_commit_info_empty_engine_schema() -> DeltaResult<()> {
        // Engine side: a RecordBatch with no rows and no columns, plus an empty kernel schema.
        let no_columns = RecordBatch::new_empty(Arc::new(ArrowSchema::empty()));
        let no_fields = Arc::new(StructType::new_unchecked(Vec::<StructField>::new()));
        let engine_commit_info: (Box<dyn EngineData>, SchemaRef) =
            (Box::new(ArrowEngineData::new(no_columns)), no_fields);
        let (engine, txn) = make_txn(Some(engine_commit_info))?;

        let generated = txn.generate_commit_info(engine.as_ref(), make_kernel_commit_info())?;
        let result = ArrowEngineData::try_from_engine_data(generated)?;
        let ci = commit_info_struct(&result);

        // Same number of columns as CommitInfo::to_schema() has fields ...
        let kernel_schema = CommitInfo::to_schema();
        assert_eq!(ci.num_columns(), kernel_schema.fields().count());

        // ... and the same names in the same order.
        for (actual, expected) in ci.fields().iter().zip(kernel_schema.fields()) {
            assert_eq!(actual.name(), expected.name());
        }
        Ok(())
    }
}