use std::{collections::VecDeque, sync::Arc, vec};
5
6use arrow::{array::AsArray, datatypes::UInt64Type};
7use arrow_array::{Array, ArrayRef, LargeBinaryArray, PrimitiveArray, StructArray, UInt64Array};
8use arrow_buffer::{
9 BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer,
10};
11use arrow_schema::DataType;
12use bytes::Bytes;
13use futures::{future::BoxFuture, FutureExt};
14use snafu::location;
15
16use lance_core::{datatypes::BLOB_DESC_FIELDS, Error, Result};
17
18use crate::{
19 buffer::LanceBuffer,
20 decoder::{
21 DecodeArrayTask, DecoderReady, FieldScheduler, FilterExpression, LogicalPageDecoder,
22 MessageType, NextDecodeTask, PriorityRange, ScheduledScanLine, SchedulerContext,
23 SchedulingJob,
24 },
25 encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
26 format::pb::{column_encoding, Blob, ColumnEncoding},
27 repdef::RepDefBuilder,
28 EncodingsIo,
29};
30
/// Scheduler for blob-encoded fields.
///
/// Blobs are stored out-of-line: the column itself only holds (position, size)
/// descriptions, and this scheduler delegates to the scheduler for that
/// descriptions column.
#[derive(Debug)]
pub struct BlobFieldScheduler {
    // Scheduler for the inner struct column of (position, size) descriptions.
    descriptions_scheduler: Arc<dyn FieldScheduler>,
}
46
impl BlobFieldScheduler {
    /// Creates a blob scheduler wrapping the scheduler for the descriptions column.
    pub fn new(descriptions_scheduler: Arc<dyn FieldScheduler>) -> Self {
        Self {
            descriptions_scheduler,
        }
    }
}
54
/// Scheduling job for a blob field; wraps the job that schedules the
/// (position, size) descriptions column.
#[derive(Debug)]
struct BlobFieldSchedulingJob<'a> {
    // Inner job scheduling the descriptions; each decoder it produces is
    // wrapped in a `BlobFieldDecoder`.
    descriptions_job: Box<dyn SchedulingJob + 'a>,
}
59
impl SchedulingJob for BlobFieldSchedulingJob<'_> {
    /// Schedules the next batch of descriptions and wraps each resulting
    /// description decoder in a `BlobFieldDecoder` that will later fetch the
    /// actual blob bytes.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let next_descriptions = self.descriptions_job.schedule_next(context, priority)?;
        // Shadowed: from here on `priority` is the running row offset used as
        // the base priority for each wrapped decoder.
        let mut priority = priority.current_priority();
        let decoders = next_descriptions.decoders.into_iter().map(|decoder| {
            let decoder = decoder.into_legacy();
            let path = decoder.path;
            let mut decoder = decoder.decoder;
            let num_rows = decoder.num_rows();
            // Future that drives the inner decoder to completion and decodes
            // the full descriptions array for this page.
            let descriptions_fut = async move {
                decoder
                    .wait_for_loaded(decoder.num_rows() - 1)
                    .await
                    .unwrap();
                let descriptions_task = decoder.drain(decoder.num_rows()).unwrap();
                descriptions_task.task.decode()
            }
            .boxed();
            // The blob decoder starts empty; positions/sizes are filled in
            // once `descriptions_fut` resolves (see `wait_for_loaded`).
            let decoder = Box::new(BlobFieldDecoder {
                io: context.io().clone(),
                unloaded_descriptions: Some(descriptions_fut),
                positions: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
                sizes: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
                num_rows,
                loaded: VecDeque::new(),
                validity: VecDeque::new(),
                rows_loaded: 0,
                rows_drained: 0,
                base_priority: priority,
            });
            // Advance the base priority so the next decoder's I/O sorts after
            // this one's rows.
            priority += num_rows;
            MessageType::DecoderReady(DecoderReady { decoder, path })
        });
        Ok(ScheduledScanLine {
            decoders: decoders.collect(),
            rows_scheduled: next_descriptions.rows_scheduled,
        })
    }

    fn num_rows(&self) -> u64 {
        self.descriptions_job.num_rows()
    }
}
107
108impl FieldScheduler for BlobFieldScheduler {
109 fn schedule_ranges<'a>(
110 &'a self,
111 ranges: &[std::ops::Range<u64>],
112 filter: &FilterExpression,
113 ) -> Result<Box<dyn SchedulingJob + 'a>> {
114 let descriptions_job = self
115 .descriptions_scheduler
116 .schedule_ranges(ranges, filter)?;
117 Ok(Box::new(BlobFieldSchedulingJob { descriptions_job }))
118 }
119
120 fn num_rows(&self) -> u64 {
121 self.descriptions_scheduler.num_rows()
122 }
123
124 fn initialize<'a>(
125 &'a self,
126 filter: &'a FilterExpression,
127 context: &'a SchedulerContext,
128 ) -> BoxFuture<'a, Result<()>> {
129 self.descriptions_scheduler.initialize(filter, context)
130 }
131}
132
/// Decoder for blob-encoded fields.
///
/// First resolves the (position, size) descriptions, then issues I/O for the
/// out-of-line blob bytes and assembles them into a `LargeBinary` array.
pub struct BlobFieldDecoder {
    // I/O interface used to fetch the blob bytes.
    io: Arc<dyn EncodingsIo>,
    // Pending future resolving to the descriptions struct array; taken (set to
    // None) on the first call to `wait_for_loaded`.
    unloaded_descriptions: Option<BoxFuture<'static, Result<ArrayRef>>>,
    // Byte position of each blob, filled from the descriptions.
    positions: PrimitiveArray<UInt64Type>,
    // Byte size of each blob, filled from the descriptions.
    sizes: PrimitiveArray<UInt64Type>,
    num_rows: u64,
    // Fetched blob payloads, one entry per loaded row (empty for null/empty rows).
    loaded: VecDeque<Bytes>,
    // Validity bits for loaded-but-not-yet-drained rows.
    validity: VecDeque<BooleanBuffer>,
    rows_loaded: u64,
    rows_drained: u64,
    // Priority offset for I/O requests, set by the scheduling job.
    base_priority: u64,
}
145
146impl BlobFieldDecoder {
147 fn drain_validity(&mut self, num_values: usize) -> Result<Option<NullBuffer>> {
148 let mut validity = BooleanBufferBuilder::new(num_values);
149 let mut remaining = num_values;
150 while remaining > 0 {
151 let next = self.validity.front_mut().unwrap();
152 if remaining < next.len() {
153 let slice = next.slice(0, remaining);
154 validity.append_buffer(&slice);
155 *next = next.slice(remaining, next.len() - remaining);
156 remaining = 0;
157 } else {
158 validity.append_buffer(next);
159 remaining -= next.len();
160 self.validity.pop_front();
161 }
162 }
163 let nulls = NullBuffer::new(validity.finish());
164 if nulls.null_count() == 0 {
165 Ok(None)
166 } else {
167 Ok(Some(nulls))
168 }
169 }
170}
171
172impl std::fmt::Debug for BlobFieldDecoder {
173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 f.debug_struct("BlobFieldDecoder")
175 .field("num_rows", &self.num_rows)
176 .field("rows_loaded", &self.rows_loaded)
177 .field("rows_drained", &self.rows_drained)
178 .finish()
179 }
180}
181
impl LogicalPageDecoder for BlobFieldDecoder {
    /// Loads blob bytes for rows up to and including `loaded_need`.
    ///
    /// On first call this awaits the descriptions future to populate
    /// `positions` / `sizes`, then issues one ranged read per row.
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
        async move {
            if self.unloaded_descriptions.is_some() {
                let descriptions = self.unloaded_descriptions.take().unwrap().await?;
                let descriptions = descriptions.as_struct();
                // Descriptions struct layout: column 0 = positions, column 1 = sizes.
                self.positions = descriptions.column(0).as_primitive().clone();
                self.sizes = descriptions.column(1).as_primitive().clone();
            }
            let start = self.rows_loaded as usize;
            // `loaded_need` is an inclusive row index; clamp to num_rows.
            let end = (loaded_need + 1).min(self.num_rows) as usize;
            let positions = self.positions.values().slice(start, end - start);
            let sizes = self.sizes.values().slice(start, end - start);
            // One byte range per row.  Null rows are (1, 0) and empty rows
            // (0, 0) per `write_bins`, so both produce empty ranges.
            let ranges = positions
                .iter()
                .zip(sizes.iter())
                .map(|(position, size)| *position..(*position + *size))
                .collect::<Vec<_>>();
            // (position == 1 && size == 0) is the null sentinel written by the
            // encoder; everything else is valid.
            let validity = positions
                .iter()
                .zip(sizes.iter())
                .map(|(p, s)| *p != 1 || *s != 0)
                .collect::<BooleanBuffer>();
            self.validity.push_back(validity);
            self.rows_loaded = end as u64;
            let bytes = self
                .io
                .submit_request(ranges, self.base_priority + start as u64)
                .await?;
            // NOTE(review): assumes submit_request returns one Bytes per
            // requested range (including empty ranges) so `loaded` stays
            // one-entry-per-row — confirm against the EncodingsIo contract.
            self.loaded.extend(bytes);
            Ok(())
        }
        .boxed()
    }

    fn rows_loaded(&self) -> u64 {
        self.rows_loaded
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }

    /// Hands off the next `num_rows` loaded payloads (and their validity) as a
    /// decode task producing a `LargeBinary` array.
    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        let bytes = self.loaded.drain(0..num_rows as usize).collect::<Vec<_>>();
        let validity = self.drain_validity(num_rows as usize)?;
        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(BlobArrayDecodeTask::new(bytes, validity)),
        })
    }

    fn data_type(&self) -> &DataType {
        &DataType::LargeBinary
    }
}
243
/// Decode task assembling fetched blob payloads into a `LargeBinaryArray`.
struct BlobArrayDecodeTask {
    // One payload per row; empty for null or zero-length rows.
    bytes: Vec<Bytes>,
    // Validity for the rows, `None` when all rows are valid.
    validity: Option<NullBuffer>,
}
248
249impl BlobArrayDecodeTask {
250 fn new(bytes: Vec<Bytes>, validity: Option<NullBuffer>) -> Self {
251 Self { bytes, validity }
252 }
253}
254
255impl DecodeArrayTask for BlobArrayDecodeTask {
256 fn decode(self: Box<Self>) -> Result<ArrayRef> {
257 let num_bytes = self.bytes.iter().map(|b| b.len()).sum::<usize>();
258 let offsets = self
259 .bytes
260 .iter()
261 .scan(0, |state, b| {
262 let start = *state;
263 *state += b.len();
264 Some(start as i64)
265 })
266 .chain(std::iter::once(num_bytes as i64))
267 .collect::<Vec<_>>();
268 let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
269 let mut buffer = Vec::with_capacity(num_bytes);
270 for bytes in self.bytes {
271 buffer.extend_from_slice(&bytes);
272 }
273 let data_buf = Buffer::from_vec(buffer);
274 Ok(Arc::new(LargeBinaryArray::new(
275 offsets,
276 data_buf,
277 self.validity,
278 )))
279 }
280}
281
/// Encoder for blob fields: moves payloads to external buffers and encodes a
/// (position, size) descriptions column via the inner encoder.
pub struct BlobFieldEncoder {
    // Encoder for the struct column of (position, size) descriptions.
    description_encoder: Box<dyn FieldEncoder>,
}
285
286impl BlobFieldEncoder {
287 pub fn new(description_encoder: Box<dyn FieldEncoder>) -> Self {
288 Self {
289 description_encoder,
290 }
291 }
292
293 fn write_bins(array: ArrayRef, external_buffers: &mut OutOfLineBuffers) -> Result<ArrayRef> {
294 let binarray = array
295 .as_binary_opt::<i64>()
296 .ok_or_else(|| Error::InvalidInput {
297 source: format!("Expected large_binary and received {}", array.data_type()).into(),
298 location: location!(),
299 })?;
300 let mut positions = Vec::with_capacity(array.len());
301 let mut sizes = Vec::with_capacity(array.len());
302 let data = binarray.values();
303 let nulls = binarray
304 .nulls()
305 .cloned()
306 .unwrap_or(NullBuffer::new_valid(binarray.len()));
307 for (w, is_valid) in binarray.value_offsets().windows(2).zip(nulls.into_iter()) {
308 if is_valid {
309 let start = w[0] as u64;
310 let end = w[1] as u64;
311 let size = end - start;
312 if size > 0 {
313 let val = data.slice_with_length(start as usize, size as usize);
314 let position = external_buffers.add_buffer(LanceBuffer::Borrowed(val));
315 positions.push(position);
316 sizes.push(size);
317 } else {
318 positions.push(0);
320 sizes.push(0);
321 }
322 } else {
323 positions.push(1);
325 sizes.push(0);
326 }
327 }
328 let positions = Arc::new(UInt64Array::from(positions));
329 let sizes = Arc::new(UInt64Array::from(sizes));
330 let descriptions = Arc::new(StructArray::new(
331 BLOB_DESC_FIELDS.clone(),
332 vec![positions, sizes],
333 None,
334 ));
335 Ok(descriptions)
336 }
337}
338
impl FieldEncoder for BlobFieldEncoder {
    /// Converts the blob array into descriptions (moving payloads out-of-line)
    /// and forwards the descriptions to the inner encoder.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let descriptions = Self::write_bins(array, external_buffers)?;
        self.description_encoder.maybe_encode(
            descriptions,
            external_buffers,
            repdef,
            row_number,
            num_rows,
        )
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        self.description_encoder.flush(external_buffers)
    }

    fn num_columns(&self) -> u32 {
        self.description_encoder.num_columns()
    }

    /// Finishes the inner encoder and wraps the single resulting column's
    /// encoding in a `Blob` column encoding so readers know to indirect.
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let inner_finished = self.description_encoder.finish(external_buffers);
        async move {
            let mut cols = inner_finished.await?;
            // Blob fields encode to exactly one descriptions column.
            assert_eq!(cols.len(), 1);
            // Take the inner encoding (leaving a default in place) and re-wrap
            // it inside a Blob encoding.
            let encoding = std::mem::take(&mut cols[0].encoding);
            let wrapped_encoding = ColumnEncoding {
                column_encoding: Some(column_encoding::ColumnEncoding::Blob(Box::new(Blob {
                    inner: Some(Box::new(encoding)),
                }))),
            };
            cols[0].encoding = wrapped_encoding;
            Ok(cols)
        }
        .boxed()
    }
}
387
#[cfg(test)]
pub mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::LargeBinaryArray;
    use arrow_schema::{DataType, Field};
    use lance_core::datatypes::BLOB_META_KEY;

    use crate::{
        format::pb::column_encoding,
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    lazy_static::lazy_static! {
        // Field metadata that marks a column as blob-encoded.
        static ref BLOB_META: HashMap<String, String> =
            [(BLOB_META_KEY.to_string(), "true".to_string())]
            .iter()
            .cloned()
            .collect::<HashMap<_, _>>();
    }

    // Round-trips random large-binary data through the blob encoding.
    #[test_log::test(tokio::test)]
    async fn test_blob() {
        let field = Field::new("", DataType::LargeBinary, false).with_metadata(BLOB_META.clone());
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    // Verifies that the blob metadata key toggles the Blob column encoding:
    // present -> Blob wrapper emitted, absent -> plain encoding.
    #[test_log::test(tokio::test)]
    async fn test_simple_blob() {
        let val1: &[u8] = &[1, 2, 3];
        let val2: &[u8] = &[7, 8, 9];
        let array = Arc::new(LargeBinaryArray::from(vec![Some(val1), None, Some(val2)]));
        let test_cases = TestCases::default().with_verify_encoding(Arc::new(|cols| {
            assert_eq!(cols.len(), 1);
            let col = &cols[0];
            assert!(matches!(
                col.encoding.column_encoding.as_ref().unwrap(),
                column_encoding::ColumnEncoding::Blob(_)
            ));
        }));
        check_round_trip_encoding_of_data(vec![array.clone()], &test_cases, BLOB_META.clone())
            .await;

        // Without the blob metadata the same data must NOT use the Blob encoding.
        let test_cases = TestCases::default().with_verify_encoding(Arc::new(|cols| {
            assert_eq!(cols.len(), 1);
            let col = &cols[0];
            assert!(!matches!(
                col.encoding.column_encoding.as_ref().unwrap(),
                column_encoding::ColumnEncoding::Blob(_)
            ));
        }));
        check_round_trip_encoding_of_data(vec![array], &test_cases, Default::default()).await;
    }
}
444}