Skip to main content

lance_encoding/encodings/logical/
blob.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::HashMap, sync::Arc};
5
6use arrow_array::{
7    builder::{PrimitiveBuilder, StringBuilder},
8    cast::AsArray,
9    types::{UInt32Type, UInt64Type, UInt8Type},
10    Array, ArrayRef, StructArray, UInt64Array,
11};
12use arrow_buffer::Buffer;
13use arrow_schema::{DataType, Field as ArrowField, Fields};
14use futures::{future::BoxFuture, FutureExt};
15use lance_core::{
16    datatypes::Field, datatypes::BLOB_V2_DESC_FIELDS, error::LanceOptionExt, Error, Result,
17};
18use snafu::location;
19
20use crate::{
21    buffer::LanceBuffer,
22    constants::PACKED_STRUCT_META_KEY,
23    decoder::PageEncoding,
24    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
25    encodings::logical::primitive::PrimitiveStructuralEncoder,
26    format::ProtobufUtils21,
27    repdef::{DefinitionInterpretation, RepDefBuilder},
28};
29use lance_core::datatypes::BlobKind;
30
31/// Blob structural encoder - stores large binary data in external buffers
32///
33/// This encoder takes large binary arrays and stores them outside the normal
34/// page structure. It creates a descriptor (position, size) for each blob
35/// that is stored inline in the page.
36pub struct BlobStructuralEncoder {
37    // Encoder for the descriptors (position/size struct)
38    descriptor_encoder: Box<dyn FieldEncoder>,
39    // Set when we first see data
40    def_meaning: Option<Arc<[DefinitionInterpretation]>>,
41}
42
43impl BlobStructuralEncoder {
44    pub fn new(
45        field: &Field,
46        column_index: u32,
47        options: &crate::encoder::EncodingOptions,
48        compression_strategy: Arc<dyn crate::compression::CompressionStrategy>,
49    ) -> Result<Self> {
50        // Create descriptor field: struct<position: u64, size: u64>
51        // Preserve the original field's metadata for packed struct
52        let mut descriptor_metadata = HashMap::with_capacity(1);
53        descriptor_metadata.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
54
55        let descriptor_data_type = DataType::Struct(Fields::from(vec![
56            ArrowField::new("position", DataType::UInt64, false),
57            ArrowField::new("size", DataType::UInt64, false),
58        ]));
59
60        // Use the original field's name for the descriptor
61        let descriptor_field = Field::try_from(
62            ArrowField::new(&field.name, descriptor_data_type, field.nullable)
63                .with_metadata(descriptor_metadata),
64        )?;
65
66        // Use PrimitiveStructuralEncoder to handle the descriptor
67        let descriptor_encoder = Box::new(PrimitiveStructuralEncoder::try_new(
68            options,
69            compression_strategy,
70            column_index,
71            descriptor_field,
72            Arc::new(HashMap::new()),
73        )?);
74
75        Ok(Self {
76            descriptor_encoder,
77            def_meaning: None,
78        })
79    }
80
81    fn wrap_tasks(
82        tasks: Vec<EncodeTask>,
83        def_meaning: Arc<[DefinitionInterpretation]>,
84    ) -> Vec<EncodeTask> {
85        tasks
86            .into_iter()
87            .map(|task| {
88                let def_meaning = def_meaning.clone();
89                task.then(|encoded_page| async move {
90                    let encoded_page = encoded_page?;
91
92                    let PageEncoding::Structural(inner_layout) = encoded_page.description else {
93                        return Err(Error::Internal {
94                            message: "Expected inner encoding to return structural layout"
95                                .to_string(),
96                            location: location!(),
97                        });
98                    };
99
100                    let wrapped = ProtobufUtils21::blob_layout(inner_layout, &def_meaning);
101                    Ok(EncodedPage {
102                        column_idx: encoded_page.column_idx,
103                        data: encoded_page.data,
104                        description: PageEncoding::Structural(wrapped),
105                        num_rows: encoded_page.num_rows,
106                        row_number: encoded_page.row_number,
107                    })
108                })
109                .boxed()
110            })
111            .collect::<Vec<_>>()
112    }
113}
114
115impl FieldEncoder for BlobStructuralEncoder {
116    fn maybe_encode(
117        &mut self,
118        array: ArrayRef,
119        external_buffers: &mut OutOfLineBuffers,
120        mut repdef: RepDefBuilder,
121        row_number: u64,
122        num_rows: u64,
123    ) -> Result<Vec<EncodeTask>> {
124        if let Some(validity) = array.nulls() {
125            repdef.add_validity_bitmap(validity.clone());
126        } else {
127            repdef.add_no_null(array.len());
128        }
129
130        // Convert input array to LargeBinary
131        let binary_array = array
132            .as_binary_opt::<i64>()
133            .ok_or_else(|| Error::InvalidInput {
134                source: format!("Expected LargeBinary array, got {}", array.data_type()).into(),
135                location: location!(),
136            })?;
137
138        let repdef = RepDefBuilder::serialize(vec![repdef]);
139
140        let rep = repdef.repetition_levels.as_ref();
141        let def = repdef.definition_levels.as_ref();
142        let def_meaning: Arc<[DefinitionInterpretation]> = repdef.def_meaning.into();
143
144        if self.def_meaning.is_none() {
145            self.def_meaning = Some(def_meaning.clone());
146        } else {
147            debug_assert_eq!(self.def_meaning.as_ref().unwrap(), &def_meaning);
148        }
149
150        // Collect positions and sizes
151        let mut positions = Vec::with_capacity(binary_array.len());
152        let mut sizes = Vec::with_capacity(binary_array.len());
153
154        for i in 0..binary_array.len() {
155            if binary_array.is_null(i) {
156                // Null values are smuggled into the positions array
157
158                // If we have null values we must have definition levels
159                let mut repdef = (def.expect_ok()?[i] as u64) << 16;
160                if let Some(rep) = rep {
161                    repdef += rep[i] as u64;
162                }
163
164                debug_assert_ne!(repdef, 0);
165                positions.push(repdef);
166                sizes.push(0);
167            } else {
168                let value = binary_array.value(i);
169                if value.is_empty() {
170                    // Empty values
171                    positions.push(0);
172                    sizes.push(0);
173                } else {
174                    // Add data to external buffers
175                    let position =
176                        external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value)));
177                    positions.push(position);
178                    sizes.push(value.len() as u64);
179                }
180            }
181        }
182
183        // Create descriptor array
184        let position_array = Arc::new(UInt64Array::from(positions));
185        let size_array = Arc::new(UInt64Array::from(sizes));
186        let descriptor_array = Arc::new(StructArray::new(
187            Fields::from(vec![
188                ArrowField::new("position", DataType::UInt64, false),
189                ArrowField::new("size", DataType::UInt64, false),
190            ]),
191            vec![position_array as ArrayRef, size_array as ArrayRef],
192            None, // Descriptors are never null
193        ));
194
195        // Delegate to descriptor encoder
196        let encode_tasks = self.descriptor_encoder.maybe_encode(
197            descriptor_array,
198            external_buffers,
199            RepDefBuilder::default(),
200            row_number,
201            num_rows,
202        )?;
203
204        Ok(Self::wrap_tasks(encode_tasks, def_meaning))
205    }
206
207    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
208        let encode_tasks = self.descriptor_encoder.flush(external_buffers)?;
209
210        // Use the cached def meaning.  If we haven't seen any data yet then we can just use a dummy
211        // value (not clear there would be any encode tasks in that case)
212        let def_meaning = self
213            .def_meaning
214            .clone()
215            .unwrap_or_else(|| Arc::new([DefinitionInterpretation::AllValidItem]));
216
217        Ok(Self::wrap_tasks(encode_tasks, def_meaning))
218    }
219
220    fn finish(
221        &mut self,
222        external_buffers: &mut OutOfLineBuffers,
223    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
224        self.descriptor_encoder.finish(external_buffers)
225    }
226
227    fn num_columns(&self) -> u32 {
228        self.descriptor_encoder.num_columns()
229    }
230}
231
232/// Blob v2 structural encoder
233pub struct BlobV2StructuralEncoder {
234    descriptor_encoder: Box<dyn FieldEncoder>,
235}
236
237impl BlobV2StructuralEncoder {
238    pub fn new(
239        field: &Field,
240        column_index: u32,
241        options: &crate::encoder::EncodingOptions,
242        compression_strategy: Arc<dyn crate::compression::CompressionStrategy>,
243    ) -> Result<Self> {
244        let mut descriptor_metadata = HashMap::with_capacity(1);
245        descriptor_metadata.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
246
247        let descriptor_data_type = DataType::Struct(BLOB_V2_DESC_FIELDS.clone());
248
249        let descriptor_field = Field::try_from(
250            ArrowField::new(&field.name, descriptor_data_type, field.nullable)
251                .with_metadata(descriptor_metadata),
252        )?;
253
254        let descriptor_encoder = Box::new(PrimitiveStructuralEncoder::try_new(
255            options,
256            compression_strategy,
257            column_index,
258            descriptor_field,
259            Arc::new(HashMap::new()),
260        )?);
261
262        Ok(Self { descriptor_encoder })
263    }
264}
265
266impl FieldEncoder for BlobV2StructuralEncoder {
267    fn maybe_encode(
268        &mut self,
269        array: ArrayRef,
270        external_buffers: &mut OutOfLineBuffers,
271        mut repdef: RepDefBuilder,
272        row_number: u64,
273        num_rows: u64,
274    ) -> Result<Vec<EncodeTask>> {
275        let struct_arr = array.as_struct();
276        if let Some(validity) = struct_arr.nulls() {
277            repdef.add_validity_bitmap(validity.clone());
278        } else {
279            repdef.add_no_null(struct_arr.len());
280        }
281
282        let kind_col = struct_arr
283            .column_by_name("kind")
284            .expect("kind column must exist")
285            .as_primitive::<UInt8Type>();
286        let data_col = struct_arr
287            .column_by_name("data")
288            .expect("data column must exist")
289            .as_binary::<i64>();
290        let uri_col = struct_arr
291            .column_by_name("uri")
292            .expect("uri column must exist")
293            .as_string::<i32>();
294        let blob_id_col = struct_arr
295            .column_by_name("blob_id")
296            .expect("blob_id column must exist")
297            .as_primitive::<UInt32Type>();
298        let blob_size_col = struct_arr
299            .column_by_name("blob_size")
300            .expect("blob_size column must exist")
301            .as_primitive::<UInt64Type>();
302        let packed_position_col = struct_arr
303            .column_by_name("position")
304            .expect("position column must exist")
305            .as_primitive::<UInt64Type>();
306
307        let row_count = struct_arr.len();
308
309        let mut kind_builder = PrimitiveBuilder::<UInt8Type>::with_capacity(row_count);
310        let mut position_builder = PrimitiveBuilder::<UInt64Type>::with_capacity(row_count);
311        let mut size_builder = PrimitiveBuilder::<UInt64Type>::with_capacity(row_count);
312        let mut blob_id_builder = PrimitiveBuilder::<UInt32Type>::with_capacity(row_count);
313        let mut uri_builder = StringBuilder::with_capacity(row_count, row_count * 16);
314
315        for i in 0..row_count {
316            let (kind_value, position_value, size_value, blob_id_value, uri_value) =
317                if struct_arr.is_null(i) || kind_col.is_null(i) {
318                    (BlobKind::Inline as u8, 0, 0, 0, "".to_string())
319                } else {
320                    let kind_val = BlobKind::try_from(kind_col.value(i))?;
321                    match kind_val {
322                        BlobKind::Dedicated => (
323                            BlobKind::Dedicated as u8,
324                            0,
325                            blob_size_col.value(i),
326                            blob_id_col.value(i),
327                            "".to_string(),
328                        ),
329                        BlobKind::External => (
330                            BlobKind::External as u8,
331                            0,
332                            0,
333                            0,
334                            uri_col.value(i).to_string(),
335                        ),
336                        BlobKind::Packed => (
337                            BlobKind::Packed as u8,
338                            packed_position_col.value(i),
339                            blob_size_col.value(i),
340                            blob_id_col.value(i),
341                            "".to_string(),
342                        ),
343                        BlobKind::Inline => {
344                            let data_val = data_col.value(i);
345                            let blob_len = data_val.len() as u64;
346                            let position = external_buffers
347                                .add_buffer(LanceBuffer::from(Buffer::from(data_val)));
348
349                            (
350                                BlobKind::Inline as u8,
351                                position,
352                                blob_len,
353                                0,
354                                "".to_string(),
355                            )
356                        }
357                    }
358                };
359
360            kind_builder.append_value(kind_value);
361            position_builder.append_value(position_value);
362            size_builder.append_value(size_value);
363            blob_id_builder.append_value(blob_id_value);
364            uri_builder.append_value(uri_value);
365        }
366        let children: Vec<ArrayRef> = vec![
367            Arc::new(kind_builder.finish()),
368            Arc::new(position_builder.finish()),
369            Arc::new(size_builder.finish()),
370            Arc::new(blob_id_builder.finish()),
371            Arc::new(uri_builder.finish()),
372        ];
373
374        let descriptor_array = Arc::new(StructArray::try_new(
375            BLOB_V2_DESC_FIELDS.clone(),
376            children,
377            None,
378        )?) as ArrayRef;
379
380        self.descriptor_encoder.maybe_encode(
381            descriptor_array,
382            external_buffers,
383            repdef,
384            row_number,
385            num_rows,
386        )
387    }
388
389    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
390        self.descriptor_encoder.flush(external_buffers)
391    }
392
393    fn finish(
394        &mut self,
395        external_buffers: &mut OutOfLineBuffers,
396    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
397        self.descriptor_encoder.finish(external_buffers)
398    }
399
400    fn num_columns(&self) -> u32 {
401        self.descriptor_encoder.num_columns()
402    }
403}
404
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        compression::DefaultCompressionStrategy,
        encoder::{ColumnIndexSequence, EncodingOptions},
        testing::{
            check_round_trip_encoding_of_data, check_round_trip_encoding_of_data_with_expected,
            TestCases,
        },
        version::LanceFileVersion,
    };
    use arrow_array::{
        ArrayRef, LargeBinaryArray, StringArray, StructArray, UInt32Array, UInt64Array, UInt8Array,
    };
    use arrow_schema::{DataType, Field as ArrowField};

    /// Field metadata that marks a column as a blob column.
    fn blob_metadata() -> HashMap<String, String> {
        HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())])
    }

    /// Builds a `BlobStructuralEncoder` for `field` with default options, the
    /// default compression strategy and the first column index.
    fn new_blob_encoder(field: &Field) -> Result<BlobStructuralEncoder> {
        let mut column_index = ColumnIndexSequence::default();
        let column_idx = column_index.next_column_index(0);
        let options = EncodingOptions::default();
        let compression = Arc::new(DefaultCompressionStrategy::new());

        BlobStructuralEncoder::new(field, column_idx, &options, compression)
    }

    /// Assembles a blob v2 input struct (every child nullable) from its columns.
    fn blob_v2_input(
        kind: UInt8Array,
        data: LargeBinaryArray,
        uri: StringArray,
        blob_id: UInt32Array,
        blob_size: UInt64Array,
        position: UInt64Array,
    ) -> StructArray {
        StructArray::from(vec![
            (
                Arc::new(ArrowField::new("kind", DataType::UInt8, true)),
                Arc::new(kind) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new("data", DataType::LargeBinary, true)),
                Arc::new(data) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new("uri", DataType::Utf8, true)),
                Arc::new(uri) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new("blob_id", DataType::UInt32, true)),
                Arc::new(blob_id) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new("blob_size", DataType::UInt64, true)),
                Arc::new(blob_size) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new("position", DataType::UInt64, true)),
                Arc::new(position) as ArrayRef,
            ),
        ])
    }

    /// Assembles the descriptor struct (every child non-nullable) that a blob v2
    /// round trip is expected to produce.
    fn blob_v2_descriptor(
        kind: Vec<u8>,
        position: Vec<u64>,
        size: Vec<u64>,
        blob_id: Vec<u32>,
        blob_uri: Vec<&str>,
    ) -> StructArray {
        StructArray::from(vec![
            (
                Arc::new(ArrowField::new("kind", DataType::UInt8, false)),
                Arc::new(UInt8Array::from(kind)) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new("position", DataType::UInt64, false)),
                Arc::new(UInt64Array::from(position)) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new("size", DataType::UInt64, false)),
                Arc::new(UInt64Array::from(size)) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new("blob_id", DataType::UInt32, false)),
                Arc::new(UInt32Array::from(blob_id)) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new("blob_uri", DataType::Utf8, false)),
                Arc::new(StringArray::from(blob_uri)) as ArrayRef,
            ),
        ])
    }

    /// Round-trips `input` as a blob column (file version 2.2 and up) and checks
    /// that the decoded data equals `expected`.
    async fn check_blob_v2_round_trip(input: StructArray, expected: StructArray) {
        check_round_trip_encoding_of_data_with_expected(
            vec![Arc::new(input)],
            Some(Arc::new(expected)),
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_2),
            blob_metadata(),
        )
        .await;
    }

    #[test]
    fn test_blob_encoder_creation() {
        let field =
            Field::try_from(ArrowField::new("blob_field", DataType::LargeBinary, true)).unwrap();

        let encoder = new_blob_encoder(&field);

        assert!(encoder.is_ok());
    }

    #[tokio::test]
    async fn test_blob_encoding_simple() {
        let field = Field::try_from(
            ArrowField::new("blob_field", DataType::LargeBinary, true)
                .with_metadata(blob_metadata()),
        )
        .unwrap();
        let mut encoder = new_blob_encoder(&field).unwrap();

        // A small value, a null, a 100KB value and an empty value
        let large_data = vec![0u8; 1024 * 100];
        let data: Vec<Option<&[u8]>> =
            vec![Some(b"hello world"), None, Some(&large_data), Some(b"")];
        let array = Arc::new(LargeBinaryArray::from(data));

        let mut external_buffers = OutOfLineBuffers::new(0, 8);
        let tasks = encoder
            .maybe_encode(array, &mut external_buffers, RepDefBuilder::default(), 0, 4)
            .unwrap();

        // The encoder may still be accumulating; flush to force encoding.
        if tasks.is_empty() {
            let _flush_tasks = encoder.flush(&mut external_buffers).unwrap();
        }

        // For now this only verifies that nothing failed along the way.
        assert!(encoder.num_columns() > 0);

        // The blob payloads must have gone to the external buffers.
        let buffers = external_buffers.take_buffers();
        assert!(
            !buffers.is_empty(),
            "Large blobs should be stored in external buffers"
        );
    }

    #[tokio::test]
    async fn test_blob_round_trip() {
        // 1KB, 10KB and 100KB values with a null in between
        let small = vec![1u8; 1024];
        let medium = vec![2u8; 10240];
        let large = vec![3u8; 102400];
        let array = Arc::new(LargeBinaryArray::from(vec![
            Some(small.as_slice()),
            None,
            Some(medium.as_slice()),
            Some(large.as_slice()),
        ]));

        // Round trip through the standard test harness with blob metadata
        check_round_trip_encoding_of_data(vec![array], &TestCases::default(), blob_metadata())
            .await;
    }

    #[tokio::test]
    async fn test_blob_v2_external_round_trip() {
        let input = blob_v2_input(
            UInt8Array::from(vec![
                BlobKind::Inline as u8,
                BlobKind::External as u8,
                BlobKind::External as u8,
            ]),
            LargeBinaryArray::from(vec![Some(b"inline".as_ref()), None, None]),
            StringArray::from(vec![
                None,
                Some("file:///tmp/external.bin"),
                Some("s3://bucket/blob"),
            ]),
            UInt32Array::from(vec![0, 0, 0]),
            UInt64Array::from(vec![0, 0, 0]),
            UInt64Array::from(vec![0, 0, 0]),
        );

        let expected = blob_v2_descriptor(
            vec![
                BlobKind::Inline as u8,
                BlobKind::External as u8,
                BlobKind::External as u8,
            ],
            vec![0, 0, 0],
            vec![6, 0, 0],
            vec![0, 0, 0],
            vec!["", "file:///tmp/external.bin", "s3://bucket/blob"],
        );

        check_blob_v2_round_trip(input, expected).await;
    }

    #[tokio::test]
    async fn test_blob_v2_dedicated_round_trip() {
        let input = blob_v2_input(
            UInt8Array::from(vec![BlobKind::Dedicated as u8, BlobKind::Inline as u8]),
            LargeBinaryArray::from(vec![None, Some(b"abc".as_ref())]),
            StringArray::from(vec![Option::<&str>::None, None]),
            UInt32Array::from(vec![42, 0]),
            UInt64Array::from(vec![12, 0]),
            UInt64Array::from(vec![0, 0]),
        );

        let expected = blob_v2_descriptor(
            vec![BlobKind::Dedicated as u8, BlobKind::Inline as u8],
            vec![0, 0],
            vec![12, 3],
            vec![42, 0],
            vec!["", ""],
        );

        check_blob_v2_round_trip(input, expected).await;
    }

    #[tokio::test]
    async fn test_blob_v2_packed_round_trip() {
        let input = blob_v2_input(
            UInt8Array::from(vec![BlobKind::Packed as u8]),
            LargeBinaryArray::from(vec![None::<&[u8]>]),
            StringArray::from(vec![None::<&str>]),
            UInt32Array::from(vec![7]),
            UInt64Array::from(vec![5]),
            UInt64Array::from(vec![10]),
        );

        let expected = blob_v2_descriptor(
            vec![BlobKind::Packed as u8],
            vec![10],
            vec![5],
            vec![7],
            vec![""],
        );

        check_blob_v2_round_trip(input, expected).await;
    }
}