1use std::{collections::HashMap, sync::Arc};
5
6use arrow_array::{
7 builder::{PrimitiveBuilder, StringBuilder},
8 cast::AsArray,
9 types::{UInt32Type, UInt64Type, UInt8Type},
10 Array, ArrayRef, StructArray, UInt64Array,
11};
12use arrow_buffer::Buffer;
13use arrow_schema::{DataType, Field as ArrowField, Fields};
14use futures::{future::BoxFuture, FutureExt};
15use lance_core::{
16 datatypes::Field, datatypes::BLOB_V2_DESC_FIELDS, error::LanceOptionExt, Error, Result,
17};
18use snafu::location;
19
20use crate::{
21 buffer::LanceBuffer,
22 constants::PACKED_STRUCT_META_KEY,
23 decoder::PageEncoding,
24 encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
25 encodings::logical::primitive::PrimitiveStructuralEncoder,
26 format::ProtobufUtils21,
27 repdef::{DefinitionInterpretation, RepDefBuilder},
28};
29
/// Encodes large binary ("blob") values out-of-line.
///
/// The blob bytes themselves are appended to external file buffers; only a
/// small `struct<position: u64, size: u64>` descriptor per row is encoded in
/// the column, delegated to the wrapped `descriptor_encoder`.
pub struct BlobStructuralEncoder {
    // Inner encoder for the packed descriptor struct column.
    descriptor_encoder: Box<dyn FieldEncoder>,
    // Definition-level meaning captured on the first `maybe_encode` call;
    // reused by `flush` when wrapping descriptor pages into blob layouts.
    def_meaning: Option<Arc<[DefinitionInterpretation]>>,
}
41
42impl BlobStructuralEncoder {
43 pub fn new(
44 field: &Field,
45 column_index: u32,
46 options: &crate::encoder::EncodingOptions,
47 compression_strategy: Arc<dyn crate::compression::CompressionStrategy>,
48 ) -> Result<Self> {
49 let mut descriptor_metadata = HashMap::with_capacity(1);
52 descriptor_metadata.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
53
54 let descriptor_data_type = DataType::Struct(Fields::from(vec![
55 ArrowField::new("position", DataType::UInt64, false),
56 ArrowField::new("size", DataType::UInt64, false),
57 ]));
58
59 let descriptor_field = Field::try_from(
61 ArrowField::new(&field.name, descriptor_data_type, field.nullable)
62 .with_metadata(descriptor_metadata),
63 )?;
64
65 let descriptor_encoder = Box::new(PrimitiveStructuralEncoder::try_new(
67 options,
68 compression_strategy,
69 column_index,
70 descriptor_field,
71 Arc::new(HashMap::new()),
72 )?);
73
74 Ok(Self {
75 descriptor_encoder,
76 def_meaning: None,
77 })
78 }
79
80 fn wrap_tasks(
81 tasks: Vec<EncodeTask>,
82 def_meaning: Arc<[DefinitionInterpretation]>,
83 ) -> Vec<EncodeTask> {
84 tasks
85 .into_iter()
86 .map(|task| {
87 let def_meaning = def_meaning.clone();
88 task.then(|encoded_page| async move {
89 let encoded_page = encoded_page?;
90
91 let PageEncoding::Structural(inner_layout) = encoded_page.description else {
92 return Err(Error::Internal {
93 message: "Expected inner encoding to return structural layout"
94 .to_string(),
95 location: location!(),
96 });
97 };
98
99 let wrapped = ProtobufUtils21::blob_layout(inner_layout, &def_meaning);
100 Ok(EncodedPage {
101 column_idx: encoded_page.column_idx,
102 data: encoded_page.data,
103 description: PageEncoding::Structural(wrapped),
104 num_rows: encoded_page.num_rows,
105 row_number: encoded_page.row_number,
106 })
107 })
108 .boxed()
109 })
110 .collect::<Vec<_>>()
111 }
112}
113
impl FieldEncoder for BlobStructuralEncoder {
    /// Encodes a batch of LargeBinary blob values.
    ///
    /// The blob bytes are moved into `external_buffers`; only a
    /// `struct<position: u64, size: u64>` descriptor per row is handed to the
    /// inner descriptor encoder.  Null rows smuggle their rep/def levels
    /// through the `position` slot (see the comment in the loop below).
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        // Record this level's validity in the rep/def builder.
        if let Some(validity) = array.nulls() {
            repdef.add_validity_bitmap(validity.clone());
        } else {
            repdef.add_no_null(array.len());
        }

        // Blobs must arrive as LargeBinary (i64 offsets).
        let binary_array = array
            .as_binary_opt::<i64>()
            .ok_or_else(|| Error::InvalidInput {
                source: format!("Expected LargeBinary array, got {}", array.data_type()).into(),
                location: location!(),
            })?;

        // Flatten the builder into concrete repetition / definition levels.
        let repdef = RepDefBuilder::serialize(vec![repdef]);

        let rep = repdef.repetition_levels.as_ref();
        let def = repdef.definition_levels.as_ref();
        let def_meaning: Arc<[DefinitionInterpretation]> = repdef.def_meaning.into();

        // The definition interpretation is a property of the field and should
        // be identical across calls; remember the first one (`flush` needs it
        // to wrap descriptor pages produced after the last data batch).
        if self.def_meaning.is_none() {
            self.def_meaning = Some(def_meaning.clone());
        } else {
            debug_assert_eq!(self.def_meaning.as_ref().unwrap(), &def_meaning);
        }

        let mut positions = Vec::with_capacity(binary_array.len());
        let mut sizes = Vec::with_capacity(binary_array.len());

        for i in 0..binary_array.len() {
            if binary_array.is_null(i) {
                // Null row: no bytes are stored.  Instead, pack the levels
                // into the position slot (def level shifted into the upper
                // bits, rep level in the low 16) so the decoder can
                // reconstruct nullability.  Definition levels must exist when
                // there is a null row, hence `expect_ok`.
                let mut repdef = (def.expect_ok()?[i] as u64) << 16;
                if let Some(rep) = rep {
                    repdef += rep[i] as u64;
                }

                // A null row always has a non-zero def level, so this packed
                // value can never collide with the "valid" sentinel 0.
                debug_assert_ne!(repdef, 0);
                positions.push(repdef);
                sizes.push(0);
            } else {
                let value = binary_array.value(i);
                if value.is_empty() {
                    // Empty (but valid) value: position 0 / size 0 sentinel,
                    // no external buffer needed.
                    positions.push(0);
                    sizes.push(0);
                } else {
                    // Store the bytes out-of-line and remember where.
                    let position =
                        external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value)));
                    positions.push(position);
                    sizes.push(value.len() as u64);
                }
            }
        }

        // Assemble the descriptor column.  Children are non-nullable; row
        // nullability was folded into the position values above.
        let position_array = Arc::new(UInt64Array::from(positions));
        let size_array = Arc::new(UInt64Array::from(sizes));
        let descriptor_array = Arc::new(StructArray::new(
            Fields::from(vec![
                ArrowField::new("position", DataType::UInt64, false),
                ArrowField::new("size", DataType::UInt64, false),
            ]),
            vec![position_array as ArrayRef, size_array as ArrayRef],
            None,
        ));

        // Descriptors carry no nulls of their own, so the inner encoder gets
        // a fresh (empty) RepDefBuilder.
        let encode_tasks = self.descriptor_encoder.maybe_encode(
            descriptor_array,
            external_buffers,
            RepDefBuilder::default(),
            row_number,
            num_rows,
        )?;

        // Wrap each descriptor page into a blob layout.
        Ok(Self::wrap_tasks(encode_tasks, def_meaning))
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let encode_tasks = self.descriptor_encoder.flush(external_buffers)?;

        // If flush happens before any data arrived we have no recorded def
        // meaning; fall back to "all valid items".
        let def_meaning = self
            .def_meaning
            .clone()
            .unwrap_or_else(|| Arc::new([DefinitionInterpretation::AllValidItem]));

        Ok(Self::wrap_tasks(encode_tasks, def_meaning))
    }

    // `finish` and `num_columns` simply delegate to the descriptor encoder.
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
        self.descriptor_encoder.finish(external_buffers)
    }

    fn num_columns(&self) -> u32 {
        self.descriptor_encoder.num_columns()
    }
}
230
/// Encodes blob v2 values (`struct<data: large_binary, uri: string>`).
///
/// Inline `data` payloads are written to external buffers; the column stores
/// only a fixed-width descriptor per row, encoded by `descriptor_encoder`.
pub struct BlobV2StructuralEncoder {
    // Inner encoder for the packed BLOB_V2_DESC_FIELDS descriptor struct.
    descriptor_encoder: Box<dyn FieldEncoder>,
}
235
236impl BlobV2StructuralEncoder {
237 pub fn new(
238 field: &Field,
239 column_index: u32,
240 options: &crate::encoder::EncodingOptions,
241 compression_strategy: Arc<dyn crate::compression::CompressionStrategy>,
242 ) -> Result<Self> {
243 let mut descriptor_metadata = HashMap::with_capacity(1);
244 descriptor_metadata.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
245
246 let descriptor_data_type = DataType::Struct(BLOB_V2_DESC_FIELDS.clone());
247
248 let descriptor_field = Field::try_from(
249 ArrowField::new(&field.name, descriptor_data_type, field.nullable)
250 .with_metadata(descriptor_metadata),
251 )?;
252
253 let descriptor_encoder = Box::new(PrimitiveStructuralEncoder::try_new(
254 options,
255 compression_strategy,
256 column_index,
257 descriptor_field,
258 Arc::new(HashMap::new()),
259 )?);
260
261 Ok(Self { descriptor_encoder })
262 }
263}
264
265impl FieldEncoder for BlobV2StructuralEncoder {
266 fn maybe_encode(
267 &mut self,
268 array: ArrayRef,
269 external_buffers: &mut OutOfLineBuffers,
270 _repdef: RepDefBuilder,
271 row_number: u64,
272 num_rows: u64,
273 ) -> Result<Vec<EncodeTask>> {
274 let DataType::Struct(fields) = array.data_type() else {
276 return Err(Error::InvalidInput {
277 source: "Blob v2 requires struct<data, uri> input".into(),
278 location: location!(),
279 });
280 };
281
282 let struct_arr = array.as_struct();
283 let mut data_idx = None;
284 let mut uri_idx = None;
285 for (idx, field) in fields.iter().enumerate() {
286 match field.name().as_str() {
287 "data" => data_idx = Some(idx),
288 "uri" => uri_idx = Some(idx),
289 _ => {}
290 }
291 }
292 let (data_idx, uri_idx) = data_idx.zip(uri_idx).ok_or_else(|| Error::InvalidInput {
293 source: "Blob v2 struct must contain 'data' and 'uri' fields".into(),
294 location: location!(),
295 })?;
296
297 let data_col = struct_arr.column(data_idx).as_binary::<i64>();
298 let uri_col = struct_arr.column(uri_idx).as_string::<i32>();
299
300 for i in 0..struct_arr.len() {
302 if struct_arr.is_null(i) {
303 continue;
304 }
305 let data_is_set = !data_col.is_null(i);
306 let uri_is_set = !uri_col.is_null(i);
307 if data_is_set == uri_is_set {
308 return Err(Error::InvalidInput {
309 source: "Each blob row must set exactly one of data or uri".into(),
310 location: location!(),
311 });
312 }
313 if uri_is_set {
314 return Err(Error::NotSupported {
315 source: "External blob (uri) is not supported yet".into(),
316 location: location!(),
317 });
318 }
319 }
320
321 let binary_array = data_col;
322
323 let mut kind_builder = PrimitiveBuilder::<UInt8Type>::with_capacity(binary_array.len());
324 let mut position_builder =
325 PrimitiveBuilder::<UInt64Type>::with_capacity(binary_array.len());
326 let mut size_builder = PrimitiveBuilder::<UInt64Type>::with_capacity(binary_array.len());
327 let mut blob_id_builder = PrimitiveBuilder::<UInt32Type>::with_capacity(binary_array.len());
328 let mut uri_builder = StringBuilder::with_capacity(binary_array.len(), 0);
329
330 for i in 0..binary_array.len() {
331 let is_null_row = match array.data_type() {
332 DataType::Struct(_) => array.is_null(i),
333 _ => binary_array.is_null(i),
334 };
335 if is_null_row {
336 kind_builder.append_null();
337 position_builder.append_null();
338 size_builder.append_null();
339 blob_id_builder.append_null();
340 uri_builder.append_null();
341 continue;
342 }
343
344 let value = binary_array.value(i);
345 kind_builder.append_value(0);
346
347 if value.is_empty() {
348 position_builder.append_value(0);
349 size_builder.append_value(0);
350 } else {
351 let position = external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value)));
352 position_builder.append_value(position);
353 size_builder.append_value(value.len() as u64);
354 }
355
356 blob_id_builder.append_null();
357 uri_builder.append_null();
358 }
359
360 let children: Vec<ArrayRef> = vec![
361 Arc::new(kind_builder.finish()),
362 Arc::new(position_builder.finish()),
363 Arc::new(size_builder.finish()),
364 Arc::new(blob_id_builder.finish()),
365 Arc::new(uri_builder.finish()),
366 ];
367
368 let descriptor_array = Arc::new(StructArray::try_new(
369 BLOB_V2_DESC_FIELDS.clone(),
370 children,
371 None,
372 )?) as ArrayRef;
373
374 self.descriptor_encoder.maybe_encode(
375 descriptor_array,
376 external_buffers,
377 RepDefBuilder::default(),
378 row_number,
379 num_rows,
380 )
381 }
382
383 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
384 self.descriptor_encoder.flush(external_buffers)
385 }
386
387 fn finish(
388 &mut self,
389 external_buffers: &mut OutOfLineBuffers,
390 ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
391 self.descriptor_encoder.finish(external_buffers)
392 }
393
394 fn num_columns(&self) -> u32 {
395 self.descriptor_encoder.num_columns()
396 }
397}
398
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        compression::DefaultCompressionStrategy,
        encoder::{ColumnIndexSequence, EncodingOptions},
        testing::{check_round_trip_encoding_of_data, TestCases},
    };
    use arrow_array::LargeBinaryArray;

    #[test]
    fn test_blob_encoder_creation() {
        // Building an encoder over a nullable LargeBinary field should succeed.
        let field =
            Field::try_from(ArrowField::new("blob_field", DataType::LargeBinary, true)).unwrap();
        let mut column_index = ColumnIndexSequence::default();
        let column_idx = column_index.next_column_index(0);
        let options = EncodingOptions::default();
        let compression = Arc::new(DefaultCompressionStrategy::new());

        let result = BlobStructuralEncoder::new(&field, column_idx, &options, compression);
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_blob_encoding_simple() {
        let blob_metadata =
            HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]);
        let field = Field::try_from(
            ArrowField::new("blob_field", DataType::LargeBinary, true)
                .with_metadata(blob_metadata),
        )
        .unwrap();
        let mut column_index = ColumnIndexSequence::default();
        let column_idx = column_index.next_column_index(0);
        let options = EncodingOptions::default();
        let compression = Arc::new(DefaultCompressionStrategy::new());

        let mut encoder =
            BlobStructuralEncoder::new(&field, column_idx, &options, compression).unwrap();

        // A mix of small, null, large, and empty values.
        let large_data = vec![0u8; 1024 * 100];
        let values: Vec<Option<&[u8]>> =
            vec![Some(b"hello world"), None, Some(&large_data), Some(b"")];
        let array = Arc::new(LargeBinaryArray::from(values));

        let mut external_buffers = OutOfLineBuffers::new(0, 8);
        let repdef = RepDefBuilder::default();

        let tasks = encoder
            .maybe_encode(array, &mut external_buffers, repdef, 0, 4)
            .unwrap();

        // If nothing was emitted eagerly, force a flush.
        if tasks.is_empty() {
            let _flush_tasks = encoder.flush(&mut external_buffers).unwrap();
        }

        assert!(encoder.num_columns() > 0);

        let buffers = external_buffers.take_buffers();
        assert!(
            !buffers.is_empty(),
            "Large blobs should be stored in external buffers"
        );
    }

    #[tokio::test]
    async fn test_blob_round_trip() {
        let blob_metadata =
            HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]);

        let val1: &[u8] = &vec![1u8; 1024];
        let val2: &[u8] = &vec![2u8; 10240];
        let val3: &[u8] = &vec![3u8; 102400];
        let array = Arc::new(LargeBinaryArray::from(vec![
            Some(val1),
            None,
            Some(val2),
            Some(val3),
        ]));

        check_round_trip_encoding_of_data(vec![array], &TestCases::default(), blob_metadata).await;
    }
}