lance_encoding/encodings/logical/blob.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::HashMap, sync::Arc};
5
6use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray, UInt64Array};
7use arrow_buffer::Buffer;
8use arrow_schema::{DataType, Field as ArrowField, Fields};
9use futures::future::BoxFuture;
10use lance_core::{datatypes::Field, Error, Result};
11use snafu::location;
12
13use crate::{
14    buffer::LanceBuffer,
15    constants::PACKED_STRUCT_META_KEY,
16    encoder::{EncodeTask, EncodedColumn, FieldEncoder, OutOfLineBuffers},
17    encodings::logical::primitive::PrimitiveStructuralEncoder,
18    repdef::RepDefBuilder,
19};
20
/// Blob structural encoder - stores large binary data in external buffers
///
/// This encoder takes large binary arrays and stores them outside the normal
/// page structure. It creates a descriptor (position, size) for each blob
/// that is stored inline in the page.
pub struct BlobStructuralEncoder {
    // Encoder for the descriptors (position/size struct).  Only the small
    // fixed-width descriptors flow through this encoder; the blob bytes
    // themselves are written directly to the out-of-line buffers.
    descriptor_encoder: Box<dyn FieldEncoder>,
}
30
31impl BlobStructuralEncoder {
32    pub fn new(
33        field: &Field,
34        column_index: u32,
35        options: &crate::encoder::EncodingOptions,
36        compression_strategy: Arc<dyn crate::compression::CompressionStrategy>,
37    ) -> Result<Self> {
38        // Create descriptor field: struct<position: u64, size: u64>
39        // Preserve the original field's metadata for packed struct
40        let mut descriptor_metadata = HashMap::with_capacity(1);
41        descriptor_metadata.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
42
43        let descriptor_data_type = DataType::Struct(Fields::from(vec![
44            ArrowField::new("position", DataType::UInt64, false),
45            ArrowField::new("size", DataType::UInt64, false),
46        ]));
47
48        // Use the original field's name for the descriptor
49        let descriptor_field = Field::try_from(
50            ArrowField::new(&field.name, descriptor_data_type, field.nullable)
51                .with_metadata(descriptor_metadata),
52        )?;
53
54        // Use PrimitiveStructuralEncoder to handle the descriptor
55        let descriptor_encoder = Box::new(PrimitiveStructuralEncoder::try_new(
56            options,
57            compression_strategy,
58            column_index,
59            descriptor_field,
60            Arc::new(HashMap::new()),
61        )?);
62
63        Ok(Self { descriptor_encoder })
64    }
65}
66
67impl FieldEncoder for BlobStructuralEncoder {
68    fn maybe_encode(
69        &mut self,
70        array: ArrayRef,
71        external_buffers: &mut OutOfLineBuffers,
72        repdef: RepDefBuilder,
73        row_number: u64,
74        num_rows: u64,
75    ) -> Result<Vec<EncodeTask>> {
76        // Convert input array to LargeBinary
77        let binary_array = array
78            .as_binary_opt::<i64>()
79            .ok_or_else(|| Error::InvalidInput {
80                source: format!("Expected LargeBinary array, got {}", array.data_type()).into(),
81                location: location!(),
82            })?;
83
84        // Collect positions and sizes
85        let mut positions = Vec::with_capacity(binary_array.len());
86        let mut sizes = Vec::with_capacity(binary_array.len());
87
88        for i in 0..binary_array.len() {
89            if binary_array.is_null(i) {
90                // Null values are handled in the structural layer
91                // We just need placeholders here
92                positions.push(0);
93                sizes.push(0);
94            } else {
95                let value = binary_array.value(i);
96                if value.is_empty() {
97                    // Empty values
98                    positions.push(0);
99                    sizes.push(0);
100                } else {
101                    // Add data to external buffers
102                    let position =
103                        external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value)));
104                    positions.push(position);
105                    sizes.push(value.len() as u64);
106                }
107            }
108        }
109
110        // Create descriptor array
111        let position_array = Arc::new(UInt64Array::from(positions));
112        let size_array = Arc::new(UInt64Array::from(sizes));
113        let descriptor_array = Arc::new(StructArray::new(
114            Fields::from(vec![
115                ArrowField::new("position", DataType::UInt64, false),
116                ArrowField::new("size", DataType::UInt64, false),
117            ]),
118            vec![position_array as ArrayRef, size_array as ArrayRef],
119            binary_array.nulls().cloned(), // Pass through null buffer
120        ));
121
122        // Delegate to descriptor encoder
123        self.descriptor_encoder.maybe_encode(
124            descriptor_array,
125            external_buffers,
126            repdef,
127            row_number,
128            num_rows,
129        )
130    }
131
132    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
133        self.descriptor_encoder.flush(external_buffers)
134    }
135
136    fn finish(
137        &mut self,
138        external_buffers: &mut OutOfLineBuffers,
139    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
140        self.descriptor_encoder.finish(external_buffers)
141    }
142
143    fn num_columns(&self) -> u32 {
144        self.descriptor_encoder.num_columns()
145    }
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        compression::DefaultCompressionStrategy,
        encoder::{ColumnIndexSequence, EncodingOptions},
        testing::{check_round_trip_encoding_of_data, TestCases},
    };
    use arrow_array::LargeBinaryArray;

    #[test]
    fn test_blob_encoder_creation() {
        // Constructing the encoder from a nullable LargeBinary field should succeed.
        let arrow_field = ArrowField::new("blob_field", DataType::LargeBinary, true);
        let field = Field::try_from(arrow_field).unwrap();
        let mut column_indices = ColumnIndexSequence::default();

        let result = BlobStructuralEncoder::new(
            &field,
            column_indices.next_column_index(0),
            &EncodingOptions::default(),
            Arc::new(DefaultCompressionStrategy::new()),
        );

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_blob_encoding_simple() {
        // A LargeBinary field tagged with blob metadata.
        let metadata = HashMap::from([(
            lance_core::datatypes::BLOB_META_KEY.to_string(),
            "true".to_string(),
        )]);
        let field = Field::try_from(
            ArrowField::new("blob_field", DataType::LargeBinary, true).with_metadata(metadata),
        )
        .unwrap();

        let mut column_indices = ColumnIndexSequence::default();
        let mut encoder = BlobStructuralEncoder::new(
            &field,
            column_indices.next_column_index(0),
            &EncodingOptions::default(),
            Arc::new(DefaultCompressionStrategy::new()),
        )
        .unwrap();

        // Mix of small, null, large (100 KiB), and empty values.
        let big = vec![0u8; 1024 * 100];
        let values: Vec<Option<&[u8]>> =
            vec![Some(b"hello world"), None, Some(&big), Some(b"")];
        let array = Arc::new(LargeBinaryArray::from(values));

        let mut external_buffers = OutOfLineBuffers::new(0, 8);
        let tasks = encoder
            .maybe_encode(array, &mut external_buffers, RepDefBuilder::default(), 0, 4)
            .unwrap();

        // The encoder may accumulate data; flush to force encoding if needed.
        if tasks.is_empty() {
            encoder.flush(&mut external_buffers).unwrap();
        }

        // The descriptor encoder must have produced at least one column.
        assert!(encoder.num_columns() > 0);

        // The non-empty values must have been written to external buffers.
        let buffers = external_buffers.take_buffers();
        assert!(
            !buffers.is_empty(),
            "Large blobs should be stored in external buffers"
        );
    }

    #[tokio::test]
    async fn test_blob_round_trip() {
        // Round-trip through the standard test harness with blob metadata set.
        let blob_metadata = HashMap::from([(
            lance_core::datatypes::BLOB_META_KEY.to_string(),
            "true".to_string(),
        )]);

        // Values of increasing size (1 KiB, 10 KiB, 100 KiB) plus a null.
        let small = vec![1u8; 1024];
        let medium = vec![2u8; 10240];
        let large = vec![3u8; 102400];
        let array = Arc::new(LargeBinaryArray::from(vec![
            Some(small.as_slice()),
            None,
            Some(medium.as_slice()),
            Some(large.as_slice()),
        ]));

        check_round_trip_encoding_of_data(vec![array], &TestCases::default(), blob_metadata).await;
    }
}
244}