Skip to main content

lance_encoding/encodings/logical/
blob.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::HashMap, sync::Arc};
5
6use arrow_array::{
7    Array, ArrayRef, StructArray, UInt64Array,
8    builder::{PrimitiveBuilder, StringBuilder},
9    cast::AsArray,
10    types::{UInt8Type, UInt32Type, UInt64Type},
11};
12use arrow_buffer::Buffer;
13use arrow_schema::{DataType, Field as ArrowField, Fields};
14use futures::{FutureExt, future::BoxFuture};
15use lance_core::{
16    Error, Result, datatypes::BLOB_V2_DESC_FIELDS, datatypes::Field, error::LanceOptionExt,
17};
18
19use crate::{
20    buffer::LanceBuffer,
21    constants::PACKED_STRUCT_META_KEY,
22    decoder::PageEncoding,
23    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
24    encodings::logical::primitive::PrimitiveStructuralEncoder,
25    format::ProtobufUtils21,
26    repdef::{DefinitionInterpretation, RepDefBuilder},
27};
28use lance_core::datatypes::BlobKind;
29
/// Blob structural encoder - stores large binary data in external buffers
///
/// This encoder takes large binary arrays and stores them outside the normal
/// page structure. It creates a descriptor (position, size) for each blob
/// that is stored inline in the page.
///
/// Row representation inside the descriptor, as produced by `maybe_encode`:
/// * non-empty value: `position` is the value returned by
///   `OutOfLineBuffers::add_buffer`, `size` is the byte length
/// * empty value: `position == 0` and `size == 0`
/// * null value: `size == 0` and `position` holds the row's definition level
///   shifted left by 16 bits plus its repetition level (if any)
pub struct BlobStructuralEncoder {
    // Encoder for the descriptors (position/size struct)
    descriptor_encoder: Box<dyn FieldEncoder>,
    // Definition interpretation captured from the first batch handed to
    // `maybe_encode`; `None` until then.  `flush` reuses it when wrapping the
    // page layouts it emits.
    def_meaning: Option<Arc<[DefinitionInterpretation]>>,
}
41
42impl BlobStructuralEncoder {
43    pub fn new(
44        field: &Field,
45        column_index: u32,
46        options: &crate::encoder::EncodingOptions,
47        compression_strategy: Arc<dyn crate::compression::CompressionStrategy>,
48    ) -> Result<Self> {
49        // Create descriptor field: struct<position: u64, size: u64>
50        // Preserve the original field's metadata for packed struct
51        let mut descriptor_metadata = HashMap::with_capacity(1);
52        descriptor_metadata.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
53
54        let descriptor_data_type = DataType::Struct(Fields::from(vec![
55            ArrowField::new("position", DataType::UInt64, false),
56            ArrowField::new("size", DataType::UInt64, false),
57        ]));
58
59        // Use the original field's name for the descriptor
60        let descriptor_field = Field::try_from(
61            ArrowField::new(&field.name, descriptor_data_type, field.nullable)
62                .with_metadata(descriptor_metadata),
63        )?;
64
65        // Use PrimitiveStructuralEncoder to handle the descriptor
66        let descriptor_encoder = Box::new(PrimitiveStructuralEncoder::try_new(
67            options,
68            compression_strategy,
69            column_index,
70            descriptor_field,
71            Arc::new(HashMap::new()),
72        )?);
73
74        Ok(Self {
75            descriptor_encoder,
76            def_meaning: None,
77        })
78    }
79
80    fn wrap_tasks(
81        tasks: Vec<EncodeTask>,
82        def_meaning: Arc<[DefinitionInterpretation]>,
83    ) -> Vec<EncodeTask> {
84        tasks
85            .into_iter()
86            .map(|task| {
87                let def_meaning = def_meaning.clone();
88                task.then(|encoded_page| async move {
89                    let encoded_page = encoded_page?;
90
91                    let PageEncoding::Structural(inner_layout) = encoded_page.description else {
92                        return Err(Error::internal(
93                            "Expected inner encoding to return structural layout".to_string(),
94                        ));
95                    };
96
97                    let wrapped = ProtobufUtils21::blob_layout(inner_layout, &def_meaning);
98                    Ok(EncodedPage {
99                        column_idx: encoded_page.column_idx,
100                        data: encoded_page.data,
101                        description: PageEncoding::Structural(wrapped),
102                        num_rows: encoded_page.num_rows,
103                        row_number: encoded_page.row_number,
104                    })
105                })
106                .boxed()
107            })
108            .collect::<Vec<_>>()
109    }
110}
111
112impl FieldEncoder for BlobStructuralEncoder {
113    fn maybe_encode(
114        &mut self,
115        array: ArrayRef,
116        external_buffers: &mut OutOfLineBuffers,
117        mut repdef: RepDefBuilder,
118        row_number: u64,
119        num_rows: u64,
120    ) -> Result<Vec<EncodeTask>> {
121        if let Some(validity) = array.nulls() {
122            repdef.add_validity_bitmap(validity.clone());
123        } else {
124            repdef.add_no_null(array.len());
125        }
126
127        // Convert input array to LargeBinary
128        let binary_array = array.as_binary_opt::<i64>().ok_or_else(|| {
129            Error::invalid_input_source(
130                format!("Expected LargeBinary array, got {}", array.data_type()).into(),
131            )
132        })?;
133
134        let repdef = RepDefBuilder::serialize(vec![repdef]);
135
136        let rep = repdef.repetition_levels.as_ref();
137        let def = repdef.definition_levels.as_ref();
138        let def_meaning: Arc<[DefinitionInterpretation]> = repdef.def_meaning.into();
139
140        match self.def_meaning.as_ref() {
141            None => {
142                self.def_meaning = Some(def_meaning.clone());
143            }
144            Some(existing) => {
145                debug_assert_eq!(existing, &def_meaning);
146            }
147        }
148
149        // Collect positions and sizes
150        let mut positions = Vec::with_capacity(binary_array.len());
151        let mut sizes = Vec::with_capacity(binary_array.len());
152
153        for i in 0..binary_array.len() {
154            if binary_array.is_null(i) {
155                // Null values are smuggled into the positions array
156
157                // If we have null values we must have definition levels
158                let mut repdef = (def.expect_ok()?[i] as u64) << 16;
159                if let Some(rep) = rep {
160                    repdef += rep[i] as u64;
161                }
162
163                debug_assert_ne!(repdef, 0);
164                positions.push(repdef);
165                sizes.push(0);
166            } else {
167                let value = binary_array.value(i);
168                if value.is_empty() {
169                    // Empty values
170                    positions.push(0);
171                    sizes.push(0);
172                } else {
173                    // Add data to external buffers
174                    let position =
175                        external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value)));
176                    positions.push(position);
177                    sizes.push(value.len() as u64);
178                }
179            }
180        }
181
182        // Create descriptor array
183        let position_array = Arc::new(UInt64Array::from(positions));
184        let size_array = Arc::new(UInt64Array::from(sizes));
185        let descriptor_array = Arc::new(StructArray::new(
186            Fields::from(vec![
187                ArrowField::new("position", DataType::UInt64, false),
188                ArrowField::new("size", DataType::UInt64, false),
189            ]),
190            vec![position_array as ArrayRef, size_array as ArrayRef],
191            None, // Descriptors are never null
192        ));
193
194        // Delegate to descriptor encoder
195        let encode_tasks = self.descriptor_encoder.maybe_encode(
196            descriptor_array,
197            external_buffers,
198            RepDefBuilder::default(),
199            row_number,
200            num_rows,
201        )?;
202
203        Ok(Self::wrap_tasks(encode_tasks, def_meaning))
204    }
205
206    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
207        let encode_tasks = self.descriptor_encoder.flush(external_buffers)?;
208
209        // Use the cached def meaning.  If we haven't seen any data yet then we can just use a dummy
210        // value (not clear there would be any encode tasks in that case)
211        let def_meaning = self
212            .def_meaning
213            .clone()
214            .unwrap_or_else(|| Arc::new([DefinitionInterpretation::AllValidItem]));
215
216        Ok(Self::wrap_tasks(encode_tasks, def_meaning))
217    }
218
219    fn finish(
220        &mut self,
221        external_buffers: &mut OutOfLineBuffers,
222    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
223        self.descriptor_encoder.finish(external_buffers)
224    }
225
226    fn num_columns(&self) -> u32 {
227        self.descriptor_encoder.num_columns()
228    }
229}
230
/// Blob v2 structural encoder
///
/// Consumes struct arrays with the children `kind`, `data`, `uri`, `blob_id`,
/// `blob_size` and `position`, and turns every row into a descriptor row
/// (typed by `BLOB_V2_DESC_FIELDS`) that is handed to `descriptor_encoder`.
/// Payloads of inline blobs are moved into the out-of-line buffers.
pub struct BlobV2StructuralEncoder {
    // Encoder that writes the descriptor struct; every `FieldEncoder` call on
    // this type ends up delegating to it.
    descriptor_encoder: Box<dyn FieldEncoder>,
}
235
236impl BlobV2StructuralEncoder {
237    pub fn new(
238        field: &Field,
239        column_index: u32,
240        options: &crate::encoder::EncodingOptions,
241        compression_strategy: Arc<dyn crate::compression::CompressionStrategy>,
242    ) -> Result<Self> {
243        let mut descriptor_metadata = HashMap::with_capacity(1);
244        descriptor_metadata.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
245
246        let descriptor_data_type = DataType::Struct(BLOB_V2_DESC_FIELDS.clone());
247
248        let descriptor_field = Field::try_from(
249            ArrowField::new(&field.name, descriptor_data_type, field.nullable)
250                .with_metadata(descriptor_metadata),
251        )?;
252
253        let descriptor_encoder = Box::new(PrimitiveStructuralEncoder::try_new(
254            options,
255            compression_strategy,
256            column_index,
257            descriptor_field,
258            Arc::new(HashMap::new()),
259        )?);
260
261        Ok(Self { descriptor_encoder })
262    }
263}
264
265impl FieldEncoder for BlobV2StructuralEncoder {
266    fn maybe_encode(
267        &mut self,
268        array: ArrayRef,
269        external_buffers: &mut OutOfLineBuffers,
270        mut repdef: RepDefBuilder,
271        row_number: u64,
272        num_rows: u64,
273    ) -> Result<Vec<EncodeTask>> {
274        let struct_arr = array.as_struct();
275        if let Some(validity) = struct_arr.nulls() {
276            repdef.add_validity_bitmap(validity.clone());
277        } else {
278            repdef.add_no_null(struct_arr.len());
279        }
280
281        let kind_col = struct_arr
282            .column_by_name("kind")
283            .ok_or_else(|| {
284                Error::invalid_input_source("Blob v2 struct missing `kind` field".into())
285            })?
286            .as_primitive::<UInt8Type>();
287        let data_col = struct_arr
288            .column_by_name("data")
289            .ok_or_else(|| {
290                Error::invalid_input_source("Blob v2 struct missing `data` field".into())
291            })?
292            .as_binary::<i64>();
293        let uri_col = struct_arr
294            .column_by_name("uri")
295            .ok_or_else(|| {
296                Error::invalid_input_source("Blob v2 struct missing `uri` field".into())
297            })?
298            .as_string::<i32>();
299        let blob_id_col = struct_arr
300            .column_by_name("blob_id")
301            .ok_or_else(|| {
302                Error::invalid_input_source("Blob v2 struct missing `blob_id` field".into())
303            })?
304            .as_primitive::<UInt32Type>();
305        let blob_size_col = struct_arr
306            .column_by_name("blob_size")
307            .ok_or_else(|| {
308                Error::invalid_input_source("Blob v2 struct missing `blob_size` field".into())
309            })?
310            .as_primitive::<UInt64Type>();
311        let packed_position_col = struct_arr
312            .column_by_name("position")
313            .ok_or_else(|| {
314                Error::invalid_input_source("Blob v2 struct missing `position` field".into())
315            })?
316            .as_primitive::<UInt64Type>();
317
318        let row_count = struct_arr.len();
319
320        let mut kind_builder = PrimitiveBuilder::<UInt8Type>::with_capacity(row_count);
321        let mut position_builder = PrimitiveBuilder::<UInt64Type>::with_capacity(row_count);
322        let mut size_builder = PrimitiveBuilder::<UInt64Type>::with_capacity(row_count);
323        let mut blob_id_builder = PrimitiveBuilder::<UInt32Type>::with_capacity(row_count);
324        let mut uri_builder = StringBuilder::with_capacity(row_count, row_count * 16);
325
326        for i in 0..row_count {
327            let (kind_value, position_value, size_value, blob_id_value, uri_value) =
328                if struct_arr.is_null(i) || kind_col.is_null(i) {
329                    (BlobKind::Inline as u8, 0, 0, 0, "".to_string())
330                } else {
331                    let kind_val = BlobKind::try_from(kind_col.value(i))?;
332                    match kind_val {
333                        BlobKind::Dedicated => (
334                            BlobKind::Dedicated as u8,
335                            0,
336                            blob_size_col.value(i),
337                            blob_id_col.value(i),
338                            "".to_string(),
339                        ),
340                        BlobKind::External => {
341                            let uri = uri_col.value(i).to_string();
342                            let position = if packed_position_col.is_null(i) {
343                                0
344                            } else {
345                                packed_position_col.value(i)
346                            };
347                            let size = if blob_size_col.is_null(i) {
348                                0
349                            } else {
350                                blob_size_col.value(i)
351                            };
352                            let external_base_id = if blob_id_col.is_null(i) {
353                                0
354                            } else {
355                                blob_id_col.value(i)
356                            };
357                            (
358                                BlobKind::External as u8,
359                                position,
360                                size,
361                                external_base_id,
362                                uri,
363                            )
364                        }
365                        BlobKind::Packed => (
366                            BlobKind::Packed as u8,
367                            packed_position_col.value(i),
368                            blob_size_col.value(i),
369                            blob_id_col.value(i),
370                            "".to_string(),
371                        ),
372                        BlobKind::Inline => {
373                            let data_val = data_col.value(i);
374                            let blob_len = data_val.len() as u64;
375                            let position = external_buffers
376                                .add_buffer(LanceBuffer::from(Buffer::from(data_val)));
377
378                            (
379                                BlobKind::Inline as u8,
380                                position,
381                                blob_len,
382                                0,
383                                "".to_string(),
384                            )
385                        }
386                    }
387                };
388
389            kind_builder.append_value(kind_value);
390            position_builder.append_value(position_value);
391            size_builder.append_value(size_value);
392            blob_id_builder.append_value(blob_id_value);
393            uri_builder.append_value(uri_value);
394        }
395        let children: Vec<ArrayRef> = vec![
396            Arc::new(kind_builder.finish()),
397            Arc::new(position_builder.finish()),
398            Arc::new(size_builder.finish()),
399            Arc::new(blob_id_builder.finish()),
400            Arc::new(uri_builder.finish()),
401        ];
402
403        let descriptor_array = Arc::new(StructArray::try_new(
404            BLOB_V2_DESC_FIELDS.clone(),
405            children,
406            None,
407        )?) as ArrayRef;
408
409        self.descriptor_encoder.maybe_encode(
410            descriptor_array,
411            external_buffers,
412            repdef,
413            row_number,
414            num_rows,
415        )
416    }
417
418    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
419        self.descriptor_encoder.flush(external_buffers)
420    }
421
422    fn finish(
423        &mut self,
424        external_buffers: &mut OutOfLineBuffers,
425    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
426        self.descriptor_encoder.finish(external_buffers)
427    }
428
429    fn num_columns(&self) -> u32 {
430        self.descriptor_encoder.num_columns()
431    }
432}
433
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        compression::DefaultCompressionStrategy,
        encoder::{ColumnIndexSequence, EncodingOptions},
        testing::{
            TestCases, check_round_trip_encoding_of_data,
            check_round_trip_encoding_of_data_with_expected,
        },
        version::LanceFileVersion,
    };
    use arrow_array::{
        ArrayRef, LargeBinaryArray, StringArray, StructArray, UInt8Array, UInt32Array, UInt64Array,
    };
    use arrow_schema::{DataType, Field as ArrowField};

    /// Field metadata that marks a column as a blob v2 extension column.
    fn blob_v2_metadata() -> HashMap<String, String> {
        HashMap::from([(
            lance_arrow::ARROW_EXT_NAME_KEY.to_string(),
            lance_arrow::BLOB_V2_EXT_NAME.to_string(),
        )])
    }

    /// Shorthand for a shared Arrow field with the given nullability.
    fn shared_field(name: &str, data_type: DataType, nullable: bool) -> Arc<ArrowField> {
        Arc::new(ArrowField::new(name, data_type, nullable))
    }

    /// Builds a blob v2 input struct (all children nullable) from per-column values.
    fn blob_v2_input(
        kinds: Vec<u8>,
        data: Vec<Option<&[u8]>>,
        uris: Vec<Option<&str>>,
        blob_ids: Vec<u32>,
        blob_sizes: Vec<u64>,
        positions: Vec<u64>,
    ) -> StructArray {
        StructArray::from(vec![
            (
                shared_field("kind", DataType::UInt8, true),
                Arc::new(UInt8Array::from(kinds)) as ArrayRef,
            ),
            (
                shared_field("data", DataType::LargeBinary, true),
                Arc::new(LargeBinaryArray::from(data)) as ArrayRef,
            ),
            (
                shared_field("uri", DataType::Utf8, true),
                Arc::new(StringArray::from(uris)) as ArrayRef,
            ),
            (
                shared_field("blob_id", DataType::UInt32, true),
                Arc::new(UInt32Array::from(blob_ids)) as ArrayRef,
            ),
            (
                shared_field("blob_size", DataType::UInt64, true),
                Arc::new(UInt64Array::from(blob_sizes)) as ArrayRef,
            ),
            (
                shared_field("position", DataType::UInt64, true),
                Arc::new(UInt64Array::from(positions)) as ArrayRef,
            ),
        ])
    }

    /// Builds the descriptor struct (all children non-nullable) a round trip should yield.
    fn blob_v2_descriptor(
        kinds: Vec<u8>,
        positions: Vec<u64>,
        sizes: Vec<u64>,
        blob_ids: Vec<u32>,
        uris: Vec<&str>,
    ) -> StructArray {
        StructArray::from(vec![
            (
                shared_field("kind", DataType::UInt8, false),
                Arc::new(UInt8Array::from(kinds)) as ArrayRef,
            ),
            (
                shared_field("position", DataType::UInt64, false),
                Arc::new(UInt64Array::from(positions)) as ArrayRef,
            ),
            (
                shared_field("size", DataType::UInt64, false),
                Arc::new(UInt64Array::from(sizes)) as ArrayRef,
            ),
            (
                shared_field("blob_id", DataType::UInt32, false),
                Arc::new(UInt32Array::from(blob_ids)) as ArrayRef,
            ),
            (
                shared_field("blob_uri", DataType::Utf8, false),
                Arc::new(StringArray::from(uris)) as ArrayRef,
            ),
        ])
    }

    /// Round-trips `input` as a blob v2 column (file version 2.2 and up) and
    /// compares the decoded data against `expected`.
    async fn check_blob_v2_round_trip(input: StructArray, expected: StructArray) {
        check_round_trip_encoding_of_data_with_expected(
            vec![Arc::new(input)],
            Some(Arc::new(expected)),
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_2),
            blob_v2_metadata(),
        )
        .await;
    }

    /// The v1 encoder can be constructed for a nullable LargeBinary field.
    #[test]
    fn test_blob_encoder_creation() {
        let arrow_field = ArrowField::new("blob_field", DataType::LargeBinary, true);
        let field = Field::try_from(arrow_field).unwrap();
        let mut column_index = ColumnIndexSequence::default();
        let column_idx = column_index.next_column_index(0);
        let options = EncodingOptions::default();
        let compression = Arc::new(DefaultCompressionStrategy::new());

        let encoder = BlobStructuralEncoder::new(&field, column_idx, &options, compression);

        assert!(encoder.is_ok());
    }

    /// Encoding a small batch succeeds and places blob payloads in the external buffers.
    #[tokio::test]
    async fn test_blob_encoding_simple() {
        let blob_metadata =
            HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]);
        let field = Field::try_from(
            ArrowField::new("blob_field", DataType::LargeBinary, true)
                .with_metadata(blob_metadata),
        )
        .unwrap();
        let mut column_index = ColumnIndexSequence::default();
        let column_idx = column_index.next_column_index(0);
        let options = EncodingOptions::default();
        let compression = Arc::new(DefaultCompressionStrategy::new());

        let mut encoder =
            BlobStructuralEncoder::new(&field, column_idx, &options, compression).unwrap();

        // Mix of a small value, a null, a 100KB value and an empty value
        let large_data = vec![0u8; 1024 * 100];
        let data: Vec<Option<&[u8]>> =
            vec![Some(b"hello world"), None, Some(&large_data), Some(b"")];
        let array = Arc::new(LargeBinaryArray::from(data));

        let mut external_buffers = OutOfLineBuffers::new(0, 8);
        let repdef = RepDefBuilder::default();

        let tasks = encoder
            .maybe_encode(array, &mut external_buffers, repdef, 0, 4)
            .unwrap();

        // Nothing emitted yet means the data is still buffered; force it out
        if tasks.is_empty() {
            let _flush_tasks = encoder.flush(&mut external_buffers).unwrap();
        }

        // Only checks that encoding ran without errors
        assert!(encoder.num_columns() > 0);

        // The payloads must have gone to the external buffers
        let buffers = external_buffers.take_buffers();
        assert!(
            !buffers.is_empty(),
            "Large blobs should be stored in external buffers"
        );
    }

    /// v1 blobs of several sizes (plus a null) survive a round trip up to file version 2.1.
    #[tokio::test]
    async fn test_blob_round_trip() {
        let blob_metadata =
            HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]);

        let one_kb = vec![1u8; 1024];
        let ten_kb = vec![2u8; 10240];
        let hundred_kb = vec![3u8; 102400];
        let array = Arc::new(LargeBinaryArray::from(vec![
            Some(one_kb.as_slice()),
            None,
            Some(ten_kb.as_slice()),
            Some(hundred_kb.as_slice()),
        ]));

        check_round_trip_encoding_of_data(
            vec![array],
            &TestCases::default().with_max_file_version(LanceFileVersion::V2_1),
            blob_metadata,
        )
        .await;
    }

    /// One inline row followed by two external rows that only carry a URI.
    #[tokio::test]
    async fn test_blob_v2_external_round_trip() {
        let kinds = vec![
            BlobKind::Inline as u8,
            BlobKind::External as u8,
            BlobKind::External as u8,
        ];

        let input = blob_v2_input(
            kinds.clone(),
            vec![Some(&b"inline"[..]), None, None],
            vec![
                None,
                Some("file:///tmp/external.bin"),
                Some("s3://bucket/blob"),
            ],
            vec![0, 0, 0],
            vec![0, 0, 0],
            vec![0, 0, 0],
        );

        let expected = blob_v2_descriptor(
            kinds,
            vec![0, 0, 0],
            vec![6, 0, 0],
            vec![0, 0, 0],
            vec!["", "file:///tmp/external.bin", "s3://bucket/blob"],
        );

        check_blob_v2_round_trip(input, expected).await;
    }

    /// A dedicated row keeps its blob id and size; an inline row gets its size from the data.
    #[tokio::test]
    async fn test_blob_v2_dedicated_round_trip() {
        let kinds = vec![BlobKind::Dedicated as u8, BlobKind::Inline as u8];

        let input = blob_v2_input(
            kinds.clone(),
            vec![None, Some(&b"abc"[..])],
            vec![None, None],
            vec![42, 0],
            vec![12, 0],
            vec![0, 0],
        );

        let expected = blob_v2_descriptor(
            kinds,
            vec![0, 0],
            vec![12, 3],
            vec![42, 0],
            vec!["", ""],
        );

        check_blob_v2_round_trip(input, expected).await;
    }

    /// An external row with an explicit position and size keeps both.
    #[tokio::test]
    async fn test_blob_v2_external_with_range_round_trip() {
        let kinds = vec![BlobKind::External as u8];

        let input = blob_v2_input(
            kinds.clone(),
            vec![None],
            vec![Some("memory://container.pack")],
            vec![0],
            vec![42],
            vec![7],
        );

        let expected = blob_v2_descriptor(
            kinds,
            vec![7],
            vec![42],
            vec![0],
            vec!["memory://container.pack"],
        );

        check_blob_v2_round_trip(input, expected).await;
    }

    /// A packed row keeps its position, size and blob id and has an empty URI.
    #[tokio::test]
    async fn test_blob_v2_packed_round_trip() {
        let kinds = vec![BlobKind::Packed as u8];

        let input = blob_v2_input(
            kinds.clone(),
            vec![None],
            vec![None],
            vec![7],
            vec![5],
            vec![10],
        );

        let expected = blob_v2_descriptor(kinds, vec![10], vec![5], vec![7], vec![""]);

        check_blob_v2_round_trip(input, expected).await;
    }
}