lance_encoding/encodings/logical/blob.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{collections::HashMap, sync::Arc};

use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray, UInt64Array};
use arrow_buffer::Buffer;
use arrow_schema::{DataType, Field as ArrowField, Fields};
use futures::{future::BoxFuture, FutureExt};
use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
use snafu::location;

use crate::{
    buffer::LanceBuffer,
    constants::PACKED_STRUCT_META_KEY,
    decoder::PageEncoding,
    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
    encodings::logical::primitive::PrimitiveStructuralEncoder,
    format::ProtobufUtils21,
    repdef::{DefinitionInterpretation, RepDefBuilder},
};

/// Blob structural encoder - stores large binary data in external buffers
///
/// This encoder takes large binary arrays and stores their values outside the
/// normal page structure. For each blob it creates a (position, size)
/// descriptor, and the descriptors are stored inline in the page.
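///
/// As a sketch of the scheme (values here are illustrative, not the exact
/// wire format), a batch like `[Some(b"hello"), None, Some(b"")]` yields
/// descriptors along the lines of:
///
/// ```text
/// position: [<offset of "hello" in external buffer>, <packed rep/def>, 0]
/// size:     [5,                                      0,                0]
/// ```
///
/// Non-empty values point at data written to external buffers, nulls smuggle
/// their rep/def levels into `position` with a size of 0, and empty values
/// are stored as (0, 0).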
pub struct BlobStructuralEncoder {
    // Encoder for the descriptors (position/size struct)
    descriptor_encoder: Box<dyn FieldEncoder>,
    // Set when we first see data
    def_meaning: Option<Arc<[DefinitionInterpretation]>>,
}

impl BlobStructuralEncoder {
    pub fn new(
        field: &Field,
        column_index: u32,
        options: &crate::encoder::EncodingOptions,
        compression_strategy: Arc<dyn crate::compression::CompressionStrategy>,
    ) -> Result<Self> {
        // Create the descriptor field: struct<position: u64, size: u64>
        // Mark it as a packed struct so position and size are encoded together
        let mut descriptor_metadata = HashMap::with_capacity(1);
        descriptor_metadata.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
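
        // Note: the packed-struct hint should lead the inner encoder to zip
        // `position` and `size` into a single fixed-width row per value (an
        // assumption about how PACKED_STRUCT_META_KEY is interpreted), letting
        // a reader fetch one descriptor with a single contiguous read.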

        let descriptor_data_type = DataType::Struct(Fields::from(vec![
            ArrowField::new("position", DataType::UInt64, false),
            ArrowField::new("size", DataType::UInt64, false),
        ]));

        // Use the original field's name for the descriptor
        let descriptor_field = Field::try_from(
            ArrowField::new(&field.name, descriptor_data_type, field.nullable)
                .with_metadata(descriptor_metadata),
        )?;

        // Use PrimitiveStructuralEncoder to handle the descriptor
        let descriptor_encoder = Box::new(PrimitiveStructuralEncoder::try_new(
            options,
            compression_strategy,
            column_index,
            descriptor_field,
            Arc::new(HashMap::new()),
        )?);

        Ok(Self {
            descriptor_encoder,
            def_meaning: None,
        })
    }

    /// Wrap the descriptor encode tasks so that each resulting page reports a
    /// blob layout (carrying the definition meaning) around the inner
    /// descriptor layout
    fn wrap_tasks(
        tasks: Vec<EncodeTask>,
        def_meaning: Arc<[DefinitionInterpretation]>,
    ) -> Vec<EncodeTask> {
        tasks
            .into_iter()
            .map(|task| {
                let def_meaning = def_meaning.clone();
                task.then(|encoded_page| async move {
                    let encoded_page = encoded_page?;

                    let PageEncoding::Structural(inner_layout) = encoded_page.description else {
                        return Err(Error::Internal {
                            message: "Expected inner encoding to return structural layout"
                                .to_string(),
                            location: location!(),
                        });
                    };

                    let wrapped = ProtobufUtils21::blob_layout(inner_layout, &def_meaning);
                    Ok(EncodedPage {
                        column_idx: encoded_page.column_idx,
                        data: encoded_page.data,
                        description: PageEncoding::Structural(wrapped),
                        num_rows: encoded_page.num_rows,
                        row_number: encoded_page.row_number,
                    })
                })
                .boxed()
            })
            .collect::<Vec<_>>()
    }
}

impl FieldEncoder for BlobStructuralEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        if let Some(validity) = array.nulls() {
            repdef.add_validity_bitmap(validity.clone());
        } else {
            repdef.add_no_null(array.len());
        }

        // The input must already be a LargeBinary array; this is just a downcast
        let binary_array = array
            .as_binary_opt::<i64>()
            .ok_or_else(|| Error::InvalidInput {
                source: format!("Expected LargeBinary array, got {}", array.data_type()).into(),
                location: location!(),
            })?;

        let repdef = RepDefBuilder::serialize(vec![repdef]);

        let rep = repdef.repetition_levels.as_ref();
        let def = repdef.definition_levels.as_ref();
        let def_meaning: Arc<[DefinitionInterpretation]> = repdef.def_meaning.into();

        if self.def_meaning.is_none() {
            self.def_meaning = Some(def_meaning.clone());
        } else {
            debug_assert_eq!(self.def_meaning.as_ref().unwrap(), &def_meaning);
        }

        // Collect positions and sizes
        let mut positions = Vec::with_capacity(binary_array.len());
        let mut sizes = Vec::with_capacity(binary_array.len());

        for i in 0..binary_array.len() {
            if binary_array.is_null(i) {
                // Null values are smuggled into the positions array

                // If we have null values we must have definition levels
                let mut repdef = (def.expect_ok()?[i] as u64) << 16;
                if let Some(rep) = rep {
                    repdef += rep[i] as u64;
                }
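                // For example (assuming a flat nullable field): a null item
                // has def level 1 and no rep levels, so the packed value is
                // 1 << 16 == 65536. A null therefore has size 0 and a nonzero
                // "position", which distinguishes it from an empty value,
                // stored as (position == 0, size == 0).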

                debug_assert_ne!(repdef, 0);
                positions.push(repdef);
                sizes.push(0);
            } else {
                let value = binary_array.value(i);
                if value.is_empty() {
                    // Empty (but non-null) values need no buffer
                    positions.push(0);
                    sizes.push(0);
                } else {
                    // Add data to external buffers
                    let position =
                        external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value)));
                    positions.push(position);
                    sizes.push(value.len() as u64);
                }
            }
        }

        // Create descriptor array
        let position_array = Arc::new(UInt64Array::from(positions));
        let size_array = Arc::new(UInt64Array::from(sizes));
        let descriptor_array = Arc::new(StructArray::new(
            Fields::from(vec![
                ArrowField::new("position", DataType::UInt64, false),
                ArrowField::new("size", DataType::UInt64, false),
            ]),
            vec![position_array as ArrayRef, size_array as ArrayRef],
            None, // Descriptors are never null
        ));

        // Delegate to the descriptor encoder with a fresh repdef; the real
        // rep/def levels were already folded into the descriptors above
        let encode_tasks = self.descriptor_encoder.maybe_encode(
            descriptor_array,
            external_buffers,
            RepDefBuilder::default(),
            row_number,
            num_rows,
        )?;

        Ok(Self::wrap_tasks(encode_tasks, def_meaning))
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let encode_tasks = self.descriptor_encoder.flush(external_buffers)?;

        // Use the cached def meaning. If we haven't seen any data yet, fall
        // back to a dummy value (it's unclear there would be any encode tasks
        // in that case)
        let def_meaning = self
            .def_meaning
            .clone()
            .unwrap_or_else(|| Arc::new([DefinitionInterpretation::AllValidItem]));

        Ok(Self::wrap_tasks(encode_tasks, def_meaning))
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
        self.descriptor_encoder.finish(external_buffers)
    }

    fn num_columns(&self) -> u32 {
        self.descriptor_encoder.num_columns()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        compression::DefaultCompressionStrategy,
        encoder::{ColumnIndexSequence, EncodingOptions},
        testing::{check_round_trip_encoding_of_data, TestCases},
    };
    use arrow_array::LargeBinaryArray;

    #[test]
    fn test_blob_encoder_creation() {
        let field =
            Field::try_from(ArrowField::new("blob_field", DataType::LargeBinary, true)).unwrap();
        let mut column_index = ColumnIndexSequence::default();
        let column_idx = column_index.next_column_index(0);
        let options = EncodingOptions::default();
        let compression = Arc::new(DefaultCompressionStrategy::new());

        let encoder = BlobStructuralEncoder::new(&field, column_idx, &options, compression);

        assert!(encoder.is_ok());
    }

    #[tokio::test]
    async fn test_blob_encoding_simple() {
        let field = Field::try_from(
            ArrowField::new("blob_field", DataType::LargeBinary, true).with_metadata(
                HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]),
            ),
        )
        .unwrap();
        let mut column_index = ColumnIndexSequence::default();
        let column_idx = column_index.next_column_index(0);
        let options = EncodingOptions::default();
        let compression = Arc::new(DefaultCompressionStrategy::new());

        let mut encoder =
            BlobStructuralEncoder::new(&field, column_idx, &options, compression).unwrap();

        // Create test data with larger blobs
        let large_data = vec![0u8; 1024 * 100]; // 100KB blob
        let data: Vec<Option<&[u8]>> =
            vec![Some(b"hello world"), None, Some(&large_data), Some(b"")];
        let array = Arc::new(LargeBinaryArray::from(data));

        // Test encoding
        let mut external_buffers = OutOfLineBuffers::new(0, 8);
        let repdef = RepDefBuilder::default();

        let tasks = encoder
            .maybe_encode(array, &mut external_buffers, repdef, 0, 4)
            .unwrap();

        // If no tasks yet, flush to force encoding
        if tasks.is_empty() {
            let _flush_tasks = encoder.flush(&mut external_buffers).unwrap();
        }

        // Encoding may buffer data until flush, so don't assert on task count;
        // just verify the encoder is wired up and no errors occurred
        assert!(encoder.num_columns() > 0);

        // Verify external buffers were used for the large data
        let buffers = external_buffers.take_buffers();
        assert!(
            !buffers.is_empty(),
            "Large blobs should be stored in external buffers"
        );
    }
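
    #[tokio::test]
    async fn test_blob_null_and_empty_use_no_external_buffers() {
        // A small sketch mirroring test_blob_encoding_simple (this test and
        // its name are illustrative additions): null and empty blobs are
        // described entirely by their (position, size) descriptors, so no
        // external buffers should be allocated for them.
        let field = Field::try_from(
            ArrowField::new("blob_field", DataType::LargeBinary, true).with_metadata(
                HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]),
            ),
        )
        .unwrap();
        let mut column_index = ColumnIndexSequence::default();
        let column_idx = column_index.next_column_index(0);
        let options = EncodingOptions::default();
        let compression = Arc::new(DefaultCompressionStrategy::new());
        let mut encoder =
            BlobStructuralEncoder::new(&field, column_idx, &options, compression).unwrap();

        let data: Vec<Option<&[u8]>> = vec![Some(b""), None, Some(b"")];
        let array = Arc::new(LargeBinaryArray::from(data));

        let mut external_buffers = OutOfLineBuffers::new(0, 8);
        let _tasks = encoder
            .maybe_encode(array, &mut external_buffers, RepDefBuilder::default(), 0, 3)
            .unwrap();

        // No blob bytes should have been written out of line
        assert!(external_buffers.take_buffers().is_empty());
    }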

    #[tokio::test]
    async fn test_blob_round_trip() {
        // Test round-trip encoding with blob metadata
        let blob_metadata =
            HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]);

        // Create test data
        let val1: &[u8] = &vec![1u8; 1024]; // 1KB
        let val2: &[u8] = &vec![2u8; 10240]; // 10KB
        let val3: &[u8] = &vec![3u8; 102400]; // 100KB
        let array = Arc::new(LargeBinaryArray::from(vec![
            Some(val1),
            None,
            Some(val2),
            Some(val3),
        ]));

        // Use the standard test harness
        check_round_trip_encoding_of_data(vec![array], &TestCases::default(), blob_metadata).await;
    }
}