1use std::{collections::HashMap, sync::Arc};
5
6use arrow_array::{
7 Array, ArrayRef, StructArray, UInt64Array,
8 builder::{PrimitiveBuilder, StringBuilder},
9 cast::AsArray,
10 types::{UInt8Type, UInt32Type, UInt64Type},
11};
12use arrow_buffer::Buffer;
13use arrow_schema::{DataType, Field as ArrowField, Fields};
14use futures::{FutureExt, future::BoxFuture};
15use lance_core::{
16 Error, Result, datatypes::BLOB_V2_DESC_FIELDS, datatypes::Field, error::LanceOptionExt,
17};
18
19use crate::{
20 buffer::LanceBuffer,
21 constants::PACKED_STRUCT_META_KEY,
22 decoder::PageEncoding,
23 encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
24 encodings::logical::primitive::PrimitiveStructuralEncoder,
25 format::ProtobufUtils21,
26 repdef::{DefinitionInterpretation, RepDefBuilder},
27};
28use lance_core::datatypes::BlobKind;
29
30pub struct BlobStructuralEncoder {
36 descriptor_encoder: Box<dyn FieldEncoder>,
38 def_meaning: Option<Arc<[DefinitionInterpretation]>>,
40}
41
42impl BlobStructuralEncoder {
43 pub fn new(
44 field: &Field,
45 column_index: u32,
46 options: &crate::encoder::EncodingOptions,
47 compression_strategy: Arc<dyn crate::compression::CompressionStrategy>,
48 ) -> Result<Self> {
49 let mut descriptor_metadata = HashMap::with_capacity(1);
52 descriptor_metadata.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
53
54 let descriptor_data_type = DataType::Struct(Fields::from(vec![
55 ArrowField::new("position", DataType::UInt64, false),
56 ArrowField::new("size", DataType::UInt64, false),
57 ]));
58
59 let descriptor_field = Field::try_from(
61 ArrowField::new(&field.name, descriptor_data_type, field.nullable)
62 .with_metadata(descriptor_metadata),
63 )?;
64
65 let descriptor_encoder = Box::new(PrimitiveStructuralEncoder::try_new(
67 options,
68 compression_strategy,
69 column_index,
70 descriptor_field,
71 Arc::new(HashMap::new()),
72 )?);
73
74 Ok(Self {
75 descriptor_encoder,
76 def_meaning: None,
77 })
78 }
79
80 fn wrap_tasks(
81 tasks: Vec<EncodeTask>,
82 def_meaning: Arc<[DefinitionInterpretation]>,
83 ) -> Vec<EncodeTask> {
84 tasks
85 .into_iter()
86 .map(|task| {
87 let def_meaning = def_meaning.clone();
88 task.then(|encoded_page| async move {
89 let encoded_page = encoded_page?;
90
91 let PageEncoding::Structural(inner_layout) = encoded_page.description else {
92 return Err(Error::internal(
93 "Expected inner encoding to return structural layout".to_string(),
94 ));
95 };
96
97 let wrapped = ProtobufUtils21::blob_layout(inner_layout, &def_meaning);
98 Ok(EncodedPage {
99 column_idx: encoded_page.column_idx,
100 data: encoded_page.data,
101 description: PageEncoding::Structural(wrapped),
102 num_rows: encoded_page.num_rows,
103 row_number: encoded_page.row_number,
104 })
105 })
106 .boxed()
107 })
108 .collect::<Vec<_>>()
109 }
110}
111
112impl FieldEncoder for BlobStructuralEncoder {
113 fn maybe_encode(
114 &mut self,
115 array: ArrayRef,
116 external_buffers: &mut OutOfLineBuffers,
117 mut repdef: RepDefBuilder,
118 row_number: u64,
119 num_rows: u64,
120 ) -> Result<Vec<EncodeTask>> {
121 if let Some(validity) = array.nulls() {
122 repdef.add_validity_bitmap(validity.clone());
123 } else {
124 repdef.add_no_null(array.len());
125 }
126
127 let binary_array = array.as_binary_opt::<i64>().ok_or_else(|| {
129 Error::invalid_input_source(
130 format!("Expected LargeBinary array, got {}", array.data_type()).into(),
131 )
132 })?;
133
134 let repdef = RepDefBuilder::serialize(vec![repdef]);
135
136 let rep = repdef.repetition_levels.as_ref();
137 let def = repdef.definition_levels.as_ref();
138 let def_meaning: Arc<[DefinitionInterpretation]> = repdef.def_meaning.into();
139
140 match self.def_meaning.as_ref() {
141 None => {
142 self.def_meaning = Some(def_meaning.clone());
143 }
144 Some(existing) => {
145 debug_assert_eq!(existing, &def_meaning);
146 }
147 }
148
149 let mut positions = Vec::with_capacity(binary_array.len());
151 let mut sizes = Vec::with_capacity(binary_array.len());
152
153 for i in 0..binary_array.len() {
154 if binary_array.is_null(i) {
155 let mut repdef = (def.expect_ok()?[i] as u64) << 16;
159 if let Some(rep) = rep {
160 repdef += rep[i] as u64;
161 }
162
163 debug_assert_ne!(repdef, 0);
164 positions.push(repdef);
165 sizes.push(0);
166 } else {
167 let value = binary_array.value(i);
168 if value.is_empty() {
169 positions.push(0);
171 sizes.push(0);
172 } else {
173 let position =
175 external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value)));
176 positions.push(position);
177 sizes.push(value.len() as u64);
178 }
179 }
180 }
181
182 let position_array = Arc::new(UInt64Array::from(positions));
184 let size_array = Arc::new(UInt64Array::from(sizes));
185 let descriptor_array = Arc::new(StructArray::new(
186 Fields::from(vec![
187 ArrowField::new("position", DataType::UInt64, false),
188 ArrowField::new("size", DataType::UInt64, false),
189 ]),
190 vec![position_array as ArrayRef, size_array as ArrayRef],
191 None, ));
193
194 let encode_tasks = self.descriptor_encoder.maybe_encode(
196 descriptor_array,
197 external_buffers,
198 RepDefBuilder::default(),
199 row_number,
200 num_rows,
201 )?;
202
203 Ok(Self::wrap_tasks(encode_tasks, def_meaning))
204 }
205
206 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
207 let encode_tasks = self.descriptor_encoder.flush(external_buffers)?;
208
209 let def_meaning = self
212 .def_meaning
213 .clone()
214 .unwrap_or_else(|| Arc::new([DefinitionInterpretation::AllValidItem]));
215
216 Ok(Self::wrap_tasks(encode_tasks, def_meaning))
217 }
218
219 fn finish(
220 &mut self,
221 external_buffers: &mut OutOfLineBuffers,
222 ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
223 self.descriptor_encoder.finish(external_buffers)
224 }
225
226 fn num_columns(&self) -> u32 {
227 self.descriptor_encoder.num_columns()
228 }
229}
230
231pub struct BlobV2StructuralEncoder {
233 descriptor_encoder: Box<dyn FieldEncoder>,
234}
235
236impl BlobV2StructuralEncoder {
237 pub fn new(
238 field: &Field,
239 column_index: u32,
240 options: &crate::encoder::EncodingOptions,
241 compression_strategy: Arc<dyn crate::compression::CompressionStrategy>,
242 ) -> Result<Self> {
243 let mut descriptor_metadata = HashMap::with_capacity(1);
244 descriptor_metadata.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
245
246 let descriptor_data_type = DataType::Struct(BLOB_V2_DESC_FIELDS.clone());
247
248 let descriptor_field = Field::try_from(
249 ArrowField::new(&field.name, descriptor_data_type, field.nullable)
250 .with_metadata(descriptor_metadata),
251 )?;
252
253 let descriptor_encoder = Box::new(PrimitiveStructuralEncoder::try_new(
254 options,
255 compression_strategy,
256 column_index,
257 descriptor_field,
258 Arc::new(HashMap::new()),
259 )?);
260
261 Ok(Self { descriptor_encoder })
262 }
263}
264
265impl FieldEncoder for BlobV2StructuralEncoder {
266 fn maybe_encode(
267 &mut self,
268 array: ArrayRef,
269 external_buffers: &mut OutOfLineBuffers,
270 mut repdef: RepDefBuilder,
271 row_number: u64,
272 num_rows: u64,
273 ) -> Result<Vec<EncodeTask>> {
274 let struct_arr = array.as_struct();
275 if let Some(validity) = struct_arr.nulls() {
276 repdef.add_validity_bitmap(validity.clone());
277 } else {
278 repdef.add_no_null(struct_arr.len());
279 }
280
281 let kind_col = struct_arr
282 .column_by_name("kind")
283 .ok_or_else(|| {
284 Error::invalid_input_source("Blob v2 struct missing `kind` field".into())
285 })?
286 .as_primitive::<UInt8Type>();
287 let data_col = struct_arr
288 .column_by_name("data")
289 .ok_or_else(|| {
290 Error::invalid_input_source("Blob v2 struct missing `data` field".into())
291 })?
292 .as_binary::<i64>();
293 let uri_col = struct_arr
294 .column_by_name("uri")
295 .ok_or_else(|| {
296 Error::invalid_input_source("Blob v2 struct missing `uri` field".into())
297 })?
298 .as_string::<i32>();
299 let blob_id_col = struct_arr
300 .column_by_name("blob_id")
301 .ok_or_else(|| {
302 Error::invalid_input_source("Blob v2 struct missing `blob_id` field".into())
303 })?
304 .as_primitive::<UInt32Type>();
305 let blob_size_col = struct_arr
306 .column_by_name("blob_size")
307 .ok_or_else(|| {
308 Error::invalid_input_source("Blob v2 struct missing `blob_size` field".into())
309 })?
310 .as_primitive::<UInt64Type>();
311 let packed_position_col = struct_arr
312 .column_by_name("position")
313 .ok_or_else(|| {
314 Error::invalid_input_source("Blob v2 struct missing `position` field".into())
315 })?
316 .as_primitive::<UInt64Type>();
317
318 let row_count = struct_arr.len();
319
320 let mut kind_builder = PrimitiveBuilder::<UInt8Type>::with_capacity(row_count);
321 let mut position_builder = PrimitiveBuilder::<UInt64Type>::with_capacity(row_count);
322 let mut size_builder = PrimitiveBuilder::<UInt64Type>::with_capacity(row_count);
323 let mut blob_id_builder = PrimitiveBuilder::<UInt32Type>::with_capacity(row_count);
324 let mut uri_builder = StringBuilder::with_capacity(row_count, row_count * 16);
325
326 for i in 0..row_count {
327 let (kind_value, position_value, size_value, blob_id_value, uri_value) =
328 if struct_arr.is_null(i) || kind_col.is_null(i) {
329 (BlobKind::Inline as u8, 0, 0, 0, "".to_string())
330 } else {
331 let kind_val = BlobKind::try_from(kind_col.value(i))?;
332 match kind_val {
333 BlobKind::Dedicated => (
334 BlobKind::Dedicated as u8,
335 0,
336 blob_size_col.value(i),
337 blob_id_col.value(i),
338 "".to_string(),
339 ),
340 BlobKind::External => {
341 let uri = uri_col.value(i).to_string();
342 let position = if packed_position_col.is_null(i) {
343 0
344 } else {
345 packed_position_col.value(i)
346 };
347 let size = if blob_size_col.is_null(i) {
348 0
349 } else {
350 blob_size_col.value(i)
351 };
352 let external_base_id = if blob_id_col.is_null(i) {
353 0
354 } else {
355 blob_id_col.value(i)
356 };
357 (
358 BlobKind::External as u8,
359 position,
360 size,
361 external_base_id,
362 uri,
363 )
364 }
365 BlobKind::Packed => (
366 BlobKind::Packed as u8,
367 packed_position_col.value(i),
368 blob_size_col.value(i),
369 blob_id_col.value(i),
370 "".to_string(),
371 ),
372 BlobKind::Inline => {
373 let data_val = data_col.value(i);
374 let blob_len = data_val.len() as u64;
375 let position = external_buffers
376 .add_buffer(LanceBuffer::from(Buffer::from(data_val)));
377
378 (
379 BlobKind::Inline as u8,
380 position,
381 blob_len,
382 0,
383 "".to_string(),
384 )
385 }
386 }
387 };
388
389 kind_builder.append_value(kind_value);
390 position_builder.append_value(position_value);
391 size_builder.append_value(size_value);
392 blob_id_builder.append_value(blob_id_value);
393 uri_builder.append_value(uri_value);
394 }
395 let children: Vec<ArrayRef> = vec![
396 Arc::new(kind_builder.finish()),
397 Arc::new(position_builder.finish()),
398 Arc::new(size_builder.finish()),
399 Arc::new(blob_id_builder.finish()),
400 Arc::new(uri_builder.finish()),
401 ];
402
403 let descriptor_array = Arc::new(StructArray::try_new(
404 BLOB_V2_DESC_FIELDS.clone(),
405 children,
406 None,
407 )?) as ArrayRef;
408
409 self.descriptor_encoder.maybe_encode(
410 descriptor_array,
411 external_buffers,
412 repdef,
413 row_number,
414 num_rows,
415 )
416 }
417
418 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
419 self.descriptor_encoder.flush(external_buffers)
420 }
421
422 fn finish(
423 &mut self,
424 external_buffers: &mut OutOfLineBuffers,
425 ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
426 self.descriptor_encoder.finish(external_buffers)
427 }
428
429 fn num_columns(&self) -> u32 {
430 self.descriptor_encoder.num_columns()
431 }
432}
433
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        compression::DefaultCompressionStrategy,
        encoder::{ColumnIndexSequence, EncodingOptions},
        testing::{
            TestCases, check_round_trip_encoding_of_data,
            check_round_trip_encoding_of_data_with_expected,
        },
        version::LanceFileVersion,
    };
    use arrow_array::{
        ArrayRef, LargeBinaryArray, StringArray, StructArray, UInt8Array, UInt32Array, UInt64Array,
    };
    use arrow_schema::{DataType, Field as ArrowField};

    /// Field metadata that marks a `LargeBinary` column as a blob column.
    fn legacy_blob_metadata() -> HashMap<String, String> {
        HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())])
    }

    /// Field metadata that marks a struct column as a blob v2 extension column.
    fn blob_v2_metadata() -> HashMap<String, String> {
        HashMap::from([(
            lance_arrow::ARROW_EXT_NAME_KEY.to_string(),
            lance_arrow::BLOB_V2_EXT_NAME.to_string(),
        )])
    }

    /// Assembles a struct array whose fields take their type from the given
    /// columns and all share the same nullability.
    fn struct_of(columns: Vec<(&str, ArrayRef)>, nullable: bool) -> ArrayRef {
        let pairs = columns
            .into_iter()
            .map(|(name, values)| {
                let field = Arc::new(ArrowField::new(
                    name,
                    values.data_type().clone(),
                    nullable,
                ));
                (field, values)
            })
            .collect::<Vec<_>>();
        Arc::new(StructArray::from(pairs))
    }

    /// Builds the user-facing blob v2 struct (all children nullable).
    fn blob_v2_input(
        kinds: Vec<u8>,
        data: Vec<Option<&[u8]>>,
        uris: Vec<Option<&str>>,
        blob_ids: Vec<u32>,
        blob_sizes: Vec<u64>,
        positions: Vec<u64>,
    ) -> ArrayRef {
        struct_of(
            vec![
                ("kind", Arc::new(UInt8Array::from(kinds)) as ArrayRef),
                ("data", Arc::new(LargeBinaryArray::from(data)) as ArrayRef),
                ("uri", Arc::new(StringArray::from(uris)) as ArrayRef),
                ("blob_id", Arc::new(UInt32Array::from(blob_ids)) as ArrayRef),
                ("blob_size", Arc::new(UInt64Array::from(blob_sizes)) as ArrayRef),
                ("position", Arc::new(UInt64Array::from(positions)) as ArrayRef),
            ],
            true,
        )
    }

    /// Builds the descriptor struct expected after a round trip (no nullable children).
    fn blob_v2_descriptor(
        kinds: Vec<u8>,
        positions: Vec<u64>,
        sizes: Vec<u64>,
        blob_ids: Vec<u32>,
        uris: Vec<&str>,
    ) -> ArrayRef {
        struct_of(
            vec![
                ("kind", Arc::new(UInt8Array::from(kinds)) as ArrayRef),
                ("position", Arc::new(UInt64Array::from(positions)) as ArrayRef),
                ("size", Arc::new(UInt64Array::from(sizes)) as ArrayRef),
                ("blob_id", Arc::new(UInt32Array::from(blob_ids)) as ArrayRef),
                ("blob_uri", Arc::new(StringArray::from(uris)) as ArrayRef),
            ],
            false,
        )
    }

    /// Round-trips `input` as a blob v2 column (file version 2.2 and up) and
    /// compares the decoded data with `expected`.
    async fn assert_blob_v2_round_trip(input: ArrayRef, expected: ArrayRef) {
        check_round_trip_encoding_of_data_with_expected(
            vec![input],
            Some(expected),
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_2),
            blob_v2_metadata(),
        )
        .await;
    }

    #[test]
    fn test_blob_encoder_creation() {
        let field =
            Field::try_from(ArrowField::new("blob_field", DataType::LargeBinary, true)).unwrap();
        let column_idx = ColumnIndexSequence::default().next_column_index(0);
        let options = EncodingOptions::default();
        let compression = Arc::new(DefaultCompressionStrategy::new());

        let created = BlobStructuralEncoder::new(&field, column_idx, &options, compression);

        assert!(created.is_ok());
    }

    #[tokio::test]
    async fn test_blob_encoding_simple() {
        let field = Field::try_from(
            ArrowField::new("blob_field", DataType::LargeBinary, true)
                .with_metadata(legacy_blob_metadata()),
        )
        .unwrap();
        let column_idx = ColumnIndexSequence::default().next_column_index(0);
        let options = EncodingOptions::default();
        let compression = Arc::new(DefaultCompressionStrategy::new());

        let mut encoder =
            BlobStructuralEncoder::new(&field, column_idx, &options, compression).unwrap();

        // One small value, one null, one 100 KiB value and one empty value.
        let large_data = vec![0u8; 1024 * 100];
        let rows: Vec<Option<&[u8]>> =
            vec![Some(b"hello world"), None, Some(&large_data), Some(b"")];
        let array = Arc::new(LargeBinaryArray::from(rows));

        let mut external_buffers = OutOfLineBuffers::new(0, 8);

        let tasks = encoder
            .maybe_encode(array, &mut external_buffers, RepDefBuilder::default(), 0, 4)
            .unwrap();

        // Nothing may have been emitted yet; a flush must still succeed.
        if tasks.is_empty() {
            let _flush_tasks = encoder.flush(&mut external_buffers).unwrap();
        }

        assert!(encoder.num_columns() > 0);

        let buffers = external_buffers.take_buffers();
        assert!(
            !buffers.is_empty(),
            "Large blobs should be stored in external buffers"
        );
    }

    #[tokio::test]
    async fn test_blob_round_trip() {
        let one_kib = vec![1u8; 1024];
        let ten_kib = vec![2u8; 10240];
        let hundred_kib = vec![3u8; 102400];
        let array: ArrayRef = Arc::new(LargeBinaryArray::from(vec![
            Some(one_kib.as_slice()),
            None,
            Some(ten_kib.as_slice()),
            Some(hundred_kib.as_slice()),
        ]));

        check_round_trip_encoding_of_data(
            vec![array],
            &TestCases::default().with_max_file_version(LanceFileVersion::V2_1),
            legacy_blob_metadata(),
        )
        .await;
    }

    #[tokio::test]
    async fn test_blob_v2_external_round_trip() {
        let kinds = vec![
            BlobKind::Inline as u8,
            BlobKind::External as u8,
            BlobKind::External as u8,
        ];

        let input = blob_v2_input(
            kinds.clone(),
            vec![Some(b"inline".as_slice()), None, None],
            vec![
                None,
                Some("file:///tmp/external.bin"),
                Some("s3://bucket/blob"),
            ],
            vec![0, 0, 0],
            vec![0, 0, 0],
            vec![0, 0, 0],
        );
        let expected = blob_v2_descriptor(
            kinds,
            vec![0, 0, 0],
            vec![6, 0, 0],
            vec![0, 0, 0],
            vec!["", "file:///tmp/external.bin", "s3://bucket/blob"],
        );

        assert_blob_v2_round_trip(input, expected).await;
    }

    #[tokio::test]
    async fn test_blob_v2_dedicated_round_trip() {
        let kinds = vec![BlobKind::Dedicated as u8, BlobKind::Inline as u8];

        let input = blob_v2_input(
            kinds.clone(),
            vec![None, Some(b"abc".as_slice())],
            vec![None, None],
            vec![42, 0],
            vec![12, 0],
            vec![0, 0],
        );
        let expected = blob_v2_descriptor(
            kinds,
            vec![0, 0],
            vec![12, 3],
            vec![42, 0],
            vec!["", ""],
        );

        assert_blob_v2_round_trip(input, expected).await;
    }

    #[tokio::test]
    async fn test_blob_v2_external_with_range_round_trip() {
        let kinds = vec![BlobKind::External as u8];

        let input = blob_v2_input(
            kinds.clone(),
            vec![None],
            vec![Some("memory://container.pack")],
            vec![0],
            vec![42],
            vec![7],
        );
        let expected = blob_v2_descriptor(
            kinds,
            vec![7],
            vec![42],
            vec![0],
            vec!["memory://container.pack"],
        );

        assert_blob_v2_round_trip(input, expected).await;
    }

    #[tokio::test]
    async fn test_blob_v2_packed_round_trip() {
        let kinds = vec![BlobKind::Packed as u8];

        let input = blob_v2_input(
            kinds.clone(),
            vec![None],
            vec![None],
            vec![7],
            vec![5],
            vec![10],
        );
        let expected = blob_v2_descriptor(kinds, vec![10], vec![5], vec![7], vec![""]);

        assert_blob_v2_round_trip(input, expected).await;
    }
}