lance_encoding/encodings/logical/blob.rs

use std::{collections::HashMap, sync::Arc};

use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray, UInt64Array};
use arrow_buffer::Buffer;
use arrow_schema::{DataType, Field as ArrowField, Fields};
use futures::{future::BoxFuture, FutureExt};
use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
use snafu::location;

use crate::{
    buffer::LanceBuffer,
    constants::PACKED_STRUCT_META_KEY,
    decoder::PageEncoding,
    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
    encodings::logical::primitive::PrimitiveStructuralEncoder,
    format::ProtobufUtils21,
    repdef::{DefinitionInterpretation, RepDefBuilder},
};

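/// Field encoder for large binary ("blob") values.
///
/// Each non-empty value is written to an out-of-line data buffer and the column
/// itself only stores a (position, size) descriptor struct, which is encoded
/// with the regular primitive structural encoder.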
pub struct BlobStructuralEncoder {
    /// Encoder for the (position, size) descriptor column
    descriptor_encoder: Box<dyn FieldEncoder>,
    /// Definition interpretation recorded from the first encoded batch
    def_meaning: Option<Arc<[DefinitionInterpretation]>>,
}

impl BlobStructuralEncoder {
    pub fn new(
        field: &Field,
        column_index: u32,
        options: &crate::encoder::EncodingOptions,
        compression_strategy: Arc<dyn crate::compression::CompressionStrategy>,
    ) -> Result<Self> {
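        // The descriptor column is a (position, size) struct; the
        // PACKED_STRUCT_META_KEY metadata marks it to be encoded as a packed struct.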
        let mut descriptor_metadata = HashMap::with_capacity(1);
        descriptor_metadata.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());

        let descriptor_data_type = DataType::Struct(Fields::from(vec![
            ArrowField::new("position", DataType::UInt64, false),
            ArrowField::new("size", DataType::UInt64, false),
        ]));

        let descriptor_field = Field::try_from(
            ArrowField::new(&field.name, descriptor_data_type, field.nullable)
                .with_metadata(descriptor_metadata),
        )?;

        let descriptor_encoder = Box::new(PrimitiveStructuralEncoder::try_new(
            options,
            compression_strategy,
            column_index,
            descriptor_field,
            Arc::new(HashMap::new()),
        )?);

        Ok(Self {
            descriptor_encoder,
            def_meaning: None,
        })
    }

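    /// Wraps the pages produced by the descriptor encoder so that their layout
    /// is recorded as a blob layout carrying the field's definition interpretation.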
    fn wrap_tasks(
        tasks: Vec<EncodeTask>,
        def_meaning: Arc<[DefinitionInterpretation]>,
    ) -> Vec<EncodeTask> {
        tasks
            .into_iter()
            .map(|task| {
                let def_meaning = def_meaning.clone();
                task.then(|encoded_page| async move {
                    let encoded_page = encoded_page?;

                    let PageEncoding::Structural(inner_layout) = encoded_page.description else {
                        return Err(Error::Internal {
                            message: "Expected inner encoding to return structural layout"
                                .to_string(),
                            location: location!(),
                        });
                    };

                    let wrapped = ProtobufUtils21::blob_layout(inner_layout, &def_meaning);
                    Ok(EncodedPage {
                        column_idx: encoded_page.column_idx,
                        data: encoded_page.data,
                        description: PageEncoding::Structural(wrapped),
                        num_rows: encoded_page.num_rows,
                        row_number: encoded_page.row_number,
                    })
                })
                .boxed()
            })
            .collect::<Vec<_>>()
    }
}

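// The blob bytes themselves are written directly to the out-of-line buffers; only
// the (position, size) descriptors flow through the inner field encoder.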
impl FieldEncoder for BlobStructuralEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        if let Some(validity) = array.nulls() {
            repdef.add_validity_bitmap(validity.clone());
        } else {
            repdef.add_no_null(array.len());
        }

        let binary_array = array
            .as_binary_opt::<i64>()
            .ok_or_else(|| Error::InvalidInput {
                source: format!("Expected LargeBinary array, got {}", array.data_type()).into(),
                location: location!(),
            })?;

        let repdef = RepDefBuilder::serialize(vec![repdef]);

        let rep = repdef.repetition_levels.as_ref();
        let def = repdef.definition_levels.as_ref();
        let def_meaning: Arc<[DefinitionInterpretation]> = repdef.def_meaning.into();

        if self.def_meaning.is_none() {
            self.def_meaning = Some(def_meaning.clone());
        } else {
            debug_assert_eq!(self.def_meaning.as_ref().unwrap(), &def_meaning);
        }

        let mut positions = Vec::with_capacity(binary_array.len());
        let mut sizes = Vec::with_capacity(binary_array.len());

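        // Build one descriptor per value:
        //  * null values:  size 0, position holds the packed rep/def levels
        //                  (definition level in the upper bits, repetition level
        //                  in the lower 16 bits)
        //  * empty values: position 0, size 0
        //  * other values: position and size of the data in the external buffers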
        for i in 0..binary_array.len() {
            if binary_array.is_null(i) {
                let mut repdef = (def.expect_ok()?[i] as u64) << 16;
                if let Some(rep) = rep {
                    repdef += rep[i] as u64;
                }

                debug_assert_ne!(repdef, 0);
                positions.push(repdef);
                sizes.push(0);
            } else {
                let value = binary_array.value(i);
                if value.is_empty() {
                    positions.push(0);
                    sizes.push(0);
                } else {
                    let position =
                        external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value)));
                    positions.push(position);
                    sizes.push(value.len() as u64);
                }
            }
        }

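        // Encode the descriptors with the inner packed-struct encoder.  The
        // descriptor column itself carries no nulls; null blobs are already
        // represented by the positions written above.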
        let position_array = Arc::new(UInt64Array::from(positions));
        let size_array = Arc::new(UInt64Array::from(sizes));
        let descriptor_array = Arc::new(StructArray::new(
            Fields::from(vec![
                ArrowField::new("position", DataType::UInt64, false),
                ArrowField::new("size", DataType::UInt64, false),
            ]),
            vec![position_array as ArrayRef, size_array as ArrayRef],
            None,
        ));

        let encode_tasks = self.descriptor_encoder.maybe_encode(
            descriptor_array,
            external_buffers,
            RepDefBuilder::default(),
            row_number,
            num_rows,
        )?;

        Ok(Self::wrap_tasks(encode_tasks, def_meaning))
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let encode_tasks = self.descriptor_encoder.flush(external_buffers)?;

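        // If nothing was ever encoded there is no recorded definition meaning;
        // fall back to the trivial all-valid interpretation.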
        let def_meaning = self
            .def_meaning
            .clone()
            .unwrap_or_else(|| Arc::new([DefinitionInterpretation::AllValidItem]));

        Ok(Self::wrap_tasks(encode_tasks, def_meaning))
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
        self.descriptor_encoder.finish(external_buffers)
    }

    fn num_columns(&self) -> u32 {
        self.descriptor_encoder.num_columns()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        compression::DefaultCompressionStrategy,
        encoder::{ColumnIndexSequence, EncodingOptions},
        testing::{check_round_trip_encoding_of_data, TestCases},
    };
    use arrow_array::LargeBinaryArray;

    #[test]
    fn test_blob_encoder_creation() {
        let field =
            Field::try_from(ArrowField::new("blob_field", DataType::LargeBinary, true)).unwrap();
        let mut column_index = ColumnIndexSequence::default();
        let column_idx = column_index.next_column_index(0);
        let options = EncodingOptions::default();
        let compression = Arc::new(DefaultCompressionStrategy::new());

        let encoder = BlobStructuralEncoder::new(&field, column_idx, &options, compression);

        assert!(encoder.is_ok());
    }

    #[tokio::test]
    async fn test_blob_encoding_simple() {
        let field = Field::try_from(
            ArrowField::new("blob_field", DataType::LargeBinary, true).with_metadata(
                HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]),
            ),
        )
        .unwrap();
        let mut column_index = ColumnIndexSequence::default();
        let column_idx = column_index.next_column_index(0);
        let options = EncodingOptions::default();
        let compression = Arc::new(DefaultCompressionStrategy::new());

        let mut encoder =
            BlobStructuralEncoder::new(&field, column_idx, &options, compression).unwrap();

        // A 100 KiB value alongside a small value, a null, and an empty value
        let large_data = vec![0u8; 1024 * 100];
        let data: Vec<Option<&[u8]>> =
            vec![Some(b"hello world"), None, Some(&large_data), Some(b"")];
        let array = Arc::new(LargeBinaryArray::from(data));

        let mut external_buffers = OutOfLineBuffers::new(0, 8);
        let repdef = RepDefBuilder::default();

        let tasks = encoder
            .maybe_encode(array, &mut external_buffers, repdef, 0, 4)
            .unwrap();

        // Force a flush if the encoder buffered the batch instead of producing tasks
        if tasks.is_empty() {
            let _flush_tasks = encoder.flush(&mut external_buffers).unwrap();
        }

        assert!(encoder.num_columns() > 0);

        let buffers = external_buffers.take_buffers();
        assert!(
            !buffers.is_empty(),
            "Large blobs should be stored in external buffers"
        );
    }

    #[tokio::test]
    async fn test_blob_round_trip() {
        let blob_metadata =
            HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]);

        // Values of 1 KiB, 10 KiB, and 100 KiB, plus a null
        let val1: &[u8] = &vec![1u8; 1024];
        let val2: &[u8] = &vec![2u8; 10240];
        let val3: &[u8] = &vec![3u8; 102400];
        let array = Arc::new(LargeBinaryArray::from(vec![
            Some(val1),
            None,
            Some(val2),
            Some(val3),
        ]));

        check_round_trip_encoding_of_data(vec![array], &TestCases::default(), blob_metadata).await;
    }
}