lance_encoding/encodings/logical/
blob.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::HashMap, sync::Arc};
5
6use arrow_array::{
7    builder::{PrimitiveBuilder, StringBuilder},
8    cast::AsArray,
9    types::{UInt32Type, UInt64Type, UInt8Type},
10    Array, ArrayRef, StructArray, UInt64Array,
11};
12use arrow_buffer::Buffer;
13use arrow_schema::{DataType, Field as ArrowField, Fields};
14use futures::{future::BoxFuture, FutureExt};
15use lance_core::{
16    datatypes::Field, datatypes::BLOB_V2_DESC_FIELDS, error::LanceOptionExt, Error, Result,
17};
18use snafu::location;
19
20use crate::{
21    buffer::LanceBuffer,
22    constants::PACKED_STRUCT_META_KEY,
23    decoder::PageEncoding,
24    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
25    encodings::logical::primitive::PrimitiveStructuralEncoder,
26    format::ProtobufUtils21,
27    repdef::{DefinitionInterpretation, RepDefBuilder},
28};
29
/// Blob structural encoder - stores large binary data in external buffers
///
/// This encoder takes large binary arrays and stores them outside the normal
/// page structure. It creates a descriptor (position, size) for each blob
/// that is stored inline in the page.
pub struct BlobStructuralEncoder {
    // Encoder for the descriptors (position/size struct).  The blob bytes
    // themselves bypass this encoder and go straight to external buffers;
    // only the descriptor column is paged normally.
    descriptor_encoder: Box<dyn FieldEncoder>,
    // Set when we first see data.  Cached so that `flush` (which has no
    // access to a RepDefBuilder) can still wrap tasks with the correct
    // definition interpretation; later batches are expected to match.
    def_meaning: Option<Arc<[DefinitionInterpretation]>>,
}
41
42impl BlobStructuralEncoder {
43    pub fn new(
44        field: &Field,
45        column_index: u32,
46        options: &crate::encoder::EncodingOptions,
47        compression_strategy: Arc<dyn crate::compression::CompressionStrategy>,
48    ) -> Result<Self> {
49        // Create descriptor field: struct<position: u64, size: u64>
50        // Preserve the original field's metadata for packed struct
51        let mut descriptor_metadata = HashMap::with_capacity(1);
52        descriptor_metadata.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
53
54        let descriptor_data_type = DataType::Struct(Fields::from(vec![
55            ArrowField::new("position", DataType::UInt64, false),
56            ArrowField::new("size", DataType::UInt64, false),
57        ]));
58
59        // Use the original field's name for the descriptor
60        let descriptor_field = Field::try_from(
61            ArrowField::new(&field.name, descriptor_data_type, field.nullable)
62                .with_metadata(descriptor_metadata),
63        )?;
64
65        // Use PrimitiveStructuralEncoder to handle the descriptor
66        let descriptor_encoder = Box::new(PrimitiveStructuralEncoder::try_new(
67            options,
68            compression_strategy,
69            column_index,
70            descriptor_field,
71            Arc::new(HashMap::new()),
72        )?);
73
74        Ok(Self {
75            descriptor_encoder,
76            def_meaning: None,
77        })
78    }
79
80    fn wrap_tasks(
81        tasks: Vec<EncodeTask>,
82        def_meaning: Arc<[DefinitionInterpretation]>,
83    ) -> Vec<EncodeTask> {
84        tasks
85            .into_iter()
86            .map(|task| {
87                let def_meaning = def_meaning.clone();
88                task.then(|encoded_page| async move {
89                    let encoded_page = encoded_page?;
90
91                    let PageEncoding::Structural(inner_layout) = encoded_page.description else {
92                        return Err(Error::Internal {
93                            message: "Expected inner encoding to return structural layout"
94                                .to_string(),
95                            location: location!(),
96                        });
97                    };
98
99                    let wrapped = ProtobufUtils21::blob_layout(inner_layout, &def_meaning);
100                    Ok(EncodedPage {
101                        column_idx: encoded_page.column_idx,
102                        data: encoded_page.data,
103                        description: PageEncoding::Structural(wrapped),
104                        num_rows: encoded_page.num_rows,
105                        row_number: encoded_page.row_number,
106                    })
107                })
108                .boxed()
109            })
110            .collect::<Vec<_>>()
111    }
112}
113
impl FieldEncoder for BlobStructuralEncoder {
    /// Encodes one batch of blobs.
    ///
    /// The blob bytes for each non-null, non-empty value are appended to
    /// `external_buffers` and a `struct<position, size>` descriptor row is
    /// produced per value.  Nulls and empty values are represented entirely
    /// within the descriptor (see the loop below) and use no buffer space.
    /// The resulting encode tasks are wrapped so their page layouts become
    /// blob layouts.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        // Record this array's validity into the rep/def structure before
        // serializing it below.
        if let Some(validity) = array.nulls() {
            repdef.add_validity_bitmap(validity.clone());
        } else {
            repdef.add_no_null(array.len());
        }

        // Convert input array to LargeBinary
        let binary_array = array
            .as_binary_opt::<i64>()
            .ok_or_else(|| Error::InvalidInput {
                source: format!("Expected LargeBinary array, got {}", array.data_type()).into(),
                location: location!(),
            })?;

        let repdef = RepDefBuilder::serialize(vec![repdef]);

        let rep = repdef.repetition_levels.as_ref();
        let def = repdef.definition_levels.as_ref();
        let def_meaning: Arc<[DefinitionInterpretation]> = repdef.def_meaning.into();

        // Cache the def meaning on first use so `flush` can reuse it; all
        // batches of the same field are expected to produce the same meaning.
        if self.def_meaning.is_none() {
            self.def_meaning = Some(def_meaning.clone());
        } else {
            debug_assert_eq!(self.def_meaning.as_ref().unwrap(), &def_meaning);
        }

        // Collect positions and sizes
        let mut positions = Vec::with_capacity(binary_array.len());
        let mut sizes = Vec::with_capacity(binary_array.len());

        for i in 0..binary_array.len() {
            if binary_array.is_null(i) {
                // Null values are smuggled into the positions array

                // If we have null values we must have definition levels
                // Pack the definition level into the upper bits and the
                // repetition level (if any) into the lower 16 bits.
                let mut repdef = (def.expect_ok()?[i] as u64) << 16;
                if let Some(rep) = rep {
                    repdef += rep[i] as u64;
                }

                // A null must have a non-zero def level, so (position != 0,
                // size == 0) is unambiguously distinguishable from the empty
                // value case (position == 0, size == 0) below.
                debug_assert_ne!(repdef, 0);
                positions.push(repdef);
                sizes.push(0);
            } else {
                let value = binary_array.value(i);
                if value.is_empty() {
                    // Empty values
                    positions.push(0);
                    sizes.push(0);
                } else {
                    // Add data to external buffers
                    let position =
                        external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value)));
                    positions.push(position);
                    sizes.push(value.len() as u64);
                }
            }
        }

        // Create descriptor array
        let position_array = Arc::new(UInt64Array::from(positions));
        let size_array = Arc::new(UInt64Array::from(sizes));
        let descriptor_array = Arc::new(StructArray::new(
            Fields::from(vec![
                ArrowField::new("position", DataType::UInt64, false),
                ArrowField::new("size", DataType::UInt64, false),
            ]),
            vec![position_array as ArrayRef, size_array as ArrayRef],
            None, // Descriptors are never null
        ));

        // Delegate to descriptor encoder.  Nullability was already folded
        // into the descriptor values, so a fresh RepDefBuilder is used here.
        let encode_tasks = self.descriptor_encoder.maybe_encode(
            descriptor_array,
            external_buffers,
            RepDefBuilder::default(),
            row_number,
            num_rows,
        )?;

        Ok(Self::wrap_tasks(encode_tasks, def_meaning))
    }

    /// Flushes buffered descriptor data, wrapping any resulting tasks as
    /// blob layouts.
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let encode_tasks = self.descriptor_encoder.flush(external_buffers)?;

        // Use the cached def meaning.  If we haven't seen any data yet then we can just use a dummy
        // value (not clear there would be any encode tasks in that case)
        let def_meaning = self
            .def_meaning
            .clone()
            .unwrap_or_else(|| Arc::new([DefinitionInterpretation::AllValidItem]));

        Ok(Self::wrap_tasks(encode_tasks, def_meaning))
    }

    /// Finalizes encoding by delegating to the descriptor encoder.
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
        self.descriptor_encoder.finish(external_buffers)
    }

    /// Number of columns produced (same as the descriptor encoder's).
    fn num_columns(&self) -> u32 {
        self.descriptor_encoder.num_columns()
    }
}
230
/// Blob v2 structural encoder
///
/// Accepts `struct<data: LargeBinary?, uri: Utf8?>` input and encodes a
/// five-field descriptor column (see `BLOB_V2_DESC_FIELDS`) while writing
/// inline blob payloads to external buffers.
pub struct BlobV2StructuralEncoder {
    // Encoder for the v2 descriptor column; all FieldEncoder calls delegate
    // to it.
    descriptor_encoder: Box<dyn FieldEncoder>,
}
235
236impl BlobV2StructuralEncoder {
237    pub fn new(
238        field: &Field,
239        column_index: u32,
240        options: &crate::encoder::EncodingOptions,
241        compression_strategy: Arc<dyn crate::compression::CompressionStrategy>,
242    ) -> Result<Self> {
243        let mut descriptor_metadata = HashMap::with_capacity(1);
244        descriptor_metadata.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
245
246        let descriptor_data_type = DataType::Struct(BLOB_V2_DESC_FIELDS.clone());
247
248        let descriptor_field = Field::try_from(
249            ArrowField::new(&field.name, descriptor_data_type, field.nullable)
250                .with_metadata(descriptor_metadata),
251        )?;
252
253        let descriptor_encoder = Box::new(PrimitiveStructuralEncoder::try_new(
254            options,
255            compression_strategy,
256            column_index,
257            descriptor_field,
258            Arc::new(HashMap::new()),
259        )?);
260
261        Ok(Self { descriptor_encoder })
262    }
263}
264
265impl FieldEncoder for BlobV2StructuralEncoder {
266    fn maybe_encode(
267        &mut self,
268        array: ArrayRef,
269        external_buffers: &mut OutOfLineBuffers,
270        _repdef: RepDefBuilder,
271        row_number: u64,
272        num_rows: u64,
273    ) -> Result<Vec<EncodeTask>> {
274        // Supported input: Struct<data:LargeBinary?, uri:Utf8?>
275        let DataType::Struct(fields) = array.data_type() else {
276            return Err(Error::InvalidInput {
277                source: "Blob v2 requires struct<data, uri> input".into(),
278                location: location!(),
279            });
280        };
281
282        let struct_arr = array.as_struct();
283        let mut data_idx = None;
284        let mut uri_idx = None;
285        for (idx, field) in fields.iter().enumerate() {
286            match field.name().as_str() {
287                "data" => data_idx = Some(idx),
288                "uri" => uri_idx = Some(idx),
289                _ => {}
290            }
291        }
292        let (data_idx, uri_idx) = data_idx.zip(uri_idx).ok_or_else(|| Error::InvalidInput {
293            source: "Blob v2 struct must contain 'data' and 'uri' fields".into(),
294            location: location!(),
295        })?;
296
297        let data_col = struct_arr.column(data_idx).as_binary::<i64>();
298        let uri_col = struct_arr.column(uri_idx).as_string::<i32>();
299
300        // Validate XOR(data, uri)
301        for i in 0..struct_arr.len() {
302            if struct_arr.is_null(i) {
303                continue;
304            }
305            let data_is_set = !data_col.is_null(i);
306            let uri_is_set = !uri_col.is_null(i);
307            if data_is_set == uri_is_set {
308                return Err(Error::InvalidInput {
309                    source: "Each blob row must set exactly one of data or uri".into(),
310                    location: location!(),
311                });
312            }
313            if uri_is_set {
314                return Err(Error::NotSupported {
315                    source: "External blob (uri) is not supported yet".into(),
316                    location: location!(),
317                });
318            }
319        }
320
321        let binary_array = data_col;
322
323        let mut kind_builder = PrimitiveBuilder::<UInt8Type>::with_capacity(binary_array.len());
324        let mut position_builder =
325            PrimitiveBuilder::<UInt64Type>::with_capacity(binary_array.len());
326        let mut size_builder = PrimitiveBuilder::<UInt64Type>::with_capacity(binary_array.len());
327        let mut blob_id_builder = PrimitiveBuilder::<UInt32Type>::with_capacity(binary_array.len());
328        let mut uri_builder = StringBuilder::with_capacity(binary_array.len(), 0);
329
330        for i in 0..binary_array.len() {
331            let is_null_row = match array.data_type() {
332                DataType::Struct(_) => array.is_null(i),
333                _ => binary_array.is_null(i),
334            };
335            if is_null_row {
336                kind_builder.append_null();
337                position_builder.append_null();
338                size_builder.append_null();
339                blob_id_builder.append_null();
340                uri_builder.append_null();
341                continue;
342            }
343
344            let value = binary_array.value(i);
345            kind_builder.append_value(0);
346
347            if value.is_empty() {
348                position_builder.append_value(0);
349                size_builder.append_value(0);
350            } else {
351                let position = external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value)));
352                position_builder.append_value(position);
353                size_builder.append_value(value.len() as u64);
354            }
355
356            blob_id_builder.append_null();
357            uri_builder.append_null();
358        }
359
360        let children: Vec<ArrayRef> = vec![
361            Arc::new(kind_builder.finish()),
362            Arc::new(position_builder.finish()),
363            Arc::new(size_builder.finish()),
364            Arc::new(blob_id_builder.finish()),
365            Arc::new(uri_builder.finish()),
366        ];
367
368        let descriptor_array = Arc::new(StructArray::try_new(
369            BLOB_V2_DESC_FIELDS.clone(),
370            children,
371            None,
372        )?) as ArrayRef;
373
374        self.descriptor_encoder.maybe_encode(
375            descriptor_array,
376            external_buffers,
377            RepDefBuilder::default(),
378            row_number,
379            num_rows,
380        )
381    }
382
383    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
384        self.descriptor_encoder.flush(external_buffers)
385    }
386
387    fn finish(
388        &mut self,
389        external_buffers: &mut OutOfLineBuffers,
390    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
391        self.descriptor_encoder.finish(external_buffers)
392    }
393
394    fn num_columns(&self) -> u32 {
395        self.descriptor_encoder.num_columns()
396    }
397}
398
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        compression::DefaultCompressionStrategy,
        encoder::{ColumnIndexSequence, EncodingOptions},
        testing::{check_round_trip_encoding_of_data, TestCases},
    };
    use arrow_array::LargeBinaryArray;

    // Verifies that a BlobStructuralEncoder can be constructed for a
    // nullable LargeBinary field with default options.
    #[test]
    fn test_blob_encoder_creation() {
        let field =
            Field::try_from(ArrowField::new("blob_field", DataType::LargeBinary, true)).unwrap();
        let mut column_index = ColumnIndexSequence::default();
        let column_idx = column_index.next_column_index(0);
        let options = EncodingOptions::default();
        let compression = Arc::new(DefaultCompressionStrategy::new());

        let encoder = BlobStructuralEncoder::new(&field, column_idx, &options, compression);

        assert!(encoder.is_ok());
    }

    // Exercises a single maybe_encode/flush cycle and checks that non-empty
    // blob payloads actually land in the external buffers.
    #[tokio::test]
    async fn test_blob_encoding_simple() {
        let field = Field::try_from(
            ArrowField::new("blob_field", DataType::LargeBinary, true).with_metadata(
                HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]),
            ),
        )
        .unwrap();
        let mut column_index = ColumnIndexSequence::default();
        let column_idx = column_index.next_column_index(0);
        let options = EncodingOptions::default();
        let compression = Arc::new(DefaultCompressionStrategy::new());

        let mut encoder =
            BlobStructuralEncoder::new(&field, column_idx, &options, compression).unwrap();

        // Create test data with larger blobs
        let large_data = vec![0u8; 1024 * 100]; // 100KB blob
        // Mix of small value, null, large value, and empty value to cover
        // all descriptor cases.
        let data: Vec<Option<&[u8]>> =
            vec![Some(b"hello world"), None, Some(&large_data), Some(b"")];
        let array = Arc::new(LargeBinaryArray::from(data));

        // Test encoding
        let mut external_buffers = OutOfLineBuffers::new(0, 8);
        let repdef = RepDefBuilder::default();

        let tasks = encoder
            .maybe_encode(array, &mut external_buffers, repdef, 0, 4)
            .unwrap();

        // If no tasks yet, flush to force encoding
        if tasks.is_empty() {
            let _flush_tasks = encoder.flush(&mut external_buffers).unwrap();
        }

        // Should produce encode tasks for the descriptor (or we need more data)
        // For now, just verify no errors occurred
        assert!(encoder.num_columns() > 0);

        // Verify external buffers were used for large data
        let buffers = external_buffers.take_buffers();
        assert!(
            !buffers.is_empty(),
            "Large blobs should be stored in external buffers"
        );
    }

    // End-to-end round trip through the standard test harness with blob
    // metadata set on the field.
    #[tokio::test]
    async fn test_blob_round_trip() {
        // Test round-trip encoding with blob metadata
        let blob_metadata =
            HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]);

        // Create test data
        let val1: &[u8] = &vec![1u8; 1024]; // 1KB
        let val2: &[u8] = &vec![2u8; 10240]; // 10KB
        let val3: &[u8] = &vec![3u8; 102400]; // 100KB
        let array = Arc::new(LargeBinaryArray::from(vec![
            Some(val1),
            None,
            Some(val2),
            Some(val3),
        ]));

        // Use the standard test harness
        check_round_trip_encoding_of_data(vec![array], &TestCases::default(), blob_metadata).await;
    }
}