// lance_encoding/encodings/logical/blob.rs
use std::{collections::HashMap, sync::Arc};
5
6use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray, UInt64Array};
7use arrow_buffer::Buffer;
8use arrow_schema::{DataType, Field as ArrowField, Fields};
9use futures::future::BoxFuture;
10use lance_core::{datatypes::Field, Error, Result};
11use snafu::location;
12
13use crate::{
14 buffer::LanceBuffer,
15 constants::PACKED_STRUCT_META_KEY,
16 encoder::{EncodeTask, EncodedColumn, FieldEncoder, OutOfLineBuffers},
17 encodings::logical::primitive::PrimitiveStructuralEncoder,
18 repdef::RepDefBuilder,
19};
20
/// Encodes a blob (`LargeBinary`) column.
///
/// The blob bytes themselves are written to out-of-line buffers; only a
/// packed `{position: u64, size: u64}` descriptor struct is encoded as the
/// in-line column data.
pub struct BlobStructuralEncoder {
    /// Encoder for the packed `(position, size)` descriptor column.
    descriptor_encoder: Box<dyn FieldEncoder>,
}
30
31impl BlobStructuralEncoder {
32 pub fn new(
33 field: &Field,
34 column_index: u32,
35 options: &crate::encoder::EncodingOptions,
36 compression_strategy: Arc<dyn crate::compression::CompressionStrategy>,
37 ) -> Result<Self> {
38 let mut descriptor_metadata = HashMap::with_capacity(1);
41 descriptor_metadata.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
42
43 let descriptor_data_type = DataType::Struct(Fields::from(vec![
44 ArrowField::new("position", DataType::UInt64, false),
45 ArrowField::new("size", DataType::UInt64, false),
46 ]));
47
48 let descriptor_field = Field::try_from(
50 ArrowField::new(&field.name, descriptor_data_type, field.nullable)
51 .with_metadata(descriptor_metadata),
52 )?;
53
54 let descriptor_encoder = Box::new(PrimitiveStructuralEncoder::try_new(
56 options,
57 compression_strategy,
58 column_index,
59 descriptor_field,
60 Arc::new(HashMap::new()),
61 )?);
62
63 Ok(Self { descriptor_encoder })
64 }
65}
66
67impl FieldEncoder for BlobStructuralEncoder {
68 fn maybe_encode(
69 &mut self,
70 array: ArrayRef,
71 external_buffers: &mut OutOfLineBuffers,
72 repdef: RepDefBuilder,
73 row_number: u64,
74 num_rows: u64,
75 ) -> Result<Vec<EncodeTask>> {
76 let binary_array = array
78 .as_binary_opt::<i64>()
79 .ok_or_else(|| Error::InvalidInput {
80 source: format!("Expected LargeBinary array, got {}", array.data_type()).into(),
81 location: location!(),
82 })?;
83
84 let mut positions = Vec::with_capacity(binary_array.len());
86 let mut sizes = Vec::with_capacity(binary_array.len());
87
88 for i in 0..binary_array.len() {
89 if binary_array.is_null(i) {
90 positions.push(0);
93 sizes.push(0);
94 } else {
95 let value = binary_array.value(i);
96 if value.is_empty() {
97 positions.push(0);
99 sizes.push(0);
100 } else {
101 let position =
103 external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value)));
104 positions.push(position);
105 sizes.push(value.len() as u64);
106 }
107 }
108 }
109
110 let position_array = Arc::new(UInt64Array::from(positions));
112 let size_array = Arc::new(UInt64Array::from(sizes));
113 let descriptor_array = Arc::new(StructArray::new(
114 Fields::from(vec![
115 ArrowField::new("position", DataType::UInt64, false),
116 ArrowField::new("size", DataType::UInt64, false),
117 ]),
118 vec![position_array as ArrayRef, size_array as ArrayRef],
119 binary_array.nulls().cloned(), ));
121
122 self.descriptor_encoder.maybe_encode(
124 descriptor_array,
125 external_buffers,
126 repdef,
127 row_number,
128 num_rows,
129 )
130 }
131
132 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
133 self.descriptor_encoder.flush(external_buffers)
134 }
135
136 fn finish(
137 &mut self,
138 external_buffers: &mut OutOfLineBuffers,
139 ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
140 self.descriptor_encoder.finish(external_buffers)
141 }
142
143 fn num_columns(&self) -> u32 {
144 self.descriptor_encoder.num_columns()
145 }
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        compression::DefaultCompressionStrategy,
        encoder::{ColumnIndexSequence, EncodingOptions},
        testing::{check_round_trip_encoding_of_data, TestCases},
    };
    use arrow_array::LargeBinaryArray;

    /// Constructing the encoder for a nullable LargeBinary field succeeds.
    #[test]
    fn test_blob_encoder_creation() {
        let field =
            Field::try_from(ArrowField::new("blob_field", DataType::LargeBinary, true)).unwrap();
        let mut column_indices = ColumnIndexSequence::default();
        let col_idx = column_indices.next_column_index(0);

        let result = BlobStructuralEncoder::new(
            &field,
            col_idx,
            &EncodingOptions::default(),
            Arc::new(DefaultCompressionStrategy::new()),
        );

        assert!(result.is_ok());
    }

    /// Encoding a small batch places blob bytes into the external buffers.
    #[tokio::test]
    async fn test_blob_encoding_simple() {
        let blob_metadata = HashMap::from([(
            lance_core::datatypes::BLOB_META_KEY.to_string(),
            "true".to_string(),
        )]);
        let field = Field::try_from(
            ArrowField::new("blob_field", DataType::LargeBinary, true)
                .with_metadata(blob_metadata),
        )
        .unwrap();

        let mut column_indices = ColumnIndexSequence::default();
        let col_idx = column_indices.next_column_index(0);
        let mut encoder = BlobStructuralEncoder::new(
            &field,
            col_idx,
            &EncodingOptions::default(),
            Arc::new(DefaultCompressionStrategy::new()),
        )
        .unwrap();

        // One 100 KiB value, plus a small value, a null, and an empty value.
        let big = vec![0u8; 1024 * 100];
        let values: Vec<Option<&[u8]>> = vec![Some(b"hello world"), None, Some(&big), Some(b"")];
        let array = Arc::new(LargeBinaryArray::from(values));

        let mut external_buffers = OutOfLineBuffers::new(0, 8);
        let tasks = encoder
            .maybe_encode(array, &mut external_buffers, RepDefBuilder::default(), 0, 4)
            .unwrap();

        // If nothing was produced eagerly, force a flush.
        if tasks.is_empty() {
            let _ = encoder.flush(&mut external_buffers).unwrap();
        }

        assert!(encoder.num_columns() > 0);

        let buffers = external_buffers.take_buffers();
        assert!(
            !buffers.is_empty(),
            "Large blobs should be stored in external buffers"
        );
    }

    /// Full round trip through the shared encode/decode test harness.
    #[tokio::test]
    async fn test_blob_round_trip() {
        let blob_metadata = HashMap::from([(
            lance_core::datatypes::BLOB_META_KEY.to_string(),
            "true".to_string(),
        )]);

        // Blobs of increasing size (1 KiB, 10 KiB, 100 KiB) with one null.
        let small = vec![1u8; 1024];
        let medium = vec![2u8; 10240];
        let large = vec![3u8; 102400];
        let array = Arc::new(LargeBinaryArray::from(vec![
            Some(small.as_slice()),
            None,
            Some(medium.as_slice()),
            Some(large.as_slice()),
        ]));

        check_round_trip_encoding_of_data(vec![array], &TestCases::default(), blob_metadata).await;
    }
}
244}